diff --git a/overlay/overlay.cpp b/overlay/overlay.cpp index 8e35e2a2..3e10daec 100644 --- a/overlay/overlay.cpp +++ b/overlay/overlay.cpp @@ -421,48 +421,50 @@ void OverlayImpl::bcast_gc() { CHECK(delivered_broadcasts_.size() == bcast_lru_.size()); } -void OverlayImpl::send_message_to_neighbours(td::BufferSlice data) { - if (neighbours_.empty()) { - // TODO: limit retries +void OverlayImpl::wait_neighbours_not_empty(td::Promise promise, int max_retries) { + if (!neighbours_.empty()) { + promise.set_result(td::Unit()); + } else if (max_retries > 0) { delay_action( - [SelfId = actor_id(this), data = std::move(data)]() mutable { - td::actor::send_closure(SelfId, &OverlayImpl::send_message_to_neighbours, std::move(data)); + [SelfId = actor_id(this), promise = std::move(promise), max_retries]() mutable { + td::actor::send_closure(SelfId, &OverlayImpl::wait_neighbours_not_empty, std::move(promise), max_retries - 1); }, td::Timestamp::in(0.5)); - return; - } - for (auto &n : neighbours_) { - td::actor::send_closure(manager_, &OverlayManager::send_message, n, local_id_, overlay_id_, data.clone()); + } else { + promise.set_error(td::Status::Error(ErrorCode::timeout)); } } +void OverlayImpl::send_message_to_neighbours(td::BufferSlice data) { + wait_neighbours_not_empty([this, data = std::move(data)](td::Result R) { + if (R.is_error()) { + return; + } + for (auto &n : neighbours_) { + td::actor::send_closure(manager_, &OverlayManager::send_message, n, local_id_, overlay_id_, data.clone()); + } + }); +} + void OverlayImpl::send_broadcast(PublicKeyHash send_as, td::uint32 flags, td::BufferSlice data) { - if (neighbours_.empty()) { - // TODO: limit retries - delay_action( - [SelfId = actor_id(this), send_as, flags, data = std::move(data)]() mutable { - td::actor::send_closure(SelfId, &OverlayImpl::send_broadcast, send_as, flags, std::move(data)); - }, - td::Timestamp::in(0.5)); - return; - } - auto S = BroadcastSimple::create_new(actor_id(this), keyring_, send_as, std::move(data), flags); - if (S.is_error()) { - LOG(WARNING) << "failed to send broadcast: " << S; - } + wait_neighbours_not_empty([this, send_as, flags, data = std::move(data)](td::Result R) mutable { + if (R.is_error()) { + return; + } + auto S = BroadcastSimple::create_new(actor_id(this), keyring_, send_as, std::move(data), flags); + if (S.is_error()) { + LOG(WARNING) << "failed to send broadcast: " << S; + } + }); } void OverlayImpl::send_broadcast_fec(PublicKeyHash send_as, td::uint32 flags, td::BufferSlice data) { - if (neighbours_.empty()) { - // TODO: limit retries - delay_action( - [SelfId = actor_id(this), send_as, flags, data = std::move(data)]() mutable { - td::actor::send_closure(SelfId, &OverlayImpl::send_broadcast_fec, send_as, flags, std::move(data)); - }, - td::Timestamp::in(0.5)); - return; - } - OverlayOutboundFecBroadcast::create(std::move(data), flags, actor_id(this), send_as); + wait_neighbours_not_empty([this, send_as, flags, data = std::move(data)](td::Result R) mutable { + if (R.is_error()) { + return; + } + OverlayOutboundFecBroadcast::create(std::move(data), flags, actor_id(this), send_as); + }); } void OverlayImpl::print(td::StringBuilder &sb) { diff --git a/overlay/overlay.hpp b/overlay/overlay.hpp index 07003e09..f52de86d 100644 --- a/overlay/overlay.hpp +++ b/overlay/overlay.hpp @@ -266,6 +266,8 @@ class OverlayImpl : public Overlay { priority_broadcast_receivers_ = std::move(nodes); } + void wait_neighbours_not_empty(td::Promise promise, int max_retries = 10); + private: template void process_query(adnl::AdnlNodeIdShort src, T &query, td::Promise promise) {