mirror of
				https://github.com/ton-blockchain/ton
				synced 2025-03-09 15:40:10 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			334 lines
		
	
	
	
		
			9.3 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
			
		
		
	
	
			334 lines
		
	
	
	
		
			9.3 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
/*
 | 
						|
    This file is part of TON Blockchain Library.
 | 
						|
 | 
						|
    TON Blockchain Library is free software: you can redistribute it and/or modify
 | 
						|
    it under the terms of the GNU Lesser General Public License as published by
 | 
						|
    the Free Software Foundation, either version 2 of the License, or
 | 
						|
    (at your option) any later version.
 | 
						|
 | 
						|
    TON Blockchain Library is distributed in the hope that it will be useful,
 | 
						|
    but WITHOUT ANY WARRANTY; without even the implied warranty of
 | 
						|
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 | 
						|
    GNU Lesser General Public License for more details.
 | 
						|
 | 
						|
    You should have received a copy of the GNU Lesser General Public License
 | 
						|
    along with TON Blockchain Library.  If not, see <http://www.gnu.org/licenses/>.
 | 
						|
 | 
						|
    Copyright 2017-2020 Telegram Systems LLP
 | 
						|
*/
 | 
						|
#pragma once
 | 
						|
 | 
						|
#include "td/utils/common.h"
 | 
						|
#include "td/utils/HazardPointers.h"
 | 
						|
#include "td/utils/logging.h"
 | 
						|
#include "td/utils/port/thread_local.h"
 | 
						|
 | 
						|
#include <atomic>
 | 
						|
#include <condition_variable>
 | 
						|
#include <mutex>
 | 
						|
 | 
						|
