mirror of https://github.com/ton-blockchain/ton synced 2025-03-09 15:40:10 +00:00

updated vm (breaking compatibility)

- updated vm
- new actor scheduler
- updated tonlib
- updated DNS smartcontract
ton 2020-02-28 14:28:47 +04:00
parent 9e4816e7f6
commit e27fb1e09c
100 changed files with 3692 additions and 1299 deletions

View file

@@ -14,7 +14,7 @@
You should have received a copy of the GNU Lesser General Public License
along with TON Blockchain Library. If not, see <http://www.gnu.org/licenses/>.
Copyright 2017-2019 Telegram Systems LLP
Copyright 2017-2020 Telegram Systems LLP
*/
#pragma once
@@ -41,6 +41,7 @@ class ActorInfo : private HeapNode, private ListNode {
}
~ActorInfo() {
VLOG(actor) << "Destroy actor [" << name_ << "]";
CHECK(!actor_);
}
bool is_alive() const {

View file

@@ -14,13 +14,15 @@
You should have received a copy of the GNU Lesser General Public License
along with TON Blockchain Library. If not, see <http://www.gnu.org/licenses/>.
Copyright 2017-2019 Telegram Systems LLP
Copyright 2017-2020 Telegram Systems LLP
*/
#include "td/actor/core/CpuWorker.h"
#include "td/actor/core/ActorExecutor.h"
#include "td/actor/core/SchedulerContext.h"
#include "td/actor/core/Scheduler.h" // FIXME: afer LocalQueue is in a separate file
namespace td {
namespace actor {
namespace core {
@@ -28,20 +30,64 @@ void CpuWorker::run() {
auto thread_id = get_thread_id();
auto &dispatcher = *SchedulerContext::get();
int yields = 0;
MpmcWaiter::Slot slot;
waiter_.init_slot(slot, thread_id);
while (true) {
SchedulerMessage message;
if (queue_.try_pop(message, thread_id)) {
if (try_pop(message, thread_id)) {
waiter_.stop_wait(slot);
if (!message) {
return;
}
ActorExecutor executor(*message, dispatcher, ActorExecutor::Options().with_from_queue());
yields = waiter_.stop_wait(yields, thread_id);
} else {
yields = waiter_.wait(yields, thread_id);
waiter_.wait(slot);
}
}
}
bool CpuWorker::try_pop_local(SchedulerMessage &message) {
SchedulerMessage::Raw *raw_message;
if (local_queues_[id_].try_pop(raw_message)) {
message = SchedulerMessage(SchedulerMessage::acquire_t{}, raw_message);
return true;
}
return false;
}
bool CpuWorker::try_pop_global(SchedulerMessage &message, size_t thread_id) {
SchedulerMessage::Raw *raw_message;
if (queue_.try_pop(raw_message, thread_id)) {
message = SchedulerMessage(SchedulerMessage::acquire_t{}, raw_message);
return true;
}
return false;
}
bool CpuWorker::try_pop(SchedulerMessage &message, size_t thread_id) {
if (++cnt_ == 51) {
cnt_ = 0;
if (try_pop_global(message, thread_id) || try_pop_local(message)) {
return true;
}
} else {
if (try_pop_local(message) || try_pop_global(message, thread_id)) {
return true;
}
}
for (size_t i = 1; i < local_queues_.size(); i++) {
size_t pos = (i + id_) % local_queues_.size();
SchedulerMessage::Raw *raw_message;
if (local_queues_[id_].steal(raw_message, local_queues_[pos])) {
message = SchedulerMessage(SchedulerMessage::acquire_t{}, raw_message);
return true;
}
}
return false;
}
} // namespace core
} // namespace actor
} // namespace td
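
The new CpuWorker::try_pop above draws work from three places. The following single-threaded sketch mirrors that pop order under simplifying assumptions: std::deque stands in for MpmcQueue and StealingQueue, there is no waiter and no synchronization, and the Task/Worker names are illustrative rather than taken from the library. Usually the worker's own local queue is tried first; every 51st attempt the shared global queue is checked first so it cannot be starved by a worker that keeps feeding itself; if both are empty, the worker tries to take work from the other workers' local queues.

#include <cstddef>
#include <deque>
#include <iostream>
#include <vector>

struct Task { int payload; };

struct Worker {
  std::deque<Task> &global;                // stand-in for the shared MpmcQueue
  std::vector<std::deque<Task>> &locals;   // one local queue per worker
  size_t id;
  size_t cnt = 0;

  bool pop_local(Task &t) {
    auto &q = locals[id];
    if (q.empty()) return false;
    t = q.back();                          // owner takes its freshest task
    q.pop_back();
    return true;
  }
  bool pop_global(Task &t) {
    if (global.empty()) return false;
    t = global.front();
    global.pop_front();
    return true;
  }

  // Same shape as CpuWorker::try_pop: local first, but every 51st attempt
  // the global queue goes first; stealing is the fallback.
  bool try_pop(Task &t) {
    if (++cnt == 51) {
      cnt = 0;
      if (pop_global(t) || pop_local(t)) return true;
    } else {
      if (pop_local(t) || pop_global(t)) return true;
    }
    for (size_t i = 1; i < locals.size(); i++) {
      size_t pos = (i + id) % locals.size();
      if (!locals[pos].empty()) {          // simplified steal: take one item
        t = locals[pos].front();
        locals[pos].pop_front();
        return true;
      }
    }
    return false;
  }
};

int main() {
  std::deque<Task> global{{0}};
  std::vector<std::deque<Task>> locals(2);
  locals[1].push_back({7});
  Worker w{global, locals, 0};
  Task t{};
  while (w.try_pop(t)) std::cout << t.payload << "\n";  // prints 0, then 7 (stolen)
}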

View file

@@ -14,7 +14,7 @@
You should have received a copy of the GNU Lesser General Public License
along with TON Blockchain Library. If not, see <http://www.gnu.org/licenses/>.
Copyright 2017-2019 Telegram Systems LLP
Copyright 2017-2020 Telegram Systems LLP
*/
#pragma once
@@ -22,19 +22,32 @@
#include "td/utils/MpmcQueue.h"
#include "td/utils/MpmcWaiter.h"
#include "td/utils/Span.h"
namespace td {
namespace actor {
namespace core {
template <class T>
struct LocalQueue;
class CpuWorker {
public:
CpuWorker(MpmcQueue<SchedulerMessage> &queue, MpmcWaiter &waiter) : queue_(queue), waiter_(waiter) {
CpuWorker(MpmcQueue<SchedulerMessage::Raw *> &queue, MpmcWaiter &waiter, size_t id,
MutableSpan<LocalQueue<SchedulerMessage::Raw *>> local_queues)
: queue_(queue), waiter_(waiter), id_(id), local_queues_(local_queues) {
}
void run();
private:
MpmcQueue<SchedulerMessage> &queue_;
MpmcQueue<SchedulerMessage::Raw *> &queue_;
MpmcWaiter &waiter_;
size_t id_;
MutableSpan<LocalQueue<SchedulerMessage::Raw *>> local_queues_;
size_t cnt_{0};
bool try_pop(SchedulerMessage &message, size_t thread_id);
bool try_pop_local(SchedulerMessage &message);
bool try_pop_global(SchedulerMessage &message, size_t thread_id);
};
} // namespace core
} // namespace actor

View file

@@ -14,7 +14,7 @@
You should have received a copy of the GNU Lesser General Public License
along with TON Blockchain Library. If not, see <http://www.gnu.org/licenses/>.
Copyright 2017-2019 Telegram Systems LLP
Copyright 2017-2020 Telegram Systems LLP
*/
#include "td/actor/core/Scheduler.h"
@@ -24,6 +24,7 @@
namespace td {
namespace actor {
namespace core {
Scheduler::Scheduler(std::shared_ptr<SchedulerGroupInfo> scheduler_group_info, SchedulerId id, size_t cpu_threads_count)
: scheduler_group_info_(std::move(scheduler_group_info)), cpu_threads_(cpu_threads_count) {
scheduler_group_info_->active_scheduler_count++;
@@ -31,17 +32,21 @@ Scheduler::Scheduler(std::shared_ptr<SchedulerGroupInfo> scheduler_group_info, S
info_->id = id;
if (cpu_threads_count != 0) {
info_->cpu_threads_count = cpu_threads_count;
info_->cpu_queue = std::make_unique<MpmcQueue<SchedulerMessage>>(1024, max_thread_count());
info_->cpu_queue = std::make_unique<MpmcQueue<SchedulerMessage::Raw *>>(1024, max_thread_count());
info_->cpu_queue_waiter = std::make_unique<MpmcWaiter>();
info_->cpu_local_queue = std::vector<LocalQueue<SchedulerMessage::Raw *>>(cpu_threads_count);
}
info_->io_queue = std::make_unique<MpscPollableQueue<SchedulerMessage>>();
info_->io_queue->init();
info_->cpu_workers.resize(cpu_threads_count);
td::uint8 cpu_worker_id = 0;
for (auto &worker : info_->cpu_workers) {
worker = std::make_unique<WorkerInfo>(WorkerInfo::Type::Cpu, true);
worker = std::make_unique<WorkerInfo>(WorkerInfo::Type::Cpu, true, CpuWorkerId{cpu_worker_id});
cpu_worker_id++;
}
info_->io_worker = std::make_unique<WorkerInfo>(WorkerInfo::Type::Io, !info_->cpu_workers.empty());
info_->io_worker = std::make_unique<WorkerInfo>(WorkerInfo::Type::Io, !info_->cpu_workers.empty(), CpuWorkerId{});
poll_.init();
io_worker_ = std::make_unique<IoWorker>(*info_->io_queue);
@@ -62,8 +67,9 @@ Scheduler::~Scheduler() {
void Scheduler::start() {
for (size_t i = 0; i < cpu_threads_.size(); i++) {
cpu_threads_[i] = td::thread([this, i] {
this->run_in_context_impl(*this->info_->cpu_workers[i],
[this] { CpuWorker(*info_->cpu_queue, *info_->cpu_queue_waiter).run(); });
this->run_in_context_impl(*this->info_->cpu_workers[i], [this, i] {
CpuWorker(*info_->cpu_queue, *info_->cpu_queue_waiter, i, info_->cpu_local_queue).run();
});
});
cpu_threads_[i].set_name(PSLICE() << "#" << info_->id.value() << ":cpu#" << i);
}
@@ -121,9 +127,14 @@ void Scheduler::do_stop() {
scheduler_group_info_->active_scheduler_count_condition_variable.notify_all();
}
Scheduler::ContextImpl::ContextImpl(ActorInfoCreator *creator, SchedulerId scheduler_id,
Scheduler::ContextImpl::ContextImpl(ActorInfoCreator *creator, SchedulerId scheduler_id, CpuWorkerId cpu_worker_id,
SchedulerGroupInfo *scheduler_group, Poll *poll, KHeap<double> *heap)
: creator_(creator), scheduler_id_(scheduler_id), scheduler_group_(scheduler_group), poll_(poll), heap_(heap) {
: creator_(creator)
, scheduler_id_(scheduler_id)
, cpu_worker_id_(cpu_worker_id)
, scheduler_group_(scheduler_group)
, poll_(poll)
, heap_(heap) {
}
SchedulerId Scheduler::ContextImpl::get_scheduler_id() const {
@@ -138,7 +149,18 @@ void Scheduler::ContextImpl::add_to_queue(ActorInfoPtr actor_info_ptr, Scheduler
if (need_poll || !info.cpu_queue) {
info.io_queue->writer_put(std::move(actor_info_ptr));
} else {
info.cpu_queue->push(std::move(actor_info_ptr), get_thread_id());
if (scheduler_id == get_scheduler_id() && cpu_worker_id_.is_valid()) {
// may push local
CHECK(actor_info_ptr);
auto raw = actor_info_ptr.release();
auto should_notify = info.cpu_local_queue[cpu_worker_id_.value()].push(
raw, [&](auto value) { info.cpu_queue->push(value, get_thread_id()); });
if (should_notify) {
info.cpu_queue_waiter->notify();
}
return;
}
info.cpu_queue->push(actor_info_ptr.release(), get_thread_id());
info.cpu_queue_waiter->notify();
}
}
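
In the hunk above, ContextImpl::add_to_queue now prefers the local queue of the CPU worker that is doing the push, but only when the target scheduler is the current one and cpu_worker_id_ is valid; the MpmcWaiter is only notified when work actually became visible to other workers. Below is a single-threaded sketch of that routing under stated assumptions: LocalSlot, Task, SchedulerSketch and the notifications counter are hypothetical stand-ins, a plain std::deque replaces the MpmcQueue, and the bounded StealingQueue plus its overflow callback into the global queue are omitted.

#include <deque>
#include <iostream>
#include <optional>
#include <utility>
#include <vector>

using Task = int;

struct LocalSlot {
  std::optional<Task> next;    // one-element LIFO slot read only by the owner
  std::deque<Task> stealable;  // stand-in for the StealingQueue

  // Returns true when a task became visible to other workers,
  // i.e. when the caller should wake a sleeping worker.
  bool push(Task value) {
    auto prev = std::exchange(next, value);
    if (prev) {
      stealable.push_back(*prev);
      return true;
    }
    return false;
  }
};

struct SchedulerSketch {
  std::deque<Task> global_queue;        // stand-in for the shared MpmcQueue
  std::vector<LocalSlot> local_queues;  // one per CPU worker
  int notifications = 0;                // stand-in for MpmcWaiter::notify()

  // producer_worker < 0 models "not a CPU worker of this scheduler".
  void add_to_queue(Task t, int producer_worker) {
    if (producer_worker >= 0) {
      if (local_queues[producer_worker].push(t)) {
        ++notifications;
      }
      return;
    }
    global_queue.push_back(t);
    ++notifications;
  }
};

int main() {
  SchedulerSketch s;
  s.local_queues.resize(2);
  s.add_to_queue(1, 0);   // stays in worker 0's slot, no wake-up needed yet
  s.add_to_queue(2, 0);   // task 1 spills to the stealable queue -> notify
  s.add_to_queue(3, -1);  // foreign producer -> global queue -> notify
  std::cout << "notifications: " << s.notifications << "\n";  // prints 2
}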
@@ -254,13 +276,26 @@ void Scheduler::close_scheduler_group(SchedulerGroupInfo &group_info) {
}
// Drain cpu queue
for (auto &q : scheduler_info.cpu_local_queue) {
auto &cpu_queue = q;
while (true) {
SchedulerMessage::Raw *raw_message;
if (!cpu_queue.try_pop(raw_message)) {
break;
}
SchedulerMessage(SchedulerMessage::acquire_t{}, raw_message);
// message's destructor is called
queues_are_empty = false;
}
}
if (scheduler_info.cpu_queue) {
auto &cpu_queue = *scheduler_info.cpu_queue;
while (true) {
SchedulerMessage message;
if (!cpu_queue.try_pop(message, get_thread_id())) {
SchedulerMessage::Raw *raw_message;
if (!cpu_queue.try_pop(raw_message, get_thread_id())) {
break;
}
SchedulerMessage(SchedulerMessage::acquire_t{}, raw_message);
// message's destructor is called
queues_are_empty = false;
}

View file

@@ -14,7 +14,7 @@
You should have received a copy of the GNU Lesser General Public License
along with TON Blockchain Library. If not, see <http://www.gnu.org/licenses/>.
Copyright 2017-2019 Telegram Systems LLP
Copyright 2017-2020 Telegram Systems LLP
*/
#pragma once
@@ -38,9 +38,11 @@
#include "td/utils/List.h"
#include "td/utils/logging.h"
#include "td/utils/MpmcQueue.h"
#include "td/utils/StealingQueue.h"
#include "td/utils/MpmcWaiter.h"
#include "td/utils/MpscLinkQueue.h"
#include "td/utils/MpscPollableQueue.h"
#include "td/utils/optional.h"
#include "td/utils/port/Poll.h"
#include "td/utils/port/detail/Iocp.h"
#include "td/utils/port/thread.h"
@@ -66,16 +68,52 @@ class IoWorker;
struct WorkerInfo {
enum class Type { Io, Cpu } type{Type::Io};
WorkerInfo() = default;
explicit WorkerInfo(Type type, bool allow_shared) : type(type), actor_info_creator(allow_shared) {
explicit WorkerInfo(Type type, bool allow_shared, CpuWorkerId cpu_worker_id)
: type(type), actor_info_creator(allow_shared), cpu_worker_id(cpu_worker_id) {
}
ActorInfoCreator actor_info_creator;
CpuWorkerId cpu_worker_id;
};
template <class T>
struct LocalQueue {
public:
template <class F>
bool push(T value, F &&overflow_f) {
auto res = std::move(next_);
next_ = std::move(value);
if (res) {
queue_.local_push(res.unwrap(), overflow_f);
return true;
}
return false;
}
bool try_pop(T &message) {
if (!next_) {
return queue_.local_pop(message);
}
message = next_.unwrap();
return true;
}
bool steal(T &message, LocalQueue<T> &other) {
return queue_.steal(message, other.queue_);
}
private:
td::optional<T> next_;
StealingQueue<T> queue_;
char pad[TD_CONCURRENCY_PAD - sizeof(optional<T>)];
};
struct SchedulerInfo {
SchedulerId id;
// will be read by all workers in any thread
std::unique_ptr<MpmcQueue<SchedulerMessage>> cpu_queue;
std::unique_ptr<MpmcQueue<SchedulerMessage::Raw *>> cpu_queue;
std::unique_ptr<MpmcWaiter> cpu_queue_waiter;
std::vector<LocalQueue<SchedulerMessage::Raw *>> cpu_local_queue;
//std::vector<td::StealingQueue<SchedulerMessage>> cpu_stealing_queue;
// only scheduler itself may read from io_queue_
std::unique_ptr<MpscPollableQueue<SchedulerMessage>> io_queue;
size_t cpu_threads_count{0};
@@ -156,11 +194,12 @@ class Scheduler {
class ContextImpl : public SchedulerContext {
public:
ContextImpl(ActorInfoCreator *creator, SchedulerId scheduler_id, SchedulerGroupInfo *scheduler_group, Poll *poll,
KHeap<double> *heap);
ContextImpl(ActorInfoCreator *creator, SchedulerId scheduler_id, CpuWorkerId cpu_worker_id,
SchedulerGroupInfo *scheduler_group, Poll *poll, KHeap<double> *heap);
SchedulerId get_scheduler_id() const override;
void add_to_queue(ActorInfoPtr actor_info_ptr, SchedulerId scheduler_id, bool need_poll) override;
ActorInfoCreator &get_actor_info_creator() override;
bool has_poll() override;
@@ -181,6 +220,7 @@ class Scheduler {
ActorInfoCreator *creator_;
SchedulerId scheduler_id_;
CpuWorkerId cpu_worker_id_;
SchedulerGroupInfo *scheduler_group_;
Poll *poll_;
@@ -193,8 +233,8 @@ class Scheduler {
td::detail::Iocp::Guard iocp_guard(&scheduler_group_info_->iocp);
#endif
bool is_io_worker = worker_info.type == WorkerInfo::Type::Io;
ContextImpl context(&worker_info.actor_info_creator, info_->id, scheduler_group_info_.get(),
is_io_worker ? &poll_ : nullptr, is_io_worker ? &heap_ : nullptr);
ContextImpl context(&worker_info.actor_info_creator, info_->id, worker_info.cpu_worker_id,
scheduler_group_info_.get(), is_io_worker ? &poll_ : nullptr, is_io_worker ? &heap_ : nullptr);
SchedulerContext::Guard guard(&context);
f();
}
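
The LocalQueue template introduced in this header keeps the most recently pushed message in a one-element slot (next_) that only the owning worker reads, and spills older messages into a StealingQueue that other workers may steal from; push reports whether a message reached the stealable part, which is exactly the condition under which the Scheduler notifies the waiter. The sketch below is a hedged, single-threaded rendering of those semantics: std::optional and std::deque stand in for td::optional and StealingQueue, the overflow callback and the TD_CONCURRENCY_PAD padding are dropped, and steal takes a single item instead of the batch the real StealingQueue moves.

#include <cassert>
#include <deque>
#include <optional>
#include <utility>

template <class T>
struct LocalQueueSketch {
  // Keep the newest value in the slot; the previous occupant (if any)
  // becomes stealable. Returns true when something became stealable.
  bool push(T value) {
    auto prev = std::exchange(next_, std::move(value));
    if (prev) {
      queue_.push_back(std::move(*prev));
      return true;
    }
    return false;
  }

  // The owner pops the freshest task first (the slot), then older ones.
  bool try_pop(T &out) {
    if (next_) {
      out = std::move(*next_);
      next_.reset();
      return true;
    }
    if (queue_.empty()) return false;
    out = std::move(queue_.front());
    queue_.pop_front();
    return true;
  }

  // Thieves never see the victim's slot, only its stealable queue.
  bool steal(T &out, LocalQueueSketch &victim) {
    if (victim.queue_.empty()) return false;
    out = std::move(victim.queue_.front());
    victim.queue_.pop_front();
    return true;
  }

 private:
  std::optional<T> next_;
  std::deque<T> queue_;
};

int main() {
  LocalQueueSketch<int> owner, thief;
  owner.push(1);
  owner.push(2);  // 1 moves into the stealable queue
  int t = 0;
  assert(thief.steal(t, owner) && t == 1);  // the older task gets stolen
  assert(owner.try_pop(t) && t == 2);       // the owner keeps the fresh one
  assert(!owner.try_pop(t));
}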

View file

@@ -14,7 +14,7 @@
You should have received a copy of the GNU Lesser General Public License
along with TON Blockchain Library. If not, see <http://www.gnu.org/licenses/>.
Copyright 2017-2019 Telegram Systems LLP
Copyright 2017-2020 Telegram Systems LLP
*/
#pragma once
@@ -26,8 +26,7 @@ namespace actor {
namespace core {
class SchedulerId {
public:
SchedulerId() : id_(-1) {
}
SchedulerId() = default;
explicit SchedulerId(uint8 id) : id_(id) {
}
bool is_valid() const {
@@ -42,7 +41,27 @@ class SchedulerId {
}
private:
int32 id_{0};
int32 id_{-1};
};
class CpuWorkerId {
public:
CpuWorkerId() = default;
explicit CpuWorkerId(uint8 id) : id_(id) {
}
bool is_valid() const {
return id_ >= 0;
}
uint8 value() const {
CHECK(is_valid());
return static_cast<uint8>(id_);
}
bool operator==(CpuWorkerId other) const {
return id_ == other.id_;
}
private:
int32 id_{-1};
};
} // namespace core
} // namespace actor