mirror of
https://github.com/ton-blockchain/ton
synced 2025-02-13 19:52:18 +00:00
Improve retries in overlay.cpp
This commit is contained in:
parent
9fb986f6f5
commit
c2dde00459
2 changed files with 36 additions and 32 deletions
|
@ -421,48 +421,50 @@ void OverlayImpl::bcast_gc() {
|
||||||
CHECK(delivered_broadcasts_.size() == bcast_lru_.size());
|
CHECK(delivered_broadcasts_.size() == bcast_lru_.size());
|
||||||
}
|
}
|
||||||
|
|
||||||
void OverlayImpl::send_message_to_neighbours(td::BufferSlice data) {
|
void OverlayImpl::wait_neighbours_not_empty(td::Promise<td::Unit> promise, int max_retries) {
|
||||||
if (neighbours_.empty()) {
|
if (!neighbours_.empty()) {
|
||||||
// TODO: limit retries
|
promise.set_result(td::Unit());
|
||||||
|
} else if (max_retries > 0) {
|
||||||
delay_action(
|
delay_action(
|
||||||
[SelfId = actor_id(this), data = std::move(data)]() mutable {
|
[SelfId = actor_id(this), promise = std::move(promise), max_retries]() mutable {
|
||||||
td::actor::send_closure(SelfId, &OverlayImpl::send_message_to_neighbours, std::move(data));
|
td::actor::send_closure(SelfId, &OverlayImpl::wait_neighbours_not_empty, std::move(promise), max_retries - 1);
|
||||||
},
|
},
|
||||||
td::Timestamp::in(0.5));
|
td::Timestamp::in(0.5));
|
||||||
|
} else {
|
||||||
|
promise.set_error(td::Status::Error(ErrorCode::timeout));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void OverlayImpl::send_message_to_neighbours(td::BufferSlice data) {
|
||||||
|
wait_neighbours_not_empty([this, data = std::move(data)](td::Result<td::Unit> R) {
|
||||||
|
if (R.is_error()) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
for (auto &n : neighbours_) {
|
for (auto &n : neighbours_) {
|
||||||
td::actor::send_closure(manager_, &OverlayManager::send_message, n, local_id_, overlay_id_, data.clone());
|
td::actor::send_closure(manager_, &OverlayManager::send_message, n, local_id_, overlay_id_, data.clone());
|
||||||
}
|
}
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
void OverlayImpl::send_broadcast(PublicKeyHash send_as, td::uint32 flags, td::BufferSlice data) {
|
void OverlayImpl::send_broadcast(PublicKeyHash send_as, td::uint32 flags, td::BufferSlice data) {
|
||||||
if (neighbours_.empty()) {
|
wait_neighbours_not_empty([this, send_as, flags, data = std::move(data)](td::Result<td::Unit> R) mutable {
|
||||||
// TODO: limit retries
|
if (R.is_error()) {
|
||||||
delay_action(
|
|
||||||
[SelfId = actor_id(this), send_as, flags, data = std::move(data)]() mutable {
|
|
||||||
td::actor::send_closure(SelfId, &OverlayImpl::send_broadcast, send_as, flags, std::move(data));
|
|
||||||
},
|
|
||||||
td::Timestamp::in(0.5));
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
auto S = BroadcastSimple::create_new(actor_id(this), keyring_, send_as, std::move(data), flags);
|
auto S = BroadcastSimple::create_new(actor_id(this), keyring_, send_as, std::move(data), flags);
|
||||||
if (S.is_error()) {
|
if (S.is_error()) {
|
||||||
LOG(WARNING) << "failed to send broadcast: " << S;
|
LOG(WARNING) << "failed to send broadcast: " << S;
|
||||||
}
|
}
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
void OverlayImpl::send_broadcast_fec(PublicKeyHash send_as, td::uint32 flags, td::BufferSlice data) {
|
void OverlayImpl::send_broadcast_fec(PublicKeyHash send_as, td::uint32 flags, td::BufferSlice data) {
|
||||||
if (neighbours_.empty()) {
|
wait_neighbours_not_empty([this, send_as, flags, data = std::move(data)](td::Result<td::Unit> R) mutable {
|
||||||
// TODO: limit retries
|
if (R.is_error()) {
|
||||||
delay_action(
|
|
||||||
[SelfId = actor_id(this), send_as, flags, data = std::move(data)]() mutable {
|
|
||||||
td::actor::send_closure(SelfId, &OverlayImpl::send_broadcast_fec, send_as, flags, std::move(data));
|
|
||||||
},
|
|
||||||
td::Timestamp::in(0.5));
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
OverlayOutboundFecBroadcast::create(std::move(data), flags, actor_id(this), send_as);
|
OverlayOutboundFecBroadcast::create(std::move(data), flags, actor_id(this), send_as);
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
void OverlayImpl::print(td::StringBuilder &sb) {
|
void OverlayImpl::print(td::StringBuilder &sb) {
|
||||||
|
|
|
@ -266,6 +266,8 @@ class OverlayImpl : public Overlay {
|
||||||
priority_broadcast_receivers_ = std::move(nodes);
|
priority_broadcast_receivers_ = std::move(nodes);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void wait_neighbours_not_empty(td::Promise<td::Unit> promise, int max_retries = 10);
|
||||||
|
|
||||||
private:
|
private:
|
||||||
template <class T>
|
template <class T>
|
||||||
void process_query(adnl::AdnlNodeIdShort src, T &query, td::Promise<td::BufferSlice> promise) {
|
void process_query(adnl::AdnlNodeIdShort src, T &query, td::Promise<td::BufferSlice> promise) {
|
||||||
|
|
Loading…
Reference in a new issue