mirror of
				https://github.com/ton-blockchain/ton
				synced 2025-03-09 15:40:10 +00:00 
			
		
		
		
	Improve importing msg queues
This commit is contained in:
		
							parent
							
								
									e814973749
								
							
						
					
					
						commit
						9e02853cbb
					
				
					 5 changed files with 107 additions and 28 deletions
				
			
		| 
						 | 
				
			
			@ -218,9 +218,12 @@ static inline std::ostream& operator<<(std::ostream& os, const MsgProcessedUptoC
 | 
			
		|||
 | 
			
		||||
struct ImportedMsgQueueLimits {
 | 
			
		||||
  // Default values
 | 
			
		||||
  td::uint32 max_bytes = 1 << 19;
 | 
			
		||||
  td::uint32 max_msgs = 500;
 | 
			
		||||
  td::uint32 max_bytes = 1 << 16;
 | 
			
		||||
  td::uint32 max_msgs = 30;
 | 
			
		||||
  bool deserialize(vm::CellSlice& cs);
 | 
			
		||||
  ImportedMsgQueueLimits operator*(td::uint32 x) const {
 | 
			
		||||
    return {max_bytes * x, max_msgs * x};
 | 
			
		||||
  }
 | 
			
		||||
};
 | 
			
		||||
 | 
			
		||||
struct ParamLimits {
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -960,26 +960,26 @@ void FullNodeShardImpl::download_out_msg_queue_proof(ShardIdFull dst_shard, std:
 | 
			
		|||
      create_tl_shard_id(dst_shard), std::move(blocks_tl),
 | 
			
		||||
      create_tl_object<ton_api::tonNode_importedMsgQueueLimits>(limits.max_bytes, limits.max_msgs));
 | 
			
		||||
 | 
			
		||||
  auto P = td::PromiseCreator::lambda([=, promise = create_neighbour_promise(b, std::move(promise), true),
 | 
			
		||||
                                       blocks = std::move(blocks)](td::Result<td::BufferSlice> R) mutable {
 | 
			
		||||
    if (R.is_error()) {
 | 
			
		||||
      promise.set_result(R.move_as_error());
 | 
			
		||||
      return;
 | 
			
		||||
    }
 | 
			
		||||
    TRY_RESULT_PROMISE(promise, f, fetch_tl_object<ton_api::tonNode_OutMsgQueueProof>(R.move_as_ok(), true));
 | 
			
		||||
    ton_api::downcast_call(
 | 
			
		||||
        *f, td::overloaded(
 | 
			
		||||
                [&](ton_api::tonNode_outMsgQueueProofEmpty &x) {
 | 
			
		||||
                  promise.set_error(td::Status::Error("node doesn't have this block"));
 | 
			
		||||
                },
 | 
			
		||||
                [&](ton_api::tonNode_outMsgQueueProof &x) {
 | 
			
		||||
                  delay_action(
 | 
			
		||||
                      [=, promise = std::move(promise), blocks = std::move(blocks), x = std::move(x)]() mutable {
 | 
			
		||||
                        promise.set_result(OutMsgQueueProof::fetch(dst_shard, blocks, limits, x));
 | 
			
		||||
                      },
 | 
			
		||||
                      td::Timestamp::now());
 | 
			
		||||
                }));
 | 
			
		||||
  });
 | 
			
		||||
  auto P = td::PromiseCreator::lambda(
 | 
			
		||||
      [=, promise = std::move(promise), blocks = std::move(blocks)](td::Result<td::BufferSlice> R) mutable {
 | 
			
		||||
        if (R.is_error()) {
 | 
			
		||||
          promise.set_result(R.move_as_error());
 | 
			
		||||
          return;
 | 
			
		||||
        }
 | 
			
		||||
        TRY_RESULT_PROMISE(promise, f, fetch_tl_object<ton_api::tonNode_OutMsgQueueProof>(R.move_as_ok(), true));
 | 
			
		||||
        ton_api::downcast_call(
 | 
			
		||||
            *f, td::overloaded(
 | 
			
		||||
                    [&](ton_api::tonNode_outMsgQueueProofEmpty &x) {
 | 
			
		||||
                      promise.set_error(td::Status::Error("node doesn't have this block"));
 | 
			
		||||
                    },
 | 
			
		||||
                    [&](ton_api::tonNode_outMsgQueueProof &x) {
 | 
			
		||||
                      delay_action(
 | 
			
		||||
                          [=, promise = std::move(promise), blocks = std::move(blocks), x = std::move(x)]() mutable {
 | 
			
		||||
                            promise.set_result(OutMsgQueueProof::fetch(dst_shard, blocks, limits, x));
 | 
			
		||||
                          },
 | 
			
		||||
                          td::Timestamp::now());
 | 
			
		||||
                    }));
 | 
			
		||||
      });
 | 
			
		||||
  td::actor::send_closure(overlays_, &overlay::Overlays::send_query_via, b.adnl_id, adnl_id_, overlay_id_,
 | 
			
		||||
                          "get_msg_queue", std::move(P), timeout, std::move(query), 1 << 22, rldp_);
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -285,6 +285,54 @@ void OutMsgQueueImporter::new_masterchain_block_notification(td::Ref<Masterchain
 | 
			
		|||
void OutMsgQueueImporter::get_neighbor_msg_queue_proofs(
 | 
			
		||||
    ShardIdFull dst_shard, std::vector<BlockIdExt> blocks, td::Timestamp timeout,
 | 
			
		||||
    td::Promise<std::map<BlockIdExt, td::Ref<OutMsgQueueProof>>> promise) {
 | 
			
		||||
  if (blocks.empty()) {
 | 
			
		||||
    promise.set_value({});
 | 
			
		||||
    return;
 | 
			
		||||
  }
 | 
			
		||||
  if (dst_shard.is_masterchain() && blocks.size() != 1) {
 | 
			
		||||
    // We spit queries for masterchain {dst_shard, {block_1, ..., block_n}} into separate queries
 | 
			
		||||
    // {dst_shard, {block_1}}, ..., {dst_shard, {block_n}}
 | 
			
		||||
    class Worker : public td::actor::Actor {
 | 
			
		||||
     public:
 | 
			
		||||
      Worker(size_t pending, td::Promise<std::map<BlockIdExt, td::Ref<OutMsgQueueProof>>> promise)
 | 
			
		||||
          : pending_(pending), promise_(std::move(promise)) {
 | 
			
		||||
        CHECK(pending_ > 0);
 | 
			
		||||
      }
 | 
			
		||||
 | 
			
		||||
      void on_result(td::Ref<OutMsgQueueProof> res) {
 | 
			
		||||
        result_[res->block_id_] = res;
 | 
			
		||||
        if (--pending_ == 0) {
 | 
			
		||||
          promise_.set_result(std::move(result_));
 | 
			
		||||
          stop();
 | 
			
		||||
        }
 | 
			
		||||
      }
 | 
			
		||||
 | 
			
		||||
      void on_error(td::Status error) {
 | 
			
		||||
        promise_.set_error(std::move(error));
 | 
			
		||||
        stop();
 | 
			
		||||
      }
 | 
			
		||||
 | 
			
		||||
     private:
 | 
			
		||||
      size_t pending_;
 | 
			
		||||
      td::Promise<std::map<BlockIdExt, td::Ref<OutMsgQueueProof>>> promise_;
 | 
			
		||||
      std::map<BlockIdExt, td::Ref<OutMsgQueueProof>> result_;
 | 
			
		||||
    };
 | 
			
		||||
    auto worker = td::actor::create_actor<Worker>("queueworker", blocks.size(), std::move(promise)).release();
 | 
			
		||||
    for (const BlockIdExt& block : blocks) {
 | 
			
		||||
      get_neighbor_msg_queue_proofs(dst_shard, {block}, timeout,
 | 
			
		||||
                                    [=](td::Result<std::map<BlockIdExt, td::Ref<OutMsgQueueProof>>> R) {
 | 
			
		||||
                                      if (R.is_error()) {
 | 
			
		||||
                                        td::actor::send_closure(worker, &Worker::on_error, R.move_as_error());
 | 
			
		||||
                                      } else {
 | 
			
		||||
                                        auto res = R.move_as_ok();
 | 
			
		||||
                                        CHECK(res.size() == 1);
 | 
			
		||||
                                        td::actor::send_closure(worker, &Worker::on_result, res.begin()->second);
 | 
			
		||||
                                      }
 | 
			
		||||
                                    });
 | 
			
		||||
    }
 | 
			
		||||
    return;
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  std::sort(blocks.begin(), blocks.end());
 | 
			
		||||
  auto entry = cache_[{dst_shard, blocks}];
 | 
			
		||||
  if (entry) {
 | 
			
		||||
| 
						 | 
				
			
			@ -326,7 +374,8 @@ void OutMsgQueueImporter::get_neighbor_msg_queue_proofs(
 | 
			
		|||
    ++entry->pending;
 | 
			
		||||
    for (size_t i = 0; i < p.second.size(); i += 16) {
 | 
			
		||||
      size_t j = std::min(i + 16, p.second.size());
 | 
			
		||||
      get_proof_import(entry, std::vector<BlockIdExt>(p.second.begin() + i, p.second.begin() + j), limits);
 | 
			
		||||
      get_proof_import(entry, std::vector<BlockIdExt>(p.second.begin() + i, p.second.begin() + j),
 | 
			
		||||
                       limits * (td::uint32)(j - i));
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
  if (entry->pending == 0) {
 | 
			
		||||
| 
						 | 
				
			
			@ -341,7 +390,7 @@ void OutMsgQueueImporter::get_proof_local(std::shared_ptr<CacheEntry> entry, Blo
 | 
			
		|||
  td::actor::send_closure(
 | 
			
		||||
      manager_, &ValidatorManager::wait_block_state_short, block, 0, entry->timeout,
 | 
			
		||||
      [=, SelfId = actor_id(this), manager = manager_, timeout = entry->timeout,
 | 
			
		||||
       retry_after = td::Timestamp::in(0.5)](td::Result<Ref<ShardState>> R) mutable {
 | 
			
		||||
       retry_after = td::Timestamp::in(0.1)](td::Result<Ref<ShardState>> R) mutable {
 | 
			
		||||
        if (R.is_error()) {
 | 
			
		||||
          LOG(DEBUG) << "Failed to get block state for " << block.to_str() << ": " << R.move_as_error();
 | 
			
		||||
          delay_action([=]() { td::actor::send_closure(SelfId, &OutMsgQueueImporter::get_proof_local, entry, block); },
 | 
			
		||||
| 
						 | 
				
			
			@ -380,7 +429,7 @@ void OutMsgQueueImporter::get_proof_import(std::shared_ptr<CacheEntry> entry, st
 | 
			
		|||
  }
 | 
			
		||||
  td::actor::send_closure(
 | 
			
		||||
      manager_, &ValidatorManager::send_get_out_msg_queue_proof_request, entry->dst_shard, blocks, limits,
 | 
			
		||||
      [=, SelfId = actor_id(this), retry_after = td::Timestamp::in(0.5),
 | 
			
		||||
      [=, SelfId = actor_id(this), retry_after = td::Timestamp::in(0.1),
 | 
			
		||||
       dst_shard = entry->dst_shard](td::Result<std::vector<td::Ref<OutMsgQueueProof>>> R) {
 | 
			
		||||
        if (R.is_error()) {
 | 
			
		||||
          LOG(DEBUG) << "Failed to get out msg queue for " << dst_shard.to_str() << ": " << R.move_as_error();
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -478,6 +478,10 @@ void ValidatorManagerImpl::add_shard_block_description(td::Ref<ShardTopBlockDesc
 | 
			
		|||
      });
 | 
			
		||||
      wait_block_state_short(desc->block_id(), 0, td::Timestamp::in(60.0), std::move(P));
 | 
			
		||||
    }
 | 
			
		||||
    if (collating_masterchain() && desc->generated_at() > td::Clocks::system() - 20) {
 | 
			
		||||
      wait_neighbor_msg_queue_proofs(ShardIdFull{masterchainId}, {desc->block_id()}, td::Timestamp::in(15.0),
 | 
			
		||||
                                     [](td::Result<std::map<BlockIdExt, td::Ref<OutMsgQueueProof>>>) {});
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -2525,6 +2529,16 @@ bool ValidatorManagerImpl::validating_masterchain() {
 | 
			
		|||
              .is_zero();
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
bool ValidatorManagerImpl::collating_masterchain() {
 | 
			
		||||
  if (masterchain_collators_) {
 | 
			
		||||
    return true;
 | 
			
		||||
  }
 | 
			
		||||
  if (opts_->validator_mode() == ValidatorManagerOptions::validator_lite_all) {
 | 
			
		||||
    return false;
 | 
			
		||||
  }
 | 
			
		||||
  return validating_masterchain();
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
PublicKeyHash ValidatorManagerImpl::get_validator(ShardIdFull shard, td::Ref<ValidatorSet> val_set) {
 | 
			
		||||
  for (auto &key : temp_keys_) {
 | 
			
		||||
    if (val_set->is_validator(key.bits256_value())) {
 | 
			
		||||
| 
						 | 
				
			
			@ -2726,7 +2740,12 @@ void ValidatorManagerImpl::add_collator(adnl::AdnlNodeIdShort id, ShardIdFull sh
 | 
			
		|||
    it = collator_nodes_.emplace(id, Collator()).first;
 | 
			
		||||
    it->second.actor = td::actor::create_actor<CollatorNode>("collatornode", id, actor_id(this), adnl_, rldp_);
 | 
			
		||||
  }
 | 
			
		||||
  it->second.shards.insert(shard);
 | 
			
		||||
  if (!it->second.shards.insert(shard).second) {
 | 
			
		||||
    return;
 | 
			
		||||
  }
 | 
			
		||||
  if (shard.is_masterchain()) {
 | 
			
		||||
    ++masterchain_collators_;
 | 
			
		||||
  }
 | 
			
		||||
  td::actor::send_closure(it->second.actor, &CollatorNode::add_shard, shard);
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -2735,10 +2754,16 @@ void ValidatorManagerImpl::del_collator(adnl::AdnlNodeIdShort id, ShardIdFull sh
 | 
			
		|||
  if (it == collator_nodes_.end()) {
 | 
			
		||||
    return;
 | 
			
		||||
  }
 | 
			
		||||
  td::actor::send_closure(it->second.actor, &CollatorNode::del_shard, shard);
 | 
			
		||||
  it->second.shards.erase(shard);
 | 
			
		||||
  if (!it->second.shards.erase(shard)) {
 | 
			
		||||
    return;
 | 
			
		||||
  }
 | 
			
		||||
  if (shard.is_masterchain()) {
 | 
			
		||||
    --masterchain_collators_;
 | 
			
		||||
  }
 | 
			
		||||
  if (it->second.shards.empty()) {
 | 
			
		||||
    collator_nodes_.erase(it);
 | 
			
		||||
  } else {
 | 
			
		||||
    td::actor::send_closure(it->second.actor, &CollatorNode::del_shard, shard);
 | 
			
		||||
  }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -506,6 +506,7 @@ class ValidatorManagerImpl : public ValidatorManager {
 | 
			
		|||
  bool is_validator();
 | 
			
		||||
  bool is_collator();
 | 
			
		||||
  bool validating_masterchain();
 | 
			
		||||
  bool collating_masterchain();
 | 
			
		||||
  PublicKeyHash get_validator(ShardIdFull shard, td::Ref<ValidatorSet> val_set);
 | 
			
		||||
 | 
			
		||||
  ValidatorManagerImpl(td::Ref<ValidatorManagerOptions> opts, std::string db_root,
 | 
			
		||||
| 
						 | 
				
			
			@ -650,6 +651,7 @@ class ValidatorManagerImpl : public ValidatorManager {
 | 
			
		|||
    std::set<ShardIdFull> shards;
 | 
			
		||||
  };
 | 
			
		||||
  std::map<adnl::AdnlNodeIdShort, Collator> collator_nodes_;
 | 
			
		||||
  size_t masterchain_collators_ = 0;
 | 
			
		||||
 | 
			
		||||
  std::set<ShardIdFull> extra_active_shards_;
 | 
			
		||||
  std::map<ShardIdFull, BlockSeqno> last_validated_blocks_;
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue