diff --git a/adnl/adnl-message.cpp b/adnl/adnl-message.cpp index 0d712978..1d3ba5f8 100644 --- a/adnl/adnl-message.cpp +++ b/adnl/adnl-message.cpp @@ -46,6 +46,9 @@ AdnlMessage::AdnlMessage(tl_object_ptr message) { [&](ton_api::adnl_message_part &msg) { message_ = adnlmessage::AdnlMessagePart{msg.hash_, static_cast(msg.total_size_), static_cast(msg.offset_), std::move(msg.data_)}; + }, + [&](ton_api::adnl_message_queryError &msg) { + message_ = adnlmessage::AdnlMessageQueryError{msg.query_id_}; })); } diff --git a/adnl/adnl-message.h b/adnl/adnl-message.h index 43849e98..d557c319 100644 --- a/adnl/adnl-message.h +++ b/adnl/adnl-message.h @@ -170,6 +170,24 @@ class AdnlMessageAnswer { td::BufferSlice data_; }; +class AdnlMessageQueryError { + public: + explicit AdnlMessageQueryError(AdnlQueryId query_id) : query_id_(query_id) { + } + const auto &query_id() const { + return query_id_; + } + td::uint32 size() const { + return 36; + } + tl_object_ptr tl() const { + return create_tl_object(query_id_); + } + + private: + AdnlQueryId query_id_; +}; + class AdnlMessagePart { public: AdnlMessagePart(td::Bits256 hash, td::uint32 total_size, td::uint32 offset, td::BufferSlice data) @@ -220,7 +238,8 @@ class AdnlMessage { private: td::Variant + adnlmessage::AdnlMessageQuery, adnlmessage::AdnlMessageAnswer, adnlmessage::AdnlMessagePart, + adnlmessage::AdnlMessageQueryError> message_{Empty{}}; public: diff --git a/adnl/adnl-peer.cpp b/adnl/adnl-peer.cpp index 35ba2a11..1d73b8f6 100644 --- a/adnl/adnl-peer.cpp +++ b/adnl/adnl-peer.cpp @@ -524,10 +524,14 @@ void AdnlPeerPairImpl::process_message(const adnlmessage::AdnlMessageQuery &mess flags = static_cast(0)](td::Result R) { if (R.is_error()) { LOG(WARNING) << "failed to answer query: " << R.move_as_error(); + td::actor::send_closure(SelfId, &AdnlPeerPairImpl::send_message, + OutboundAdnlMessage{adnlmessage::AdnlMessageQueryError{query_id}, flags}); } else { auto data = R.move_as_ok(); if (data.size() > Adnl::huge_packet_max_size()) { LOG(WARNING) << "dropping too big answer query: size=" << data.size(); + td::actor::send_closure(SelfId, &AdnlPeerPairImpl::send_message, + OutboundAdnlMessage{adnlmessage::AdnlMessageQueryError{query_id}, flags}); } else { td::actor::send_closure(SelfId, &AdnlPeerPairImpl::send_message, OutboundAdnlMessage{adnlmessage::AdnlMessageAnswer{query_id, std::move(data)}, flags}); @@ -609,6 +613,18 @@ void AdnlPeerPairImpl::process_message(const adnlmessage::AdnlMessagePart &messa } } +void AdnlPeerPairImpl::process_message(const adnlmessage::AdnlMessageQueryError &message) { + auto Q = out_queries_.find(message.query_id()); + + if (Q == out_queries_.end()) { + VLOG(ADNL_NOTICE) << this << ": dropping IN query error: unknown query id " << message.query_id(); + return; + } + + td::actor::send_closure_later(Q->second, &AdnlQuery::reject_query); + out_queries_.erase(Q); +} + void AdnlPeerPairImpl::delete_query(AdnlQueryId id) { auto Q = out_queries_.find(id); diff --git a/adnl/adnl-peer.hpp b/adnl/adnl-peer.hpp index 410c0f75..ec7166ed 100644 --- a/adnl/adnl-peer.hpp +++ b/adnl/adnl-peer.hpp @@ -104,6 +104,7 @@ class AdnlPeerPairImpl : public AdnlPeerPair { void process_message(const adnlmessage::AdnlMessageQuery &message); void process_message(const adnlmessage::AdnlMessageAnswer &message); void process_message(const adnlmessage::AdnlMessagePart &message); + void process_message(const adnlmessage::AdnlMessageQueryError &message); void process_message(const AdnlMessage::Empty &message) { UNREACHABLE(); } diff --git a/adnl/adnl-query.cpp b/adnl/adnl-query.cpp index 5bc767d2..40e9be80 100644 --- a/adnl/adnl-query.cpp +++ b/adnl/adnl-query.cpp @@ -25,13 +25,17 @@ namespace ton { namespace adnl { void AdnlQuery::alarm() { - promise_.set_error(td::Status::Error(ErrorCode::timeout, "adnl query timeout")); + promise_.set_error(td::Status::Error(ErrorCode::timeout, PSTRING() << "timeout for adnl query " << name_)); stop(); } void AdnlQuery::result(td::BufferSlice data) { promise_.set_value(std::move(data)); stop(); } +void AdnlQuery::reject_query() { + promise_.set_error(td::Status::Error(ErrorCode::timeout, PSTRING() << "rejected adnl query " << name_)); + stop(); +} AdnlQueryId AdnlQuery::random_query_id() { AdnlQueryId q_id; diff --git a/adnl/adnl-query.h b/adnl/adnl-query.h index 6e24a49f..2a78615a 100644 --- a/adnl/adnl-query.h +++ b/adnl/adnl-query.h @@ -48,6 +48,7 @@ class AdnlQuery : public td::actor::Actor { } void alarm() override; void result(td::BufferSlice data); + void reject_query(); void start_up() override { alarm_timestamp() = timeout_; } diff --git a/rldp/rldp-in.hpp b/rldp/rldp-in.hpp index b4981999..8850ccf9 100644 --- a/rldp/rldp-in.hpp +++ b/rldp/rldp-in.hpp @@ -78,6 +78,8 @@ class RldpIn : public RldpImpl { td::uint64 max_answer_size) override; void answer_query(adnl::AdnlNodeIdShort src, adnl::AdnlNodeIdShort dst, td::Timestamp timeout, adnl::AdnlQueryId query_id, TransferId transfer_id, td::BufferSlice data); + void reject_query(adnl::AdnlNodeIdShort src, adnl::AdnlNodeIdShort dst, td::Timestamp timeout, + adnl::AdnlQueryId query_id, TransferId transfer_id); void alarm_query(adnl::AdnlQueryId query_id, TransferId transfer_id); @@ -93,6 +95,8 @@ class RldpIn : public RldpImpl { ton_api::rldp_query &message); void process_message(adnl::AdnlNodeIdShort source, adnl::AdnlNodeIdShort local_id, TransferId transfer_id, ton_api::rldp_answer &message); + void process_message(adnl::AdnlNodeIdShort source, adnl::AdnlNodeIdShort local_id, TransferId transfer_id, + ton_api::rldp_queryError &message); void receive_message(adnl::AdnlNodeIdShort source, adnl::AdnlNodeIdShort local_id, TransferId transfer_id, td::BufferSlice data); diff --git a/rldp/rldp.cpp b/rldp/rldp.cpp index 9b38dcb8..a1d45926 100644 --- a/rldp/rldp.cpp +++ b/rldp/rldp.cpp @@ -87,6 +87,13 @@ void RldpIn::answer_query(adnl::AdnlNodeIdShort src, adnl::AdnlNodeIdShort dst, transfer(src, dst, timeout, std::move(B), transfer_id); } +void RldpIn::reject_query(adnl::AdnlNodeIdShort src, adnl::AdnlNodeIdShort dst, td::Timestamp timeout, + adnl::AdnlQueryId query_id, TransferId transfer_id) { + auto B = serialize_tl_object(create_tl_object(query_id), true); + + transfer(src, dst, timeout, std::move(B), transfer_id); +} + void RldpIn::alarm_query(adnl::AdnlQueryId query_id, TransferId transfer_id) { queries_.erase(query_id); max_size_.erase(transfer_id); @@ -199,12 +206,16 @@ void RldpIn::process_message(adnl::AdnlNodeIdShort source, adnl::AdnlNodeIdShort auto data = R.move_as_ok(); if (data.size() > max_answer_size) { VLOG(RLDP_NOTICE) << "rldp query failed: answer too big"; + td::actor::send_closure(SelfId, &RldpIn::reject_query, local_id, source, timeout, query_id, + transfer_id ^ TransferId::ones()); } else { td::actor::send_closure(SelfId, &RldpIn::answer_query, local_id, source, timeout, query_id, transfer_id ^ TransferId::ones(), std::move(data)); } } else { VLOG(RLDP_NOTICE) << "rldp query failed: " << R.move_as_error(); + td::actor::send_closure(SelfId, &RldpIn::reject_query, local_id, source, timeout, query_id, + transfer_id ^ TransferId::ones()); } }); VLOG(RLDP_DEBUG) << "delivering rldp query"; @@ -223,6 +234,17 @@ void RldpIn::process_message(adnl::AdnlNodeIdShort source, adnl::AdnlNodeIdShort } } +void RldpIn::process_message(adnl::AdnlNodeIdShort source, adnl::AdnlNodeIdShort local_id, TransferId transfer_id, + ton_api::rldp_queryError &message) { + auto it = queries_.find(message.query_id_); + if (it != queries_.end()) { + td::actor::send_closure(it->second, &adnl::AdnlQuery::reject_query); + queries_.erase(it); + } else { + VLOG(RLDP_INFO) << "received reject to unknown query " << message.query_id_; + } +} + void RldpIn::transfer_completed(TransferId transfer_id) { senders_.erase(transfer_id); VLOG(RLDP_DEBUG) << "rldp: completed transfer " << transfer_id << "; " << senders_.size() << " out transfer pending "; diff --git a/rldp2/rldp-in.hpp b/rldp2/rldp-in.hpp index c2e46d2a..a815004c 100644 --- a/rldp2/rldp-in.hpp +++ b/rldp2/rldp-in.hpp @@ -76,6 +76,8 @@ class RldpIn : public RldpImpl { td::uint64 max_answer_size) override; void answer_query(adnl::AdnlNodeIdShort src, adnl::AdnlNodeIdShort dst, td::Timestamp timeout, adnl::AdnlQueryId query_id, TransferId transfer_id, td::BufferSlice data); + void reject_query(adnl::AdnlNodeIdShort src, adnl::AdnlNodeIdShort dst, td::Timestamp timeout, + adnl::AdnlQueryId query_id, TransferId transfer_id); void receive_message_part(adnl::AdnlNodeIdShort source, adnl::AdnlNodeIdShort local_id, td::BufferSlice data); @@ -85,6 +87,8 @@ class RldpIn : public RldpImpl { ton_api::rldp_query &message); void process_message(adnl::AdnlNodeIdShort source, adnl::AdnlNodeIdShort local_id, TransferId transfer_id, ton_api::rldp_answer &message); + void process_message(adnl::AdnlNodeIdShort source, adnl::AdnlNodeIdShort local_id, TransferId transfer_id, + ton_api::rldp_queryError &message); void receive_message(adnl::AdnlNodeIdShort source, adnl::AdnlNodeIdShort local_id, TransferId transfer_id, td::Result data); diff --git a/rldp2/rldp.cpp b/rldp2/rldp.cpp index 765e38a5..34abd78e 100644 --- a/rldp2/rldp.cpp +++ b/rldp2/rldp.cpp @@ -118,6 +118,13 @@ void RldpIn::answer_query(adnl::AdnlNodeIdShort src, adnl::AdnlNodeIdShort dst, send_closure(create_connection(src, dst), &RldpConnectionActor::send, transfer_id, std::move(B), timeout); } +void RldpIn::reject_query(adnl::AdnlNodeIdShort src, adnl::AdnlNodeIdShort dst, td::Timestamp timeout, + adnl::AdnlQueryId query_id, TransferId transfer_id) { + auto B = serialize_tl_object(create_tl_object(query_id), true); + + send_closure(create_connection(src, dst), &RldpConnectionActor::send, transfer_id, std::move(B), timeout); +} + void RldpIn::receive_message_part(adnl::AdnlNodeIdShort source, adnl::AdnlNodeIdShort local_id, td::BufferSlice data) { send_closure(create_connection(local_id, source), &RldpConnectionActor::receive_raw, std::move(data)); } @@ -174,12 +181,16 @@ void RldpIn::process_message(adnl::AdnlNodeIdShort source, adnl::AdnlNodeIdShort auto data = R.move_as_ok(); if (data.size() > max_answer_size) { VLOG(RLDP_NOTICE) << "rldp query failed: answer too big"; + td::actor::send_closure(SelfId, &RldpIn::reject_query, local_id, source, timeout, query_id, + transfer_id ^ TransferId::ones()); } else { td::actor::send_closure(SelfId, &RldpIn::answer_query, local_id, source, timeout, query_id, transfer_id ^ TransferId::ones(), std::move(data)); } } else { VLOG(RLDP_NOTICE) << "rldp query failed: " << R.move_as_error(); + td::actor::send_closure(SelfId, &RldpIn::reject_query, local_id, source, timeout, query_id, + transfer_id ^ TransferId::ones()); } }); VLOG(RLDP_DEBUG) << "delivering rldp query"; @@ -198,6 +209,17 @@ void RldpIn::process_message(adnl::AdnlNodeIdShort source, adnl::AdnlNodeIdShort } } +void RldpIn::process_message(adnl::AdnlNodeIdShort source, adnl::AdnlNodeIdShort local_id, TransferId transfer_id, + ton_api::rldp_queryError &message) { + auto it = queries_.find(transfer_id); + if (it != queries_.end()) { + it->second.set_error(td::Status::Error("rejected")); + queries_.erase(it); + } else { + VLOG(RLDP_INFO) << "received reject to unknown query " << message.query_id_; + } +} + void RldpIn::on_sent(TransferId transfer_id, td::Result state) { //TODO: completed transfer } diff --git a/tl/generate/scheme/ton_api.tl b/tl/generate/scheme/ton_api.tl index 3a81ab58..9f1072ce 100644 --- a/tl/generate/scheme/ton_api.tl +++ b/tl/generate/scheme/ton_api.tl @@ -136,6 +136,7 @@ adnl.message.reinit date:int = adnl.Message; adnl.message.query query_id:int256 query:bytes = adnl.Message; adnl.message.answer query_id:int256 answer:bytes = adnl.Message; +adnl.message.queryError query_id:int256 = adnl.Message; adnl.message.part hash:int256 total_size:int offset:int data:bytes = adnl.Message; @@ -161,6 +162,7 @@ rldp.complete transfer_id:int256 part:int = rldp.MessagePart; rldp.message id:int256 data:bytes = rldp.Message; rldp.query query_id:int256 max_answer_size:long timeout:int data:bytes = rldp.Message; rldp.answer query_id:int256 data:bytes = rldp.Message; +rldp.queryError query_id:int256 = rldp.Message; ---functions--- diff --git a/tl/generate/scheme/ton_api.tlo b/tl/generate/scheme/ton_api.tlo index 479455fe..95a2144a 100644 Binary files a/tl/generate/scheme/ton_api.tlo and b/tl/generate/scheme/ton_api.tlo differ