mirror of https://github.com/ton-blockchain/ton (synced 2025-03-09 15:40:10 +00:00)

/*
    This file is part of TON Blockchain Library.

    TON Blockchain Library is free software: you can redistribute it and/or modify
    it under the terms of the GNU Lesser General Public License as published by
    the Free Software Foundation, either version 2 of the License, or
    (at your option) any later version.

    TON Blockchain Library is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU Lesser General Public License for more details.

    You should have received a copy of the GNU Lesser General Public License
    along with TON Blockchain Library.  If not, see <http://www.gnu.org/licenses/>.

    Copyright 2017-2020 Telegram Systems LLP
*/
#pragma once

#include "td/utils/common.h"
#include "td/utils/HazardPointers.h"
#include "td/utils/logging.h"
#include "td/utils/port/thread_local.h"

#include <atomic>
#include <condition_variable>
#include <mutex>

namespace td {

// AtomicHashArray<KeyT, ValueT>
// Building block for other concurrent hash maps.
//
// Supports one operation:
//  template <class F>
//  bool with_value(KeyT key, bool should_create, F &&func);
//
//  Finds the slot for key and calls func(value).
//  Creates the slot if should_create is true.
//  Returns true if func was called.
//
//  Concurrent calls with the same key may result in concurrent calls to func(value);
//  it is the responsibility of the caller to handle such races.
//
//  Keys should already be random: it is the responsibility of the caller to provide
//  unique random keys, e.g. by using an injective hash function or by handling
//  collisions in some other way.
//
//  See the usage sketch after the class definition below.

template <class KeyT, class ValueT>
class AtomicHashArray {
 public:
  explicit AtomicHashArray(size_t n) : nodes_(n) {
  }
  struct Node {
    std::atomic<KeyT> key{KeyT{}};
    ValueT value{};
  };
  size_t size() const {
    return nodes_.size();
  }
  Node &node_at(size_t i) {
    return nodes_[i];
  }
  static KeyT empty_key() {
    return KeyT{};
  }

  template <class F>
  bool with_value(KeyT key, bool should_create, F &&f) {
    DCHECK(key != empty_key());
    size_t pos = static_cast<size_t>(key) % nodes_.size();
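    // Bounded linear probing: scan at least 300 slots (roughly size/16 for large tables,
    // but never more than the whole table) starting just past key % size. If every probed
    // slot is already taken by other keys, give up and return false.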
    size_t n = td::min(td::max(static_cast<size_t>(300), nodes_.size() / 16 + 2), nodes_.size());

    for (size_t i = 0; i < n; i++) {
      pos++;
      if (pos >= nodes_.size()) {
        pos = 0;
      }
      auto &node = nodes_[pos];
      while (true) {
        auto node_key = node.key.load(std::memory_order_acquire);
        if (node_key == empty_key()) {
          if (!should_create) {
            return false;
          }
          KeyT expected_key = empty_key();
          if (node.key.compare_exchange_strong(expected_key, key, std::memory_order_relaxed,
                                               std::memory_order_relaxed)) {
            f(node.value);
            return true;
          }
        } else if (node_key == key) {
          f(node.value);
          return true;
        } else {
          break;
        }
      }
    }
    return false;
  }

 private:
  std::vector<Node> nodes_;
};
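
// Usage sketch (illustrative only, not part of the library): a fixed-size array of per-key
// atomic counters, following the same KeyT/ValueT pattern ConcurrentHashMap uses below.
// The key is assumed to be non-zero and already well-distributed; with_value() may return
// false if the probe window around the key's slot is full.
inline bool example_bump_counter(AtomicHashArray<uint64, std::atomic<uint64>> &counters, uint64 key) {
  return counters.with_value(key, /*should_create=*/true,
                             [](auto &value) { value.fetch_add(1, std::memory_order_relaxed); });
}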

// Simple concurrent hash map with multiple limitations: ValueT{} means "absent" and
// (ValueT)1 is reserved as an internal migration marker, so neither may be stored;
// entries cannot be erased; and the table only grows (it is migrated to a table twice
// the size when an insert runs out of slots). See the usage sketch at the end of this file.
template <class KeyT, class ValueT>
class ConcurrentHashMap {
  using HashMap = AtomicHashArray<KeyT, std::atomic<ValueT>>;
  static HazardPointers<HashMap> hp_;

 public:
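  // Note: the requested capacity is ignored; the map starts with a single-slot table and
  // grows by doubling through migration.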
  explicit ConcurrentHashMap(size_t n = 32) {
    n = 1;
    hash_map_.store(make_unique<HashMap>(n).release());
  }
  ConcurrentHashMap(const ConcurrentHashMap &) = delete;
  ConcurrentHashMap &operator=(const ConcurrentHashMap &) = delete;
  ConcurrentHashMap(ConcurrentHashMap &&) = delete;
  ConcurrentHashMap &operator=(ConcurrentHashMap &&) = delete;
  ~ConcurrentHashMap() {
    unique_ptr<HashMap>(hash_map_.load()).reset();
  }

  static std::string get_name() {
    return "ConcurrrentHashMap";
  }

  static KeyT empty_key() {
    return KeyT{};
  }
  static ValueT empty_value() {
    return ValueT{};
  }
  static ValueT migrate_value() {
    return (ValueT)(1);  // c-style conversion because reinterpret_cast<int>(1) is CE in MSVC
  }

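  // Inserts (key, value) if the key is absent and returns the stored value: either the
  // freshly inserted one or the value another thread won the race with. Retries through
  // a migration if it hits a slot poisoned with migrate_value().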
  ValueT insert(KeyT key, ValueT value) {
    CHECK(key != empty_key());
    CHECK(value != migrate_value());
    typename HazardPointers<HashMap>::Holder holder(hp_, get_thread_id(), 0);
    while (true) {
      auto hash_map = holder.protect(hash_map_);
      if (!hash_map) {
        do_migrate(nullptr);
        continue;
      }

      bool ok = false;
      ValueT inserted_value;
      hash_map->with_value(key, true, [&](auto &node_value) {
        ValueT expected_value = this->empty_value();
        if (node_value.compare_exchange_strong(expected_value, value, std::memory_order_release,
                                               std::memory_order_acquire)) {
          ok = true;
          inserted_value = value;
        } else {
          if (expected_value == this->migrate_value()) {
            ok = false;
          } else {
            ok = true;
            inserted_value = expected_value;
          }
        }
      });
      if (ok) {
        return inserted_value;
      }
      do_migrate(hash_map);
    }
  }

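  // Looks up key; the second argument doubles as the value returned when the key is
  // absent. Joins an in-progress migration if it reads the migrate_value() marker.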
  ValueT find(KeyT key, ValueT value) {
    typename HazardPointers<HashMap>::Holder holder(hp_, get_thread_id(), 0);
    while (true) {
      auto hash_map = holder.protect(hash_map_);
      if (!hash_map) {
        do_migrate(nullptr);
        continue;
      }

      bool has_value = hash_map->with_value(
          key, false, [&](auto &node_value) { value = node_value.load(std::memory_order_acquire); });
      if (!has_value || value != migrate_value()) {
        return value;
      }
      do_migrate(hash_map);
    }
  }

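  // Iterates over all present (key, value) pairs. It is not synchronized with writers and
  // must not run concurrently with a migration (the CHECK below would see migrate_value()).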
  template <class F>
  void for_each(F &&f) {
    auto hash_map = hash_map_.load();
    CHECK(hash_map);
    auto size = hash_map->size();
    for (size_t i = 0; i < size; i++) {
      auto &node = hash_map->node_at(i);
      auto key = node.key.load(std::memory_order_relaxed);
      auto value = node.value.load(std::memory_order_relaxed);

      if (key != empty_key()) {
        CHECK(value != migrate_value());
        if (value != empty_value()) {
          f(key, value);
        }
      }
    }
  }

 private:
  // use no padding intentionally
  std::atomic<HashMap *> hash_map_{nullptr};

  std::mutex migrate_mutex_;
  std::condition_variable migrate_cv_;

  int migrate_cnt_{0};
  int migrate_generation_{0};
  HashMap *migrate_from_hash_map_{nullptr};
  HashMap *migrate_to_hash_map_{nullptr};
  struct Task {
    size_t begin;
    size_t end;
    bool empty() const {
      return begin >= end;
    }
    size_t size() const {
      if (empty()) {
        return 0;
      }
      return end - begin;
    }
  };

  struct TaskCreator {
    size_t chunk_size;
    size_t size;
    std::atomic<size_t> pos{0};
    Task create() {
      auto i = pos++;
      auto begin = i * chunk_size;
      auto end = begin + chunk_size;
      if (end > size) {
        end = size;
      }
      return {begin, end};
    }
  };
  TaskCreator task_creator;

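  // Cooperative migration: a thread that sees a null table pointer or reads the
  // migrate_value() marker calls do_migrate(). The first thread detaches the old table,
  // allocates one twice as large and splits the copy into fixed-size chunks; every
  // participating thread then grabs chunks from task_creator, poisoning each old slot
  // with migrate_value() while copying live entries, and the last thread to finish
  // publishes the new table and retires the old one through the hazard pointers.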
  void do_migrate(HashMap *ptr) {
    //LOG(ERROR) << "In do_migrate: " << ptr;
    std::unique_lock<std::mutex> lock(migrate_mutex_);
    if (hash_map_.load() != ptr) {
      return;
    }
    init_migrate();
    CHECK(!ptr || migrate_from_hash_map_ == ptr);
    migrate_cnt_++;
    auto migrate_generation = migrate_generation_;
    lock.unlock();

    run_migrate();

    lock.lock();
    migrate_cnt_--;
    if (migrate_cnt_ == 0) {
      finish_migrate();
    }
    migrate_cv_.wait(lock, [&] { return migrate_generation_ != migrate_generation; });
  }

  void finish_migrate() {
    //LOG(ERROR) << "In finish_migrate";
    hash_map_.store(migrate_to_hash_map_);
    hp_.retire(get_thread_id(), migrate_from_hash_map_);
    migrate_from_hash_map_ = nullptr;
    migrate_to_hash_map_ = nullptr;
    migrate_generation_++;
    migrate_cv_.notify_all();
  }

  void init_migrate() {
    if (migrate_from_hash_map_ != nullptr) {
      return;
    }
    //LOG(ERROR) << "In init_migrate";
    CHECK(migrate_cnt_ == 0);
    migrate_generation_++;
    migrate_from_hash_map_ = hash_map_.exchange(nullptr);
    auto new_size = migrate_from_hash_map_->size() * 2;
    migrate_to_hash_map_ = make_unique<HashMap>(new_size).release();
    task_creator.chunk_size = 100;
    task_creator.size = migrate_from_hash_map_->size();
    task_creator.pos = 0;
  }

  void run_migrate() {
    //LOG(ERROR) << "In run_migrate";
    size_t cnt = 0;
    while (true) {
      auto task = task_creator.create();
      cnt += task.size();
      if (task.empty()) {
        break;
      }
      run_task(task);
    }
    //LOG(ERROR) << "In run_migrate " << cnt;
  }

  void run_task(Task task) {
    for (auto i = task.begin; i < task.end; i++) {
      auto &node = migrate_from_hash_map_->node_at(i);
      auto old_value = node.value.exchange(migrate_value(), std::memory_order_acq_rel);
      if (old_value == 0) {
        continue;
      }
      auto node_key = node.key.load(std::memory_order_relaxed);
      //LOG(ERROR) << node_key << " " << node_key;
      auto ok = migrate_to_hash_map_->with_value(
          node_key, true, [&](auto &node_value) { node_value.store(old_value, std::memory_order_relaxed); });
      LOG_CHECK(ok) << "Migration overflow";
    }
  }
};

template <class KeyT, class ValueT>
td::HazardPointers<typename ConcurrentHashMap<KeyT, ValueT>::HashMap> ConcurrentHashMap<KeyT, ValueT>::hp_(64);

}  // namespace td
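
// Usage sketch (illustrative only, not part of the library; example_map_usage and the key
// constant are hypothetical). ValueT{} (here 0) means "absent" and (ValueT)1 is the internal
// migration marker, so neither may be stored; keys must be non-zero and already
// well-distributed.
inline void example_map_usage() {
  td::ConcurrentHashMap<td::uint64, td::uint64> map;
  td::uint64 key = 0x9e3779b97f4a7c15ull;   // assume a pre-hashed, non-zero key
  td::uint64 stored = map.insert(key, 42);  // returns 42, or the value that won the race
  td::uint64 found = map.find(key, /*value to return if absent*/ 0);
  CHECK(stored == found);
  map.for_each([](td::uint64 k, td::uint64 v) { LOG(INFO) << k << " -> " << v; });
}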