1
0
Fork 0
mirror of https://github.com/ossrs/srs.git synced 2025-02-15 04:42:04 +00:00
srs/trunk/src/app/srs_app_srt_source.hpp
2022-06-20 19:22:25 +08:00

186 lines
5.2 KiB
C++

//
// Copyright (c) 2013-2022 The SRS Authors
//
// SPDX-License-Identifier: MIT or MulanPSL-2.0
//
#ifndef SRS_APP_SRT_SOURCE_HPP
#define SRS_APP_SRT_SOURCE_HPP
#include <srs_core.hpp>
#include <map>
#include <vector>
#include <srs_kernel_ts.hpp>
#include <srs_protocol_st.hpp>
#include <srs_app_source.hpp>
// Forward declarations: this header refers to these types only through pointers,
// so the full definitions are not needed here.
class SrsSharedPtrMessage;
class SrsRequest;
class SrsLiveSource;
class SrsSrtSource;
// The SRT packet with shared message (held in shared_buffer_).
// NOTE(review): the class stores a raw SrsSharedPtrMessage* and declares no copy
// constructor or assignment operator; presumably packets are meant to be
// duplicated only through copy() - confirm against the implementation.
class SrsSrtPacket
{
public:
SrsSrtPacket();
virtual ~SrsSrtPacket();
public:
// Wrap buffer to shared_message, which is managed by us.
// @param size the size of the buffer, in bytes.
// @return presumably the start of the wrapped buffer - TODO confirm in the .cpp.
char* wrap(int size);
// Wrap the given buffer of the given size.
// NOTE(review): whether data is copied or adopted is not visible from this
// header - confirm before passing memory that the caller still owns.
char* wrap(char* data, int size);
// Wrap the shared message, we copy it.
char* wrap(SrsSharedPtrMessage* msg);
// Copy the SRT packet.
// @return the copied packet; presumably the caller must free it - TODO confirm.
virtual SrsSrtPacket* copy();
public:
// Accessors for the packet buffer and its size.
// NOTE(review): presumably size() reports actual_buffer_size_ - confirm.
char* data();
int size();
private:
// The shared message which holds the buffer of this packet.
SrsSharedPtrMessage* shared_buffer_;
// The size of SRT packet or SRT payload.
int actual_buffer_size_;
};
// The manager of SRT sources, which keeps them in a map keyed by string.
// NOTE(review): the key is presumably derived from the request (e.g. the stream
// url) and lock presumably protects pool - confirm in the implementation.
class SrsSrtSourceManager
{
private:
srs_mutex_t lock;
std::map<std::string, SrsSrtSource*> pool;
public:
SrsSrtSourceManager();
virtual ~SrsSrtSourceManager();
public:
// Create the source when fetching it from the cache failed.
// @param r the client request.
// @param pps output, the matched source; never NULL on success.
virtual srs_error_t fetch_or_create(SrsRequest* r, SrsSrtSource** pps);
public:
// Get the existing source, or NULL when it does not exist.
// @param r the client request.
virtual SrsSrtSource* fetch(SrsRequest* r);
};
// Global singleton instance of the SRT source manager.
// This is only a declaration; the definition lives outside this header.
extern SrsSrtSourceManager* _srs_srt_sources;
// The consumer of a SRT source, which keeps the SRT packets enqueued to it in a queue.
class SrsSrtConsumer
{
public:
// Explicit, so a SrsSrtSource* is never implicitly converted to a consumer.
// @param source the source which this consumer belongs to.
explicit SrsSrtConsumer(SrsSrtSource* source);
virtual ~SrsSrtConsumer();
private:
// The source of this consumer.
SrsSrtSource* source;
// The packets waiting to be dumped.
// NOTE(review): the queue stores raw pointers; who frees them is not visible
// from this header - confirm in the implementation.
std::vector<SrsSrtPacket*> queue;
// When the source id changed, notify all consumers.
bool should_update_source_id;
// The cond wait for mw.
// NOTE(review): "mw" presumably stands for merged-write - confirm.
srs_cond_t mw_wait;
bool mw_waiting;
int mw_min_msgs;
public:
// When the source id changed, notify the client to print it.
void update_source_id();
// Put SRT packet into queue.
// @param packet the packet to enqueue.
srs_error_t enqueue(SrsSrtPacket* packet);
// For SRT, we only dump one packet, because there are not many packets in the queue.
// @param ppkt output, the dumped packet.
virtual srs_error_t dump_packet(SrsSrtPacket** ppkt);
// Wait for at least some messages incoming in the queue.
// @param nb_msgs the number of messages to wait for.
// @param timeout the timeout of the wait.
virtual void wait(int nb_msgs, srs_utime_t timeout);
};
// The bridge interface of SRT source, which receives the publish, packet and
// unpublish events.
// NOTE(review): presumably driven by SrsSrtSource for the bridge installed by
// SrsSrtSource::set_bridge() - confirm in the implementation.
class ISrsSrtSourceBridge
{
public:
ISrsSrtSourceBridge();
virtual ~ISrsSrtSourceBridge();
public:
// When start publish stream.
virtual srs_error_t on_publish() = 0;
// When got a SRT packet.
// @param pkt the SRT packet.
virtual srs_error_t on_packet(SrsSrtPacket *pkt) = 0;
// When stop publish stream.
virtual void on_unpublish() = 0;
};
// The bridge which implements both ISrsSrtSourceBridge and ISrsTsHandler, and
// holds a SrsLiveSource.
// NOTE(review): presumably it demuxes the TS carried by SRT packets and feeds
// the resulting audio/video to the live source - confirm in the implementation.
class SrsRtmpFromSrtBridge : public ISrsSrtSourceBridge, public ISrsTsHandler
{
public:
// Explicit, so a SrsLiveSource* is never implicitly converted to a bridge.
// @param source the live source used by this bridge.
explicit SrsRtmpFromSrtBridge(SrsLiveSource* source);
virtual ~SrsRtmpFromSrtBridge();
// Interface ISrsSrtSourceBridge
public:
virtual srs_error_t on_publish();
virtual srs_error_t on_packet(SrsSrtPacket *pkt);
virtual void on_unpublish();
public:
// Initialize the bridge with the request.
// @param req the client request.
srs_error_t initialize(SrsRequest* req);
// Interface ISrsTsHandler
public:
virtual srs_error_t on_ts_message(SrsTsMessage* msg);
private:
// Handlers for the video and audio TS messages.
srs_error_t on_ts_video(SrsTsMessage* msg, SrsBuffer* avs);
srs_error_t on_ts_audio(SrsTsMessage* msg, SrsBuffer* avs);
// Helpers for H.264 video: the sps/pps change check and the frames.
srs_error_t check_sps_pps_change(SrsTsMessage* msg);
srs_error_t on_h264_frame(SrsTsMessage* msg, std::vector<std::pair<char*, int> >& ipb_frames);
// Helpers for AAC audio: the audio sh change check and the frame.
srs_error_t check_audio_sh_change(SrsTsMessage* msg, uint32_t pts);
srs_error_t on_aac_frame(SrsTsMessage* msg, uint32_t pts, char* frame, int frame_size);
private:
SrsTsContext* ts_ctx_;
// Record whether the sps/pps had changed; if changed, need to generate a new video sh frame.
bool sps_pps_change_;
std::string sps_;
std::string pps_;
// Record whether the audio specific config had changed; if changed, need to generate a new audio sh frame.
bool audio_sh_change_;
std::string audio_sh_;
SrsRequest* req_;
SrsLiveSource* live_source_;
};
// The SRT source, which keeps its source id, the request, its consumers and a bridge.
class SrsSrtSource
{
public:
SrsSrtSource();
virtual ~SrsSrtSource();
public:
// Initialize the source with the client request.
// @param r the client request.
virtual srs_error_t initialize(SrsRequest* r);
public:
// The source id changed.
// @param id the new source id.
virtual srs_error_t on_source_id_changed(SrsContextId id);
// Get current source id.
virtual SrsContextId source_id();
// Get the previous source id.
virtual SrsContextId pre_source_id();
// Update the authentication information in request.
virtual void update_auth(SrsRequest* r);
public:
// Set the bridge of this source.
// NOTE(review): who owns and frees bridger is not visible from this header - confirm.
void set_bridge(ISrsSrtSourceBridge *bridger);
public:
// Create consumer
// @param consumer, output the created consumer.
virtual srs_error_t create_consumer(SrsSrtConsumer*& consumer);
// Dumps packets in cache to consumer.
virtual srs_error_t consumer_dumps(SrsSrtConsumer* consumer);
// When the consumer is destroyed.
// NOTE(review): presumably removes the consumer from consumers - confirm.
virtual void on_consumer_destroy(SrsSrtConsumer* consumer);
// Whether we can publish stream to the source, return false if it exists.
virtual bool can_publish();
// When start publish stream.
virtual srs_error_t on_publish();
// When stop publish stream.
virtual void on_unpublish();
public:
// When got a SRT packet.
// NOTE(review): presumably delivers the packet to the consumers and the bridge - confirm.
srs_error_t on_packet(SrsSrtPacket* packet);
private:
// Source id.
SrsContextId _source_id;
// previous source id.
SrsContextId _pre_source_id;
SrsRequest* req;
// To deliver packets to clients.
std::vector<SrsSrtConsumer*> consumers;
// NOTE(review): presumably the value returned by can_publish() - confirm.
bool can_publish_;
// The bridge set by set_bridge().
ISrsSrtSourceBridge* bridge_;
};
#endif