mirror of
https://github.com/ton-blockchain/ton
synced 2025-02-15 04:32:21 +00:00
Enable compression in private overlays v2 and in collator node
This commit is contained in:
parent
8385336eab
commit
f5cedc3b6e
8 changed files with 129 additions and 66 deletions
|
@ -789,7 +789,9 @@ validatorSession.stats success:Bool id:tonNode.blockIdExt timestamp:long self:in
|
||||||
signatures:int signatures_weight:long approve_signatures:int approve_signatures_weight:long
|
signatures:int signatures_weight:long approve_signatures:int approve_signatures_weight:long
|
||||||
first_round:int rounds:(vector validatorSession.statsRound) = validatorSession.Stats;
|
first_round:int rounds:(vector validatorSession.statsRound) = validatorSession.Stats;
|
||||||
|
|
||||||
collatorNode.generateBlockSuccess candidate:db.Candidate = collatorNode.GenerateBlockResult;
|
collatorNode.candidate source:PublicKey id:tonNode.blockIdExt data:bytes collated_data:bytes = collatorNode.Candidate;
|
||||||
|
collatorNode.compressedCandidate flags:# source:PublicKey id:tonNode.blockIdExt decompressed_size:int data:bytes = collatorNode.Candidate;
|
||||||
|
collatorNode.generateBlockSuccess candidate:collatorNode.Candidate = collatorNode.GenerateBlockResult;
|
||||||
collatorNode.generateBlockError code:int message:string = collatorNode.GenerateBlockResult;
|
collatorNode.generateBlockError code:int message:string = collatorNode.GenerateBlockResult;
|
||||||
|
|
||||||
---functions---
|
---functions---
|
||||||
|
|
Binary file not shown.
|
@ -23,27 +23,15 @@
|
||||||
|
|
||||||
namespace ton::validatorsession {
|
namespace ton::validatorsession {
|
||||||
|
|
||||||
td::Result<td::BufferSlice> serialize_candidate(const tl_object_ptr<ton_api::validatorSession_candidate> &block,
|
td::Result<td::BufferSlice> serialize_candidate(const tl_object_ptr<ton_api::validatorSession_candidate>& block,
|
||||||
bool compression_enabled) {
|
bool compression_enabled) {
|
||||||
if (!compression_enabled) {
|
if (!compression_enabled) {
|
||||||
return serialize_tl_object(block, true);
|
return serialize_tl_object(block, true);
|
||||||
}
|
}
|
||||||
vm::BagOfCells boc1, boc2;
|
size_t decompressed_size;
|
||||||
TRY_STATUS(boc1.deserialize(block->data_));
|
TRY_RESULT(compressed, compress_candidate_data(block->data_, block->collated_data_, decompressed_size))
|
||||||
if (boc1.get_root_count() != 1) {
|
|
||||||
return td::Status::Error("block candidate should have exactly one root");
|
|
||||||
}
|
|
||||||
std::vector<td::Ref<vm::Cell>> roots = {boc1.get_root_cell()};
|
|
||||||
TRY_STATUS(boc2.deserialize(block->collated_data_));
|
|
||||||
for (int i = 0; i < boc2.get_root_count(); ++i) {
|
|
||||||
roots.push_back(boc2.get_root_cell(i));
|
|
||||||
}
|
|
||||||
TRY_RESULT(data, vm::std_boc_serialize_multi(std::move(roots), 2));
|
|
||||||
td::BufferSlice compressed = td::lz4_compress(data);
|
|
||||||
LOG(VALIDATOR_SESSION_DEBUG) << "Compressing block candidate: " << block->data_.size() + block->collated_data_.size()
|
|
||||||
<< " -> " << compressed.size();
|
|
||||||
return create_serialize_tl_object<ton_api::validatorSession_compressedCandidate>(
|
return create_serialize_tl_object<ton_api::validatorSession_compressedCandidate>(
|
||||||
0, block->src_, block->round_, block->root_hash_, (int)data.size(), std::move(compressed));
|
0, block->src_, block->round_, block->root_hash_, (int)decompressed_size, std::move(compressed));
|
||||||
}
|
}
|
||||||
|
|
||||||
td::Result<tl_object_ptr<ton_api::validatorSession_candidate>> deserialize_candidate(td::Slice data,
|
td::Result<tl_object_ptr<ton_api::validatorSession_candidate>> deserialize_candidate(td::Slice data,
|
||||||
|
@ -56,8 +44,34 @@ td::Result<tl_object_ptr<ton_api::validatorSession_candidate>> deserialize_candi
|
||||||
if (f->decompressed_size_ > max_decompressed_data_size) {
|
if (f->decompressed_size_ > max_decompressed_data_size) {
|
||||||
return td::Status::Error("decompressed size is too big");
|
return td::Status::Error("decompressed size is too big");
|
||||||
}
|
}
|
||||||
TRY_RESULT(decompressed, td::lz4_decompress(f->data_, f->decompressed_size_));
|
TRY_RESULT(p, decompress_candidate_data(f->data_, f->decompressed_size_));
|
||||||
if (decompressed.size() != (size_t)f->decompressed_size_) {
|
return create_tl_object<ton_api::validatorSession_candidate>(f->src_, f->round_, f->root_hash_, std::move(p.first),
|
||||||
|
std::move(p.second));
|
||||||
|
}
|
||||||
|
|
||||||
|
td::Result<td::BufferSlice> compress_candidate_data(td::Slice block, td::Slice collated_data,
|
||||||
|
size_t& decompressed_size) {
|
||||||
|
vm::BagOfCells boc1, boc2;
|
||||||
|
TRY_STATUS(boc1.deserialize(block));
|
||||||
|
if (boc1.get_root_count() != 1) {
|
||||||
|
return td::Status::Error("block candidate should have exactly one root");
|
||||||
|
}
|
||||||
|
std::vector<td::Ref<vm::Cell>> roots = {boc1.get_root_cell()};
|
||||||
|
TRY_STATUS(boc2.deserialize(collated_data));
|
||||||
|
for (int i = 0; i < boc2.get_root_count(); ++i) {
|
||||||
|
roots.push_back(boc2.get_root_cell(i));
|
||||||
|
}
|
||||||
|
TRY_RESULT(data, vm::std_boc_serialize_multi(std::move(roots), 2));
|
||||||
|
decompressed_size = data.size();
|
||||||
|
td::BufferSlice compressed = td::lz4_compress(data);
|
||||||
|
LOG(DEBUG) << "Compressing block candidate: " << block.size() + collated_data.size() << " -> " << compressed.size();
|
||||||
|
return compressed;
|
||||||
|
}
|
||||||
|
|
||||||
|
td::Result<std::pair<td::BufferSlice, td::BufferSlice>> decompress_candidate_data(td::Slice compressed,
|
||||||
|
int decompressed_size) {
|
||||||
|
TRY_RESULT(decompressed, td::lz4_decompress(compressed, decompressed_size));
|
||||||
|
if (decompressed.size() != (size_t)decompressed_size) {
|
||||||
return td::Status::Error("decompressed size mismatch");
|
return td::Status::Error("decompressed size mismatch");
|
||||||
}
|
}
|
||||||
TRY_RESULT(roots, vm::std_boc_deserialize_multi(decompressed));
|
TRY_RESULT(roots, vm::std_boc_deserialize_multi(decompressed));
|
||||||
|
@ -67,10 +81,9 @@ td::Result<tl_object_ptr<ton_api::validatorSession_candidate>> deserialize_candi
|
||||||
TRY_RESULT(block_data, vm::std_boc_serialize(roots[0], 31));
|
TRY_RESULT(block_data, vm::std_boc_serialize(roots[0], 31));
|
||||||
roots.erase(roots.begin());
|
roots.erase(roots.begin());
|
||||||
TRY_RESULT(collated_data, vm::std_boc_serialize_multi(std::move(roots), 31));
|
TRY_RESULT(collated_data, vm::std_boc_serialize_multi(std::move(roots), 31));
|
||||||
LOG(VALIDATOR_SESSION_DEBUG) << "Decompressing block candidate: " << f->data_.size() << " -> "
|
LOG(DEBUG) << "Decompressing block candidate: " << compressed.size() << " -> "
|
||||||
<< block_data.size() + collated_data.size();
|
<< block_data.size() + collated_data.size();
|
||||||
return create_tl_object<ton_api::validatorSession_candidate>(f->src_, f->round_, f->root_hash_, std::move(block_data),
|
return std::make_pair(std::move(block_data), std::move(collated_data));
|
||||||
std::move(collated_data));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
} // namespace ton::validatorsession
|
} // namespace ton::validatorsession
|
||||||
|
|
|
@ -20,10 +20,15 @@
|
||||||
|
|
||||||
namespace ton::validatorsession {
|
namespace ton::validatorsession {
|
||||||
|
|
||||||
td::Result<td::BufferSlice> serialize_candidate(const tl_object_ptr<ton_api::validatorSession_candidate> &block,
|
td::Result<td::BufferSlice> serialize_candidate(const tl_object_ptr<ton_api::validatorSession_candidate>& block,
|
||||||
bool compression_enabled);
|
bool compression_enabled);
|
||||||
td::Result<tl_object_ptr<ton_api::validatorSession_candidate>> deserialize_candidate(td::Slice data,
|
td::Result<tl_object_ptr<ton_api::validatorSession_candidate>> deserialize_candidate(td::Slice data,
|
||||||
bool compression_enabled,
|
bool compression_enabled,
|
||||||
int max_decompressed_data_size);
|
int max_decompressed_data_size);
|
||||||
|
|
||||||
|
td::Result<td::BufferSlice> compress_candidate_data(td::Slice block, td::Slice collated_data,
|
||||||
|
size_t& decompressed_size);
|
||||||
|
td::Result<std::pair<td::BufferSlice, td::BufferSlice>> decompress_candidate_data(td::Slice compressed,
|
||||||
|
int decompressed_size);
|
||||||
|
|
||||||
} // namespace ton::validatorsession
|
} // namespace ton::validatorsession
|
||||||
|
|
|
@ -19,10 +19,11 @@
|
||||||
#include "fabric.h"
|
#include "fabric.h"
|
||||||
#include "block-auto.h"
|
#include "block-auto.h"
|
||||||
#include "block-db.h"
|
#include "block-db.h"
|
||||||
|
#include "td/utils/lz4.h"
|
||||||
|
#include "checksum.h"
|
||||||
|
#include "validator-session/candidate-serializer.h"
|
||||||
|
|
||||||
namespace ton {
|
namespace ton::validator {
|
||||||
|
|
||||||
namespace validator {
|
|
||||||
|
|
||||||
CollatorNode::CollatorNode(adnl::AdnlNodeIdShort local_id, td::actor::ActorId<ValidatorManager> manager,
|
CollatorNode::CollatorNode(adnl::AdnlNodeIdShort local_id, td::actor::ActorId<ValidatorManager> manager,
|
||||||
td::actor::ActorId<adnl::Adnl> adnl, td::actor::ActorId<rldp::Rldp> rldp)
|
td::actor::ActorId<adnl::Adnl> adnl, td::actor::ActorId<rldp::Rldp> rldp)
|
||||||
|
@ -110,12 +111,6 @@ static td::BufferSlice serialize_error(td::Status error) {
|
||||||
return create_serialize_tl_object<ton_api::collatorNode_generateBlockError>(error.code(), error.message().c_str());
|
return create_serialize_tl_object<ton_api::collatorNode_generateBlockError>(error.code(), error.message().c_str());
|
||||||
}
|
}
|
||||||
|
|
||||||
static td::BufferSlice serialize_response(BlockCandidate block) {
|
|
||||||
return create_serialize_tl_object<ton_api::collatorNode_generateBlockSuccess>(create_tl_object<ton_api::db_candidate>(
|
|
||||||
PublicKey{pubkeys::Ed25519{block.pubkey.as_bits256()}}.tl(), create_tl_block_id(block.id), std::move(block.data),
|
|
||||||
std::move(block.collated_data)));
|
|
||||||
}
|
|
||||||
|
|
||||||
static BlockCandidate change_creator(BlockCandidate block, Ed25519_PublicKey creator) {
|
static BlockCandidate change_creator(BlockCandidate block, Ed25519_PublicKey creator) {
|
||||||
CHECK(!block.id.is_masterchain());
|
CHECK(!block.id.is_masterchain());
|
||||||
if (block.pubkey == creator) {
|
if (block.pubkey == creator) {
|
||||||
|
@ -145,7 +140,8 @@ void CollatorNode::receive_query(adnl::AdnlNodeIdShort src, td::BufferSlice data
|
||||||
promise.set_result(serialize_error(R.move_as_error()));
|
promise.set_result(serialize_error(R.move_as_error()));
|
||||||
} else {
|
} else {
|
||||||
LOG(INFO) << "Query from " << src << ", success";
|
LOG(INFO) << "Query from " << src << ", success";
|
||||||
promise.set_result(serialize_response(R.move_as_ok()));
|
promise.set_result(create_serialize_tl_object<ton_api::collatorNode_generateBlockSuccess>(
|
||||||
|
serialize_candidate(R.move_as_ok(), true)));
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
if (!last_masterchain_block_.is_valid()) {
|
if (!last_masterchain_block_.is_valid()) {
|
||||||
|
@ -263,14 +259,62 @@ void CollatorNode::process_result(std::shared_ptr<CacheEntry> cache_entry, td::R
|
||||||
}
|
}
|
||||||
|
|
||||||
bool CollatorNode::can_collate_shard(ShardIdFull shard) const {
|
bool CollatorNode::can_collate_shard(ShardIdFull shard) const {
|
||||||
for (ShardIdFull our_shard : shards_) {
|
return std::any_of(shards_.begin(), shards_.end(),
|
||||||
if (shard_intersects(shard, our_shard)) {
|
[&](const ShardIdFull& our_shard) { return shard_intersects(shard, our_shard); });
|
||||||
return true;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return false;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
} // namespace validator
|
tl_object_ptr<ton_api::collatorNode_Candidate> CollatorNode::serialize_candidate(const BlockCandidate& block,
|
||||||
|
bool compress) {
|
||||||
|
if (!compress) {
|
||||||
|
return create_tl_object<ton_api::collatorNode_candidate>(
|
||||||
|
PublicKey{pubkeys::Ed25519{block.pubkey.as_bits256()}}.tl(), create_tl_block_id(block.id), block.data.clone(),
|
||||||
|
block.collated_data.clone());
|
||||||
|
}
|
||||||
|
size_t decompressed_size;
|
||||||
|
td::BufferSlice compressed =
|
||||||
|
validatorsession::compress_candidate_data(block.data, block.collated_data, decompressed_size).move_as_ok();
|
||||||
|
return create_tl_object<ton_api::collatorNode_compressedCandidate>(
|
||||||
|
0, PublicKey{pubkeys::Ed25519{block.pubkey.as_bits256()}}.tl(), create_tl_block_id(block.id),
|
||||||
|
(int)decompressed_size, std::move(compressed));
|
||||||
|
}
|
||||||
|
|
||||||
} // namespace ton
|
td::Result<BlockCandidate> CollatorNode::deserialize_candidate(tl_object_ptr<ton_api::collatorNode_Candidate> f,
|
||||||
|
int max_decompressed_data_size) {
|
||||||
|
td::Result<BlockCandidate> res;
|
||||||
|
ton_api::downcast_call(*f, td::overloaded(
|
||||||
|
[&](ton_api::collatorNode_candidate& c) {
|
||||||
|
res = [&]() -> td::Result<BlockCandidate> {
|
||||||
|
auto hash = td::sha256_bits256(c.collated_data_);
|
||||||
|
auto key = ton::PublicKey{c.source_};
|
||||||
|
if (!key.is_ed25519()) {
|
||||||
|
return td::Status::Error("invalid pubkey");
|
||||||
|
}
|
||||||
|
auto e_key = Ed25519_PublicKey{key.ed25519_value().raw()};
|
||||||
|
return BlockCandidate{e_key, create_block_id(c.id_), hash, std::move(c.data_),
|
||||||
|
std::move(c.collated_data_)};
|
||||||
|
}();
|
||||||
|
},
|
||||||
|
[&](ton_api::collatorNode_compressedCandidate& c) {
|
||||||
|
res = [&]() -> td::Result<BlockCandidate> {
|
||||||
|
if (c.decompressed_size_ <= 0) {
|
||||||
|
return td::Status::Error("invalid decompressed size");
|
||||||
|
}
|
||||||
|
if (c.decompressed_size_ > max_decompressed_data_size) {
|
||||||
|
return td::Status::Error("decompressed size is too big");
|
||||||
|
}
|
||||||
|
TRY_RESULT(
|
||||||
|
p, validatorsession::decompress_candidate_data(c.data_, c.decompressed_size_));
|
||||||
|
auto collated_data_hash = td::sha256_bits256(p.second);
|
||||||
|
auto key = ton::PublicKey{c.source_};
|
||||||
|
if (!key.is_ed25519()) {
|
||||||
|
return td::Status::Error("invalid pubkey");
|
||||||
|
}
|
||||||
|
auto e_key = Ed25519_PublicKey{key.ed25519_value().raw()};
|
||||||
|
return BlockCandidate{e_key, create_block_id(c.id_), collated_data_hash,
|
||||||
|
std::move(p.first), std::move(p.second)};
|
||||||
|
}();
|
||||||
|
}));
|
||||||
|
return res;
|
||||||
|
}
|
||||||
|
|
||||||
|
} // namespace ton::validator
|
||||||
|
|
|
@ -20,9 +20,7 @@
|
||||||
#include "rldp/rldp.h"
|
#include "rldp/rldp.h"
|
||||||
#include <map>
|
#include <map>
|
||||||
|
|
||||||
namespace ton {
|
namespace ton::validator {
|
||||||
|
|
||||||
namespace validator {
|
|
||||||
|
|
||||||
class ValidatorManager;
|
class ValidatorManager;
|
||||||
|
|
||||||
|
@ -77,8 +75,11 @@ class CollatorNode : public td::actor::Actor {
|
||||||
}
|
}
|
||||||
|
|
||||||
void process_result(std::shared_ptr<CacheEntry> cache_entry, td::Result<BlockCandidate> R);
|
void process_result(std::shared_ptr<CacheEntry> cache_entry, td::Result<BlockCandidate> R);
|
||||||
|
|
||||||
|
public:
|
||||||
|
static tl_object_ptr<ton_api::collatorNode_Candidate> serialize_candidate(const BlockCandidate& block, bool compress);
|
||||||
|
static td::Result<BlockCandidate> deserialize_candidate(tl_object_ptr<ton_api::collatorNode_Candidate> f,
|
||||||
|
int max_decompressed_data_size);
|
||||||
};
|
};
|
||||||
|
|
||||||
} // namespace validator
|
} // namespace ton::validator
|
||||||
|
|
||||||
} // namespace ton
|
|
||||||
|
|
|
@ -90,7 +90,7 @@ void FullNodePrivateOverlayV2::send_broadcast(BlockBroadcast broadcast) {
|
||||||
if (!inited_) {
|
if (!inited_) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
auto B = serialize_block_broadcast(broadcast, false); // compression_enabled = false
|
auto B = serialize_block_broadcast(broadcast, true); // compression_enabled = true
|
||||||
if (B.is_error()) {
|
if (B.is_error()) {
|
||||||
VLOG(FULL_NODE_WARNING) << "failed to serialize block broadcast: " << B.move_as_error();
|
VLOG(FULL_NODE_WARNING) << "failed to serialize block broadcast: " << B.move_as_error();
|
||||||
return;
|
return;
|
||||||
|
|
|
@ -24,6 +24,7 @@
|
||||||
#include "ton/lite-tl.hpp"
|
#include "ton/lite-tl.hpp"
|
||||||
#include "ton/ton-tl.hpp"
|
#include "ton/ton-tl.hpp"
|
||||||
#include "td/utils/Random.h"
|
#include "td/utils/Random.h"
|
||||||
|
#include "collator-node.hpp"
|
||||||
|
|
||||||
namespace ton {
|
namespace ton {
|
||||||
|
|
||||||
|
@ -506,7 +507,7 @@ void ValidatorGroup::send_collate_query(td::uint32 round_id, td::Timestamp timeo
|
||||||
std::move(promise));
|
std::move(promise));
|
||||||
});
|
});
|
||||||
LOG(INFO) << "sending collate query for " << next_block_id.to_str() << ": send to " << collator;
|
LOG(INFO) << "sending collate query for " << next_block_id.to_str() << ": send to " << collator;
|
||||||
size_t max_answer_size = config_.max_block_size + config_.max_collated_data_size + 256;
|
size_t max_answer_size = config_.max_block_size + config_.max_collated_data_size + 1024;
|
||||||
td::Timestamp query_timeout = td::Timestamp::in(10.0);
|
td::Timestamp query_timeout = td::Timestamp::in(10.0);
|
||||||
query_timeout.relax(timeout);
|
query_timeout.relax(timeout);
|
||||||
td::actor::send_closure(rldp_, &rldp::Rldp::send_query_ex, local_adnl_id_, collator, "collatequery", std::move(P),
|
td::actor::send_closure(rldp_, &rldp::Rldp::send_query_ex, local_adnl_id_, collator, "collatequery", std::move(P),
|
||||||
|
@ -520,29 +521,26 @@ void ValidatorGroup::receive_collate_query_response(td::uint32 round_id, td::Buf
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
TRY_RESULT_PROMISE(promise, f, fetch_tl_object<ton_api::collatorNode_GenerateBlockResult>(data, true));
|
TRY_RESULT_PROMISE(promise, f, fetch_tl_object<ton_api::collatorNode_GenerateBlockResult>(data, true));
|
||||||
tl_object_ptr<ton_api::db_candidate> b;
|
td::Result<BlockCandidate> res;
|
||||||
ton_api::downcast_call(*f, td::overloaded(
|
ton_api::downcast_call(*f, td::overloaded(
|
||||||
[&](ton_api::collatorNode_generateBlockError &r) {
|
[&](ton_api::collatorNode_generateBlockError &r) {
|
||||||
td::Status error = td::Status::Error(r.code_, r.message_);
|
td::Status error = td::Status::Error(r.code_, r.message_);
|
||||||
promise.set_error(error.move_as_error_prefix("collate query: "));
|
res = error.move_as_error_prefix("collate query: ");
|
||||||
},
|
},
|
||||||
[&](ton_api::collatorNode_generateBlockSuccess &r) { b = std::move(r.candidate_); }));
|
[&](ton_api::collatorNode_generateBlockSuccess &r) {
|
||||||
if (!b) {
|
res = CollatorNode::deserialize_candidate(
|
||||||
return;
|
std::move(r.candidate_),
|
||||||
}
|
config_.max_block_size + config_.max_collated_data_size + 1024);
|
||||||
auto key = PublicKey{b->source_};
|
}));
|
||||||
if (key != local_id_full_) {
|
TRY_RESULT_PROMISE(promise, candidate, std::move(res));
|
||||||
|
if (candidate.pubkey.as_bits256() != local_id_full_.ed25519_value().raw()) {
|
||||||
promise.set_error(td::Status::Error("collate query: block candidate source mismatch"));
|
promise.set_error(td::Status::Error("collate query: block candidate source mismatch"));
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
auto e_key = Ed25519_PublicKey{key.ed25519_value().raw()};
|
if (candidate.id.shard_full() != shard_) {
|
||||||
auto block_id = ton::create_block_id(b->id_);
|
|
||||||
if (block_id.shard_full() != shard_) {
|
|
||||||
promise.set_error(td::Status::Error("collate query: shard mismatch"));
|
promise.set_error(td::Status::Error("collate query: shard mismatch"));
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
auto collated_data_hash = td::sha256_bits256(b->collated_data_);
|
|
||||||
BlockCandidate candidate(e_key, block_id, collated_data_hash, std::move(b->data_), std::move(b->collated_data_));
|
|
||||||
|
|
||||||
auto P = td::PromiseCreator::lambda(
|
auto P = td::PromiseCreator::lambda(
|
||||||
[candidate = candidate.clone(), promise = std::move(promise)](td::Result<UnixTime> R) mutable {
|
[candidate = candidate.clone(), promise = std::move(promise)](td::Result<UnixTime> R) mutable {
|
||||||
|
|
Loading…
Reference in a new issue