/*
    This file is part of TON Blockchain source code.

    TON Blockchain is free software; you can redistribute it and/or
    modify it under the terms of the GNU General Public License
    as published by the Free Software Foundation; either version 2
    of the License, or (at your option) any later version.

    TON Blockchain is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU General Public License for more details.

    You should have received a copy of the GNU General Public License
    along with TON Blockchain.  If not, see <http://www.gnu.org/licenses/>.

    In addition, as a special exception, the copyright holders give permission
    to link the code of portions of this program with the OpenSSL library.
    You must obey the GNU General Public License in all respects for all
    of the code used other than OpenSSL. If you modify file(s) with this
    exception, you may extend this exception to your version of the file(s),
    but you are not obligated to do so. If you do not wish to do so, delete this
    exception statement from your version. If you delete this exception statement
    from all source files in the program, then also delete it here.

    Copyright 2017-2020 Telegram Systems LLP
*/
#include "third_party/FAAArrayQueue.h"
#include "third_party/HazardPointers.h"
#include "third_party/LazyIndexArrayQueue.h"
#include "third_party/MoodyCamelQueue.h"

#if TG_LCR_QUEUE
#include "third_party/LCRQueue.h"

extern "C" {
#include "third_party/mp-queue.h"
}

#include <linux/futex.h>
#include <sys/syscall.h>
#include <unistd.h>
#endif

#include "td/actor/core/ActorLocker.h"
#include "td/actor/actor.h"

#include "td/utils/benchmark.h"
#include "td/utils/crypto.h"
#include "td/utils/logging.h"
#include "td/utils/misc.h"
#include "td/utils/MpmcQueue.h"
#include "td/utils/MpmcWaiter.h"
#include "td/utils/port/thread.h"
#include "td/utils/queue.h"
#include "td/utils/Random.h"
#include "td/utils/Slice.h"
#include "td/utils/Status.h"
#include "td/utils/StealingQueue.h"
#include "td/utils/ThreadSafeCounter.h"
#include "td/utils/UInt.h"
#include "td/utils/VectorQueue.h"

#include <algorithm>
#include <array>
#include <atomic>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
#include <string>

using td::int32;
using td::uint32;

// Concurrent SHA256 benchmark
// Simplified ton Cell and Block structures
struct CellRef {
  int32 cell_id{0};
  td::MutableSlice hash_slice;
};
struct Cell {
  bool has_hash = false;
  td::UInt256 hash;
  td::MutableSlice data;
  std::array<CellRef, 4> next{};
};

struct Block {
  std::string data;
  std::vector<Cell> cells;
  Cell &get_cell(int32 id) {
    return cells[id];
  }
  const Cell &get_cell(int32 id) const {
    return cells[id];
  }
};

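// Note (added for clarity, not in the original file): Generator below lays out
// each Cell as a 256-byte slice of Block::data; the last 128 bytes of a cell
// are four 32-byte slots that hold the hashes of up to four referenced cells,
// and CellRef::hash_slice points into those slots.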
class Generator {
 public:
  static std::string random_bytes(int length) {
    std::string res(length, ' ');
    for (auto &c : res) {
      c = static_cast<char>(td::Random::fast_uint32() % 256);
    }
    return res;
  }

  static Block random_block(int cells_count) {
    const size_t cell_size = 256;
    Block block;
    block.data = random_bytes(td::narrow_cast<int>(cell_size * cells_count));
    block.cells.reserve(cells_count);
    for (int i = 0; i < cells_count; i++) {
      Cell cell;
      cell.data = td::MutableSlice(block.data).substr(i * cell_size, cell_size);
      for (int j = 0; j < 4; j++) {
        cell.next[j] = [&] {
          CellRef cell_ref;
          if (i == 0) {
            return cell_ref;
          }
          cell_ref.cell_id = td::Random::fast(0, i - 1);
          cell_ref.hash_slice = cell.data.substr(cell_size - 128 + j * 32, 32);
          return cell_ref;
        }();
      }
      block.cells.push_back(std::move(cell));
    }
    return block;
  }
};

class BlockSha256Baseline {
 public:
  static std::string get_description() {
    return "Baseline";
  }
  static void calc_hash(Block &block) {
    for (auto &cell : block.cells) {
      td::sha256(cell.data, as_slice(cell.hash));
    }
  }
  static td::Status check(Block &block) {
    for (auto &cell : block.cells) {
      for (auto &cell_ref : cell.next) {
        if (cell_ref.hash_slice.empty()) {
          continue;
        }
        if (cell_ref.hash_slice != as_slice(block.get_cell(cell_ref.cell_id).hash)) {
          return td::Status::Error("Sha mismatch");
        }
      }
    }
    return td::Status::OK();
  }
  static void calc_refs(Block &block) {
    for (auto &cell : block.cells) {
      for (auto &cell_ref : cell.next) {
        if (cell_ref.hash_slice.empty()) {
          continue;
        }
        cell_ref.hash_slice.copy_from(as_slice(block.get_cell(cell_ref.cell_id).hash));
      }
      td::sha256(cell.data, as_slice(cell.hash));
    }
  }
};

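// Illustrative usage (added, not part of the original benchmark): calc_refs
// works in a single forward pass because every CellRef points at a cell with a
// smaller id, so the referenced hash is already computed when it is copied.
//
//   auto block = Generator::random_block(1024);
//   BlockSha256Baseline::calc_refs(block);  // fill child-hash slots, then hash each cell
//   CHECK(BlockSha256Baseline::check(block).is_ok());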
class BlockSha256Threads {
 public:
  static std::string get_description() {
    return "Threads";
  }
  template <class Iterator, class F>
  static void parallel_map(Iterator begin, Iterator end, F &&f) {
    size_t size = end - begin;
    auto threads_count = std::max(td::thread::hardware_concurrency(), 1u) * 2;
    auto thread_part_size = (size + threads_count - 1) / threads_count;
    std::vector<td::thread> threads;
    for (size_t i = 0; i < size; i += thread_part_size) {
      auto part_begin = begin + i;
      auto part_size = std::min(thread_part_size, size - i);
      auto part_end = part_begin + part_size;
      threads.push_back(td::thread([part_begin, part_end, &f] {
        for (auto it = part_begin; it != part_end; it++) {
          f(*it);
        }
      }));
    }
    for (auto &thread : threads) {
      thread.join();
    }
  }
  static void calc_hash(Block &block) {
    parallel_map(block.cells.begin(), block.cells.end(),
                 [](Cell &cell) { td::sha256(cell.data, as_slice(cell.hash)); });
  }
  static td::Status check_refs(Block &block) {
    std::atomic<bool> mismatch{false};
    parallel_map(block.cells.begin(), block.cells.end(), [&](Cell &cell) {
      for (auto &cell_ref : cell.next) {
        if (cell_ref.hash_slice.empty()) {
          continue;
        }
        if (cell_ref.hash_slice != as_slice(block.get_cell(cell_ref.cell_id).hash)) {
          mismatch = true;
          break;
        }
      }
    });
    if (mismatch) {
      return td::Status::Error("sha256 mismatch");
    }
    return td::Status::OK();
  }
};

class InfBackoff {
 private:
  int cnt = 0;

 public:
  bool next() {
    cnt++;
    if (cnt < 50) {
      return true;
    } else {
      td::this_thread::yield();
      return true;
    }
  }
};

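// Note (added): unlike td::detail::Backoff, InfBackoff::next() never reports
// exhaustion -- it spins for the first ~50 calls and then yields the thread on
// every further call, so callers simply keep looping on it.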
template <class Q>
class BlockSha256MpmcQueue {
 public:
  static std::string get_description() {
    return Q::get_description();
  }
  static void calc_hash(Block &block) {
    std::vector<td::thread> threads;
    auto threads_count = std::max(td::thread::hardware_concurrency(), 1u) * 2;
    auto queue = std::make_unique<Q>(threads_count + 1);
    for (size_t thread_id = 0; thread_id < threads_count; thread_id++) {
      threads.push_back(td::thread([&, thread_id] {
        while (true) {
          auto f = queue->pop(thread_id);
          if (!f) {
            return;
          }
          f();
        }
      }));
    }
    for (auto &cell : block.cells) {
      queue->push([&cell]() { td::sha256(cell.data, as_slice(cell.hash)); }, threads_count);
    }
    for (size_t thread_id = 0; thread_id < threads_count; thread_id++) {
      queue->push(nullptr, threads_count);
    }
    for (auto &thread : threads) {
      thread.join();
    }
  }
};
template <class Q>
class BlockSha256MpmcQueueCellPtr {
 public:
  static std::string get_description() {
    return "ptr " + Q::get_description();
  }
  static void calc_hash(Block &block) {
    std::vector<td::thread> threads;
    auto threads_count = std::max(td::thread::hardware_concurrency(), 1u) * 2;
    auto queue = std::make_unique<Q>(threads_count + 1);
    Cell poison;
    for (size_t thread_id = 0; thread_id < threads_count; thread_id++) {
      threads.push_back(td::thread([&, thread_id] {
        while (true) {
          auto cell = queue->pop(thread_id);
          if (cell == &poison) {
            return;
          }
          td::sha256(cell->data, as_slice(cell->hash));
        }
      }));
    }
    for (auto &cell : block.cells) {
      queue->push(&cell, threads_count);
    }
    for (size_t thread_id = 0; thread_id < threads_count; thread_id++) {
      queue->push(&poison, threads_count);
    }
    for (auto &thread : threads) {
      thread.join();
    }
  }
};
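// Note (added): both wrappers above shut their workers down with poison
// values -- BlockSha256MpmcQueue pushes one empty std::function per worker,
// while BlockSha256MpmcQueueCellPtr pushes the address of a local `poison`
// Cell and workers exit when they pop that sentinel pointer.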
std::atomic<int> flag;
class ActorExecutorBenchmark : public td::Benchmark {
  std::string get_description() const {
    return "Executor Benchmark";
  }

  void run(int n) {
    using namespace td::actor::core;
    using namespace td::actor;
    using td::actor::detail::ActorMessageCreator;
    struct Dispatcher : public SchedulerDispatcher {
      void add_to_queue(ActorInfoPtr ptr, SchedulerId scheduler_id, bool need_poll) override {
        //queue.push_back(std::move(ptr));
        q.push(ptr, 0);
      }
      void set_alarm_timestamp(const ActorInfoPtr &actor_info_ptr) override {
        UNREACHABLE();
      }
      SchedulerId get_scheduler_id() const override {
        return SchedulerId{0};
      }
      std::deque<ActorInfoPtr> queue;
      td::MpmcQueue<ActorInfoPtr> q{1};
    };
    Dispatcher dispatcher;

    class TestActor : public Actor {
     public:
      void close() {
        stop();
      }

     private:
      void start_up() override {
        //LOG(ERROR) << "start up";
      }
      void tear_down() override {
      }
      void wake_up() override {
        //LOG(ERROR) << "wake up";
      }
    };
    ActorInfoCreator actor_info_creator;
    auto actor = actor_info_creator.create(std::make_unique<TestActor>(),
                                           ActorInfoCreator::Options().on_scheduler(SchedulerId{0}).with_name("A"));
    auto actor2 = actor_info_creator.create(std::make_unique<TestActor>(),
                                            ActorInfoCreator::Options().on_scheduler(SchedulerId{0}).with_name("B"));

    {
      ActorExecutor executor(*actor, dispatcher, ActorExecutor::Options().with_from_queue());
      for (int i = 0; i < n; i++) {
        //int old = i;
        //flag.compare_exchange_strong(old, i + 1, std::memory_order_acquire, std::memory_order_relaxed);
        ActorExecutor executor2(*actor2, dispatcher, ActorExecutor::Options());
      }
    }
    for (int i = 0; i < 0; i++) {
      {
        ActorExecutor executor(*actor, dispatcher, ActorExecutor::Options().with_from_queue());
        executor.send_immediate(
            [&] {
              ActorExecutor executor2(*actor2, dispatcher, ActorExecutor::Options());
              executor2.send_immediate(
                  [&] {
                    ActorExecutor executor3(*actor, dispatcher, ActorExecutor::Options());
                    executor3.send(td::actor::core::ActorSignals::one(td::actor::core::ActorSignals::Wakeup));
                  },
                  0);
            },
            0);
      }
      dispatcher.q.pop(0);
    }

    //{
    //ActorExecutor executor(*actor, dispatcher, ActorExecutor::Options());
    //executor.send(
    //ActorMessageCreator::lambda([&] { static_cast<TestActor &>(ActorExecuteContext::get()->actor()).close(); }));
    //}
    dispatcher.queue.clear();
  }
};
namespace actor_signal_query_test {
using namespace td::actor;
class Master;
class Worker : public td::actor::Actor {
 public:
  Worker(std::shared_ptr<td::Destructor> watcher, ActorId<Master> master)
      : watcher_(std::move(watcher)), master_(std::move(master)) {
  }
  void wake_up() override;

 private:
  std::shared_ptr<td::Destructor> watcher_;
  ActorId<Master> master_;
};
class Master : public td::actor::Actor {
 public:
  Master(std::shared_ptr<td::Destructor> watcher, int n) : watcher_(std::move(watcher)), n_(n) {
  }

  void start_up() override {
    worker_ = create_actor<Worker>(ActorOptions().with_name("Worker"), watcher_, actor_id(this));
    send_signals(worker_, ActorSignals::wakeup());
  }

  void wake_up() override {
    n_--;
    if (n_ <= 0) {
      return stop();
    }
    send_signals(worker_, ActorSignals::wakeup());
  }

 private:
  std::shared_ptr<td::Destructor> watcher_;
  ActorOwn<Worker> worker_;
  int n_;
};
void Worker::wake_up() {
  send_signals(master_, ActorSignals::wakeup());
}
}  // namespace actor_signal_query_test
class ActorSignalQuery : public td::Benchmark {
 public:
  std::string get_description() const override {
    return "ActorSignalQuery";
  }
  void run(int n) override {
    using namespace actor_signal_query_test;
    size_t threads_count = 1;
    Scheduler scheduler{{threads_count}};

    scheduler.run_in_context([&] {
      auto watcher = td::create_shared_destructor([] { td::actor::SchedulerContext::get()->stop(); });

      create_actor<Master>(ActorOptions().with_name(PSLICE() << "Master"), watcher, n).release();
    });
    scheduler.run();
  }
};

namespace actor_query_test {
using namespace td::actor;
class Master;
class Worker : public td::actor::Actor {
 public:
  Worker(std::shared_ptr<td::Destructor> watcher) : watcher_(std::move(watcher)) {
  }
  void query(int x, ActorId<Master> master);

 private:
  std::shared_ptr<td::Destructor> watcher_;
};
class Master : public td::actor::Actor {
 public:
  Master(std::shared_ptr<td::Destructor> watcher, int n) : watcher_(std::move(watcher)), n_(n) {
  }

  void start_up() override {
    worker_ = create_actor<Worker>(ActorOptions().with_name("Worker"), watcher_);
    send_closure(worker_, &Worker::query, n_, actor_id(this));
  }

  void answer(int x, int y) {
    if (x == 0) {
      return stop();
    }
    send_closure(worker_, &Worker::query, x - 1, actor_id(this));
  }

 private:
  std::shared_ptr<td::Destructor> watcher_;
  ActorOwn<Worker> worker_;
  int n_;
};
void Worker::query(int x, ActorId<Master> master) {
  send_closure(master, &Master::answer, x, x + x);
}
}  // namespace actor_query_test
class ActorQuery : public td::Benchmark {
 public:
  std::string get_description() const override {
    return "ActorQuery";
  }
  void run(int n) override {
    using namespace actor_query_test;
    size_t threads_count = 1;
    Scheduler scheduler({threads_count});

    scheduler.run_in_context([&] {
      auto watcher = td::create_shared_destructor([] { td::actor::SchedulerContext::get()->stop(); });

      create_actor<Master>(ActorOptions().with_name(PSLICE() << "Master"), watcher, n).release();
    });
    scheduler.run();
  }
};

namespace actor_dummy_query_test {
using namespace td::actor;
class Master;
class Worker : public td::actor::Actor {
 public:
  Worker(std::shared_ptr<td::Destructor> watcher) : watcher_(std::move(watcher)) {
  }
  void query(int x, int *y) {
    *y = x + x;
  }
  void start_up() override {
  }

 private:
  std::shared_ptr<td::Destructor> watcher_;
};
class Master : public td::actor::Actor {
 public:
  Master(std::shared_ptr<td::Destructor> watcher, int n) : watcher_(std::move(watcher)), n_(n) {
  }

  void start_up() override {
    worker_ = create_actor<Worker>(ActorOptions().with_name("Worker"), watcher_);
    int res;
    for (int i = 0; i < n_; i++) {
      send_closure(worker_, &Worker::query, i, &res);
      CHECK(res == i + i);
    }
    stop();
  }

 private:
  std::shared_ptr<td::Destructor> watcher_;
  ActorOwn<Worker> worker_;
  int n_;
};
}  // namespace actor_dummy_query_test
class ActorDummyQuery : public td::Benchmark {
 public:
  std::string get_description() const override {
    return "ActorDummyQuery";
  }
  void run(int n) override {
    using namespace actor_dummy_query_test;
    size_t threads_count = 1;
    Scheduler scheduler({threads_count});

    scheduler.run_in_context([&] {
      auto watcher = td::create_shared_destructor([] { td::actor::SchedulerContext::get()->stop(); });

      create_actor<Master>(ActorOptions().with_name(PSLICE() << "Master"), watcher, n).release();
    });

    scheduler.run();
  }
};

namespace actor_task_query_test {
using namespace td::actor;
class Master;
class Worker : public td::actor::Actor {
 public:
  Worker(int x, ActorId<Master> master) : x_(x), master_(std::move(master)) {
  }
  void start_up() override;

 private:
  int x_;
  ActorId<Master> master_;
};
class Master : public td::actor::Actor {
 public:
  Master(std::shared_ptr<td::Destructor> watcher, int n) : watcher_(std::move(watcher)), n_(n) {
  }

  void start_up() override {
    create_actor<Worker>(ActorOptions().with_name("Worker"), n_, actor_id(this)).release();
  }

  void answer(int x, int y) {
    if (x == 0) {
      return stop();
    }
    create_actor<Worker>(ActorOptions().with_name("Worker"), x - 1, actor_id(this)).release();
  }

 private:
  std::shared_ptr<td::Destructor> watcher_;
  ActorOwn<Worker> worker_;
  int n_;
};
void Worker::start_up() {
  send_closure(master_, &Master::answer, x_, x_ + x_);
  stop();
}
}  // namespace actor_task_query_test
class ActorTaskQuery : public td::Benchmark {
 public:
  std::string get_description() const override {
    return "ActorTaskQuery";
  }
  void run(int n) override {
    using namespace actor_task_query_test;
    size_t threads_count = 1;
    Scheduler scheduler({threads_count});

    scheduler.run_in_context([&] {
      auto watcher = td::create_shared_destructor([] { td::actor::SchedulerContext::get()->stop(); });

      create_actor<Master>(ActorOptions().with_name(PSLICE() << "Master"), watcher, n).release();
    });
    scheduler.run();
  }
};
class BlockSha256Actors {
 public:
  static std::string get_description() {
    return "Actors";
  }
  template <class Iterator, class F>
  static void parallel_map(Iterator begin, Iterator end, F &&f) {
    auto threads_count = std::max(td::thread::hardware_concurrency(), 1u) * 2;
    using namespace td::actor;
    Scheduler scheduler({threads_count});

    scheduler.run_in_context([&] {
      auto watcher = td::create_shared_destructor([] { td::actor::SchedulerContext::get()->stop(); });
      class Worker : public td::actor::Actor {
       public:
        Worker(std::shared_ptr<td::Destructor> watcher, td::Promise<> promise)
            : watcher_(std::move(watcher)), promise_(std::move(promise)) {
        }
        void start_up() override {
          promise_.set_value(td::Unit());
          stop();
        }

       private:
        std::shared_ptr<td::Destructor> watcher_;
        td::Promise<> promise_;
      };

      for (auto it = begin; it != end; it++) {
        create_actor<Worker>(ActorOptions().with_name(PSLICE() << "Worker#"), watcher,
                             td::Promise<>([&, it](td::Unit) { f(*it); }))
            .release();
      }
    });
    scheduler.run();
  }
  static void calc_hash(Block &block) {
    parallel_map(block.cells.begin(), block.cells.end(),
                 [](Cell &cell) { td::sha256(cell.data, as_slice(cell.hash)); });
  }
};

class ActorLockerBenchmark : public td::Benchmark {
 public:
  explicit ActorLockerBenchmark(int threads_n) : threads_n_(threads_n) {
  }
  std::string get_description() const override {
    return PSTRING() << "ActorLockerBenchmark " << threads_n_;
  }
  void run(int n) override {
    std::vector<td::thread> threads(threads_n_);
    using namespace td::actor::core;
    ActorState state;
    std::atomic<int> ready{0};
    for (auto &thread : threads) {
      thread = td::thread([&] {
        ActorLocker locker(&state);
        ready++;
        while (ready != threads_n_) {
          td::this_thread::yield();
        }
        for (int i = 0; i < n / threads_n_; i++) {
          if (locker.add_signals(ActorSignals::one(ActorSignals::Kill))) {
            while (!locker.try_unlock(ActorState::Flags{})) {
            }
          }
        }
      });
    }
    for (auto &thread : threads) {
      thread.join();
    }
  }

 private:
  int threads_n_;
};

template <class Impl>
class CalcHashSha256Benchmark : public td::Benchmark {
 public:
  std::string get_description() const override {
    return "CheckSha256: " + impl_.get_description();
  }
  void start_up_n(int n) override {
    block_ = Generator::random_block(n);
  }

  void run(int n) override {
    Impl::calc_hash(block_);
  }

 private:
  Impl impl_;
  Block block_;
};

/*
template <class T>
class MpmcQueueInterface {
 public:
  explicit MpmcQueueInterface(size_t thread_n);
  static std::string get_description();
  void push(T value, size_t thread_id);
  T pop(size_t thread_id);
  bool try_pop(T &value, size_t thread_id);
};
*/

// Simple bounded mpmc queue
template <class ValueT>
class BoundedMpmcQueue {
 public:
  explicit BoundedMpmcQueue(size_t threads_n) : data_(1 << 20) {
  }
  static std::string get_description() {
    return "BoundedMpmc queue";
  }
  void push(ValueT value, size_t = 0) {
    auto pos = write_pos_.fetch_add(1, std::memory_order_relaxed);
    auto generation = pos / data_.size() * 2 + 0;
    auto &element = data_[pos % data_.size()];

    InfBackoff backoff;
    while (element.generation.load(std::memory_order_acquire) != generation) {
      backoff.next();
    }
    element.value = std::move(value);
    element.generation.fetch_add(1, std::memory_order_release);
  }

  ValueT pop(size_t = 0) {
    auto pos = read_pos_.fetch_add(1, std::memory_order_relaxed);
    auto generation = pos / data_.size() * 2 + 1;
    auto &element = data_[pos % data_.size()];

    InfBackoff backoff;
    while (element.generation.load(std::memory_order_acquire) != generation) {
      backoff.next();
    }
    auto result = std::move(element.value);
    element.generation.fetch_add(1, std::memory_order_release);
    return result;
  }
  bool try_pop(ValueT &value, size_t = 0) {
    auto pos = read_pos_.load(std::memory_order_relaxed);
    auto generation = pos / data_.size() * 2 + 1;
    auto &element = data_[pos % data_.size()];

    if (element.generation.load(std::memory_order_acquire) != generation) {
      return false;
    }
    if (!read_pos_.compare_exchange_strong(pos, pos + 1, std::memory_order_acq_rel)) {
      return false;
    }
    value = std::move(element.value);
    element.generation.fetch_add(1, std::memory_order_release);
    return true;
  }

 private:
  std::atomic<td::uint64> write_pos_{0};
  char pad[128];
  std::atomic<td::uint64> read_pos_{0};
  char pad2[128];
  struct Element {
    std::atomic<td::uint32> generation{0};
    ValueT value;
    //char pad2[128];
  };
  std::vector<Element> data_;
  char pad3[128];
};

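// Note (added): each slot of BoundedMpmcQueue carries a generation counter. A
// writer that claimed absolute position `pos` waits until the slot's
// generation equals 2 * (pos / capacity) (slot free for this lap) and bumps it
// to the matching odd value after storing; a reader waits for that odd value
// and bumps it back to the even value of the next lap. Capacity is fixed at
// 1 << 20 elements and the threads_n constructor argument is ignored.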
template <class Impl>
class MpmcQueueBenchmark : public td::Benchmark {
 public:
  MpmcQueueBenchmark(int n, int m) : n_(n), m_(m) {
  }
  std::string get_description() const override {
    return PSTRING() << "MpmcQueueBenchmark " << n_ << " " << m_ << " " << Impl::get_description();
  }

  void run(int n) override {
    std::vector<td::thread> n_threads(n_);
    std::vector<td::thread> m_threads(m_);
    auto impl = std::make_unique<Impl>(n_ + m_ + 1);
    size_t thread_id = 0;
    td::ThreadSafeCounter counter;
    CHECK(counter.sum() == 0);
    for (auto &thread : m_threads) {
      thread = td::thread([&, thread_id] {
        while (true) {
          size_t value = impl->pop(thread_id);
          counter.add(-static_cast<td::int64>(value));
          if (!value) {
            break;
          }
        }
      });
      thread_id++;
    }
    for (auto &thread : n_threads) {
      thread = td::thread([&, thread_id] {
        for (int i = 0; i < n / n_; i++) {
          impl->push(static_cast<size_t>(i + 1), thread_id);
          counter.add(i + 1);
        }
      });
      thread_id++;
    }
    for (auto &thread : n_threads) {
      thread.join();
    }
    while (counter.sum() != 0) {
      td::this_thread::yield();
    }
    for (int i = 0; i < m_ + n_ + 1; i++) {
      impl->push(0, thread_id);
    }
    for (auto &thread : m_threads) {
      thread.join();
    }
    impl.reset();
  }

 private:
  int n_;
  int m_;
};

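// Note (added): the benchmark above uses 0 as a poison value. n_ producers
// each push the values 1..n/n_ while m_ consumers pop until they see 0; the
// ThreadSafeCounter tracks pushed-minus-popped totals, so the main thread only
// injects the poison zeros once every real value has been consumed.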
template <class Impl>
class MpmcQueueBenchmark2 : public td::Benchmark {
 public:
  MpmcQueueBenchmark2(int n, int k) : n_(n), k_(k) {
  }
  std::string get_description() const override {
    return PSTRING() << "MpmcQueueBenchmark2 " << n_ << " " << k_ << " " << Impl::get_description();
  }

  void run(int n) override {
    std::vector<td::thread> n_threads(n_);
    auto impl = std::make_unique<Impl>(n_ + 1);
    size_t thread_id = 0;
    std::atomic<int> left{k_};

    for (int i = 0; i < k_; i++) {
      impl->push(n, thread_id);
    }

    for (auto &thread : n_threads) {
      thread = td::thread([&, thread_id] {
        while (true) {
          size_t value = impl->pop(thread_id);
          if (value > 1) {
            impl->push(value - 1, thread_id);
          }
          if (value == 1) {
            left--;
          }
          if (!value) {
            break;
          }
        }
      });
      thread_id++;
    }

    while (left.load() != 0) {
      td::this_thread::yield();
    }

    for (int i = 0; i < n_ + 1; i++) {
      impl->push(0, thread_id);
    }

    for (auto &thread : n_threads) {
      thread.join();
    }
    impl.reset();
  }

 private:
  int n_;
  int k_;
};

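// Note (added): here the queue is pre-filled with k_ counters that each start
// at n. Every pop of a value v > 1 pushes back v - 1, popping 1 retires one
// counter, and 0 again acts as the poison that stops the workers, so this
// variant measures a pop+push ping-pong rather than one-way throughput.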
class Cheat {
 public:
  explicit Cheat(size_t thread_n) : impl_(static_cast<int>(thread_n)) {
  }
  static std::string get_description() {
    return "td::MpmcQueue (cheat)";
  }
  void push(size_t value, size_t thread_id) {
    impl_.push(reinterpret_cast<size_t *>(value + 1), static_cast<int>(thread_id));
  }
  size_t pop(size_t thread_id) {
    auto res = impl_.pop(thread_id);
    return reinterpret_cast<size_t>(res) - 1;
  }
  bool try_pop(size_t &value, size_t thread_id) {
    size_t *was;
    if (impl_.try_pop(was, thread_id)) {
      value = reinterpret_cast<size_t>(was) - 1;
      return true;
    }
    return false;
  }

 private:
  td::MpmcQueue<size_t *> impl_;
};

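// Note (added): Cheat stores the integer payload inside the pointer value
// itself, shifted by +1 so that the benchmark's poison value 0 never becomes a
// null pointer; the CfQueue and MpQueue adapters below use the same +1/-1
// encoding because their underlying queues treat null as "empty".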
using ConcurrencyFreaks::FAAArrayQueue;
using ConcurrencyFreaks::LazyIndexArrayQueue;
#if TG_LCR_QUEUE
using ConcurrencyFreaks::LCRQueue;
#endif

template <class Impl>
class CfQueue {
 public:
  explicit CfQueue(size_t thread_n) : impl_(static_cast<int>(thread_n)) {
  }
  static std::string get_description() {
    return "TODO";
  }
  void push(size_t value, size_t thread_id) {
    impl_.enqueue(reinterpret_cast<size_t *>(value + 1), static_cast<int>(thread_id));
  }
  size_t pop(size_t thread_id) {
    size_t res;
    while (!try_pop(res, thread_id)) {
      td::this_thread::yield();
    }
    return res;
  }
  bool try_pop(size_t &value, size_t thread_id) {
    auto ptr = impl_.dequeue(static_cast<int>(thread_id));
    if (!ptr) {
      return false;
    }
    value = reinterpret_cast<size_t>(ptr) - 1;
    return true;
  }

 private:
  Impl impl_;
};

#if TG_LCR_QUEUE
template <>
std::string CfQueue<LCRQueue<size_t>>::get_description() {
  return "LCRQueue (cf)";
}
#endif
template <>
std::string CfQueue<LazyIndexArrayQueue<size_t>>::get_description() {
  return "LazyIndexArrayQueue (cf)";
}
template <>
std::string CfQueue<FAAArrayQueue<size_t>>::get_description() {
  return "FAAArrayQueue (cf)";
}
template <class Impl, class T>
class CfQueueT {
 public:
  explicit CfQueueT(size_t thread_n) : impl_(static_cast<int>(thread_n)) {
  }
  static std::string get_description() {
    return "TODO";
  }
  void push(T *value, size_t thread_id) {
    impl_.enqueue(value, static_cast<int>(thread_id));
  }
  T *pop(size_t thread_id) {
    td::detail::Backoff backoff;
    while (true) {
      auto ptr = impl_.dequeue(static_cast<int>(thread_id));
      if (!ptr) {
        backoff.next();
      } else {
        return ptr;
      }
    }
  }
  bool try_pop(T *&value, size_t thread_id) {
    value = impl_.dequeue(static_cast<int>(thread_id));
    return value != nullptr;
  }

 private:
  Impl impl_;
};

#if TG_LCR_QUEUE
template <>
std::string CfQueueT<LCRQueue<Cell>, Cell>::get_description() {
  return "LCRQueue (cf)";
}
#endif
template <>
std::string CfQueueT<LazyIndexArrayQueue<Cell>, Cell>::get_description() {
  return "LazyIndexArrayQueue (cf)";
}
template <>
std::string CfQueueT<FAAArrayQueue<Cell>, Cell>::get_description() {
  return "FAAArrayQueue (cf)";
}

template <class Value>
class MoodyQueue {
 public:
  explicit MoodyQueue(size_t n) {
    for (size_t i = 0; i < n; i++) {
      p.push_back(moodycamel::ProducerToken(q));
      c.push_back(moodycamel::ConsumerToken(q));
    }
  }
  static std::string get_description() {
    return "moodycamel queue";
  }
  void push(Value v, size_t tid) {
    q.enqueue(p[tid], v);
  }
  Value pop(size_t tid) {
    Value res;
    while (!q.try_dequeue(c[tid], res)) {
    }
    //q.wait_dequeue(c[tid], res);
    return res;
  }
  bool try_pop(Value &value, size_t tid) {
    return q.try_dequeue(c[tid], value);
  }

 private:
  moodycamel::ConcurrentQueue<Value> q;
  std::vector<moodycamel::ProducerToken> p;
  std::vector<moodycamel::ConsumerToken> c;
};

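// Note (added): MoodyQueue creates one moodycamel ProducerToken and one
// ConsumerToken per thread up front and indexes them by thread id, so each
// thread enqueues and dequeues through its own token, which is generally the
// faster way to drive moodycamel::ConcurrentQueue under heavy contention.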
struct Sem {
 public:
  void post() {
    if (++cnt_ == 0) {
      {
        std::unique_lock<std::mutex> lk(mutex_);
      }
      cnd_.notify_one();
    }
  }
  void wait(int cnt = 1) {
    auto was = cnt_.fetch_sub(cnt);
    if (was >= cnt) {
      return;
    }
    std::unique_lock<std::mutex> lk(mutex_);
    cnd_.wait(lk, [&] { return cnt_ >= 0; });
  }

 private:
  std::mutex mutex_;
  std::condition_variable cnd_;
  std::atomic<int> cnt_{0};
};

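// Note (added): Sem is a small counting semaphore used by the actor benchmarks
// below. wait(cnt) subtracts cnt and blocks only if the counter went negative;
// post() increments and wakes the waiter once the counter climbs back to zero
// (the empty lock/unlock of mutex_ orders the wake-up against the sleep).
// Example: after two post() calls, wait(2) returns without blocking.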
template <class Value>
class MagicQueue {
 public:
  explicit MagicQueue(size_t n) : n_(n), qs_(n_ - 1), pos_{0} {
  }
  static std::string get_description() {
    return "magic queue";
  }
  void push(Value v, size_t tid) {
    if (v == 0) {
      return;
    }

    if (tid + 1 == n_) {
      qs_[pos_].push(v);
      pos_ = (pos_ + 1) % (n_ - 1);
    } else {
      qs_[tid].push(v);
    }
  }
  Value pop(size_t tid) {
    CHECK(tid + 1 != n_);
    if (qs_[tid].empty()) {
      return 0;
    }
    return qs_[tid].pop();
  }
  bool try_pop(Value &value, size_t) {
    UNREACHABLE();
  }

 private:
  size_t n_;
  std::vector<td::VectorQueue<Value>> qs_;
  size_t pos_;
};

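// Note (added): MagicQueue is not a real concurrent queue -- it keeps one
// unsynchronized td::VectorQueue per worker, drops zero (poison) values, and
// returns 0 as soon as a worker's own queue is empty. It only behaves
// correctly under MpmcQueueBenchmark2's access pattern, where the main thread
// (the last tid) fills the queues round-robin before the workers start and
// each worker afterwards touches nothing but its own slot; it presumably
// serves as a no-synchronization reference point for the other queues.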
#if TG_LCR_QUEUE
class MpQueue {
 public:
  explicit MpQueue(size_t) {
    q_ = alloc_mp_queue();
  }
  ~MpQueue() {
    free_mp_queue(q_);
    clear_thread_ids();
  }
  void push(size_t value, size_t) {
    mpq_push(q_, reinterpret_cast<void *>(value + 1), 0);
  }
  size_t pop(size_t) {
    td::detail::Backoff backoff;
    while (true) {
      auto ptr = mpq_pop(q_, 0);
      if (!ptr) {
        backoff.next();
      } else {
        return reinterpret_cast<size_t>(ptr) - 1;
      }
    }
  }
  bool try_pop(size_t &value, size_t) {
    auto ptr = mpq_pop(q_, 0);
    if (!ptr) {
      return false;
    }
    value = reinterpret_cast<size_t>(ptr) - 1;
    return true;
  }

  static std::string get_description() {
    return "mp-queue";
  }

 public:
  struct mp_queue *q_;
};

class Semaphore {
 public:
  Semaphore() {
    impl->value = 0;
    impl->waiting = 0;
  }
  void wait() {
    mp_sem_wait(impl.get());
  }
  void post() {
    mp_sem_post(impl.get());
  }
  std::unique_ptr<mp_sem_t> impl = std::make_unique<mp_semaphore>();
};

template <class Q, class T>
class SemQueue {
 public:
  static std::string get_description() {
    return "Sem + " + Q::get_description();
  }
  explicit SemQueue(size_t threads_n) : impl(threads_n) {
  }
  T pop(size_t thread_id) {
    T res;
    td::detail::Backoff backoff;
    while (!impl.try_pop(res, thread_id)) {
      if (!backoff.next()) {
        sem.wait();
      }
    }
    return res;
  }
  void push(T value, size_t thread_id) {
    impl.push(std::move(value), thread_id);
    sem.post();
  }

 private:
  Semaphore sem;
  Q impl;
};
#endif

template <class T>
class StupidQueue {
 public:
  explicit StupidQueue(size_t) {
  }
  static std::string get_description() {
    return "Mutex queue";
  }
  T pop(size_t) {
    std::unique_lock<std::mutex> guard(mutex_);
    cv_.wait(guard, [&] { return !queue_.empty(); });
    auto front = queue_.front();
    queue_.pop();
    return front;
  }
  void push(T value, size_t) {
    {
      std::unique_lock<std::mutex> guard(mutex_);
      queue_.push(std::move(value));
    }
    cv_.notify_one();
  }
  bool try_pop(T &value, size_t) {
    std::lock_guard<std::mutex> guard(mutex_);
    if (!queue_.empty()) {
      value = std::move(queue_.front());
      queue_.pop();
      return true;
    }
    return false;
  }

 private:
  std::mutex mutex_;
  std::queue<T> queue_;
  std::condition_variable cv_;
};

template <class Q, class W, class T>
class WaitQueue {
 public:
  static std::string get_description() {
    return "Wait + " + Q::get_description();
  }

  explicit WaitQueue(size_t threads_n) : impl(threads_n), slots(threads_n) {
    for (size_t i = 0; i < threads_n; i++) {
      waiter.init_slot(slots[i].slot, static_cast<int>(i));
    }
  }

  T pop(size_t thread_id) {
    T res;
    while (true) {
      if (slots[thread_id].local_queue.try_pop(res)) {
        break;
      }
      if (impl.try_pop(res, thread_id)) {
        break;
      }
      waiter.wait(slots[thread_id].slot);
    }
    waiter.stop_wait(slots[thread_id].slot);
    //LOG(ERROR) << "GOT";
    return res;
  }

  void push_local(T value, size_t thread_id) {
    auto o_value = slots[thread_id].local_queue.push(value);
    if (o_value) {
      push(o_value.unwrap(), thread_id);
    }
  }

  void push(T value, size_t thread_id) {
    impl.push(value, static_cast<uint32>(thread_id));
    waiter.notify();
  }

 private:
  W waiter;
  Q impl;
  struct Slot {
    typename W::Slot slot;
    td::actor::core::LocalQueue<T> local_queue;
  };
  std::vector<Slot> slots;
};
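// Note (added): WaitQueue layers two things on top of the underlying MPMC
// queue Q: a per-thread td::actor::core::LocalQueue for cheap same-thread
// hand-off (push_local), and an MpmcWaiter W so that consumers park instead of
// spinning when both the local and the shared queue are empty; push() calls
// waiter.notify() to wake a parked consumer.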
template <class Q, class W, class T>
class StealingWaitQueue {
 public:
  static std::string get_description() {
    return "StealWait + " + Q::get_description();
  }

  explicit StealingWaitQueue(size_t threads_n) : impl(threads_n), slots(threads_n) {
    for (size_t i = 0; i < threads_n; i++) {
      waiter.init_slot(slots[i].slot, static_cast<int>(i));
    }
  }

  T pop(size_t thread_id) {
    T res;
    while (true) {
      if (slots[thread_id].stealing_queue.local_pop(res)) {
        break;
      }
      if (slots[thread_id].local_queue.try_pop(res)) {
        break;
      }
      if (impl.try_pop(res, thread_id)) {
        break;
      }

      bool ok = false;
      for (size_t i = 1; i < slots.size(); i++) {
        auto pos = (i + thread_id) % slots.size();
        if (slots[thread_id].stealing_queue.steal(res, slots[pos].stealing_queue)) {
          ok = true;
          break;
        }
      }
      if (ok) {
        break;
      }
      waiter.wait(slots[thread_id].slot);
    }
    waiter.stop_wait(slots[thread_id].slot);
    //LOG(ERROR) << "GOT";
    return res;
  }

  void push_local(T value, size_t thread_id) {
    auto o_value = slots[thread_id].local_queue.push(value);
    if (o_value) {
      push(o_value.unwrap(), thread_id);
    }
  }

  void push(T value, size_t thread_id) {
    slots[thread_id].stealing_queue.local_push(value,
                                               [&](auto value) { impl.push(value, static_cast<uint32>(thread_id)); });
    waiter.notify();
  }

 private:
  W waiter;
  Q impl;
  struct Slot {
    typename W::Slot slot;
    td::actor::core::LocalQueue<T> local_queue;
    td::StealingQueue<T> stealing_queue;
  };
  std::vector<Slot> slots;
};

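// Note (added): StealingWaitQueue extends WaitQueue with a per-thread
// td::StealingQueue. pop() tries, in order: the thread's own stealing queue,
// its LocalQueue, the shared MPMC queue, then stealing from the other threads'
// stealing queues, and only parks on the waiter when all of these come up
// empty. push() goes to the local stealing queue first and spills overflow
// into the shared queue via the callback passed to local_push().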
void run_queue_bench(int n, int m) {
  bench(MpmcQueueBenchmark<MoodyQueue<size_t>>(n, m), 2);
  bench(MpmcQueueBenchmark<StealingWaitQueue<td::MpmcQueue<size_t>, td::MpmcEagerWaiter, size_t>>(n, m), 2);
  bench(MpmcQueueBenchmark<StealingWaitQueue<td::MpmcQueue<size_t>, td::MpmcSleepyWaiter, size_t>>(n, m), 2);
  bench(MpmcQueueBenchmark<WaitQueue<td::MpmcQueue<size_t>, td::MpmcEagerWaiter, size_t>>(n, m), 2);
  bench(MpmcQueueBenchmark<WaitQueue<td::MpmcQueue<size_t>, td::MpmcSleepyWaiter, size_t>>(n, m), 2);
  //bench(MpmcQueueBenchmark<td::MpmcQueue<size_t>>(n, m), 2);
  //bench(MpmcQueueBenchmark<td::MpmcQueueOld<size_t>>(n, m), 2);
  //bench(MpmcQueueBenchmark<Cheat>(n, m), 2);
  //bench(MpmcQueueBenchmark<CfQueue<FAAArrayQueue<size_t>>>(n, m), 2);
  //bench(MpmcQueueBenchmark<CfQueue<LazyIndexArrayQueue<size_t>>>(n, m), 2);
  //bench(MpmcQueueBenchmark<StupidQueue<size_t>>(n, m), 2);

  //bench(MpmcQueueBenchmark<MpQueue>(n, m), 2);
#if TG_LCR_QUEUE
  bench(MpmcQueueBenchmark<CfQueue<LCRQueue<size_t>>>(n, m), 2);
#endif
}
void run_queue_bench2(int n, int k) {
  bench(MpmcQueueBenchmark2<StealingWaitQueue<td::MpmcQueue<size_t>, td::MpmcSleepyWaiter, size_t>>(n, k), 2);
  bench(MpmcQueueBenchmark2<StealingWaitQueue<td::MpmcQueue<size_t>, td::MpmcEagerWaiter, size_t>>(n, k), 2);
  bench(MpmcQueueBenchmark2<MagicQueue<size_t>>(n, k), 2);
  bench(MpmcQueueBenchmark2<MoodyQueue<size_t>>(n, k), 2);
  bench(MpmcQueueBenchmark2<WaitQueue<td::MpmcQueue<size_t>, td::MpmcEagerWaiter, size_t>>(n, k), 2);
  bench(MpmcQueueBenchmark2<WaitQueue<td::MpmcQueue<size_t>, td::MpmcSleepyWaiter, size_t>>(n, k), 2);
  //bench(MpmcQueueBenchmark2<td::MpmcQueue<size_t>>(n, k), 2);
  //bench(MpmcQueueBenchmark2<td::MpmcQueueOld<size_t>>(n, k), 2);
  //bench(MpmcQueueBenchmark2<Cheat>(n, k), 2);
  //bench(MpmcQueueBenchmark2<CfQueue<FAAArrayQueue<size_t>>>(n, k), 2);
  //bench(MpmcQueueBenchmark2<CfQueue<LazyIndexArrayQueue<size_t>>>(n, k), 2);
  //bench(MpmcQueueBenchmark2<StupidQueue<size_t>>(n, k), 2);

  //bench(MpmcQueueBenchmark<MpQueue>(n, m), 2);
#if TG_LCR_QUEUE
  bench(MpmcQueueBenchmark2<CfQueue<LCRQueue<size_t>>>(n, k), 2);
#endif
}

class ChainedSpawn : public td::Benchmark {
 public:
  ChainedSpawn(bool use_io) : use_io_(use_io) {
  }
  std::string get_description() const {
    return PSTRING() << "Chained create_actor use_io(" << use_io_ << ")";
  }

  void run(int n) {
    class Task : public td::actor::Actor {
     public:
      Task(int n, Sem *sem) : n_(n), sem_(sem) {
      }
      void start_up() override {
        if (n_ == 0) {
          sem_->post();
        } else {
          td::actor::create_actor<Task>("Task", n_ - 1, sem_).release();
        }
        stop();
      };

     private:
      int n_;
      Sem *sem_{nullptr};
    };
    td::actor::Scheduler scheduler{{8}};
    auto sch = td::thread([&] { scheduler.run(); });

    Sem sem;
    scheduler.run_in_context_external([&] {
      for (int i = 0; i < n; i++) {
        td::actor::create_actor<Task>(td::actor::ActorOptions().with_name("Task").with_poll(use_io_), 1000, &sem)
            .release();
        sem.wait();
      }
      td::actor::SchedulerContext::get()->stop();
    });

    sch.join();
  }

 private:
  bool use_io_{false};
};

class ChainedSpawnInplace : public td::Benchmark {
 public:
  ChainedSpawnInplace(bool use_io) : use_io_(use_io) {
  }
  std::string get_description() const {
    return PSTRING() << "Chained send_signal(self) use_io(" << use_io_ << ")";
  }

  void run(int n) {
    class Task : public td::actor::Actor {
     public:
      Task(int n, Sem *sem) : n_(n), sem_(sem) {
      }
      void loop() override {
        if (n_ == 0) {
          sem_->post();
          stop();
        } else {
          n_--;
          send_signals(actor_id(this), td::actor::ActorSignals::wakeup());
        }
      };

     private:
      int n_;
      Sem *sem_;
    };
    td::actor::Scheduler scheduler{{8}};
    auto sch = td::thread([&] { scheduler.run(); });

    Sem sem;
    scheduler.run_in_context_external([&] {
      for (int i = 0; i < n; i++) {
        td::actor::create_actor<Task>(td::actor::ActorOptions().with_name("Task").with_poll(use_io_), 1000, &sem)
            .release();
        sem.wait();
      }
      td::actor::SchedulerContext::get()->stop();
    });

    sch.join();
  }

 private:
  bool use_io_{false};
};

class PingPong : public td::Benchmark {
 public:
  PingPong(bool use_io) : use_io_(use_io) {
  }
  std::string get_description() const {
    return PSTRING() << "PingPong use_io(" << use_io_ << ")";
  }

  void run(int n) {
    if (n < 3) {
      n = 3;
    }
    class Task : public td::actor::Actor {
     public:
      explicit Task(Sem *sem) : sem_(sem) {
      }
      void set_peer(td::actor::ActorId<Task> peer) {
        peer_ = peer;
      }
      void ping(int n) {
        if (n < 0) {
          sem_->post();
          stop();
        }
        send_closure(peer_, &Task::ping, n - 1);
      }

     private:
      td::actor::ActorId<Task> peer_;
      Sem *sem_;
    };
    td::actor::Scheduler scheduler{{8}};
    auto sch = td::thread([&] { scheduler.run(); });

    Sem sem;
    scheduler.run_in_context_external([&] {
      for (int i = 0; i < n; i++) {
        auto a = td::actor::create_actor<Task>(td::actor::ActorOptions().with_name("Task").with_poll(use_io_), &sem)
                     .release();
        auto b = td::actor::create_actor<Task>(td::actor::ActorOptions().with_name("Task").with_poll(use_io_), &sem)
                     .release();
        send_closure(a, &Task::set_peer, b);
        send_closure(b, &Task::set_peer, a);
        send_closure(a, &Task::ping, 1000);
        sem.wait(2);
      }
      td::actor::SchedulerContext::get()->stop();
    });

    sch.join();
  }

 private:
  bool use_io_{false};
};

class SpawnMany : public td::Benchmark {
 public:
  SpawnMany(bool use_io) : use_io_(use_io) {
  }
  std::string get_description() const {
    return PSTRING() << "Spawn many use_io(" << use_io_ << ")";
  }

  void run(int n) {
    class Task : public td::actor::Actor {
     public:
      Task(Sem *sem) : sem_(sem) {
      }
      void start_up() override {
        sem_->post();
        stop();
      };

     private:
      Sem *sem_;
    };
    td::actor::Scheduler scheduler{{8}};
    Sem sem;
    auto sch = td::thread([&] { scheduler.run(); });
    scheduler.run_in_context_external([&] {
      for (int i = 0; i < n; i++) {
        int spawn_cnt = 10000;
        for (int j = 0; j < spawn_cnt; j++) {
          td::actor::create_actor<Task>(td::actor::ActorOptions().with_name("Task").with_poll(use_io_), &sem).release();
        }
        sem.wait(spawn_cnt);
      }
      td::actor::SchedulerContext::get()->stop();
    });
    sch.join();
  }

 private:
  bool use_io_{false};
};

class YieldMany : public td::Benchmark {
 public:
  YieldMany(bool use_io) : use_io_(use_io) {
  }
  std::string get_description() const {
    return PSTRING() << "Yield many use_io(" << use_io_ << ")";
  }

  void run(int n) {
    int num_yield = 1000;
    unsigned tasks_per_cpu = 50;
    unsigned cpu_n = td::thread::hardware_concurrency();
    class Task : public td::actor::Actor {
     public:
      explicit Task(int n, Sem *sem) : n_(n), sem_(sem) {
      }
      void loop() override {
        if (n_ == 0) {
          sem_->post();
          stop();
        } else {
          n_--;
          yield();
        }
      };

     private:
      int n_;
      Sem *sem_;
    };
    td::actor::Scheduler scheduler{{cpu_n}};
    auto sch = td::thread([&] { scheduler.run(); });
    unsigned tasks = tasks_per_cpu * cpu_n;
    Sem sem;
    scheduler.run_in_context_external([&] {
      for (int i = 0; i < n; i++) {
        for (unsigned j = 0; j < tasks; j++) {
          td::actor::create_actor<Task>(td::actor::ActorOptions().with_name("Task").with_poll(use_io_), num_yield, &sem)
              .release();
        }
        sem.wait(tasks);
      }
    });

    scheduler.run_in_context_external([&] { td::actor::SchedulerContext::get()->stop(); });
    sch.join();
  }

 private:
  bool use_io_{false};
};

int main(int argc, char **argv) {
  if (argc > 1) {
    if (argv[1][0] == 'a') {
      bench_n(MpmcQueueBenchmark2<WaitQueue<td::MpmcQueue<size_t>, td::MpmcEagerWaiter, size_t>>(50, 1), 1 << 26);
      //bench_n(MpmcQueueBenchmark<td::MpmcQueue<size_t>>(1, 1), 1 << 26);
      //bench_n(MpmcQueueBenchmark<MpQueue>(1, 40), 1 << 20);
      //bench_n(MpmcQueueBenchmark<CfQueue<LCRQueue<size_t>>>(1, 40), 1 << 20);
    } else {
      bench_n(MpmcQueueBenchmark2<WaitQueue<td::MpmcQueue<size_t>, td::MpmcSleepyWaiter, size_t>>(50, 1), 1 << 26);
      //bench_n(MpmcQueueBenchmark<td::MpmcQueueOld<size_t>>(1, 1), 1 << 26);
      //bench_n(MpmcQueueBenchmark<CfQueue<LCRQueue<size_t>>>(1, 40), 1 << 20);
      //bench_n(MpmcQueueBenchmark<CfQueue<FAAArrayQueue<size_t>>>(1, 1), 1 << 26);
    }
    return 0;
  }
  bench(YieldMany(false));
  bench(YieldMany(true));
  bench(SpawnMany(false));
  bench(SpawnMany(true));
  bench(PingPong(false));
  bench(PingPong(true));
  bench(ChainedSpawnInplace(false));
  bench(ChainedSpawnInplace(true));
  bench(ChainedSpawn(false));
  bench(ChainedSpawn(true));

  run_queue_bench(10, 10);
  run_queue_bench(10, 1);
  run_queue_bench(1, 10);
  run_queue_bench(1, 1);
  run_queue_bench(2, 10);
  run_queue_bench(2, 2);
  run_queue_bench(10, 1);

  run_queue_bench2(50, 1);
  run_queue_bench2(50, 2);
  run_queue_bench2(1, 100);
  run_queue_bench2(1, 1000);
  run_queue_bench2(10, 2);
  run_queue_bench2(10, 1000);

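  // Note (added): everything below the early return is intentionally dead
  // code -- the remaining actor/SHA256 benchmarks are kept around but disabled.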
  return 0;

  bench(ActorDummyQuery());
  bench(ActorExecutorBenchmark());
  bench(ActorSignalQuery());
  bench(ActorQuery());
  bench(ActorTaskQuery());
  bench(CalcHashSha256Benchmark<BlockSha256Actors>());
  bench(CalcHashSha256Benchmark<BlockSha256Threads>());
  bench(CalcHashSha256Benchmark<BlockSha256Baseline>());
  bench(ActorLockerBenchmark(1));
  bench(ActorLockerBenchmark(2));
  bench(ActorLockerBenchmark(5));
  bench(ActorLockerBenchmark(20));

  bench(CalcHashSha256Benchmark<BlockSha256MpmcQueueCellPtr<td::MpmcQueue<Cell *>>>());
  bench(CalcHashSha256Benchmark<BlockSha256MpmcQueueCellPtr<td::MpmcQueueOld<Cell *>>>());
  bench(CalcHashSha256Benchmark<BlockSha256MpmcQueueCellPtr<CfQueueT<FAAArrayQueue<Cell>, Cell>>>());

  bench(CalcHashSha256Benchmark<BlockSha256MpmcQueueCellPtr<StupidQueue<Cell *>>>());
  bench(CalcHashSha256Benchmark<BlockSha256MpmcQueueCellPtr<CfQueueT<LazyIndexArrayQueue<Cell>, Cell>>>());
#if TG_LCR_QUEUE
  bench(CalcHashSha256Benchmark<BlockSha256MpmcQueueCellPtr<CfQueueT<LCRQueue<Cell>, Cell>>>());
#endif
  bench(CalcHashSha256Benchmark<BlockSha256MpmcQueueCellPtr<MoodyQueue<Cell *>>>());
  bench(CalcHashSha256Benchmark<BlockSha256MpmcQueueCellPtr<BoundedMpmcQueue<Cell *>>>());

  bench(CalcHashSha256Benchmark<BlockSha256MpmcQueue<BoundedMpmcQueue<std::function<void()>>>>());
  bench(CalcHashSha256Benchmark<BlockSha256MpmcQueue<td::MpmcQueue<std::function<void()>>>>());

  return 0;
}