diff --git a/overlay/overlay-fec-broadcast.cpp b/overlay/overlay-fec-broadcast.cpp index aed5248b..cd030742 100644 --- a/overlay/overlay-fec-broadcast.cpp +++ b/overlay/overlay-fec-broadcast.cpp @@ -78,7 +78,6 @@ td::Status OverlayFecBroadcastPart::check_signature() { } td::Status OverlayFecBroadcastPart::run_checks() { - TRY_STATUS(check_time()); TRY_STATUS(check_duplicate()); TRY_STATUS(check_source()); @@ -94,14 +93,17 @@ void BroadcastFec::broadcast_checked(td::Result R) { overlay_->deliver_broadcast(get_source().compute_short_id(), data_.clone()); auto manager = overlay_->overlay_manager(); while (!parts_.empty()) { - distribute_part(parts_.begin()->first); + distribute_part(parts_.begin()->first); } + + is_checked_ = true; } // Do we need status here?? -td::Status BroadcastFec::distribute_part(td::uint32 seqno) { +td::Status BroadcastFec::distribute_part(td::uint32 seqno) { auto i = parts_.find(seqno); if (i == parts_.end()) { + VLOG(OVERLAY_WARNING) << "not distibuting empty part " << seqno; // should not get here return td::Status::OK(); } @@ -132,7 +134,6 @@ td::Status BroadcastFec::distribute_part(td::uint32 seqno) { } td::Status OverlayFecBroadcastPart::apply() { - if (!bcast_) { bcast_ = overlay_->get_fec_broadcast(broadcast_hash_); } @@ -165,16 +166,20 @@ td::Status OverlayFecBroadcastPart::apply() { return S; } } else { - if(untrusted_) { + if (untrusted_) { auto P = td::PromiseCreator::lambda( - [id = broadcast_hash_, overlay_id = actor_id(overlay_)](td::Result RR) mutable { - td::actor::send_closure(std::move(overlay_id), &OverlayImpl::broadcast_checked, id, std::move(RR)); - }); + [id = broadcast_hash_, overlay_id = actor_id(overlay_)](td::Result RR) mutable { + td::actor::send_closure(std::move(overlay_id), &OverlayImpl::broadcast_checked, id, std::move(RR)); + }); overlay_->check_broadcast(bcast_->get_source().compute_short_id(), R.move_as_ok(), std::move(P)); } else { overlay_->deliver_broadcast(bcast_->get_source().compute_short_id(), R.move_as_ok()); } } + } else { + bcast_->set_overlay(overlay_); + bcast_->set_src_peer_id(src_peer_id_); + TRY_STATUS(bcast_->add_part(seqno_, data_.clone(), export_serialized_short(), export_serialized())); } return td::Status::OK(); } @@ -304,7 +309,8 @@ td::Status OverlayFecBroadcastPart::create_new(OverlayImpl *overlay, td::actor:: auto B = std::make_unique( broadcast_hash, part_hash, PublicKey{}, overlay->get_certificate(local_id), data_hash, size, flags, - part_data_hash, std::move(part), seqno, std::move(fec_type), date, td::BufferSlice{}, false, nullptr, overlay, adnl::AdnlNodeIdShort::zero()); + part_data_hash, std::move(part), seqno, std::move(fec_type), date, td::BufferSlice{}, false, nullptr, overlay, + adnl::AdnlNodeIdShort::zero()); auto to_sign = B->to_sign(); auto P = td::PromiseCreator::lambda( diff --git a/overlay/overlay-fec-broadcast.hpp b/overlay/overlay-fec-broadcast.hpp index 612af22f..85de648e 100644 --- a/overlay/overlay-fec-broadcast.hpp +++ b/overlay/overlay-fec-broadcast.hpp @@ -82,15 +82,15 @@ class BroadcastFec : public td::ListNode { } } - td::Status add_part(td::uint32 seqno, td::BufferSlice data, - td::BufferSlice serialized_fec_part_short, + td::Status add_part(td::uint32 seqno, td::BufferSlice data, td::BufferSlice serialized_fec_part_short, td::BufferSlice serialized_fec_part) { - CHECK(decoder_); - td::fec::Symbol s; - s.id = seqno; - s.data = std::move(data); + if (decoder_) { + td::fec::Symbol s; + s.id = seqno; + s.data = std::move(data); - decoder_->add_symbol(std::move(s)); + decoder_->add_symbol(std::move(s)); + } parts_[seqno] = std::pair(std::move(serialized_fec_part_short), std::move(serialized_fec_part)); @@ -200,8 +200,13 @@ class BroadcastFec : public td::ListNode { td::Status distribute_part(td::uint32 seqno); + bool is_checked() const { + return is_checked_; + } + private: bool ready_ = false; + bool is_checked_ = false; Overlay::BroadcastHash hash_; Overlay::BroadcastDataHash data_hash_; @@ -281,7 +286,7 @@ class OverlayFecBroadcastPart : public td::ListNode { , signature_(std::move(signature)) , is_short_(is_short) , bcast_(bcast) - , overlay_(overlay) + , overlay_(overlay) , src_peer_id_(src_peer_id) { } @@ -300,7 +305,7 @@ class OverlayFecBroadcastPart : public td::ListNode { signature_ = std::move(signature); } void update_overlay(OverlayImpl *overlay); - + tl_object_ptr export_tl(); tl_object_ptr export_tl_short(); td::BufferSlice export_serialized(); @@ -310,14 +315,16 @@ class OverlayFecBroadcastPart : public td::ListNode { td::Status run() { TRY_STATUS(run_checks()); TRY_STATUS(apply()); - if(!untrusted_) { + if (!untrusted_ || bcast_->is_checked()) { TRY_STATUS(distribute()); } return td::Status::OK(); } - static td::Status create(OverlayImpl *overlay, adnl::AdnlNodeIdShort src_peer_id, tl_object_ptr broadcast); - static td::Status create(OverlayImpl *overlay, adnl::AdnlNodeIdShort src_peer_id, tl_object_ptr broadcast); + static td::Status create(OverlayImpl *overlay, adnl::AdnlNodeIdShort src_peer_id, + tl_object_ptr broadcast); + static td::Status create(OverlayImpl *overlay, adnl::AdnlNodeIdShort src_peer_id, + tl_object_ptr broadcast); static td::Status create_new(OverlayImpl *overlay, td::actor::ActorId overlay_actor_id, PublicKeyHash local_id, Overlay::BroadcastDataHash data_hash, td::uint32 size, td::uint32 flags, td::BufferSlice part, td::uint32 seqno, fec::FecType fec_type,