diff --git a/trunk/src/app/srs_app_dtls.cpp b/trunk/src/app/srs_app_dtls.cpp index 5ecac847f..2eb32ef6e 100644 --- a/trunk/src/app/srs_app_dtls.cpp +++ b/trunk/src/app/srs_app_dtls.cpp @@ -46,14 +46,6 @@ SrsDtls::~SrsDtls() SSL_CTX_free(dtls_ctx); } -SrsDtls* SrsDtls::instance() -{ - if (!_instance) { - _instance = new SrsDtls(); - } - return _instance; -} - // The return value of verify_callback controls the strategy of the further verification process. If verify_callback // returns 0, the verification process is immediately stopped with "verification failed" state. If SSL_VERIFY_PEER is // set, a verification failure alert is sent to the peer and the TLS/SSL handshake is terminated. If verify_callback @@ -256,3 +248,22 @@ srs_error_t SrsDtls::init(SrsRequest* r) return err; } + +SrsDtls* SrsDtls::instance() +{ + if (!_instance) { + _instance = new SrsDtls(); + } + return _instance; +} + +SSL_CTX* SrsDtls::get_dtls_ctx() +{ + return dtls_ctx; +} + +std::string SrsDtls::get_fingerprint() const +{ + return fingerprint; +} + diff --git a/trunk/src/app/srs_app_dtls.hpp b/trunk/src/app/srs_app_dtls.hpp index 560386f77..25cc80b4c 100644 --- a/trunk/src/app/srs_app_dtls.hpp +++ b/trunk/src/app/srs_app_dtls.hpp @@ -46,9 +46,9 @@ public: srs_error_t init(SrsRequest* r); public: static SrsDtls* instance(); - SSL_CTX* get_dtls_ctx() { return dtls_ctx; } + SSL_CTX* get_dtls_ctx(); public: - std::string get_fingerprint() const { return fingerprint; } + std::string get_fingerprint() const; }; #endif diff --git a/trunk/src/app/srs_app_listener.cpp b/trunk/src/app/srs_app_listener.cpp index 72d62fffe..b5fab94bd 100755 --- a/trunk/src/app/srs_app_listener.cpp +++ b/trunk/src/app/srs_app_listener.cpp @@ -261,23 +261,6 @@ SrsUdpMuxSocket::~SrsUdpMuxSocket() srs_freepa(buf); } -SrsUdpMuxSocket* SrsUdpMuxSocket::copy_sendonly() -{ - SrsUdpMuxSocket* sendonly = new SrsUdpMuxSocket(handler, lfd); - - // Don't copy buffer - srs_freepa(sendonly->buf); - sendonly->nb_buf = 0; - sendonly->nread = 0; - sendonly->lfd = lfd; - sendonly->from = from; - sendonly->fromlen = fromlen; - sendonly->peer_ip = peer_ip; - sendonly->peer_port = peer_port; - - return sendonly; -} - int SrsUdpMuxSocket::recvfrom(srs_utime_t timeout) { fromlen = sizeof(from); @@ -341,6 +324,26 @@ socklen_t SrsUdpMuxSocket::peer_addrlen() return (socklen_t)fromlen; } +char* SrsUdpMuxSocket::data() +{ + return buf; +} + +int SrsUdpMuxSocket::size() +{ + return nread; +} + +std::string SrsUdpMuxSocket::get_peer_ip() const +{ + return peer_ip; +} + +int SrsUdpMuxSocket::get_peer_port() const +{ + return peer_port; +} + std::string SrsUdpMuxSocket::get_peer_id() { char id_buf[1024]; @@ -349,6 +352,28 @@ std::string SrsUdpMuxSocket::get_peer_id() return string(id_buf, len); } +SrsUdpMuxSocket* SrsUdpMuxSocket::copy_sendonly() +{ + SrsUdpMuxSocket* sendonly = new SrsUdpMuxSocket(handler, lfd); + + // Don't copy buffer + srs_freepa(sendonly->buf); + sendonly->nb_buf = 0; + sendonly->nread = 0; + sendonly->lfd = lfd; + sendonly->from = from; + sendonly->fromlen = fromlen; + sendonly->peer_ip = peer_ip; + sendonly->peer_port = peer_port; + + return sendonly; +} + +ISrsUdpSender* SrsUdpMuxSocket::sender() +{ + return handler; +} + SrsUdpMuxListener::SrsUdpMuxListener(ISrsUdpMuxHandler* h, ISrsUdpSender* s, std::string i, int p) { handler = h; diff --git a/trunk/src/app/srs_app_listener.hpp b/trunk/src/app/srs_app_listener.hpp index cc20cc756..46fd11a7e 100644 --- a/trunk/src/app/srs_app_listener.hpp +++ b/trunk/src/app/srs_app_listener.hpp @@ -163,26 +163,19 @@ private: public: SrsUdpMuxSocket(ISrsUdpSender* h, srs_netfd_t fd); virtual ~SrsUdpMuxSocket(); - +public: int recvfrom(srs_utime_t timeout); srs_error_t sendto(void* data, int size, srs_utime_t timeout); - srs_netfd_t stfd(); sockaddr_in* peer_addr(); socklen_t peer_addrlen(); - - char* data() { return buf; } - int size() { return nread; } - std::string get_peer_ip() const { return peer_ip; } - int get_peer_port() const { return peer_port; } + char* data(); + int size(); + std::string get_peer_ip() const; + int get_peer_port() const; std::string get_peer_id(); -public: SrsUdpMuxSocket* copy_sendonly(); - ISrsUdpSender* sender() { return handler; }; -private: - // Don't allow copy, user copy_sendonly instead - SrsUdpMuxSocket(const SrsUdpMuxSocket& rhs); - SrsUdpMuxSocket& operator=(const SrsUdpMuxSocket& rhs); + ISrsUdpSender* sender(); }; class SrsUdpMuxListener : public ISrsCoroutineHandler diff --git a/trunk/src/app/srs_app_rtc_conn.cpp b/trunk/src/app/srs_app_rtc_conn.cpp index 686ccbb88..177e070aa 100644 --- a/trunk/src/app/srs_app_rtc_conn.cpp +++ b/trunk/src/app/srs_app_rtc_conn.cpp @@ -3396,26 +3396,6 @@ srs_error_t SrsRtcServer::create_rtc_session( return err; } -SrsRtcSession* SrsRtcServer::find_rtc_session_by_peer_id(const string& peer_id) -{ - map::iterator iter = map_id_session.find(peer_id); - if (iter == map_id_session.end()) { - return NULL; - } - - return iter->second; -} - -SrsRtcSession* SrsRtcServer::find_rtc_session_by_username(const std::string& username) -{ - map::iterator iter = map_username_session.find(username); - if (iter == map_username_session.end()) { - return NULL; - } - - return iter->second; -} - bool SrsRtcServer::insert_into_id_sessions(const string& peer_id, SrsRtcSession* rtc_session) { return map_id_session.insert(make_pair(peer_id, rtc_session)).second; @@ -3447,6 +3427,31 @@ void SrsRtcServer::check_and_clean_timeout_session() } } +int SrsRtcServer::nn_sessions() +{ + return (int)map_username_session.size(); +} + +SrsRtcSession* SrsRtcServer::find_rtc_session_by_peer_id(const string& peer_id) +{ + map::iterator iter = map_id_session.find(peer_id); + if (iter == map_id_session.end()) { + return NULL; + } + + return iter->second; +} + +SrsRtcSession* SrsRtcServer::find_rtc_session_by_username(const std::string& username) +{ + map::iterator iter = map_username_session.find(username); + if (iter == map_username_session.end()) { + return NULL; + } + + return iter->second; +} + srs_error_t SrsRtcServer::notify(int type, srs_utime_t interval, srs_utime_t tick) { check_and_clean_timeout_session(); diff --git a/trunk/src/app/srs_app_rtc_conn.hpp b/trunk/src/app/srs_app_rtc_conn.hpp index 65e959e32..a5598e28f 100644 --- a/trunk/src/app/srs_app_rtc_conn.hpp +++ b/trunk/src/app/srs_app_rtc_conn.hpp @@ -444,7 +444,7 @@ public: ); bool insert_into_id_sessions(const std::string& peer_id, SrsRtcSession* rtc_session); void check_and_clean_timeout_session(); - int nn_sessions() { return (int)map_username_session.size(); } + int nn_sessions(); private: SrsRtcSession* find_rtc_session_by_username(const std::string& ufrag); SrsRtcSession* find_rtc_session_by_peer_id(const std::string& peer_id); diff --git a/trunk/src/app/srs_app_rtp_queue.cpp b/trunk/src/app/srs_app_rtp_queue.cpp index ad399ea4a..c7f1a0473 100644 --- a/trunk/src/app/srs_app_rtp_queue.cpp +++ b/trunk/src/app/srs_app_rtp_queue.cpp @@ -150,6 +150,21 @@ SrsRtpRingBuffer::~SrsRtpRingBuffer() srs_freepa(queue_); } +uint16_t SrsRtpRingBuffer::low() +{ + return low_; +} + +uint16_t SrsRtpRingBuffer::high() +{ + return high_; +} + +void SrsRtpRingBuffer::advance_to(uint16_t seq) +{ + low_ = seq; +} + void SrsRtpRingBuffer::set(uint16_t at, SrsRtpPacket2* pkt) { SrsRtpPacket2* p = queue_[at % capacity_]; @@ -161,6 +176,11 @@ void SrsRtpRingBuffer::set(uint16_t at, SrsRtpPacket2* pkt) queue_[at % capacity_] = pkt; } +void SrsRtpRingBuffer::remove(uint16_t at) +{ + set(at, NULL); +} + void SrsRtpRingBuffer::reset(uint16_t low, uint16_t high) { for (uint16_t s = low; s != high; ++s) { @@ -168,6 +188,16 @@ void SrsRtpRingBuffer::reset(uint16_t low, uint16_t high) } } +bool SrsRtpRingBuffer::overflow() +{ + return high_ - low_ < capacity_; +} + +bool SrsRtpRingBuffer::is_heavy() +{ + return high_ - low_ >= capacity_ / 2; +} + uint16_t SrsRtpRingBuffer::next_start_of_frame() { if (low_ == high_) { @@ -245,6 +275,11 @@ void SrsRtpRingBuffer::update(uint16_t seq, bool startup, uint16_t& nack_low, ui } } +SrsRtpPacket2* SrsRtpRingBuffer::at(uint16_t seq) +{ + return queue_[seq % capacity_]; +} + SrsRtpQueue::SrsRtpQueue(size_t capacity, bool one_packet_per_frame) { nn_collected_frames = 0; @@ -396,6 +431,11 @@ void SrsRtpQueue::notify_nack_list_full() queue_->advance_to(next + 1); } +void SrsRtpQueue::request_keyframe() +{ + request_key_frame_ = true; +} + uint32_t SrsRtpQueue::get_extended_highest_sequence() { return queue_->get_extended_highest_sequence(); @@ -450,7 +490,9 @@ void SrsRtpQueue::collect_packet() { while (queue_->low() != queue_->high()) { vector frame; - for (uint16_t s = queue_->low(); s != queue_->high(); ++s) { + + uint16_t s = queue_->low(); + for (; s != queue_->high(); ++s) { SrsRtpPacket2* pkt = queue_->at(s); // In NACK, never collect frame. @@ -468,14 +510,17 @@ void SrsRtpQueue::collect_packet() frame.push_back(pkt); // Not the last packet, continue to process next one. - if (pkt->rtp_header.get_marker() || one_packet_per_frame_) { + if (!pkt->rtp_header.get_marker() && !one_packet_per_frame_) { continue; } // Done, we got the last packet of frame. nn_collected_frames++; frames_.push_back(frame); + break; + } + if (queue_->low() != s) { // Reset the range of packets to NULL in buffer. queue_->reset(queue_->low(), s); diff --git a/trunk/src/app/srs_app_rtp_queue.hpp b/trunk/src/app/srs_app_rtp_queue.hpp index 2e948d88e..523bd9f0f 100644 --- a/trunk/src/app/srs_app_rtp_queue.hpp +++ b/trunk/src/app/srs_app_rtp_queue.hpp @@ -133,17 +133,17 @@ public: virtual ~SrsRtpRingBuffer(); public: // Move the position of buffer. - uint16_t low() { return low_; } - uint16_t high() { return high_; } - void advance_to(uint16_t seq) { low_ = seq; } + uint16_t low(); + uint16_t high(); + void advance_to(uint16_t seq); // Free the packet at position. void set(uint16_t at, SrsRtpPacket2* pkt); - void remove(uint16_t at) { set(at, NULL); } + void remove(uint16_t at); // Directly reset range [low, high] to NULL. void reset(uint16_t low, uint16_t high); // Whether queue overflow or heavy(too many packets and need clear). - bool overflow() { return high_ - low_ < capacity_; } - bool is_heavy() { return high_ - low_ >= capacity_ / 2; } + bool overflow(); + bool is_heavy(); // Get the next start packet of frame. // @remark If not found, return the low_, which should never be the "next" one, // because it MAY or NOT current start packet of frame but never be the next. @@ -156,7 +156,7 @@ public: // Update the sequence, got the nack range by [low, high]. void update(uint16_t seq, bool startup, uint16_t& nack_low, uint16_t& nack_high); // Get the packet by seq. - SrsRtpPacket2* at(uint16_t seq) { return queue_[seq % capacity_]; } + SrsRtpPacket2* at(uint16_t seq); }; class SrsRtpQueue @@ -186,7 +186,7 @@ public: bool should_request_key_frame(); void notify_drop_seq(uint16_t seq); void notify_nack_list_full(); - void request_keyframe() { request_key_frame_ = true; } + void request_keyframe(); public: uint32_t get_extended_highest_sequence(); uint8_t get_fraction_lost(); diff --git a/trunk/src/app/srs_app_source.cpp b/trunk/src/app/srs_app_source.cpp index d87c65f97..630be08e3 100755 --- a/trunk/src/app/srs_app_source.cpp +++ b/trunk/src/app/srs_app_source.cpp @@ -457,6 +457,11 @@ SrsConsumer::~SrsConsumer() #endif } +void SrsConsumer::enable_pass_timestamp() +{ + pass_timestamp = true; +} + void SrsConsumer::set_queue_size(srs_utime_t queue_size) { queue->set_queue_size(queue_size); @@ -2764,6 +2769,11 @@ void SrsSource::request_keyframe() } } +void SrsSource::set_rtc_publisher(SrsRtcPublisher* v) +{ + rtc_publisher = v; +} + srs_error_t SrsSource::on_rtc_audio(SrsSharedPtrMessage* audio) { // TODO: FIXME: Merge with on_audio. diff --git a/trunk/src/app/srs_app_source.hpp b/trunk/src/app/srs_app_source.hpp index ac2bad45c..96c6252e3 100644 --- a/trunk/src/app/srs_app_source.hpp +++ b/trunk/src/app/srs_app_source.hpp @@ -217,7 +217,7 @@ public: virtual ~SrsConsumer(); public: // Use pass timestamp mode. - void enable_pass_timestamp() { pass_timestamp = true; } + void enable_pass_timestamp(); // Set the size of queue. virtual void set_queue_size(srs_utime_t queue_size); // when source id changed, notice client to print. @@ -658,7 +658,7 @@ public: // TODO: FIXME: Maybe we could cache the keyframe. // TODO: FIXME: Maybe we should only response for the new clients. void request_keyframe(); - void set_rtc_publisher(SrsRtcPublisher* v) { rtc_publisher = v; } + void set_rtc_publisher(SrsRtcPublisher* v); // When got RTC audio message, which is encoded in opus. // TODO: FIXME: Merge with on_audio. srs_error_t on_rtc_audio(SrsSharedPtrMessage* audio); diff --git a/trunk/src/kernel/srs_kernel_rtp.cpp b/trunk/src/kernel/srs_kernel_rtp.cpp index 105f75a43..d329a43cf 100644 --- a/trunk/src/kernel/srs_kernel_rtp.cpp +++ b/trunk/src/kernel/srs_kernel_rtp.cpp @@ -198,6 +198,71 @@ int SrsRtpHeader::nb_bytes() return kRtpHeaderFixedSize + cc * 4 + (extension ? (extension_length + 1) * 4 : 0); } +void SrsRtpHeader::set_marker(bool v) +{ + marker = v; +} + +bool SrsRtpHeader::get_marker() const +{ + return marker; +} + +void SrsRtpHeader::set_payload_type(uint8_t v) +{ + payload_type = v; +} + +uint8_t SrsRtpHeader::get_payload_type() const +{ + return payload_type; +} + +void SrsRtpHeader::set_sequence(uint16_t v) +{ + sequence = v; +} + +uint16_t SrsRtpHeader::get_sequence() const +{ + return sequence; +} + +void SrsRtpHeader::set_timestamp(int64_t v) +{ + timestamp = (uint32_t)v; +} + +int64_t SrsRtpHeader::get_timestamp() const +{ + return timestamp; +} + +void SrsRtpHeader::set_ssrc(uint32_t v) +{ + ssrc = v; +} + +uint32_t SrsRtpHeader::get_ssrc() const +{ + return ssrc; +} + +void SrsRtpHeader::set_padding(bool v) +{ + padding = v; +} + +void SrsRtpHeader::set_padding_length(uint8_t v) +{ + padding_length = v; +} + +uint8_t SrsRtpHeader::get_padding_length() const +{ + return padding_length; +} + ISrsRtpPacketDecodeHandler::ISrsRtpPacketDecodeHandler() { } @@ -285,6 +350,17 @@ SrsRtpFUAPayload2* SrsRtpPacket2::reuse_fua() return cache_fua; } +void SrsRtpPacket2::set_decode_handler(ISrsRtpPacketDecodeHandler* h) +{ + decode_handler = h; +} + +void SrsRtpPacket2::set_original_bytes(char* buf, int nn_buf) +{ + original_bytes = buf; + nn_original_bytes = nn_buf; +} + int SrsRtpPacket2::nb_bytes() { if (!cache_payload) { diff --git a/trunk/src/kernel/srs_kernel_rtp.hpp b/trunk/src/kernel/srs_kernel_rtp.hpp index d97abdb6e..4b539b8fe 100644 --- a/trunk/src/kernel/srs_kernel_rtp.hpp +++ b/trunk/src/kernel/srs_kernel_rtp.hpp @@ -78,19 +78,19 @@ public: virtual srs_error_t encode(SrsBuffer* buf); virtual int nb_bytes(); public: - inline void set_marker(bool v) { marker = v; } - bool get_marker() const { return marker; } - inline void set_payload_type(uint8_t v) { payload_type = v; } - uint8_t get_payload_type() const { return payload_type; } - inline void set_sequence(uint16_t v) { sequence = v; } - uint16_t get_sequence() const { return sequence; } - inline void set_timestamp(int64_t v) { timestamp = (uint32_t)v; } - int64_t get_timestamp() const { return timestamp; } - inline void set_ssrc(uint32_t v) { ssrc = v; } - uint32_t get_ssrc() const { return ssrc; } - inline void set_padding(bool v) { padding = v; } - inline void set_padding_length(uint8_t v) { padding_length = v; } - uint8_t get_padding_length() const { return padding_length; } + void set_marker(bool v); + bool get_marker() const; + void set_payload_type(uint8_t v); + uint8_t get_payload_type() const; + void set_sequence(uint16_t v); + uint16_t get_sequence() const; + void set_timestamp(int64_t v); + int64_t get_timestamp() const; + void set_ssrc(uint32_t v); + uint32_t get_ssrc() const; + void set_padding(bool v); + void set_padding_length(uint8_t v); + uint8_t get_padding_length() const; }; class ISrsRtpPacketDecodeHandler @@ -150,9 +150,9 @@ public: // Reuse the cached fua message as payload. SrsRtpFUAPayload2* reuse_fua(); // Set the decode handler. - void set_decode_handler(ISrsRtpPacketDecodeHandler* h) { decode_handler = h; } + void set_decode_handler(ISrsRtpPacketDecodeHandler* h); // Set the original bytes. - void set_original_bytes(char* buf, int nn_buf) { original_bytes = buf; nn_original_bytes = nn_buf; } + void set_original_bytes(char* buf, int nn_buf); // interface ISrsEncoder public: virtual int nb_bytes(); @@ -304,6 +304,7 @@ public: SrsRtpH264Header& operator=(const SrsRtpH264Header& rhs); }; +// TODO: FIXME: Merge it with shared message. class SrsRtpSharedPacket { private: diff --git a/trunk/src/protocol/srs_stun_stack.cpp b/trunk/src/protocol/srs_stun_stack.cpp index ed3377762..cabc681ff 100644 --- a/trunk/src/protocol/srs_stun_stack.cpp +++ b/trunk/src/protocol/srs_stun_stack.cpp @@ -98,6 +98,96 @@ SrsStunPacket::~SrsStunPacket() { } +bool SrsStunPacket::is_binding_request() const +{ + return message_type == BindingRequest; +} + +bool SrsStunPacket::is_binding_response() const +{ + return message_type == BindingResponse; +} + +uint16_t SrsStunPacket::get_message_type() const +{ + return message_type; +} + +std::string SrsStunPacket::get_username() const +{ + return username; +} + +std::string SrsStunPacket::get_local_ufrag() const +{ + return local_ufrag; +} + +std::string SrsStunPacket::get_remote_ufrag() const +{ + return remote_ufrag; +} + +std::string SrsStunPacket::get_transcation_id() const +{ + return transcation_id; +} + +uint32_t SrsStunPacket::get_mapped_address() const +{ + return mapped_address; +} + +uint16_t SrsStunPacket::get_mapped_port() const +{ + return mapped_port; +} + +bool SrsStunPacket::get_ice_controlled() const +{ + return ice_controlled; +} + +bool SrsStunPacket::get_ice_controlling() const +{ + return ice_controlling; +} + +bool SrsStunPacket::get_use_candidate() const +{ + return use_candidate; +} + +void SrsStunPacket::set_message_type(const uint16_t& m) +{ + message_type = m; +} + +void SrsStunPacket::set_local_ufrag(const std::string& u) +{ + local_ufrag = u; +} + +void SrsStunPacket::set_remote_ufrag(const std::string& u) +{ + remote_ufrag = u; +} + +void SrsStunPacket::set_transcation_id(const std::string& t) +{ + transcation_id = t; +} + +void SrsStunPacket::set_mapped_address(const uint32_t& addr) +{ + mapped_address = addr; +} + +void SrsStunPacket::set_mapped_port(const uint32_t& port) +{ + mapped_port = port; +} + srs_error_t SrsStunPacket::decode(const char* buf, const int nb_buf) { srs_error_t err = srs_success; diff --git a/trunk/src/protocol/srs_stun_stack.hpp b/trunk/src/protocol/srs_stun_stack.hpp index 92242018d..1f2571138 100644 --- a/trunk/src/protocol/srs_stun_stack.hpp +++ b/trunk/src/protocol/srs_stun_stack.hpp @@ -92,28 +92,25 @@ private: public: SrsStunPacket(); virtual ~SrsStunPacket(); - - bool is_binding_request() const { return message_type == BindingRequest; } - bool is_binding_response() const { return message_type == BindingResponse; } - - uint16_t get_message_type() const { return message_type; } - std::string get_username() const { return username; } - std::string get_local_ufrag() const { return local_ufrag; } - std::string get_remote_ufrag() const { return remote_ufrag; } - std::string get_transcation_id() const { return transcation_id; } - uint32_t get_mapped_address() const { return mapped_address; } - uint16_t get_mapped_port() const { return mapped_port; } - bool get_ice_controlled() const { return ice_controlled; } - bool get_ice_controlling() const { return ice_controlling; } - bool get_use_candidate() const { return use_candidate; } - - void set_message_type(const uint16_t& m) { message_type = m; } - void set_local_ufrag(const std::string& u) { local_ufrag = u; } - void set_remote_ufrag(const std::string& u) { remote_ufrag = u; } - void set_transcation_id(const std::string& t) { transcation_id = t; } - void set_mapped_address(const uint32_t& addr) { mapped_address = addr; } - void set_mapped_port(const uint32_t& port) { mapped_port = port; } - +public: + bool is_binding_request() const; + bool is_binding_response() const; + uint16_t get_message_type() const; + std::string get_username() const; + std::string get_local_ufrag() const; + std::string get_remote_ufrag() const; + std::string get_transcation_id() const; + uint32_t get_mapped_address() const; + uint16_t get_mapped_port() const; + bool get_ice_controlled() const; + bool get_ice_controlling() const; + bool get_use_candidate() const; + void set_message_type(const uint16_t& m); + void set_local_ufrag(const std::string& u); + void set_remote_ufrag(const std::string& u); + void set_transcation_id(const std::string& t); + void set_mapped_address(const uint32_t& addr); + void set_mapped_port(const uint32_t& port); srs_error_t decode(const char* buf, const int nb_buf); srs_error_t encode(const std::string& pwd, SrsBuffer* stream); private: