mirror of
				https://github.com/ton-blockchain/ton
				synced 2025-03-09 15:40:10 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			484 lines
		
	
	
	
		
			9.8 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
			
		
		
	
	
			484 lines
		
	
	
	
		
			9.8 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
| /*
 | |
|     This file is part of TON Blockchain Library.
 | |
| 
 | |
|     TON Blockchain Library is free software: you can redistribute it and/or modify
 | |
|     it under the terms of the GNU Lesser General Public License as published by
 | |
|     the Free Software Foundation, either version 2 of the License, or
 | |
|     (at your option) any later version.
 | |
| 
 | |
|     TON Blockchain Library is distributed in the hope that it will be useful,
 | |
|     but WITHOUT ANY WARRANTY; without even the implied warranty of
 | |
|     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 | |
|     GNU Lesser General Public License for more details.
 | |
| 
 | |
|     You should have received a copy of the GNU Lesser General Public License
 | |
|     along with TON Blockchain Library.  If not, see <http://www.gnu.org/licenses/>.
 | |
| 
 | |
|     Copyright 2017-2020 Telegram Systems LLP
 | |
| */
 | |
| #pragma once
 | |
| 
 | |
| #include "td/utils/port/EventFd.h"
 | |
| #include "td/utils/port/thread.h"
 | |
| 
 | |
| #if !TD_THREAD_UNSUPPORTED && !TD_EVENTFD_UNSUPPORTED
 | |
| 
 | |
#include <atomic>
#include <cstdint>
#include <new>
#include <type_traits>
#include <utility>
 | |
| 
 | |
| namespace td {
 | |
| 
 | |
| namespace detail {
 | |
| class Backoff {
 | |
|   int cnt = 0;
 | |
| 
 | |
|  public:
 | |
|   bool next() {
 | |
|     // TODO: find out better strategy
 | |
|     // TODO: try adaptive backoff
 | |
|     // TODO: different strategy one core cpu
 | |
|     // return false;
 | |
| 
 | |
|     cnt++;
 | |
|     if (cnt < 1) {  // 50
 | |
|       return true;
 | |
|     } else {
 | |
|       td::this_thread::yield();
 | |
|       return cnt < 3;  // 500
 | |
|     }
 | |
|   }
 | |
| };
 | |
| 
 | |
| class InfBackoff {
 | |
|   int cnt = 0;
 | |
| 
 | |
|  public:
 | |
|   bool next() {
 | |
|     cnt++;
 | |
|     if (cnt < 50) {
 | |
|       return true;
 | |
|     } else {
 | |
|       td::this_thread::yield();
 | |
|       return true;
 | |
|     }
 | |
|   }
 | |
| };
 | |
| 
 | |
| }  // namespace detail
 | |
| 
 | |
// Fixed-capacity single-producer / single-consumer ring buffer with 2^P slots.
//
// The writer_* methods belong to the producing side and the reader_* methods
// to the consuming side. Each side advances a private index
// (local_writer_i / local_reader_i) and publishes it with writer_flush() /
// reader_flush(); writer_update() / reader_update() refresh the cached copy of
// the index published by the other side.
//
// Slots are raw storage. writer_put_unsafe() constructs an element in place
// and reader_get_unsafe() moves it out and destroys it, so a slot is never
// assigned to while it holds no object. destroy() releases whatever is still
// stored.
template <class T, int P = 10>
class SPSCBlockQueue {
 public:
  using ValueType = T;

 private:
  // Number of slots; always a power of two so indices can be masked.
  static constexpr int buffer_size() {
    static_assert(P >= 1 && P <= 20, "Bad size of BlockQueue");
    return 1 << P;
  }

  // Index bookkeeping for one side of the queue. The pad arrays keep the
  // fields 64 bytes apart (NOTE(review): presumably one cache line, to avoid
  // false sharing between the two threads).
  struct Position {
    std::atomic<std::uint32_t> i{0};  // index published to the other side
    char pad[64 - sizeof(std::atomic<std::uint32_t>)];
    std::uint32_t local_writer_i{0};
    char pad2[64 - sizeof(std::uint32_t)];
    std::uint32_t local_reader_i{0};
    char pad3[64 - sizeof(std::uint32_t)];

    void init() {
      i = 0;
      local_reader_i = 0;
      local_writer_i = 0;
    }
  };

  // Raw, suitably aligned storage for the elements.
  typename std::aligned_storage<sizeof(ValueType), alignof(ValueType)>::type data_[buffer_size()];
  Position writer_;
  Position reader_;

  // Maps a monotonically growing index onto a slot number.
  static int fix_i(int i) {
    return i & (buffer_size() - 1);
  }

  ValueType *at_ptr(int i) {
    return reinterpret_cast<ValueType *>(&data_[fix_i(i)]);
  }

 public:
  // Resets all indices to zero. Does not touch stored elements.
  void init() {
    writer_.init();
    reader_.init();
  }

  // Destroys every element that was put but never taken. Must only be called
  // when neither side is using the queue any more. Calling it again is a no-op.
  void destroy() {
    for (std::uint32_t i = reader_.local_reader_i; i != writer_.local_writer_i; i++) {
      at_ptr(i)->~ValueType();
    }
    reader_.local_reader_i = writer_.local_writer_i;
  }

  // Free slots according to the writer's cached view of the reader index.
  int writer_size() {
    return static_cast<int>(writer_.local_reader_i + buffer_size() - writer_.local_writer_i);
  }

  // True when the writer sees no free slot left (i.e. the buffer looks full).
  bool writer_empty() {
    return writer_.local_reader_i + buffer_size() == writer_.local_writer_i;
  }

  // Stores value in the next slot without checking for free space; the caller
  // must have verified that writer_size() is non-zero. The element becomes
  // visible to the reader only after writer_flush().
  template <class PutValueType>
  void writer_put_unsafe(PutValueType &&value) {
    ValueType *slot = at_ptr(writer_.local_writer_i);
    ::new (static_cast<void *>(slot)) ValueType(std::forward<PutValueType>(value));
    // Advance only after construction succeeded so a throwing constructor
    // does not leave a slot marked as occupied.
    writer_.local_writer_i++;
  }

  // Re-reads the reader's published index and returns the number of free slots.
  int writer_update() {
    writer_.local_reader_i = reader_.i.load(std::memory_order_acquire);
    return writer_size();
  }

  // Publishes all elements written so far to the reader.
  void writer_flush() {
    writer_.i.store(writer_.local_writer_i, std::memory_order_release);
  }

  // Elements available according to the reader's cached view of the writer index.
  int reader_size() {
    return static_cast<int>(reader_.local_writer_i - reader_.local_reader_i);
  }

  // Non-zero when the reader sees nothing left to take.
  int reader_empty() {
    return reader_.local_writer_i == reader_.local_reader_i;
  }

  // Takes the next element without checking availability; the caller must have
  // verified that reader_size() is non-zero. The slot is handed back to the
  // writer only after reader_flush().
  ValueType reader_get_unsafe() {
    ValueType *slot = at_ptr(reader_.local_reader_i++);
    ValueType value = std::move(*slot);
    slot->~ValueType();
    return value;
  }

  // Re-reads the writer's published index and returns the number of available elements.
  int reader_update() {
    reader_.local_writer_i = writer_.i.load(std::memory_order_acquire);
    return reader_size();
  }

  // Publishes the slots consumed so far back to the writer.
  void reader_flush() {
    reader_.i.store(reader_.local_reader_i, std::memory_order_release);
  }
};
 | |
| 
 | |
| template <class T, class BlockQueueT = SPSCBlockQueue<T> >
 | |
| class SPSCChainQueue {
 | |
|  public:
 | |
|   using ValueType = T;
 | |
| 
 | |
|   void init() {
 | |
|     head_ = tail_ = create_node();
 | |
|   }
 | |
| 
 | |
|   SPSCChainQueue() = default;
 | |
|   SPSCChainQueue(const SPSCChainQueue &) = delete;
 | |
|   SPSCChainQueue &operator=(const SPSCChainQueue &) = delete;
 | |
|   SPSCChainQueue(SPSCChainQueue &&) = delete;
 | |
|   SPSCChainQueue &operator=(SPSCChainQueue &&) = delete;
 | |
|   ~SPSCChainQueue() {
 | |
|     destroy();
 | |
|   }
 | |
| 
 | |
|   void destroy() {
 | |
|     while (head_ != nullptr) {
 | |
|       Node *to_delete = head_;
 | |
|       head_ = head_->next_;
 | |
|       delete_node(to_delete);
 | |
|     }
 | |
|     tail_ = nullptr;
 | |
|   }
 | |
| 
 | |
|   int writer_size() {
 | |
|     return tail_->q_.writer_size();
 | |
|   }
 | |
| 
 | |
|   bool writer_empty() {
 | |
|     return tail_->q_.writer_empty();
 | |
|   }
 | |
| 
 | |
|   template <class PutValueType>
 | |
|   void writer_put_unsafe(PutValueType &&value) {
 | |
|     tail_->q_.writer_put_unsafe(std::forward<PutValueType>(value));
 | |
|   }
 | |
| 
 | |
|   int writer_update() {
 | |
|     int res = tail_->q_.writer_update();
 | |
|     if (res != 0) {
 | |
|       return res;
 | |
|     }
 | |
| 
 | |
|     writer_flush();
 | |
| 
 | |
|     Node *new_tail = create_node();
 | |
|     tail_->next_ = new_tail;
 | |
|     tail_->is_closed_.store(true, std::memory_order_release);
 | |
|     tail_ = new_tail;
 | |
|     return tail_->q_.writer_update();
 | |
|   }
 | |
| 
 | |
|   void writer_flush() {
 | |
|     tail_->q_.writer_flush();
 | |
|   }
 | |
| 
 | |
|   int reader_size() {
 | |
|     return head_->q_.reader_size();
 | |
|   }
 | |
| 
 | |
|   int reader_empty() {
 | |
|     return head_->q_.reader_empty();
 | |
|   }
 | |
| 
 | |
|   ValueType reader_get_unsafe() {
 | |
|     return std::move(head_->q_.reader_get_unsafe());
 | |
|   }
 | |
| 
 | |
|   int reader_update() {
 | |
|     int res = head_->q_.reader_update();
 | |
|     if (res != 0) {
 | |
|       return res;
 | |
|     }
 | |
| 
 | |
|     if (!head_->is_closed_.load(std::memory_order_acquire)) {
 | |
|       return 0;
 | |
|     }
 | |
| 
 | |
|     res = head_->q_.reader_update();
 | |
|     if (res != 0) {
 | |
|       return res;
 | |
|     }
 | |
| 
 | |
|     // reader_flush();
 | |
| 
 | |
|     Node *old_head = head_;
 | |
|     head_ = head_->next_;
 | |
|     delete_node(old_head);
 | |
| 
 | |
|     return head_->q_.reader_update();
 | |
|   }
 | |
| 
 | |
|   void reader_flush() {
 | |
|     head_->q_.reader_flush();
 | |
|   }
 | |
| 
 | |
|  private:
 | |
|   struct Node {
 | |
|     BlockQueueT q_;
 | |
|     std::atomic<bool> is_closed_{false};
 | |
|     Node *next_;
 | |
| 
 | |
|     void init() {
 | |
|       q_.init();
 | |
|       is_closed_ = false;
 | |
|       next_ = nullptr;
 | |
|     }
 | |
| 
 | |
|     void destroy() {
 | |
|       q_.destroy();
 | |
|       next_ = nullptr;
 | |
|     }
 | |
|   };
 | |
| 
 | |
|   Node *head_;
 | |
|   char pad[64 - sizeof(Node *)];
 | |
|   Node *tail_;
 | |
|   char pad2[64 - sizeof(Node *)];
 | |
| 
 | |
|   Node *create_node() {
 | |
|     Node *res = new Node();
 | |
|     res->init();
 | |
|     return res;
 | |
|   }
 | |
| 
 | |
|   void delete_node(Node *node) {
 | |
|     node->destroy();
 | |
|     delete node;
 | |
|   }
 | |
| };
 | |
| 
 | |
| template <class T, class QueueT = SPSCChainQueue<T>, class BackoffT = detail::Backoff>
 | |
| class BackoffQueue : public QueueT {
 | |
|  public:
 | |
|   using ValueType = T;
 | |
| 
 | |
|   template <class PutValueType>
 | |
|   void writer_put(PutValueType &&value) {
 | |
|     if (this->writer_empty()) {
 | |
|       int sz = this->writer_update();
 | |
|       CHECK(sz != 0);
 | |
|     }
 | |
|     this->writer_put_unsafe(std::forward<PutValueType>(value));
 | |
|   }
 | |
| 
 | |
|   int reader_wait() {
 | |
|     BackoffT backoff;
 | |
|     int res = 0;
 | |
|     do {
 | |
|       res = this->reader_update();
 | |
|     } while (res == 0 && backoff.next());
 | |
|     return res;
 | |
|   }
 | |
| };
 | |
| 
 | |
| template <class T, class QueueT = SPSCChainQueue<T> >
 | |
| using InfBackoffQueue = BackoffQueue<T, QueueT, detail::InfBackoff>;
 | |
| 
 | |
| template <class T, class QueueT = BackoffQueue<T> >
 | |
| class PollQueue : public QueueT {
 | |
|  public:
 | |
|   using ValueType = T;
 | |
|   using QueueType = QueueT;
 | |
| 
 | |
|   void init() {
 | |
|     QueueType::init();
 | |
|     event_fd_.init();
 | |
|     wait_state_ = 0;
 | |
|     writer_wait_state_ = 0;
 | |
|   }
 | |
| 
 | |
|   PollQueue() = default;
 | |
|   PollQueue(const PollQueue &) = delete;
 | |
|   PollQueue &operator=(const PollQueue &) = delete;
 | |
|   PollQueue(PollQueue &&) = delete;
 | |
|   PollQueue &operator=(PollQueue &&) = delete;
 | |
|   ~PollQueue() {
 | |
|     destroy_impl();
 | |
|   }
 | |
|   void destroy() {
 | |
|     destroy_impl();
 | |
|     QueueType::destroy();
 | |
|   }
 | |
| 
 | |
|   void writer_flush() {
 | |
|     int old_wait_state = get_wait_state();
 | |
| 
 | |
|     std::atomic_thread_fence(std::memory_order_seq_cst);
 | |
| 
 | |
|     QueueType::writer_flush();
 | |
| 
 | |
|     std::atomic_thread_fence(std::memory_order_seq_cst);
 | |
| 
 | |
|     int wait_state = get_wait_state();
 | |
|     if ((wait_state & 1) && wait_state != writer_wait_state_) {
 | |
|       event_fd_.release();
 | |
|       writer_wait_state_ = old_wait_state;
 | |
|     }
 | |
|   }
 | |
| 
 | |
|   EventFd &reader_get_event_fd() {
 | |
|     return event_fd_;
 | |
|   }
 | |
| 
 | |
|   // if 0 is returned than it is useless to rerun it before fd is
 | |
|   // ready to read.
 | |
|   int reader_wait_nonblock() {
 | |
|     int res;
 | |
| 
 | |
|     if ((get_wait_state() & 1) == 0) {
 | |
|       res = this->QueueType::reader_wait();
 | |
|       if (res != 0) {
 | |
|         return res;
 | |
|       }
 | |
| 
 | |
|       inc_wait_state();
 | |
| 
 | |
|       std::atomic_thread_fence(std::memory_order_seq_cst);
 | |
| 
 | |
|       res = this->reader_update();
 | |
|       if (res != 0) {
 | |
|         inc_wait_state();
 | |
|         return res;
 | |
|       }
 | |
|     }
 | |
| 
 | |
|     event_fd_.acquire();
 | |
|     std::atomic_thread_fence(std::memory_order_seq_cst);
 | |
|     res = this->reader_update();
 | |
|     if (res != 0) {
 | |
|       inc_wait_state();
 | |
|     }
 | |
|     return res;
 | |
|   }
 | |
| 
 | |
|   // Just an example of usage
 | |
|   int reader_wait() {
 | |
|     int res;
 | |
|     while ((res = reader_wait_nonblock()) == 0) {
 | |
|       reader_get_event_fd().wait(1000);
 | |
|     }
 | |
|     return res;
 | |
|   }
 | |
| 
 | |
|  private:
 | |
|   EventFd event_fd_;
 | |
|   std::atomic<int> wait_state_{0};
 | |
|   int writer_wait_state_;
 | |
| 
 | |
|   int get_wait_state() {
 | |
|     return wait_state_.load(std::memory_order_relaxed);
 | |
|   }
 | |
| 
 | |
|   void inc_wait_state() {
 | |
|     wait_state_.store(get_wait_state() + 1, std::memory_order_relaxed);
 | |
|   }
 | |
| 
 | |
|   void destroy_impl() {
 | |
|     if (!event_fd_.empty()) {
 | |
|       event_fd_.close();
 | |
|     }
 | |
|   }
 | |
| };
 | |
| 
 | |
| }  // namespace td
 | |