namespace td {
 | 
						|
 | 
						|
// AtomicHashArray<KeyT, ValueT>
 | 
						|
// Building block for other concurrent hash maps
 | 
						|
//
 | 
						|
// Support one operation:
 | 
						|
//  template <class F>
 | 
						|
//  bool with_value(KeyT key, bool should_create, F &&func);
 | 
						|
//
 | 
						|
//  Finds slot for key, and call func(value)
 | 
						|
//  Creates slot if should_create is true.
 | 
						|
//  Returns true if func was called.
 | 
						|
//
 | 
						|
//  Concurrent calls with the same key may result in concurrent calls to func(value)
 | 
						|
//  It is responsibility of the caller to handle such races.
 | 
						|
//
 | 
						|
//  Key should already be random
 | 
						|
//  It is responsibility of the caller to provide unique random key.
 | 
						|
//  One may use injective hash function, or handle collisions in some other way.
 | 
						|
 | 
						|
template <class KeyT, class ValueT>
 | 
						|
class AtomicHashArray {
 | 
						|
 public:
 | 
						|
  explicit AtomicHashArray(size_t n) : nodes_(n) {
 | 
						|
  }
 | 
						|
  struct Node {
 | 
						|
    std::atomic<KeyT> key{KeyT{}};
 | 
						|
    ValueT value{};
 | 
						|
  };
 | 
						|
  size_t size() const {
 | 
						|
    return nodes_.size();
 | 
						|
  }
 | 
						|
  Node &node_at(size_t i) {
 | 
						|
    return nodes_[i];
 | 
						|
  }
 | 
						|
  static KeyT empty_key() {
 | 
						|
    return KeyT{};
 | 
						|
  }
 | 
						|
 | 
						|
  template <class F>
 | 
						|
  bool with_value(KeyT key, bool should_create, F &&f) {
 | 
						|
    DCHECK(key != empty_key());
 | 
						|
    size_t pos = static_cast<size_t>(key) % nodes_.size();
 | 
						|
    size_t n = td::min(td::max(static_cast<size_t>(300), nodes_.size() / 16 + 2), nodes_.size());
 | 
						|
 | 
						|
    for (size_t i = 0; i < n; i++) {
 | 
						|
      pos++;
 | 
						|
      if (pos >= nodes_.size()) {
 | 
						|
        pos = 0;
 | 
						|
      }
 | 
						|
      auto &node = nodes_[pos];
 | 
						|
      while (true) {
 | 
						|
        auto node_key = node.key.load(std::memory_order_acquire);
 | 
						|
        if (node_key == empty_key()) {
 | 
						|
          if (!should_create) {
 | 
						|
            return false;
 | 
						|
          }
 | 
						|
          KeyT expected_key = empty_key();
 | 
						|
          if (node.key.compare_exchange_strong(expected_key, key, std::memory_order_relaxed,
 | 
						|
                                               std::memory_order_relaxed)) {
 | 
						|
            f(node.value);
 | 
						|
            return true;
 | 
						|
          }
 | 
						|
        } else if (node_key == key) {
 | 
						|
          f(node.value);
 | 
						|
          return true;
 | 
						|
        } else {
 | 
						|
          break;
 | 
						|
        }
 | 
						|
      }
 | 
						|
    }
 | 
						|
    return false;
 | 
						|
  }
 | 
						|
 | 
						|
 private:
 | 
						|
  std::vector<Node> nodes_;
 | 
						|
};
 | 
						|
 | 
						|
// Simple concurrent hash map with multiple limitations
 | 
						|
template <class KeyT, class ValueT>
 | 
						|
class ConcurrentHashMap {
 | 
						|
  using HashMap = AtomicHashArray<KeyT, std::atomic<ValueT>>;
 | 
						|
  static HazardPointers<HashMap> hp_;
 | 
						|
 | 
						|
 public:
 | 
						|
  explicit ConcurrentHashMap(size_t n = 32) {
 | 
						|
    n = 1;
 | 
						|
    hash_map_.store(make_unique<HashMap>(n).release());
 | 
						|
  }
 | 
						|
  ConcurrentHashMap(const ConcurrentHashMap &) = delete;
 | 
						|
  ConcurrentHashMap &operator=(const ConcurrentHashMap &) = delete;
 | 
						|
  ConcurrentHashMap(ConcurrentHashMap &&) = delete;
 | 
						|
  ConcurrentHashMap &operator=(ConcurrentHashMap &&) = delete;
 | 
						|
  ~ConcurrentHashMap() {
 | 
						|
    unique_ptr<HashMap>(hash_map_.load()).reset();
 | 
						|
  }
 | 
						|
 | 
						|
  static std::string get_name() {
 | 
						|
    return "ConcurrrentHashMap";
 | 
						|
  }
 | 
						|
 | 
						|
  static KeyT empty_key() {
 | 
						|
    return KeyT{};
 | 
						|
  }
 | 
						|
  static ValueT empty_value() {
 | 
						|
    return ValueT{};
 | 
						|
  }
 | 
						|
  static ValueT migrate_value() {
 | 
						|
    return (ValueT)(1);  // c-style conversion because reinterpret_cast<int>(1) is CE in MSVC
 | 
						|
  }
 | 
						|
 | 
						|
  ValueT insert(KeyT key, ValueT value) {
 | 
						|
    CHECK(key != empty_key());
 | 
						|
    CHECK(value != migrate_value());
 | 
						|
    typename HazardPointers<HashMap>::Holder holder(hp_, get_thread_id(), 0);
 | 
						|
    while (true) {
 | 
						|
      auto hash_map = holder.protect(hash_map_);
 | 
						|
      if (!hash_map) {
 | 
						|
        do_migrate(nullptr);
 | 
						|
        continue;
 | 
						|
      }
 | 
						|
 | 
						|
      bool ok = false;
 | 
						|
      ValueT inserted_value;
 | 
						|
      hash_map->with_value(key, true, [&](auto &node_value) {
 | 
						|
        ValueT expected_value = this->empty_value();
 | 
						|
        if (node_value.compare_exchange_strong(expected_value, value, std::memory_order_release,
 | 
						|
                                               std::memory_order_acquire)) {
 | 
						|
          ok = true;
 | 
						|
          inserted_value = value;
 | 
						|
        } else {
 | 
						|
          if (expected_value == this->migrate_value()) {
 | 
						|
            ok = false;
 | 
						|
          } else {
 | 
						|
            ok = true;
 | 
						|
            inserted_value = expected_value;
 | 
						|
          }
 | 
						|
        }
 | 
						|
      });
 | 
						|
      if (ok) {
 | 
						|
        return inserted_value;
 | 
						|
      }
 | 
						|
      do_migrate(hash_map);
 | 
						|
    }
 | 
						|
  }
 | 
						|
 | 
						|
  ValueT find(KeyT key, ValueT value) {
 | 
						|
    typename HazardPointers<HashMap>::Holder holder(hp_, get_thread_id(), 0);
 | 
						|
    while (true) {
 | 
						|
      auto hash_map = holder.protect(hash_map_);
 | 
						|
      if (!hash_map) {
 | 
						|
        do_migrate(nullptr);
 | 
						|
        continue;
 | 
						|
      }
 | 
						|
 | 
						|
      bool has_value = hash_map->with_value(
 | 
						|
          key, false, [&](auto &node_value) { value = node_value.load(std::memory_order_acquire); });
 | 
						|
      if (!has_value || value != migrate_value()) {
 | 
						|
        return value;
 | 
						|
      }
 | 
						|
      do_migrate(hash_map);
 | 
						|
    }
 | 
						|
  }
 | 
						|
 | 
						|
  template <class F>
 | 
						|
  void for_each(F &&f) {
 | 
						|
    auto hash_map = hash_map_.load();
 | 
						|
    CHECK(hash_map);
 | 
						|
    auto size = hash_map->size();
 | 
						|
    for (size_t i = 0; i < size; i++) {
 | 
						|
      auto &node = hash_map->node_at(i);
 | 
						|
      auto key = node.key.load(std::memory_order_relaxed);
 | 
						|
      auto value = node.value.load(std::memory_order_relaxed);
 | 
						|
 | 
						|
      if (key != empty_key()) {
 | 
						|
        CHECK(value != migrate_value());
 | 
						|
        if (value != empty_value()) {
 | 
						|
          f(key, value);
 | 
						|
        }
 | 
						|
      }
 | 
						|
    }
 | 
						|
  }
 | 
						|
 | 
						|
 private:
 | 
						|
  // use no padding intentionally
 | 
						|
  std::atomic<HashMap *> hash_map_{nullptr};
 | 
						|
 | 
						|
  std::mutex migrate_mutex_;
 | 
						|
  std::condition_variable migrate_cv_;
 | 
						|
 | 
						|
  int migrate_cnt_{0};
 | 
						|
  int migrate_generation_{0};
 | 
						|
  HashMap *migrate_from_hash_map_{nullptr};
 | 
						|
  HashMap *migrate_to_hash_map_{nullptr};
 | 
						|
  struct Task {
 | 
						|
    size_t begin;
 | 
						|
    size_t end;
 | 
						|
    bool empty() const {
 | 
						|
      return begin >= end;
 | 
						|
    }
 | 
						|
    size_t size() const {
 | 
						|
      if (empty()) {
 | 
						|
        return 0;
 | 
						|
      }
 | 
						|
      return end - begin;
 | 
						|
    }
 | 
						|
  };
 | 
						|
 | 
						|
  struct TaskCreator {
 | 
						|
    size_t chunk_size;
 | 
						|
    size_t size;
 | 
						|
    std::atomic<size_t> pos{0};
 | 
						|
    Task create() {
 | 
						|
      auto i = pos++;
 | 
						|
      auto begin = i * chunk_size;
 | 
						|
      auto end = begin + chunk_size;
 | 
						|
      if (end > size) {
 | 
						|
        end = size;
 | 
						|
      }
 | 
						|
      return {begin, end};
 | 
						|
    }
 | 
						|
  };
 | 
						|
  TaskCreator task_creator;
 | 
						|
 | 
						|
  void do_migrate(HashMap *ptr) {
 | 
						|
    //LOG(ERROR) << "In do_migrate: " << ptr;
 | 
						|
    std::unique_lock<std::mutex> lock(migrate_mutex_);
 | 
						|
    if (hash_map_.load() != ptr) {
 | 
						|
      return;
 | 
						|
    }
 | 
						|
    init_migrate();
 | 
						|
    CHECK(!ptr || migrate_from_hash_map_ == ptr);
 | 
						|
    migrate_cnt_++;
 | 
						|
    auto migrate_generation = migrate_generation_;
 | 
						|
    lock.unlock();
 | 
						|
 | 
						|
    run_migrate();
 | 
						|
 | 
						|
    lock.lock();
 | 
						|
    migrate_cnt_--;
 | 
						|
    if (migrate_cnt_ == 0) {
 | 
						|
      finish_migrate();
 | 
						|
    }
 | 
						|
    migrate_cv_.wait(lock, [&] { return migrate_generation_ != migrate_generation; });
 | 
						|
  }
 | 
						|
 | 
						|
  void finish_migrate() {
 | 
						|
    //LOG(ERROR) << "In finish_migrate";
 | 
						|
    hash_map_.store(migrate_to_hash_map_);
 | 
						|
    hp_.retire(get_thread_id(), migrate_from_hash_map_);
 | 
						|
    migrate_from_hash_map_ = nullptr;
 | 
						|
    migrate_to_hash_map_ = nullptr;
 | 
						|
    migrate_generation_++;
 | 
						|
    migrate_cv_.notify_all();
 | 
						|
  }
 | 
						|
 | 
						|
  void init_migrate() {
 | 
						|
    if (migrate_from_hash_map_ != nullptr) {
 | 
						|
      return;
 | 
						|
    }
 | 
						|
    //LOG(ERROR) << "In init_migrate";
 | 
						|
    CHECK(migrate_cnt_ == 0);
 | 
						|
    migrate_generation_++;
 | 
						|
    migrate_from_hash_map_ = hash_map_.exchange(nullptr);
 | 
						|
    auto new_size = migrate_from_hash_map_->size() * 2;
 | 
						|
    migrate_to_hash_map_ = make_unique<HashMap>(new_size).release();
 | 
						|
    task_creator.chunk_size = 100;
 | 
						|
    task_creator.size = migrate_from_hash_map_->size();
 | 
						|
    task_creator.pos = 0;
 | 
						|
  }
 | 
						|
 | 
						|
  void run_migrate() {
 | 
						|
    //LOG(ERROR) << "In run_migrate";
 | 
						|
    size_t cnt = 0;
 | 
						|
    while (true) {
 | 
						|
      auto task = task_creator.create();
 | 
						|
      cnt += task.size();
 | 
						|
      if (task.empty()) {
 | 
						|
        break;
 | 
						|
      }
 | 
						|
      run_task(task);
 | 
						|
    }
 | 
						|
    //LOG(ERROR) << "In run_migrate " << cnt;
 | 
						|
  }
 | 
						|
 | 
						|
  void run_task(Task task) {
 | 
						|
    for (auto i = task.begin; i < task.end; i++) {
 | 
						|
      auto &node = migrate_from_hash_map_->node_at(i);
 | 
						|
      auto old_value = node.value.exchange(migrate_value(), std::memory_order_acq_rel);
 | 
						|
      if (old_value == 0) {
 | 
						|
        continue;
 | 
						|
      }
 | 
						|
      auto node_key = node.key.load(std::memory_order_relaxed);
 | 
						|
      //LOG(ERROR) << node_key << " " << node_key;
 | 
						|
      auto ok = migrate_to_hash_map_->with_value(
 | 
						|
          node_key, true, [&](auto &node_value) { node_value.store(old_value, std::memory_order_relaxed); });
 | 
						|
      LOG_CHECK(ok) << "Migration overflow";
 | 
						|
    }
 | 
						|
  }
 | 
						|
};
 | 
						|
 | 
						|
template <class KeyT, class ValueT>
 | 
						|
td::HazardPointers<typename ConcurrentHashMap<KeyT, ValueT>::HashMap> ConcurrentHashMap<KeyT, ValueT>::hp_(64);
 | 
						|
 | 
						|
}  // namespace td
 |