mirror of
				https://github.com/ton-blockchain/ton
				synced 2025-03-09 15:40:10 +00:00 
			
		
		
		
	Make asynchronous celldb interface (#388)
* Asynchronous load_cell in celldb Co-authored-by: SpyCheese <mikle98@yandex.ru>
This commit is contained in:
		
							parent
							
								
									ca00f0ed91
								
							
						
					
					
						commit
						845cbca1e5
					
				
					 5 changed files with 159 additions and 10 deletions
				
			
		| 
						 | 
				
			
			@ -64,6 +64,13 @@ class CellHashTable {
 | 
			
		|||
  size_t size() const {
 | 
			
		||||
    return set_.size();
 | 
			
		||||
  }
 | 
			
		||||
  InfoT* get_if_exists(td::Slice hash) {
 | 
			
		||||
    auto it = set_.find(hash);
 | 
			
		||||
    if (it != set_.end()) {
 | 
			
		||||
      return &const_cast<InfoT &>(*it);
 | 
			
		||||
    }
 | 
			
		||||
    return nullptr;
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
 private:
 | 
			
		||||
  std::set<InfoT, std::less<>> set_;
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -86,6 +86,40 @@ class DynamicBagOfCellsDbImpl : public DynamicBagOfCellsDb, private ExtCellCreat
 | 
			
		|||
    TRY_RESULT(loaded_cell, get_cell_info_force(hash).cell->load_cell());
 | 
			
		||||
    return std::move(loaded_cell.data_cell);
 | 
			
		||||
  }
 | 
			
		||||
  void load_cell_async(td::Slice hash, std::shared_ptr<AsyncExecutor> executor,
 | 
			
		||||
                       td::Promise<Ref<DataCell>> promise) override {
 | 
			
		||||
    auto info = hash_table_.get_if_exists(hash);
 | 
			
		||||
    if (info && info->sync_with_db) {
 | 
			
		||||
      TRY_RESULT_PROMISE(promise, loaded_cell, info->cell->load_cell());
 | 
			
		||||
      promise.set_result(loaded_cell.data_cell);
 | 
			
		||||
      return;
 | 
			
		||||
    }
 | 
			
		||||
    SimpleExtCellCreator ext_cell_creator(cell_db_reader_);
 | 
			
		||||
    auto promise_ptr = std::make_shared<td::Promise<Ref<DataCell>>>(std::move(promise));
 | 
			
		||||
    executor->execute_async(
 | 
			
		||||
        [executor, loader = *loader_, hash = CellHash::from_slice(hash), db = this,
 | 
			
		||||
         ext_cell_creator = std::move(ext_cell_creator), promise = std::move(promise_ptr)]() mutable {
 | 
			
		||||
          TRY_RESULT_PROMISE((*promise), res, loader.load(hash.as_slice(), true, ext_cell_creator));
 | 
			
		||||
          if (res.status != CellLoader::LoadResult::Ok) {
 | 
			
		||||
            promise->set_error(td::Status::Error("cell not found"));
 | 
			
		||||
            return;
 | 
			
		||||
          }
 | 
			
		||||
          Ref<Cell> cell = res.cell();
 | 
			
		||||
          executor->execute_sync([hash, db, res = std::move(res),
 | 
			
		||||
                                  ext_cell_creator = std::move(ext_cell_creator)]() mutable {
 | 
			
		||||
            db->hash_table_.apply(hash.as_slice(), [&](CellInfo &info) {
 | 
			
		||||
              db->update_cell_info_loaded(info, hash.as_slice(), std::move(res));
 | 
			
		||||
            });
 | 
			
		||||
            for (auto &ext_cell : ext_cell_creator.get_created_cells()) {
 | 
			
		||||
              auto ext_cell_hash = ext_cell->get_hash();
 | 
			
		||||
              db->hash_table_.apply(ext_cell_hash.as_slice(), [&](CellInfo &info) {
 | 
			
		||||
                db->update_cell_info_created_ext(info, std::move(ext_cell));
 | 
			
		||||
              });
 | 
			
		||||
            }
 | 
			
		||||
          });
 | 
			
		||||
          promise->set_result(std::move(cell));
 | 
			
		||||
        });
 | 
			
		||||
  }
 | 
			
		||||
  CellInfo &get_cell_info_force(td::Slice hash) {
 | 
			
		||||
    return hash_table_.apply(hash, [&](CellInfo &info) { update_cell_info_force(info, hash); });
 | 
			
		||||
  }
 | 
			
		||||
| 
						 | 
				
			
			@ -198,6 +232,27 @@ class DynamicBagOfCellsDbImpl : public DynamicBagOfCellsDb, private ExtCellCreat
 | 
			
		|||
    return res;
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  class SimpleExtCellCreator : public ExtCellCreator {
 | 
			
		||||
   public:
 | 
			
		||||
    explicit SimpleExtCellCreator(std::shared_ptr<CellDbReader> cell_db_reader) :
 | 
			
		||||
        cell_db_reader_(std::move(cell_db_reader)) {}
 | 
			
		||||
 | 
			
		||||
    td::Result<Ref<Cell>> ext_cell(Cell::LevelMask level_mask, td::Slice hash, td::Slice depth) override {
 | 
			
		||||
      TRY_RESULT(ext_cell, DynamicBocExtCell::create(PrunnedCellInfo{level_mask, hash, depth},
 | 
			
		||||
                                                     DynamicBocExtCellExtra{cell_db_reader_}));
 | 
			
		||||
      created_cells_.push_back(ext_cell);
 | 
			
		||||
      return std::move(ext_cell);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    std::vector<Ref<Cell>>& get_created_cells() {
 | 
			
		||||
      return created_cells_;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
   private:
 | 
			
		||||
    std::vector<Ref<Cell>> created_cells_;
 | 
			
		||||
    std::shared_ptr<CellDbReader> cell_db_reader_;
 | 
			
		||||
  };
 | 
			
		||||
 | 
			
		||||
  class CellDbReaderImpl : public CellDbReader,
 | 
			
		||||
                           private ExtCellCreator,
 | 
			
		||||
                           public std::enable_shared_from_this<CellDbReaderImpl> {
 | 
			
		||||
| 
						 | 
				
			
			@ -484,6 +539,33 @@ class DynamicBagOfCellsDbImpl : public DynamicBagOfCellsDb, private ExtCellCreat
 | 
			
		|||
    info.sync_with_db = true;
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  // same as update_cell_info_force, but with cell provided by a caller
 | 
			
		||||
  void update_cell_info_loaded(CellInfo &info, td::Slice hash, CellLoader::LoadResult res) {
 | 
			
		||||
    if (info.sync_with_db) {
 | 
			
		||||
      return;
 | 
			
		||||
    }
 | 
			
		||||
    DCHECK(res.status == CellLoader::LoadResult::Ok);
 | 
			
		||||
    info.cell = std::move(res.cell());
 | 
			
		||||
    CHECK(info.cell->get_hash().as_slice() == hash);
 | 
			
		||||
    info.in_db = true;
 | 
			
		||||
    info.db_refcnt = res.refcnt();
 | 
			
		||||
    info.sync_with_db = true;
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  // same as update_cell_info_lazy, but with cell provided by a caller
 | 
			
		||||
  void update_cell_info_created_ext(CellInfo &info, Ref<Cell> cell) {
 | 
			
		||||
    if (info.sync_with_db) {
 | 
			
		||||
      CHECK(info.cell.not_null());
 | 
			
		||||
      CHECK(info.cell->get_level_mask() == cell->get_level_mask());
 | 
			
		||||
      CHECK(info.cell->get_hash() == cell->get_hash());
 | 
			
		||||
      return;
 | 
			
		||||
    }
 | 
			
		||||
    if (info.cell.is_null()) {
 | 
			
		||||
      info.cell = std::move(cell);
 | 
			
		||||
      info.in_db = true;
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  td::Result<Ref<Cell>> create_empty_ext_cell(Cell::LevelMask level_mask, td::Slice hash, td::Slice depth) {
 | 
			
		||||
    TRY_RESULT(res, DynamicBocExtCell::create(PrunnedCellInfo{level_mask, hash, depth},
 | 
			
		||||
                                              DynamicBocExtCellExtra{cell_db_reader_}));
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -21,6 +21,7 @@
 | 
			
		|||
 | 
			
		||||
#include "td/utils/Slice.h"
 | 
			
		||||
#include "td/utils/Status.h"
 | 
			
		||||
#include "td/actor/PromiseFuture.h"
 | 
			
		||||
 | 
			
		||||
namespace vm {
 | 
			
		||||
class CellLoader;
 | 
			
		||||
| 
						 | 
				
			
			@ -64,6 +65,16 @@ class DynamicBagOfCellsDb {
 | 
			
		|||
  virtual td::Status set_loader(std::unique_ptr<CellLoader> loader) = 0;
 | 
			
		||||
 | 
			
		||||
  static std::unique_ptr<DynamicBagOfCellsDb> create();
 | 
			
		||||
 | 
			
		||||
  class AsyncExecutor {
 | 
			
		||||
   public:
 | 
			
		||||
    virtual ~AsyncExecutor() {}
 | 
			
		||||
    virtual void execute_async(std::function<void()> f) = 0;
 | 
			
		||||
    virtual void execute_sync(std::function<void()> f) = 0;
 | 
			
		||||
  };
 | 
			
		||||
 | 
			
		||||
  virtual void load_cell_async(td::Slice hash, std::shared_ptr<AsyncExecutor> executor,
 | 
			
		||||
                               td::Promise<Ref<DataCell>> promise) = 0;
 | 
			
		||||
};
 | 
			
		||||
 | 
			
		||||
}  // namespace vm
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -28,11 +28,46 @@ namespace ton {
 | 
			
		|||
 | 
			
		||||
namespace validator {
 | 
			
		||||
 | 
			
		||||
class CellDbAsyncExecutor : public vm::DynamicBagOfCellsDb::AsyncExecutor {
 | 
			
		||||
 public:
 | 
			
		||||
  explicit CellDbAsyncExecutor(td::actor::ActorId<CellDbBase> cell_db) : cell_db_(std::move(cell_db)) {
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  void execute_async(std::function<void()> f) {
 | 
			
		||||
    class Runner : public td::actor::Actor {
 | 
			
		||||
     public:
 | 
			
		||||
      explicit Runner(std::function<void()> f) : f_(std::move(f)) {}
 | 
			
		||||
      void start_up() {
 | 
			
		||||
        f_();
 | 
			
		||||
        stop();
 | 
			
		||||
      }
 | 
			
		||||
     private:
 | 
			
		||||
      std::function<void()> f_;
 | 
			
		||||
    };
 | 
			
		||||
    td::actor::create_actor<Runner>("executeasync", std::move(f)).release();
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  void execute_sync(std::function<void()> f) {
 | 
			
		||||
    td::actor::send_closure(cell_db_, &CellDbBase::execute_sync, std::move(f));
 | 
			
		||||
  }
 | 
			
		||||
 private:
 | 
			
		||||
  td::actor::ActorId<CellDbBase> cell_db_;
 | 
			
		||||
};
 | 
			
		||||
 | 
			
		||||
void CellDbBase::start_up() {
 | 
			
		||||
  async_executor = std::make_shared<CellDbAsyncExecutor>(actor_id(this));
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
void CellDbBase::execute_sync(std::function<void()> f) {
 | 
			
		||||
  f();
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
CellDbIn::CellDbIn(td::actor::ActorId<RootDb> root_db, td::actor::ActorId<CellDb> parent, std::string path)
 | 
			
		||||
    : root_db_(root_db), parent_(parent), path_(std::move(path)) {
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
void CellDbIn::start_up() {
 | 
			
		||||
  CellDbBase::start_up();
 | 
			
		||||
  cell_db_ = std::make_shared<td::RocksDb>(td::RocksDb::open(path_).move_as_ok());
 | 
			
		||||
 | 
			
		||||
  boc_ = vm::DynamicBagOfCellsDb::create();
 | 
			
		||||
| 
						 | 
				
			
			@ -52,8 +87,7 @@ void CellDbIn::start_up() {
 | 
			
		|||
}
 | 
			
		||||
 | 
			
		||||
void CellDbIn::load_cell(RootHash hash, td::Promise<td::Ref<vm::DataCell>> promise) {
 | 
			
		||||
  td::PerfWarningTimer{"loadcell", 0.1};
 | 
			
		||||
  promise.set_result(boc_->load_cell(hash.as_slice()));
 | 
			
		||||
  boc_->load_cell_async(hash.as_slice(), async_executor, std::move(promise));
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
void CellDbIn::store_cell(BlockIdExt block_id, td::Ref<vm::Cell> cell, td::Promise<td::Ref<vm::DataCell>> promise) {
 | 
			
		||||
| 
						 | 
				
			
			@ -255,12 +289,15 @@ void CellDb::load_cell(RootHash hash, td::Promise<td::Ref<vm::DataCell>> promise
 | 
			
		|||
  if (!started_) {
 | 
			
		||||
    td::actor::send_closure(cell_db_, &CellDbIn::load_cell, hash, std::move(promise));
 | 
			
		||||
  } else {
 | 
			
		||||
    auto R = boc_->load_cell(hash.as_slice());
 | 
			
		||||
    auto P = td::PromiseCreator::lambda(
 | 
			
		||||
        [cell_db_in = cell_db_.get(), hash, promise = std::move(promise)](td::Result<td::Ref<vm::DataCell>> R) mutable {
 | 
			
		||||
          if (R.is_error()) {
 | 
			
		||||
      td::actor::send_closure(cell_db_, &CellDbIn::load_cell, hash, std::move(promise));
 | 
			
		||||
            td::actor::send_closure(cell_db_in, &CellDbIn::load_cell, hash, std::move(promise));
 | 
			
		||||
          } else {
 | 
			
		||||
            promise.set_result(R.move_as_ok());
 | 
			
		||||
          }
 | 
			
		||||
    });
 | 
			
		||||
    boc_->load_cell_async(hash.as_slice(), async_executor, std::move(P));
 | 
			
		||||
  }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -273,6 +310,7 @@ void CellDb::get_cell_db_reader(td::Promise<std::shared_ptr<vm::CellDbReader>> p
 | 
			
		|||
}
 | 
			
		||||
 | 
			
		||||
void CellDb::start_up() {
 | 
			
		||||
  CellDbBase::start_up();
 | 
			
		||||
  boc_ = vm::DynamicBagOfCellsDb::create();
 | 
			
		||||
  cell_db_ = td::actor::create_actor<CellDbIn>("celldbin", root_db_, actor_id(this), path_);
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -33,8 +33,19 @@ namespace validator {
 | 
			
		|||
class RootDb;
 | 
			
		||||
 | 
			
		||||
class CellDb;
 | 
			
		||||
class CellDbAsyncExecutor;
 | 
			
		||||
 | 
			
		||||
class CellDbIn : public td::actor::Actor {
 | 
			
		||||
class CellDbBase : public td::actor::Actor {
 | 
			
		||||
 public:
 | 
			
		||||
  virtual void start_up();
 | 
			
		||||
 protected:
 | 
			
		||||
  std::shared_ptr<vm::DynamicBagOfCellsDb::AsyncExecutor> async_executor;
 | 
			
		||||
 private:
 | 
			
		||||
  void execute_sync(std::function<void()> f);
 | 
			
		||||
  friend CellDbAsyncExecutor;
 | 
			
		||||
};
 | 
			
		||||
 | 
			
		||||
class CellDbIn : public CellDbBase {
 | 
			
		||||
 public:
 | 
			
		||||
  using KeyHash = td::Bits256;
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -89,7 +100,7 @@ class CellDbIn : public td::actor::Actor {
 | 
			
		|||
  KeyHash last_gc_;
 | 
			
		||||
};
 | 
			
		||||
 | 
			
		||||
class CellDb : public td::actor::Actor {
 | 
			
		||||
class CellDb : public CellDbBase {
 | 
			
		||||
 public:
 | 
			
		||||
  void load_cell(RootHash hash, td::Promise<td::Ref<vm::DataCell>> promise);
 | 
			
		||||
  void store_cell(BlockIdExt block_id, td::Ref<vm::Cell> cell, td::Promise<td::Ref<vm::DataCell>> promise);
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue