mirror of
https://github.com/ton-blockchain/ton
synced 2025-03-09 15:40:10 +00:00
updated submodules, bugfixes
- added new fift/func code for validator complaint creation - bugfixes in validator - updates in tonlib - new versions of rocksdb/abseil - hardfork support
This commit is contained in:
parent
16a4566091
commit
9f008b129f
129 changed files with 8438 additions and 879 deletions
|
@ -44,8 +44,24 @@ class GetArg<R (C::*)(Arg) const> {
|
|||
using type = Arg;
|
||||
};
|
||||
|
||||
template <typename T>
|
||||
struct GetRet : public GetRet<decltype(&T::operator())> {};
|
||||
|
||||
template <class C, class R, class... Arg>
|
||||
class GetRet<R (C::*)(Arg...)> {
|
||||
public:
|
||||
using type = R;
|
||||
};
|
||||
template <class C, class R, class... Arg>
|
||||
class GetRet<R (C::*)(Arg...) const> {
|
||||
public:
|
||||
using type = R;
|
||||
};
|
||||
|
||||
template <class T>
|
||||
using get_arg_t = std::decay_t<typename GetArg<T>::type>;
|
||||
template <class T>
|
||||
using get_ret_t = std::decay_t<typename GetRet<T>::type>;
|
||||
|
||||
template <class T>
|
||||
struct DropResult {
|
||||
|
@ -131,6 +147,7 @@ constexpr bool is_promise_interface_ptr() {
|
|||
template <class ValueT, class FunctionT>
|
||||
class LambdaPromise : public PromiseInterface<ValueT> {
|
||||
public:
|
||||
using ArgT = ValueT;
|
||||
void set_value(ValueT &&value) override {
|
||||
CHECK(has_lambda_.get());
|
||||
do_ok(std::move(value));
|
||||
|
@ -288,12 +305,6 @@ class Promise {
|
|||
std::unique_ptr<PromiseInterface<T>> promise_;
|
||||
};
|
||||
|
||||
template <class F>
|
||||
auto make_promise(F &&f) {
|
||||
using ValueT = detail::drop_result_t<detail::get_arg_t<F>>;
|
||||
return Promise<ValueT>(promise_interface_ptr(std::forward<F>(f)));
|
||||
}
|
||||
|
||||
namespace detail {
|
||||
template <class... ArgsT>
|
||||
class JoinPromise : public PromiseInterface<Unit> {
|
||||
|
@ -331,6 +342,16 @@ class PromiseCreator {
|
|||
}
|
||||
};
|
||||
|
||||
template <class F>
|
||||
auto make_promise(F &&f) {
|
||||
using ValueT = typename decltype(PromiseCreator::lambda(std::move(f)))::ArgT;
|
||||
return Promise<ValueT>(PromiseCreator::lambda(std::move(f)));
|
||||
}
|
||||
template <class T>
|
||||
auto make_promise(Promise<T> &&f) {
|
||||
return std::move(f);
|
||||
}
|
||||
|
||||
template <class T = Unit>
|
||||
class SafePromise {
|
||||
public:
|
||||
|
@ -356,4 +377,145 @@ class SafePromise {
|
|||
Promise<T> promise_;
|
||||
Result<T> result_;
|
||||
};
|
||||
|
||||
template <class PromiseT, typename... ArgsT>
|
||||
class PromiseMerger;
|
||||
|
||||
template <class F>
|
||||
struct SplitPromise {
|
||||
using PromiseT = decltype(make_promise(std::declval<F>()));
|
||||
using ArgT = typename PromiseT::ArgT;
|
||||
|
||||
template <class S, class T>
|
||||
static std::pair<Promise<S>, Promise<T>> split(std::pair<S, T>);
|
||||
template <class... ArgsT>
|
||||
static std::tuple<Promise<ArgsT>...> split(std::tuple<ArgsT...>);
|
||||
using SplittedT = decltype(split(std::declval<ArgT>()));
|
||||
|
||||
template <class S, class T>
|
||||
static PromiseMerger<PromiseT, S, T> merger(std::pair<S, T>);
|
||||
template <class... ArgsT>
|
||||
static PromiseMerger<PromiseT, ArgsT...> merger(std::tuple<ArgsT...>);
|
||||
using MergerT = decltype(merger(std::declval<ArgT>()));
|
||||
};
|
||||
|
||||
template <class PromiseT, typename... ArgsT>
|
||||
class PromiseMerger : public std::enable_shared_from_this<PromiseMerger<PromiseT, ArgsT...>> {
|
||||
public:
|
||||
std::tuple<Result<ArgsT>...> args_;
|
||||
PromiseT promise_;
|
||||
|
||||
PromiseMerger(PromiseT promise) : promise_(std::move(promise)) {
|
||||
}
|
||||
~PromiseMerger() {
|
||||
td::Status status;
|
||||
tuple_for_each(args_, [&status](auto &&arg) {
|
||||
if (status.is_error()) {
|
||||
return;
|
||||
}
|
||||
if (arg.is_error()) {
|
||||
status = arg.move_as_error();
|
||||
}
|
||||
});
|
||||
if (status.is_error()) {
|
||||
promise_.set_error(std::move(status));
|
||||
return;
|
||||
}
|
||||
call_tuple([this](auto &&... args) { promise_.set_value({args.move_as_ok()...}); }, std::move(args_));
|
||||
}
|
||||
|
||||
template <class T>
|
||||
Promise<typename T::ValueT> make_promise(T &arg) {
|
||||
return [&arg, self = this->shared_from_this()](auto res) { arg = std::move(res); };
|
||||
}
|
||||
|
||||
template <class R>
|
||||
auto split() {
|
||||
return call_tuple([this](auto &&... arg) { return R{this->make_promise(arg)...}; }, std::move(args_));
|
||||
}
|
||||
};
|
||||
|
||||
template <class F>
|
||||
auto split_promise(F &&f) {
|
||||
auto merger = std::make_shared<typename SplitPromise<F>::MergerT>(std::move(f));
|
||||
return merger->template split<typename SplitPromise<F>::SplittedT>();
|
||||
}
|
||||
|
||||
template <class T>
|
||||
struct PromiseFuture {
|
||||
Result<Promise<T>> promise_;
|
||||
Result<T> result_;
|
||||
~PromiseFuture() {
|
||||
if (promise_.is_ok()) {
|
||||
promise_.move_as_ok().set_result(std::move(result_));
|
||||
} else {
|
||||
LOG(ERROR) << "Lost PromiseFuture";
|
||||
}
|
||||
}
|
||||
};
|
||||
template <class T>
|
||||
struct Future;
|
||||
|
||||
template <class T>
|
||||
std::pair<Promise<T>, Future<T>> make_promise_future();
|
||||
|
||||
template <class T>
|
||||
struct Future {
|
||||
Promise<Promise<T>> promise_;
|
||||
Future(Promise<Promise<T>> promise) : promise_(std::move(promise)) {
|
||||
}
|
||||
|
||||
void finish(Promise<T> promise) {
|
||||
promise_.set_value(std::move(promise));
|
||||
}
|
||||
|
||||
template <class F>
|
||||
auto map(F &&f) {
|
||||
using R = detail::drop_result_t<decltype(f(std::declval<T>()))>;
|
||||
auto pf = make_promise_future<R>();
|
||||
promise_.set_value([p = std::move(pf.first), f = std::move(f)](Result<T> res) mutable {
|
||||
TRY_RESULT_PROMISE(p, x, std::move(res));
|
||||
p.set_result(f(std::move(x)));
|
||||
});
|
||||
|
||||
return std::move(pf.second);
|
||||
}
|
||||
|
||||
template <class F>
|
||||
auto fmap(F &&f) {
|
||||
return flatten(map(std::move(f)));
|
||||
}
|
||||
|
||||
template <class X>
|
||||
static Future<X> flatten(Future<Future<X>> ff) {
|
||||
auto pf = make_promise_future<X>();
|
||||
ff.promise_.set_value([p = std::move(pf.first)](Result<Future<X>> r_f) mutable {
|
||||
TRY_RESULT_PROMISE(p, f, std::move(r_f));
|
||||
// Promise<X> p
|
||||
// Future<X> f
|
||||
f.promise_.set_value(std::move(p));
|
||||
});
|
||||
return std::move(pf.second);
|
||||
}
|
||||
};
|
||||
|
||||
template <class T>
|
||||
Future<T> make_future(T &&value) {
|
||||
return Future<T>([value = std::move(value)](Result<Promise<T>> r_promise) mutable {
|
||||
if (r_promise.is_ok()) {
|
||||
r_promise.move_as_ok().set_value(std::move(value));
|
||||
} else {
|
||||
LOG(ERROR) << "Lost future";
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
template <class T>
|
||||
std::pair<Promise<T>, Future<T>> make_promise_future() {
|
||||
auto pf = std::make_shared<PromiseFuture<T>>();
|
||||
Future<T> future([pf](Result<Promise<T>> res) mutable { pf->promise_ = std::move(res); });
|
||||
Promise<T> promise = [pf = std::move(pf)](Result<T> res) mutable { pf->result_ = std::move(res); };
|
||||
return std::make_pair(std::move(promise), std::move(future));
|
||||
}
|
||||
|
||||
} // namespace td
|
||||
|
|
|
@ -100,6 +100,28 @@ void send_closure(ActorIdT &&actor_id, FunctionT function, ArgsT &&... args) {
|
|||
|
||||
#endif
|
||||
|
||||
template <class ActorIdT, class FunctionT, class... ArgsT, class FunctionClassT = member_function_class_t<FunctionT>,
|
||||
size_t argument_count = member_function_argument_count<FunctionT>(),
|
||||
std::enable_if_t<argument_count == sizeof...(ArgsT), bool> with_promise = false>
|
||||
auto future_send_closure(ActorIdT &&actor_id, FunctionT function, ArgsT &&... args) {
|
||||
using R = ::td::detail::get_ret_t<std::decay_t<FunctionT>>;
|
||||
auto pf = make_promise_future<R>();
|
||||
send_closure(std::forward<ActorIdT>(actor_id), std::move(function), std::forward<ArgsT>(args)...,
|
||||
std::move(pf.first));
|
||||
return std::move(pf.second);
|
||||
}
|
||||
|
||||
template <class R, class ActorIdT, class FunctionT, class... ArgsT,
|
||||
class FunctionClassT = member_function_class_t<FunctionT>,
|
||||
size_t argument_count = member_function_argument_count<FunctionT>(),
|
||||
std::enable_if_t<argument_count != sizeof...(ArgsT), bool> with_promise = true>
|
||||
Future<R> future_send_closure(ActorIdT &&actor_id, FunctionT function, ArgsT &&... args) {
|
||||
auto pf = make_promise_future<R>();
|
||||
send_closure(std::forward<ActorIdT>(actor_id), std::move(function), std::forward<ArgsT>(args)...,
|
||||
std::move(pf.first));
|
||||
return std::move(pf.second);
|
||||
}
|
||||
|
||||
template <typename ActorIdT, typename FunctionT, typename... ArgsT>
|
||||
bool send_closure_bool(ActorIdT &&actor_id, FunctionT function, ArgsT &&... args) {
|
||||
send_closure(std::forward<ActorIdT>(actor_id), function, std::forward<ArgsT>(args)...);
|
||||
|
|
|
@ -63,6 +63,36 @@ class ActorSignals {
|
|||
using core::Actor;
|
||||
using core::SchedulerContext;
|
||||
using core::SchedulerId;
|
||||
using core::set_debug;
|
||||
|
||||
struct Debug {
|
||||
public:
|
||||
Debug() = default;
|
||||
Debug(std::shared_ptr<core::SchedulerGroupInfo> group_info) : group_info_(std::move(group_info)) {
|
||||
}
|
||||
template <class F>
|
||||
void for_each(F &&f) {
|
||||
for (auto &scheduler : group_info_->schedulers) {
|
||||
f(scheduler.io_worker->debug);
|
||||
for (auto &cpu : scheduler.cpu_workers) {
|
||||
f(cpu->debug);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void dump() {
|
||||
for_each([](core::Debug &debug) {
|
||||
core::DebugInfo info;
|
||||
debug.read(info);
|
||||
if (info.is_active) {
|
||||
LOG(ERROR) << info.name << " " << td::format::as_time(Time::now() - info.start_at);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private:
|
||||
std::shared_ptr<core::SchedulerGroupInfo> group_info_;
|
||||
};
|
||||
|
||||
class Scheduler {
|
||||
public:
|
||||
|
@ -110,6 +140,10 @@ class Scheduler {
|
|||
}
|
||||
}
|
||||
|
||||
Debug get_debug() {
|
||||
return Debug{group_info_};
|
||||
}
|
||||
|
||||
bool run() {
|
||||
start();
|
||||
while (schedulers_[0]->run(10)) {
|
||||
|
|
|
@ -32,6 +32,7 @@ void CpuWorker::run() {
|
|||
|
||||
MpmcWaiter::Slot slot;
|
||||
waiter_.init_slot(slot, thread_id);
|
||||
auto &debug = dispatcher.get_debug();
|
||||
while (true) {
|
||||
SchedulerMessage message;
|
||||
if (try_pop(message, thread_id)) {
|
||||
|
@ -39,6 +40,7 @@ void CpuWorker::run() {
|
|||
if (!message) {
|
||||
return;
|
||||
}
|
||||
auto lock = debug.start(message->get_name());
|
||||
ActorExecutor executor(*message, dispatcher, ActorExecutor::Options().with_from_queue());
|
||||
} else {
|
||||
waiter_.wait(slot);
|
||||
|
|
|
@ -19,6 +19,7 @@
|
|||
#include "td/actor/core/IoWorker.h"
|
||||
|
||||
#include "td/actor/core/ActorExecutor.h"
|
||||
#include "td/actor/core/Scheduler.h"
|
||||
|
||||
namespace td {
|
||||
namespace actor {
|
||||
|
@ -42,6 +43,7 @@ bool IoWorker::run_once(double timeout) {
|
|||
auto &poll = SchedulerContext::get()->get_poll();
|
||||
#endif
|
||||
auto &heap = SchedulerContext::get()->get_heap();
|
||||
auto &debug = SchedulerContext::get()->get_debug();
|
||||
|
||||
auto now = Time::now(); // update Time::now_cached()
|
||||
while (!heap.empty() && heap.top_key() <= now) {
|
||||
|
@ -49,6 +51,7 @@ bool IoWorker::run_once(double timeout) {
|
|||
auto *actor_info = ActorInfo::from_heap_node(heap_node);
|
||||
|
||||
auto id = actor_info->unpin();
|
||||
auto lock = debug.start(actor_info->get_name());
|
||||
ActorExecutor executor(*actor_info, dispatcher, ActorExecutor::Options().with_has_poll(true));
|
||||
if (executor.can_send_immediate()) {
|
||||
executor.send_immediate(ActorSignals::one(ActorSignals::Alarm));
|
||||
|
@ -68,6 +71,7 @@ bool IoWorker::run_once(double timeout) {
|
|||
dispatcher.set_alarm_timestamp(message);
|
||||
continue;
|
||||
}
|
||||
auto lock = debug.start(message->get_name());
|
||||
ActorExecutor executor(*message, dispatcher, ActorExecutor::Options().with_from_queue().with_has_poll(true));
|
||||
}
|
||||
queue_.reader_flush();
|
||||
|
|
|
@ -25,6 +25,15 @@ namespace td {
|
|||
namespace actor {
|
||||
namespace core {
|
||||
|
||||
std::atomic<bool> debug;
|
||||
void set_debug(bool flag) {
|
||||
debug = flag;
|
||||
}
|
||||
|
||||
bool need_debug() {
|
||||
return debug.load(std::memory_order_relaxed);
|
||||
}
|
||||
|
||||
Scheduler::Scheduler(std::shared_ptr<SchedulerGroupInfo> scheduler_group_info, SchedulerId id, size_t cpu_threads_count)
|
||||
: scheduler_group_info_(std::move(scheduler_group_info)), cpu_threads_(cpu_threads_count) {
|
||||
scheduler_group_info_->active_scheduler_count++;
|
||||
|
@ -128,13 +137,14 @@ void Scheduler::do_stop() {
|
|||
}
|
||||
|
||||
Scheduler::ContextImpl::ContextImpl(ActorInfoCreator *creator, SchedulerId scheduler_id, CpuWorkerId cpu_worker_id,
|
||||
SchedulerGroupInfo *scheduler_group, Poll *poll, KHeap<double> *heap)
|
||||
SchedulerGroupInfo *scheduler_group, Poll *poll, KHeap<double> *heap, Debug *debug)
|
||||
: creator_(creator)
|
||||
, scheduler_id_(scheduler_id)
|
||||
, cpu_worker_id_(cpu_worker_id)
|
||||
, scheduler_group_(scheduler_group)
|
||||
, poll_(poll)
|
||||
, heap_(heap) {
|
||||
, heap_(heap)
|
||||
, debug_(debug) {
|
||||
}
|
||||
|
||||
SchedulerId Scheduler::ContextImpl::get_scheduler_id() const {
|
||||
|
@ -184,6 +194,9 @@ KHeap<double> &Scheduler::ContextImpl::get_heap() {
|
|||
CHECK(has_heap());
|
||||
return *heap_;
|
||||
}
|
||||
Debug &Scheduler::ContextImpl::get_debug() {
|
||||
return *debug_;
|
||||
}
|
||||
|
||||
void Scheduler::ContextImpl::set_alarm_timestamp(const ActorInfoPtr &actor_info_ptr) {
|
||||
// Ideas for optimization
|
||||
|
|
|
@ -31,6 +31,7 @@
|
|||
#include "td/actor/core/SchedulerId.h"
|
||||
#include "td/actor/core/SchedulerMessage.h"
|
||||
|
||||
#include "td/utils/AtomicRead.h"
|
||||
#include "td/utils/Closure.h"
|
||||
#include "td/utils/common.h"
|
||||
#include "td/utils/format.h"
|
||||
|
@ -65,6 +66,54 @@ namespace actor {
|
|||
namespace core {
|
||||
class IoWorker;
|
||||
|
||||
struct DebugInfo {
|
||||
bool is_active{false};
|
||||
double start_at{0};
|
||||
static constexpr size_t name_size{32};
|
||||
char name[name_size] = {};
|
||||
void set_name(td::Slice from) {
|
||||
from.truncate(name_size - 1);
|
||||
std::memcpy(name, from.data(), from.size());
|
||||
name[from.size()] = 0;
|
||||
}
|
||||
};
|
||||
|
||||
void set_debug(bool flag);
|
||||
bool need_debug();
|
||||
|
||||
struct Debug {
|
||||
public:
|
||||
bool is_on() const {
|
||||
return need_debug();
|
||||
}
|
||||
struct Destructor {
|
||||
void operator()(Debug *info) {
|
||||
info->info_.lock().value().is_active = false;
|
||||
}
|
||||
};
|
||||
|
||||
void read(DebugInfo &info) {
|
||||
info_.read(info);
|
||||
}
|
||||
|
||||
std::unique_ptr<Debug, Destructor> start(td::Slice name) {
|
||||
if (!is_on()) {
|
||||
return {};
|
||||
}
|
||||
{
|
||||
auto lock = info_.lock();
|
||||
auto &value = lock.value();
|
||||
value.is_active = true;
|
||||
value.start_at = Time::now();
|
||||
value.set_name(name);
|
||||
}
|
||||
return std::unique_ptr<Debug, Destructor>(this);
|
||||
}
|
||||
|
||||
private:
|
||||
AtomicRead<DebugInfo> info_;
|
||||
};
|
||||
|
||||
struct WorkerInfo {
|
||||
enum class Type { Io, Cpu } type{Type::Io};
|
||||
WorkerInfo() = default;
|
||||
|
@ -73,6 +122,7 @@ struct WorkerInfo {
|
|||
}
|
||||
ActorInfoCreator actor_info_creator;
|
||||
CpuWorkerId cpu_worker_id;
|
||||
Debug debug;
|
||||
};
|
||||
|
||||
template <class T>
|
||||
|
@ -195,7 +245,7 @@ class Scheduler {
|
|||
class ContextImpl : public SchedulerContext {
|
||||
public:
|
||||
ContextImpl(ActorInfoCreator *creator, SchedulerId scheduler_id, CpuWorkerId cpu_worker_id,
|
||||
SchedulerGroupInfo *scheduler_group, Poll *poll, KHeap<double> *heap);
|
||||
SchedulerGroupInfo *scheduler_group, Poll *poll, KHeap<double> *heap, Debug *debug);
|
||||
|
||||
SchedulerId get_scheduler_id() const override;
|
||||
void add_to_queue(ActorInfoPtr actor_info_ptr, SchedulerId scheduler_id, bool need_poll) override;
|
||||
|
@ -208,6 +258,8 @@ class Scheduler {
|
|||
bool has_heap() override;
|
||||
KHeap<double> &get_heap() override;
|
||||
|
||||
Debug &get_debug() override;
|
||||
|
||||
void set_alarm_timestamp(const ActorInfoPtr &actor_info_ptr) override;
|
||||
|
||||
bool is_stop_requested() override;
|
||||
|
@ -225,6 +277,8 @@ class Scheduler {
|
|||
Poll *poll_;
|
||||
|
||||
KHeap<double> *heap_;
|
||||
|
||||
Debug *debug_;
|
||||
};
|
||||
|
||||
template <class F>
|
||||
|
@ -234,7 +288,8 @@ class Scheduler {
|
|||
#endif
|
||||
bool is_io_worker = worker_info.type == WorkerInfo::Type::Io;
|
||||
ContextImpl context(&worker_info.actor_info_creator, info_->id, worker_info.cpu_worker_id,
|
||||
scheduler_group_info_.get(), is_io_worker ? &poll_ : nullptr, is_io_worker ? &heap_ : nullptr);
|
||||
scheduler_group_info_.get(), is_io_worker ? &poll_ : nullptr, is_io_worker ? &heap_ : nullptr,
|
||||
&worker_info.debug);
|
||||
SchedulerContext::Guard guard(&context);
|
||||
f();
|
||||
}
|
||||
|
|
|
@ -37,6 +37,7 @@ class SchedulerDispatcher {
|
|||
virtual void set_alarm_timestamp(const ActorInfoPtr &actor_info_ptr) = 0;
|
||||
};
|
||||
|
||||
struct Debug;
|
||||
class SchedulerContext : public Context<SchedulerContext>, public SchedulerDispatcher {
|
||||
public:
|
||||
virtual ~SchedulerContext() = default;
|
||||
|
@ -55,6 +56,9 @@ class SchedulerContext : public Context<SchedulerContext>, public SchedulerDispa
|
|||
// Stop all schedulers
|
||||
virtual bool is_stop_requested() = 0;
|
||||
virtual void stop() = 0;
|
||||
|
||||
// Debug
|
||||
virtual Debug &get_debug() = 0;
|
||||
};
|
||||
} // namespace core
|
||||
} // namespace actor
|
||||
|
|
|
@ -675,7 +675,8 @@ TEST(Actor2, actor_function_result) {
|
|||
public:
|
||||
A(std::shared_ptr<td::Destructor> watcher) : watcher_(std::move(watcher)) {
|
||||
}
|
||||
void on_result(uint32 x, uint32 y) {
|
||||
void on_result(uint32 x, td::Result<uint32> r_y) {
|
||||
auto y = r_y.move_as_ok();
|
||||
LOG_CHECK(x * x == y) << x << " " << y;
|
||||
if (--cnt_ == 0) {
|
||||
stop();
|
||||
|
@ -683,7 +684,7 @@ TEST(Actor2, actor_function_result) {
|
|||
}
|
||||
void start_up() {
|
||||
b_ = create_actor<B>(ActorOptions().with_name("B"));
|
||||
cnt_ = 3;
|
||||
cnt_ = 5;
|
||||
send_closure(b_, &B::query, 3, [a = std::make_unique<int>(), self = actor_id(this)](td::Result<uint32> y) {
|
||||
LOG_IF(ERROR, y.is_error()) << y.error();
|
||||
send_closure(self, &A::on_result, 3, y.ok());
|
||||
|
@ -696,6 +697,11 @@ TEST(Actor2, actor_function_result) {
|
|||
CHECK(!self.empty());
|
||||
send_closure(self, &A::on_result, 5, y);
|
||||
});
|
||||
auto future = future_send_closure(b_, &B::query, 7);
|
||||
future.finish(td::promise_send_closure(actor_id(this), &A::on_result, 7));
|
||||
//TODO: deduce Future type (i.e. Future<td::uint32>)
|
||||
auto future2 = future_send_closure<td::uint32>(b_, &B::query_async, 7);
|
||||
future2.finish(td::promise_send_closure(actor_id(this), &A::on_result, 7));
|
||||
}
|
||||
|
||||
private:
|
||||
|
@ -714,12 +720,12 @@ TEST(Actor2, actor_function_result) {
|
|||
}
|
||||
|
||||
TEST(Actor2, actor_ping_pong) {
|
||||
auto group_info = std::make_shared<core::SchedulerGroupInfo>(1);
|
||||
core::Scheduler scheduler{group_info, SchedulerId{0}, 3};
|
||||
Scheduler scheduler{{3}, Scheduler::Paused};
|
||||
sb.clear();
|
||||
scheduler.start();
|
||||
|
||||
auto watcher = td::create_shared_destructor([] { SchedulerContext::get()->stop(); });
|
||||
td::actor::set_debug(true);
|
||||
for (int i = 0; i < 2000; i++) {
|
||||
scheduler.run_in_context([watcher] {
|
||||
class PingPong : public Actor {
|
||||
|
@ -781,9 +787,9 @@ TEST(Actor2, actor_ping_pong) {
|
|||
});
|
||||
}
|
||||
watcher.reset();
|
||||
while (scheduler.run(1000)) {
|
||||
while (scheduler.run(0.1)) {
|
||||
//scheduler.get_debug().dump();
|
||||
}
|
||||
core::Scheduler::close_scheduler_group(*group_info);
|
||||
sb.clear();
|
||||
}
|
||||
|
||||
|
|
|
@ -135,6 +135,78 @@ TEST(Actor, safe_promise) {
|
|||
ASSERT_EQ(res, 3);
|
||||
}
|
||||
|
||||
TEST(Actor, split_promise) {
|
||||
using td::Promise;
|
||||
using td::Result;
|
||||
using td::split_promise;
|
||||
using td::SplitPromise;
|
||||
{
|
||||
td::optional<std::pair<int, double>> x;
|
||||
auto pair = [&](Result<std::pair<int, double>> res) { x = res.move_as_ok(); };
|
||||
static_assert(std::is_same<SplitPromise<decltype(pair)>::ArgT, std::pair<int, double>>::value, "A");
|
||||
static_assert(
|
||||
std::is_same<SplitPromise<decltype(pair)>::SplittedT, std::pair<Promise<int>, Promise<double>>>::value, "A");
|
||||
auto splitted = split_promise(pair);
|
||||
static_assert(std::is_same<decltype(splitted), std::pair<Promise<int>, Promise<double>>>::value, "A");
|
||||
|
||||
splitted.first.set_value(1);
|
||||
splitted.second.set_value(2.0);
|
||||
CHECK(x.unwrap() == std::make_pair(1, 2.0));
|
||||
} // namespace td
|
||||
{
|
||||
td::optional<std::tuple<int, double, std::string>> x;
|
||||
auto triple = [&](Result<std::tuple<int, double, std::string>> res) { x = res.move_as_ok(); };
|
||||
static_assert(std::is_same<SplitPromise<decltype(triple)>::ArgT, std::tuple<int, double, std::string>>::value, "A");
|
||||
static_assert(std::is_same<SplitPromise<decltype(triple)>::SplittedT,
|
||||
std::tuple<Promise<int>, Promise<double>, Promise<std::string>>>::value,
|
||||
"A");
|
||||
auto splitted = split_promise(triple);
|
||||
static_assert(
|
||||
std::is_same<decltype(splitted), std::tuple<Promise<int>, Promise<double>, Promise<std::string>>>::value, "A");
|
||||
std::get<0>(splitted).set_value(1);
|
||||
std::get<1>(splitted).set_value(2.0);
|
||||
std::get<2>(splitted).set_value("hello");
|
||||
CHECK(x.unwrap() == std::make_tuple(1, 2.0, "hello"));
|
||||
}
|
||||
{
|
||||
int code = 0;
|
||||
auto pair = [&](Result<std::pair<int, double>> res) {
|
||||
res.ensure_error();
|
||||
code = res.error().code();
|
||||
};
|
||||
auto splitted = split_promise(td::Promise<std::pair<int, double>>(pair));
|
||||
splitted.second.set_error(td::Status::Error(123, "123"));
|
||||
CHECK(code == 0);
|
||||
splitted.first.set_value(1);
|
||||
CHECK(code == 123);
|
||||
}
|
||||
}
|
||||
|
||||
TEST(Actor, promise_future) {
|
||||
using td::make_promise_future;
|
||||
{
|
||||
auto pf = make_promise_future<int>();
|
||||
td::optional<int> res;
|
||||
pf.second.map([](int x) { return x * 2; }).map([](int x) { return x + 10; }).map([&](int x) {
|
||||
res = x;
|
||||
return td::Unit();
|
||||
});
|
||||
CHECK(!res);
|
||||
pf.first.set_value(6);
|
||||
ASSERT_EQ(22, res.unwrap());
|
||||
}
|
||||
{
|
||||
LOG(ERROR) << "Second test";
|
||||
td::optional<int> res;
|
||||
td::make_future(6)
|
||||
.map([](int x) { return x * 2; })
|
||||
.map([](int x) { return x + 10; })
|
||||
.fmap([&](int x) { return td::make_future(x * 2); })
|
||||
.finish([&](int x) { res = x; });
|
||||
ASSERT_EQ(44, res.unwrap());
|
||||
}
|
||||
}
|
||||
|
||||
TEST(Actor2, actor_lost_promise) {
|
||||
using namespace td::actor;
|
||||
using namespace td;
|
||||
|
@ -459,7 +531,7 @@ class SampleActor : public Actor {
|
|||
detail::current_actor<Printer>().print_a();
|
||||
co_await OnActor(self);
|
||||
LOG(ERROR) << "exit print_a";
|
||||
co_return{};
|
||||
co_return {};
|
||||
}
|
||||
task<Unit> print_b() {
|
||||
auto self = actor_id(this);
|
||||
|
@ -468,7 +540,7 @@ class SampleActor : public Actor {
|
|||
detail::current_actor<Printer>().print_b();
|
||||
co_await OnActor(self);
|
||||
LOG(ERROR) << "exit print_b";
|
||||
co_return{};
|
||||
co_return {};
|
||||
}
|
||||
|
||||
immediate_task run_coroutine() {
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue