1
0
Fork 0
mirror of https://github.com/ossrs/srs.git synced 2025-03-09 15:49:59 +00:00

Use SrsAsyncCallWorker in http hooks instead, to covert to async call. (#2542)

* Use SrsAsyncCallWorker in http hooks instead, to covert to async call.

* delete invalid function
This commit is contained in:
Haibo Chen 2021-08-27 07:44:19 +08:00 committed by winlin
parent c8bf98e8e2
commit c4a64ee470
4 changed files with 162 additions and 86 deletions

View file

@ -351,6 +351,53 @@ srs_error_t SrsRtcPLIWorker::cycle()
return err;
}
SrsRtcAsyncCallOnStop::SrsRtcAsyncCallOnStop(SrsContextId c, SrsRequest * r)
{
cid = c;
req = r->copy();
}
SrsRtcAsyncCallOnStop::~SrsRtcAsyncCallOnStop()
{
srs_freep(req);
}
srs_error_t SrsRtcAsyncCallOnStop::call()
{
srs_error_t err = srs_success;
if (!_srs_config->get_vhost_http_hooks_enabled(req->vhost)) {
return err;
}
// the http hooks will cause context switch,
// so we must copy all hooks for the on_connect may freed.
// @see https://github.com/ossrs/srs/issues/475
vector<string> hooks;
if (true) {
SrsConfDirective* conf = _srs_config->get_vhost_on_stop(req->vhost);
if (!conf) {
return err;
}
hooks = conf->args;
}
for (int i = 0; i < (int)hooks.size(); i++) {
std::string url = hooks.at(i);
SrsHttpHooks::on_stop(url, req);
}
return err;
}
std::string SrsRtcAsyncCallOnStop::to_string()
{
return std::string("");
}
SrsRtcPlayStream::SrsRtcPlayStream(SrsRtcConnection* s, const SrsContextId& cid)
{
@ -379,9 +426,8 @@ SrsRtcPlayStream::SrsRtcPlayStream(SrsRtcConnection* s, const SrsContextId& cid)
SrsRtcPlayStream::~SrsRtcPlayStream()
{
// TODO: FIXME: Use SrsAsyncCallWorker in http hooks instead, to covert to async call.
if (req_) {
http_hooks_on_stop();
session_->server_->exec_async_work(new SrsRtcAsyncCallOnStop(cid_, req_));
}
// TODO: FIXME: Should not do callback in de-constructor?
@ -871,35 +917,6 @@ srs_error_t SrsRtcPlayStream::do_request_keyframe(uint32_t ssrc, SrsContextId ci
return err;
}
void SrsRtcPlayStream::http_hooks_on_stop()
{
if (!_srs_config->get_vhost_http_hooks_enabled(req_->vhost)) {
return;
}
// the http hooks will cause context switch,
// so we must copy all hooks for the on_connect may freed.
// @see https://github.com/ossrs/srs/issues/475
vector<string> hooks;
if (true) {
SrsConfDirective* conf = _srs_config->get_vhost_on_stop(req_->vhost);
if (!conf) {
return;
}
hooks = conf->args;
}
for (int i = 0; i < (int)hooks.size(); i++) {
std::string url = hooks.at(i);
SrsHttpHooks::on_stop(url, req_);
}
return;
}
SrsRtcPublishRtcpTimer::SrsRtcPublishRtcpTimer(SrsRtcPublishStream* p) : p_(p)
{
_srs_hybrid->timer1s()->subscribe(this);
@ -979,6 +996,54 @@ srs_error_t SrsRtcPublishTwccTimer::on_timer(srs_utime_t interval)
return err;
}
SrsRtcAsyncCallOnUnpublish::SrsRtcAsyncCallOnUnpublish(SrsContextId c, SrsRequest * r)
{
cid = c;
req = r->copy();
}
SrsRtcAsyncCallOnUnpublish::~SrsRtcAsyncCallOnUnpublish()
{
srs_freep(req);
}
srs_error_t SrsRtcAsyncCallOnUnpublish::call()
{
srs_error_t err = srs_success;
if (!_srs_config->get_vhost_http_hooks_enabled(req->vhost)) {
return err;
}
// the http hooks will cause context switch,
// so we must copy all hooks for the on_connect may freed.
// @see https://github.com/ossrs/srs/issues/475
vector<string> hooks;
if (true) {
SrsConfDirective* conf = _srs_config->get_vhost_on_unpublish(req->vhost);
if (!conf) {
return err;
}
hooks = conf->args;
}
for (int i = 0; i < (int)hooks.size(); i++) {
std::string url = hooks.at(i);
SrsHttpHooks::on_unpublish(url, req);
}
return err;
}
std::string SrsRtcAsyncCallOnUnpublish::to_string()
{
return std::string("");
}
SrsRtcPublishStream::SrsRtcPublishStream(SrsRtcConnection* session, const SrsContextId& cid)
{
cid_ = cid;
@ -988,7 +1053,7 @@ SrsRtcPublishStream::SrsRtcPublishStream(SrsRtcConnection* session, const SrsCon
pli_epp = new SrsErrorPithyPrint();
twcc_epp_ = new SrsErrorPithyPrint(3.0);
req = NULL;
req_ = NULL;
source = NULL;
nn_simulate_nack_drop = 0;
nack_enabled_ = false;
@ -1009,8 +1074,8 @@ SrsRtcPublishStream::SrsRtcPublishStream(SrsRtcConnection* session, const SrsCon
SrsRtcPublishStream::~SrsRtcPublishStream()
{
if (req) {
http_hooks_on_unpublish();
if (req_) {
session_->server_->exec_async_work(new SrsRtcAsyncCallOnUnpublish(cid_, req_));
}
srs_freep(timer_rtcp_);
@ -1027,7 +1092,7 @@ SrsRtcPublishStream::~SrsRtcPublishStream()
// it must be called after source stream unpublish (set source stream is_created=false).
// if not, it lead to republish failed.
if (_srs_rtc_hijacker) {
_srs_rtc_hijacker->on_stop_publish(session_, this, req);
_srs_rtc_hijacker->on_stop_publish(session_, this, req_);
}
for (int i = 0; i < (int)video_tracks_.size(); ++i) {
@ -1045,7 +1110,7 @@ SrsRtcPublishStream::~SrsRtcPublishStream()
srs_freep(pli_worker_);
srs_freep(twcc_epp_);
srs_freep(pli_epp);
srs_freep(req);
srs_freep(req_);
// update the statistic when client coveried.
SrsStatistic* stat = SrsStatistic::instance();
@ -1056,7 +1121,7 @@ srs_error_t SrsRtcPublishStream::initialize(SrsRequest* r, SrsRtcSourceDescripti
{
srs_error_t err = srs_success;
req = r->copy();
req_ = r->copy();
if (stream_desc->audio_track_desc_) {
audio_tracks_.push_back(new SrsRtcAudioRecvTrack(session_, stream_desc->audio_track_desc_));
@ -1083,10 +1148,10 @@ srs_error_t SrsRtcPublishStream::initialize(SrsRequest* r, SrsRtcSourceDescripti
rtcp_twcc_.set_media_ssrc(media_ssrc);
}
nack_enabled_ = _srs_config->get_rtc_nack_enabled(req->vhost);
nack_no_copy_ = _srs_config->get_rtc_nack_no_copy(req->vhost);
pt_to_drop_ = (uint16_t)_srs_config->get_rtc_drop_for_pt(req->vhost);
twcc_enabled_ = _srs_config->get_rtc_twcc_enabled(req->vhost);
nack_enabled_ = _srs_config->get_rtc_nack_enabled(req_->vhost);
nack_no_copy_ = _srs_config->get_rtc_nack_no_copy(req_->vhost);
pt_to_drop_ = (uint16_t)_srs_config->get_rtc_drop_for_pt(req_->vhost);
twcc_enabled_ = _srs_config->get_rtc_twcc_enabled(req_->vhost);
// No TWCC when negotiate, disable it.
if (twcc_id <= 0) {
@ -1107,14 +1172,14 @@ srs_error_t SrsRtcPublishStream::initialize(SrsRequest* r, SrsRtcSourceDescripti
}
// Setup the publish stream in source to enable PLI as such.
if ((err = _srs_rtc_sources->fetch_or_create(req, &source)) != srs_success) {
if ((err = _srs_rtc_sources->fetch_or_create(req_, &source)) != srs_success) {
return srs_error_wrap(err, "create source");
}
source->set_publish_stream(this);
// Bridge to rtmp
#if defined(SRS_RTC) && defined(SRS_FFMPEG_FIT)
bool rtc_to_rtmp = _srs_config->get_rtc_to_rtmp(req->vhost);
bool rtc_to_rtmp = _srs_config->get_rtc_to_rtmp(req_->vhost);
if (rtc_to_rtmp) {
SrsLiveSource *rtmp = NULL;
if ((err = _srs_sources->fetch_or_create(r, _srs_hybrid->srs()->instance(), &rtmp)) != srs_success) {
@ -1160,14 +1225,14 @@ srs_error_t SrsRtcPublishStream::start()
}
if (_srs_rtc_hijacker) {
if ((err = _srs_rtc_hijacker->on_start_publish(session_, this, req)) != srs_success) {
if ((err = _srs_rtc_hijacker->on_start_publish(session_, this, req_)) != srs_success) {
return srs_error_wrap(err, "on start publish");
}
}
// update the statistic when client discoveried.
SrsStatistic* stat = SrsStatistic::instance();
if ((err = stat->on_client(cid_.c_str(), req, session_, SrsRtcConnPublish)) != srs_success) {
if ((err = stat->on_client(cid_.c_str(), req_, session_, SrsRtcConnPublish)) != srs_success) {
return srs_error_wrap(err, "rtc: stat client");
}
@ -1385,7 +1450,7 @@ srs_error_t SrsRtcPublishStream::do_on_rtp_plaintext(SrsRtpPacket*& pkt, SrsBuff
}
if (_srs_rtc_hijacker) {
if ((err = _srs_rtc_hijacker->on_rtp_packet(session_, this, req, pkt)) != srs_success) {
if ((err = _srs_rtc_hijacker->on_rtp_packet(session_, this, req_, pkt)) != srs_success) {
return srs_error_wrap(err, "on rtp packet");
}
}
@ -1677,33 +1742,6 @@ void SrsRtcPublishStream::update_send_report_time(uint32_t ssrc, const SrsNtp& n
}
}
void SrsRtcPublishStream::http_hooks_on_unpublish()
{
if (!_srs_config->get_vhost_http_hooks_enabled(req->vhost)) {
return;
}
// the http hooks will cause context switch,
// so we must copy all hooks for the on_connect may freed.
// @see https://github.com/ossrs/srs/issues/475
vector<string> hooks;
if (true) {
SrsConfDirective* conf = _srs_config->get_vhost_on_unpublish(req->vhost);
if (!conf) {
return;
}
hooks = conf->args;
}
for (int i = 0; i < (int)hooks.size(); i++) {
std::string url = hooks.at(i);
SrsHttpHooks::on_unpublish(url, req);
}
}
ISrsRtcConnectionHijacker::ISrsRtcConnectionHijacker()
{
}
@ -1753,7 +1791,7 @@ srs_error_t SrsRtcConnectionNackTimer::on_timer(srs_utime_t interval)
SrsRtcConnection::SrsRtcConnection(SrsRtcServer* s, const SrsContextId& cid)
{
req = NULL;
req_ = NULL;
cid_ = cid;
hijacker_ = NULL;
@ -1820,7 +1858,7 @@ SrsRtcConnection::~SrsRtcConnection()
srs_freep(cache_buffer_);
srs_freep(transport_);
srs_freep(req);
srs_freep(req_);
srs_freep(pp_address_change);
srs_freep(pli_epp);
}
@ -2019,7 +2057,7 @@ srs_error_t SrsRtcConnection::initialize(SrsRequest* r, bool dtls, bool srtp, st
srs_error_t err = srs_success;
username_ = username;
req = r->copy();
req_ = r->copy();
if (!srtp) {
srs_freep(transport_);
@ -2036,10 +2074,10 @@ srs_error_t SrsRtcConnection::initialize(SrsRequest* r, bool dtls, bool srtp, st
}
// TODO: FIXME: Support reload.
session_timeout = _srs_config->get_rtc_stun_timeout(req->vhost);
session_timeout = _srs_config->get_rtc_stun_timeout(req_->vhost);
last_stun_time = srs_get_system_time();
nack_enabled_ = _srs_config->get_rtc_nack_enabled(req->vhost);
nack_enabled_ = _srs_config->get_rtc_nack_enabled(req_->vhost);
srs_trace("RTC init session, user=%s, url=%s, encrypt=%u/%u, DTLS(role=%s, version=%s), timeout=%dms, nack=%d",
username.c_str(), r->get_stream_url().c_str(), dtls, srtp, cfg->dtls_role.c_str(), cfg->dtls_version.c_str(),
@ -2691,7 +2729,7 @@ srs_error_t SrsRtcConnection::on_binding_request(SrsStunPacket* r)
++_srs_pps_sstuns->sugar;
bool strict_check = _srs_config->get_rtc_stun_strict_check(req->vhost);
bool strict_check = _srs_config->get_rtc_stun_strict_check(req_->vhost);
if (strict_check && r->get_ice_controlled()) {
// @see: https://tools.ietf.org/html/draft-ietf-ice-rfc5245bis-00#section-6.1.3.1
// TODO: Send 487 (Role Conflict) error response.