| 
 | |
| #else
 | |
| 
 | |
| #include "td/utils/common.h"
 | |
| 
 | |
| namespace td {
 | |
| 
 | |
| // dummy implementation which shouldn't be used
 | |
| 
 | |
| template <class T>
 | |
| class PollQueue {
 | |
|  public:
 | |
|   using ValueType = T;
 | |
| 
 | |
|   void init() {
 | |
|     UNREACHABLE();
 | |
|   }
 | |
| 
 | |
|   template <class PutValueType>
 | |
|   void writer_put(PutValueType &&value) {
 | |
|     UNREACHABLE();
 | |
|   }
 | |
| 
 | |
|   void writer_flush() {
 | |
|     UNREACHABLE();
 | |
|   }
 | |
| 
 | |
|   int reader_wait_nonblock() {
 | |
|     UNREACHABLE();
 | |
|     return 0;
 | |
|   }
 | |
| 
 | |
|   ValueType reader_get_unsafe() {
 | |
|     UNREACHABLE();
 | |
|     return ValueType();
 | |
|   }
 | |
| 
 | |
|   void reader_flush() {
 | |
|     UNREACHABLE();
 | |
|   }
 | |
| 
 | |
|   PollQueue() = default;
 | |
|   PollQueue(const PollQueue &) = delete;
 | |
|   PollQueue &operator=(const PollQueue &) = delete;
 | |
|   PollQueue(PollQueue &&) = delete;
 | |
|   PollQueue &operator=(PollQueue &&) = delete;
 | |
|   ~PollQueue() = default;
 | |
| };
 | |
| 
 | |
| }  // namespace td
 | |
| 
 | |
| #endif
 |