From 9bbf38717a6a7678514c3a18cc9fcefeabe1963c Mon Sep 17 00:00:00 2001 From: Topilski Date: Thu, 26 Dec 2024 09:06:54 +0300 Subject: [PATCH] New protocol --- CMakeLists.txt | 2 +- src/server/CMakeLists.txt | 2 - src/server/daemon/client.cpp | 417 ++++---- src/server/daemon/client.h | 93 +- src/server/daemon/commands.cpp | 52 +- src/server/daemon/commands.h | 29 +- src/server/daemon/commands_factory.cpp | 311 ------ src/server/daemon/commands_factory.h | 96 -- .../commands_info/stream/get_log_info.cpp | 20 +- .../commands_info/stream/get_log_info.h | 9 +- .../commands_info/stream/restart_info.cpp | 8 +- .../commands_info/stream/restart_info.h | 8 +- .../daemon/commands_info/stream/stream_info.h | 5 +- src/server/daemon/server.h | 6 +- src/server/daemon_slave.cpp | 4 +- src/server/http/client.h | 8 +- src/server/http/handler.cpp | 7 +- src/server/pipe/client.h | 9 +- src/server/process_slave_wrapper.cpp | 955 ++++++++---------- src/server/process_slave_wrapper.h | 56 +- src/server/process_slave_wrapper_posix.cpp | 8 +- src/server/tcp/client.cpp | 2 + src/server/tcp/client.h | 11 +- src/server/utils/utils.h | 5 +- src/stream/dumpers/htmldump.cpp | 15 +- src/stream/dumpers/htmldump.h | 2 +- src/stream/dumpers/idumper.h | 2 +- src/stream/elements/depay/audio.h | 5 +- src/stream/stream_controller.cpp | 4 +- src/stream_commands/commands.h | 4 +- src/stream_commands/commands_factory.cpp | 4 +- src/utils/m3u8_reader.cpp | 4 +- src/utils/m3u8_reader.h | 4 +- 33 files changed, 846 insertions(+), 1321 deletions(-) delete mode 100644 src/server/daemon/commands_factory.cpp delete mode 100644 src/server/daemon/commands_factory.h diff --git a/CMakeLists.txt b/CMakeLists.txt index 9022540..39dc239 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1,7 +1,7 @@ CMAKE_MINIMUM_REQUIRED(VERSION 3.16 FATAL_ERROR) SET(BRANDING_PROJECT_NAME "fastocloud" CACHE STRING "Branding for ${BRANDING_PROJECT_NAME}") #default -SET(BRANDING_PROJECT_VERSION "1.4.4.17" CACHE STRING "Branding version for ${BRANDING_PROJECT_NAME}") #default +SET(BRANDING_PROJECT_VERSION "1.5.0.18" CACHE STRING "Branding version for ${BRANDING_PROJECT_NAME}") #default SET(BRANDING_PROJECT_BUILD_TYPE_VERSION "release" CACHE STRING "Build version type for ${BRANDING_PROJECT_NAME}") #default alfa,beta,rc,release SET(BRANDING_PROJECT_DOMAIN "www.fastogt.com" CACHE STRING "Branding domain url for ${BRANDING_PROJECT_NAME}") #default SET(BRANDING_PROJECT_COMPANYNAME "FastoGT" CACHE STRING "Company name for ${BRANDING_PROJECT_NAME}") #default diff --git a/src/server/CMakeLists.txt b/src/server/CMakeLists.txt index 287b059..428facf 100644 --- a/src/server/CMakeLists.txt +++ b/src/server/CMakeLists.txt @@ -109,7 +109,6 @@ SET(SERVER_DAEMON_HEADERS ${CMAKE_SOURCE_DIR}/src/server/daemon/client.h ${CMAKE_SOURCE_DIR}/src/server/daemon/server.h ${CMAKE_SOURCE_DIR}/src/server/daemon/commands.h - ${CMAKE_SOURCE_DIR}/src/server/daemon/commands_factory.h ${CMAKE_SOURCE_DIR}/src/server/daemon/commands_info/service/details/shots.h ${CMAKE_SOURCE_DIR}/src/server/daemon/commands_info/service/server_info.h @@ -126,7 +125,6 @@ SET(SERVER_DAEMON_SOURCES ${CMAKE_SOURCE_DIR}/src/server/daemon/client.cpp ${CMAKE_SOURCE_DIR}/src/server/daemon/server.cpp ${CMAKE_SOURCE_DIR}/src/server/daemon/commands.cpp - ${CMAKE_SOURCE_DIR}/src/server/daemon/commands_factory.cpp ${CMAKE_SOURCE_DIR}/src/server/daemon/commands_info/service/details/shots.cpp ${CMAKE_SOURCE_DIR}/src/server/daemon/commands_info/service/server_info.cpp diff --git a/src/server/daemon/client.cpp b/src/server/daemon/client.cpp index e3bb9f2..9dbe1a8 100644 --- a/src/server/daemon/client.cpp +++ b/src/server/daemon/client.cpp @@ -12,259 +12,326 @@ along with fastocloud. If not, see . */ -#include - #include "server/daemon/client.h" -#include "server/daemon/commands_factory.h" +#include +#include +#include + +#include + +#include +#include + +namespace { +const std::string kErrorInvalid = "{\"error\":{\"code\":-1, \"message\":\"NOTREACHED\"}}"; +const auto kDataOK = common::json::MakeSuccessDataJson(); +} // namespace namespace fastocloud { namespace server { ProtocoledDaemonClient::ProtocoledDaemonClient(common::libev::IoLoop* server, const common::net::socket_info& info) - : base_class(std::make_shared(), server, info) {} + : base_class(server, info), is_verified_(false), exp_time_(0) {} -common::ErrnoError ProtocoledDaemonClient::StopMe() { - const common::daemon::commands::StopInfo stop_req; - fastotv::protocol::request_t req; - common::Error err_ser = StopServiceRequest(NextRequestID(), stop_req, &req); +bool ProtocoledDaemonClient::IsVerified() const { + return is_verified_; +} + +common::time64_t ProtocoledDaemonClient::GetExpTime() const { + return exp_time_; +} + +void ProtocoledDaemonClient::SetVerified(bool verified, common::time64_t exp_time) { + is_verified_ = verified; + exp_time_ = exp_time; +} + +bool ProtocoledDaemonClient::IsExpired() const { + return exp_time_ < common::time::current_utc_mstime(); +} + +bool ProtocoledDaemonClient::HaveFullAccess() const { + return IsVerified() && !IsExpired(); +} + +common::http::http_protocol ProtocoledDaemonClient::GetProtocol() const { + return common::http::HP_1_0; +} + +common::libev::http::HttpServerInfo ProtocoledDaemonClient::GetServerInfo() const { + return common::libev::http::HttpServerInfo(); +} + +common::ErrnoError ProtocoledDaemonClient::StopMe(const common::uri::GURL& url) { + const common::daemon::commands::StopInfo params; + std::string result; + common::Error err_ser = params.SerializeToString(&result); if (err_ser) { return common::make_errno_error(err_ser->GetDescription(), EAGAIN); } - return WriteRequest(req); + return PostJson(url, result.c_str(), result.size(), false); } -common::ErrnoError ProtocoledDaemonClient::RestartMe() { - const common::daemon::commands::RestartInfo stop_req; - fastotv::protocol::request_t req; - common::Error err_ser = RestartServiceRequest(NextRequestID(), stop_req, &req); +common::ErrnoError ProtocoledDaemonClient::Broadcast(const common::json::WsDataJson& request) { + std::string result; + ignore_result(request.SerializeToString(&result)); + return SendFrame(result.c_str(), result.size()); +} + +common::ErrnoError ProtocoledDaemonClient::RestartMe(const common::uri::GURL& url) { + const common::daemon::commands::RestartInfo params; + std::string result; + common::Error err_ser = params.SerializeToString(&result); if (err_ser) { return common::make_errno_error(err_ser->GetDescription(), EAGAIN); } - - return WriteRequest(req); + return PostJson(url, result.c_str(), result.size(), false); } -common::ErrnoError ProtocoledDaemonClient::StopFail(fastotv::protocol::sequance_id_t id, common::Error err) { - const std::string error_str = err->GetDescription(); - fastotv::protocol::response_t resp; - common::Error err_ser = StopServiceResponseFail(id, error_str, &resp); +common::ErrnoError ProtocoledDaemonClient::StopFail(common::http::http_status code, common::Error err) { + return SendErrorJson(code, err); +} + +common::ErrnoError ProtocoledDaemonClient::StopSuccess() { + return SendDataJson(common::http::HS_OK, kDataOK); +} + +common::ErrnoError ProtocoledDaemonClient::RestartFail(common::http::http_status code, common::Error err) { + return SendErrorJson(code, err); +} + +common::ErrnoError ProtocoledDaemonClient::RestartSuccess() { + return SendDataJson(common::http::HS_OK, kDataOK); +} + +common::ErrnoError ProtocoledDaemonClient::GetHardwareHashFail(common::http::http_status code, common::Error err) { + return SendErrorJson(code, err); +} + +common::ErrnoError ProtocoledDaemonClient::GetHardwareHashSuccess( + const common::daemon::commands::HardwareHashInfo& params) { + json_object* jrequest_init = nullptr; + common::Error err_ser = params.Serialize(&jrequest_init); if (err_ser) { - return common::make_errno_error(err_ser->GetDescription(), EAGAIN); + return GetHardwareHashFail(common::http::HS_INTERNAL_ERROR, err_ser); } - return WriteResponse(resp); -} - -common::ErrnoError ProtocoledDaemonClient::StopSuccess(fastotv::protocol::sequance_id_t id) { - fastotv::protocol::response_t resp; - common::Error err_ser = StopServiceResponseSuccess(id, &resp); + std::string result; + common::json::DataJson js = common::json::MakeSuccessDataJson(jrequest_init); // take ownerships + err_ser = js.SerializeToString(&result); if (err_ser) { - return common::make_errno_error(err_ser->GetDescription(), EAGAIN); + return GetHardwareHashFail(common::http::HS_INTERNAL_ERROR, err_ser); } - return WriteResponse(resp); + return SendDataJson(common::http::HS_OK, js); } -common::ErrnoError ProtocoledDaemonClient::Ping() { - common::daemon::commands::ClientPingInfo server_ping_info; - return Ping(server_ping_info); +common::ErrnoError ProtocoledDaemonClient::GetStatsFail(common::http::http_status code, common::Error err) { + return SendErrorJson(code, err); } -common::ErrnoError ProtocoledDaemonClient::Ping(const common::daemon::commands::ClientPingInfo& server_ping_info) { - fastotv::protocol::request_t ping_request; - common::Error err_ser = PingRequest(NextRequestID(), server_ping_info, &ping_request); +common::ErrnoError ProtocoledDaemonClient::GetStatsSuccess(const service::FullServiceInfo& stats) { + json_object* jrequest_init = nullptr; + common::Error err_ser = stats.Serialize(&jrequest_init); if (err_ser) { - return common::make_errno_error(err_ser->GetDescription(), EAGAIN); + return GetHardwareHashFail(common::http::HS_INTERNAL_ERROR, err_ser); } - return WriteRequest(ping_request); -} - -common::ErrnoError ProtocoledDaemonClient::PongFail(fastotv::protocol::sequance_id_t id, common::Error err) { - const std::string error_str = err->GetDescription(); - fastotv::protocol::response_t resp; - common::Error err_ser = PingServiceResponseFail(id, error_str, &resp); + std::string result; + common::json::DataJson js = common::json::MakeSuccessDataJson(jrequest_init); // take ownerships + err_ser = js.SerializeToString(&result); if (err_ser) { - return common::make_errno_error(err_ser->GetDescription(), EAGAIN); + return GetStatsFail(common::http::HS_INTERNAL_ERROR, err_ser); } - return WriteResponse(resp); + return SendDataJson(common::http::HS_OK, js); } -common::ErrnoError ProtocoledDaemonClient::Pong(fastotv::protocol::sequance_id_t id) { - common::daemon::commands::ServerPingInfo server_ping_info; - return Pong(id, server_ping_info); +common::ErrnoError ProtocoledDaemonClient::GetLogServiceFail(common::http::http_status code, common::Error err) { + return SendErrorJson(code, err); } -common::ErrnoError ProtocoledDaemonClient::Pong(fastotv::protocol::sequance_id_t id, - const common::daemon::commands::ServerPingInfo& pong) { - fastotv::protocol::response_t resp; - common::Error err_ser = PingServiceResponseSuccess(id, pong, &resp); - if (err_ser) { - return common::make_errno_error(err_ser->GetDescription(), EAGAIN); - } - return WriteResponse(resp); -} - -common::ErrnoError ProtocoledDaemonClient::ActivateFail(fastotv::protocol::sequance_id_t id, common::Error err) { - const std::string error_str = err->GetDescription(); - fastotv::protocol::response_t resp; - common::Error err_ser = ActivateResponseFail(id, error_str, &resp); - if (err_ser) { - return common::make_errno_error(err_ser->GetDescription(), EAGAIN); +common::ErrnoError ProtocoledDaemonClient::GetLogServiceSuccess(const std::string& path) { + const char* file_path_str_ptr = path.c_str(); + struct stat sb; + if (stat(file_path_str_ptr, &sb) < 0) { + return GetLogServiceFail(common::http::HS_FORBIDDEN, common::make_error("File not found.")); } - return WriteResponse(resp); -} - -common::ErrnoError ProtocoledDaemonClient::ActivateSuccess(fastotv::protocol::sequance_id_t id, - const std::string& result) { - fastotv::protocol::response_t resp; - common::Error err_ser = ActivateResponseSuccess(id, result, &resp); - if (err_ser) { - return common::make_errno_error(err_ser->GetDescription(), EAGAIN); + if (S_ISDIR(sb.st_mode)) { + return GetLogServiceFail(common::http::HS_FORBIDDEN, common::make_error("Bad filename.")); } - return WriteResponse(resp); -} - -common::ErrnoError ProtocoledDaemonClient::GetLogServiceFail(fastotv::protocol::sequance_id_t id, common::Error err) { - const std::string error_str = err->GetDescription(); - fastotv::protocol::response_t resp; - common::Error err_ser = GetLogServiceResponseFail(id, error_str, &resp); - if (err_ser) { - return common::make_errno_error(err_ser->GetDescription(), EAGAIN); + int file = open(file_path_str_ptr, O_RDONLY); + if (file == INVALID_DESCRIPTOR) { /* open the file for reading */ + return GetLogServiceFail(common::http::HS_FORBIDDEN, common::make_error("File is protected.")); } - return WriteResponse(resp); -} - -common::ErrnoError ProtocoledDaemonClient::GetLogServiceSuccess(fastotv::protocol::sequance_id_t id) { - fastotv::protocol::response_t resp; - common::Error err_ser = GetLogServiceResponseSuccess(id, &resp); - if (err_ser) { - return common::make_errno_error(err_ser->GetDescription(), EAGAIN); + size_t cast = sb.st_size; + common::ErrnoError err = SendHeadersInternal(common::http::HS_OK, "text/plain", &cast, &sb.st_mtime); + if (err) { + ::close(file); + return GetLogServiceFail(common::http::HS_INTERNAL_ERROR, common::make_error(err->GetDescription())); } - return WriteResponse(resp); + err = SendFileByFd(file, sb.st_size); + ::close(file); + return err; } -common::ErrnoError ProtocoledDaemonClient::GetLogStreamFail(fastotv::protocol::sequance_id_t id, common::Error err) { - const std::string error_str = err->GetDescription(); - fastotv::protocol::response_t resp; - common::Error err_ser = GetLogStreamResponseFail(id, error_str, &resp); - if (err_ser) { - return common::make_errno_error(err_ser->GetDescription(), EAGAIN); - } - - return WriteResponse(resp); +common::ErrnoError ProtocoledDaemonClient::GetLogStreamFail(common::http::http_status code, common::Error err) { + return SendErrorJson(code, err); } -common::ErrnoError ProtocoledDaemonClient::GetLogStreamSuccess(fastotv::protocol::sequance_id_t id) { - fastotv::protocol::response_t resp; - common::Error err_ser = GetLogStreamResponseSuccess(id, &resp); - if (err_ser) { - return common::make_errno_error(err_ser->GetDescription(), EAGAIN); +common::ErrnoError ProtocoledDaemonClient::GetLogStreamSuccess(const std::string& path) { + const char* file_path_str_ptr = path.c_str(); + struct stat sb; + if (stat(file_path_str_ptr, &sb) < 0) { + return GetLogStreamFail(common::http::HS_FORBIDDEN, common::make_error("File not found.")); } - return WriteResponse(resp); + if (S_ISDIR(sb.st_mode)) { + return GetLogStreamFail(common::http::HS_FORBIDDEN, common::make_error("Bad filename.")); + } + + int file = open(file_path_str_ptr, O_RDONLY); + if (file == INVALID_DESCRIPTOR) { /* open the file for reading */ + return GetLogStreamFail(common::http::HS_FORBIDDEN, common::make_error("File is protected.")); + } + + size_t cast = sb.st_size; + common::ErrnoError err = SendHeadersInternal(common::http::HS_OK, "text/plain", &cast, &sb.st_mtime); + if (err) { + ::close(file); + return GetLogStreamFail(common::http::HS_INTERNAL_ERROR, common::make_error(err->GetDescription())); + } + + err = SendFileByFd(file, sb.st_size); + ::close(file); + return err; } -common::ErrnoError ProtocoledDaemonClient::GetPipeStreamFail(fastotv::protocol::sequance_id_t id, common::Error err) { - const std::string error_str = err->GetDescription(); - fastotv::protocol::response_t resp; - common::Error err_ser = GetPipeStreamResponseFail(id, error_str, &resp); - if (err_ser) { - return common::make_errno_error(err_ser->GetDescription(), EAGAIN); - } - - return WriteResponse(resp); +common::ErrnoError ProtocoledDaemonClient::GetPipeStreamFail(common::http::http_status code, common::Error err) { + return SendErrorJson(code, err); } -common::ErrnoError ProtocoledDaemonClient::GetPipeStreamSuccess(fastotv::protocol::sequance_id_t id) { - fastotv::protocol::response_t resp; - common::Error err_ser = GetPipeStreamResponseSuccess(id, &resp); - if (err_ser) { - return common::make_errno_error(err_ser->GetDescription(), EAGAIN); +common::ErrnoError ProtocoledDaemonClient::GetPipeStreamSuccess(const std::string& path) { + const char* file_path_str_ptr = path.c_str(); + struct stat sb; + if (stat(file_path_str_ptr, &sb) < 0) { + return GetLogStreamFail(common::http::HS_FORBIDDEN, common::make_error("File not found.")); } - return WriteResponse(resp); + if (S_ISDIR(sb.st_mode)) { + return GetLogStreamFail(common::http::HS_FORBIDDEN, common::make_error("Bad filename.")); + } + + int file = open(file_path_str_ptr, O_RDONLY); + if (file == INVALID_DESCRIPTOR) { /* open the file for reading */ + return GetLogStreamFail(common::http::HS_FORBIDDEN, common::make_error("File is protected.")); + } + + size_t cast = sb.st_size; + common::ErrnoError err = SendHeadersInternal(common::http::HS_OK, "text/plain", &cast, &sb.st_mtime); + if (err) { + ::close(file); + return GetLogStreamFail(common::http::HS_INTERNAL_ERROR, common::make_error(err->GetDescription())); + } + + err = SendFileByFd(file, sb.st_size); + ::close(file); + return err; } -common::ErrnoError ProtocoledDaemonClient::StartStreamFail(fastotv::protocol::sequance_id_t id, common::Error err) { - const std::string error_str = err->GetDescription(); - fastotv::protocol::response_t resp; - common::Error err_ser = StartStreamResponseFail(id, error_str, &resp); - if (err_ser) { - return common::make_errno_error(err_ser->GetDescription(), EAGAIN); - } - - return WriteResponse(resp); +common::ErrnoError ProtocoledDaemonClient::GetConfigStreamFail(common::http::http_status code, common::Error err) { + return SendErrorJson(code, err); } -common::ErrnoError ProtocoledDaemonClient::StartStreamSuccess(fastotv::protocol::sequance_id_t id) { - fastotv::protocol::response_t resp; - common::Error err_ser = StartStreamResponseSuccess(id, &resp); - if (err_ser) { - return common::make_errno_error(err_ser->GetDescription(), EAGAIN); +common::ErrnoError ProtocoledDaemonClient::GetConfigStreamSuccess(const std::string& path) { + const char* file_path_str_ptr = path.c_str(); + struct stat sb; + if (stat(file_path_str_ptr, &sb) < 0) { + return GetLogStreamFail(common::http::HS_FORBIDDEN, common::make_error("File not found.")); } - return WriteResponse(resp); + if (S_ISDIR(sb.st_mode)) { + return GetLogStreamFail(common::http::HS_FORBIDDEN, common::make_error("Bad filename.")); + } + + int file = open(file_path_str_ptr, O_RDONLY); + if (file == INVALID_DESCRIPTOR) { /* open the file for reading */ + return GetLogStreamFail(common::http::HS_FORBIDDEN, common::make_error("File is protected.")); + } + + size_t cast = sb.st_size; + common::ErrnoError err = + SendHeadersInternal(common::http::HS_OK, "text/plain", &cast, &sb.st_mtime); // "application/json" + if (err) { + ::close(file); + return GetLogStreamFail(common::http::HS_INTERNAL_ERROR, common::make_error(err->GetDescription())); + } + + err = SendFileByFd(file, sb.st_size); + ::close(file); + return err; } -common::ErrnoError ProtocoledDaemonClient::ReStartStreamFail(fastotv::protocol::sequance_id_t id, common::Error err) { - const std::string error_str = err->GetDescription(); - fastotv::protocol::response_t resp; - common::Error err_ser = RestartStreamResponseFail(id, error_str, &resp); - if (err_ser) { - return common::make_errno_error(err_ser->GetDescription(), EAGAIN); - } - - return WriteResponse(resp); +common::ErrnoError ProtocoledDaemonClient::StartStreamFail(common::http::http_status code, common::Error err) { + return SendErrorJson(code, err); } -common::ErrnoError ProtocoledDaemonClient::ReStartStreamSuccess(fastotv::protocol::sequance_id_t id) { - fastotv::protocol::response_t resp; - common::Error err_ser = RestartStreamResponseSuccess(id, &resp); - if (err_ser) { - return common::make_errno_error(err_ser->GetDescription(), EAGAIN); - } - - return WriteResponse(resp); +common::ErrnoError ProtocoledDaemonClient::StartStreamSuccess() { + return SendDataJson(common::http::HS_OK, kDataOK); } -common::ErrnoError ProtocoledDaemonClient::StopStreamFail(fastotv::protocol::sequance_id_t id, common::Error err) { - const std::string error_str = err->GetDescription(); - fastotv::protocol::response_t resp; - common::Error err_ser = StopStreamResponseFail(id, error_str, &resp); - if (err_ser) { - return common::make_errno_error(err_ser->GetDescription(), EAGAIN); - } - - return WriteResponse(resp); +common::ErrnoError ProtocoledDaemonClient::ReStartStreamFail(common::http::http_status code, common::Error err) { + return SendErrorJson(code, err); } -common::ErrnoError ProtocoledDaemonClient::StopStreamSuccess(fastotv::protocol::sequance_id_t id) { - fastotv::protocol::response_t resp; - common::Error err_ser = StopStreamResponseSuccess(id, &resp); - if (err_ser) { - return common::make_errno_error(err_ser->GetDescription(), EAGAIN); - } - - return WriteResponse(resp); +common::ErrnoError ProtocoledDaemonClient::ReStartStreamSuccess() { + return SendDataJson(common::http::HS_OK, kDataOK); } -common::ErrnoError ProtocoledDaemonClient::UnknownMethodError(fastotv::protocol::sequance_id_t id, - const std::string& method) { - fastotv::protocol::response_t resp; - common::Error err_ser = UnknownMethodResponse(id, method, &resp); +common::ErrnoError ProtocoledDaemonClient::StopStreamFail(common::http::http_status code, common::Error err) { + return SendErrorJson(code, err); +} + +common::ErrnoError ProtocoledDaemonClient::StopStreamSuccess() { + return SendDataJson(common::http::HS_OK, kDataOK); +} + +common::ErrnoError ProtocoledDaemonClient::UnknownMethodError(common::http::http_method method, + const std::string& route) { + common::Error err = + common::make_error(common::MemSPrintf("not handled %s for route: %s", common::ConvertToString(method), route)); + return SendErrorJson(common::http::HS_NOT_FOUND, err); +} + +common::ErrnoError ProtocoledDaemonClient::SendHeadersInternal(common::http::http_status code, + const char* mime_type, + size_t* length, + time_t* mod) { + return SendHeaders(GetProtocol(), code, {}, mime_type, length, mod, false, GetServerInfo()); +} + +common::ErrnoError ProtocoledDaemonClient::SendErrorJson(common::http::http_status code, common::Error err) { + auto errj = common::json::ErrorJson(common::json::MakeErrorJsonMessage(code, err)); + std::string result = kErrorInvalid; + ignore_result(errj.SerializeToString(&result)); + return SendJson(GetProtocol(), code, {}, result.c_str(), result.size(), false, GetServerInfo()); +} + +common::ErrnoError ProtocoledDaemonClient::SendDataJson(common::http::http_status code, + const common::json::DataJson& data) { + std::string result; + common::Error err_ser = data.SerializeToString(&result); if (err_ser) { - return common::make_errno_error(err_ser->GetDescription(), EAGAIN); + return SendErrorJson(common::http::HS_INTERNAL_ERROR, err_ser); } - return WriteResponse(resp); + return SendJson(GetProtocol(), code, {}, result.c_str(), result.size(), false, GetServerInfo()); } } // namespace server diff --git a/src/server/daemon/client.h b/src/server/daemon/client.h index 28c413f..fd40ac9 100644 --- a/src/server/daemon/client.h +++ b/src/server/daemon/client.h @@ -14,61 +14,86 @@ #pragma once -#include - -#include -#include -#include +#include +#include +#include #include - #include #include +#include + +#include "server/daemon/commands_info/service/server_info.h" + namespace fastocloud { namespace server { -class ProtocoledDaemonClient : public fastotv::protocol::ProtocolClient { +class ProtocoledDaemonClient : public common::libev::websocket::WebSocketServerClient { public: - typedef fastotv::protocol::ProtocolClient base_class; + typedef common::libev::websocket::WebSocketServerClient base_class; ProtocoledDaemonClient(common::libev::IoLoop* server, const common::net::socket_info& info); - common::ErrnoError StopMe() WARN_UNUSED_RESULT; - common::ErrnoError RestartMe() WARN_UNUSED_RESULT; + bool IsVerified() const; + void SetVerified(bool verified, common::time64_t exp_time); - common::ErrnoError StopFail(fastotv::protocol::sequance_id_t id, common::Error err) WARN_UNUSED_RESULT; - common::ErrnoError StopSuccess(fastotv::protocol::sequance_id_t id) WARN_UNUSED_RESULT; + common::time64_t GetExpTime() const; + bool IsExpired() const; + bool HaveFullAccess() const; - common::ErrnoError Ping() WARN_UNUSED_RESULT; - common::ErrnoError Ping(const common::daemon::commands::ClientPingInfo& server_ping_info) WARN_UNUSED_RESULT; + common::http::http_protocol GetProtocol() const; + common::libev::http::HttpServerInfo GetServerInfo() const; - common::ErrnoError PongFail(fastotv::protocol::sequance_id_t id, common::Error err) WARN_UNUSED_RESULT; - common::ErrnoError Pong(fastotv::protocol::sequance_id_t id) WARN_UNUSED_RESULT; - common::ErrnoError Pong(fastotv::protocol::sequance_id_t id, - const common::daemon::commands::ServerPingInfo& pong) WARN_UNUSED_RESULT; + common::ErrnoError StopMe(const common::uri::GURL& url) WARN_UNUSED_RESULT; + common::ErrnoError RestartMe(const common::uri::GURL& url) WARN_UNUSED_RESULT; - common::ErrnoError ActivateFail(fastotv::protocol::sequance_id_t id, common::Error err) WARN_UNUSED_RESULT; - common::ErrnoError ActivateSuccess(fastotv::protocol::sequance_id_t id, const std::string& result) WARN_UNUSED_RESULT; + common::ErrnoError StopFail(common::http::http_status code, common::Error err) WARN_UNUSED_RESULT; + common::ErrnoError StopSuccess() WARN_UNUSED_RESULT; - common::ErrnoError GetLogServiceFail(fastotv::protocol::sequance_id_t id, common::Error err) WARN_UNUSED_RESULT; - common::ErrnoError GetLogServiceSuccess(fastotv::protocol::sequance_id_t id) WARN_UNUSED_RESULT; + common::ErrnoError RestartFail(common::http::http_status code, common::Error err) WARN_UNUSED_RESULT; + common::ErrnoError RestartSuccess() WARN_UNUSED_RESULT; - common::ErrnoError GetLogStreamFail(fastotv::protocol::sequance_id_t id, common::Error err) WARN_UNUSED_RESULT; - common::ErrnoError GetLogStreamSuccess(fastotv::protocol::sequance_id_t id) WARN_UNUSED_RESULT; + common::ErrnoError GetHardwareHashFail(common::http::http_status code, common::Error err) WARN_UNUSED_RESULT; + common::ErrnoError GetHardwareHashSuccess(const common::daemon::commands::HardwareHashInfo& params) + WARN_UNUSED_RESULT; - common::ErrnoError GetPipeStreamFail(fastotv::protocol::sequance_id_t id, common::Error err) WARN_UNUSED_RESULT; - common::ErrnoError GetPipeStreamSuccess(fastotv::protocol::sequance_id_t id) WARN_UNUSED_RESULT; + common::ErrnoError GetStatsFail(common::http::http_status code, common::Error err) WARN_UNUSED_RESULT; + common::ErrnoError GetStatsSuccess(const service::FullServiceInfo& stats) WARN_UNUSED_RESULT; - common::ErrnoError StartStreamFail(fastotv::protocol::sequance_id_t id, common::Error err) WARN_UNUSED_RESULT; - common::ErrnoError StartStreamSuccess(fastotv::protocol::sequance_id_t id) WARN_UNUSED_RESULT; + common::ErrnoError GetLogServiceFail(common::http::http_status code, common::Error err) WARN_UNUSED_RESULT; + common::ErrnoError GetLogServiceSuccess(const std::string& path) WARN_UNUSED_RESULT; - common::ErrnoError ReStartStreamFail(fastotv::protocol::sequance_id_t id, common::Error err) WARN_UNUSED_RESULT; - common::ErrnoError ReStartStreamSuccess(fastotv::protocol::sequance_id_t id) WARN_UNUSED_RESULT; + common::ErrnoError GetLogStreamFail(common::http::http_status code, common::Error err) WARN_UNUSED_RESULT; + common::ErrnoError GetLogStreamSuccess(const std::string& path) WARN_UNUSED_RESULT; - common::ErrnoError StopStreamFail(fastotv::protocol::sequance_id_t id, common::Error err) WARN_UNUSED_RESULT; - common::ErrnoError StopStreamSuccess(fastotv::protocol::sequance_id_t id) WARN_UNUSED_RESULT; + common::ErrnoError GetPipeStreamFail(common::http::http_status code, common::Error err) WARN_UNUSED_RESULT; + common::ErrnoError GetPipeStreamSuccess(const std::string& path) WARN_UNUSED_RESULT; - common::ErrnoError UnknownMethodError(fastotv::protocol::sequance_id_t id, - const std::string& method) WARN_UNUSED_RESULT; + common::ErrnoError StartStreamFail(common::http::http_status code, common::Error err) WARN_UNUSED_RESULT; + common::ErrnoError StartStreamSuccess() WARN_UNUSED_RESULT; + + common::ErrnoError ReStartStreamFail(common::http::http_status code, common::Error err) WARN_UNUSED_RESULT; + common::ErrnoError ReStartStreamSuccess() WARN_UNUSED_RESULT; + + common::ErrnoError StopStreamFail(common::http::http_status code, common::Error err) WARN_UNUSED_RESULT; + common::ErrnoError StopStreamSuccess() WARN_UNUSED_RESULT; + + common::ErrnoError GetConfigStreamFail(common::http::http_status code, common::Error err) WARN_UNUSED_RESULT; + common::ErrnoError GetConfigStreamSuccess(const std::string& path) WARN_UNUSED_RESULT; + + common::ErrnoError UnknownMethodError(common::http::http_method method, const std::string& route) WARN_UNUSED_RESULT; + + common::ErrnoError Broadcast(const common::json::WsDataJson& request) WARN_UNUSED_RESULT; + + private: + common::ErrnoError SendDataJson(common::http::http_status code, const common::json::DataJson& data); + common::ErrnoError SendErrorJson(common::http::http_status code, common::Error err); + common::ErrnoError SendHeadersInternal(common::http::http_status code, + const char* mime_type, + size_t* length, + time_t* mod); + + bool is_verified_; + common::time64_t exp_time_; }; } // namespace server diff --git a/src/server/daemon/commands.cpp b/src/server/daemon/commands.cpp index 32f489d..5bb2627 100644 --- a/src/server/daemon/commands.cpp +++ b/src/server/daemon/commands.cpp @@ -14,36 +14,39 @@ #include "server/daemon/commands.h" +// broadcast +#define SERVICE_STATISTIC_SERVICE "statistic_service" + namespace fastocloud { namespace server { -common::Error ChangedSourcesStreamBroadcast(const ChangedSouresInfo& params, fastotv::protocol::request_t* req) { +common::Error ChangedSourcesStreamBroadcast(const ChangedSouresInfo& params, common::json::WsDataJson* req) { if (!req) { return common::make_error_inval(); } - std::string changed_json; - common::Error err_ser = params.SerializeToString(&changed_json); + json_object* result_json = nullptr; + common::Error err_ser = params.Serialize(&result_json); if (err_ser) { return err_ser; } - *req = fastotv::protocol::request_t::MakeNotification(STREAM_CHANGED_SOURCES_STREAM, changed_json); + *req = common::json::WsDataJson(BROADCAST_CHANGED_SOURCES_STREAM, result_json); return common::Error(); } -common::Error StatisitcStreamBroadcast(const StatisticInfo& params, fastotv::protocol::request_t* req) { +common::Error StatisitcStreamBroadcast(const StatisticInfo& params, common::json::WsDataJson* req) { if (!req) { return common::make_error_inval(); } - std::string stat_json; - common::Error err_ser = params.SerializeToString(&stat_json); + json_object* result_json = nullptr; + common::Error err_ser = params.Serialize(&result_json); if (err_ser) { return err_ser; } - *req = fastotv::protocol::request_t::MakeNotification(STREAM_STATISTIC_STREAM, stat_json); + *req = common::json::WsDataJson(BROADCAST_STATISTIC_STREAM, result_json); return common::Error(); } @@ -65,28 +68,33 @@ common::Error MlNotificationStreamBroadcast(const fastotv::commands_info::ml::No } #endif -common::Error StatisitcServiceBroadcast(fastotv::protocol::serializet_params_t params, - fastotv::protocol::request_t* req) { +common::Error StatisitcServiceBroadcast(const service::ServerInfo& params, common::json::WsDataJson* req) { if (!req) { return common::make_error_inval(); } - *req = fastotv::protocol::request_t::MakeNotification(STREAM_STATISTIC_SERVICE, params); - return common::Error(); -} - -common::Error QuitStatusStreamBroadcast(const stream::QuitStatusInfo& params, fastotv::protocol::request_t* req) { - if (!req) { - return common::make_error_inval(); - } - - std::string quit_json; - common::Error err_ser = params.SerializeToString(&quit_json); + json_object* stat_json = nullptr; + common::Error err_ser = params.Serialize(&stat_json); if (err_ser) { return err_ser; } - *req = fastotv::protocol::request_t::MakeNotification(STREAM_QUIT_STATUS_STREAM, quit_json); + *req = common::json::WsDataJson(SERVICE_STATISTIC_SERVICE, stat_json); + return common::Error(); +} + +common::Error QuitStatusStreamBroadcast(const stream::QuitStatusInfo& params, common::json::WsDataJson* req) { + if (!req) { + return common::make_error_inval(); + } + + json_object* quit_json = nullptr; + common::Error err_ser = params.Serialize(&quit_json); + if (err_ser) { + return err_ser; + } + + *req = common::json::WsDataJson(STREAM_QUIT_STATUS_STREAM, quit_json); return common::Error(); } diff --git a/src/server/daemon/commands.h b/src/server/daemon/commands.h index e53d9c4..2873127 100644 --- a/src/server/daemon/commands.h +++ b/src/server/daemon/commands.h @@ -14,15 +14,19 @@ #pragma once +#include + #include -#include +#include #if defined(MACHINE_LEARNING) #include #endif +#include "server/daemon/commands_info/service/server_info.h" #include "server/daemon/commands_info/stream/quit_status_info.h" + #include "stream_commands/commands_info/changed_sources_info.h" #include "stream_commands/commands_info/statistic_info.h" @@ -34,20 +38,17 @@ #define DAEMON_RESTART_STREAM "restart_stream" #define DAEMON_GET_LOG_STREAM "get_log_stream" #define DAEMON_GET_PIPELINE_STREAM "get_pipeline_stream" +#define DAEMON_GET_CONFIG_JSON_STREAM "get_config_json_stream" -#define DAEMON_ACTIVATE "activate_request" // {"key": "XXXXXXXXXXXXXXXXXX"} #define DAEMON_STOP_SERVICE "stop_service" // {"delay": 0 } #define DAEMON_RESTART_SERVICE "restart_service" // {"delay": 0 } +#define DAEMON_STATS_SERVICE "stats" -#define DAEMON_GET_CONFIG_JSON_STREAM "get_config_json_stream" -#define DAEMON_PING_SERVICE "ping_service" #define DAEMON_GET_LOG_SERVICE "get_log_service" // {"path":"http://localhost/service/id"} -#define DAEMON_SERVER_PING "ping_client" - // Broadcast -#define STREAM_CHANGED_SOURCES_STREAM "changed_source_stream" -#define STREAM_STATISTIC_STREAM "statistic_stream" +#define BROADCAST_CHANGED_SOURCES_STREAM "changed_source_stream" +#define BROADCAST_STATISTIC_STREAM "statistic_stream" #define STREAM_QUIT_STATUS_STREAM "quit_status_stream" #if defined(MACHINE_LEARNING) #define STREAM_ML_NOTIFICATION_STREAM "ml_notification_stream" @@ -58,15 +59,17 @@ namespace fastocloud { namespace server { // Broadcast -common::Error ChangedSourcesStreamBroadcast(const ChangedSouresInfo& params, fastotv::protocol::request_t* req); -common::Error StatisitcStreamBroadcast(const StatisticInfo& params, fastotv::protocol::request_t* req); +common::Error ChangedSourcesStreamBroadcast(const ChangedSouresInfo& params, + common::json::WsDataJson* req) WARN_UNUSED_RESULT; +common::Error StatisitcStreamBroadcast(const StatisticInfo& params, common::json::WsDataJson* req) WARN_UNUSED_RESULT; #if defined(MACHINE_LEARNING) common::Error MlNotificationStreamBroadcast(const fastotv::commands_info::ml::NotificationInfo& params, fastotv::protocol::request_t* req); #endif -common::Error StatisitcServiceBroadcast(fastotv::protocol::serializet_params_t params, - fastotv::protocol::request_t* req); -common::Error QuitStatusStreamBroadcast(const stream::QuitStatusInfo& params, fastotv::protocol::request_t* req); +common::Error StatisitcServiceBroadcast(const service::ServerInfo& params, + common::json::WsDataJson* req) WARN_UNUSED_RESULT; + +common::Error QuitStatusStreamBroadcast(const stream::QuitStatusInfo& params, common::json::WsDataJson* req) WARN_UNUSED_RESULT; } // namespace server } // namespace fastocloud diff --git a/src/server/daemon/commands_factory.cpp b/src/server/daemon/commands_factory.cpp deleted file mode 100644 index 02b1ea5..0000000 --- a/src/server/daemon/commands_factory.cpp +++ /dev/null @@ -1,311 +0,0 @@ -/* Copyright (C) 2014-2022 FastoGT. All right reserved. - This file is part of fastocloud. - fastocloud is free software: you can redistribute it and/or modify - it under the terms of the GNU General Public License as published by - the Free Software Foundation, either version 3 of the License, or - (at your option) any later version. - fastocloud is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. - You should have received a copy of the GNU General Public License - along with fastocloud. If not, see . -*/ - -#include "server/daemon/commands_factory.h" - -#include - -#include "server/daemon/commands.h" - -namespace fastocloud { -namespace server { - -common::Error StopServiceRequest(fastotv::protocol::sequance_id_t id, - const common::daemon::commands::StopInfo& params, - fastotv::protocol::request_t* req) { - if (!req) { - return common::make_error_inval(); - } - - std::string req_str; - common::Error err_ser = params.SerializeToString(&req_str); - if (err_ser) { - return err_ser; - } - - fastotv::protocol::request_t lreq; - lreq.id = id; - lreq.method = DAEMON_STOP_SERVICE; - lreq.params = req_str; - *req = lreq; - return common::Error(); -} - -common::Error RestartServiceRequest(fastotv::protocol::sequance_id_t id, - const common::daemon::commands::RestartInfo& params, - fastotv::protocol::request_t* req) { - if (!req) { - return common::make_error_inval(); - } - - std::string req_str; - common::Error err_ser = params.SerializeToString(&req_str); - if (err_ser) { - return err_ser; - } - - fastotv::protocol::request_t lreq; - lreq.id = id; - lreq.method = DAEMON_RESTART_SERVICE; - lreq.params = req_str; - *req = lreq; - return common::Error(); -} - -common::Error PingRequest(fastotv::protocol::sequance_id_t id, - const common::daemon::commands::ClientPingInfo& params, - fastotv::protocol::request_t* req) { - if (!req) { - return common::make_error_inval(); - } - - std::string req_str; - common::Error err_ser = params.SerializeToString(&req_str); - if (err_ser) { - return err_ser; - } - - fastotv::protocol::request_t lreq; - lreq.id = id; - lreq.method = DAEMON_SERVER_PING; - lreq.params = req_str; - *req = lreq; - return common::Error(); -} - -common::Error StopServiceResponseSuccess(fastotv::protocol::sequance_id_t id, fastotv::protocol::response_t* resp) { - if (!resp) { - return common::make_error_inval(); - } - - *resp = - fastotv::protocol::response_t::MakeMessage(id, common::protocols::json_rpc::JsonRPCMessage::MakeSuccessMessage()); - return common::Error(); -} - -common::Error StopServiceResponseFail(fastotv::protocol::sequance_id_t id, - const std::string& error_text, - fastotv::protocol::response_t* resp) { - if (!resp) { - return common::make_error_inval(); - } - - *resp = fastotv::protocol::response_t::MakeError( - id, common::protocols::json_rpc::JsonRPCError::MakeServerErrorFromText(error_text)); - return common::Error(); -} - -common::Error GetLogServiceResponseSuccess(fastotv::protocol::sequance_id_t id, fastotv::protocol::response_t* resp) { - if (!resp) { - return common::make_error_inval(); - } - - *resp = - fastotv::protocol::response_t::MakeMessage(id, common::protocols::json_rpc::JsonRPCMessage::MakeSuccessMessage()); - return common::Error(); -} - -common::Error GetLogServiceResponseFail(fastotv::protocol::sequance_id_t id, - const std::string& error_text, - fastotv::protocol::response_t* resp) { - if (!resp) { - return common::make_error_inval(); - } - - *resp = fastotv::protocol::response_t::MakeError( - id, common::protocols::json_rpc::JsonRPCError::MakeServerErrorFromText(error_text)); - return common::Error(); -} - -common::Error ActivateResponseSuccess(fastotv::protocol::sequance_id_t id, - const std::string& result, - fastotv::protocol::response_t* resp) { - if (!resp) { - return common::make_error_inval(); - } - - *resp = fastotv::protocol::response_t::MakeMessage( - id, common::protocols::json_rpc::JsonRPCMessage::MakeSuccessMessage(result)); - return common::Error(); -} - -common::Error ActivateResponseFail(fastotv::protocol::sequance_id_t id, - const std::string& error_text, - fastotv::protocol::response_t* resp) { - if (!resp) { - return common::make_error_inval(); - } - - *resp = fastotv::protocol::response_t::MakeError( - id, common::protocols::json_rpc::JsonRPCError::MakeServerErrorFromText(error_text)); - return common::Error(); -} - -common::Error StartStreamResponseSuccess(fastotv::protocol::sequance_id_t id, fastotv::protocol::response_t* resp) { - if (!resp) { - return common::make_error_inval(); - } - - *resp = - fastotv::protocol::response_t::MakeMessage(id, common::protocols::json_rpc::JsonRPCMessage::MakeSuccessMessage()); - return common::Error(); -} - -common::Error StartStreamResponseFail(fastotv::protocol::sequance_id_t id, - const std::string& error_text, - fastotv::protocol::response_t* resp) { - if (!resp) { - return common::make_error_inval(); - } - - *resp = fastotv::protocol::response_t::MakeError( - id, common::protocols::json_rpc::JsonRPCError::MakeServerErrorFromText(error_text)); - return common::Error(); -} - -common::Error StopStreamResponseSuccess(fastotv::protocol::sequance_id_t id, fastotv::protocol::response_t* resp) { - if (!resp) { - return common::make_error_inval(); - } - - *resp = - fastotv::protocol::response_t::MakeMessage(id, common::protocols::json_rpc::JsonRPCMessage::MakeSuccessMessage()); - return common::Error(); -} - -common::Error StopStreamResponseFail(fastotv::protocol::sequance_id_t id, - const std::string& error_text, - fastotv::protocol::response_t* resp) { - if (!resp) { - return common::make_error_inval(); - } - - *resp = fastotv::protocol::response_t::MakeError( - id, common::protocols::json_rpc::JsonRPCError::MakeServerErrorFromText(error_text)); - return common::Error(); -} - -common::Error RestartStreamResponseSuccess(fastotv::protocol::sequance_id_t id, fastotv::protocol::response_t* resp) { - if (!resp) { - return common::make_error_inval(); - } - - *resp = - fastotv::protocol::response_t::MakeMessage(id, common::protocols::json_rpc::JsonRPCMessage::MakeSuccessMessage()); - return common::Error(); -} - -common::Error RestartStreamResponseFail(fastotv::protocol::sequance_id_t id, - const std::string& error_text, - fastotv::protocol::response_t* resp) { - if (!resp) { - return common::make_error_inval(); - } - - *resp = fastotv::protocol::response_t::MakeError( - id, common::protocols::json_rpc::JsonRPCError::MakeServerErrorFromText(error_text)); - return common::Error(); -} - -common::Error GetLogStreamResponseSuccess(fastotv::protocol::sequance_id_t id, fastotv::protocol::response_t* resp) { - if (!resp) { - return common::make_error_inval(); - } - - *resp = - fastotv::protocol::response_t::MakeMessage(id, common::protocols::json_rpc::JsonRPCMessage::MakeSuccessMessage()); - return common::Error(); -} - -common::Error GetLogStreamResponseFail(fastotv::protocol::sequance_id_t id, - const std::string& error_text, - fastotv::protocol::response_t* resp) { - if (!resp) { - return common::make_error_inval(); - } - - *resp = fastotv::protocol::response_t::MakeError( - id, common::protocols::json_rpc::JsonRPCError::MakeServerErrorFromText(error_text)); - - return common::Error(); -} - -common::Error GetPipeStreamResponseSuccess(fastotv::protocol::sequance_id_t id, fastotv::protocol::response_t* resp) { - if (!resp) { - return common::make_error_inval(); - } - - *resp = - fastotv::protocol::response_t::MakeMessage(id, common::protocols::json_rpc::JsonRPCMessage::MakeSuccessMessage()); - return common::Error(); -} - -common::Error GetPipeStreamResponseFail(fastotv::protocol::sequance_id_t id, - const std::string& error_text, - fastotv::protocol::response_t* resp) { - if (!resp) { - return common::make_error_inval(); - } - - *resp = fastotv::protocol::response_t::MakeError( - id, common::protocols::json_rpc::JsonRPCError::MakeServerErrorFromText(error_text)); - - return common::Error(); -} - -common::Error PingServiceResponseSuccess(fastotv::protocol::sequance_id_t id, - const common::daemon::commands::ServerPingInfo& ping, - fastotv::protocol::response_t* resp) { - if (!resp) { - return common::make_error_inval(); - } - - std::string ping_server_json; - common::Error err_ser = ping.SerializeToString(&ping_server_json); - if (err_ser) { - return err_ser; - } - - *resp = fastotv::protocol::response_t::MakeMessage( - id, common::protocols::json_rpc::JsonRPCMessage::MakeSuccessMessage(ping_server_json)); - return common::Error(); -} - -common::Error PingServiceResponseFail(fastotv::protocol::sequance_id_t id, - const std::string& error_text, - fastotv::protocol::response_t* resp) { - if (!resp) { - return common::make_error_inval(); - } - - *resp = fastotv::protocol::response_t::MakeError( - id, common::protocols::json_rpc::JsonRPCError::MakeServerErrorFromText(error_text)); - return common::Error(); -} - -common::Error UnknownMethodResponse(fastotv::protocol::sequance_id_t id, - const std::string& method, - fastotv::protocol::response_t* resp) { - if (!resp) { - return common::make_error_inval(); - } - - *resp = - fastotv::protocol::response_t::MakeError(id, common::protocols::json_rpc::JsonRPCError::MakeServerErrorFromText( - common::MemSPrintf("Unknown method: %s", method))); - return common::Error(); -} - -} // namespace server -} // namespace fastocloud diff --git a/src/server/daemon/commands_factory.h b/src/server/daemon/commands_factory.h deleted file mode 100644 index dc3571b..0000000 --- a/src/server/daemon/commands_factory.h +++ /dev/null @@ -1,96 +0,0 @@ -/* Copyright (C) 2014-2022 FastoGT. All right reserved. - This file is part of fastocloud. - fastocloud is free software: you can redistribute it and/or modify - it under the terms of the GNU General Public License as published by - the Free Software Foundation, either version 3 of the License, or - (at your option) any later version. - fastocloud is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. - You should have received a copy of the GNU General Public License - along with fastocloud. If not, see . -*/ - -#pragma once - -#include - -#include -#include -#include -#include - -#include - -namespace fastocloud { -namespace server { - -// requests -common::Error StopServiceRequest(fastotv::protocol::sequance_id_t id, - const common::daemon::commands::StopInfo& params, - fastotv::protocol::request_t* req); -common::Error RestartServiceRequest(fastotv::protocol::sequance_id_t id, - const common::daemon::commands::RestartInfo& params, - fastotv::protocol::request_t* req); -common::Error PingRequest(fastotv::protocol::sequance_id_t id, - const common::daemon::commands::ClientPingInfo& params, - fastotv::protocol::request_t* req); - -// responses service -common::Error StopServiceResponseSuccess(fastotv::protocol::sequance_id_t id, fastotv::protocol::response_t* resp); -common::Error StopServiceResponseFail(fastotv::protocol::sequance_id_t id, - const std::string& error_text, - fastotv::protocol::response_t* resp); - -common::Error GetLogServiceResponseSuccess(fastotv::protocol::sequance_id_t id, fastotv::protocol::response_t* resp); -common::Error GetLogServiceResponseFail(fastotv::protocol::sequance_id_t id, - const std::string& error_text, - fastotv::protocol::response_t* resp); - -common::Error ActivateResponseSuccess(fastotv::protocol::sequance_id_t id, - const std::string& result, - fastotv::protocol::response_t* resp); -common::Error ActivateResponseFail(fastotv::protocol::sequance_id_t id, - const std::string& error_text, - fastotv::protocol::response_t* resp); - -common::Error PingServiceResponseSuccess(fastotv::protocol::sequance_id_t id, - const common::daemon::commands::ServerPingInfo& ping, - fastotv::protocol::response_t* resp); -common::Error PingServiceResponseFail(fastotv::protocol::sequance_id_t id, - const std::string& error_text, - fastotv::protocol::response_t* resp); - -common::Error UnknownMethodResponse(fastotv::protocol::sequance_id_t id, - const std::string& method, - fastotv::protocol::response_t* resp) WARN_UNUSED_RESULT; - -// responces streams -common::Error StartStreamResponseSuccess(fastotv::protocol::sequance_id_t id, fastotv::protocol::response_t* resp); -common::Error StartStreamResponseFail(fastotv::protocol::sequance_id_t id, - const std::string& error_text, - fastotv::protocol::response_t* resp); - -common::Error StopStreamResponseSuccess(fastotv::protocol::sequance_id_t id, fastotv::protocol::response_t* resp); -common::Error StopStreamResponseFail(fastotv::protocol::sequance_id_t id, - const std::string& error_text, - fastotv::protocol::response_t* resp); - -common::Error RestartStreamResponseSuccess(fastotv::protocol::sequance_id_t id, fastotv::protocol::response_t* resp); -common::Error RestartStreamResponseFail(fastotv::protocol::sequance_id_t id, - const std::string& error_text, - fastotv::protocol::response_t* resp); - -common::Error GetLogStreamResponseSuccess(fastotv::protocol::sequance_id_t id, fastotv::protocol::response_t* resp); -common::Error GetLogStreamResponseFail(fastotv::protocol::sequance_id_t id, - const std::string& error_text, - fastotv::protocol::response_t* resp); - -common::Error GetPipeStreamResponseSuccess(fastotv::protocol::sequance_id_t id, fastotv::protocol::response_t* resp); -common::Error GetPipeStreamResponseFail(fastotv::protocol::sequance_id_t id, - const std::string& error_text, - fastotv::protocol::response_t* resp); - -} // namespace server -} // namespace fastocloud diff --git a/src/server/daemon/commands_info/stream/get_log_info.cpp b/src/server/daemon/commands_info/stream/get_log_info.cpp index 17f68e6..a3dd64d 100644 --- a/src/server/daemon/commands_info/stream/get_log_info.cpp +++ b/src/server/daemon/commands_info/stream/get_log_info.cpp @@ -16,21 +16,16 @@ #include -#define PATH_FIELD "path" #define FEEDBACK_DIR_FIELD "feedback_directory" namespace fastocloud { namespace server { namespace stream { -GetLogInfo::GetLogInfo() : base_class(), feedback_dir_(), path_() {} +GetLogInfo::GetLogInfo() : base_class(), feedback_dir_() {} -GetLogInfo::GetLogInfo(const fastotv::stream_id_t& stream_id, const std::string& feedback_dir, const url_t& path) - : base_class(stream_id), feedback_dir_(feedback_dir), path_(path) {} - -GetLogInfo::url_t GetLogInfo::GetLogPath() const { - return path_; -} +GetLogInfo::GetLogInfo(const fastotv::stream_id_t& stream_id, const std::string& feedback_dir) + : base_class(stream_id), feedback_dir_(feedback_dir) {} std::string GetLogInfo::GetFeedbackDir() const { return feedback_dir_; @@ -50,20 +45,11 @@ common::Error GetLogInfo::DoDeSerialize(json_object* serialized) { } inf.feedback_dir_ = feedback_dir; - std::string path; - err = GetStringField(serialized, PATH_FIELD, &path); - if (err) { - return err; - } - inf.path_ = url_t(path); - *this = inf; return common::Error(); } common::Error GetLogInfo::SerializeFields(json_object* out) const { - const std::string path_str = path_.spec(); - ignore_result(SetStringField(out, PATH_FIELD, path_str)); ignore_result(SetStringField(out, FEEDBACK_DIR_FIELD, feedback_dir_)); return base_class::SerializeFields(out); } diff --git a/src/server/daemon/commands_info/stream/get_log_info.h b/src/server/daemon/commands_info/stream/get_log_info.h index a0e7fd5..4e5d69e 100644 --- a/src/server/daemon/commands_info/stream/get_log_info.h +++ b/src/server/daemon/commands_info/stream/get_log_info.h @@ -14,10 +14,10 @@ #pragma once -#include - #include +#include + #include "server/daemon/commands_info/stream/stream_info.h" namespace fastocloud { @@ -27,12 +27,10 @@ namespace stream { class GetLogInfo : public StreamInfo { public: typedef StreamInfo base_class; - typedef common::uri::GURL url_t; GetLogInfo(); - explicit GetLogInfo(const fastotv::stream_id_t& stream_id, const std::string& feedback_dir, const url_t& log_path); + explicit GetLogInfo(const fastotv::stream_id_t& stream_id, const std::string& feedback_dir); - url_t GetLogPath() const; std::string GetFeedbackDir() const; protected: @@ -41,7 +39,6 @@ class GetLogInfo : public StreamInfo { private: std::string feedback_dir_; - url_t path_; }; typedef GetLogInfo GetPipelineInfo; diff --git a/src/server/daemon/commands_info/stream/restart_info.cpp b/src/server/daemon/commands_info/stream/restart_info.cpp index 126969d..b36ba46 100644 --- a/src/server/daemon/commands_info/stream/restart_info.cpp +++ b/src/server/daemon/commands_info/stream/restart_info.cpp @@ -16,12 +16,6 @@ namespace fastocloud { namespace server { -namespace stream { - -RestartInfo::RestartInfo() : base_class() {} - -RestartInfo::RestartInfo(const fastotv::stream_id_t& stream_id) : base_class(stream_id) {} - -} // namespace stream +namespace stream {} // namespace stream } // namespace server } // namespace fastocloud diff --git a/src/server/daemon/commands_info/stream/restart_info.h b/src/server/daemon/commands_info/stream/restart_info.h index bc74894..9476cd7 100644 --- a/src/server/daemon/commands_info/stream/restart_info.h +++ b/src/server/daemon/commands_info/stream/restart_info.h @@ -20,13 +20,7 @@ namespace fastocloud { namespace server { namespace stream { -class RestartInfo : public StreamInfo { - public: - typedef StreamInfo base_class; - - RestartInfo(); - explicit RestartInfo(const fastotv::stream_id_t& stream_id); -}; +typedef StreamInfo RestartInfo; } // namespace stream } // namespace server diff --git a/src/server/daemon/commands_info/stream/stream_info.h b/src/server/daemon/commands_info/stream/stream_info.h index 3b8d8f9..81e5c3d 100644 --- a/src/server/daemon/commands_info/stream/stream_info.h +++ b/src/server/daemon/commands_info/stream/stream_info.h @@ -14,12 +14,11 @@ #pragma once -#include - #include - #include +#include + namespace fastocloud { namespace server { namespace stream { diff --git a/src/server/daemon/server.h b/src/server/daemon/server.h index eb73ebd..7f96052 100644 --- a/src/server/daemon/server.h +++ b/src/server/daemon/server.h @@ -14,14 +14,14 @@ #pragma once -#include +#include namespace fastocloud { namespace server { -class DaemonServer : public common::daemon::DaemonServer { +class DaemonServer : public common::libev::tcp::TcpServer { public: - typedef common::daemon::DaemonServer base_class; + typedef common::libev::tcp::TcpServer base_class; explicit DaemonServer(const common::net::HostAndPort& host, common::libev::IoLoopObserver* observer = nullptr); private: diff --git a/src/server/daemon_slave.cpp b/src/server/daemon_slave.cpp index 15899f5..031e1e6 100644 --- a/src/server/daemon_slave.cpp +++ b/src/server/daemon_slave.cpp @@ -76,7 +76,7 @@ int main(int argc, char** argv, char** envp) { return EXIT_FAILURE; } - err = fastocloud::server::ProcessSlaveWrapper::SendStopDaemonRequest(config); + err = fastocloud::server::ProcessSlaveWrapper::SendStopDaemonRequest(config.host); sleep(fastocloud::server::ProcessSlaveWrapper::cleanup_seconds + 1); if (err) { std::cerr << "Stop command failed error: " << err->GetDescription() << std::endl; @@ -92,7 +92,7 @@ int main(int argc, char** argv, char** envp) { return EXIT_FAILURE; } - err = fastocloud::server::ProcessSlaveWrapper::SendRestartDaemonRequest(config); + err = fastocloud::server::ProcessSlaveWrapper::SendRestartDaemonRequest(config.host); sleep(fastocloud::server::ProcessSlaveWrapper::cleanup_seconds + 1); if (err) { std::cerr << "Reload command failed error: " << err->GetDescription() << std::endl; diff --git a/src/server/http/client.h b/src/server/http/client.h index 4f6642b..8790ee6 100644 --- a/src/server/http/client.h +++ b/src/server/http/client.h @@ -21,18 +21,18 @@ typedef struct ssl_st SSL; namespace fastocloud { namespace server { -class HttpClient : public common::libev::http::HttpClient { +class HttpClient : public common::libev::http::HttpServerClient { public: - typedef common::libev::http::HttpClient base_class; + typedef common::libev::http::HttpServerClient base_class; HttpClient(common::libev::IoLoop* server, const common::net::socket_info& info); const char* ClassName() const override; }; -class HttpsClient : public common::libev::http::HttpClient { +class HttpsClient : public common::libev::http::HttpServerClient { public: - typedef common::libev::http::HttpClient base_class; + typedef common::libev::http::HttpServerClient base_class; HttpsClient(common::libev::IoLoop* server, const common::net::socket_info& info, SSL* ssl); diff --git a/src/server/http/handler.cpp b/src/server/http/handler.cpp index 15c144a..2dc7ba2 100644 --- a/src/server/http/handler.cpp +++ b/src/server/http/handler.cpp @@ -112,8 +112,9 @@ void HttpHandler::ProcessReceived(HttpClient* hclient, const char* request, size // keep alive common::http::header_t connection_field; - bool is_find_connection = hrequest.FindHeaderByKey("Connection", false, &connection_field); - bool IsKeepAlive = is_find_connection ? common::EqualsASCII(connection_field.value, "Keep-Alive", false) : false; + // bool is_find_connection = hrequest.FindHeaderByKey("Connection", false, &connection_field); + bool IsKeepAlive = + false; // is_find_connection ? common::EqualsASCII(connection_field.value, "Keep-Alive", false) : false; const common::http::http_protocol protocol = hrequest.GetProtocol(); const common::http::http_method method = hrequest.GetMethod(); if (method == common::http::http_method::HM_GET || method == common::http::http_method::HM_HEAD) { @@ -181,7 +182,7 @@ void HttpHandler::ProcessReceived(HttpClient* hclient, const char* request, size } if (hrequest.GetMethod() == common::http::http_method::HM_GET) { - common::ErrnoError err = hclient->SendFileByFd(protocol, file, sb.st_size); + common::ErrnoError err = hclient->SendFileByFd(protocol, file); if (err) { DEBUG_MSG_ERROR(err, common::logging::LOG_LEVEL_ERR); } else { diff --git a/src/server/pipe/client.h b/src/server/pipe/client.h index a3d4abc..333d9c7 100644 --- a/src/server/pipe/client.h +++ b/src/server/pipe/client.h @@ -14,10 +14,15 @@ #pragma once -#include - #include +namespace common { +namespace libev { +class PipeReadClient; +class PipeWriteClient; +} // namespace libev +} // namespace common + namespace fastocloud { namespace server { namespace pipe { diff --git a/src/server/process_slave_wrapper.cpp b/src/server/process_slave_wrapper.cpp index cad6958..9dc4d14 100644 --- a/src/server/process_slave_wrapper.cpp +++ b/src/server/process_slave_wrapper.cpp @@ -56,17 +56,13 @@ #undef SetPort #endif +#define API_KEY_PARAM "API-KEY" + +#define WS_UPDATES "updates" + namespace { -common::Error PostHttpOrHttpsFile(const common::file_system::ascii_file_string_path& file_path, - const common::uri::GURL& url) { - struct timeval tv = {1, 0}; - const common::http::headers_t extra = {}; - if (url.SchemeIs("http")) { - return common::net::PostHttpFile(file_path, url, extra, &tv); - } - return common::net::PostHttpsFile(file_path, url, extra, &tv); -} +const auto kClosedDaemon = common::make_errno_error("Connection closed", EAGAIN); common::Optional MakeStreamConfigJsonPath( const std::string& feedback_dir) { @@ -90,22 +86,42 @@ namespace fastocloud { namespace server { namespace { template -common::ErrnoError ParseClientRequest(ProtocoledDaemonClient* dclient, - const fastotv::protocol::request_t* req, - common::serializer::JsonSerializer* result, - bool access = true) { - if (!req->params || !dclient || !req || !result) { +common::ErrnoError ParsePostClientRequest(ProtocoledDaemonClient* dclient, + const common::http::HttpRequest& req, + common::serializer::JsonSerializer* result, + bool access = true) { + if (!dclient || !result) { return common::make_errno_error_inval(); } if (access) { + common::http::header_t found_key_in_headers; + if (!req.FindHeaderByKey(API_KEY_PARAM, false, &found_key_in_headers)) { + return common::make_errno_error("Don't have permissions", EACCES); + } + + const auto expire_key = common::license::make_license(found_key_in_headers.value); + if (!expire_key) { + return common::make_errno_error("Invalid access key", EACCES); + } + + common::time64_t tm; + bool is_valid = common::license::GetExpireTimeFromKey(PROJECT_NAME_LOWERCASE, *expire_key, &tm); + if (!is_valid) { + common::Error err = common::make_error("Invalid expire key"); + return common::make_errno_error(err->GetDescription(), EINVAL); + } + + dclient->SetVerified(true, tm); + if (!dclient->HaveFullAccess()) { return common::make_errno_error("Don't have permissions", EACCES); } } - const char* params_ptr = req->params->c_str(); - json_object* jinfo = json_tokener_parse(params_ptr); + const auto params_ptr = req.GetBody(); + const char* data = params_ptr.data(); + json_object* jinfo = json_tokener_parse(data); if (!jinfo) { return common::make_errno_error("Can't parse request", EINVAL); } @@ -119,6 +135,65 @@ common::ErrnoError ParseClientRequest(ProtocoledDaemonClient* dclient, return common::ErrnoError(); } +common::ErrnoError ParseGetClientRequest(ProtocoledDaemonClient* dclient, + const common::http::HttpRequest& req, + bool access = true) { + if (!dclient) { + return common::make_errno_error_inval(); + } + + if (access) { + common::http::header_t found_key_in_headers; + if (!req.FindHeaderByKey(API_KEY_PARAM, false, &found_key_in_headers)) { + return common::make_errno_error("Don't have permissions", EACCES); + } + + const auto expire_key = common::license::make_license(found_key_in_headers.value); + if (!expire_key) { + return common::make_errno_error("Invalid access key", EACCES); + } + + common::time64_t tm; + bool is_valid = common::license::GetExpireTimeFromKey(PROJECT_NAME_LOWERCASE, *expire_key, &tm); + if (!is_valid) { + common::Error err = common::make_error("Invalid expire key"); + return common::make_errno_error(err->GetDescription(), EINVAL); + } + + dclient->SetVerified(true, tm); + + if (!dclient->HaveFullAccess()) { + return common::make_errno_error("Don't have permissions", EACCES); + } + } + + return common::ErrnoError(); +} + +template +common::ErrnoError ParseStreamRequest(ProcessSlaveWrapper::stream_client_t* pclient, + const fastotv::protocol::request_t* req, + common::serializer::JsonSerializer* result) { + if (!req->params || !pclient || !req || !result) { + return common::make_errno_error_inval(); + } + + const char* params_ptr = req->params->c_str(); + json_object* jinfo = json_tokener_parse(params_ptr); + if (!jinfo) { + return common::make_errno_error_inval(); + } + + common::Error err_des = result->DeSerialize(jinfo); + json_object_put(jinfo); + if (err_des) { + const std::string err_str = err_des->GetDescription(); + return common::make_errno_error(err_str, EAGAIN); + } + + return common::ErrnoError(); +} + typedef HttpHandler VodsHandler; typedef HttpServer VodsServer; typedef VodsHandler CodsHandler; @@ -251,19 +326,16 @@ ProcessSlaveWrapper::ProcessSlaveWrapper(const Config& config) cods_server_->SetName("cods_server"); } -common::ErrnoError ProcessSlaveWrapper::SendRestartDaemonRequest(const Config& config) { - if (!config.IsValid()) { - return common::make_errno_error_inval(); - } - +common::ErrnoError ProcessSlaveWrapper::SendStopDaemonRequest(const common::net::HostAndPort& host) { common::net::socket_info client_info; - common::ErrnoError err = common::net::connect(config.host, common::net::ST_SOCK_STREAM, nullptr, &client_info); + common::ErrnoError err = common::net::connect(host, common::net::ST_SOCK_STREAM, nullptr, &client_info); if (err) { return err; } std::unique_ptr connection(new ProtocoledDaemonClient(nullptr, client_info)); - err = connection->RestartMe(); + auto url = common::uri::GURL("http://" + common::ConvertToString(host) + "/" + DAEMON_STOP_SERVICE); + err = connection->StopMe(url); if (err) { ignore_result(connection->Close()); return err; @@ -273,19 +345,16 @@ common::ErrnoError ProcessSlaveWrapper::SendRestartDaemonRequest(const Config& c return common::ErrnoError(); } -common::ErrnoError ProcessSlaveWrapper::SendStopDaemonRequest(const Config& config) { - if (!config.IsValid()) { - return common::make_errno_error_inval(); - } - +common::ErrnoError ProcessSlaveWrapper::SendRestartDaemonRequest(const common::net::HostAndPort& host) { common::net::socket_info client_info; - common::ErrnoError err = common::net::connect(config.host, common::net::ST_SOCK_STREAM, nullptr, &client_info); + common::ErrnoError err = common::net::connect(host, common::net::ST_SOCK_STREAM, nullptr, &client_info); if (err) { return err; } std::unique_ptr connection(new ProtocoledDaemonClient(nullptr, client_info)); - err = connection->StopMe(); + auto url = common::uri::GURL("http://" + common::ConvertToString(host) + "/" + DAEMON_RESTART_SERVICE); + err = connection->RestartMe(url); if (err) { ignore_result(connection->Close()); return err; @@ -357,10 +426,6 @@ common::ErrnoError ProcessSlaveWrapper::Prepare() { return common::ErrnoError(); } -bool ProcessSlaveWrapper::IsStoped() const { - return stoped_; -} - int ProcessSlaveWrapper::Exec(int argc, char** argv) { process_argc_ = argc; process_argv_ = argv; @@ -373,7 +438,7 @@ int ProcessSlaveWrapper::Exec(int argc, char** argv) { // gpu statistic monitor HttpServer* http_server = static_cast(http_server_); - std::thread http_thread = std::thread([http_server] { + auto http_thread = std::thread([http_server] { common::ErrnoError err = http_server->Bind(true); if (err) { ERROR_LOG() << "HttpServer bind error: " << err->GetDescription(); @@ -386,13 +451,13 @@ int ProcessSlaveWrapper::Exec(int argc, char** argv) { return; } - INFO_LOG() << "HttpServer have started"; + INFO_LOG() << "HttpServer have started address: " << common::ConvertToString(http_server->GetHost()); int res = http_server->Exec(); INFO_LOG() << "HttpServer have finished: " << res; }); HttpServer* vods_server = static_cast(vods_server_); - std::thread vods_thread = std::thread([vods_server] { + auto vods_thread = std::thread([vods_server] { common::ErrnoError err = vods_server->Bind(true); if (err) { ERROR_LOG() << "VodsServer bind error: " << err->GetDescription(); @@ -405,13 +470,13 @@ int ProcessSlaveWrapper::Exec(int argc, char** argv) { return; } - INFO_LOG() << "VodsServer have started"; + INFO_LOG() << "VodsServer have started address: " << common::ConvertToString(vods_server->GetHost()); int res = vods_server->Exec(); INFO_LOG() << "VodsServer have finished: " << res; }); HttpServer* cods_server = static_cast(cods_server_); - std::thread cods_thread = std::thread([cods_server] { + auto cods_thread = std::thread([cods_server] { common::ErrnoError err = cods_server->Bind(true); if (err) { ERROR_LOG() << "CodsServer bind error: " << err->GetDescription(); @@ -424,7 +489,7 @@ int ProcessSlaveWrapper::Exec(int argc, char** argv) { return; } - INFO_LOG() << "CodsServer have started"; + INFO_LOG() << "CodsServer have started address: " << common::ConvertToString(cods_server->GetHost()); int res = cods_server->Exec(); INFO_LOG() << "CodsServer have finished: " << res; }); @@ -454,6 +519,7 @@ int ProcessSlaveWrapper::Exec(int argc, char** argv) { node_stats_->prev_nshot = service::GetMachineNetShot(); node_stats_->timestamp = common::time::current_utc_mstime(); + INFO_LOG() << "DaemonServer have started address: " << common::ConvertToString(server->GetHost()); res = server->Exec(); finished: @@ -470,6 +536,10 @@ finished: return res; } +bool ProcessSlaveWrapper::IsStoped() const { + return stoped_; +} + void ProcessSlaveWrapper::PreLooped(common::libev::IoLoop* server) { ping_client_timer_ = server->CreateTimer(ping_timeout_clients_seconds, true); check_cods_vods_timer_ = server->CreateTimer(config_.cods_ttl / 2, true); @@ -488,7 +558,13 @@ void ProcessSlaveWrapper::Moved(common::libev::IoLoop* server, common::libev::Io } void ProcessSlaveWrapper::Closed(common::libev::IoClient* client) { - UNUSED(client); + ProtocoledDaemonClient* dclient = dynamic_cast(client); + if (dclient && dclient->IsVerified()) { + auto step = dclient->Step(); + if (step != common::libev::websocket::ZERO) { + ignore_result(dclient->SendEOS()); + } + } } void ProcessSlaveWrapper::TimerEmited(common::libev::IoLoop* server, common::libev::timer_id_t id) { @@ -508,24 +584,14 @@ void ProcessSlaveWrapper::TimerEmited(common::libev::IoLoop* server, common::lib } continue; } - - common::ErrnoError err = dclient->Ping(); - if (err) { - DEBUG_MSG_ERROR(err, common::logging::LOG_LEVEL_ERR); - ignore_result(dclient->Close()); - delete dclient; - } else { - INFO_LOG() << "Sent ping to client[" << client->GetFormatedName() << "], from server[" - << server->GetFormatedName() << "], " << online_clients.size() << " client(s) connected."; - } } } } else if (check_cods_vods_timer_ == id) { fastotv::timestamp_t current_time = common::time::current_utc_mstime(); auto childs = server->GetChilds(); for (auto* child : childs) { - auto channel = static_cast(child); - if (channel->IsCOD()) { + auto channel = dynamic_cast(child); + if (channel && channel->IsCOD()) { fastotv::timestamp_t cod_last_update = channel->GetLastUpdate(); fastotv::timestamp_t ts_diff = current_time - cod_last_update; if (ts_diff > config_.cods_ttl * 1000) { @@ -540,8 +606,8 @@ void ProcessSlaveWrapper::TimerEmited(common::libev::IoLoop* server, common::lib RemoveOldFilesByTime(folder, max_life_time, "*" CHUNK_EXT, true); } } else if (node_stats_timer_ == id) { - const std::string node_stats = MakeServiceStats(common::Optional()); - fastotv::protocol::request_t req; + const auto node_stats = MakeServiceInfoStats(); + common::json::WsDataJson req; common::Error err_ser = StatisitcServiceBroadcast(node_stats, &req); if (err_ser) { return; @@ -577,7 +643,7 @@ void ProcessSlaveWrapper::ChildStatusChanged(common::libev::IoChild* child, int delete channel; stream::QuitStatusInfo ch_status_info(sid, status, signal); - fastotv::protocol::request_t req; + common::json::WsDataJson req; common::Error err_ser = QuitStatusStreamBroadcast(ch_status_info, &req); if (err_ser) { return; @@ -615,12 +681,12 @@ ChildStream* ProcessSlaveWrapper::FindChildByID(fastotv::stream_id_t cid) const return nullptr; } -void ProcessSlaveWrapper::BroadcastClients(const fastotv::protocol::request_t& req) { +void ProcessSlaveWrapper::BroadcastClients(const common::json::WsDataJson& req) { std::vector clients = loop_->GetClients(); for (size_t i = 0; i < clients.size(); ++i) { ProtocoledDaemonClient* dclient = dynamic_cast(clients[i]); if (dclient && dclient->IsVerified()) { - common::ErrnoError err = dclient->WriteRequest(req); + common::ErrnoError err = dclient->Broadcast(req); if (err) { WARNING_LOG() << "BroadcastClients error: " << err->GetDescription(); } @@ -628,47 +694,115 @@ void ProcessSlaveWrapper::BroadcastClients(const fastotv::protocol::request_t& r } } -common::ErrnoError ProcessSlaveWrapper::DaemonDataReceived(ProtocoledDaemonClient* dclient) { - CHECK(loop_->IsLoopThread()); - std::string input_command; - common::ErrnoError err = dclient->ReadCommand(&input_command); - if (err) { - return err; // i don't want handle spam, comand must be foramated according protocol - } +common::ErrnoError ProcessSlaveWrapper::ProcessReceived(ProtocoledDaemonClient* hclient, + const char* request, + size_t req_len) { + static const common::libev::http::HttpServerInfo hinf(PROJECT_NAME_TITLE "/" PROJECT_VERSION, PROJECT_DOMAIN); + common::http::HttpRequest hrequest; + const auto result = common::http::parse_http_request(request, req_len, &hrequest); + DEBUG_LOG() << "Http request:\n" << request; - fastotv::protocol::request_t* req = nullptr; - fastotv::protocol::response_t* resp = nullptr; - common::Error err_parse = common::protocols::json_rpc::ParseJsonRPC(input_command, &req, &resp); - if (err_parse) { - const std::string err_str = err_parse->GetDescription(); + common::http::headers_t extra_headers = {{"Access-Control-Allow-Origin", "*"}}; + if (result.second) { + const std::string err_str = result.second->GetDescription(); return common::make_errno_error(err_str, EAGAIN); } - if (req) { - if (req->IsNotification()) { - delete req; - return common::make_errno_error("Don't handle notificastions for now.", EINVAL); + auto url = hrequest.GetURL(); + auto route = url.path(); + auto method = hrequest.GetMethod(); + if (method == common::http::HM_POST) { + if (route == "/" DAEMON_STOP_SERVICE) { // + + return HandleRequestClientStopService(hclient, hrequest); + } else if (route == "/" DAEMON_RESTART_SERVICE) { // + + return HandleRequestClientRestartService(hclient, hrequest); + } else if (route == "/" DAEMON_START_STREAM) { // + + return HandleRequestClientStartStream(hclient, hrequest); + } else if (route == "/" DAEMON_RESTART_STREAM) { // + + return HandleRequestClientRestartStream(hclient, hrequest); + } else if (route == "/" DAEMON_STOP_STREAM) { // + + return HandleRequestClientStopStream(hclient, hrequest); + } else if (route == "/" DAEMON_GET_PIPELINE_STREAM) { // + + return HandleRequestClientGetPipelineStream(hclient, hrequest); + } else if (route == "/" DAEMON_GET_LOG_STREAM) { // + + return HandleRequestClientGetLogStream(hclient, hrequest); + } else if (route == "/" DAEMON_GET_CONFIG_JSON_STREAM) { // + + return HandleRequestClientGetConfigJsonStream(hclient, hrequest); } - DEBUG_LOG() << "Received daemon request: " << input_command; - err = HandleRequestServiceCommand(dclient, req); - if (err) { - DEBUG_MSG_ERROR(err, common::logging::LOG_LEVEL_ERR); + } else if (method == common::http::http_method::HM_GET || method == common::http::http_method::HM_HEAD) { + if (method == common::http::http_method::HM_GET) { + if (route == "/" DAEMON_STATS_SERVICE) { // + + return HandleRequestClientGetStats(hclient, hrequest); + } else if (route == "/" DAEMON_GET_LOG_SERVICE) { // + + return HandleRequestClientGetLogService(hclient, hrequest); + } else if (route == "/" WS_UPDATES) { + common::http::header_t head; + if (!hrequest.FindHeaderByKey("Sec-WebSocket-Key", false, &head)) { + return common::make_errno_error("not found Sec-WebSocket-Key header", EAGAIN); + } + + common::http::header_t found_key_in_headers; + if (!hrequest.FindHeaderByKey(API_KEY_PARAM, false, &found_key_in_headers)) { + auto query_str = url.query(); + common::uri::Component key, value; + common::uri::Component query(0, query_str.length()); + common::Optional found_key_in_params; + while (common::uri::ExtractQueryKeyValue(query_str.c_str(), &query, &key, &value)) { + std::string key_string(query_str.substr(key.begin, key.len)); + std::string param_text(query_str.substr(value.begin, value.len)); + if (common::EqualsASCII(key_string, API_KEY_PARAM, false)) { + found_key_in_params = param_text; + break; + } + } + + if (!found_key_in_params) { + return common::make_errno_error("Don't have permissions", EACCES); + } + found_key_in_headers.value = *found_key_in_params; + } + + const auto expire_key = + common::license::make_license(found_key_in_headers.value); + if (!expire_key) { + common::Error err = common::make_error("Invalid expire key"); + return common::make_errno_error(err->GetDescription(), EINVAL); + } + + common::time64_t tm; + bool is_valid = common::license::GetExpireTimeFromKey(PROJECT_NAME_LOWERCASE, *expire_key, &tm); + if (!is_valid) { + common::Error err = common::make_error("Failed to get expire key time"); + return common::make_errno_error(err->GetDescription(), EINVAL); + } + + hclient->SetVerified(true, tm); + if (!hclient->HaveFullAccess()) { + return common::make_errno_error("Don't have permissions", EACCES); + } + + common::http::headers_t extra_headers = {{"Access-Control-Allow-Origin", "*"}}; + return hclient->SendSwitchProtocolsResponse(head.value, extra_headers, hclient->GetServerInfo()); + } } - delete req; - return common::ErrnoError(); - } else if (resp) { - DEBUG_LOG() << "Received daemon responce: " << input_command; - err = HandleResponceServiceCommand(dclient, resp); - if (err) { - DEBUG_MSG_ERROR(err, common::logging::LOG_LEVEL_ERR); + } + return hclient->UnknownMethodError(method, route); +} + +common::ErrnoError ProcessSlaveWrapper::DaemonDataReceived(ProtocoledDaemonClient* dclient) { + CHECK(loop_->IsLoopThread()); + char buff[BUF_SIZE] = {0}; + size_t nread = 0; + common::ErrnoError errn = dclient->SingleRead(buff, BUF_SIZE - 1, &nread); + if (errn || nread == 0) { + if (nread == 0) { + return kClosedDaemon; } - delete resp; - return common::ErrnoError(); + return errn; } - WARNING_LOG() << "Received unknown daemon message: " << input_command; - return common::make_errno_error("Invalid command type.", EINVAL); + return ProcessReceived(dclient, buff, nread); } common::ErrnoError ProcessSlaveWrapper::StreamDataReceived(stream_client_t* pipe_client) { @@ -711,11 +845,17 @@ common::ErrnoError ProcessSlaveWrapper::StreamDataReceived(stream_client_t* pipe void ProcessSlaveWrapper::DataReceived(common::libev::IoClient* client) { if (ProtocoledDaemonClient* dclient = dynamic_cast(client)) { - common::ErrnoError err = DaemonDataReceived(dclient); + auto err = DaemonDataReceived(dclient); if (err) { - DEBUG_MSG_ERROR(err, common::logging::LOG_LEVEL_ERR); + ERROR_LOG() << "DataReceived daemon error: " << err->GetDescription(); ignore_result(dclient->Close()); delete dclient; + } else { + auto step = dclient->Step(); + if (step == common::libev::websocket::ZERO) { + ignore_result(dclient->Close()); + delete dclient; + } } } else if (stream_client_t* pipe_client = dynamic_cast(client)) { common::ErrnoError err = StreamDataReceived(pipe_client); @@ -776,108 +916,62 @@ void ProcessSlaveWrapper::PostLooped(common::libev::IoLoop* server) { } common::ErrnoError ProcessSlaveWrapper::HandleRequestClientRestartService(ProtocoledDaemonClient* dclient, - const fastotv::protocol::request_t* req) { + const common::http::HttpRequest& req) { CHECK(loop_->IsLoopThread()); - if (!dclient->IsVerified()) { - const auto info = dclient->GetInfo(); - if (!common::net::IsLocalHost(info.host())) { - common::Error err(common::MemSPrintf("Skipped stop request from: %s", info.host())); - ignore_result(dclient->StopFail(req->id, err)); - return common::make_errno_error(err->GetDescription(), EINVAL); - } + const auto info = dclient->GetInfo(); + if (!common::net::IsLocalHost(info.host())) { + common::Error err(common::MemSPrintf("Skipped restart request from: %s", info.host())); + ignore_result(dclient->RestartFail(common::http::HS_FORBIDDEN, err)); + return common::make_errno_error(err->GetDescription(), EINVAL); + }; + + common::daemon::commands::RestartInfo restart_info; + common::ErrnoError errn = ParsePostClientRequest(dclient, req, &restart_info, false); + if (errn) { + ignore_result(dclient->RestartFail(common::http::HS_BAD_REQUEST, common::make_error_from_errno(errn))); + return errn; } - if (req->params) { - const char* params_ptr = req->params->c_str(); - json_object* jstop = json_tokener_parse(params_ptr); - if (!jstop) { - return common::make_errno_error_inval(); - } - - common::daemon::commands::RestartInfo stop_info; - common::Error err_des = stop_info.DeSerialize(jstop); - json_object_put(jstop); - if (err_des) { - const std::string err_str = err_des->GetDescription(); - return common::make_errno_error(err_str, EAGAIN); - } - - if (quit_cleanup_timer_ != INVALID_TIMER_ID) { - // in progress - return dclient->StopFail(req->id, common::make_error("Restart service in progress...")); - } - - StopImpl(); - return dclient->StopSuccess(req->id); + if (quit_cleanup_timer_ != INVALID_TIMER_ID) { + // in progress + common::Error err = common::make_error("Restart service in progress..."); + ignore_result(dclient->RestartFail(common::http::HS_INTERNAL_ERROR, err)); + return common::make_errno_error(err->GetDescription(), EAGAIN); } - return common::make_errno_error_inval(); + StopImpl(); + return dclient->RestartSuccess(); } common::ErrnoError ProcessSlaveWrapper::HandleRequestClientStopService(ProtocoledDaemonClient* dclient, - const fastotv::protocol::request_t* req) { + const common::http::HttpRequest& req) { CHECK(loop_->IsLoopThread()); - if (!dclient->IsVerified()) { - const auto info = dclient->GetInfo(); - if (!common::net::IsLocalHost(info.host())) { - common::Error err(common::MemSPrintf("Skipped stop request from: %s", info.host())); - ignore_result(dclient->StopFail(req->id, err)); - return common::make_errno_error(err->GetDescription(), EINVAL); - } + const auto info = dclient->GetInfo(); + if (!common::net::IsLocalHost(info.host())) { + common::Error err(common::MemSPrintf("Skipped restart request from: %s", info.host())); + ignore_result(dclient->StopFail(common::http::HS_FORBIDDEN, err)); + return common::make_errno_error(err->GetDescription(), EINVAL); } - if (req->params) { - const char* params_ptr = req->params->c_str(); - json_object* jstop = json_tokener_parse(params_ptr); - if (!jstop) { - return common::make_errno_error_inval(); - } - - common::daemon::commands::StopInfo stop_info; - common::Error err_des = stop_info.DeSerialize(jstop); - json_object_put(jstop); - if (err_des) { - const std::string err_str = err_des->GetDescription(); - return common::make_errno_error(err_str, EAGAIN); - } - - if (quit_cleanup_timer_ != INVALID_TIMER_ID) { - // in progress - return dclient->StopFail(req->id, common::make_error("Stop service in progress...")); - } - - StopImpl(); - stoped_ = true; - return dclient->StopSuccess(req->id); + common::daemon::commands::StopInfo stop_info; + common::ErrnoError errn = ParsePostClientRequest(dclient, req, &stop_info, false); + if (errn) { + ignore_result(dclient->StopFail(common::http::HS_BAD_REQUEST, common::make_error_from_errno(errn))); + return errn; } - return common::make_errno_error_inval(); -} - -common::ErrnoError ProcessSlaveWrapper::HandleResponcePingService(ProtocoledDaemonClient* dclient, - const fastotv::protocol::response_t* resp) { - UNUSED(dclient); - CHECK(loop_->IsLoopThread()); - - if (resp->IsMessage()) { - const char* params_ptr = resp->message->result.c_str(); - json_object* jclient_ping = json_tokener_parse(params_ptr); - if (!jclient_ping) { - return common::make_errno_error_inval(); - } - - common::daemon::commands::ClientPingInfo client_ping_info; - common::Error err_des = client_ping_info.DeSerialize(jclient_ping); - json_object_put(jclient_ping); - if (err_des) { - const std::string err_str = err_des->GetDescription(); - return common::make_errno_error(err_str, EAGAIN); - } - return common::ErrnoError(); + if (quit_cleanup_timer_ != INVALID_TIMER_ID) { + // in progress + common::Error err = common::make_error("Stop service in progress..."); + ignore_result(dclient->StopFail(common::http::HS_INTERNAL_ERROR, err)); + return common::make_errno_error(err->GetDescription(), EAGAIN); } - return common::ErrnoError(); + + StopImpl(); + stoped_ = true; + return dclient->StopSuccess(); } common::ErrnoError ProcessSlaveWrapper::CreateChildStream(const serialized_stream_t& config_args) { @@ -925,68 +1019,44 @@ common::ErrnoError ProcessSlaveWrapper::StopChildStreamImpl(fastotv::stream_id_t common::ErrnoError ProcessSlaveWrapper::HandleRequestChangedSourcesStream(stream_client_t* pclient, const fastotv::protocol::request_t* req) { - UNUSED(pclient); CHECK(loop_->IsLoopThread()); - if (req->params) { - const char* params_ptr = req->params->c_str(); - json_object* jrequest_changed_sources = json_tokener_parse(params_ptr); - if (!jrequest_changed_sources) { - return common::make_errno_error_inval(); - } - ChangedSouresInfo ch_sources_info; - common::Error err_des = ch_sources_info.DeSerialize(jrequest_changed_sources); - json_object_put(jrequest_changed_sources); - if (err_des) { - const std::string err_str = err_des->GetDescription(); - return common::make_errno_error(err_str, EAGAIN); - } - - fastotv::protocol::request_t req; - common::Error err_ser = ChangedSourcesStreamBroadcast(ch_sources_info, &req); - if (err_ser) { - const std::string err_str = err_ser->GetDescription(); - return common::make_errno_error(err_str, EAGAIN); - } - - BroadcastClients(req); - return common::ErrnoError(); + ChangedSouresInfo ch_sources_info; + common::ErrnoError errn = ParseStreamRequest(pclient, req, &ch_sources_info); + if (errn) { + return errn; } - return common::make_errno_error_inval(); + common::json::WsDataJson breq; + common::Error err_ser = ChangedSourcesStreamBroadcast(ch_sources_info, &breq); + if (err_ser) { + const std::string err_str = err_ser->GetDescription(); + return common::make_errno_error(err_str, EAGAIN); + } + + BroadcastClients(breq); + return common::ErrnoError(); } common::ErrnoError ProcessSlaveWrapper::HandleRequestStatisticStream(stream_client_t* pclient, const fastotv::protocol::request_t* req) { - UNUSED(pclient); CHECK(loop_->IsLoopThread()); - if (req->params) { - const char* params_ptr = req->params->c_str(); - json_object* jrequest_stat = json_tokener_parse(params_ptr); - if (!jrequest_stat) { - return common::make_errno_error_inval(); - } - StatisticInfo stat; - common::Error err_des = stat.DeSerialize(jrequest_stat); - json_object_put(jrequest_stat); - if (err_des) { - const std::string err_str = err_des->GetDescription(); - return common::make_errno_error(err_str, EAGAIN); - } - - fastotv::protocol::request_t req; - common::Error err_ser = StatisitcStreamBroadcast(stat, &req); - if (err_ser) { - const std::string err_str = err_ser->GetDescription(); - return common::make_errno_error(err_str, EAGAIN); - } - - BroadcastClients(req); - return common::ErrnoError(); + StatisticInfo stat; + common::ErrnoError errn = ParseStreamRequest(pclient, req, &stat); + if (errn) { + return errn; } - return common::make_errno_error_inval(); + common::json::WsDataJson breq; + common::Error err_ser = StatisitcStreamBroadcast(stat, &breq); + if (err_ser) { + const std::string err_str = err_ser->GetDescription(); + return common::make_errno_error(err_str, EAGAIN); + } + + BroadcastClients(breq); + return common::ErrnoError(); } #if defined(MACHINE_LEARNING) @@ -1024,369 +1094,170 @@ common::ErrnoError ProcessSlaveWrapper::HandleRequestMlNotificationStream(stream } #endif -common::ErrnoError ProcessSlaveWrapper::HandleRequestClientStartStream(ProtocoledDaemonClient* dclient, - const fastotv::protocol::request_t* req) { +common::ErrnoError ProcessSlaveWrapper::RestartChildStreamImpl(fastotv::stream_id_t sid) { CHECK(loop_->IsLoopThread()); - if (!dclient->HaveFullAccess()) { - return common::make_errno_error("Don't have permissions", EINTR); + + auto stream = FindChildByID(sid); + if (!stream) { + return common::make_errno_error(common::MemSPrintf("Stream with id: %s does not exist, skip request.", sid), + EINVAL); } - if (req->params) { - const char* params_ptr = req->params->c_str(); - json_object* jstart_info = json_tokener_parse(params_ptr); - if (!jstart_info) { - return common::make_errno_error_inval(); - } - - stream::StartInfo start_info; - common::Error err_des = start_info.DeSerialize(jstart_info); - json_object_put(jstart_info); - if (err_des) { - const std::string err_str = err_des->GetDescription(); - return common::make_errno_error(err_str, EAGAIN); - } - - common::ErrnoError errn = CreateChildStream(start_info.GetConfig()); - if (errn) { - DEBUG_MSG_ERROR(errn, common::logging::LOG_LEVEL_WARNING); - ignore_result(dclient->StartStreamFail(req->id, common::make_error_from_errno(errn))); - return errn; - } - - return dclient->StartStreamSuccess(req->id); - } - - return common::make_errno_error_inval(); -} - -common::ErrnoError ProcessSlaveWrapper::HandleRequestClientStopStream(ProtocoledDaemonClient* dclient, - const fastotv::protocol::request_t* req) { - CHECK(loop_->IsLoopThread()); - if (!dclient->HaveFullAccess()) { - return common::make_errno_error("Don't have permissions", EINTR); - } - - if (req->params) { - const char* params_ptr = req->params->c_str(); - json_object* jstop_info = json_tokener_parse(params_ptr); - if (!jstop_info) { - return common::make_errno_error_inval(); - } - - stream::StopInfo stop_info; - common::Error err_des = stop_info.DeSerialize(jstop_info); - json_object_put(jstop_info); - if (err_des) { - const std::string err_str = err_des->GetDescription(); - return common::make_errno_error(err_str, EAGAIN); - } - - common::ErrnoError errn = StopChildStreamImpl(stop_info.GetStreamID(), stop_info.GetForce()); - if (errn) { - return dclient->StopFail(req->id, common::make_error_from_errno(errn)); - } - - return dclient->StopStreamSuccess(req->id); - } - - return common::make_errno_error_inval(); + return stream->Restart(); } common::ErrnoError ProcessSlaveWrapper::HandleRequestClientRestartStream(ProtocoledDaemonClient* dclient, - const fastotv::protocol::request_t* req) { - CHECK(loop_->IsLoopThread()); - if (!dclient->HaveFullAccess()) { - return common::make_errno_error("Don't have permissions", EINTR); - } - - if (req->params) { - const char* params_ptr = req->params->c_str(); - json_object* jrestart_info = json_tokener_parse(params_ptr); - if (!jrestart_info) { - return common::make_errno_error_inval(); - } - - stream::RestartInfo restart_info; - common::Error err_des = restart_info.DeSerialize(jrestart_info); - json_object_put(jrestart_info); - if (err_des) { - const std::string err_str = err_des->GetDescription(); - return common::make_errno_error(err_str, EAGAIN); - } - - Child* chan = FindChildByID(restart_info.GetStreamID()); - if (!chan) { - return dclient->ReStartStreamFail(req->id, common::make_error("Stream not found")); - } - - ignore_result(chan->Restart()); - return dclient->ReStartStreamSuccess(req->id); - } - - return common::make_errno_error_inval(); -} - -common::ErrnoError ProcessSlaveWrapper::HandleRequestClientGetLogStream(ProtocoledDaemonClient* dclient, - const fastotv::protocol::request_t* req) { + const common::http::HttpRequest& req) { CHECK(loop_->IsLoopThread()); - stream::GetLogInfo log_info; - common::ErrnoError errn = ParseClientRequest(dclient, req, &log_info); + stream::RestartInfo restart_info; + common::ErrnoError errn = ParsePostClientRequest(dclient, req, &restart_info); if (errn) { - ignore_result(dclient->GetLogStreamFail(req->id, common::make_error_from_errno(errn))); + ignore_result(dclient->ReStartStreamFail(common::http::HS_BAD_REQUEST, common::make_error_from_errno(errn))); return errn; } - const auto remote_log_path = log_info.GetLogPath(); - if (!remote_log_path.SchemeIsHTTPOrHTTPS()) { - common::ErrnoError errn = common::make_errno_error("Not supported protocol", EAGAIN); - ignore_result(dclient->GetLogStreamFail(req->id, common::make_error_from_errno(errn))); + errn = RestartChildStreamImpl(restart_info.GetStreamID()); + if (errn) { + ignore_result(dclient->ReStartStreamFail(common::http::HS_INTERNAL_ERROR, common::make_error_from_errno(errn))); + return errn; + } + + return dclient->ReStartStreamSuccess(); +} + +common::ErrnoError ProcessSlaveWrapper::HandleRequestClientGetLogService(ProtocoledDaemonClient* dclient, + const common::http::HttpRequest& req) { + CHECK(loop_->IsLoopThread()); + + common::ErrnoError errn = ParseGetClientRequest(dclient, req); + if (errn) { + ignore_result(dclient->GetLogServiceFail(common::http::HS_BAD_REQUEST, common::make_error_from_errno(errn))); + return errn; + } + + return dclient->GetLogServiceSuccess(config_.log_path); +} + +common::ErrnoError ProcessSlaveWrapper::HandleRequestClientGetLogStream(ProtocoledDaemonClient* dclient, + const common::http::HttpRequest& req) { + CHECK(loop_->IsLoopThread()); + + stream::GetLogInfo log_info; + common::ErrnoError errn = ParsePostClientRequest(dclient, req, &log_info); + if (errn) { + ignore_result(dclient->GetLogStreamFail(common::http::HS_BAD_REQUEST, common::make_error_from_errno(errn))); return errn; } const auto stream_log_file = MakeStreamLogPath(log_info.GetFeedbackDir()); if (!stream_log_file) { common::ErrnoError errn = common::make_errno_error("Can't generate log stream path", EAGAIN); - ignore_result(dclient->GetLogStreamFail(req->id, common::make_error_from_errno(errn))); + ignore_result(dclient->GetLogStreamFail(common::http::HS_INTERNAL_ERROR, common::make_error_from_errno(errn))); return errn; } - common::Error err = PostHttpOrHttpsFile(*stream_log_file, remote_log_path); - if (err) { - ignore_result(dclient->GetLogStreamFail(req->id, err)); - const std::string err_str = err->GetDescription(); - return common::make_errno_error(err_str, EAGAIN); - } - - ignore_result(dclient->GetLogStreamSuccess(req->id)); - return common::ErrnoError(); + return dclient->GetLogStreamSuccess(stream_log_file->GetPath()); } common::ErrnoError ProcessSlaveWrapper::HandleRequestClientGetPipelineStream(ProtocoledDaemonClient* dclient, - const fastotv::protocol::request_t* req) { + const common::http::HttpRequest& req) { CHECK(loop_->IsLoopThread()); stream::GetPipelineInfo pipeline_info; - common::ErrnoError errn = ParseClientRequest(dclient, req, &pipeline_info); + common::ErrnoError errn = ParsePostClientRequest(dclient, req, &pipeline_info); if (errn) { - ignore_result(dclient->GetPipeStreamFail(req->id, common::make_error_from_errno(errn))); - return errn; - } - - const auto remote_log_path = pipeline_info.GetLogPath(); - if (!remote_log_path.SchemeIsHTTPOrHTTPS()) { - common::ErrnoError errn = common::make_errno_error("Not supported protocol", EAGAIN); - ignore_result(dclient->GetPipeStreamFail(req->id, common::make_error_from_errno(errn))); + ignore_result(dclient->GetPipeStreamFail(common::http::HS_BAD_REQUEST, common::make_error_from_errno(errn))); return errn; } const auto pipe_file = MakeStreamPipelinePath(pipeline_info.GetFeedbackDir()); if (!pipe_file) { common::ErrnoError errn = common::make_errno_error("Can't generate pipeline stream path", EAGAIN); - ignore_result(dclient->GetPipeStreamFail(req->id, common::make_error_from_errno(errn))); + ignore_result(dclient->GetPipeStreamFail(common::http::HS_INTERNAL_ERROR, common::make_error_from_errno(errn))); return errn; } - common::Error err = PostHttpOrHttpsFile(*pipe_file, remote_log_path); - if (err) { - ignore_result(dclient->GetPipeStreamFail(req->id, err)); - const std::string err_str = err->GetDescription(); - return common::make_errno_error(err_str, EAGAIN); - } - - ignore_result(dclient->GetPipeStreamSuccess(req->id)); - return common::ErrnoError(); + return dclient->GetPipeStreamSuccess(pipe_file->GetPath()); } -common::ErrnoError ProcessSlaveWrapper::HandleRequestClientGetConfigJsonStream( - ProtocoledDaemonClient* dclient, - const fastotv::protocol::request_t* req) { +common::ErrnoError ProcessSlaveWrapper::HandleRequestClientGetConfigJsonStream(ProtocoledDaemonClient* dclient, + const common::http::HttpRequest& req) { CHECK(loop_->IsLoopThread()); stream::GetConfigJsonInfo config_json; - common::ErrnoError errn = ParseClientRequest(dclient, req, &config_json); + common::ErrnoError errn = ParsePostClientRequest(dclient, req, &config_json); if (errn) { - ignore_result(dclient->GetPipeStreamFail(req->id, common::make_error_from_errno(errn))); - return errn; - } - - const auto remote_log_path = config_json.GetLogPath(); - if (!remote_log_path.SchemeIsHTTPOrHTTPS()) { - common::ErrnoError errn = common::make_errno_error("Not supported protocol", EAGAIN); - ignore_result(dclient->GetPipeStreamFail(req->id, common::make_error_from_errno(errn))); + ignore_result(dclient->GetConfigStreamFail(common::http::HS_BAD_REQUEST, common::make_error_from_errno(errn))); return errn; } const auto pipe_file = MakeStreamConfigJsonPath(config_json.GetFeedbackDir()); if (!pipe_file) { common::ErrnoError errn = common::make_errno_error("Can't generate config.json stream path", EAGAIN); - ignore_result(dclient->GetPipeStreamFail(req->id, common::make_error_from_errno(errn))); + ignore_result(dclient->GetConfigStreamFail(common::http::HS_INTERNAL_ERROR, common::make_error_from_errno(errn))); return errn; } - common::Error err = PostHttpOrHttpsFile(*pipe_file, remote_log_path); - if (err) { - ignore_result(dclient->GetPipeStreamFail(req->id, err)); - const std::string err_str = err->GetDescription(); - return common::make_errno_error(err_str, EAGAIN); - } - - ignore_result(dclient->GetPipeStreamSuccess(req->id)); - return common::ErrnoError(); + return dclient->GetConfigStreamSuccess(pipe_file->GetPath()); } -common::ErrnoError ProcessSlaveWrapper::HandleRequestClientActivate(ProtocoledDaemonClient* dclient, - const fastotv::protocol::request_t* req) { - CHECK(loop_->IsLoopThread()); - if (req->params) { - const char* params_ptr = req->params->c_str(); - json_object* jactivate = json_tokener_parse(params_ptr); - if (!jactivate) { - return common::make_errno_error_inval(); - } - - common::daemon::commands::ActivateInfo activate_info; - common::Error err_des = activate_info.DeSerialize(jactivate); - json_object_put(jactivate); - if (err_des) { - ignore_result(dclient->ActivateFail(req->id, err_des)); - return common::make_errno_error(err_des->GetDescription(), EAGAIN); - } - - const auto expire_key = activate_info.GetLicense(); - common::time64_t tm; - bool is_valid = common::license::GetExpireTimeFromKey(PROJECT_NAME_LOWERCASE, *expire_key, &tm); - if (!is_valid) { - common::Error err = common::make_error("Invalid expire key"); - ignore_result(dclient->ActivateFail(req->id, err)); - return common::make_errno_error(err->GetDescription(), EINVAL); - } - - const std::string node_stats = MakeServiceStats(tm); - common::ErrnoError err_ser = dclient->ActivateSuccess(req->id, node_stats); - if (err_ser) { - return err_ser; - } - - dclient->SetVerified(true, tm); - return common::ErrnoError(); - } - - return common::make_errno_error_inval(); -} - -common::ErrnoError ProcessSlaveWrapper::HandleRequestClientPingService(ProtocoledDaemonClient* dclient, - const fastotv::protocol::request_t* req) { - CHECK(loop_->IsLoopThread()); - if (!dclient->IsVerified()) { - return common::make_errno_error_inval(); - } - - if (req->params) { - const char* params_ptr = req->params->c_str(); - json_object* jstop = json_tokener_parse(params_ptr); - if (!jstop) { - return common::make_errno_error_inval(); - } - - common::daemon::commands::ClientPingInfo client_ping_info; - common::Error err_des = client_ping_info.DeSerialize(jstop); - json_object_put(jstop); - if (err_des) { - const std::string err_str = err_des->GetDescription(); - return common::make_errno_error(err_str, EAGAIN); - } - - return dclient->Pong(req->id); - } - - return common::make_errno_error_inval(); -} - -common::ErrnoError ProcessSlaveWrapper::HandleRequestClientGetLogService(ProtocoledDaemonClient* dclient, - const fastotv::protocol::request_t* req) { +common::ErrnoError ProcessSlaveWrapper::HandleRequestClientStopStream(ProtocoledDaemonClient* dclient, + const common::http::HttpRequest& req) { CHECK(loop_->IsLoopThread()); - common::daemon::commands::GetLogInfo get_log_info; - common::ErrnoError errn = ParseClientRequest(dclient, req, &get_log_info); + stream::StopInfo stop_info; + common::ErrnoError errn = ParsePostClientRequest(dclient, req, &stop_info); if (errn) { - ignore_result(dclient->PongFail(req->id, common::make_error_from_errno(errn))); + ignore_result(dclient->StopStreamFail(common::http::HS_BAD_REQUEST, common::make_error_from_errno(errn))); return errn; } - const auto remote_log_path = get_log_info.GetLogPath(); - if (!remote_log_path.SchemeIsHTTPOrHTTPS()) { - common::ErrnoError errn = common::make_errno_error("Not supported protocol", EAGAIN); - ignore_result(dclient->GetLogServiceFail(req->id, common::make_error_from_errno(errn))); + errn = StopChildStreamImpl(stop_info.GetStreamID(), stop_info.GetForce()); + if (errn) { + ignore_result(dclient->StopStreamFail(common::http::HS_INTERNAL_ERROR, common::make_error_from_errno(errn))); return errn; } - common::Error err = - PostHttpOrHttpsFile(common::file_system::ascii_file_string_path(config_.log_path), remote_log_path); - if (err) { - ignore_result(dclient->GetLogServiceFail(req->id, err)); - const std::string err_str = err->GetDescription(); - return common::make_errno_error(err_str, EAGAIN); - } - - ignore_result(dclient->GetLogServiceSuccess(req->id)); - return common::ErrnoError(); + return dclient->StopStreamSuccess(); } -common::ErrnoError ProcessSlaveWrapper::HandleRequestServiceCommand(ProtocoledDaemonClient* dclient, - const fastotv::protocol::request_t* req) { - if (req->method == DAEMON_START_STREAM) { - return HandleRequestClientStartStream(dclient, req); - } else if (req->method == DAEMON_STOP_STREAM) { - return HandleRequestClientStopStream(dclient, req); - } else if (req->method == DAEMON_RESTART_STREAM) { - return HandleRequestClientRestartStream(dclient, req); - } else if (req->method == DAEMON_GET_LOG_STREAM) { - return HandleRequestClientGetLogStream(dclient, req); - } else if (req->method == DAEMON_GET_PIPELINE_STREAM) { - return HandleRequestClientGetPipelineStream(dclient, req); - } else if (req->method == DAEMON_GET_CONFIG_JSON_STREAM) { - return HandleRequestClientGetConfigJsonStream(dclient, req); - } else if (req->method == DAEMON_RESTART_SERVICE) { - return HandleRequestClientRestartService(dclient, req); - } else if (req->method == DAEMON_STOP_SERVICE) { - return HandleRequestClientStopService(dclient, req); - } else if (req->method == DAEMON_ACTIVATE) { - return HandleRequestClientActivate(dclient, req); - } else if (req->method == DAEMON_PING_SERVICE) { - return HandleRequestClientPingService(dclient, req); - } else if (req->method == DAEMON_GET_LOG_SERVICE) { - return HandleRequestClientGetLogService(dclient, req); - } - - return dclient->UnknownMethodError(req->id, req->method); -} - -common::ErrnoError ProcessSlaveWrapper::HandleResponceServiceCommand(ProtocoledDaemonClient* dclient, - const fastotv::protocol::response_t* resp) { +common::ErrnoError ProcessSlaveWrapper::HandleRequestClientStartStream(ProtocoledDaemonClient* dclient, + const common::http::HttpRequest& req) { CHECK(loop_->IsLoopThread()); - if (!dclient->IsVerified()) { - return common::make_errno_error_inval(); + + stream::StartInfo start_info; + common::ErrnoError errn = ParsePostClientRequest(dclient, req, &start_info); + if (errn) { + ignore_result(dclient->StartStreamFail(common::http::HS_BAD_REQUEST, common::make_error_from_errno(errn))); + return errn; } - fastotv::protocol::request_t req; - const auto sid = resp->id; - if (dclient->PopRequestByID(sid, &req)) { - if (req.method == DAEMON_SERVER_PING) { - ignore_result(HandleResponcePingService(dclient, resp)); - } else { - WARNING_LOG() << "HandleResponceServiceCommand not handled responce id: " << *sid << ", command: " << req.method; - } - } else { - WARNING_LOG() << "HandleResponceServiceCommand not found responce id: " << *sid; + errn = CreateChildStream(start_info.GetConfig()); + if (errn) { + ignore_result(dclient->StartStreamFail(common::http::HS_INTERNAL_ERROR, common::make_error_from_errno(errn))); + return errn; } - return common::ErrnoError(); + return dclient->StartStreamSuccess(); +} + +common::ErrnoError ProcessSlaveWrapper::HandleRequestClientGetStats(ProtocoledDaemonClient* dclient, + const common::http::HttpRequest& req) { + CHECK(loop_->IsLoopThread()); + + common::ErrnoError errn = ParseGetClientRequest(dclient, req); + if (errn) { + ignore_result(dclient->GetStatsFail(common::http::HS_BAD_REQUEST, common::make_error_from_errno(errn))); + return errn; + } + + return dclient->GetStatsSuccess(MakeServiceStats(dclient->GetExpTime())); } common::ErrnoError ProcessSlaveWrapper::HandleRequestStreamsCommand(stream_client_t* pclient, const fastotv::protocol::request_t* req) { - if (req->method == CHANGED_SOURCES_STREAM) { + if (req->method == BROADCAST_CHANGED_SOURCES_STREAM) { return HandleRequestChangedSourcesStream(pclient, req); - } else if (req->method == STATISTIC_STREAM) { + } else if (req->method == BROADCAST_STATISTIC_STREAM) { return HandleRequestStatisticStream(pclient, req); } #if defined(MACHINE_LEARNING) @@ -1402,12 +1273,15 @@ common::ErrnoError ProcessSlaveWrapper::HandleRequestStreamsCommand(stream_clien common::ErrnoError ProcessSlaveWrapper::HandleResponceStreamsCommand(stream_client_t* pclient, const fastotv::protocol::response_t* resp) { fastotv::protocol::request_t req; - if (pclient->PopRequestByID(resp->id, &req)) { - if (req.method == STOP_STREAM) { - } else if (req.method == RESTART_STREAM) { + const auto sid = resp->id; + if (pclient->PopRequestByID(sid, &req)) { + if (req.method == REQUEST_STOP_STREAM) { + } else if (req.method == REQUEST_RESTART_STREAM) { } else { - WARNING_LOG() << "HandleResponceStreamsCommand not handled command: " << req.method; + WARNING_LOG() << "HandleResponceStreamsCommand not handled responce id: " << *sid << ", command: " << req.method; } + } else { + WARNING_LOG() << "HandleResponceStreamsCommand not found responce id: " << *sid; } return common::ErrnoError(); } @@ -1427,7 +1301,20 @@ void ProcessSlaveWrapper::CheckLicenseExpired(common::libev::IoLoop* server) { } } -std::string ProcessSlaveWrapper::MakeServiceStats(common::Optional expiration_time) const { +service::FullServiceInfo ProcessSlaveWrapper::MakeServiceStats(common::time64_t expiration_time) const { + common::uri::Replacements replacements; + replacements.SetHost(config_.alias.data(), common::uri::Component(0, static_cast(config_.alias.length()))); + auto stabled_hls_host = config_.hls_host.ReplaceComponents(replacements); + auto stabled_vods_host = config_.vods_host.ReplaceComponents(replacements); + auto stabled_cods_host = config_.cods_host.ReplaceComponents(replacements); + service::FullServiceInfo fstat(stabled_hls_host, stabled_vods_host, stabled_cods_host, expiration_time, + config_.hls_dir, config_.vods_dir, config_.cods_dir, config_.timeshifts_dir, + config_.feedback_dir, config_.proxy_dir, config_.data_dir, MakeServiceInfoStats()); + + return fstat; +} + +service::ServerInfo ProcessSlaveWrapper::MakeServiceInfoStats() const { service::CpuShot next = service::GetMachineCpuShot(); double cpu_load = service::GetCpuMachineLoad(node_stats_->prev, next); node_stats_->prev = next; @@ -1464,29 +1351,7 @@ std::string ProcessSlaveWrapper::MakeServiceStats(common::Optional replacements; - replacements.SetHost(config_.alias.data(), common::uri::Component(0, static_cast(config_.alias.length()))); - auto stabled_hls_host = config_.hls_host.ReplaceComponents(replacements); - auto stabled_vods_host = config_.vods_host.ReplaceComponents(replacements); - auto stabled_cods_host = config_.cods_host.ReplaceComponents(replacements); - service::FullServiceInfo fstat(stabled_hls_host, stabled_vods_host, stabled_cods_host, *expiration_time, - config_.hls_dir, config_.vods_dir, config_.cods_dir, config_.timeshifts_dir, - config_.feedback_dir, config_.proxy_dir, config_.data_dir, stat); - common::Error err_ser = fstat.SerializeToString(&node_stats); - if (err_ser) { - const std::string err_str = err_ser->GetDescription(); - WARNING_LOG() << "Failed to generate node full statistic: " << err_str; - } - } else { - common::Error err_ser = stat.SerializeToString(&node_stats); - if (err_ser) { - const std::string err_str = err_ser->GetDescription(); - WARNING_LOG() << "Failed to generate node statistic: " << err_str; - } - } - return node_stats; + return stat; } } // namespace server diff --git a/src/server/process_slave_wrapper.h b/src/server/process_slave_wrapper.h index cc7f865..69bd70c 100644 --- a/src/server/process_slave_wrapper.h +++ b/src/server/process_slave_wrapper.h @@ -14,10 +14,8 @@ #pragma once -#include -#include -#include - +#include +#include #include #include #include @@ -29,6 +27,7 @@ #include "base/stream_info.h" #include "server/config.h" +#include "server/daemon/commands_info/service/server_info.h" namespace fastocloud { namespace server { @@ -38,6 +37,7 @@ class ProtocoledDaemonClient; class ProcessSlaveWrapper : public common::libev::IoLoopObserver { public: + enum { BUF_SIZE = 4096 }; enum { node_stats_send_seconds = 10, ping_timeout_clients_seconds = 60, @@ -51,9 +51,8 @@ class ProcessSlaveWrapper : public common::libev::IoLoopObserver { explicit ProcessSlaveWrapper(const Config& config); ~ProcessSlaveWrapper() override; - static common::ErrnoError SendStopDaemonRequest(const Config& config); - static common::ErrnoError SendRestartDaemonRequest(const Config& config); - common::net::HostAndPort GetServerHostAndPort(); + static common::ErrnoError SendStopDaemonRequest(const common::net::HostAndPort& host); + static common::ErrnoError SendRestartDaemonRequest(const common::net::HostAndPort& host); int Exec(int argc, char** argv) WARN_UNUSED_RESULT; @@ -75,11 +74,6 @@ class ProcessSlaveWrapper : public common::libev::IoLoopObserver { void DataReadyToWrite(common::libev::IoClient* client) override; void PostLooped(common::libev::IoLoop* server) override; - virtual common::ErrnoError HandleRequestServiceCommand(ProtocoledDaemonClient* dclient, - const fastotv::protocol::request_t* req) WARN_UNUSED_RESULT; - virtual common::ErrnoError HandleResponceServiceCommand(ProtocoledDaemonClient* dclient, - const fastotv::protocol::response_t* resp) WARN_UNUSED_RESULT; - virtual common::ErrnoError HandleRequestStreamsCommand(stream_client_t* pclient, const fastotv::protocol::request_t* req) WARN_UNUSED_RESULT; virtual common::ErrnoError HandleResponceStreamsCommand(stream_client_t* pclient, @@ -91,15 +85,18 @@ class ProcessSlaveWrapper : public common::libev::IoLoopObserver { void StopImpl(); ChildStream* FindChildByID(fastotv::stream_id_t cid) const; - void BroadcastClients(const fastotv::protocol::request_t& req); + void BroadcastClients(const common::json::WsDataJson& req); common::ErrnoError DaemonDataReceived(ProtocoledDaemonClient* dclient) WARN_UNUSED_RESULT; + common::ErrnoError ProcessReceived(ProtocoledDaemonClient* hclient, const char* request, size_t req_len); + common::ErrnoError StreamDataReceived(stream_client_t* pclient) WARN_UNUSED_RESULT; common::ErrnoError CreateChildStream(const serialized_stream_t& config_args) WARN_UNUSED_RESULT; common::ErrnoError CreateChildStreamImpl(const serialized_stream_t& config_args, const StreamInfo& sha) WARN_UNUSED_RESULT; common::ErrnoError StopChildStreamImpl(fastotv::stream_id_t sid, bool force) WARN_UNUSED_RESULT; + common::ErrnoError RestartChildStreamImpl(fastotv::stream_id_t sid) WARN_UNUSED_RESULT; // stream common::ErrnoError HandleRequestChangedSourcesStream(stream_client_t* pclient, @@ -113,35 +110,30 @@ class ProcessSlaveWrapper : public common::libev::IoLoopObserver { #endif common::ErrnoError HandleRequestClientStartStream(ProtocoledDaemonClient* dclient, - const fastotv::protocol::request_t* req) WARN_UNUSED_RESULT; + const common::http::HttpRequest& req) WARN_UNUSED_RESULT; common::ErrnoError HandleRequestClientStopStream(ProtocoledDaemonClient* dclient, - const fastotv::protocol::request_t* req) WARN_UNUSED_RESULT; + const common::http::HttpRequest& req) WARN_UNUSED_RESULT; common::ErrnoError HandleRequestClientRestartStream(ProtocoledDaemonClient* dclient, - const fastotv::protocol::request_t* req) WARN_UNUSED_RESULT; + const common::http::HttpRequest& req) WARN_UNUSED_RESULT; common::ErrnoError HandleRequestClientGetLogStream(ProtocoledDaemonClient* dclient, - const fastotv::protocol::request_t* req) WARN_UNUSED_RESULT; + const common::http::HttpRequest& req) WARN_UNUSED_RESULT; common::ErrnoError HandleRequestClientGetPipelineStream(ProtocoledDaemonClient* dclient, - const fastotv::protocol::request_t* req) WARN_UNUSED_RESULT; + const common::http::HttpRequest& req) WARN_UNUSED_RESULT; common::ErrnoError HandleRequestClientGetConfigJsonStream(ProtocoledDaemonClient* dclient, - const fastotv::protocol::request_t* req) WARN_UNUSED_RESULT; - common::ErrnoError HandleRequestClientActivate(ProtocoledDaemonClient* dclient, - const fastotv::protocol::request_t* req) WARN_UNUSED_RESULT; - common::ErrnoError HandleRequestClientPingService(ProtocoledDaemonClient* dclient, - const fastotv::protocol::request_t* req) WARN_UNUSED_RESULT; + const common::http::HttpRequest& req) WARN_UNUSED_RESULT; common::ErrnoError HandleRequestClientGetLogService(ProtocoledDaemonClient* dclient, - const fastotv::protocol::request_t* req) WARN_UNUSED_RESULT; - common::ErrnoError HandleRequestClientRestartService(ProtocoledDaemonClient* dclient, - const fastotv::protocol::request_t* req) WARN_UNUSED_RESULT; + const common::http::HttpRequest& req) WARN_UNUSED_RESULT; common::ErrnoError HandleRequestClientStopService(ProtocoledDaemonClient* dclient, - const fastotv::protocol::request_t* req) WARN_UNUSED_RESULT; - - common::ErrnoError HandleResponcePingService(ProtocoledDaemonClient* dclient, - const fastotv::protocol::response_t* resp) WARN_UNUSED_RESULT; + const common::http::HttpRequest& req) WARN_UNUSED_RESULT; + common::ErrnoError HandleRequestClientRestartService(ProtocoledDaemonClient* dclient, + const common::http::HttpRequest& req) WARN_UNUSED_RESULT; + common::ErrnoError HandleRequestClientGetStats(ProtocoledDaemonClient* dclient, + const common::http::HttpRequest& req) WARN_UNUSED_RESULT; void CheckLicenseExpired(common::libev::IoLoop* server); - std::string MakeServiceStats(common::Optional expiration_time) const; - + service::FullServiceInfo MakeServiceStats(common::time64_t expiration_time) const; + service::ServerInfo MakeServiceInfoStats() const; struct NodeStats; const Config config_; diff --git a/src/server/process_slave_wrapper_posix.cpp b/src/server/process_slave_wrapper_posix.cpp index d7d1a2c..e299295 100644 --- a/src/server/process_slave_wrapper_posix.cpp +++ b/src/server/process_slave_wrapper_posix.cpp @@ -18,17 +18,15 @@ #include #endif +#include +#include +#include #include #include #include -#include -#include - #include "base/stream_info.h" - #include "server/child_stream.h" -#include "server/daemon/server.h" #include "server/utils/utils.h" #define PIPE diff --git a/src/server/tcp/client.cpp b/src/server/tcp/client.cpp index 8154793..2b7c3aa 100644 --- a/src/server/tcp/client.cpp +++ b/src/server/tcp/client.cpp @@ -14,6 +14,8 @@ #include "server/tcp/client.h" +#include + #include namespace fastocloud { diff --git a/src/server/tcp/client.h b/src/server/tcp/client.h index 3f1a682..0c36537 100644 --- a/src/server/tcp/client.h +++ b/src/server/tcp/client.h @@ -14,10 +14,17 @@ #pragma once -#include - +#include #include +namespace common { +namespace libev { +namespace tcp { +class TcpClient; +} +} // namespace libev +} // namespace common + namespace fastocloud { namespace server { namespace tcp { diff --git a/src/server/utils/utils.h b/src/server/utils/utils.h index 28eff80..4761b4b 100644 --- a/src/server/utils/utils.h +++ b/src/server/utils/utils.h @@ -22,9 +22,10 @@ namespace server { #if defined(OS_POSIX) common::ErrnoError CreatePipe(common::net::socket_descr_t* read_client_fd, - common::net::socket_descr_t* write_client_fd); + common::net::socket_descr_t* write_client_fd) WARN_UNUSED_RESULT; #endif -common::ErrnoError CreateSocketPair(common::net::socket_descr_t* parent_sock, common::net::socket_descr_t* child_sock); +common::ErrnoError CreateSocketPair(common::net::socket_descr_t* parent_sock, + common::net::socket_descr_t* child_sock) WARN_UNUSED_RESULT; } // namespace server } // namespace fastocloud diff --git a/src/stream/dumpers/htmldump.cpp b/src/stream/dumpers/htmldump.cpp index 84eda58..c370296 100644 --- a/src/stream/dumpers/htmldump.cpp +++ b/src/stream/dumpers/htmldump.cpp @@ -42,29 +42,20 @@ bool HtmlDump::Dump(GstBin* pipeline, const common::file_system::ascii_file_stri return false; } -#if GST_CHECK_VERSION(1, 11, 1) char* dot_description = gst_debug_bin_to_dot_data(pipeline, GST_DEBUG_GRAPH_SHOW_ALL); if (!dot_description) { return false; } std::string pipeline_description(dot_description); - dumpfile << "" - << "" - << " " - << " " - << "" - << ""; + dumpfile << "" << "" << " " + << " " << "" << ""; g_free(dot_description); return true; #else return false; #endif -#else - return false; -#endif } } // namespace dumper diff --git a/src/stream/dumpers/htmldump.h b/src/stream/dumpers/htmldump.h index 01a9eea..0550906 100644 --- a/src/stream/dumpers/htmldump.h +++ b/src/stream/dumpers/htmldump.h @@ -22,7 +22,7 @@ namespace dumper { class HtmlDump : public IDumper { public: - bool Dump(GstBin* pipeline, const common::file_system::ascii_file_string_path& path) override; + bool Dump(GstBin* pipeline, const common::file_system::ascii_file_string_path& path) override WARN_UNUSED_RESULT; }; } // namespace dumper diff --git a/src/stream/dumpers/idumper.h b/src/stream/dumpers/idumper.h index 1e8ee41..07735ef 100644 --- a/src/stream/dumpers/idumper.h +++ b/src/stream/dumpers/idumper.h @@ -24,7 +24,7 @@ namespace dumper { class IDumper { public: - virtual bool Dump(GstBin* pipeline, const common::file_system::ascii_file_string_path& path) = 0; + virtual bool Dump(GstBin* pipeline, const common::file_system::ascii_file_string_path& path) WARN_UNUSED_RESULT = 0; virtual ~IDumper(); }; diff --git a/src/stream/elements/depay/audio.h b/src/stream/elements/depay/audio.h index 592f10c..4ecce41 100644 --- a/src/stream/elements/depay/audio.h +++ b/src/stream/elements/depay/audio.h @@ -14,14 +14,13 @@ #pragma once -#include - #include -#include "stream/stypes.h" +#include #include "stream/elements/depay/depay.h" // for ElementRtpPay #include "stream/elements/element.h" // for Element (ptr only), SupportedElements:... +#include "stream/stypes.h" namespace fastocloud { namespace stream { diff --git a/src/stream/stream_controller.cpp b/src/stream/stream_controller.cpp index ff58b45..68f2bfd 100644 --- a/src/stream/stream_controller.cpp +++ b/src/stream/stream_controller.cpp @@ -400,9 +400,9 @@ void StreamController::ChildStatusChanged(common::libev::IoChild* child, int sta common::ErrnoError StreamController::HandleRequestCommand(common::libev::IoClient* client, const fastotv::protocol::request_t* req) { - if (req->method == STOP_STREAM) { + if (req->method == REQUEST_STOP_STREAM) { return HandleRequestStopStream(client, req); - } else if (req->method == RESTART_STREAM) { + } else if (req->method == REQUEST_RESTART_STREAM) { return HandleRequestRestartStream(client, req); } diff --git a/src/stream_commands/commands.h b/src/stream_commands/commands.h index 1a1ffcf..caada94 100644 --- a/src/stream_commands/commands.h +++ b/src/stream_commands/commands.h @@ -14,8 +14,8 @@ #pragma once -#define STOP_STREAM "stop" -#define RESTART_STREAM "restart" +#define REQUEST_STOP_STREAM "stop" +#define REQUEST_RESTART_STREAM "restart" #define CHANGED_SOURCES_STREAM "changed_source_stream" #define STATISTIC_STREAM "statistic_stream" diff --git a/src/stream_commands/commands_factory.cpp b/src/stream_commands/commands_factory.cpp index 5bf24a5..59c7e76 100644 --- a/src/stream_commands/commands_factory.cpp +++ b/src/stream_commands/commands_factory.cpp @@ -45,7 +45,7 @@ common::Error RestartStreamRequest(fastotv::protocol::sequance_id_t id, fastotv: fastotv::protocol::request_t lreq; lreq.id = id; - lreq.method = RESTART_STREAM; + lreq.method = REQUEST_RESTART_STREAM; *req = lreq; return common::Error(); } @@ -57,7 +57,7 @@ common::Error StopStreamRequest(fastotv::protocol::sequance_id_t id, fastotv::pr fastotv::protocol::request_t lreq; lreq.id = id; - lreq.method = STOP_STREAM; + lreq.method = REQUEST_STOP_STREAM; *req = lreq; return common::Error(); } diff --git a/src/utils/m3u8_reader.cpp b/src/utils/m3u8_reader.cpp index 35f4a9b..885e48e 100644 --- a/src/utils/m3u8_reader.cpp +++ b/src/utils/m3u8_reader.cpp @@ -14,11 +14,11 @@ #include "utils/m3u8_reader.h" +#include + #include #include -#include - #define CHUNK_EXT ".ts" #define CHUNK_EXT_RE "\\" CHUNK_EXT diff --git a/src/utils/m3u8_reader.h b/src/utils/m3u8_reader.h index f3067fd..8b5e9fb 100644 --- a/src/utils/m3u8_reader.h +++ b/src/utils/m3u8_reader.h @@ -14,11 +14,11 @@ #pragma once +#include + #include #include -#include - #include "utils/chunk_info.h" namespace fastocloud {