mirror of
				https://github.com/ton-blockchain/ton
				synced 2025-03-09 15:40:10 +00:00 
			
		
		
		
	Merge pull request #1156 from ton-blockchain/safe_features
Merge safe features branch
This commit is contained in:
		
						commit
						1bef6df455
					
				
					 39 changed files with 1015 additions and 212 deletions
				
			
		|  | @ -112,16 +112,16 @@ void AdnlChannelImpl::send_message(td::uint32 priority, td::actor::ActorId<AdnlN | |||
| } | ||||
| 
 | ||||
| void AdnlChannelImpl::receive(td::IPAddress addr, td::BufferSlice data) { | ||||
|   auto P = td::PromiseCreator::lambda( | ||||
|       [peer = peer_pair_, channel_id = channel_in_id_, addr, id = print_id()](td::Result<AdnlPacket> R) { | ||||
|         if (R.is_error()) { | ||||
|           VLOG(ADNL_WARNING) << id << ": dropping IN message: can not decrypt: " << R.move_as_error(); | ||||
|         } else { | ||||
|           auto packet = R.move_as_ok(); | ||||
|           packet.set_remote_addr(addr); | ||||
|           td::actor::send_closure(peer, &AdnlPeerPair::receive_packet_from_channel, channel_id, std::move(packet)); | ||||
|         } | ||||
|       }); | ||||
|   auto P = td::PromiseCreator::lambda([peer = peer_pair_, channel_id = channel_in_id_, addr, id = print_id(), | ||||
|                                        size = data.size()](td::Result<AdnlPacket> R) { | ||||
|     if (R.is_error()) { | ||||
|       VLOG(ADNL_WARNING) << id << ": dropping IN message: can not decrypt: " << R.move_as_error(); | ||||
|     } else { | ||||
|       auto packet = R.move_as_ok(); | ||||
|       packet.set_remote_addr(addr); | ||||
|       td::actor::send_closure(peer, &AdnlPeerPair::receive_packet_from_channel, channel_id, std::move(packet), size); | ||||
|     } | ||||
|   }); | ||||
| 
 | ||||
|   decrypt(std::move(data), std::move(P)); | ||||
| } | ||||
|  |  | |||
|  | @ -41,20 +41,34 @@ AdnlAddressList AdnlLocalId::get_addr_list() const { | |||
| } | ||||
| 
 | ||||
| void AdnlLocalId::receive(td::IPAddress addr, td::BufferSlice data) { | ||||
|   auto P = td::PromiseCreator::lambda( | ||||
|       [peer_table = peer_table_, dst = short_id_, addr, id = print_id()](td::Result<AdnlPacket> R) { | ||||
|         if (R.is_error()) { | ||||
|           VLOG(ADNL_WARNING) << id << ": dropping IN message: cannot decrypt: " << R.move_as_error(); | ||||
|         } else { | ||||
|           auto packet = R.move_as_ok(); | ||||
|           packet.set_remote_addr(addr); | ||||
|           td::actor::send_closure(peer_table, &AdnlPeerTable::receive_decrypted_packet, dst, std::move(packet)); | ||||
|         } | ||||
|       }); | ||||
| 
 | ||||
|   InboundRateLimiter& rate_limiter = inbound_rate_limiter_[addr]; | ||||
|   if (!rate_limiter.rate_limiter.take()) { | ||||
|     VLOG(ADNL_NOTICE) << this << ": dropping IN message: rate limit exceeded"; | ||||
|     add_dropped_packet_stats(addr); | ||||
|     return; | ||||
|   } | ||||
|   ++rate_limiter.currently_decrypting_packets; | ||||
|   auto P = td::PromiseCreator::lambda([SelfId = actor_id(this), peer_table = peer_table_, dst = short_id_, addr, | ||||
|                                        id = print_id(), size = data.size()](td::Result<AdnlPacket> R) { | ||||
|     td::actor::send_closure(SelfId, &AdnlLocalId::decrypt_packet_done, addr); | ||||
|     if (R.is_error()) { | ||||
|       VLOG(ADNL_WARNING) << id << ": dropping IN message: cannot decrypt: " << R.move_as_error(); | ||||
|     } else { | ||||
|       auto packet = R.move_as_ok(); | ||||
|       packet.set_remote_addr(addr); | ||||
|       td::actor::send_closure(peer_table, &AdnlPeerTable::receive_decrypted_packet, dst, std::move(packet), size); | ||||
|     } | ||||
|   }); | ||||
|   decrypt(std::move(data), std::move(P)); | ||||
| } | ||||
| 
 | ||||
| void AdnlLocalId::decrypt_packet_done(td::IPAddress addr) { | ||||
|   auto it = inbound_rate_limiter_.find(addr); | ||||
|   CHECK(it != inbound_rate_limiter_.end()); | ||||
|   --it->second.currently_decrypting_packets; | ||||
|   add_decrypted_packet_stats(addr); | ||||
| } | ||||
| 
 | ||||
| void AdnlLocalId::deliver(AdnlNodeIdShort src, td::BufferSlice data) { | ||||
|   auto s = std::move(data); | ||||
|   for (auto &cb : cb_) { | ||||
|  | @ -292,6 +306,67 @@ void AdnlLocalId::update_packet(AdnlPacket packet, bool update_id, bool sign, td | |||
|   } | ||||
| } | ||||
| 
 | ||||
| void AdnlLocalId::get_stats(td::Promise<tl_object_ptr<ton_api::adnl_stats_localId>> promise) { | ||||
|   auto stats = create_tl_object<ton_api::adnl_stats_localId>(); | ||||
|   stats->short_id_ = short_id_.bits256_value(); | ||||
|   for (auto &[ip, x] : inbound_rate_limiter_) { | ||||
|     if (x.currently_decrypting_packets != 0) { | ||||
|       stats->current_decrypt_.push_back(create_tl_object<ton_api::adnl_stats_ipPackets>( | ||||
|           ip.is_valid() ? PSTRING() << ip.get_ip_str() << ":" << ip.get_port() : "", x.currently_decrypting_packets)); | ||||
|     } | ||||
|   } | ||||
|   prepare_packet_stats(); | ||||
|   stats->packets_recent_ = packet_stats_prev_.tl(); | ||||
|   stats->packets_total_ = packet_stats_total_.tl(); | ||||
|   stats->packets_total_->ts_start_ = (double)Adnl::adnl_start_time(); | ||||
|   stats->packets_total_->ts_end_ = td::Clocks::system(); | ||||
|   promise.set_result(std::move(stats)); | ||||
| } | ||||
| 
 | ||||
| void AdnlLocalId::add_decrypted_packet_stats(td::IPAddress addr) { | ||||
|   prepare_packet_stats(); | ||||
|   ++packet_stats_cur_.decrypted_packets[addr]; | ||||
|   ++packet_stats_total_.decrypted_packets[addr]; | ||||
| } | ||||
| 
 | ||||
| void AdnlLocalId::add_dropped_packet_stats(td::IPAddress addr) { | ||||
|   prepare_packet_stats(); | ||||
|   ++packet_stats_cur_.dropped_packets[addr]; | ||||
|   ++packet_stats_total_.dropped_packets[addr]; | ||||
| } | ||||
| 
 | ||||
| void AdnlLocalId::prepare_packet_stats() { | ||||
|   double now = td::Clocks::system(); | ||||
|   if (now >= packet_stats_cur_.ts_end) { | ||||
|     packet_stats_prev_ = std::move(packet_stats_cur_); | ||||
|     packet_stats_cur_ = {}; | ||||
|     auto now_int = (int)td::Clocks::system(); | ||||
|     packet_stats_cur_.ts_start = (double)(now_int / 60 * 60); | ||||
|     packet_stats_cur_.ts_end = packet_stats_cur_.ts_start + 60.0; | ||||
|     if (packet_stats_prev_.ts_end < now - 60.0) { | ||||
|       packet_stats_prev_ = {}; | ||||
|       packet_stats_prev_.ts_end = packet_stats_cur_.ts_start; | ||||
|       packet_stats_prev_.ts_start = packet_stats_prev_.ts_end - 60.0; | ||||
|     } | ||||
|   } | ||||
| } | ||||
| 
 | ||||
| tl_object_ptr<ton_api::adnl_stats_localIdPackets> AdnlLocalId::PacketStats::tl() const { | ||||
|   auto obj = create_tl_object<ton_api::adnl_stats_localIdPackets>(); | ||||
|   obj->ts_start_ = ts_start; | ||||
|   obj->ts_end_ = ts_end; | ||||
|   for (const auto &[ip, packets] : decrypted_packets) { | ||||
|     obj->decrypted_packets_.push_back(create_tl_object<ton_api::adnl_stats_ipPackets>( | ||||
|         ip.is_valid() ? PSTRING() << ip.get_ip_str() << ":" << ip.get_port() : "", packets)); | ||||
|   } | ||||
|   for (const auto &[ip, packets] : dropped_packets) { | ||||
|     obj->dropped_packets_.push_back(create_tl_object<ton_api::adnl_stats_ipPackets>( | ||||
|         ip.is_valid() ? PSTRING() << ip.get_ip_str() << ":" << ip.get_port() : "", packets)); | ||||
|   } | ||||
|   return obj; | ||||
| } | ||||
| 
 | ||||
| 
 | ||||
| }  // namespace adnl
 | ||||
| 
 | ||||
| }  // namespace ton
 | ||||
