1
0
Fork 0
mirror of https://github.com/ossrs/srs.git synced 2025-03-09 15:49:59 +00:00

UniquePtr: Support SrsUniquePtr to replace SrsAutoFree. v6.0.136 (#4109)

To manage an object:

```cpp
// Before
MyClass* ptr = new MyClass();
SrsAutoFree(MyClass, ptr);
ptr->do_something();

// Now
SrsUniquePtr<MyClass> ptr(new MyClass());
ptr->do_something();
```

To manage an array of objects:

```cpp
// Before
char* ptr = new char[10];
SrsAutoFreeA(char, ptr);
ptr[0] = 0xf;

// Now
SrsUniquePtr<char[]> ptr(new char[10]);
ptr[0] = 0xf;
```

In fact, SrsUniquePtr is a limited subset of SrsAutoFree, mainly
managing pointers and arrays. SrsUniquePtr is better than SrsAutoFree
because it has the same API to standard unique ptr.

```cpp
SrsUniquePtr<MyClass> ptr(new MyClass());
ptr->do_something();
MyClass* p = ptr.get();
```

SrsAutoFree actually uses a pointer to a pointer, so it can be set to
NULL, allowing the pointer's value to be changed later (this usage is
different from SrsUniquePtr).

```cpp
// OK to free ptr correctly.
MyClass* ptr;
SrsAutoFree(MyClass, ptr);
ptr = new MyClass();

// Crash because ptr is an invalid pointer.
MyClass* ptr;
SrsUniquePtr<MyClass> ptr(ptr);
ptr = new MyClass();
```

Additionally, SrsAutoFreeH can use specific release functions, which
SrsUniquePtr does not support.

---------

Co-authored-by: Jacob Su <suzp1984@gmail.com>
This commit is contained in:
Winlin 2024-07-09 10:29:36 +08:00 committed by GitHub
parent baf22d01c1
commit 23d2602c34
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
72 changed files with 1720 additions and 1669 deletions

View file

@ -755,25 +755,26 @@ srs_error_t SrsRtmpConn::playing(SrsSharedPtr<SrsLiveSource> source)
set_sock_options();
// Create a consumer of source.
SrsLiveConsumer* consumer = NULL;
SrsAutoFree(SrsLiveConsumer, consumer);
if ((err = source->create_consumer(consumer)) != srs_success) {
SrsLiveConsumer* consumer_raw = NULL;
if ((err = source->create_consumer(consumer_raw)) != srs_success) {
return srs_error_wrap(err, "rtmp: create consumer");
}
if ((err = source->consumer_dumps(consumer)) != srs_success) {
SrsUniquePtr<SrsLiveConsumer> consumer(consumer_raw);
if ((err = source->consumer_dumps(consumer.get())) != srs_success) {
return srs_error_wrap(err, "rtmp: dumps consumer");
}
// Use receiving thread to receive packets from peer.
SrsQueueRecvThread trd(consumer, rtmp, SRS_PERF_MW_SLEEP, _srs_context->get_id());
SrsQueueRecvThread trd(consumer.get(), rtmp, SRS_PERF_MW_SLEEP, _srs_context->get_id());
if ((err = trd.start()) != srs_success) {
return srs_error_wrap(err, "rtmp: start receive thread");
}
// Deliver packets to peer.
wakable = consumer;
err = do_playing(source, consumer, &trd);
wakable = consumer.get();
err = do_playing(source, consumer.get(), &trd);
wakable = NULL;
trd.stop();
@ -795,9 +796,8 @@ srs_error_t SrsRtmpConn::do_playing(SrsSharedPtr<SrsLiveSource> source, SrsLiveC
srs_assert(consumer);
// initialize other components
SrsPithyPrint* pprint = SrsPithyPrint::create_rtmp_play();
SrsAutoFree(SrsPithyPrint, pprint);
SrsUniquePtr<SrsPithyPrint> pprint(SrsPithyPrint::create_rtmp_play());
SrsMessageArray msgs(SRS_PERF_MW_MSGS);
bool user_specified_duration_to_stop = (req->duration > 0);
int64_t starttime = -1;
@ -816,9 +816,8 @@ srs_error_t SrsRtmpConn::do_playing(SrsSharedPtr<SrsLiveSource> source, SrsLiveC
srsu2msi(send_min_interval), srsu2msi(mw_sleep), mw_msgs, realtime, tcp_nodelay);
#ifdef SRS_APM
ISrsApmSpan* span = _srs_apm->span("play-cycle")->set_kind(SrsApmKindProducer)->as_child(span_client_)
->attr("realtime", srs_fmt("%d", realtime))->end();
SrsAutoFree(ISrsApmSpan, span);
SrsUniquePtr<ISrsApmSpan> span(_srs_apm->span("play-cycle")->set_kind(SrsApmKindProducer)->as_child(span_client_)
->attr("realtime", srs_fmt("%d", realtime))->end());
#endif
while (true) {
@ -866,7 +865,7 @@ srs_error_t SrsRtmpConn::do_playing(SrsSharedPtr<SrsLiveSource> source, SrsLiveC
#ifdef SRS_APM
// TODO: Do not use pithy print for frame span.
ISrsApmSpan* sample = _srs_apm->span("play-frame")->set_kind(SrsApmKindConsumer)->as_child(span)
ISrsApmSpan* sample = _srs_apm->span("play-frame")->set_kind(SrsApmKindConsumer)->as_child(span.get())
->attr("msgs", srs_fmt("%d", count))->attr("kbps", srs_fmt("%d", kbps->get_send_kbps_30s()));
srs_freep(sample);
#endif
@ -974,8 +973,7 @@ srs_error_t SrsRtmpConn::do_publishing(SrsSharedPtr<SrsLiveSource> source, SrsPu
srs_error_t err = srs_success;
SrsRequest* req = info->req;
SrsPithyPrint* pprint = SrsPithyPrint::create_rtmp_publish();
SrsAutoFree(SrsPithyPrint, pprint);
SrsUniquePtr<SrsPithyPrint> pprint(SrsPithyPrint::create_rtmp_publish());
// start isolate recv thread.
// TODO: FIXME: Pass the callback here.
@ -998,9 +996,8 @@ srs_error_t SrsRtmpConn::do_publishing(SrsSharedPtr<SrsLiveSource> source, SrsPu
}
#ifdef SRS_APM
ISrsApmSpan* span = _srs_apm->span("publish-cycle")->set_kind(SrsApmKindProducer)->as_child(span_client_)
->attr("timeout", srs_fmt("%d", srsu2msi(publish_normal_timeout)))->end();
SrsAutoFree(ISrsApmSpan, span);
SrsUniquePtr<ISrsApmSpan> span(_srs_apm->span("publish-cycle")->set_kind(SrsApmKindProducer)->as_child(span_client_)
->attr("timeout", srs_fmt("%d", srsu2msi(publish_normal_timeout)))->end());
#endif
// Response the start publishing message, let client start to publish messages.
@ -1062,7 +1059,7 @@ srs_error_t SrsRtmpConn::do_publishing(SrsSharedPtr<SrsLiveSource> source, SrsPu
#ifdef SRS_APM
// TODO: Do not use pithy print for frame span.
ISrsApmSpan* sample = _srs_apm->span("publish-frame")->set_kind(SrsApmKindConsumer)->as_child(span)
ISrsApmSpan* sample = _srs_apm->span("publish-frame")->set_kind(SrsApmKindConsumer)->as_child(span.get())
->attr("msgs", srs_fmt("%" PRId64, nb_frames))->attr("kbps", srs_fmt("%d", kbps->get_recv_kbps_30s()));
srs_freep(sample);
#endif
@ -1158,12 +1155,12 @@ srs_error_t SrsRtmpConn::handle_publish_message(SrsSharedPtr<SrsLiveSource>& sou
// process publish event.
if (msg->header.is_amf0_command() || msg->header.is_amf3_command()) {
SrsPacket* pkt = NULL;
if ((err = rtmp->decode_message(msg, &pkt)) != srs_success) {
SrsPacket* pkt_raw = NULL;
if ((err = rtmp->decode_message(msg, &pkt_raw)) != srs_success) {
return srs_error_wrap(err, "rtmp: decode message");
}
SrsAutoFree(SrsPacket, pkt);
SrsUniquePtr<SrsPacket> pkt(pkt_raw);
// for flash, any packet is republish.
if (info->type == SrsRtmpConnFlashPublish) {
// flash unpublish.
@ -1173,8 +1170,8 @@ srs_error_t SrsRtmpConn::handle_publish_message(SrsSharedPtr<SrsLiveSource>& sou
}
// for fmle, drop others except the fmle start packet.
if (dynamic_cast<SrsFMLEStartPacket*>(pkt)) {
SrsFMLEStartPacket* unpublish = dynamic_cast<SrsFMLEStartPacket*>(pkt);
if (dynamic_cast<SrsFMLEStartPacket*>(pkt.get())) {
SrsFMLEStartPacket* unpublish = dynamic_cast<SrsFMLEStartPacket*>(pkt.get());
if ((err = rtmp->fmle_unpublish(info->res->stream_id, unpublish->transaction_id)) != srs_success) {
return srs_error_wrap(err, "rtmp: republish");
}
@ -1230,14 +1227,14 @@ srs_error_t SrsRtmpConn::process_publish_message(SrsSharedPtr<SrsLiveSource>& so
// process onMetaData
if (msg->header.is_amf0_data() || msg->header.is_amf3_data()) {
SrsPacket* pkt = NULL;
if ((err = rtmp->decode_message(msg, &pkt)) != srs_success) {
SrsPacket* pkt_raw = NULL;
if ((err = rtmp->decode_message(msg, &pkt_raw)) != srs_success) {
return srs_error_wrap(err, "rtmp: decode message");
}
SrsAutoFree(SrsPacket, pkt);
if (dynamic_cast<SrsOnMetaDataPacket*>(pkt)) {
SrsOnMetaDataPacket* metadata = dynamic_cast<SrsOnMetaDataPacket*>(pkt);
SrsUniquePtr<SrsPacket> pkt(pkt_raw);
if (dynamic_cast<SrsOnMetaDataPacket*>(pkt.get())) {
SrsOnMetaDataPacket* metadata = dynamic_cast<SrsOnMetaDataPacket*>(pkt.get());
if ((err = source->on_meta_data(msg, metadata)) != srs_success) {
return srs_error_wrap(err, "rtmp: consume metadata");
}
@ -1249,27 +1246,27 @@ srs_error_t SrsRtmpConn::process_publish_message(SrsSharedPtr<SrsLiveSource>& so
return err;
}
srs_error_t SrsRtmpConn::process_play_control_msg(SrsLiveConsumer* consumer, SrsCommonMessage* msg)
srs_error_t SrsRtmpConn::process_play_control_msg(SrsLiveConsumer* consumer, SrsCommonMessage* msg_raw)
{
srs_error_t err = srs_success;
if (!msg) {
if (!msg_raw) {
return err;
}
SrsAutoFree(SrsCommonMessage, msg);
SrsUniquePtr<SrsCommonMessage> msg(msg_raw);
if (!msg->header.is_amf0_command() && !msg->header.is_amf3_command()) {
return err;
}
SrsPacket* pkt = NULL;
if ((err = rtmp->decode_message(msg, &pkt)) != srs_success) {
SrsPacket* pkt_raw = NULL;
if ((err = rtmp->decode_message(msg.get(), &pkt_raw)) != srs_success) {
return srs_error_wrap(err, "rtmp: decode message");
}
SrsAutoFree(SrsPacket, pkt);
SrsUniquePtr<SrsPacket> pkt(pkt_raw);
// for jwplayer/flowplayer, which send close as pause message.
SrsCloseStreamPacket* close = dynamic_cast<SrsCloseStreamPacket*>(pkt);
SrsCloseStreamPacket* close = dynamic_cast<SrsCloseStreamPacket*>(pkt.get());
if (close) {
return srs_error_new(ERROR_CONTROL_RTMP_CLOSE, "rtmp: close stream");
}
@ -1277,7 +1274,7 @@ srs_error_t SrsRtmpConn::process_play_control_msg(SrsLiveConsumer* consumer, Srs
// call msg,
// support response null first,
// TODO: FIXME: response in right way, or forward in edge mode.
SrsCallPacket* call = dynamic_cast<SrsCallPacket*>(pkt);
SrsCallPacket* call = dynamic_cast<SrsCallPacket*>(pkt.get());
if (call) {
// only response it when transaction id not zero,
// for the zero means donot need response.
@ -1293,7 +1290,7 @@ srs_error_t SrsRtmpConn::process_play_control_msg(SrsLiveConsumer* consumer, Srs
}
// pause
SrsPausePacket* pause = dynamic_cast<SrsPausePacket*>(pkt);
SrsPausePacket* pause = dynamic_cast<SrsPausePacket*>(pkt.get());
if (pause) {
if ((err = rtmp->on_play_client_pause(info->res->stream_id, pause->is_pause)) != srs_success) {
return srs_error_wrap(err, "rtmp: pause");
@ -1343,19 +1340,16 @@ srs_error_t SrsRtmpConn::check_edge_token_traverse_auth()
string server;
int port = SRS_CONSTS_RTMP_DEFAULT_PORT;
srs_parse_hostport(hostport, server, port);
SrsTcpClient* transport = new SrsTcpClient(server, port, SRS_EDGE_TOKEN_TRAVERSE_TIMEOUT);
SrsAutoFree(SrsTcpClient, transport);
SrsUniquePtr<SrsTcpClient> transport(new SrsTcpClient(server, port, SRS_EDGE_TOKEN_TRAVERSE_TIMEOUT));
if ((err = transport->connect()) != srs_success) {
srs_warn("Illegal edge token, tcUrl=%s, %s", req->tcUrl.c_str(), srs_error_desc(err).c_str());
srs_freep(err);
continue;
}
SrsRtmpClient* client = new SrsRtmpClient(transport);
SrsAutoFree(SrsRtmpClient, client);
return do_token_traverse_auth(client);
SrsUniquePtr<SrsRtmpClient> client(new SrsRtmpClient(transport.get()));
return do_token_traverse_auth(client.get());
}
return srs_error_new(ERROR_EDGE_PORT_INVALID, "rtmp: Illegal edge token, server=%d", (int)args.size());
@ -1611,8 +1605,7 @@ srs_error_t SrsRtmpConn::cycle()
#ifdef SRS_APM
// Final APM span, parent is the last span, not the root span. Note that only client or server kind will be filtered
// for error or exception report.
ISrsApmSpan* span_final = _srs_apm->span("final")->set_kind(SrsApmKindServer)->as_child(span_client_);
SrsAutoFree(ISrsApmSpan, span_final);
SrsUniquePtr<ISrsApmSpan> span_final(_srs_apm->span("final")->set_kind(SrsApmKindServer)->as_child(span_client_));
if (srs_error_code(err) != 0) {
span_final->record_error(err)->set_status(SrsApmStatusError, srs_fmt("fail code=%d", srs_error_code(err)));
}