mirror of
https://github.com/ton-blockchain/ton
synced 2025-02-12 11:12:16 +00:00
Fix bug of broadcast stop on receiving the last FEC piece (#1040)
This commit is contained in:
parent
2634e63e4f
commit
11f39acef5
2 changed files with 34 additions and 21 deletions
|
@ -78,7 +78,6 @@ td::Status OverlayFecBroadcastPart::check_signature() {
|
||||||
}
|
}
|
||||||
|
|
||||||
td::Status OverlayFecBroadcastPart::run_checks() {
|
td::Status OverlayFecBroadcastPart::run_checks() {
|
||||||
|
|
||||||
TRY_STATUS(check_time());
|
TRY_STATUS(check_time());
|
||||||
TRY_STATUS(check_duplicate());
|
TRY_STATUS(check_duplicate());
|
||||||
TRY_STATUS(check_source());
|
TRY_STATUS(check_source());
|
||||||
|
@ -94,14 +93,17 @@ void BroadcastFec::broadcast_checked(td::Result<td::Unit> R) {
|
||||||
overlay_->deliver_broadcast(get_source().compute_short_id(), data_.clone());
|
overlay_->deliver_broadcast(get_source().compute_short_id(), data_.clone());
|
||||||
auto manager = overlay_->overlay_manager();
|
auto manager = overlay_->overlay_manager();
|
||||||
while (!parts_.empty()) {
|
while (!parts_.empty()) {
|
||||||
distribute_part(parts_.begin()->first);
|
distribute_part(parts_.begin()->first);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
is_checked_ = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Do we need status here??
|
// Do we need status here??
|
||||||
td::Status BroadcastFec::distribute_part(td::uint32 seqno) {
|
td::Status BroadcastFec::distribute_part(td::uint32 seqno) {
|
||||||
auto i = parts_.find(seqno);
|
auto i = parts_.find(seqno);
|
||||||
if (i == parts_.end()) {
|
if (i == parts_.end()) {
|
||||||
|
VLOG(OVERLAY_WARNING) << "not distibuting empty part " << seqno;
|
||||||
// should not get here
|
// should not get here
|
||||||
return td::Status::OK();
|
return td::Status::OK();
|
||||||
}
|
}
|
||||||
|
@ -132,7 +134,6 @@ td::Status BroadcastFec::distribute_part(td::uint32 seqno) {
|
||||||
}
|
}
|
||||||
|
|
||||||
td::Status OverlayFecBroadcastPart::apply() {
|
td::Status OverlayFecBroadcastPart::apply() {
|
||||||
|
|
||||||
if (!bcast_) {
|
if (!bcast_) {
|
||||||
bcast_ = overlay_->get_fec_broadcast(broadcast_hash_);
|
bcast_ = overlay_->get_fec_broadcast(broadcast_hash_);
|
||||||
}
|
}
|
||||||
|
@ -165,16 +166,20 @@ td::Status OverlayFecBroadcastPart::apply() {
|
||||||
return S;
|
return S;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
if(untrusted_) {
|
if (untrusted_) {
|
||||||
auto P = td::PromiseCreator::lambda(
|
auto P = td::PromiseCreator::lambda(
|
||||||
[id = broadcast_hash_, overlay_id = actor_id(overlay_)](td::Result<td::Unit> RR) mutable {
|
[id = broadcast_hash_, overlay_id = actor_id(overlay_)](td::Result<td::Unit> RR) mutable {
|
||||||
td::actor::send_closure(std::move(overlay_id), &OverlayImpl::broadcast_checked, id, std::move(RR));
|
td::actor::send_closure(std::move(overlay_id), &OverlayImpl::broadcast_checked, id, std::move(RR));
|
||||||
});
|
});
|
||||||
overlay_->check_broadcast(bcast_->get_source().compute_short_id(), R.move_as_ok(), std::move(P));
|
overlay_->check_broadcast(bcast_->get_source().compute_short_id(), R.move_as_ok(), std::move(P));
|
||||||
} else {
|
} else {
|
||||||
overlay_->deliver_broadcast(bcast_->get_source().compute_short_id(), R.move_as_ok());
|
overlay_->deliver_broadcast(bcast_->get_source().compute_short_id(), R.move_as_ok());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
bcast_->set_overlay(overlay_);
|
||||||
|
bcast_->set_src_peer_id(src_peer_id_);
|
||||||
|
TRY_STATUS(bcast_->add_part(seqno_, data_.clone(), export_serialized_short(), export_serialized()));
|
||||||
}
|
}
|
||||||
return td::Status::OK();
|
return td::Status::OK();
|
||||||
}
|
}
|
||||||
|
@ -304,7 +309,8 @@ td::Status OverlayFecBroadcastPart::create_new(OverlayImpl *overlay, td::actor::
|
||||||
|
|
||||||
auto B = std::make_unique<OverlayFecBroadcastPart>(
|
auto B = std::make_unique<OverlayFecBroadcastPart>(
|
||||||
broadcast_hash, part_hash, PublicKey{}, overlay->get_certificate(local_id), data_hash, size, flags,
|
broadcast_hash, part_hash, PublicKey{}, overlay->get_certificate(local_id), data_hash, size, flags,
|
||||||
part_data_hash, std::move(part), seqno, std::move(fec_type), date, td::BufferSlice{}, false, nullptr, overlay, adnl::AdnlNodeIdShort::zero());
|
part_data_hash, std::move(part), seqno, std::move(fec_type), date, td::BufferSlice{}, false, nullptr, overlay,
|
||||||
|
adnl::AdnlNodeIdShort::zero());
|
||||||
auto to_sign = B->to_sign();
|
auto to_sign = B->to_sign();
|
||||||
|
|
||||||
auto P = td::PromiseCreator::lambda(
|
auto P = td::PromiseCreator::lambda(
|
||||||
|
|
|
@ -82,15 +82,15 @@ class BroadcastFec : public td::ListNode {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
td::Status add_part(td::uint32 seqno, td::BufferSlice data,
|
td::Status add_part(td::uint32 seqno, td::BufferSlice data, td::BufferSlice serialized_fec_part_short,
|
||||||
td::BufferSlice serialized_fec_part_short,
|
|
||||||
td::BufferSlice serialized_fec_part) {
|
td::BufferSlice serialized_fec_part) {
|
||||||
CHECK(decoder_);
|
if (decoder_) {
|
||||||
td::fec::Symbol s;
|
td::fec::Symbol s;
|
||||||
s.id = seqno;
|
s.id = seqno;
|
||||||
s.data = std::move(data);
|
s.data = std::move(data);
|
||||||
|
|
||||||
decoder_->add_symbol(std::move(s));
|
decoder_->add_symbol(std::move(s));
|
||||||
|
}
|
||||||
parts_[seqno] = std::pair<td::BufferSlice, td::BufferSlice>(std::move(serialized_fec_part_short),
|
parts_[seqno] = std::pair<td::BufferSlice, td::BufferSlice>(std::move(serialized_fec_part_short),
|
||||||
std::move(serialized_fec_part));
|
std::move(serialized_fec_part));
|
||||||
|
|
||||||
|
@ -200,8 +200,13 @@ class BroadcastFec : public td::ListNode {
|
||||||
|
|
||||||
td::Status distribute_part(td::uint32 seqno);
|
td::Status distribute_part(td::uint32 seqno);
|
||||||
|
|
||||||
|
bool is_checked() const {
|
||||||
|
return is_checked_;
|
||||||
|
}
|
||||||
|
|
||||||
private:
|
private:
|
||||||
bool ready_ = false;
|
bool ready_ = false;
|
||||||
|
bool is_checked_ = false;
|
||||||
|
|
||||||
Overlay::BroadcastHash hash_;
|
Overlay::BroadcastHash hash_;
|
||||||
Overlay::BroadcastDataHash data_hash_;
|
Overlay::BroadcastDataHash data_hash_;
|
||||||
|
@ -281,7 +286,7 @@ class OverlayFecBroadcastPart : public td::ListNode {
|
||||||
, signature_(std::move(signature))
|
, signature_(std::move(signature))
|
||||||
, is_short_(is_short)
|
, is_short_(is_short)
|
||||||
, bcast_(bcast)
|
, bcast_(bcast)
|
||||||
, overlay_(overlay)
|
, overlay_(overlay)
|
||||||
, src_peer_id_(src_peer_id) {
|
, src_peer_id_(src_peer_id) {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -300,7 +305,7 @@ class OverlayFecBroadcastPart : public td::ListNode {
|
||||||
signature_ = std::move(signature);
|
signature_ = std::move(signature);
|
||||||
}
|
}
|
||||||
void update_overlay(OverlayImpl *overlay);
|
void update_overlay(OverlayImpl *overlay);
|
||||||
|
|
||||||
tl_object_ptr<ton_api::overlay_broadcastFec> export_tl();
|
tl_object_ptr<ton_api::overlay_broadcastFec> export_tl();
|
||||||
tl_object_ptr<ton_api::overlay_broadcastFecShort> export_tl_short();
|
tl_object_ptr<ton_api::overlay_broadcastFecShort> export_tl_short();
|
||||||
td::BufferSlice export_serialized();
|
td::BufferSlice export_serialized();
|
||||||
|
@ -310,14 +315,16 @@ class OverlayFecBroadcastPart : public td::ListNode {
|
||||||
td::Status run() {
|
td::Status run() {
|
||||||
TRY_STATUS(run_checks());
|
TRY_STATUS(run_checks());
|
||||||
TRY_STATUS(apply());
|
TRY_STATUS(apply());
|
||||||
if(!untrusted_) {
|
if (!untrusted_ || bcast_->is_checked()) {
|
||||||
TRY_STATUS(distribute());
|
TRY_STATUS(distribute());
|
||||||
}
|
}
|
||||||
return td::Status::OK();
|
return td::Status::OK();
|
||||||
}
|
}
|
||||||
|
|
||||||
static td::Status create(OverlayImpl *overlay, adnl::AdnlNodeIdShort src_peer_id, tl_object_ptr<ton_api::overlay_broadcastFec> broadcast);
|
static td::Status create(OverlayImpl *overlay, adnl::AdnlNodeIdShort src_peer_id,
|
||||||
static td::Status create(OverlayImpl *overlay, adnl::AdnlNodeIdShort src_peer_id, tl_object_ptr<ton_api::overlay_broadcastFecShort> broadcast);
|
tl_object_ptr<ton_api::overlay_broadcastFec> broadcast);
|
||||||
|
static td::Status create(OverlayImpl *overlay, adnl::AdnlNodeIdShort src_peer_id,
|
||||||
|
tl_object_ptr<ton_api::overlay_broadcastFecShort> broadcast);
|
||||||
static td::Status create_new(OverlayImpl *overlay, td::actor::ActorId<OverlayImpl> overlay_actor_id,
|
static td::Status create_new(OverlayImpl *overlay, td::actor::ActorId<OverlayImpl> overlay_actor_id,
|
||||||
PublicKeyHash local_id, Overlay::BroadcastDataHash data_hash, td::uint32 size,
|
PublicKeyHash local_id, Overlay::BroadcastDataHash data_hash, td::uint32 size,
|
||||||
td::uint32 flags, td::BufferSlice part, td::uint32 seqno, fec::FecType fec_type,
|
td::uint32 flags, td::BufferSlice part, td::uint32 seqno, fec::FecType fec_type,
|
||||||
|
|
Loading…
Reference in a new issue