|  |  | |||
|  | @ -55,6 +55,7 @@ class AdnlLocalId : public td::actor::Actor { | |||
|   void deliver(AdnlNodeIdShort src, td::BufferSlice data); | ||||
|   void deliver_query(AdnlNodeIdShort src, td::BufferSlice data, td::Promise<td::BufferSlice> promise); | ||||
|   void receive(td::IPAddress addr, td::BufferSlice data); | ||||
|   void decrypt_packet_done(td::IPAddress addr); | ||||
| 
 | ||||
|   void subscribe(std::string prefix, std::unique_ptr<AdnlPeerTable::Callback> callback); | ||||
|   void unsubscribe(std::string prefix); | ||||
|  | @ -77,6 +78,8 @@ class AdnlLocalId : public td::actor::Actor { | |||
|   void update_packet(AdnlPacket packet, bool update_id, bool sign, td::int32 update_addr_list_if, | ||||
|                      td::int32 update_priority_addr_list_if, td::Promise<AdnlPacket> promise); | ||||
| 
 | ||||
|   void get_stats(td::Promise<tl_object_ptr<ton_api::adnl_stats_localId>> promise); | ||||
| 
 | ||||
|   td::uint32 get_mode() { | ||||
|     return mode_; | ||||
|   } | ||||
|  | @ -101,6 +104,22 @@ class AdnlLocalId : public td::actor::Actor { | |||
| 
 | ||||
|   td::uint32 mode_; | ||||
| 
 | ||||
|   struct InboundRateLimiter { | ||||
|     RateLimiter rate_limiter = RateLimiter(75, 0.33); | ||||
|     td::uint64 currently_decrypting_packets = 0; | ||||
|   }; | ||||
|   std::map<td::IPAddress, InboundRateLimiter> inbound_rate_limiter_; | ||||
|   struct PacketStats { | ||||
|     double ts_start = 0.0, ts_end = 0.0; | ||||
|     std::map<td::IPAddress, td::uint64> decrypted_packets; | ||||
|     std::map<td::IPAddress, td::uint64> dropped_packets; | ||||
| 
 | ||||
|     tl_object_ptr<ton_api::adnl_stats_localIdPackets> tl() const; | ||||
|   } packet_stats_cur_, packet_stats_prev_, packet_stats_total_; | ||||
|   void add_decrypted_packet_stats(td::IPAddress addr); | ||||
|   void add_dropped_packet_stats(td::IPAddress addr); | ||||
|   void prepare_packet_stats(); | ||||
| 
 | ||||
|   void publish_address_list(); | ||||
| }; | ||||
| 
 | ||||
|  |  | |||
|  | @ -84,7 +84,7 @@ void AdnlPeerTableImpl::receive_packet(td::IPAddress addr, AdnlCategoryMask cat_ | |||
|                    << " (len=" << (data.size() + 32) << ")"; | ||||
| } | ||||
| 
 | ||||
| void AdnlPeerTableImpl::receive_decrypted_packet(AdnlNodeIdShort dst, AdnlPacket packet) { | ||||
| void AdnlPeerTableImpl::receive_decrypted_packet(AdnlNodeIdShort dst, AdnlPacket packet, td::uint64 serialized_size) { | ||||
|   packet.run_basic_checks().ensure(); | ||||
| 
 | ||||
|   if (!packet.inited_from_short()) { | ||||
|  | @ -119,7 +119,7 @@ void AdnlPeerTableImpl::receive_decrypted_packet(AdnlNodeIdShort dst, AdnlPacket | |||
|     return; | ||||
|   } | ||||
|   td::actor::send_closure(it->second, &AdnlPeer::receive_packet, dst, it2->second.mode, it2->second.local_id.get(), | ||||
|                           std::move(packet)); | ||||
|                           std::move(packet), serialized_size); | ||||
| } | ||||
| 
 | ||||
| void AdnlPeerTableImpl::add_peer(AdnlNodeIdShort local_id, AdnlNodeIdFull id, AdnlAddressList addr_list) { | ||||
|  | @ -385,6 +385,88 @@ void AdnlPeerTableImpl::get_conn_ip_str(AdnlNodeIdShort l_id, AdnlNodeIdShort p_ | |||
|   td::actor::send_closure(it->second, &AdnlPeer::get_conn_ip_str, l_id, std::move(promise)); | ||||
| } | ||||
| 
 | ||||
| void AdnlPeerTableImpl::get_stats(td::Promise<tl_object_ptr<ton_api::adnl_stats>> promise) { | ||||
|   class Cb : public td::actor::Actor { | ||||
|    public: | ||||
|     explicit Cb(td::Promise<tl_object_ptr<ton_api::adnl_stats>> promise) : promise_(std::move(promise)) { | ||||
|     } | ||||
| 
 | ||||
|     void got_local_id_stats(tl_object_ptr<ton_api::adnl_stats_localId> local_id) { | ||||
|       auto &local_id_stats = local_id_stats_[local_id->short_id_]; | ||||
|       if (local_id_stats) { | ||||
|         local_id->peers_ = std::move(local_id_stats->peers_); | ||||
|       } | ||||
|       local_id_stats = std::move(local_id); | ||||
|       dec_pending(); | ||||
|     } | ||||
| 
 | ||||
|     void got_peer_stats(std::vector<tl_object_ptr<ton_api::adnl_stats_peerPair>> peer_pairs) { | ||||
|       for (auto &peer_pair : peer_pairs) { | ||||
|         auto &local_id_stats = local_id_stats_[peer_pair->local_id_]; | ||||
|         if (local_id_stats == nullptr) { | ||||
|           local_id_stats = create_tl_object<ton_api::adnl_stats_localId>(); | ||||
|           local_id_stats->short_id_ = peer_pair->local_id_; | ||||
|         } | ||||
|         local_id_stats->peers_.push_back(std::move(peer_pair)); | ||||
|       } | ||||
|       dec_pending(); | ||||
|     } | ||||
| 
 | ||||
|     void inc_pending() { | ||||
|       ++pending_; | ||||
|     } | ||||
| 
 | ||||
|     void dec_pending() { | ||||
|       CHECK(pending_ > 0); | ||||
|       --pending_; | ||||
|       if (pending_ == 0) { | ||||
|         auto stats = create_tl_object<ton_api::adnl_stats>(); | ||||
|         stats->timestamp_ = td::Clocks::system(); | ||||
|         for (auto &[id, local_id_stats] : local_id_stats_) { | ||||
|           stats->local_ids_.push_back(std::move(local_id_stats)); | ||||
|         } | ||||
|         promise_.set_result(std::move(stats)); | ||||
|         stop(); | ||||
|       } | ||||
|     } | ||||
| 
 | ||||
|    private: | ||||
|     td::Promise<tl_object_ptr<ton_api::adnl_stats>> promise_; | ||||
|     size_t pending_ = 1; | ||||
| 
 | ||||
|     std::map<td::Bits256, tl_object_ptr<ton_api::adnl_stats_localId>> local_id_stats_; | ||||
|   }; | ||||
|   auto callback = td::actor::create_actor<Cb>("adnlstats", std::move(promise)).release(); | ||||
| 
 | ||||
|   for (auto &[id, local_id] : local_ids_) { | ||||
|     td::actor::send_closure(callback, &Cb::inc_pending); | ||||
|     td::actor::send_closure(local_id.local_id, &AdnlLocalId::get_stats, | ||||
|                             [id = id, callback](td::Result<tl_object_ptr<ton_api::adnl_stats_localId>> R) { | ||||
|                               if (R.is_error()) { | ||||
|                                 VLOG(ADNL_NOTICE) | ||||
|                                     << "failed to get stats for local id " << id << " : " << R.move_as_error(); | ||||
|                                 td::actor::send_closure(callback, &Cb::dec_pending); | ||||
|                               } else { | ||||
|                                 td::actor::send_closure(callback, &Cb::got_local_id_stats, R.move_as_ok()); | ||||
|                               } | ||||
|                             }); | ||||
|   } | ||||
|   for (auto &[id, peer] : peers_) { | ||||
|     td::actor::send_closure(callback, &Cb::inc_pending); | ||||
|     td::actor::send_closure( | ||||
|         peer, &AdnlPeer::get_stats, | ||||
|         [id = id, callback](td::Result<std::vector<tl_object_ptr<ton_api::adnl_stats_peerPair>>> R) { | ||||
|           if (R.is_error()) { | ||||
|             VLOG(ADNL_NOTICE) << "failed to get stats for peer " << id << " : " << R.move_as_error(); | ||||
|             td::actor::send_closure(callback, &Cb::dec_pending); | ||||
|           } else { | ||||
|             td::actor::send_closure(callback, &Cb::got_peer_stats, R.move_as_ok()); | ||||
|           } | ||||
|         }); | ||||
|   } | ||||
|   td::actor::send_closure(callback, &Cb::dec_pending); | ||||
| } | ||||
| 
 | ||||
| }  // namespace adnl
 | ||||
| 
 | ||||
| }  // namespace ton
 | ||||
|  |  | |||
|  | @ -90,7 +90,7 @@ class AdnlPeerTable : public Adnl { | |||
|   virtual void answer_query(AdnlNodeIdShort src, AdnlNodeIdShort dst, AdnlQueryId query_id, td::BufferSlice data) = 0; | ||||
| 
 | ||||
|   virtual void receive_packet(td::IPAddress addr, AdnlCategoryMask cat_mask, td::BufferSlice data) = 0; | ||||
|   virtual void receive_decrypted_packet(AdnlNodeIdShort dst, AdnlPacket packet) = 0; | ||||
|   virtual void receive_decrypted_packet(AdnlNodeIdShort dst, AdnlPacket packet, td::uint64 serialized_size) = 0; | ||||
|   virtual void send_message_in(AdnlNodeIdShort src, AdnlNodeIdShort dst, AdnlMessage message, td::uint32 flags) = 0; | ||||
| 
 | ||||
|   virtual void register_channel(AdnlChannelIdShort id, AdnlNodeIdShort local_id, | ||||
|  |  | |||
|  | @ -44,7 +44,7 @@ class AdnlPeerTableImpl : public AdnlPeerTable { | |||
|   void add_static_nodes_from_config(AdnlNodesList nodes) override; | ||||
| 
 | ||||
|   void receive_packet(td::IPAddress addr, AdnlCategoryMask cat_mask, td::BufferSlice data) override; | ||||
|   void receive_decrypted_packet(AdnlNodeIdShort dst, AdnlPacket data) override; | ||||
|   void receive_decrypted_packet(AdnlNodeIdShort dst, AdnlPacket data, td::uint64 serialized_size) override; | ||||
|   void send_message_in(AdnlNodeIdShort src, AdnlNodeIdShort dst, AdnlMessage message, td::uint32 flags) override; | ||||
|   void send_message(AdnlNodeIdShort src, AdnlNodeIdShort dst, td::BufferSlice data) override { | ||||
|     send_message_ex(src, dst, std::move(data), 0); | ||||
|  | @ -108,6 +108,8 @@ class AdnlPeerTableImpl : public AdnlPeerTable { | |||
|                      td::Promise<std::pair<td::actor::ActorOwn<AdnlTunnel>, AdnlAddress>> promise) override; | ||||
|   void get_conn_ip_str(AdnlNodeIdShort l_id, AdnlNodeIdShort p_id, td::Promise<td::string> promise) override; | ||||
| 
 | ||||
|   void get_stats(td::Promise<tl_object_ptr<ton_api::adnl_stats>> promise) override; | ||||
| 
 | ||||
|   struct PrintId {}; | ||||
|   PrintId print_id() const { | ||||
|     return PrintId{}; | ||||
|  |  | |||
|  | @ -26,6 +26,7 @@ | |||
| #include "td/utils/base64.h" | ||||
| #include "td/utils/Random.h" | ||||
| #include "auto/tl/ton_api.h" | ||||
| #include "td/utils/overloaded.h" | ||||
| 
 | ||||
| namespace ton { | ||||
| 
 | ||||
|  | @ -50,9 +51,13 @@ void AdnlPeerPairImpl::start_up() { | |||
| } | ||||
| 
 | ||||
| void AdnlPeerPairImpl::alarm() { | ||||
|   if (next_dht_query_at_ && next_dht_query_at_.is_in_past()) { | ||||
|     next_dht_query_at_ = td::Timestamp::never(); | ||||
|     discover(); | ||||
|   if (!disable_dht_query_) { | ||||
|     disable_dht_query_ = true; | ||||
|     if (next_dht_query_at_ && next_dht_query_at_.is_in_past()) { | ||||
|       next_dht_query_at_ = td::Timestamp::never(); | ||||
|       discover(); | ||||
|     } | ||||
|     alarm_timestamp().relax(next_dht_query_at_); | ||||
|   } | ||||
|   if (next_db_update_at_ && next_db_update_at_.is_in_past()) { | ||||
|     if (received_from_db_ && received_from_static_nodes_ && !peer_id_.empty()) { | ||||
|  | @ -68,11 +73,8 @@ void AdnlPeerPairImpl::alarm() { | |||
|   } | ||||
|   if (retry_send_at_ && retry_send_at_.is_in_past()) { | ||||
|     retry_send_at_ = td::Timestamp::never(); | ||||
|     auto messages = std::move(pending_messages_); | ||||
|     pending_messages_.clear(); | ||||
|     send_messages_in(std::move(messages), false); | ||||
|     send_messages_from_queue(); | ||||
|   } | ||||
|   alarm_timestamp().relax(next_dht_query_at_); | ||||
|   alarm_timestamp().relax(next_db_update_at_); | ||||
|   alarm_timestamp().relax(retry_send_at_); | ||||
| } | ||||
|  | @ -207,18 +209,24 @@ void AdnlPeerPairImpl::receive_packet_checked(AdnlPacket packet) { | |||
|   } | ||||
| } | ||||
| 
 | ||||
| void AdnlPeerPairImpl::receive_packet_from_channel(AdnlChannelIdShort id, AdnlPacket packet) { | ||||
| void AdnlPeerPairImpl::receive_packet_from_channel(AdnlChannelIdShort id, AdnlPacket packet, | ||||
|                                                    td::uint64 serialized_size) { | ||||
|   add_packet_stats(serialized_size, /* in = */ true, /* channel = */ true); | ||||
|   if (id != channel_in_id_) { | ||||
|     VLOG(ADNL_NOTICE) << this << ": dropping IN message: outdated channel id" << id; | ||||
|     return; | ||||
|   } | ||||
|   if (channel_inited_) { | ||||
|   if (channel_inited_ && !channel_ready_) { | ||||
|     channel_ready_ = true; | ||||
|     if (!out_messages_queue_.empty()) { | ||||
|       td::actor::send_closure(actor_id(this), &AdnlPeerPairImpl::send_messages_from_queue); | ||||
|     } | ||||
|   } | ||||
|   receive_packet_checked(std::move(packet)); | ||||
| } | ||||
| 
 | ||||
| void AdnlPeerPairImpl::receive_packet(AdnlPacket packet) { | ||||
| void AdnlPeerPairImpl::receive_packet(AdnlPacket packet, td::uint64 serialized_size) { | ||||
|   add_packet_stats(serialized_size, /* in = */ true, /* channel = */ false); | ||||
|   packet.run_basic_checks().ensure(); | ||||
| 
 | ||||
|   if (!encryptor_) { | ||||
|  | @ -239,132 +247,132 @@ void AdnlPeerPairImpl::deliver_message(AdnlMessage message) { | |||
|   message.visit([&](const auto &obj) { this->process_message(obj); }); | ||||
| } | ||||
| 
 | ||||
| void AdnlPeerPairImpl::send_messages_in(std::vector<OutboundAdnlMessage> messages, bool allow_postpone) { | ||||
|   for (td::int32 idx = 0; idx < 2; idx++) { | ||||
|     std::vector<OutboundAdnlMessage> not_sent; | ||||
| void AdnlPeerPairImpl::send_messages_from_queue() { | ||||
|   while (!out_messages_queue_.empty() && out_messages_queue_.front().second.is_in_past()) { | ||||
|     out_messages_queue_total_size_ -= out_messages_queue_.front().first.size(); | ||||
|     add_expired_msg_stats(out_messages_queue_.front().first.size()); | ||||
|     out_messages_queue_.pop(); | ||||
|     VLOG(ADNL_NOTICE) << this << ": dropping OUT message: message in queue expired"; | ||||
|   } | ||||
|   if (out_messages_queue_.empty()) { | ||||
|     return; | ||||
|   } | ||||
| 
 | ||||
|     auto connR = get_conn(idx == 1); | ||||
|     if (connR.is_error()) { | ||||
|       if (!allow_postpone) { | ||||
|         VLOG(ADNL_NOTICE) << this << ": dropping OUT messages: cannot get conn: " << connR.move_as_error(); | ||||
|         return; | ||||
|       } | ||||
|       VLOG(ADNL_INFO) << this << ": delaying OUT messages: cannot get conn: " << connR.move_as_error(); | ||||
|       if (!retry_send_at_) { | ||||
|         retry_send_at_.relax(td::Timestamp::in(10.0)); | ||||
|         alarm_timestamp().relax(retry_send_at_); | ||||
|       } | ||||
|       for (auto &m : messages) { | ||||
|         pending_messages_.push_back(std::move(m)); | ||||
|       } | ||||
|   auto connR = get_conn(); | ||||
|   if (connR.is_error()) { | ||||
|     disable_dht_query_ = false; | ||||
|     retry_send_at_.relax(td::Timestamp::in(message_in_queue_ttl_ - 1.0)); | ||||
|     alarm_timestamp().relax(retry_send_at_); | ||||
|     VLOG(ADNL_INFO) << this << ": delaying OUT messages: cannot get conn: " << connR.move_as_error(); | ||||
|     return; | ||||
|   } | ||||
|   disable_dht_query_ = true; | ||||
|   auto C = connR.move_as_ok(); | ||||
|   auto conn = std::move(C.first); | ||||
|   bool is_direct = C.second; | ||||
| 
 | ||||
|   bool first = !skip_init_packet_; | ||||
|   while (!out_messages_queue_.empty()) { | ||||
|     bool try_reinit = try_reinit_at_ && try_reinit_at_.is_in_past(); | ||||
|     bool via_channel = channel_ready_ && !try_reinit; | ||||
|     if (!via_channel && !nochannel_rate_limiter_.take()) { | ||||
|       alarm_timestamp().relax(retry_send_at_ = nochannel_rate_limiter_.ready_at()); | ||||
|       return; | ||||
|     } | ||||
|     auto C = connR.move_as_ok(); | ||||
|     bool is_direct = C.second; | ||||
|     auto conn = std::move(C.first); | ||||
|     if (idx == 1) { | ||||
|       CHECK(is_direct); | ||||
|     if (try_reinit) { | ||||
|       try_reinit_at_ = td::Timestamp::in(td::Random::fast(0.5, 1.5)); | ||||
|     } | ||||
|     respond_with_nop_after_ = td::Timestamp::in(td::Random::fast(1.0, 2.0)); | ||||
| 
 | ||||
|     size_t s = (via_channel ? channel_packet_header_max_size() : packet_header_max_size()); | ||||
|     if (first) { | ||||
|       s += 2 * addr_list_max_size(); | ||||
|     } | ||||
| 
 | ||||
|     size_t ptr = 0; | ||||
|     bool first = true; | ||||
|     do { | ||||
|       respond_with_nop_after_ = td::Timestamp::in(td::Random::fast(1.0, 2.0)); | ||||
|       bool try_reinit = try_reinit_at_ && try_reinit_at_.is_in_past(); | ||||
|       if (try_reinit) { | ||||
|         try_reinit_at_ = td::Timestamp::in(td::Random::fast(0.5, 1.5)); | ||||
|       } | ||||
|       bool via_channel = channel_ready_ && !try_reinit; | ||||
|       size_t s = (via_channel ? channel_packet_header_max_size() : packet_header_max_size()); | ||||
|       if (first) { | ||||
|         s += 2 * addr_list_max_size(); | ||||
|       } | ||||
|     AdnlPacket packet; | ||||
|     packet.set_seqno(++out_seqno_); | ||||
|     packet.set_confirm_seqno(in_seqno_); | ||||
| 
 | ||||
|       AdnlPacket packet; | ||||
|       packet.set_seqno(++out_seqno_); | ||||
|       packet.set_confirm_seqno(in_seqno_); | ||||
| 
 | ||||
|       if (first) { | ||||
|         if (!channel_inited_) { | ||||
|           auto M = adnlmessage::AdnlMessageCreateChannel{channel_pub_, channel_pk_date_}; | ||||
|           s += M.size(); | ||||
|           packet.add_message(std::move(M)); | ||||
|         } else if (!channel_ready_) { | ||||
|           auto M = adnlmessage::AdnlMessageConfirmChannel{channel_pub_, peer_channel_pub_, channel_pk_date_}; | ||||
|           s += M.size(); | ||||
|           packet.add_message(std::move(M)); | ||||
|         } | ||||
|     if (first) { | ||||
|       if (!channel_inited_) { | ||||
|         auto M = adnlmessage::AdnlMessageCreateChannel{channel_pub_, channel_pk_date_}; | ||||
|         s += M.size(); | ||||
|         packet.add_message(std::move(M)); | ||||
|       } else if (!channel_ready_) { | ||||
|         auto M = adnlmessage::AdnlMessageConfirmChannel{channel_pub_, peer_channel_pub_, channel_pk_date_}; | ||||
|         s += M.size(); | ||||
|         packet.add_message(std::move(M)); | ||||
|       } | ||||
|     } | ||||
| 
 | ||||
|       if (!addr_list_.empty()) { | ||||
|         packet.set_received_addr_list_version(addr_list_.version()); | ||||
|       } | ||||
|       if (!priority_addr_list_.empty()) { | ||||
|         packet.set_received_priority_addr_list_version(priority_addr_list_.version()); | ||||
|       } | ||||
|     if (!addr_list_.empty()) { | ||||
|       packet.set_received_addr_list_version(addr_list_.version()); | ||||
|     } | ||||
|     if (!priority_addr_list_.empty()) { | ||||
|       packet.set_received_priority_addr_list_version(priority_addr_list_.version()); | ||||
|     } | ||||
| 
 | ||||
|       while (ptr < messages.size()) { | ||||
|         auto &M = messages[ptr]; | ||||
|         if (!is_direct && (M.flags() & Adnl::SendFlags::direct_only)) { | ||||
|           not_sent.push_back(std::move(M)); | ||||
|           continue; | ||||
|         } | ||||
|         CHECK(M.size() <= get_mtu()); | ||||
|     skip_init_packet_ = true; | ||||
|     while (!out_messages_queue_.empty()) { | ||||
|       auto &M = out_messages_queue_.front().first; | ||||
|       if (!is_direct && (M.flags() & Adnl::SendFlags::direct_only)) { | ||||
|         out_messages_queue_total_size_ -= M.size(); | ||||
|         out_messages_queue_.pop(); | ||||
|         continue; | ||||
|       } | ||||
|       CHECK(M.size() <= get_mtu()); | ||||
|       if (s + M.size() <= AdnlNetworkManager::get_mtu()) { | ||||
|         s += M.size(); | ||||
|         out_messages_queue_total_size_ -= M.size(); | ||||
|         packet.add_message(M.release()); | ||||
|         out_messages_queue_.pop(); | ||||
|         skip_init_packet_ = false; | ||||
|       } else { | ||||
|         break; | ||||
|       } | ||||
|     } | ||||
| 
 | ||||
|     if (!via_channel) { | ||||
|       packet.set_reinit_date(Adnl::adnl_start_time(), reinit_date_); | ||||
|       packet.set_source(local_id_); | ||||
|     } | ||||
| 
 | ||||
|     if (!first) { | ||||
|       if (!channel_inited_) { | ||||
|         auto M = adnlmessage::AdnlMessageCreateChannel{channel_pub_, channel_pk_date_}; | ||||
|         if (s + M.size() <= AdnlNetworkManager::get_mtu()) { | ||||
|           s += M.size(); | ||||
|           packet.add_message(M.release()); | ||||
|           ptr++; | ||||
|         } else { | ||||
|           break; | ||||
|           packet.add_message(std::move(M)); | ||||
|         } | ||||
|       } else if (!channel_ready_) { | ||||
|         auto M = adnlmessage::AdnlMessageConfirmChannel{channel_pub_, peer_channel_pub_, channel_pk_date_}; | ||||
|         if (s + M.size() <= AdnlNetworkManager::get_mtu()) { | ||||
|           s += M.size(); | ||||
|           packet.add_message(std::move(M)); | ||||
|         } | ||||
|       } | ||||
| 
 | ||||
|       if (!via_channel) { | ||||
|         packet.set_reinit_date(Adnl::adnl_start_time(), reinit_date_); | ||||
|         packet.set_source(local_id_); | ||||
|       } | ||||
| 
 | ||||
|       if (!first) { | ||||
|         if (!channel_inited_) { | ||||
|           auto M = adnlmessage::AdnlMessageCreateChannel{channel_pub_, channel_pk_date_}; | ||||
|           if (s + M.size() <= AdnlNetworkManager::get_mtu()) { | ||||
|             s += M.size(); | ||||
|             packet.add_message(std::move(M)); | ||||
|           } | ||||
|         } else if (!channel_ready_) { | ||||
|           auto M = adnlmessage::AdnlMessageConfirmChannel{channel_pub_, peer_channel_pub_, channel_pk_date_}; | ||||
|           if (s + M.size() <= AdnlNetworkManager::get_mtu()) { | ||||
|             s += M.size(); | ||||
|             packet.add_message(std::move(M)); | ||||
|           } | ||||
|         } | ||||
|       } | ||||
| 
 | ||||
|       packet.run_basic_checks().ensure(); | ||||
|       auto P = td::PromiseCreator::lambda([SelfId = actor_id(this), conn, id = print_id(), | ||||
|                                            via_channel](td::Result<AdnlPacket> res) { | ||||
|         if (res.is_error()) { | ||||
|           LOG(ERROR) << id << ": dropping OUT message: error while creating packet: " << res.move_as_error(); | ||||
|         } else { | ||||
|           td::actor::send_closure(SelfId, &AdnlPeerPairImpl::send_packet_continue, res.move_as_ok(), conn, via_channel); | ||||
|         } | ||||
|       }); | ||||
| 
 | ||||
|       td::actor::send_closure(local_actor_, &AdnlLocalId::update_packet, std::move(packet), | ||||
|                               (!channel_ready_ && ack_seqno_ == 0 && in_seqno_ == 0) || try_reinit, !via_channel, | ||||
|                               (first || s + addr_list_max_size() <= AdnlNetworkManager::get_mtu()) | ||||
|                                   ? (try_reinit ? 0 : peer_recv_addr_list_version_) | ||||
|                                   : 0x7fffffff, | ||||
|                               (first || s + 2 * addr_list_max_size() <= AdnlNetworkManager::get_mtu()) | ||||
|                                   ? peer_recv_priority_addr_list_version_ | ||||
|                                   : 0x7fffffff, | ||||
|                               std::move(P)); | ||||
|       first = false; | ||||
|     } while (ptr < messages.size()); | ||||
|     messages = std::move(not_sent); | ||||
|     if (!messages.size()) { | ||||
|       break; | ||||
|     } | ||||
| 
 | ||||
|     packet.run_basic_checks().ensure(); | ||||
|     auto P = td::PromiseCreator::lambda([SelfId = actor_id(this), conn, id = print_id(), | ||||
|                                          via_channel](td::Result<AdnlPacket> res) { | ||||
|       if (res.is_error()) { | ||||
|         LOG(ERROR) << id << ": dropping OUT message: error while creating packet: " << res.move_as_error(); | ||||
|       } else { | ||||
|         td::actor::send_closure(SelfId, &AdnlPeerPairImpl::send_packet_continue, res.move_as_ok(), conn, via_channel); | ||||
|       } | ||||
|     }); | ||||
| 
 | ||||
|     td::actor::send_closure(local_actor_, &AdnlLocalId::update_packet, std::move(packet), | ||||
|                             (!channel_ready_ && ack_seqno_ == 0 && in_seqno_ == 0) || try_reinit, !via_channel, | ||||
|                             (first || s + addr_list_max_size() <= AdnlNetworkManager::get_mtu()) | ||||
|                                 ? (try_reinit ? 0 : peer_recv_addr_list_version_) | ||||
|                                 : 0x7fffffff, | ||||
|                             (first || s + 2 * addr_list_max_size() <= AdnlNetworkManager::get_mtu()) | ||||
|                                 ? peer_recv_priority_addr_list_version_ | ||||
|                                 : 0x7fffffff, | ||||
|                             std::move(P)); | ||||
|     first = false; | ||||
|   } | ||||
| } | ||||
| 
 | ||||
|  | @ -395,7 +403,11 @@ void AdnlPeerPairImpl::send_messages(std::vector<OutboundAdnlMessage> messages) | |||
|       } | ||||
|     } | ||||
|   } | ||||
|   send_messages_in(std::move(new_vec), true); | ||||
|   for (auto &m : new_vec) { | ||||
|     out_messages_queue_total_size_ += m.size(); | ||||
|     out_messages_queue_.emplace(std::move(m), td::Timestamp::in(message_in_queue_ttl_)); | ||||
|   } | ||||
|   send_messages_from_queue(); | ||||
| } | ||||
| 
 | ||||
| void AdnlPeerPairImpl::send_packet_continue(AdnlPacket packet, td::actor::ActorId<AdnlNetworkConnection> conn, | ||||
|  | @ -407,6 +419,7 @@ void AdnlPeerPairImpl::send_packet_continue(AdnlPacket packet, td::actor::ActorI | |||
|   auto B = serialize_tl_object(packet.tl(), true); | ||||
|   if (via_channel) { | ||||
|     if (channel_ready_) { | ||||
|       add_packet_stats(B.size(), /* in = */ false, /* channel = */ true); | ||||
|       td::actor::send_closure(channel_, &AdnlChannel::send_message, priority_, conn, std::move(B)); | ||||
|     } else { | ||||
|       VLOG(ADNL_WARNING) << this << ": dropping OUT message [" << local_id_ << "->" << peer_id_short_ | ||||
|  | @ -434,6 +447,7 @@ void AdnlPeerPairImpl::send_packet_continue(AdnlPacket packet, td::actor::ActorI | |||
|   S.remove_prefix(32); | ||||
|   S.copy_from(X.as_slice()); | ||||
| 
 | ||||
|   add_packet_stats(B.size(), /* in = */ false, /* channel = */ false); | ||||
|   td::actor::send_closure(conn, &AdnlNetworkConnection::send, local_id_, peer_id_short_, priority_, std::move(enc)); | ||||
| } | ||||
| 
 | ||||
|  | @ -520,7 +534,10 @@ void AdnlPeerPairImpl::process_message(const adnlmessage::AdnlMessageConfirmChan | |||
|     VLOG(ADNL_NOTICE) << this << ": received adnl.message.confirmChannel with old key"; | ||||
|     return; | ||||
|   } | ||||
|   channel_ready_ = true; | ||||
|   if (!channel_ready_) { | ||||
|     channel_ready_ = true; | ||||
|     send_messages_from_queue(); | ||||
|   } | ||||
| } | ||||
| 
 | ||||
| void AdnlPeerPairImpl::process_message(const adnlmessage::AdnlMessageCustom &message) { | ||||
|  | @ -674,7 +691,7 @@ void AdnlPeerPairImpl::reinit(td::int32 date) { | |||
|   } | ||||
| } | ||||
| 
 | ||||
| td::Result<std::pair<td::actor::ActorId<AdnlNetworkConnection>, bool>> AdnlPeerPairImpl::get_conn(bool direct_only) { | ||||
| td::Result<std::pair<td::actor::ActorId<AdnlNetworkConnection>, bool>> AdnlPeerPairImpl::get_conn() { | ||||
|   if (!priority_addr_list_.empty() && priority_addr_list_.expire_at() < td::Clocks::system()) { | ||||
|     priority_addr_list_ = AdnlAddressList{}; | ||||
|     priority_conns_.clear(); | ||||
|  | @ -692,14 +709,18 @@ td::Result<std::pair<td::actor::ActorId<AdnlNetworkConnection>, bool>> AdnlPeerP | |||
|     } | ||||
|   } | ||||
| 
 | ||||
|   for (auto &conn : priority_conns_) { | ||||
|     if (conn.ready() && (!direct_only || conn.is_direct())) { | ||||
|       return std::make_pair(conn.conn.get(), conn.is_direct()); | ||||
|   for (int direct_only = 1; direct_only >= 0; --direct_only) { | ||||
|     for (auto &conn : priority_conns_) { | ||||
|       if (conn.ready() && (!direct_only || conn.is_direct())) { | ||||
|         return std::make_pair(conn.conn.get(), conn.is_direct()); | ||||
|       } | ||||
|     } | ||||
|   } | ||||
|   for (auto &conn : conns_) { | ||||
|     if (conn.ready() && (!direct_only || conn.is_direct())) { | ||||
|       return std::make_pair(conn.conn.get(), conn.is_direct()); | ||||
|   for (int direct_only = 1; direct_only >= 0; --direct_only) { | ||||
|     for (auto &conn : conns_) { | ||||
|       if (conn.ready() && (!direct_only || conn.is_direct())) { | ||||
|         return std::make_pair(conn.conn.get(), conn.is_direct()); | ||||
|       } | ||||
|     } | ||||
|   } | ||||
|   return td::Status::Error(ErrorCode::notready, "no active connections"); | ||||
|  | @ -787,6 +808,47 @@ void AdnlPeerPairImpl::get_conn_ip_str(td::Promise<td::string> promise) { | |||
|   promise.set_value("undefined"); | ||||
| } | ||||
| 
 | ||||
| void AdnlPeerPairImpl::get_stats(td::Promise<tl_object_ptr<ton_api::adnl_stats_peerPair>> promise) { | ||||
|   auto stats = create_tl_object<ton_api::adnl_stats_peerPair>(); | ||||
|   stats->local_id_ = local_id_.bits256_value(); | ||||
|   stats->peer_id_ = peer_id_short_.bits256_value(); | ||||
|   for (const AdnlAddress &addr : addr_list_.addrs()) { | ||||
|     ton_api::downcast_call(*addr->tl(), td::overloaded( | ||||
|                                             [&](const ton_api::adnl_address_udp &obj) { | ||||
|                                               stats->ip_str_ = PSTRING() << td::IPAddress::ipv4_to_str(obj.ip_) << ":" | ||||
|                                                                          << obj.port_; | ||||
|                                             }, | ||||
|                                             [&](const auto &) {})); | ||||
|     if (!stats->ip_str_.empty()) { | ||||
|       break; | ||||
|     } | ||||
|   } | ||||
| 
 | ||||
|   prepare_packet_stats(); | ||||
|   stats->last_in_packet_ts_ = last_in_packet_ts_; | ||||
|   stats->last_out_packet_ts_ = last_out_packet_ts_; | ||||
|   stats->packets_total_ = packet_stats_total_.tl(); | ||||
|   stats->packets_total_->ts_start_ = started_ts_; | ||||
|   stats->packets_total_->ts_end_ = td::Clocks::system(); | ||||
|   stats->packets_recent_ = packet_stats_prev_.tl(); | ||||
| 
 | ||||
|   if (channel_ready_) { | ||||
|     stats->channel_status_ = 2; | ||||
|   } else if (channel_inited_) { | ||||
|     stats->channel_status_ = 1; | ||||
|   } else { | ||||
|     stats->channel_status_ = 0; | ||||
|   } | ||||
|   stats->try_reinit_at_ = (try_reinit_at_ ? try_reinit_at_.at_unix() : 0.0); | ||||
|   stats->connection_ready_ = | ||||
|       std::any_of(conns_.begin(), conns_.end(), [](const Conn &conn) { return conn.ready(); }) || | ||||
|       std::any_of(priority_conns_.begin(), priority_conns_.end(), [](const Conn &conn) { return conn.ready(); }); | ||||
|   stats->out_queue_messages_ = out_messages_queue_.size(); | ||||
|   stats->out_queue_bytes_ = out_messages_queue_total_size_; | ||||
| 
 | ||||
|   promise.set_result(std::move(stats)); | ||||
| } | ||||
| 
 | ||||
| void AdnlPeerImpl::update_id(AdnlNodeIdFull id) { | ||||
|   CHECK(id.compute_short_id() == peer_id_short_); | ||||
|   if (!peer_id_.empty()) { | ||||
|  | @ -810,10 +872,8 @@ void AdnlPeerPairImpl::Conn::create_conn(td::actor::ActorId<AdnlPeerPairImpl> pe | |||
| 
 | ||||
| void AdnlPeerPairImpl::conn_change_state(AdnlConnectionIdShort id, bool ready) { | ||||
|   if (ready) { | ||||
|     if (pending_messages_.size() > 0) { | ||||
|       auto messages = std::move(pending_messages_); | ||||
|       pending_messages_.clear(); | ||||
|       send_messages_in(std::move(messages), true); | ||||
|     if (out_messages_queue_.empty()) { | ||||
|       send_messages_from_queue(); | ||||
|     } | ||||
|   } | ||||
| } | ||||
|  | @ -835,7 +895,7 @@ td::actor::ActorOwn<AdnlPeer> AdnlPeer::create(td::actor::ActorId<AdnlNetworkMan | |||
| } | ||||
| 
 | ||||
| void AdnlPeerImpl::receive_packet(AdnlNodeIdShort dst, td::uint32 dst_mode, td::actor::ActorId<AdnlLocalId> dst_actor, | ||||
|                                   AdnlPacket packet) { | ||||
|                                   AdnlPacket packet, td::uint64 serialized_size) { | ||||
|   if (packet.inited_from()) { | ||||
|     update_id(packet.from()); | ||||
|   } | ||||
|  | @ -853,7 +913,7 @@ void AdnlPeerImpl::receive_packet(AdnlNodeIdShort dst, td::uint32 dst_mode, td:: | |||
|     } | ||||
|   } | ||||
| 
 | ||||
|   td::actor::send_closure(it->second.get(), &AdnlPeerPair::receive_packet, std::move(packet)); | ||||
|   td::actor::send_closure(it->second.get(), &AdnlPeerPair::receive_packet, std::move(packet), serialized_size); | ||||
| } | ||||
| 
 | ||||
| void AdnlPeerImpl::send_messages(AdnlNodeIdShort src, td::uint32 src_mode, td::actor::ActorId<AdnlLocalId> src_actor, | ||||
|  | @ -933,6 +993,56 @@ void AdnlPeerImpl::update_addr_list(AdnlNodeIdShort local_id, td::uint32 local_m | |||
|   td::actor::send_closure(it->second, &AdnlPeerPair::update_addr_list, std::move(addr_list)); | ||||
| } | ||||
| 
 | ||||
| void AdnlPeerImpl::get_stats(td::Promise<std::vector<tl_object_ptr<ton_api::adnl_stats_peerPair>>> promise) { | ||||
|   class Cb : public td::actor::Actor { | ||||
|    public: | ||||
|     explicit Cb(td::Promise<std::vector<tl_object_ptr<ton_api::adnl_stats_peerPair>>> promise) | ||||
|         : promise_(std::move(promise)) { | ||||
|     } | ||||
| 
 | ||||
|     void got_peer_pair_stats(tl_object_ptr<ton_api::adnl_stats_peerPair> peer_pair) { | ||||
|       result_.push_back(std::move(peer_pair)); | ||||
|       dec_pending(); | ||||
|     } | ||||
| 
 | ||||
|     void inc_pending() { | ||||
|       ++pending_; | ||||
|     } | ||||
| 
 | ||||
|     void dec_pending() { | ||||
|       CHECK(pending_ > 0); | ||||
|       --pending_; | ||||
|       if (pending_ == 0) { | ||||
|         promise_.set_result(std::move(result_)); | ||||
|         stop(); | ||||
|       } | ||||
|     } | ||||
| 
 | ||||
|    private: | ||||
|     td::Promise<std::vector<tl_object_ptr<ton_api::adnl_stats_peerPair>>> promise_; | ||||
|     size_t pending_ = 1; | ||||
|     std::vector<tl_object_ptr<ton_api::adnl_stats_peerPair>> result_; | ||||
|   }; | ||||
|   auto callback = td::actor::create_actor<Cb>("adnlpeerstats", std::move(promise)).release(); | ||||
| 
 | ||||
|   for (auto &[local_id, peer_pair] : peer_pairs_) { | ||||
|     td::actor::send_closure(callback, &Cb::inc_pending); | ||||
|     td::actor::send_closure(peer_pair, &AdnlPeerPair::get_stats, | ||||
|                             [local_id = local_id, peer_id = peer_id_short_, | ||||
|                              callback](td::Result<tl_object_ptr<ton_api::adnl_stats_peerPair>> R) { | ||||
|                               if (R.is_error()) { | ||||
|                                 VLOG(ADNL_NOTICE) << "failed to get stats for peer pair " << peer_id << "->" << local_id | ||||
|                                                   << " : " << R.move_as_error(); | ||||
|                                 td::actor::send_closure(callback, &Cb::dec_pending); | ||||
|                               } else { | ||||
|                                 td::actor::send_closure(callback, &Cb::got_peer_pair_stats, R.move_as_ok()); | ||||
|                               } | ||||
|                             }); | ||||
|   } | ||||
|   td::actor::send_closure(callback, &Cb::dec_pending); | ||||
| } | ||||
| 
 | ||||
| 
 | ||||
| void AdnlPeerPairImpl::got_data_from_db(td::Result<AdnlDbItem> R) { | ||||
|   received_from_db_ = false; | ||||
|   if (R.is_error()) { | ||||
|  | @ -1016,6 +1126,66 @@ void AdnlPeerPairImpl::request_reverse_ping_result(td::Result<td::Unit> R) { | |||
|   } | ||||
| } | ||||
| 
 | ||||
| void AdnlPeerPairImpl::add_packet_stats(td::uint64 bytes, bool in, bool channel) { | ||||
|   prepare_packet_stats(); | ||||
|   auto add_stats = [&](PacketStats &stats) { | ||||
|     if (in) { | ||||
|       ++stats.in_packets; | ||||
|       stats.in_bytes += bytes; | ||||
|       if (channel) { | ||||
|         ++stats.in_packets_channel; | ||||
|         stats.in_bytes_channel += bytes; | ||||
|       } | ||||
|     } else { | ||||
|       ++stats.out_packets; | ||||
|       stats.out_bytes += bytes; | ||||
|       if (channel) { | ||||
|         ++stats.out_packets_channel; | ||||
|         stats.out_bytes_channel += bytes; | ||||
|       } | ||||
|     } | ||||
|   }; | ||||
|   add_stats(packet_stats_cur_); | ||||
|   add_stats(packet_stats_total_); | ||||
|   if (in) { | ||||
|     last_in_packet_ts_ = td::Clocks::system(); | ||||
|   } else { | ||||
|     last_out_packet_ts_ = td::Clocks::system(); | ||||
|   } | ||||
| } | ||||
| 
 | ||||
| void AdnlPeerPairImpl::add_expired_msg_stats(td::uint64 bytes) { | ||||
|   prepare_packet_stats(); | ||||
|   auto add_stats = [&](PacketStats &stats) { | ||||
|     ++stats.out_expired_messages; | ||||
|     stats.out_expired_bytes += bytes; | ||||
|   }; | ||||
|   add_stats(packet_stats_cur_); | ||||
|   add_stats(packet_stats_total_); | ||||
| } | ||||
| 
 | ||||
| void AdnlPeerPairImpl::prepare_packet_stats() { | ||||
|   double now = td::Clocks::system(); | ||||
|   if (now >= packet_stats_cur_.ts_end) { | ||||
|     packet_stats_prev_ = std::move(packet_stats_cur_); | ||||
|     packet_stats_cur_ = {}; | ||||
|     auto now_int = (int)now; | ||||
|     packet_stats_cur_.ts_start = (double)(now_int / 60 * 60); | ||||
|     packet_stats_cur_.ts_end = packet_stats_cur_.ts_start + 60.0; | ||||
|     if (packet_stats_prev_.ts_end < now - 60.0) { | ||||
|       packet_stats_prev_ = {}; | ||||
|       packet_stats_prev_.ts_end = packet_stats_cur_.ts_start; | ||||
|       packet_stats_prev_.ts_start = packet_stats_prev_.ts_end - 60.0; | ||||
|     } | ||||
|   } | ||||
| } | ||||
| 
 | ||||
| tl_object_ptr<ton_api::adnl_stats_packets> AdnlPeerPairImpl::PacketStats::tl() const { | ||||
|   return create_tl_object<ton_api::adnl_stats_packets>(ts_start, ts_end, in_packets, in_bytes, in_packets_channel, | ||||
|                                                        in_bytes_channel, out_packets, out_bytes, out_packets_channel, | ||||
|                                                        out_bytes_channel, out_expired_messages, out_expired_bytes); | ||||
| } | ||||
| 
 | ||||
| }  // namespace adnl
 | ||||
| 
 | ||||
| }  // namespace ton
 | ||||
|  |  | |||
|  | @ -39,9 +39,9 @@ class AdnlPeer; | |||
| 
 | ||||
| class AdnlPeerPair : public td::actor::Actor { | ||||
|  public: | ||||
|   virtual void receive_packet_from_channel(AdnlChannelIdShort id, AdnlPacket packet) = 0; | ||||
|   virtual void receive_packet_from_channel(AdnlChannelIdShort id, AdnlPacket packet, td::uint64 serialized_size) = 0; | ||||
|   virtual void receive_packet_checked(AdnlPacket packet) = 0; | ||||
|   virtual void receive_packet(AdnlPacket packet) = 0; | ||||
|   virtual void receive_packet(AdnlPacket packet, td::uint64 serialized_size) = 0; | ||||
| 
 | ||||
|   virtual void send_messages(std::vector<OutboundAdnlMessage> message) = 0; | ||||
|   inline void send_message(OutboundAdnlMessage message) { | ||||
|  | @ -59,6 +59,7 @@ class AdnlPeerPair : public td::actor::Actor { | |||
|   virtual void update_peer_id(AdnlNodeIdFull id) = 0; | ||||
|   virtual void update_addr_list(AdnlAddressList addr_list) = 0; | ||||
|   virtual void get_conn_ip_str(td::Promise<td::string> promise) = 0; | ||||
|   virtual void get_stats(td::Promise<tl_object_ptr<ton_api::adnl_stats_peerPair>> promise) = 0; | ||||
| 
 | ||||
|   static td::actor::ActorOwn<AdnlPeerPair> create(td::actor::ActorId<AdnlNetworkManager> network_manager, | ||||
|                                                   td::actor::ActorId<AdnlPeerTable> peer_table, td::uint32 local_mode, | ||||
|  | @ -71,7 +72,7 @@ class AdnlPeerPair : public td::actor::Actor { | |||
| class AdnlPeer : public td::actor::Actor { | ||||
|  public: | ||||
|   virtual void receive_packet(AdnlNodeIdShort dst, td::uint32 dst_mode, td::actor::ActorId<AdnlLocalId> dst_actor, | ||||
|                               AdnlPacket message) = 0; | ||||
|                               AdnlPacket message, td::uint64 serialized_size) = 0; | ||||
|   virtual void send_messages(AdnlNodeIdShort src, td::uint32 src_mode, td::actor::ActorId<AdnlLocalId> src_actor, | ||||
|                              std::vector<OutboundAdnlMessage> messages) = 0; | ||||
|   virtual void send_query(AdnlNodeIdShort src, td::uint32 src_mode, td::actor::ActorId<AdnlLocalId> src_actor, | ||||
|  | @ -100,6 +101,7 @@ class AdnlPeer : public td::actor::Actor { | |||
|                                 td::actor::ActorId<AdnlLocalId> local_actor, AdnlAddressList addr_list) = 0; | ||||
|   virtual void update_dht_node(td::actor::ActorId<dht::Dht> dht_node) = 0; | ||||
|   virtual void get_conn_ip_str(AdnlNodeIdShort l_id, td::Promise<td::string> promise) = 0; | ||||
|   virtual void get_stats(td::Promise<std::vector<tl_object_ptr<ton_api::adnl_stats_peerPair>>> promise) = 0; | ||||
| }; | ||||
| 
 | ||||
| }  // namespace adnl
 | ||||
|  |  | |||
|  | @ -20,6 +20,7 @@ | |||
| 
 | ||||
| #include <vector> | ||||
| #include <map> | ||||
| #include <queue> | ||||
| 
 | ||||
| #include "adnl-peer.h" | ||||
| #include "adnl-peer-table.h" | ||||
|  | @ -66,12 +67,12 @@ class AdnlPeerPairImpl : public AdnlPeerPair { | |||
| 
 | ||||
|   void discover(); | ||||
| 
 | ||||
|   void receive_packet_from_channel(AdnlChannelIdShort id, AdnlPacket packet) override; | ||||
|   void receive_packet_from_channel(AdnlChannelIdShort id, AdnlPacket packet, td::uint64 serialized_size) override; | ||||
|   void receive_packet_checked(AdnlPacket packet) override; | ||||
|   void receive_packet(AdnlPacket packet) override; | ||||
|   void receive_packet(AdnlPacket packet, td::uint64 serialized_size) override; | ||||
|   void deliver_message(AdnlMessage message); | ||||
| 
 | ||||
|   void send_messages_in(std::vector<OutboundAdnlMessage> messages, bool allow_postpone); | ||||
|   void send_messages_from_queue(); | ||||
|   void send_messages(std::vector<OutboundAdnlMessage> messages) override; | ||||
|   void send_packet_continue(AdnlPacket packet, td::actor::ActorId<AdnlNetworkConnection> conn, bool via_channel); | ||||
|   void send_query(std::string name, td::Promise<td::BufferSlice> promise, td::Timestamp timeout, td::BufferSlice data, | ||||
|  | @ -89,6 +90,7 @@ class AdnlPeerPairImpl : public AdnlPeerPair { | |||
|   void update_peer_id(AdnlNodeIdFull id) override; | ||||
| 
 | ||||
|   void get_conn_ip_str(td::Promise<td::string> promise) override; | ||||
|   void get_stats(td::Promise<tl_object_ptr<ton_api::adnl_stats_peerPair>> promise) override; | ||||
| 
 | ||||
|   void got_data_from_db(td::Result<AdnlDbItem> R); | ||||
|   void got_data_from_static_nodes(td::Result<AdnlNode> R); | ||||
|  | @ -124,7 +126,7 @@ class AdnlPeerPairImpl : public AdnlPeerPair { | |||
|  private: | ||||
|   void respond_with_nop(); | ||||
|   void reinit(td::int32 date); | ||||
|   td::Result<std::pair<td::actor::ActorId<AdnlNetworkConnection>, bool>> get_conn(bool direct_only); | ||||
|   td::Result<std::pair<td::actor::ActorId<AdnlNetworkConnection>, bool>> get_conn(); | ||||
|   void create_channel(pubkeys::Ed25519 pub, td::uint32 date); | ||||
| 
 | ||||
|   bool received_packet(td::uint64 seqno) const { | ||||
|  | @ -183,11 +185,11 @@ class AdnlPeerPairImpl : public AdnlPeerPair { | |||
|     Conn() { | ||||
|     } | ||||
| 
 | ||||
|     bool ready() { | ||||
|     bool ready() const { | ||||
|       return !conn.empty() && conn.get_actor_unsafe().is_active(); | ||||
|     } | ||||
| 
 | ||||
|     bool is_direct() { | ||||
|     bool is_direct() const { | ||||
|       return addr->is_public(); | ||||
|     } | ||||
| 
 | ||||
|  | @ -195,7 +197,14 @@ class AdnlPeerPairImpl : public AdnlPeerPair { | |||
|                      td::actor::ActorId<Adnl> adnl); | ||||
|   }; | ||||
| 
 | ||||
|   std::vector<OutboundAdnlMessage> pending_messages_; | ||||
|   // Messages waiting for connection or for nochannel rate limiter
 | ||||
|   std::queue<std::pair<OutboundAdnlMessage, td::Timestamp>> out_messages_queue_; | ||||
|   td::uint64 out_messages_queue_total_size_ = 0; | ||||
|   RateLimiter nochannel_rate_limiter_ = RateLimiter(50, 0.5);  // max 50, period = 0.5s
 | ||||
|   td::Timestamp retry_send_at_ = td::Timestamp::never(); | ||||
|   bool disable_dht_query_ = false; | ||||
|   bool skip_init_packet_ = false; | ||||
|   double message_in_queue_ttl_ = 10.0; | ||||
| 
 | ||||
|   td::actor::ActorId<AdnlNetworkManager> network_manager_; | ||||
|   td::actor::ActorId<AdnlPeerTable> peer_table_; | ||||
|  | @ -254,7 +263,6 @@ class AdnlPeerPairImpl : public AdnlPeerPair { | |||
| 
 | ||||
|   td::Timestamp next_dht_query_at_ = td::Timestamp::never(); | ||||
|   td::Timestamp next_db_update_at_ = td::Timestamp::never(); | ||||
|   td::Timestamp retry_send_at_ = td::Timestamp::never(); | ||||
| 
 | ||||
|   td::Timestamp last_received_packet_ = td::Timestamp::never(); | ||||
|   td::Timestamp try_reinit_at_ = td::Timestamp::never(); | ||||
|  | @ -262,12 +270,26 @@ class AdnlPeerPairImpl : public AdnlPeerPair { | |||
|   bool has_reverse_addr_ = false; | ||||
|   td::Timestamp request_reverse_ping_after_ = td::Timestamp::now(); | ||||
|   bool request_reverse_ping_active_ = false; | ||||
| 
 | ||||
|   struct PacketStats { | ||||
|     double ts_start = 0.0, ts_end = 0.0; | ||||
|     td::uint64 in_packets = 0, in_bytes = 0, in_packets_channel = 0, in_bytes_channel = 0; | ||||
|     td::uint64 out_packets = 0, out_bytes = 0, out_packets_channel = 0, out_bytes_channel = 0; | ||||
|     td::uint64 out_expired_messages = 0, out_expired_bytes = 0; | ||||
| 
 | ||||
|     tl_object_ptr<ton_api::adnl_stats_packets> tl() const; | ||||
|   } packet_stats_cur_, packet_stats_prev_, packet_stats_total_; | ||||
|   double last_in_packet_ts_ = 0.0, last_out_packet_ts_ = 0.0; | ||||
|   double started_ts_ = td::Clocks::system(); | ||||
|   void add_packet_stats(td::uint64 bytes, bool in, bool channel); | ||||
|   void add_expired_msg_stats(td::uint64 bytes); | ||||
|   void prepare_packet_stats(); | ||||
| }; | ||||
| 
 | ||||
| class AdnlPeerImpl : public AdnlPeer { | ||||
|  public: | ||||
|   void receive_packet(AdnlNodeIdShort dst, td::uint32 dst_mode, td::actor::ActorId<AdnlLocalId> dst_actor, | ||||
|                       AdnlPacket packet) override; | ||||
|                       AdnlPacket packet, td::uint64 serialized_size) override; | ||||
|   void send_messages(AdnlNodeIdShort src, td::uint32 src_mode, td::actor::ActorId<AdnlLocalId> src_actor, | ||||
|                      std::vector<OutboundAdnlMessage> messages) override; | ||||
|   void send_query(AdnlNodeIdShort src, td::uint32 src_mode, td::actor::ActorId<AdnlLocalId> src_actor, std::string name, | ||||
|  | @ -280,6 +302,7 @@ class AdnlPeerImpl : public AdnlPeer { | |||
|                         AdnlAddressList addr_list) override; | ||||
|   void update_dht_node(td::actor::ActorId<dht::Dht> dht_node) override; | ||||
|   void get_conn_ip_str(AdnlNodeIdShort l_id, td::Promise<td::string> promise) override; | ||||
|   void get_stats(td::Promise<std::vector<tl_object_ptr<ton_api::adnl_stats_peerPair>>> promise) override; | ||||
|   //void check_signature(td::BufferSlice data, td::BufferSlice signature, td::Promise<td::Unit> promise) override;
 | ||||
| 
 | ||||
|   AdnlPeerImpl(td::actor::ActorId<AdnlNetworkManager> network_manager, td::actor::ActorId<AdnlPeerTable> peer_table, | ||||
|  |  | |||
|  | @ -121,6 +121,8 @@ class Adnl : public AdnlSenderInterface { | |||
|   virtual void create_tunnel(AdnlNodeIdShort dst, td::uint32 size, | ||||
|                              td::Promise<std::pair<td::actor::ActorOwn<AdnlTunnel>, AdnlAddress>> promise) = 0; | ||||
| 
 | ||||
|   virtual void get_stats(td::Promise<tl_object_ptr<ton_api::adnl_stats>> promise) = 0; | ||||
| 
 | ||||
|   static td::actor::ActorOwn<Adnl> create(std::string db, td::actor::ActorId<keyring::Keyring> keyring); | ||||
| 
 | ||||
|   static std::string int_to_bytestring(td::int32 id) { | ||||
|  |  | |||
|  | @ -40,6 +40,40 @@ inline bool adnl_node_is_older(AdnlNode &a, AdnlNode &b) { | |||
|   return a.addr_list().version() < b.addr_list().version(); | ||||
| } | ||||
| 
 | ||||
| class RateLimiter { | ||||
| public: | ||||
|   explicit RateLimiter(td::uint32 capacity, double period) : capacity_(capacity), period_(period), remaining_(capacity) { | ||||
|   } | ||||
| 
 | ||||
|   bool take() { | ||||
|     while (remaining_ < capacity_ && increment_at_.is_in_past()) { | ||||
|       ++remaining_; | ||||
|       increment_at_ += period_; | ||||
|     } | ||||
|     if (remaining_) { | ||||
|       --remaining_; | ||||
|       if (increment_at_.is_in_past()) { | ||||
|         increment_at_ = td::Timestamp::in(period_); | ||||
|       } | ||||
|       return true; | ||||
|     } | ||||
|     return false; | ||||
|   } | ||||
| 
 | ||||
|   td::Timestamp ready_at() const { | ||||
|     if (remaining_) { | ||||
|       return td::Timestamp::now(); | ||||
|     } | ||||
|     return increment_at_; | ||||
|   } | ||||
| 
 | ||||
| private: | ||||
|   td::uint32 capacity_; | ||||
|   double period_; | ||||
|   td::uint32 remaining_; | ||||
|   td::Timestamp increment_at_ = td::Timestamp::never(); | ||||
| }; | ||||
| 
 | ||||
| }  // namespace adnl
 | ||||
| 
 | ||||
| }  // namespace ton
 | ||||
|  |  | |||
|  | @ -368,6 +368,12 @@ void CatChainReceiverImpl::add_block(td::BufferSlice payload, std::vector<CatCha | |||
|   } | ||||
| 
 | ||||
|   int height = prev->height_ + 1; | ||||
|   auto max_block_height = get_max_block_height(opts_, sources_.size()); | ||||
|   if (height > max_block_height) { | ||||
|     VLOG(CATCHAIN_WARNING) << this << ": cannot create block: max height exceeded (" << max_block_height << ")"; | ||||
|     active_send_ = false; | ||||
|     return; | ||||
|   } | ||||
|   auto block_data = create_tl_object<ton_api::catchain_block_data>(std::move(prev), std::move(deps_arr)); | ||||
|   auto block = create_tl_object<ton_api::catchain_block>(incarnation_, local_idx_, height, std::move(block_data), | ||||
|                                                          td::BufferSlice()); | ||||
|  |  | |||
|  | @ -225,13 +225,19 @@ int main() { | |||
| 
 | ||||
|   auto f = td::Clocks::system(); | ||||
|   scheduler.run_in_context([&] { | ||||
|     for (td::uint32 i = 1; i <= ton::adnl::Adnl::huge_packet_max_size(); i++) { | ||||
|     // Don't send too many packets
 | ||||
|     // Channels are disabled, so packet rate is limited
 | ||||
|     for (td::uint32 i : {1, 2, 3, 4, 100, 500, 900}) { | ||||
|       remaining++; | ||||
|       td::actor::send_closure(adnl, &ton::adnl::Adnl::send_message, src, dst, send_packet(i)); | ||||
|     } | ||||
|     for (td::uint32 i = 1024; i <= ton::adnl::Adnl::huge_packet_max_size() /* 1024 * 8 */; i += 1024) { | ||||
|       remaining++; | ||||
|       td::actor::send_closure(adnl, &ton::adnl::Adnl::send_message, src, dst, send_packet(i)); | ||||
|     } | ||||
|   }); | ||||
| 
 | ||||
|   auto t = td::Timestamp::in(320.0); | ||||
|   auto t = td::Timestamp::in(60.0); | ||||
|   while (scheduler.run(1)) { | ||||
|     if (!remaining) { | ||||
|       break; | ||||
|  | @ -241,7 +247,7 @@ int main() { | |||
|     } | ||||
|   } | ||||
| 
 | ||||
|   LOG(ERROR) << "successfully tested delivering of packets of all sizes. Time=" << (td::Clocks::system() - f); | ||||
|   LOG(ERROR) << "successfully tested delivering of packets of various sizes. Time=" << (td::Clocks::system() - f); | ||||
| 
 | ||||
|   scheduler.run_in_context([&] { | ||||
|     td::actor::send_closure(network_manager, &ton::adnl::TestLoopbackNetworkManager::add_node_id, src, true, true); | ||||
|  |  | |||
|  | @ -144,7 +144,26 @@ adnl.message.part hash:int256 total_size:int offset:int data:bytes = adnl.Messag | |||
| ---types--- | ||||
| 
 | ||||
| adnl.db.node.key local_id:int256 peer_id:int256 = adnl.db.Key; | ||||
| adnl.db.node.value date:int id:PublicKey addr_list:adnl.addressList priority_addr_list:adnl.addressList = adnl.db.node.Value;  | ||||
| adnl.db.node.value date:int id:PublicKey addr_list:adnl.addressList priority_addr_list:adnl.addressList = adnl.db.node.Value; | ||||
| 
 | ||||
| adnl.stats.packets ts_start:double ts_end:double | ||||
|     in_packets:long in_bytes:long in_packets_channel:long in_bytes_channel:long | ||||
|     out_packets:long out_bytes:long out_packets_channel:long out_bytes_channel:long | ||||
|     out_expired_messages:long out_expired_bytes:long = adnl.stats.Packets; | ||||
| adnl.stats.peerPair local_id:int256 peer_id:int256 ip_str:string | ||||
|     packets_recent:adnl.stats.packets packets_total:adnl.stats.packets | ||||
|     last_out_packet_ts:double last_in_packet_ts:double | ||||
|     connection_ready:Bool channel_status:int try_reinit_at:double | ||||
|     out_queue_messages:long out_queue_bytes:long | ||||
|     = adnl.stats.PeerPair; | ||||
| adnl.stats.ipPackets ip_str:string packets:long = adnl.stats.IpPackets; | ||||
| adnl.stats.localIdPackets ts_start:double ts_end:double | ||||
|     decrypted_packets:(vector adnl.stats.ipPackets) dropped_packets:(vector adnl.stats.ipPackets) = adnl.stats.LocalIdPackets; | ||||
| adnl.stats.localId short_id:int256 | ||||
|     current_decrypt:(vector adnl.stats.ipPackets) | ||||
|     packets_recent:adnl.stats.localIdPackets packets_total:adnl.stats.localIdPackets | ||||
|     peers:(vector adnl.stats.peerPair) = adnl.stats.LocalId; | ||||
| adnl.stats timestamp:double local_ids:(vector adnl.stats.localId) = adnl.Stats; | ||||
| 
 | ||||
| ---functions--- | ||||
| 
 | ||||
|  | @ -609,7 +628,8 @@ engine.validator.customOverlaysConfig overlays:(vector engine.validator.customOv | |||
| engine.validator.collatorOptions | ||||
|   deferring_enabled:Bool defer_messages_after:int defer_out_queue_size_limit:long | ||||
|   dispatch_phase_2_max_total:int dispatch_phase_3_max_total:int | ||||
|   dispatch_phase_2_max_per_initiator:int dispatch_phase_3_max_per_initiator:int = engine.validator.CollatorOptions; | ||||
|   dispatch_phase_2_max_per_initiator:int dispatch_phase_3_max_per_initiator:int | ||||
|   whitelist:(vector string) prioritylist:(vector string) = engine.validator.CollatorOptions; | ||||
| 
 | ||||
| ---functions--- | ||||
| ---types--- | ||||
|  | @ -723,6 +743,8 @@ engine.validator.setStateSerializerEnabled enabled:Bool = engine.validator.Succe | |||
| engine.validator.setCollatorOptionsJson json:string = engine.validator.Success; | ||||
| engine.validator.getCollatorOptionsJson = engine.validator.JsonConfig; | ||||
| 
 | ||||
| engine.validator.getAdnlStats = adnl.Stats; | ||||
| 
 | ||||
| ---types--- | ||||
| 
 | ||||
| storage.pong = storage.Pong; | ||||
|  |  | |||
										
											Binary file not shown.
										
									
								
							|  | @ -1263,3 +1263,166 @@ td::Status GetCollatorOptionsJsonQuery::receive(td::BufferSlice data) { | |||
|   td::TerminalIO::out() << "saved config to " << file_name_ << "\n"; | ||||
|   return td::Status::OK(); | ||||
| } | ||||
| 
 | ||||
| td::Status GetAdnlStatsJsonQuery::run() { | ||||
|   TRY_RESULT_ASSIGN(file_name_, tokenizer_.get_token<std::string>()); | ||||
|   TRY_STATUS(tokenizer_.check_endl()); | ||||
|   return td::Status::OK(); | ||||
| } | ||||
| 
 | ||||
| td::Status GetAdnlStatsJsonQuery::send() { | ||||
|   auto b = | ||||
|       ton::create_serialize_tl_object<ton::ton_api::engine_validator_getAdnlStats>(); | ||||
|   td::actor::send_closure(console_, &ValidatorEngineConsole::envelope_send_query, std::move(b), create_promise()); | ||||
|   return td::Status::OK(); | ||||
| } | ||||
| 
 | ||||
| td::Status GetAdnlStatsJsonQuery::receive(td::BufferSlice data) { | ||||
|   TRY_RESULT_PREFIX(f, ton::fetch_tl_object<ton::ton_api::adnl_stats>(data.as_slice(), true), | ||||
|                     "received incorrect answer: "); | ||||
|   auto s = td::json_encode<std::string>(td::ToJson(*f), true); | ||||
|   TRY_STATUS(td::write_file(file_name_, s)); | ||||
|   td::TerminalIO::out() << "saved adnl stats to " << file_name_ << "\n"; | ||||
|   return td::Status::OK(); | ||||
| } | ||||
| 
 | ||||
| td::Status GetAdnlStatsQuery::run() { | ||||
|   TRY_STATUS(tokenizer_.check_endl()); | ||||
|   return td::Status::OK(); | ||||
| } | ||||
| 
 | ||||
| td::Status GetAdnlStatsQuery::send() { | ||||
|   auto b = | ||||
|       ton::create_serialize_tl_object<ton::ton_api::engine_validator_getAdnlStats>(); | ||||
|   td::actor::send_closure(console_, &ValidatorEngineConsole::envelope_send_query, std::move(b), create_promise()); | ||||
|   return td::Status::OK(); | ||||
| } | ||||
| 
 | ||||
| td::Status GetAdnlStatsQuery::receive(td::BufferSlice data) { | ||||
|   TRY_RESULT_PREFIX(stats, ton::fetch_tl_object<ton::ton_api::adnl_stats>(data.as_slice(), true), | ||||
|                     "received incorrect answer: "); | ||||
|   td::StringBuilder sb; | ||||
|   sb << "================================= ADNL STATS =================================\n"; | ||||
|   bool first = true; | ||||
|   double now = td::Clocks::system(); | ||||
|   for (auto &local_id : stats->local_ids_) { | ||||
|     if (first) { | ||||
|       first = false; | ||||
|     } else { | ||||
|       sb << "\n"; | ||||
|     } | ||||
|     sb << "LOCAL ID " << local_id->short_id_ << "\n"; | ||||
|     if (!local_id->current_decrypt_.empty()) { | ||||
|       std::sort(local_id->current_decrypt_.begin(), local_id->current_decrypt_.end(), | ||||
|                 [](const ton::tl_object_ptr<ton::ton_api::adnl_stats_ipPackets> &a, | ||||
|                    const ton::tl_object_ptr<ton::ton_api::adnl_stats_ipPackets> &b) { | ||||
|                   return a->packets_ > b->packets_; | ||||
|                 }); | ||||
|       td::uint64 total = 0; | ||||
|       for (auto &x : local_id->current_decrypt_) { | ||||
|         total += x->packets_; | ||||
|       } | ||||
|       sb << "  Packets in decryptor: total=" << total; | ||||
|       for (auto &x : local_id->current_decrypt_) { | ||||
|         sb << " " << (x->ip_str_.empty() ? "unknown" : x->ip_str_) << "=" << x->packets_; | ||||
|       } | ||||
|       sb << "\n"; | ||||
|     } | ||||
|     auto print_local_id_packets = [&](const std::string &name, | ||||
|                                       std::vector<ton::tl_object_ptr<ton::ton_api::adnl_stats_ipPackets>> &vec) { | ||||
|       if (vec.empty()) { | ||||
|         return; | ||||
|       } | ||||
|       std::sort(vec.begin(), vec.end(), | ||||
|                 [](const ton::tl_object_ptr<ton::ton_api::adnl_stats_ipPackets> &a, | ||||
|                    const ton::tl_object_ptr<ton::ton_api::adnl_stats_ipPackets> &b) { | ||||
|                   return a->packets_ > b->packets_; | ||||
|                 }); | ||||
|       td::uint64 total = 0; | ||||
|       for (auto &x : vec) { | ||||
|         total += x->packets_; | ||||
|       } | ||||
|       sb << "  " << name << ": total=" << total; | ||||
|       int cnt = 0; | ||||
|       for (auto &x : vec) { | ||||
|         ++cnt; | ||||
|         if (cnt >= 8) { | ||||
|           sb << " ..."; | ||||
|           break; | ||||
|         } | ||||
|         sb << " " << (x->ip_str_.empty() ? "unknown" : x->ip_str_) << "=" << x->packets_; | ||||
|       } | ||||
|       sb << "\n"; | ||||
|     }; | ||||
|     print_local_id_packets("Decrypted packets (recent)", local_id->packets_recent_->decrypted_packets_); | ||||
|     print_local_id_packets("Dropped packets   (recent)", local_id->packets_recent_->dropped_packets_); | ||||
|     print_local_id_packets("Decrypted packets (total)", local_id->packets_total_->decrypted_packets_); | ||||
|     print_local_id_packets("Dropped packets   (total)", local_id->packets_total_->dropped_packets_); | ||||
|     sb << "  PEERS (" << local_id->peers_.size() << "):\n"; | ||||
|     std::sort(local_id->peers_.begin(), local_id->peers_.end(), | ||||
|               [](const ton::tl_object_ptr<ton::ton_api::adnl_stats_peerPair> &a, | ||||
|                  const ton::tl_object_ptr<ton::ton_api::adnl_stats_peerPair> &b) { | ||||
|                 return a->packets_recent_->in_bytes_ + a->packets_recent_->out_bytes_ > | ||||
|                        b->packets_recent_->in_bytes_ + b->packets_recent_->out_bytes_; | ||||
|               }); | ||||
|     for (auto &peer : local_id->peers_) { | ||||
|       sb << "    PEER " << peer->peer_id_ << "\n"; | ||||
|       sb << "      Address: " << (peer->ip_str_.empty() ? "unknown" : peer->ip_str_) << "\n"; | ||||
|       sb << "      Connection " << (peer->connection_ready_ ? "ready" : "not ready") << ", "; | ||||
|       switch (peer->channel_status_) { | ||||
|         case 0: | ||||
|           sb << "channel: none\n"; | ||||
|           break; | ||||
|         case 1: | ||||
|           sb << "channel: inited\n"; | ||||
|           break; | ||||
|         case 2: | ||||
|           sb << "channel: ready\n"; | ||||
|           break; | ||||
|         default: | ||||
|           sb << "\n"; | ||||
|       } | ||||
| 
 | ||||
|       auto print_packets = [&](const std::string &name, | ||||
|                                const ton::tl_object_ptr<ton::ton_api::adnl_stats_packets> &obj) { | ||||
|         if (obj->in_packets_) { | ||||
|           sb << "      In  (" << name << "): " << obj->in_packets_ << " packets (" | ||||
|              << td::format::as_size(obj->in_bytes_) << "), channel: " << obj->in_packets_channel_ << " packets (" | ||||
|              << td::format::as_size(obj->in_bytes_channel_) << ")\n"; | ||||
|         } | ||||
|         if (obj->out_packets_) { | ||||
|           sb << "      Out (" << name << "): " << obj->out_packets_ << " packets (" | ||||
|              << td::format::as_size(obj->out_bytes_) << "), channel: " << obj->out_packets_channel_ << " packets (" | ||||
|              << td::format::as_size(obj->out_bytes_channel_) << ")\n"; | ||||
|         } | ||||
|         if (obj->out_expired_messages_) { | ||||
|           sb << "      Out expired (" << name << "): " << obj->out_expired_messages_ << " messages (" | ||||
|              << td::format::as_size(obj->out_expired_bytes_) << ")\n"; | ||||
|         } | ||||
|       }; | ||||
|       print_packets("recent", peer->packets_recent_); | ||||
|       print_packets("total", peer->packets_total_); | ||||
| 
 | ||||
|       sb << "      Last in packet: "; | ||||
|       if (peer->last_in_packet_ts_) { | ||||
|         sb << now - peer->last_in_packet_ts_ << " s ago"; | ||||
|       } else { | ||||
|         sb << "never"; | ||||
|       } | ||||
|       sb << "    Last out packet: "; | ||||
|       if (peer->last_out_packet_ts_) { | ||||
|         sb << now - peer->last_out_packet_ts_ << " s ago"; | ||||
|       } else { | ||||
|         sb << "never"; | ||||
|       } | ||||
|       sb << "\n"; | ||||
|       if (peer->out_queue_messages_) { | ||||
|         sb << "      Out message queue: " << peer->out_queue_messages_ << " messages (" | ||||
|            << td::format::as_size(peer->out_queue_bytes_) << ")\n"; | ||||
|       } | ||||
|     } | ||||
|   } | ||||
|   sb << "==============================================================================\n"; | ||||
|   td::TerminalIO::out() << sb.as_cslice(); | ||||
|   return td::Status::OK(); | ||||
| } | ||||
|  |  | |||
|  | @ -1292,3 +1292,47 @@ class GetCollatorOptionsJsonQuery : public Query { | |||
|  private: | ||||
|   std::string file_name_; | ||||
| }; | ||||
| 
 | ||||
| class GetAdnlStatsJsonQuery : public Query { | ||||
|  public: | ||||
|   GetAdnlStatsJsonQuery(td::actor::ActorId<ValidatorEngineConsole> console, Tokenizer tokenizer) | ||||
|       : Query(console, std::move(tokenizer)) { | ||||
|   } | ||||
|   td::Status run() override; | ||||
|   td::Status send() override; | ||||
|   td::Status receive(td::BufferSlice data) override; | ||||
|   static std::string get_name() { | ||||
|     return "getadnlstatsjson"; | ||||
|   } | ||||
|   static std::string get_help() { | ||||
|     return "getadnlstatsjson <filename>\tsave adnl stats to <filename>"; | ||||
|   } | ||||
|   std::string name() const override { | ||||
|     return get_name(); | ||||
|   } | ||||
| 
 | ||||
|  private: | ||||
|   std::string file_name_; | ||||
| }; | ||||
| 
 | ||||
| class GetAdnlStatsQuery : public Query { | ||||
|  public: | ||||
|   GetAdnlStatsQuery(td::actor::ActorId<ValidatorEngineConsole> console, Tokenizer tokenizer) | ||||
|       : Query(console, std::move(tokenizer)) { | ||||
|   } | ||||
|   td::Status run() override; | ||||
|   td::Status send() override; | ||||
|   td::Status receive(td::BufferSlice data) override; | ||||
|   static std::string get_name() { | ||||
|     return "getadnlstats"; | ||||
|   } | ||||
|   static std::string get_help() { | ||||
|     return "getadnlstats\tdisplay adnl stats"; | ||||
|   } | ||||
|   std::string name() const override { | ||||
|     return get_name(); | ||||
|   } | ||||
| 
 | ||||
|  private: | ||||
|   std::string file_name_; | ||||
| }; | ||||
|  |  | |||
|  | @ -150,6 +150,8 @@ void ValidatorEngineConsole::run() { | |||
|   add_query_runner(std::make_unique<QueryRunnerImpl<SetCollatorOptionsJsonQuery>>()); | ||||
|   add_query_runner(std::make_unique<QueryRunnerImpl<ResetCollatorOptionsQuery>>()); | ||||
|   add_query_runner(std::make_unique<QueryRunnerImpl<GetCollatorOptionsJsonQuery>>()); | ||||
|   add_query_runner(std::make_unique<QueryRunnerImpl<GetAdnlStatsJsonQuery>>()); | ||||
|   add_query_runner(std::make_unique<QueryRunnerImpl<GetAdnlStatsQuery>>()); | ||||
| } | ||||
| 
 | ||||
| bool ValidatorEngineConsole::envelope_send_query(td::BufferSlice query, td::Promise<td::BufferSlice> promise) { | ||||
|  |  | |||
|  | @ -1448,6 +1448,9 @@ td::Status ValidatorEngine::load_global_config() { | |||
|   if (catchain_max_block_delay_) { | ||||
|     validator_options_.write().set_catchain_max_block_delay(catchain_max_block_delay_.value()); | ||||
|   } | ||||
|   if (catchain_max_block_delay_slow_) { | ||||
|     validator_options_.write().set_catchain_max_block_delay_slow(catchain_max_block_delay_slow_.value()); | ||||
|   } | ||||
| 
 | ||||
|   std::vector<ton::BlockIdExt> h; | ||||
|   for (auto &x : conf.validator_->hardforks_) { | ||||
|  | @ -2528,6 +2531,14 @@ static td::Result<td::Ref<ton::validator::CollatorOptions>> parse_collator_optio | |||
|   } else { | ||||
|     opts.dispatch_phase_3_max_per_initiator = {}; | ||||
|   } | ||||
|   for (const std::string& s : f.whitelist_) { | ||||
|     TRY_RESULT(addr, block::StdAddress::parse(s)); | ||||
|     opts.whitelist.emplace(addr.workchain, addr.addr); | ||||
|   } | ||||
|   for (const std::string& s : f.prioritylist_) { | ||||
|     TRY_RESULT(addr, block::StdAddress::parse(s)); | ||||
|     opts.prioritylist.emplace(addr.workchain, addr.addr); | ||||
|   } | ||||
| 
 | ||||
|   return ref; | ||||
| } | ||||
|  | @ -3863,6 +3874,28 @@ void ValidatorEngine::run_control_query(ton::ton_api::engine_validator_getCollat | |||
|   } | ||||
| } | ||||
| 
 | ||||
| void ValidatorEngine::run_control_query(ton::ton_api::engine_validator_getAdnlStats &query, td::BufferSlice data, | ||||
|                                         ton::PublicKeyHash src, td::uint32 perm, td::Promise<td::BufferSlice> promise) { | ||||
|   if (!(perm & ValidatorEnginePermissions::vep_default)) { | ||||
|     promise.set_value(create_control_query_error(td::Status::Error(ton::ErrorCode::error, "not authorized"))); | ||||
|     return; | ||||
|   } | ||||
|   if (adnl_.empty()) { | ||||
|     promise.set_value(create_control_query_error(td::Status::Error(ton::ErrorCode::notready, "not started"))); | ||||
|     return; | ||||
|   } | ||||
|   td::actor::send_closure( | ||||
|       adnl_, &ton::adnl::Adnl::get_stats, | ||||
|       [promise = std::move(promise)](td::Result<ton::tl_object_ptr<ton::ton_api::adnl_stats>> R) mutable { | ||||
|         if (R.is_ok()) { | ||||
|           promise.set_value(ton::serialize_tl_object(R.move_as_ok(), true)); | ||||
|         } else { | ||||
|           promise.set_value( | ||||
|               create_control_query_error(td::Status::Error(ton::ErrorCode::notready, "failed to get adnl stats"))); | ||||
|         } | ||||
|       }); | ||||
| } | ||||
| 
 | ||||
| void ValidatorEngine::process_control_query(td::uint16 port, ton::adnl::AdnlNodeIdShort src, | ||||
|                                             ton::adnl::AdnlNodeIdShort dst, td::BufferSlice data, | ||||
|                                             td::Promise<td::BufferSlice> promise) { | ||||
|  | @ -4072,7 +4105,7 @@ int main(int argc, char *argv[]) { | |||
|     logger_ = td::TsFileLog::create(fname.str()).move_as_ok(); | ||||
|     td::log_interface = logger_.get(); | ||||
|   }); | ||||
|   p.add_checked_option('s', "state-ttl", "state will be gc'd after this time (in seconds) default=3600", | ||||
|   p.add_checked_option('s', "state-ttl", "state will be gc'd after this time (in seconds) default=86400", | ||||
|                        [&](td::Slice fname) { | ||||
|                          auto v = td::to_double(fname); | ||||
|                          if (v <= 0) { | ||||
|  | @ -4233,7 +4266,7 @@ int main(int argc, char *argv[]) { | |||
|       "preload all cells from CellDb on startup (recommended to use with big enough celldb-cache-size and celldb-direct-io)", | ||||
|       [&]() { acts.push_back([&x]() { td::actor::send_closure(x, &ValidatorEngine::set_celldb_preload_all, true); }); }); | ||||
|   p.add_checked_option( | ||||
|       '\0', "catchain-max-block-delay", "delay before creating a new catchain block, in seconds (default: 0.5)", | ||||
|       '\0', "catchain-max-block-delay", "delay before creating a new catchain block, in seconds (default: 0.4)", | ||||
|       [&](td::Slice s) -> td::Status { | ||||
|         auto v = td::to_double(s); | ||||
|         if (v < 0) { | ||||
|  | @ -4242,6 +4275,16 @@ int main(int argc, char *argv[]) { | |||
|         acts.push_back([&x, v]() { td::actor::send_closure(x, &ValidatorEngine::set_catchain_max_block_delay, v); }); | ||||
|         return td::Status::OK(); | ||||
|       }); | ||||
|   p.add_checked_option( | ||||
|       '\0', "catchain-max-block-delay-slow", "max extended catchain block delay (for too long rounds), (default: 1.0)", | ||||
|       [&](td::Slice s) -> td::Status { | ||||
|         auto v = td::to_double(s); | ||||
|         if (v < 0) { | ||||
|           return td::Status::Error("catchain-max-block-delay-slow should be non-negative"); | ||||
|         } | ||||
|         acts.push_back([&x, v]() { td::actor::send_closure(x, &ValidatorEngine::set_catchain_max_block_delay_slow, v); }); | ||||
|         return td::Status::OK(); | ||||
|       }); | ||||
|   p.add_option( | ||||
|       '\0', "fast-state-serializer", | ||||
|       "faster persistent state serializer, but requires more RAM (enabled automatically on machines with >= 90GB RAM)", | ||||
|  |  | |||
|  | @ -214,7 +214,7 @@ class ValidatorEngine : public td::actor::Actor { | |||
|   td::optional<td::uint64> celldb_cache_size_ = 1LL << 30; | ||||
|   bool celldb_direct_io_ = false; | ||||
|   bool celldb_preload_all_ = false; | ||||
|   td::optional<double> catchain_max_block_delay_; | ||||
|   td::optional<double> catchain_max_block_delay_, catchain_max_block_delay_slow_; | ||||
|   bool read_config_ = false; | ||||
|   bool started_keyring_ = false; | ||||
|   bool started_ = false; | ||||
|  | @ -300,6 +300,9 @@ class ValidatorEngine : public td::actor::Actor { | |||
|   void set_catchain_max_block_delay(double value) { | ||||
|     catchain_max_block_delay_ = value; | ||||
|   } | ||||
|   void set_catchain_max_block_delay_slow(double value) { | ||||
|     catchain_max_block_delay_slow_ = value; | ||||
|   } | ||||
|   void set_fast_state_serializer_enabled(bool value) { | ||||
|     fast_state_serializer_enabled_ = value; | ||||
|   } | ||||
|  | @ -489,6 +492,8 @@ class ValidatorEngine : public td::actor::Actor { | |||
|                          ton::PublicKeyHash src, td::uint32 perm, td::Promise<td::BufferSlice> promise); | ||||
|   void run_control_query(ton::ton_api::engine_validator_getCollatorOptionsJson &query, td::BufferSlice data, | ||||
|                          ton::PublicKeyHash src, td::uint32 perm, td::Promise<td::BufferSlice> promise); | ||||
|   void run_control_query(ton::ton_api::engine_validator_getAdnlStats &query, td::BufferSlice data, | ||||
|                          ton::PublicKeyHash src, td::uint32 perm, td::Promise<td::BufferSlice> promise); | ||||
|   template <class T> | ||||
|   void run_control_query(T &query, td::BufferSlice data, ton::PublicKeyHash src, td::uint32 perm, | ||||
|                          td::Promise<td::BufferSlice> promise) { | ||||
|  |  | |||
|  | @ -478,6 +478,14 @@ class ValidatorSessionState : public ValidatorSessionDescription::RootObject { | |||
|   auto get_ts(td::uint32 src_idx) const { | ||||
|     return att_->at(src_idx); | ||||
|   } | ||||
|   td::uint32 cur_attempt_in_round(const ValidatorSessionDescription& desc) const { | ||||
|     td::uint32 first_attempt = cur_round_->get_first_attempt(desc.get_self_idx()); | ||||
|     td::uint32 cur_attempt = desc.get_attempt_seqno(desc.get_ts()); | ||||
|     if (cur_attempt < first_attempt || first_attempt == 0) { | ||||
|       return 0; | ||||
|     } | ||||
|     return cur_attempt - first_attempt; | ||||
|   } | ||||
| 
 | ||||
|   const SentBlock* choose_block_to_sign(ValidatorSessionDescription& desc, td::uint32 src_idx, bool& found) const; | ||||
|   const SentBlock* get_committed_block(ValidatorSessionDescription& desc, td::uint32 seqno) const; | ||||
|  |  | |||
|  | @ -813,13 +813,25 @@ void ValidatorSessionImpl::request_new_block(bool now) { | |||
|   } else { | ||||
|     double lambda = 10.0 / description().get_total_nodes(); | ||||
|     double x = -1 / lambda * log(td::Random::fast(1, 999) * 0.001); | ||||
|     if (x > catchain_max_block_delay_) {  // default = 0.5
 | ||||
|       x = catchain_max_block_delay_; | ||||
|     } | ||||
|     x = std::min(x, get_current_max_block_delay());  // default = 0.4
 | ||||
|     td::actor::send_closure(catchain_, &catchain::CatChain::need_new_block, td::Timestamp::in(x)); | ||||
|   } | ||||
| } | ||||
| 
 | ||||
| double ValidatorSessionImpl::get_current_max_block_delay() const { | ||||
|   td::uint32 att = real_state_->cur_attempt_in_round(*description_); | ||||
|   td::uint32 att1 = description_->opts().max_round_attempts; | ||||
|   if (att <= att1) { | ||||
|     return catchain_max_block_delay_; | ||||
|   } | ||||
|   td::uint32 att2 = att1 + 4; | ||||
|   if (att >= att2) { | ||||
|     return catchain_max_block_delay_slow_; | ||||
|   } | ||||
|   return catchain_max_block_delay_ + | ||||
|          (catchain_max_block_delay_slow_ - catchain_max_block_delay_) * (double)(att - att1) / (double)(att2 - att1); | ||||
| } | ||||
| 
 | ||||
| void ValidatorSessionImpl::on_new_round(td::uint32 round) { | ||||
|   if (round != 0) { | ||||
|     CHECK(cur_round_ < round); | ||||
|  |  | |||
|  | @ -109,7 +109,7 @@ class ValidatorSession : public td::actor::Actor { | |||
|   virtual void get_validator_group_info_for_litequery( | ||||
|       td::uint32 cur_round, | ||||
|       td::Promise<std::vector<tl_object_ptr<lite_api::liteServer_nonfinal_candidateInfo>>> promise) = 0; | ||||
|   virtual void set_catchain_max_block_delay(double value) = 0; | ||||
|   virtual void set_catchain_max_block_delay(double delay, double delay_slow) = 0; | ||||
| 
 | ||||
|   static td::actor::ActorOwn<ValidatorSession> create( | ||||
|       catchain::CatChainSessionId session_id, ValidatorSessionOptions opts, PublicKeyHash local_id, | ||||
|  |  | |||
|  | @ -91,6 +91,7 @@ class ValidatorSessionImpl : public ValidatorSession { | |||
|   std::unique_ptr<ValidatorSessionDescription> description_; | ||||
| 
 | ||||
|   double catchain_max_block_delay_ = 0.4; | ||||
|   double catchain_max_block_delay_slow_ = 1.0; | ||||
| 
 | ||||
|   void on_new_round(td::uint32 round); | ||||
|   void on_catchain_started(); | ||||
|  | @ -150,6 +151,7 @@ class ValidatorSessionImpl : public ValidatorSession { | |||
|   } | ||||
| 
 | ||||
|   void request_new_block(bool now); | ||||
|   double get_current_max_block_delay() const; | ||||
|   void get_broadcast_p2p(PublicKeyHash node, ValidatorSessionFileHash file_hash, | ||||
|                          ValidatorSessionCollatedDataFileHash collated_data_file_hash, PublicKeyHash src, | ||||
|                          td::uint32 round, ValidatorSessionRootHash root_hash, td::Promise<td::BufferSlice> promise, | ||||
|  | @ -191,8 +193,10 @@ class ValidatorSessionImpl : public ValidatorSession { | |||
|   void get_validator_group_info_for_litequery( | ||||
|       td::uint32 cur_round, | ||||
|       td::Promise<std::vector<tl_object_ptr<lite_api::liteServer_nonfinal_candidateInfo>>> promise) override; | ||||
|   void set_catchain_max_block_delay(double value) override { | ||||
|     catchain_max_block_delay_ = value; | ||||
| 
 | ||||
|   void set_catchain_max_block_delay(double delay, double delay_slow) override { | ||||
|     catchain_max_block_delay_ = delay; | ||||
|     catchain_max_block_delay_slow_ = delay_slow; | ||||
|   } | ||||
| 
 | ||||
|   void process_blocks(std::vector<catchain::CatChainBlock *> blocks); | ||||
|  |  | |||
|  | @ -188,12 +188,17 @@ void CellDbIn::store_cell(BlockIdExt block_id, td::Ref<vm::Cell> cell, td::Promi | |||
|   if (!opts_->get_disable_rocksdb_stats()) { | ||||
|     cell_db_statistics_.store_cell_time_.insert(timer.elapsed() * 1e6); | ||||
|   } | ||||
|   LOG(DEBUG) << "Stored state " << block_id.to_str(); | ||||
| } | ||||
| 
 | ||||
| void CellDbIn::get_cell_db_reader(td::Promise<std::shared_ptr<vm::CellDbReader>> promise) { | ||||
|   promise.set_result(boc_->get_cell_db_reader()); | ||||
| } | ||||
| 
 | ||||
| void CellDbIn::get_last_deleted_mc_state(td::Promise<BlockSeqno> promise) { | ||||
|   promise.set_result(last_deleted_mc_state_); | ||||
| } | ||||
| 
 | ||||
| void CellDbIn::flush_db_stats() { | ||||
|   if (opts_->get_disable_rocksdb_stats()) { | ||||
|     return; | ||||
|  | @ -320,6 +325,10 @@ void CellDbIn::gc_cont2(BlockHandle handle) { | |||
|   if (!opts_->get_disable_rocksdb_stats()) { | ||||
|     cell_db_statistics_.gc_cell_time_.insert(timer.elapsed() * 1e6); | ||||
|   } | ||||
|   if (handle->id().is_masterchain()) { | ||||
|     last_deleted_mc_state_ = handle->id().seqno(); | ||||
|   } | ||||
|   LOG(DEBUG) << "Deleted state " << handle->id().to_str(); | ||||
| } | ||||
| 
 | ||||
| void CellDbIn::skip_gc() { | ||||
|  | @ -453,6 +462,10 @@ void CellDb::get_cell_db_reader(td::Promise<std::shared_ptr<vm::CellDbReader>> p | |||
|   td::actor::send_closure(cell_db_, &CellDbIn::get_cell_db_reader, std::move(promise)); | ||||
| } | ||||
| 
 | ||||
| void CellDb::get_last_deleted_mc_state(td::Promise<BlockSeqno> promise) { | ||||
|   td::actor::send_closure(cell_db_, &CellDbIn::get_last_deleted_mc_state, std::move(promise)); | ||||
| } | ||||
| 
 | ||||
| void CellDb::start_up() { | ||||
|   CellDbBase::start_up(); | ||||
|   boc_ = vm::DynamicBagOfCellsDb::create(); | ||||
|  |  | |||
|  | @ -61,6 +61,7 @@ class CellDbIn : public CellDbBase { | |||
|   void load_cell(RootHash hash, td::Promise<td::Ref<vm::DataCell>> promise); | ||||
|   void store_cell(BlockIdExt block_id, td::Ref<vm::Cell> cell, td::Promise<td::Ref<vm::DataCell>> promise); | ||||
|   void get_cell_db_reader(td::Promise<std::shared_ptr<vm::CellDbReader>> promise); | ||||
|   void get_last_deleted_mc_state(td::Promise<BlockSeqno> promise); | ||||
| 
 | ||||
|   void migrate_cell(td::Bits256 hash); | ||||
| 
 | ||||
|  | @ -143,6 +144,7 @@ class CellDbIn : public CellDbBase { | |||
|   std::shared_ptr<td::RocksDbSnapshotStatistics> snapshot_statistics_; | ||||
|   CellDbStatistics cell_db_statistics_; | ||||
|   td::Timestamp statistics_flush_at_ = td::Timestamp::never(); | ||||
|   BlockSeqno last_deleted_mc_state_ = 0; | ||||
| 
 | ||||
|  public: | ||||
|   class MigrationProxy : public td::actor::Actor { | ||||
|  | @ -167,6 +169,7 @@ class CellDb : public CellDbBase { | |||
|     boc_->set_loader(std::make_unique<vm::CellLoader>(std::move(snapshot), on_load_callback_)).ensure(); | ||||
|   } | ||||
|   void get_cell_db_reader(td::Promise<std::shared_ptr<vm::CellDbReader>> promise); | ||||
|   void get_last_deleted_mc_state(td::Promise<BlockSeqno> promise); | ||||
| 
 | ||||
|   CellDb(td::actor::ActorId<RootDb> root_db, std::string path, td::Ref<ValidatorManagerOptions> opts) | ||||
|       : root_db_(root_db), path_(path), opts_(opts) { | ||||
|  |  | |||
|  | @ -274,6 +274,10 @@ void RootDb::get_cell_db_reader(td::Promise<std::shared_ptr<vm::CellDbReader>> p | |||
|   td::actor::send_closure(cell_db_, &CellDb::get_cell_db_reader, std::move(promise)); | ||||
| } | ||||
| 
 | ||||
| void RootDb::get_last_deleted_mc_state(td::Promise<BlockSeqno> promise) { | ||||
|   td::actor::send_closure(cell_db_, &CellDb::get_last_deleted_mc_state, std::move(promise)); | ||||
| } | ||||
| 
 | ||||
| void RootDb::store_persistent_state_file(BlockIdExt block_id, BlockIdExt masterchain_block_id, td::BufferSlice state, | ||||
|                                          td::Promise<td::Unit> promise) { | ||||
|   td::actor::send_closure(archive_db_, &ArchiveManager::add_persistent_state, block_id, masterchain_block_id, | ||||
|  |  | |||
|  | @ -63,6 +63,7 @@ class RootDb : public Db { | |||
|                          td::Promise<td::Ref<ShardState>> promise) override; | ||||
|   void get_block_state(ConstBlockHandle handle, td::Promise<td::Ref<ShardState>> promise) override; | ||||
|   void get_cell_db_reader(td::Promise<std::shared_ptr<vm::CellDbReader>> promise) override; | ||||
|   void get_last_deleted_mc_state(td::Promise<BlockSeqno> promise) override; | ||||
| 
 | ||||
|   void store_block_handle(BlockHandle handle, td::Promise<td::Unit> promise) override; | ||||
|   void get_block_handle(BlockIdExt id, td::Promise<BlockHandle> promise) override; | ||||
|  |  | |||
|  | @ -3066,7 +3066,7 @@ int Collator::process_one_new_message(block::NewOutMsg msg, bool enqueue_only, R | |||
|   bool defer = false; | ||||
|   if (!from_dispatch_queue) { | ||||
|     if (deferring_messages_enabled_ && collator_opts_->deferring_enabled && !is_special && !is_special_account && | ||||
|         msg.msg_idx != 0) { | ||||
|         !collator_opts_->whitelist.count({src_wc, src_addr}) && msg.msg_idx != 0) { | ||||
|       if (++sender_generated_messages_count_[src_addr] >= collator_opts_->defer_messages_after || | ||||
|           out_msg_queue_size_ > defer_out_queue_size_limit_) { | ||||
|         defer = true; | ||||
|  | @ -3697,6 +3697,8 @@ bool Collator::process_dispatch_queue() { | |||
|     vm::AugmentedDictionary cur_dispatch_queue{dispatch_queue_->get_root(), 256, block::tlb::aug_DispatchQueue}; | ||||
|     std::map<std::tuple<WorkchainId, StdSmcAddress, LogicalTime>, size_t> count_per_initiator; | ||||
|     size_t total_count = 0; | ||||
|     auto prioritylist = collator_opts_->prioritylist; | ||||
|     auto prioritylist_iter = prioritylist.begin(); | ||||
|     while (!cur_dispatch_queue.is_empty()) { | ||||
|       block_full_ = !block_limit_status_->fits(block::ParamLimits::cl_normal); | ||||
|       if (block_full_) { | ||||
|  | @ -3713,9 +3715,30 @@ bool Collator::process_dispatch_queue() { | |||
|         return true; | ||||
|       } | ||||
|       StdSmcAddress src_addr; | ||||
|       auto account_dispatch_queue = block::get_dispatch_queue_min_lt_account(cur_dispatch_queue, src_addr); | ||||
|       td::Ref<vm::CellSlice> account_dispatch_queue; | ||||
|       while (!prioritylist.empty()) { | ||||
|         if (prioritylist_iter == prioritylist.end()) { | ||||
|           prioritylist_iter = prioritylist.begin(); | ||||
|         } | ||||
|         auto priority_addr = *prioritylist_iter; | ||||
|         if (priority_addr.first != workchain() || !is_our_address(priority_addr.second)) { | ||||
|           prioritylist_iter = prioritylist.erase(prioritylist_iter); | ||||
|           continue; | ||||
|         } | ||||
|         src_addr = priority_addr.second; | ||||
|         account_dispatch_queue = cur_dispatch_queue.lookup(src_addr); | ||||
|         if (account_dispatch_queue.is_null()) { | ||||
|           prioritylist_iter = prioritylist.erase(prioritylist_iter); | ||||
|         } else { | ||||
|           ++prioritylist_iter; | ||||
|           break; | ||||
|         } | ||||
|       } | ||||
|       if (account_dispatch_queue.is_null()) { | ||||
|         return fatal_error("invalid dispatch queue in shard state"); | ||||
|         account_dispatch_queue = block::get_dispatch_queue_min_lt_account(cur_dispatch_queue, src_addr); | ||||
|         if (account_dispatch_queue.is_null()) { | ||||
|           return fatal_error("invalid dispatch queue in shard state"); | ||||
|         } | ||||
|       } | ||||
|       vm::Dictionary dict{64}; | ||||
|       td::uint64 dict_size; | ||||
|  | @ -3735,7 +3758,8 @@ bool Collator::process_dispatch_queue() { | |||
|       // Remove message from DispatchQueue
 | ||||
|       bool ok; | ||||
|       if (iter == 0 || | ||||
|           (iter == 1 && sender_generated_messages_count_[src_addr] >= collator_opts_->defer_messages_after)) { | ||||
|           (iter == 1 && sender_generated_messages_count_[src_addr] >= collator_opts_->defer_messages_after && | ||||
|            !collator_opts_->whitelist.count({workchain(), src_addr}))) { | ||||
|         ok = cur_dispatch_queue.lookup_delete(src_addr).not_null(); | ||||
|       } else { | ||||
|         dict.lookup_delete(key); | ||||
|  |  | |||
|  | @ -870,7 +870,7 @@ void LiteQuery::perform_runSmcMethod(BlockIdExt blkid, WorkchainId workchain, St | |||
|       vm::FakeVmStateLimits fstate(1000);  // limit recursive (de)serialization calls
 | ||||
|       vm::VmStateInterface::Guard guard(&fstate); | ||||
|       auto cs = vm::load_cell_slice(res.move_as_ok()); | ||||
|       if (!(vm::Stack::deserialize_to(cs, stack_, 0) && cs.empty_ext())) { | ||||
|       if (!(vm::Stack::deserialize_to(cs, stack_, 2 /* no continuations */) && cs.empty_ext())) { | ||||
|         fatal_error("parameter list boc cannot be deserialized as a VmStack"); | ||||
|         return; | ||||
|       } | ||||
|  |  | |||
|  | @ -44,6 +44,7 @@ ShardStateQ::ShardStateQ(const ShardStateQ& other) | |||
|     , root(other.root) | ||||
|     , lt(other.lt) | ||||
|     , utime(other.utime) | ||||
|     , global_id_(other.global_id_) | ||||
|     , before_split_(other.before_split_) | ||||
|     , fake_split_(other.fake_split_) | ||||
|     , fake_merge_(other.fake_merge_) { | ||||
|  | @ -121,6 +122,7 @@ td::Status ShardStateQ::init() { | |||
|   } | ||||
|   lt = info.gen_lt; | ||||
|   utime = info.gen_utime; | ||||
|   global_id_ = info.global_id; | ||||
|   before_split_ = info.before_split; | ||||
|   block::ShardId id{info.shard_id}; | ||||
|   ton::BlockId hdr_id{ton::ShardIdFull(id), info.seq_no}; | ||||
|  |  | |||
|  | @ -38,6 +38,7 @@ class ShardStateQ : virtual public ShardState { | |||
|   Ref<vm::Cell> root; | ||||
|   LogicalTime lt{0}; | ||||
|   UnixTime utime{0}; | ||||
|   td::int32 global_id_{0}; | ||||
|   bool before_split_{false}; | ||||
|   bool fake_split_{false}; | ||||
|   bool fake_merge_{false}; | ||||
|  | @ -81,6 +82,9 @@ class ShardStateQ : virtual public ShardState { | |||
|   LogicalTime get_logical_time() const override { | ||||
|     return lt; | ||||
|   } | ||||
|   td::int32 get_global_id() const override { | ||||
|     return global_id_; | ||||
|   } | ||||
|   td::optional<BlockIdExt> get_master_ref() const override { | ||||
|     return master_ref; | ||||
|   } | ||||
|  |  | |||
|  | @ -51,6 +51,7 @@ class Db : public td::actor::Actor { | |||
|                                  td::Promise<td::Ref<ShardState>> promise) = 0; | ||||
|   virtual void get_block_state(ConstBlockHandle handle, td::Promise<td::Ref<ShardState>> promise) = 0; | ||||
|   virtual void get_cell_db_reader(td::Promise<std::shared_ptr<vm::CellDbReader>> promise) = 0; | ||||
|   virtual void get_last_deleted_mc_state(td::Promise<BlockSeqno> promise) = 0; | ||||
| 
 | ||||
|   virtual void store_persistent_state_file(BlockIdExt block_id, BlockIdExt masterchain_block_id, td::BufferSlice state, | ||||
|                                            td::Promise<td::Unit> promise) = 0; | ||||
|  |  | |||
|  | @ -39,6 +39,7 @@ class ShardState : public td::CntObject { | |||
| 
 | ||||
|   virtual UnixTime get_unix_time() const = 0; | ||||
|   virtual LogicalTime get_logical_time() const = 0; | ||||
|   virtual td::int32 get_global_id() const = 0; | ||||
|   virtual ShardIdFull get_shard() const = 0; | ||||
|   virtual BlockSeqno get_seqno() const = 0; | ||||
|   virtual BlockIdExt get_block_id() const = 0; | ||||
|  |  | |||
|  | @ -2057,7 +2057,6 @@ void ValidatorManagerImpl::update_shards() { | |||
|     } | ||||
|   } | ||||
| 
 | ||||
|   bool validating_masterchain = false; | ||||
|   if (allow_validate_) { | ||||
|     for (auto &desc : new_shards) { | ||||
|       auto shard = desc.first; | ||||
|  | @ -2074,9 +2073,6 @@ void ValidatorManagerImpl::update_shards() { | |||
|       auto validator_id = get_validator(shard, val_set); | ||||
| 
 | ||||
|       if (!validator_id.is_zero()) { | ||||
|         if (shard.is_masterchain()) { | ||||
|           validating_masterchain = true; | ||||
|         } | ||||
|         auto val_group_id = get_validator_set_id(shard, val_set, opts_hash, key_seqno, opts); | ||||
| 
 | ||||
|         if (force_recover) { | ||||
|  | @ -2171,16 +2167,14 @@ void ValidatorManagerImpl::update_shards() { | |||
|       td::actor::send_closure(SelfId, &ValidatorManagerImpl::written_destroyed_validator_sessions, std::move(gc)); | ||||
|     }); | ||||
|     td::actor::send_closure(db_, &Db::update_destroyed_validator_sessions, gc_list_, std::move(P)); | ||||
| 
 | ||||
|     if (!serializer_.empty()) { | ||||
|       td::actor::send_closure( | ||||
|           serializer_, &AsyncStateSerializer::auto_disable_serializer, | ||||
|           validating_masterchain && | ||||
|               last_masterchain_state_->get_validator_set(ShardIdFull{masterchainId})->export_vector().size() * 2 <= | ||||
|                   last_masterchain_state_->get_total_validator_set(0)->export_vector().size()); | ||||
|     } | ||||
|   } | ||||
| }  // namespace validator
 | ||||
| 
 | ||||
|   if (!serializer_.empty()) { | ||||
|     td::actor::send_closure( | ||||
|         serializer_, &AsyncStateSerializer::auto_disable_serializer, | ||||
|         !validator_groups_.empty() && last_masterchain_state_->get_global_id() == -239);  // mainnet only
 | ||||
|   } | ||||
| } | ||||
| 
 | ||||
| void ValidatorManagerImpl::written_destroyed_validator_sessions(std::vector<td::actor::ActorId<ValidatorGroup>> list) { | ||||
|   for (auto &v : list) { | ||||
|  | @ -2773,6 +2767,23 @@ void ValidatorManagerImpl::prepare_stats(td::Promise<std::vector<std::pair<std:: | |||
|     vec.emplace_back("rotatemasterchainblock", last_rotate_block_id_.to_str()); | ||||
|     //vec.emplace_back("shardclientmasterchainseqno", td::to_string(min_confirmed_masterchain_seqno_));
 | ||||
|     vec.emplace_back("stateserializermasterchainseqno", td::to_string(state_serializer_masterchain_seqno_)); | ||||
| 
 | ||||
|     td::actor::send_closure(db_, &Db::get_last_deleted_mc_state, | ||||
|                             [promise = merger.make_promise(""), | ||||
|                              gc_seqno = gc_masterchain_handle_->id().seqno()](td::Result<BlockSeqno> R) mutable { | ||||
|                               TRY_RESULT_PROMISE(promise, seqno, std::move(R)); | ||||
|                               std::string s; | ||||
|                               if (seqno == 0) { | ||||
|                                 s = "none"; | ||||
|                               } else if (seqno <= gc_seqno) { | ||||
|                                 s = PSTRING() << seqno << " (gc_seqno-" << (gc_seqno - seqno) << ")"; | ||||
|                               } else { | ||||
|                                 s = PSTRING() << seqno << " (gc_seqno+" << (seqno - gc_seqno) << ")"; | ||||
|                               } | ||||
|                               std::vector<std::pair<std::string, std::string>> vec; | ||||
|                               vec.emplace_back("lastgcdmasterchainstate", std::move(s)); | ||||
|                               promise.set_value(std::move(vec)); | ||||
|                             }); | ||||
|   } | ||||
| 
 | ||||
|   if (!shard_client_.empty()) { | ||||
|  |  | |||
|  | @ -378,6 +378,7 @@ void AsyncStateSerializer::got_shard_handle(BlockHandle handle) { | |||
| 
 | ||||
| void AsyncStateSerializer::got_shard_state(BlockHandle handle, td::Ref<ShardState> state, | ||||
|                                            std::shared_ptr<vm::CellDbReader> cell_db_reader) { | ||||
|   next_idx_++; | ||||
|   if (!opts_->get_state_serializer_enabled() || auto_disabled_) { | ||||
|     success_handler(); | ||||
|     return; | ||||
|  | @ -406,7 +407,6 @@ void AsyncStateSerializer::got_shard_state(BlockHandle handle, td::Ref<ShardStat | |||
|   }); | ||||
|   td::actor::send_closure(manager_, &ValidatorManager::store_persistent_state_file_gen, handle->id(), | ||||
|                           masterchain_handle_->id(), write_data, std::move(P)); | ||||
|   next_idx_++; | ||||
| } | ||||
| 
 | ||||
| void AsyncStateSerializer::fail_handler(td::Status reason) { | ||||
|  |  | |||
|  | @ -348,10 +348,12 @@ void ValidatorGroup::create_session() { | |||
|                   << ".", | ||||
|         allow_unsafe_self_blocks_resync_); | ||||
|   } | ||||
|   if (opts_->get_catchain_max_block_delay()) { | ||||
|     td::actor::send_closure(session_, &validatorsession::ValidatorSession::set_catchain_max_block_delay, | ||||
|                             opts_->get_catchain_max_block_delay().value()); | ||||
|   } | ||||
|   double catchain_delay = opts_->get_catchain_max_block_delay() ? opts_->get_catchain_max_block_delay().value() : 0.4; | ||||
|   double catchain_delay_slow = | ||||
|       std::max(catchain_delay, | ||||
|                opts_->get_catchain_max_block_delay_slow() ? opts_->get_catchain_max_block_delay_slow().value() : 1.0); | ||||
|   td::actor::send_closure(session_, &validatorsession::ValidatorSession::set_catchain_max_block_delay, catchain_delay, | ||||
|                           catchain_delay_slow); | ||||
|   if (started_) { | ||||
|     td::actor::send_closure(session_, &validatorsession::ValidatorSession::start); | ||||
|   } | ||||
|  |  | |||
|  | @ -141,6 +141,9 @@ struct ValidatorManagerOptionsImpl : public ValidatorManagerOptions { | |||
|   td::optional<double> get_catchain_max_block_delay() const override { | ||||
|     return catchain_max_block_delay_; | ||||
|   } | ||||
|   td::optional<double> get_catchain_max_block_delay_slow() const override { | ||||
|     return catchain_max_block_delay_slow_; | ||||
|   } | ||||
|   bool get_state_serializer_enabled() const override { | ||||
|     return state_serializer_enabled_; | ||||
|   } | ||||
|  | @ -230,6 +233,9 @@ struct ValidatorManagerOptionsImpl : public ValidatorManagerOptions { | |||
|   void set_catchain_max_block_delay(double value) override { | ||||
|     catchain_max_block_delay_ = value; | ||||
|   } | ||||
|   void set_catchain_max_block_delay_slow(double value) override { | ||||
|     catchain_max_block_delay_slow_ = value; | ||||
|   } | ||||
|   void set_state_serializer_enabled(bool value) override { | ||||
|     state_serializer_enabled_ = value; | ||||
|   } | ||||
|  | @ -289,7 +295,7 @@ struct ValidatorManagerOptionsImpl : public ValidatorManagerOptions { | |||
|   td::optional<td::uint64> celldb_cache_size_; | ||||
|   bool celldb_direct_io_ = false; | ||||
|   bool celldb_preload_all_ = false; | ||||
|   td::optional<double> catchain_max_block_delay_; | ||||
|   td::optional<double> catchain_max_block_delay_, catchain_max_block_delay_slow_; | ||||
|   bool state_serializer_enabled_ = true; | ||||
|   td::Ref<CollatorOptions> collator_options_{true}; | ||||
|   bool fast_state_serializer_enabled_ = false; | ||||
|  |  | |||
|  | @ -64,6 +64,11 @@ struct CollatorOptions : public td::CntObject { | |||
|   td::uint32 dispatch_phase_3_max_total = 150; | ||||
|   td::uint32 dispatch_phase_2_max_per_initiator = 20; | ||||
|   td::optional<td::uint32> dispatch_phase_3_max_per_initiator;  // Default - depends on out msg queue size
 | ||||
| 
 | ||||
|   // Don't defer messages from these accounts
 | ||||
|   std::set<std::pair<WorkchainId, StdSmcAddress>> whitelist; | ||||
|   // Prioritize these accounts on each phase of process_dispatch_queue
 | ||||
|   std::set<std::pair<WorkchainId, StdSmcAddress>> prioritylist; | ||||
| }; | ||||
| 
 | ||||
| struct ValidatorManagerOptions : public td::CntObject { | ||||
|  | @ -105,6 +110,7 @@ struct ValidatorManagerOptions : public td::CntObject { | |||
|   virtual bool get_celldb_direct_io() const = 0; | ||||
|   virtual bool get_celldb_preload_all() const = 0; | ||||
|   virtual td::optional<double> get_catchain_max_block_delay() const = 0; | ||||
|   virtual td::optional<double> get_catchain_max_block_delay_slow() const = 0; | ||||
|   virtual bool get_state_serializer_enabled() const = 0; | ||||
|   virtual td::Ref<CollatorOptions> get_collator_options() const = 0; | ||||
|   virtual bool get_fast_state_serializer_enabled() const = 0; | ||||
|  | @ -136,6 +142,7 @@ struct ValidatorManagerOptions : public td::CntObject { | |||
|   virtual void set_celldb_direct_io(bool value) = 0; | ||||
|   virtual void set_celldb_preload_all(bool value) = 0; | ||||
|   virtual void set_catchain_max_block_delay(double value) = 0; | ||||
|   virtual void set_catchain_max_block_delay_slow(double value) = 0; | ||||
|   virtual void set_state_serializer_enabled(bool value) = 0; | ||||
|   virtual void set_collator_options(td::Ref<CollatorOptions> value) = 0; | ||||
|   virtual void set_fast_state_serializer_enabled(bool value) = 0; | ||||
|  |  | |||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue