1
0
Fork 0
mirror of https://github.com/ossrs/srs.git synced 2025-02-13 11:51:57 +00:00

SRT: support rtmp to srt

This commit is contained in:
hondaxiao 2022-04-20 22:23:46 +08:00 committed by winlin
parent 7da792f19d
commit e13d16439e
14 changed files with 453 additions and 161 deletions

View file

@ -2723,7 +2723,7 @@ srs_error_t SrsConfig::check_normal_config()
&& n != "play" && n != "publish" && n != "cluster"
&& n != "security" && n != "http_remux" && n != "dash"
&& n != "http_static" && n != "hds" && n != "exec"
&& n != "in_ack_size" && n != "out_ack_size" && n != "rtc") {
&& n != "in_ack_size" && n != "out_ack_size" && n != "rtc" && n != "srt") {
return srs_error_new(ERROR_SYSTEM_CONFIG_INVALID, "illegal vhost.%s", n.c_str());
}
// for each sub directives of vhost.
@ -2880,6 +2880,13 @@ srs_error_t SrsConfig::check_normal_config()
return srs_error_new(ERROR_SYSTEM_CONFIG_INVALID, "illegal vhost.rtc.%s of %s", m.c_str(), vhost->arg0().c_str());
}
}
} else if (n == "srt") {
for (int j = 0; j < (int)conf->directives.size(); j++) {
string m = conf->at(j)->name;
if (m != "enabled" && m != "rtmp_to_srt" && m != "srt_to_rtmp") {
return srs_error_new(ERROR_SYSTEM_CONFIG_INVALID, "illegal vhost.srt.%s of %s", m.c_str(), vhost->arg0().c_str());
}
}
}
}
}
@ -6757,20 +6764,6 @@ unsigned short SrsConfig::get_srt_listen_port()
return (unsigned short)atoi(conf->arg0().c_str());
}
bool SrsConfig::get_srt_mix_correct() {
static bool DEFAULT = true;
SrsConfDirective* conf = root->get("srt_server");
if (!conf) {
return DEFAULT;
}
conf = conf->get("mix_correct");
if (!conf || conf->arg0().empty()) {
return DEFAULT;
}
return SRS_CONF_PERFER_TRUE(conf->arg0());
}
int SrsConfig::get_srto_maxbw() {
static int64_t DEFAULT = -1;
SrsConfDirective* conf = root->get("srt_server");
@ -6971,6 +6964,79 @@ string SrsConfig::get_default_app_name() {
return conf->arg0();
}
bool SrsConfig::get_srt_mix_correct() {
static bool DEFAULT = true;
SrsConfDirective* conf = root->get("srt_server");
if (!conf) {
return DEFAULT;
}
conf = conf->get("mix_correct");
if (!conf || conf->arg0().empty()) {
return DEFAULT;
}
return SRS_CONF_PERFER_TRUE(conf->arg0());
}
SrsConfDirective* SrsConfig::get_srt(std::string vhost)
{
SrsConfDirective* conf = get_vhost(vhost);
return conf? conf->get("srt") : NULL;
}
bool SrsConfig::get_srt_enabled(std::string vhost)
{
static bool DEFAULT = false;
SrsConfDirective* conf = get_srt(vhost);
if (!conf) {
return DEFAULT;
}
conf = conf->get("enabled");
if (!conf || conf->arg0().empty()) {
return DEFAULT;
}
return SRS_CONF_PERFER_FALSE(conf->arg0());
}
bool SrsConfig::get_srt_to_rtmp(std::string vhost)
{
static bool DEFAULT = true;
SrsConfDirective* conf = get_srt(vhost);
if (!conf) {
return DEFAULT;
}
conf = conf->get("srt_to_rtmp");
if (!conf || conf->arg0().empty()) {
return DEFAULT;
}
return SRS_CONF_PERFER_FALSE(conf->arg0());
}
bool SrsConfig::get_srt_from_rtmp(std::string vhost)
{
static bool DEFAULT = false;
SrsConfDirective* conf = get_srt(vhost);
if (!conf) {
return DEFAULT;
}
conf = conf->get("rtmp_to_srt");
if (!conf || conf->arg0().empty()) {
return DEFAULT;
}
return SRS_CONF_PERFER_FALSE(conf->arg0());
}
bool SrsConfig::get_http_stream_enabled()
{
SrsConfDirective* conf = root->get("http_server");

View file

@ -662,6 +662,11 @@ public:
virtual std::string get_default_app_name();
// Get the mix_correct
virtual bool get_srt_mix_correct();
public:
SrsConfDirective* get_srt(std::string vhost);
bool get_srt_enabled(std::string vhost);
bool get_srt_to_rtmp(std::string vhost);
bool get_srt_from_rtmp(std::string vhost);
// http_hooks section
private:

View file

@ -270,7 +270,7 @@ srs_error_t SrsRtcSourceManager::fetch_or_create(SrsRequest* r, SrsRtcSource** p
// should always not exists for create a source.
srs_assert (pool.find(stream_url) == pool.end());
srs_trace("new source, stream_url=%s", stream_url.c_str());
srs_trace("new rtc source, stream_url=%s", stream_url.c_str());
source = new SrsRtcSource();
if ((err = source->initialize(r)) != srs_success) {

View file

@ -39,6 +39,7 @@ using namespace std;
#include <srs_protocol_utility.hpp>
#include <srs_protocol_json.hpp>
#include <srs_app_rtc_source.hpp>
#include <srs_app_srt_source.hpp>
// the timeout in srs_utime_t to wait encoder to republish
// if timeout, close the connection.
@ -950,6 +951,31 @@ srs_error_t SrsRtmpConn::acquire_publish(SrsLiveSource* source)
if (!source->can_publish(info->edge)) {
return srs_error_new(ERROR_SYSTEM_STREAM_BUSY, "rtmp: stream %s is busy", req->get_stream_url().c_str());
}
#ifdef SRS_SRT
if (_srs_config->get_rtc_from_rtmp(req->vhost)) {
SrsSrtSource *srt = NULL;
if (!info->edge) {
if ((err = _srs_srt_sources->fetch_or_create(req, &srt)) != srs_success) {
return srs_error_wrap(err, "create source");
}
if (!srt->can_publish()) {
return srs_error_new(ERROR_SYSTEM_STREAM_BUSY, "srt stream %s busy", req->get_stream_url().c_str());
}
}
if (srt) {
SrsSrtFromRtmpBridge *bridger = new SrsSrtFromRtmpBridge(srt);
if ((err = bridger->initialize(req)) != srs_success) {
srs_freep(bridger);
return srs_error_wrap(err, "bridger init");
}
source->set_bridger(bridger);
}
}
#endif
// Check whether RTC stream is busy.
#ifdef SRS_RTC

View file

@ -1802,7 +1802,7 @@ srs_error_t SrsLiveSourceManager::fetch_or_create(SrsRequest* r, ISrsLiveSourceH
// should always not exists for create a source.
srs_assert (pool.find(stream_url) == pool.end());
srs_trace("new source, stream_url=%s", stream_url.c_str());
srs_trace("new live source, stream_url=%s", stream_url.c_str());
source = new SrsLiveSource();
if ((err = source->initialize(r, h)) != srs_success) {
@ -1929,7 +1929,7 @@ SrsLiveSource::SrsLiveSource()
handler = NULL;
bridge_ = NULL;
play_edge = new SrsPlayEdge();
publish_edge = new SrsPublishEdge();
gop_cache = new SrsGopCache();
@ -2635,7 +2635,7 @@ void SrsLiveSource::on_unpublish()
bridge_->on_unpublish();
srs_freep(bridge_);
}
// no consumer, stream is die.
if (consumers.empty()) {
die_at = srs_get_system_time();

View file

@ -468,6 +468,23 @@ public:
// Global singleton instance.
extern SrsLiveSourceManager* _srs_sources;
// Destination type.
enum SrsBridgeDestType {
SrsBridgeDestTypeRtmp = 1,
SrsBridgeDestTypeRTC = 2,
SrsBridgeDestTypeSRT = 3,
};
class ISrsBridge {
public:
ISrsBridge(SrsBridgeDestType type);
virtual ~ISrsBridge();
public:
SrsBridgeDestType get_type() const;
protected:
SrsBridgeDestType type_;
};
// For RTMP2RTC, bridge SrsLiveSource to SrsRtcSource
class ISrsLiveSourceBridge
{

View file

@ -87,6 +87,67 @@ srs_error_t SrsSrtConnection::writev(const iovec *iov, int iov_size, ssize_t* nw
return srs_error_new(ERROR_SRT_CONN, "unsupport method");
}
SrsSrtRecvThread::SrsSrtRecvThread(SrsSrtConnection* srt_conn)
{
srt_conn_ = srt_conn;
trd_ = new SrsSTCoroutine("srt-recv", this, _srs_context->get_id());
recv_err_ = srs_success;
}
SrsSrtRecvThread::~SrsSrtRecvThread()
{
srs_freep(trd_);
srs_error_reset(recv_err_);
}
srs_error_t SrsSrtRecvThread::cycle()
{
srs_error_t err = srs_success;
if ((err = do_cycle()) != srs_success) {
recv_err_ = srs_error_copy(err);
}
return err;
}
srs_error_t SrsSrtRecvThread::do_cycle()
{
srs_error_t err = srs_success;
while (true) {
if ((err = trd_->pull()) != srs_success) {
return srs_error_wrap(err, "srt: thread quit");
}
char buf[1316];
ssize_t nb = 0;
if ((err = srt_conn_->read(buf, sizeof(buf), &nb)) != srs_success) {
if (srs_error_code(err) != ERROR_SRT_TIMEOUT) {
return srs_error_wrap(err, "srt read");
}
}
}
return err;
}
srs_error_t SrsSrtRecvThread::start()
{
srs_error_t err = srs_success;
if ((err = trd_->start()) != srs_success) {
return srs_error_wrap(err, "start srt recv thread");
}
return err;
}
srs_error_t SrsSrtRecvThread::get_recv_err()
{
return srs_error_copy(recv_err_);
}
SrsMpegtsSrtConn::SrsMpegtsSrtConn(SrsSrtServer* srt_server, SRTSOCKET srt_fd, std::string ip, int port)
{
// Create a identify for this client.
@ -178,7 +239,7 @@ srs_error_t SrsMpegtsSrtConn::do_cycle()
{
srs_error_t err = srs_success;
if ((err != fetch_or_create_source()) != srs_success) {
if ((err = fetch_or_create_source()) != srs_success) {
return srs_error_wrap(err, "fetch or create srt source");
}
@ -216,6 +277,10 @@ srs_error_t SrsMpegtsSrtConn::fetch_or_create_source()
return srs_error_new(ERROR_SRT_CONN, "invalid srt streamid=%s", streamid.c_str());
}
if (! _srs_config->get_srt_enabled(req_->vhost)) {
return srs_error_new(ERROR_SRT_CONN, "srt disabled, vhost=%s", req_->vhost.c_str());
}
srs_trace("@srt, streamid=%s, stream_url=%s, vhost=%s, app=%s, stream=%s, param=%s",
streamid.c_str(), req_->get_stream_url().c_str(), req_->vhost.c_str(), req_->app.c_str(), req_->stream.c_str(), req_->param.c_str());
@ -267,23 +332,25 @@ srs_error_t SrsMpegtsSrtConn::acquire_publish()
return srs_error_new(ERROR_SRT_SOURCE_BUSY, "srt stream %s busy", req_->get_stream_url().c_str());
}
// Check rtmp stream is busy.
SrsLiveSource *live_source = _srs_sources->fetch(req_);
if (live_source && !live_source->can_publish(false)) {
return srs_error_new(ERROR_SYSTEM_STREAM_BUSY, "live_source stream %s busy", req_->get_stream_url().c_str());
}
if (_srs_config->get_srt_to_rtmp(req_->vhost)) {
// Check rtmp stream is busy.
SrsLiveSource *live_source = _srs_sources->fetch(req_);
if (live_source && !live_source->can_publish(false)) {
return srs_error_new(ERROR_SYSTEM_STREAM_BUSY, "live_source stream %s busy", req_->get_stream_url().c_str());
}
if ((err = _srs_sources->fetch_or_create(req_, _srs_hybrid->srs()->instance(), &live_source)) != srs_success) {
return srs_error_wrap(err, "create source");
}
if ((err = _srs_sources->fetch_or_create(req_, _srs_hybrid->srs()->instance(), &live_source)) != srs_success) {
return srs_error_wrap(err, "create source");
}
SrsRtmpFromTsBridge *bridger = new SrsRtmpFromTsBridge(live_source);
if ((err = bridger->initialize(req_)) != srs_success) {
srs_freep(bridger);
return srs_error_wrap(err, "create bridger");
}
SrsRtmpFromSrtBridge *bridger = new SrsRtmpFromSrtBridge(live_source);
if ((err = bridger->initialize(req_)) != srs_success) {
srs_freep(bridger);
return srs_error_wrap(err, "create bridger");
}
srt_source_->set_bridger(bridger);
srt_source_->set_bridger(bridger);
}
if ((err = srt_source_->on_publish()) != srs_success) {
return srs_error_wrap(err, "srt source publish");
@ -297,83 +364,6 @@ void SrsMpegtsSrtConn::release_publish()
srt_source_->on_unpublish();
}
/*
srs_error_t SrsMpegtsSrtConn::do_cycle()
{
srs_error_t err = srs_success;
string streamid = "";
if ((err = srs_srt_get_streamid(srt_fd_, streamid)) != srs_success) {
return srs_error_wrap(err, "get srt streamid");
}
// Must have streamid, because srt ts packet will convert to rtmp or rtc.
if (streamid.empty()) {
return srs_error_new(ERROR_SRT_CONN, "empty srt streamid");
}
// Detect streamid of srt to request.
if (! srs_srt_streamid_to_request(streamid, mode_, req_)) {
return srs_error_new(ERROR_SRT_CONN, "invalid srt streamid=%s", streamid.c_str());
}
srs_trace("@srt, streamid=%s, stream_url=%s, vhost=%s, app=%s, stream=%s, param=%s",
streamid.c_str(), req_->get_stream_url().c_str(), req_->vhost.c_str(), req_->app.c_str(), req_->stream.c_str(), req_->param.c_str());
if ((err = _srs_srt_sources->fetch_or_create(req_, &srt_source_)) != srs_success) {
return srs_error_wrap(err, "fetch srt source");
}
if (mode_ == SrtModePush) {
if ((err = http_hooks_on_publish()) != srs_success) {
return srs_error_wrap(err, "srt: callback on publish");
}
// Do srt publish.
if (! srt_source_->can_publish()) {
return srs_error_new(ERROR_SRT_SOURCE_BUSY, "srt stream %s busy", req_->get_stream_url().c_str());
}
SrsLiveSource *live_source = _srs_sources->fetch(req_);
if (live_source && !live_source->can_publish(false)) {
return srs_error_new(ERROR_SYSTEM_STREAM_BUSY, "live_source stream %s busy", req_->get_stream_url().c_str());
}
if ((err = _srs_sources->fetch_or_create(req_, _srs_hybrid->srs()->instance(), &live_source)) != srs_success) {
return srs_error_wrap(err, "create source");
}
SrsRtmpFromTsBridge *bridger = new SrsRtmpFromTsBridge(live_source);
if ((err = bridger->initialize(req_)) != srs_success) {
srs_freep(bridger);
return srs_error_wrap(err, "create bridger");
}
srt_source_->set_bridger(bridger);
if ((err = srt_source_->on_publish()) != srs_success) {
return srs_error_wrap(err, "srt source publish");
}
err = do_publish_cycle();
srt_source_->on_unpublish();
http_hooks_on_unpublish();
} else if (mode_ == SrtModePull) {
if ((err = http_hooks_on_play()) != srs_success) {
return srs_error_wrap(err, "srt: callback on play");
}
// Do srt play.
err = do_play_cycle();
http_hooks_on_stop();
} else {
srs_assert(false);
}
return err;
}
*/
srs_error_t SrsMpegtsSrtConn::do_publishing()
{
srs_error_t err = srs_success;
@ -394,7 +384,18 @@ srs_error_t SrsMpegtsSrtConn::do_publishing()
// reportable
if (pprint->can_print()) {
SRT_TRACEBSTATS srt_stats;
srs_error_t err_tmp = srs_srt_get_stats(srt_fd_, &srt_stats, true);
if (err_tmp != srs_success) {
srs_freep(err_tmp);
} else {
srs_trace("<- " SRS_CONSTS_LOG_SRT_PUBLISH " Transport Stats # "
"pktRecv=%ld, pktRcvLoss=%d, pktRcvRetrans=%d, pktRcvDrop=%d",
srt_stats.pktRecv, srt_stats.pktRcvLoss, srt_stats.pktRcvRetrans, srt_stats.pktRcvDrop);
}
kbps_->sample();
srs_trace("<- " SRS_CONSTS_LOG_SRT_PUBLISH " time=%d, packets=%d, okbps=%d,%d,%d, ikbps=%d,%d,%d",
(int)pprint->age(), nb_packets, kbps_->get_send_kbps(), kbps_->get_send_kbps_30s(), kbps_->get_send_kbps_5m(),
kbps_->get_recv_kbps(), kbps_->get_recv_kbps_30s(), kbps_->get_recv_kbps_5m());
@ -436,6 +437,11 @@ srs_error_t SrsMpegtsSrtConn::do_playing()
SrsPithyPrint* pprint = SrsPithyPrint::create_srt_play();
SrsAutoFree(SrsPithyPrint, pprint);
SrsSrtRecvThread srt_recv_trd(srt_conn_);
if ((err = srt_recv_trd.start()) != srs_success) {
return srs_error_wrap(err, "start srt recv trd");
}
int nb_packets = 0;
while (true) {
@ -443,6 +449,10 @@ srs_error_t SrsMpegtsSrtConn::do_playing()
return srs_error_wrap(err, "srt play thread");
}
if ((err = srt_recv_trd.get_recv_err()) != srs_success) {
return srs_error_wrap(err, "srt play recv thread");
}
pprint->elapse();
// Wait for amount of packets.
@ -451,13 +461,24 @@ srs_error_t SrsMpegtsSrtConn::do_playing()
consumer->dump_packet(&pkt);
if (!pkt) {
// TODO: FIXME: We should check the quit event.
consumer->wait(1);
consumer->wait(1, 1000 * SRS_UTIME_MILLISECONDS);
continue;
}
// reportable
if (pprint->can_print()) {
SRT_TRACEBSTATS srt_stats;
srs_error_t err_tmp = srs_srt_get_stats(srt_fd_, &srt_stats, true);
if (err_tmp != srs_success) {
srs_freep(err_tmp);
} else {
srs_trace("-> " SRS_CONSTS_LOG_SRT_PLAY " Transport Stats # "
"pktSent=%ld, pktSndLoss=%d, pktRetrans=%d, pktSndDrop=%d",
srt_stats.pktSent, srt_stats.pktSndLoss, srt_stats.pktRetrans, srt_stats.pktSndDrop);
}
kbps_->sample();
srs_trace("-> " SRS_CONSTS_LOG_SRT_PLAY " time=%d, packets=%d, okbps=%d,%d,%d, ikbps=%d,%d,%d",
(int)pprint->age(), nb_packets, kbps_->get_send_kbps(), kbps_->get_send_kbps_30s(), kbps_->get_send_kbps_5m(),
kbps_->get_recv_kbps(), kbps_->get_recv_kbps_30s(), kbps_->get_recv_kbps_5m());

View file

@ -51,6 +51,25 @@ private:
SrsSrtSocket* srt_skt_;
};
class SrsSrtRecvThread : public ISrsCoroutineHandler
{
public:
SrsSrtRecvThread(SrsSrtConnection* srt_conn);
~SrsSrtRecvThread();
// Interface ISrsCoroutineHandler
public:
virtual srs_error_t cycle();
private:
srs_error_t do_cycle();
public:
srs_error_t start();
srs_error_t get_recv_err();
private:
SrsSrtConnection* srt_conn_;
SrsCoroutine* trd_;
srs_error_t recv_err_;
};
class SrsMpegtsSrtConn : public ISrsStartableConneciton, public ISrsCoroutineHandler
{
public:

View file

@ -380,12 +380,14 @@ srs_error_t SrsSrtEventLoop::cycle()
return srs_error_wrap(err, "srt listener");
}
// Check events fired, return directly.
if ((err = srt_poller_->wait(0)) != srs_success) {
srs_error("srt poll wait failed, err=%s", srs_error_desc(err).c_str());
srs_error_reset(err);
}
srs_usleep(10 * SRS_UTIME_MILLISECONDS);
// Schedule srt event by state-thread.
srs_usleep(1 * SRS_UTIME_MILLISECONDS);
}
return err;

View file

@ -125,7 +125,7 @@ srs_error_t SrsSrtSourceManager::fetch_or_create(SrsRequest* r, SrsSrtSource** p
// should always not exists for create a source.
srs_assert (pool.find(stream_url) == pool.end());
srs_trace("new ts source, stream_url=%s", stream_url.c_str());
srs_trace("new srt source, stream_url=%s", stream_url.c_str());
source = new SrsSrtSource();
if ((err = source->initialize(r)) != srs_success) {
@ -218,7 +218,7 @@ srs_error_t SrsSrtConsumer::dump_packet(SrsSrtPacket** ppkt)
return err;
}
void SrsSrtConsumer::wait(int nb_msgs)
void SrsSrtConsumer::wait(int nb_msgs, srs_utime_t timeout)
{
mw_min_msgs = nb_msgs;
@ -231,18 +231,19 @@ void SrsSrtConsumer::wait(int nb_msgs)
mw_waiting = true;
// use cond block wait for high performance mode.
srs_cond_wait(mw_wait);
srs_cond_timedwait(mw_wait, timeout);
}
ISrsTsSourceBridger::ISrsTsSourceBridger()
ISrsSrtSourceBridge::ISrsSrtSourceBridge(SrsBridgeDestType type) : ISrsBridge(type)
{
}
ISrsTsSourceBridger::~ISrsTsSourceBridger()
ISrsSrtSourceBridge::~ISrsSrtSourceBridge()
{
}
SrsRtmpFromTsBridge::SrsRtmpFromTsBridge(SrsLiveSource* source)
SrsRtmpFromSrtBridge::SrsRtmpFromSrtBridge(SrsLiveSource* source)
: ISrsSrtSourceBridge(SrsBridgeDestTypeRtmp)
{
ts_ctx_ = new SrsTsContext();
@ -254,13 +255,13 @@ SrsRtmpFromTsBridge::SrsRtmpFromTsBridge(SrsLiveSource* source)
req_ = NULL;
}
SrsRtmpFromTsBridge::~SrsRtmpFromTsBridge()
SrsRtmpFromSrtBridge::~SrsRtmpFromSrtBridge()
{
srs_freep(ts_ctx_);
srs_freep(req_);
}
srs_error_t SrsRtmpFromTsBridge::on_publish()
srs_error_t SrsRtmpFromSrtBridge::on_publish()
{
srs_error_t err = srs_success;
@ -271,7 +272,7 @@ srs_error_t SrsRtmpFromTsBridge::on_publish()
return err;
}
srs_error_t SrsRtmpFromTsBridge::on_packet(SrsSrtPacket *pkt)
srs_error_t SrsRtmpFromSrtBridge::on_packet(SrsSrtPacket *pkt)
{
srs_error_t err = srs_success;
@ -297,12 +298,12 @@ srs_error_t SrsRtmpFromTsBridge::on_packet(SrsSrtPacket *pkt)
return err;
}
void SrsRtmpFromTsBridge::on_unpublish()
void SrsRtmpFromSrtBridge::on_unpublish()
{
live_source_->on_unpublish();
}
srs_error_t SrsRtmpFromTsBridge::initialize(SrsRequest* req)
srs_error_t SrsRtmpFromSrtBridge::initialize(SrsRequest* req)
{
srs_error_t err = srs_success;
@ -312,7 +313,7 @@ srs_error_t SrsRtmpFromTsBridge::initialize(SrsRequest* req)
return err;
}
srs_error_t SrsRtmpFromTsBridge::on_ts_message(SrsTsMessage* msg)
srs_error_t SrsRtmpFromSrtBridge::on_ts_message(SrsTsMessage* msg)
{
srs_error_t err = srs_success;
@ -352,7 +353,7 @@ srs_error_t SrsRtmpFromTsBridge::on_ts_message(SrsTsMessage* msg)
return err;
}
srs_error_t SrsRtmpFromTsBridge::on_ts_video(SrsTsMessage* msg, SrsBuffer* avs)
srs_error_t SrsRtmpFromSrtBridge::on_ts_video(SrsTsMessage* msg, SrsBuffer* avs)
{
srs_error_t err = srs_success;
@ -419,7 +420,7 @@ srs_error_t SrsRtmpFromTsBridge::on_ts_video(SrsTsMessage* msg, SrsBuffer* avs)
return on_h264_frame(msg, ipb_frames);
}
srs_error_t SrsRtmpFromTsBridge::check_sps_pps_change(SrsTsMessage* msg)
srs_error_t SrsRtmpFromSrtBridge::check_sps_pps_change(SrsTsMessage* msg)
{
srs_error_t err = srs_success;
@ -464,7 +465,7 @@ srs_error_t SrsRtmpFromTsBridge::check_sps_pps_change(SrsTsMessage* msg)
return err;
}
srs_error_t SrsRtmpFromTsBridge::on_h264_frame(SrsTsMessage* msg, vector<pair<char*, int> >& ipb_frames)
srs_error_t SrsRtmpFromSrtBridge::on_h264_frame(SrsTsMessage* msg, vector<pair<char*, int> >& ipb_frames)
{
srs_error_t err = srs_success;
@ -520,7 +521,7 @@ srs_error_t SrsRtmpFromTsBridge::on_h264_frame(SrsTsMessage* msg, vector<pair<ch
return err;
}
srs_error_t SrsRtmpFromTsBridge::on_ts_audio(SrsTsMessage* msg, SrsBuffer* avs)
srs_error_t SrsRtmpFromSrtBridge::on_ts_audio(SrsTsMessage* msg, SrsBuffer* avs)
{
srs_error_t err = srs_success;
@ -582,7 +583,7 @@ srs_error_t SrsRtmpFromTsBridge::on_ts_audio(SrsTsMessage* msg, SrsBuffer* avs)
return err;
}
srs_error_t SrsRtmpFromTsBridge::check_audio_sh_change(SrsTsMessage* msg, uint32_t pts)
srs_error_t SrsRtmpFromSrtBridge::check_audio_sh_change(SrsTsMessage* msg, uint32_t pts)
{
srs_error_t err = srs_success;
@ -613,7 +614,7 @@ srs_error_t SrsRtmpFromTsBridge::check_audio_sh_change(SrsTsMessage* msg, uint32
return err;
}
srs_error_t SrsRtmpFromTsBridge::on_aac_frame(SrsTsMessage* msg, uint32_t pts, char* frame, int frame_size)
srs_error_t SrsRtmpFromSrtBridge::on_aac_frame(SrsTsMessage* msg, uint32_t pts, char* frame, int frame_size)
{
srs_error_t err = srs_success;
@ -643,7 +644,6 @@ SrsSrtSource::SrsSrtSource()
{
req = NULL;
can_publish_ = true;
bridger_ = NULL;
}
SrsSrtSource::~SrsSrtSource()
@ -652,7 +652,11 @@ SrsSrtSource::~SrsSrtSource()
// for all consumers are auto free.
consumers.clear();
srs_freep(bridger_);
for (vector<ISrsSrtSourceBridge*>::iterator iter = bridgers_.begin(); iter != bridgers_.end(); ++iter) {
ISrsSrtSourceBridge* bridge = *iter;
srs_freep(bridge);
}
bridgers_.clear();
}
srs_error_t SrsSrtSource::initialize(SrsRequest* r)
@ -702,10 +706,18 @@ void SrsSrtSource::update_auth(SrsRequest* r)
req->update_auth(r);
}
void SrsSrtSource::set_bridger(ISrsTsSourceBridger *bridger)
void SrsSrtSource::set_bridger(ISrsSrtSourceBridge *bridger)
{
srs_freep(bridger_);
bridger_ = bridger;
for (vector<ISrsSrtSourceBridge*>::iterator iter = bridgers_.begin(); iter != bridgers_.end(); ++iter) {
ISrsSrtSourceBridge* b = *iter;
if (b->get_type() == bridger->get_type()) {
srs_freep(b);
*iter = bridger;
return;
}
}
bridgers_.push_back(bridger);
}
srs_error_t SrsSrtSource::create_consumer(SrsSrtConsumer*& consumer)
@ -752,8 +764,9 @@ srs_error_t SrsSrtSource::on_publish()
return srs_error_wrap(err, "source id change");
}
if (bridger_) {
if ((err = bridger_->on_publish()) != srs_success) {
for (vector<ISrsSrtSourceBridge*>::iterator iter = bridgers_.begin(); iter != bridgers_.end(); ++iter) {
ISrsSrtSourceBridge* bridge = *iter;
if ((err = bridge->on_publish()) != srs_success) {
return srs_error_wrap(err, "bridger on publish");
}
}
@ -773,10 +786,12 @@ void SrsSrtSource::on_unpublish()
can_publish_ = true;
if (bridger_) {
bridger_->on_unpublish();
srs_freep(bridger_);
for (vector<ISrsSrtSourceBridge*>::iterator iter = bridgers_.begin(); iter != bridgers_.end(); ++iter) {
ISrsSrtSourceBridge* bridge = *iter;
bridge->on_unpublish();
srs_freep(bridge);
}
bridgers_.clear();
}
srs_error_t SrsSrtSource::on_packet(SrsSrtPacket* packet)
@ -790,8 +805,104 @@ srs_error_t SrsSrtSource::on_packet(SrsSrtPacket* packet)
}
}
if (bridger_ && (err = bridger_->on_packet(packet)) != srs_success) {
return srs_error_wrap(err, "bridger consume message");
for (vector<ISrsSrtSourceBridge*>::iterator iter = bridgers_.begin(); iter != bridgers_.end(); ++iter) {
ISrsSrtSourceBridge* bridge = *iter;
if ((err = bridge->on_packet(packet)) != srs_success) {
return srs_error_wrap(err, "bridger consume message");
}
}
return err;
}
SrsSrtFromRtmpBridge::SrsSrtFromRtmpBridge(SrsSrtSource* source)
: ISrsLiveSourceBridger(SrsBridgeDestTypeSRT)
{
srt_source_ = source;
ts_muxer_ = NULL;
offset_ = 0;
}
SrsSrtFromRtmpBridge::~SrsSrtFromRtmpBridge()
{
srs_freep(ts_muxer_);
}
srs_error_t SrsSrtFromRtmpBridge::initialize(SrsRequest* r)
{
srs_error_t err = srs_success;
// TODO: FIXME: check config.
req_ = r;
ts_muxer_ = new SrsTsTransmuxer();
if ((err = ts_muxer_->initialize(this)) != srs_success) {
return srs_error_wrap(err, "init ts muxer");
}
return err;
}
srs_error_t SrsSrtFromRtmpBridge::on_publish()
{
srs_error_t err = srs_success;
// TODO: FIXME: check if enable rtmp_to_srt
if ((err = srt_source_->on_publish()) != srs_success) {
return srs_error_wrap(err, "source publish");
}
return err;
}
void SrsSrtFromRtmpBridge::on_unpublish()
{
// TODO: FIXME: check if enable rtmp_to_srt
srt_source_->on_unpublish();
}
srs_error_t SrsSrtFromRtmpBridge::on_audio(SrsSharedPtrMessage* msg)
{
srs_error_t err = srs_success;
if ((err = ts_muxer_->write_audio(msg->timestamp, msg->payload, msg->size)) != srs_success) {
return srs_error_wrap(err, "rtmp to srt, ts mux audio");
}
return err;
}
srs_error_t SrsSrtFromRtmpBridge::on_video(SrsSharedPtrMessage* msg)
{
srs_error_t err = srs_success;
if ((err = ts_muxer_->write_video(msg->timestamp, msg->payload, msg->size)) != srs_success) {
return srs_error_wrap(err, "rtmp to srt, ts mux video");
}
return err;
}
srs_error_t SrsSrtFromRtmpBridge::write(void* buf, size_t size, ssize_t* nwrite)
{
srs_error_t err = srs_success;
if (size % SRS_TS_PACKET_SIZE != 0) {
return srs_error_new(ERROR_RTMP_TO_SRT, "invalid ts size=%u", size);
}
for (int i = 0; i < size; i += SRS_TS_PACKET_SIZE) {
memcpy(ts_buf_ + offset_, (const char*)buf + i, SRS_TS_PACKET_SIZE);
offset_ += SRS_TS_PACKET_SIZE;
if (offset_ >= 1316) {
offset_ = 0;
SrsSrtPacket* packet = new SrsSrtPacket();
SrsAutoFree(SrsSrtPacket, packet);
packet->wrap(ts_buf_, 1316);
srt_source_->on_packet(packet);
}
}
return err;

View file

@ -14,6 +14,7 @@
#include <srs_kernel_ts.hpp>
#include <srs_service_st.hpp>
#include <srs_app_source.hpp>
class SrsSharedPtrMessage;
class SrsRequest;
@ -86,25 +87,25 @@ public:
// For SRT, we only got one packet, because there is not many packets in queue.
virtual srs_error_t dump_packet(SrsSrtPacket** ppkt);
// Wait for at-least some messages incoming in queue.
virtual void wait(int nb_msgs);
virtual void wait(int nb_msgs, srs_utime_t timeout);
};
class ISrsTsSourceBridger
class ISrsSrtSourceBridge : public ISrsBridge
{
public:
ISrsTsSourceBridger();
virtual ~ISrsTsSourceBridger();
ISrsSrtSourceBridge(SrsBridgeDestType type);
virtual ~ISrsSrtSourceBridge();
public:
virtual srs_error_t on_publish() = 0;
virtual srs_error_t on_packet(SrsSrtPacket *pkt) = 0;
virtual void on_unpublish() = 0;
};
class SrsRtmpFromTsBridge : public ISrsTsSourceBridger, public ISrsTsHandler
class SrsRtmpFromSrtBridge : public ISrsSrtSourceBridge, public ISrsTsHandler
{
public:
SrsRtmpFromTsBridge(SrsLiveSource* source);
virtual ~SrsRtmpFromTsBridge();
SrsRtmpFromSrtBridge(SrsLiveSource* source);
virtual ~SrsRtmpFromSrtBridge();
public:
virtual srs_error_t on_publish();
virtual srs_error_t on_packet(SrsSrtPacket *pkt);
@ -153,7 +154,7 @@ public:
// Update the authentication information in request.
virtual void update_auth(SrsRequest* r);
public:
void set_bridger(ISrsTsSourceBridger *bridger);
void set_bridger(ISrsSrtSourceBridge *bridger);
public:
// Create consumer
// @param consumer, output the create consumer.
@ -178,18 +179,14 @@ private:
// To delivery packets to clients.
std::vector<SrsSrtConsumer*> consumers;
bool can_publish_;
ISrsTsSourceBridger* bridger_;
std::vector<ISrsSrtSourceBridge*> bridgers_;
};
/*
class SrsTsFromRtmpBridger : public ISrsLiveSourceBridger
class SrsSrtFromRtmpBridge : public ISrsLiveSourceBridger, public ISrsStreamWriter
{
private:
SrsRequest* req;
SrsSrtSource* source_;
public:
SrsTsFromRtmpBridger(SrsSrtSource* source);
virtual ~SrsTsFromRtmpBridger();
SrsSrtFromRtmpBridge(SrsSrtSource* source);
virtual ~SrsSrtFromRtmpBridge();
public:
virtual srs_error_t initialize(SrsRequest* r);
// Interface for ISrsLiveSourceBridger
@ -198,8 +195,16 @@ public:
virtual void on_unpublish();
virtual srs_error_t on_audio(SrsSharedPtrMessage* msg);
virtual srs_error_t on_video(SrsSharedPtrMessage* msg);
// Interface for ISrsStreamWriter
public:
virtual srs_error_t write(void* buf, size_t size, ssize_t* nwrite);
private:
SrsRequest* req_;
SrsSrtSource* srt_source_;
SrsTsTransmuxer* ts_muxer_;
char ts_buf_[1316];
int offset_;
};
*/
#endif

View file

@ -358,6 +358,8 @@
#define ERROR_SRT_SOCKOPT 6005
#define ERROR_SRT_CONN 6006
#define ERROR_SRT_SOURCE_BUSY 6007
#define ERROR_RTMP_TO_SRT 6008
#define ERROR_SRT_STATS 6009
///////////////////////////////////////////////////////
// HTTP API error.

View file

@ -376,6 +376,18 @@ srs_error_t srs_srt_get_remote_ip_port(SRTSOCKET srt_fd, std::string& ip, int& p
return err;
}
srs_error_t srs_srt_get_stats(SRTSOCKET srt_fd, SRT_TRACEBSTATS* srt_stats, bool clear)
{
srs_error_t err = srs_success;
int ret = srt_bstats(srt_fd, srt_stats, clear);
if (ret != 0) {
return srs_error_new(ERROR_SRT_STATS, "srt_bstats");
}
return err;
}
SrsSrtPoller::SrsSrtPoller()
{
srt_epoller_fd_ = -1;
@ -395,6 +407,9 @@ srs_error_t SrsSrtPoller::initialize()
srt_epoller_fd_ = srt_epoll_create();
events_.resize(1024);
// Enable srt empty poller, avoid warning.
srt_epoll_set(srt_epoller_fd_, SRT_EPOLL_ENABLE_EMPTY);
return err;
}
@ -442,7 +457,7 @@ srs_error_t SrsSrtPoller::wait(int timeout_ms)
// wait srt event fired, will timeout after `timeout_ms` milliseconds.
int ret = srt_epoll_uwait(srt_epoller_fd_, events_.data(), events_.size(), timeout_ms);
if (ret < 0) {
return srs_error_new(ERROR_SRT_EPOLL, "srt_epoll_uwait, ret=%d", ret);
return srs_error_new(ERROR_SRT_EPOLL, "srt_epoll_uwait, ret=%d, err=%s", ret, srt_getlasterror_str());
}
for (int i = 0; i < ret; ++i) {

View file

@ -61,6 +61,9 @@ extern srs_error_t srs_srt_get_streamid(SRTSOCKET srt_fd, std::string& streamid)
extern srs_error_t srs_srt_get_local_ip_port(SRTSOCKET srt_fd, std::string& ip, int& port);
extern srs_error_t srs_srt_get_remote_ip_port(SRTSOCKET srt_fd, std::string& ip, int& port);
// Get SRT stats.
extern srs_error_t srs_srt_get_stats(SRTSOCKET srt_fd, SRT_TRACEBSTATS* srt_stats, bool clear);
class SrsSrtSocket;
// Srt poller, subscribe/unsubscribed events and wait them fired.