diff --git a/CMakeLists.txt b/CMakeLists.txt
index 9022540..39dc239 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -1,7 +1,7 @@
CMAKE_MINIMUM_REQUIRED(VERSION 3.16 FATAL_ERROR)
SET(BRANDING_PROJECT_NAME "fastocloud" CACHE STRING "Branding for ${BRANDING_PROJECT_NAME}") #default
-SET(BRANDING_PROJECT_VERSION "1.4.4.17" CACHE STRING "Branding version for ${BRANDING_PROJECT_NAME}") #default
+SET(BRANDING_PROJECT_VERSION "1.5.0.18" CACHE STRING "Branding version for ${BRANDING_PROJECT_NAME}") #default
SET(BRANDING_PROJECT_BUILD_TYPE_VERSION "release" CACHE STRING "Build version type for ${BRANDING_PROJECT_NAME}") #default alfa,beta,rc,release
SET(BRANDING_PROJECT_DOMAIN "www.fastogt.com" CACHE STRING "Branding domain url for ${BRANDING_PROJECT_NAME}") #default
SET(BRANDING_PROJECT_COMPANYNAME "FastoGT" CACHE STRING "Company name for ${BRANDING_PROJECT_NAME}") #default
diff --git a/src/server/CMakeLists.txt b/src/server/CMakeLists.txt
index 287b059..428facf 100644
--- a/src/server/CMakeLists.txt
+++ b/src/server/CMakeLists.txt
@@ -109,7 +109,6 @@ SET(SERVER_DAEMON_HEADERS
${CMAKE_SOURCE_DIR}/src/server/daemon/client.h
${CMAKE_SOURCE_DIR}/src/server/daemon/server.h
${CMAKE_SOURCE_DIR}/src/server/daemon/commands.h
- ${CMAKE_SOURCE_DIR}/src/server/daemon/commands_factory.h
${CMAKE_SOURCE_DIR}/src/server/daemon/commands_info/service/details/shots.h
${CMAKE_SOURCE_DIR}/src/server/daemon/commands_info/service/server_info.h
@@ -126,7 +125,6 @@ SET(SERVER_DAEMON_SOURCES
${CMAKE_SOURCE_DIR}/src/server/daemon/client.cpp
${CMAKE_SOURCE_DIR}/src/server/daemon/server.cpp
${CMAKE_SOURCE_DIR}/src/server/daemon/commands.cpp
- ${CMAKE_SOURCE_DIR}/src/server/daemon/commands_factory.cpp
${CMAKE_SOURCE_DIR}/src/server/daemon/commands_info/service/details/shots.cpp
${CMAKE_SOURCE_DIR}/src/server/daemon/commands_info/service/server_info.cpp
diff --git a/src/server/daemon/client.cpp b/src/server/daemon/client.cpp
index e3bb9f2..9dbe1a8 100644
--- a/src/server/daemon/client.cpp
+++ b/src/server/daemon/client.cpp
@@ -12,259 +12,326 @@
along with fastocloud. If not, see .
*/
-#include
-
#include "server/daemon/client.h"
-#include "server/daemon/commands_factory.h"
+#include
+#include
+#include
+
+#include
+
+#include
+#include
+
+namespace {
+const std::string kErrorInvalid = "{\"error\":{\"code\":-1, \"message\":\"NOTREACHED\"}}";
+const auto kDataOK = common::json::MakeSuccessDataJson();
+} // namespace
namespace fastocloud {
namespace server {
ProtocoledDaemonClient::ProtocoledDaemonClient(common::libev::IoLoop* server, const common::net::socket_info& info)
- : base_class(std::make_shared(), server, info) {}
+ : base_class(server, info), is_verified_(false), exp_time_(0) {}
-common::ErrnoError ProtocoledDaemonClient::StopMe() {
- const common::daemon::commands::StopInfo stop_req;
- fastotv::protocol::request_t req;
- common::Error err_ser = StopServiceRequest(NextRequestID(), stop_req, &req);
+bool ProtocoledDaemonClient::IsVerified() const {
+ return is_verified_;
+}
+
+common::time64_t ProtocoledDaemonClient::GetExpTime() const {
+ return exp_time_;
+}
+
+void ProtocoledDaemonClient::SetVerified(bool verified, common::time64_t exp_time) {
+ is_verified_ = verified;
+ exp_time_ = exp_time;
+}
+
+bool ProtocoledDaemonClient::IsExpired() const {
+ return exp_time_ < common::time::current_utc_mstime();
+}
+
+bool ProtocoledDaemonClient::HaveFullAccess() const {
+ return IsVerified() && !IsExpired();
+}
+
+common::http::http_protocol ProtocoledDaemonClient::GetProtocol() const {
+ return common::http::HP_1_0;
+}
+
+common::libev::http::HttpServerInfo ProtocoledDaemonClient::GetServerInfo() const {
+ return common::libev::http::HttpServerInfo();
+}
+
+common::ErrnoError ProtocoledDaemonClient::StopMe(const common::uri::GURL& url) {
+ const common::daemon::commands::StopInfo params;
+ std::string result;
+ common::Error err_ser = params.SerializeToString(&result);
if (err_ser) {
return common::make_errno_error(err_ser->GetDescription(), EAGAIN);
}
- return WriteRequest(req);
+ return PostJson(url, result.c_str(), result.size(), false);
}
-common::ErrnoError ProtocoledDaemonClient::RestartMe() {
- const common::daemon::commands::RestartInfo stop_req;
- fastotv::protocol::request_t req;
- common::Error err_ser = RestartServiceRequest(NextRequestID(), stop_req, &req);
+common::ErrnoError ProtocoledDaemonClient::Broadcast(const common::json::WsDataJson& request) {
+ std::string result;
+ ignore_result(request.SerializeToString(&result));
+ return SendFrame(result.c_str(), result.size());
+}
+
+common::ErrnoError ProtocoledDaemonClient::RestartMe(const common::uri::GURL& url) {
+ const common::daemon::commands::RestartInfo params;
+ std::string result;
+ common::Error err_ser = params.SerializeToString(&result);
if (err_ser) {
return common::make_errno_error(err_ser->GetDescription(), EAGAIN);
}
-
- return WriteRequest(req);
+ return PostJson(url, result.c_str(), result.size(), false);
}
-common::ErrnoError ProtocoledDaemonClient::StopFail(fastotv::protocol::sequance_id_t id, common::Error err) {
- const std::string error_str = err->GetDescription();
- fastotv::protocol::response_t resp;
- common::Error err_ser = StopServiceResponseFail(id, error_str, &resp);
+common::ErrnoError ProtocoledDaemonClient::StopFail(common::http::http_status code, common::Error err) {
+ return SendErrorJson(code, err);
+}
+
+common::ErrnoError ProtocoledDaemonClient::StopSuccess() {
+ return SendDataJson(common::http::HS_OK, kDataOK);
+}
+
+common::ErrnoError ProtocoledDaemonClient::RestartFail(common::http::http_status code, common::Error err) {
+ return SendErrorJson(code, err);
+}
+
+common::ErrnoError ProtocoledDaemonClient::RestartSuccess() {
+ return SendDataJson(common::http::HS_OK, kDataOK);
+}
+
+common::ErrnoError ProtocoledDaemonClient::GetHardwareHashFail(common::http::http_status code, common::Error err) {
+ return SendErrorJson(code, err);
+}
+
+common::ErrnoError ProtocoledDaemonClient::GetHardwareHashSuccess(
+ const common::daemon::commands::HardwareHashInfo& params) {
+ json_object* jrequest_init = nullptr;
+ common::Error err_ser = params.Serialize(&jrequest_init);
if (err_ser) {
- return common::make_errno_error(err_ser->GetDescription(), EAGAIN);
+ return GetHardwareHashFail(common::http::HS_INTERNAL_ERROR, err_ser);
}
- return WriteResponse(resp);
-}
-
-common::ErrnoError ProtocoledDaemonClient::StopSuccess(fastotv::protocol::sequance_id_t id) {
- fastotv::protocol::response_t resp;
- common::Error err_ser = StopServiceResponseSuccess(id, &resp);
+ std::string result;
+ common::json::DataJson js = common::json::MakeSuccessDataJson(jrequest_init); // take ownerships
+ err_ser = js.SerializeToString(&result);
if (err_ser) {
- return common::make_errno_error(err_ser->GetDescription(), EAGAIN);
+ return GetHardwareHashFail(common::http::HS_INTERNAL_ERROR, err_ser);
}
- return WriteResponse(resp);
+ return SendDataJson(common::http::HS_OK, js);
}
-common::ErrnoError ProtocoledDaemonClient::Ping() {
- common::daemon::commands::ClientPingInfo server_ping_info;
- return Ping(server_ping_info);
+common::ErrnoError ProtocoledDaemonClient::GetStatsFail(common::http::http_status code, common::Error err) {
+ return SendErrorJson(code, err);
}
-common::ErrnoError ProtocoledDaemonClient::Ping(const common::daemon::commands::ClientPingInfo& server_ping_info) {
- fastotv::protocol::request_t ping_request;
- common::Error err_ser = PingRequest(NextRequestID(), server_ping_info, &ping_request);
+common::ErrnoError ProtocoledDaemonClient::GetStatsSuccess(const service::FullServiceInfo& stats) {
+ json_object* jrequest_init = nullptr;
+ common::Error err_ser = stats.Serialize(&jrequest_init);
if (err_ser) {
- return common::make_errno_error(err_ser->GetDescription(), EAGAIN);
+ return GetHardwareHashFail(common::http::HS_INTERNAL_ERROR, err_ser);
}
- return WriteRequest(ping_request);
-}
-
-common::ErrnoError ProtocoledDaemonClient::PongFail(fastotv::protocol::sequance_id_t id, common::Error err) {
- const std::string error_str = err->GetDescription();
- fastotv::protocol::response_t resp;
- common::Error err_ser = PingServiceResponseFail(id, error_str, &resp);
+ std::string result;
+ common::json::DataJson js = common::json::MakeSuccessDataJson(jrequest_init); // take ownerships
+ err_ser = js.SerializeToString(&result);
if (err_ser) {
- return common::make_errno_error(err_ser->GetDescription(), EAGAIN);
+ return GetStatsFail(common::http::HS_INTERNAL_ERROR, err_ser);
}
- return WriteResponse(resp);
+ return SendDataJson(common::http::HS_OK, js);
}
-common::ErrnoError ProtocoledDaemonClient::Pong(fastotv::protocol::sequance_id_t id) {
- common::daemon::commands::ServerPingInfo server_ping_info;
- return Pong(id, server_ping_info);
+common::ErrnoError ProtocoledDaemonClient::GetLogServiceFail(common::http::http_status code, common::Error err) {
+ return SendErrorJson(code, err);
}
-common::ErrnoError ProtocoledDaemonClient::Pong(fastotv::protocol::sequance_id_t id,
- const common::daemon::commands::ServerPingInfo& pong) {
- fastotv::protocol::response_t resp;
- common::Error err_ser = PingServiceResponseSuccess(id, pong, &resp);
- if (err_ser) {
- return common::make_errno_error(err_ser->GetDescription(), EAGAIN);
- }
- return WriteResponse(resp);
-}
-
-common::ErrnoError ProtocoledDaemonClient::ActivateFail(fastotv::protocol::sequance_id_t id, common::Error err) {
- const std::string error_str = err->GetDescription();
- fastotv::protocol::response_t resp;
- common::Error err_ser = ActivateResponseFail(id, error_str, &resp);
- if (err_ser) {
- return common::make_errno_error(err_ser->GetDescription(), EAGAIN);
+common::ErrnoError ProtocoledDaemonClient::GetLogServiceSuccess(const std::string& path) {
+ const char* file_path_str_ptr = path.c_str();
+ struct stat sb;
+ if (stat(file_path_str_ptr, &sb) < 0) {
+ return GetLogServiceFail(common::http::HS_FORBIDDEN, common::make_error("File not found."));
}
- return WriteResponse(resp);
-}
-
-common::ErrnoError ProtocoledDaemonClient::ActivateSuccess(fastotv::protocol::sequance_id_t id,
- const std::string& result) {
- fastotv::protocol::response_t resp;
- common::Error err_ser = ActivateResponseSuccess(id, result, &resp);
- if (err_ser) {
- return common::make_errno_error(err_ser->GetDescription(), EAGAIN);
+ if (S_ISDIR(sb.st_mode)) {
+ return GetLogServiceFail(common::http::HS_FORBIDDEN, common::make_error("Bad filename."));
}
- return WriteResponse(resp);
-}
-
-common::ErrnoError ProtocoledDaemonClient::GetLogServiceFail(fastotv::protocol::sequance_id_t id, common::Error err) {
- const std::string error_str = err->GetDescription();
- fastotv::protocol::response_t resp;
- common::Error err_ser = GetLogServiceResponseFail(id, error_str, &resp);
- if (err_ser) {
- return common::make_errno_error(err_ser->GetDescription(), EAGAIN);
+ int file = open(file_path_str_ptr, O_RDONLY);
+ if (file == INVALID_DESCRIPTOR) { /* open the file for reading */
+ return GetLogServiceFail(common::http::HS_FORBIDDEN, common::make_error("File is protected."));
}
- return WriteResponse(resp);
-}
-
-common::ErrnoError ProtocoledDaemonClient::GetLogServiceSuccess(fastotv::protocol::sequance_id_t id) {
- fastotv::protocol::response_t resp;
- common::Error err_ser = GetLogServiceResponseSuccess(id, &resp);
- if (err_ser) {
- return common::make_errno_error(err_ser->GetDescription(), EAGAIN);
+ size_t cast = sb.st_size;
+ common::ErrnoError err = SendHeadersInternal(common::http::HS_OK, "text/plain", &cast, &sb.st_mtime);
+ if (err) {
+ ::close(file);
+ return GetLogServiceFail(common::http::HS_INTERNAL_ERROR, common::make_error(err->GetDescription()));
}
- return WriteResponse(resp);
+ err = SendFileByFd(file, sb.st_size);
+ ::close(file);
+ return err;
}
-common::ErrnoError ProtocoledDaemonClient::GetLogStreamFail(fastotv::protocol::sequance_id_t id, common::Error err) {
- const std::string error_str = err->GetDescription();
- fastotv::protocol::response_t resp;
- common::Error err_ser = GetLogStreamResponseFail(id, error_str, &resp);
- if (err_ser) {
- return common::make_errno_error(err_ser->GetDescription(), EAGAIN);
- }
-
- return WriteResponse(resp);
+common::ErrnoError ProtocoledDaemonClient::GetLogStreamFail(common::http::http_status code, common::Error err) {
+ return SendErrorJson(code, err);
}
-common::ErrnoError ProtocoledDaemonClient::GetLogStreamSuccess(fastotv::protocol::sequance_id_t id) {
- fastotv::protocol::response_t resp;
- common::Error err_ser = GetLogStreamResponseSuccess(id, &resp);
- if (err_ser) {
- return common::make_errno_error(err_ser->GetDescription(), EAGAIN);
+common::ErrnoError ProtocoledDaemonClient::GetLogStreamSuccess(const std::string& path) {
+ const char* file_path_str_ptr = path.c_str();
+ struct stat sb;
+ if (stat(file_path_str_ptr, &sb) < 0) {
+ return GetLogStreamFail(common::http::HS_FORBIDDEN, common::make_error("File not found."));
}
- return WriteResponse(resp);
+ if (S_ISDIR(sb.st_mode)) {
+ return GetLogStreamFail(common::http::HS_FORBIDDEN, common::make_error("Bad filename."));
+ }
+
+ int file = open(file_path_str_ptr, O_RDONLY);
+ if (file == INVALID_DESCRIPTOR) { /* open the file for reading */
+ return GetLogStreamFail(common::http::HS_FORBIDDEN, common::make_error("File is protected."));
+ }
+
+ size_t cast = sb.st_size;
+ common::ErrnoError err = SendHeadersInternal(common::http::HS_OK, "text/plain", &cast, &sb.st_mtime);
+ if (err) {
+ ::close(file);
+ return GetLogStreamFail(common::http::HS_INTERNAL_ERROR, common::make_error(err->GetDescription()));
+ }
+
+ err = SendFileByFd(file, sb.st_size);
+ ::close(file);
+ return err;
}
-common::ErrnoError ProtocoledDaemonClient::GetPipeStreamFail(fastotv::protocol::sequance_id_t id, common::Error err) {
- const std::string error_str = err->GetDescription();
- fastotv::protocol::response_t resp;
- common::Error err_ser = GetPipeStreamResponseFail(id, error_str, &resp);
- if (err_ser) {
- return common::make_errno_error(err_ser->GetDescription(), EAGAIN);
- }
-
- return WriteResponse(resp);
+common::ErrnoError ProtocoledDaemonClient::GetPipeStreamFail(common::http::http_status code, common::Error err) {
+ return SendErrorJson(code, err);
}
-common::ErrnoError ProtocoledDaemonClient::GetPipeStreamSuccess(fastotv::protocol::sequance_id_t id) {
- fastotv::protocol::response_t resp;
- common::Error err_ser = GetPipeStreamResponseSuccess(id, &resp);
- if (err_ser) {
- return common::make_errno_error(err_ser->GetDescription(), EAGAIN);
+common::ErrnoError ProtocoledDaemonClient::GetPipeStreamSuccess(const std::string& path) {
+ const char* file_path_str_ptr = path.c_str();
+ struct stat sb;
+ if (stat(file_path_str_ptr, &sb) < 0) {
+ return GetLogStreamFail(common::http::HS_FORBIDDEN, common::make_error("File not found."));
}
- return WriteResponse(resp);
+ if (S_ISDIR(sb.st_mode)) {
+ return GetLogStreamFail(common::http::HS_FORBIDDEN, common::make_error("Bad filename."));
+ }
+
+ int file = open(file_path_str_ptr, O_RDONLY);
+ if (file == INVALID_DESCRIPTOR) { /* open the file for reading */
+ return GetLogStreamFail(common::http::HS_FORBIDDEN, common::make_error("File is protected."));
+ }
+
+ size_t cast = sb.st_size;
+ common::ErrnoError err = SendHeadersInternal(common::http::HS_OK, "text/plain", &cast, &sb.st_mtime);
+ if (err) {
+ ::close(file);
+ return GetLogStreamFail(common::http::HS_INTERNAL_ERROR, common::make_error(err->GetDescription()));
+ }
+
+ err = SendFileByFd(file, sb.st_size);
+ ::close(file);
+ return err;
}
-common::ErrnoError ProtocoledDaemonClient::StartStreamFail(fastotv::protocol::sequance_id_t id, common::Error err) {
- const std::string error_str = err->GetDescription();
- fastotv::protocol::response_t resp;
- common::Error err_ser = StartStreamResponseFail(id, error_str, &resp);
- if (err_ser) {
- return common::make_errno_error(err_ser->GetDescription(), EAGAIN);
- }
-
- return WriteResponse(resp);
+common::ErrnoError ProtocoledDaemonClient::GetConfigStreamFail(common::http::http_status code, common::Error err) {
+ return SendErrorJson(code, err);
}
-common::ErrnoError ProtocoledDaemonClient::StartStreamSuccess(fastotv::protocol::sequance_id_t id) {
- fastotv::protocol::response_t resp;
- common::Error err_ser = StartStreamResponseSuccess(id, &resp);
- if (err_ser) {
- return common::make_errno_error(err_ser->GetDescription(), EAGAIN);
+common::ErrnoError ProtocoledDaemonClient::GetConfigStreamSuccess(const std::string& path) {
+ const char* file_path_str_ptr = path.c_str();
+ struct stat sb;
+ if (stat(file_path_str_ptr, &sb) < 0) {
+ return GetLogStreamFail(common::http::HS_FORBIDDEN, common::make_error("File not found."));
}
- return WriteResponse(resp);
+ if (S_ISDIR(sb.st_mode)) {
+ return GetLogStreamFail(common::http::HS_FORBIDDEN, common::make_error("Bad filename."));
+ }
+
+ int file = open(file_path_str_ptr, O_RDONLY);
+ if (file == INVALID_DESCRIPTOR) { /* open the file for reading */
+ return GetLogStreamFail(common::http::HS_FORBIDDEN, common::make_error("File is protected."));
+ }
+
+ size_t cast = sb.st_size;
+ common::ErrnoError err =
+ SendHeadersInternal(common::http::HS_OK, "text/plain", &cast, &sb.st_mtime); // "application/json"
+ if (err) {
+ ::close(file);
+ return GetLogStreamFail(common::http::HS_INTERNAL_ERROR, common::make_error(err->GetDescription()));
+ }
+
+ err = SendFileByFd(file, sb.st_size);
+ ::close(file);
+ return err;
}
-common::ErrnoError ProtocoledDaemonClient::ReStartStreamFail(fastotv::protocol::sequance_id_t id, common::Error err) {
- const std::string error_str = err->GetDescription();
- fastotv::protocol::response_t resp;
- common::Error err_ser = RestartStreamResponseFail(id, error_str, &resp);
- if (err_ser) {
- return common::make_errno_error(err_ser->GetDescription(), EAGAIN);
- }
-
- return WriteResponse(resp);
+common::ErrnoError ProtocoledDaemonClient::StartStreamFail(common::http::http_status code, common::Error err) {
+ return SendErrorJson(code, err);
}
-common::ErrnoError ProtocoledDaemonClient::ReStartStreamSuccess(fastotv::protocol::sequance_id_t id) {
- fastotv::protocol::response_t resp;
- common::Error err_ser = RestartStreamResponseSuccess(id, &resp);
- if (err_ser) {
- return common::make_errno_error(err_ser->GetDescription(), EAGAIN);
- }
-
- return WriteResponse(resp);
+common::ErrnoError ProtocoledDaemonClient::StartStreamSuccess() {
+ return SendDataJson(common::http::HS_OK, kDataOK);
}
-common::ErrnoError ProtocoledDaemonClient::StopStreamFail(fastotv::protocol::sequance_id_t id, common::Error err) {
- const std::string error_str = err->GetDescription();
- fastotv::protocol::response_t resp;
- common::Error err_ser = StopStreamResponseFail(id, error_str, &resp);
- if (err_ser) {
- return common::make_errno_error(err_ser->GetDescription(), EAGAIN);
- }
-
- return WriteResponse(resp);
+common::ErrnoError ProtocoledDaemonClient::ReStartStreamFail(common::http::http_status code, common::Error err) {
+ return SendErrorJson(code, err);
}
-common::ErrnoError ProtocoledDaemonClient::StopStreamSuccess(fastotv::protocol::sequance_id_t id) {
- fastotv::protocol::response_t resp;
- common::Error err_ser = StopStreamResponseSuccess(id, &resp);
- if (err_ser) {
- return common::make_errno_error(err_ser->GetDescription(), EAGAIN);
- }
-
- return WriteResponse(resp);
+common::ErrnoError ProtocoledDaemonClient::ReStartStreamSuccess() {
+ return SendDataJson(common::http::HS_OK, kDataOK);
}
-common::ErrnoError ProtocoledDaemonClient::UnknownMethodError(fastotv::protocol::sequance_id_t id,
- const std::string& method) {
- fastotv::protocol::response_t resp;
- common::Error err_ser = UnknownMethodResponse(id, method, &resp);
+common::ErrnoError ProtocoledDaemonClient::StopStreamFail(common::http::http_status code, common::Error err) {
+ return SendErrorJson(code, err);
+}
+
+common::ErrnoError ProtocoledDaemonClient::StopStreamSuccess() {
+ return SendDataJson(common::http::HS_OK, kDataOK);
+}
+
+common::ErrnoError ProtocoledDaemonClient::UnknownMethodError(common::http::http_method method,
+ const std::string& route) {
+ common::Error err =
+ common::make_error(common::MemSPrintf("not handled %s for route: %s", common::ConvertToString(method), route));
+ return SendErrorJson(common::http::HS_NOT_FOUND, err);
+}
+
+common::ErrnoError ProtocoledDaemonClient::SendHeadersInternal(common::http::http_status code,
+ const char* mime_type,
+ size_t* length,
+ time_t* mod) {
+ return SendHeaders(GetProtocol(), code, {}, mime_type, length, mod, false, GetServerInfo());
+}
+
+common::ErrnoError ProtocoledDaemonClient::SendErrorJson(common::http::http_status code, common::Error err) {
+ auto errj = common::json::ErrorJson(common::json::MakeErrorJsonMessage(code, err));
+ std::string result = kErrorInvalid;
+ ignore_result(errj.SerializeToString(&result));
+ return SendJson(GetProtocol(), code, {}, result.c_str(), result.size(), false, GetServerInfo());
+}
+
+common::ErrnoError ProtocoledDaemonClient::SendDataJson(common::http::http_status code,
+ const common::json::DataJson& data) {
+ std::string result;
+ common::Error err_ser = data.SerializeToString(&result);
if (err_ser) {
- return common::make_errno_error(err_ser->GetDescription(), EAGAIN);
+ return SendErrorJson(common::http::HS_INTERNAL_ERROR, err_ser);
}
- return WriteResponse(resp);
+ return SendJson(GetProtocol(), code, {}, result.c_str(), result.size(), false, GetServerInfo());
}
} // namespace server
diff --git a/src/server/daemon/client.h b/src/server/daemon/client.h
index 28c413f..fd40ac9 100644
--- a/src/server/daemon/client.h
+++ b/src/server/daemon/client.h
@@ -14,61 +14,86 @@
#pragma once
-#include
-
-#include
-#include
-#include
+#include
+#include
+#include
#include
-
#include
#include
+#include
+
+#include "server/daemon/commands_info/service/server_info.h"
+
namespace fastocloud {
namespace server {
-class ProtocoledDaemonClient : public fastotv::protocol::ProtocolClient {
+class ProtocoledDaemonClient : public common::libev::websocket::WebSocketServerClient {
public:
- typedef fastotv::protocol::ProtocolClient base_class;
+ typedef common::libev::websocket::WebSocketServerClient base_class;
ProtocoledDaemonClient(common::libev::IoLoop* server, const common::net::socket_info& info);
- common::ErrnoError StopMe() WARN_UNUSED_RESULT;
- common::ErrnoError RestartMe() WARN_UNUSED_RESULT;
+ bool IsVerified() const;
+ void SetVerified(bool verified, common::time64_t exp_time);
- common::ErrnoError StopFail(fastotv::protocol::sequance_id_t id, common::Error err) WARN_UNUSED_RESULT;
- common::ErrnoError StopSuccess(fastotv::protocol::sequance_id_t id) WARN_UNUSED_RESULT;
+ common::time64_t GetExpTime() const;
+ bool IsExpired() const;
+ bool HaveFullAccess() const;
- common::ErrnoError Ping() WARN_UNUSED_RESULT;
- common::ErrnoError Ping(const common::daemon::commands::ClientPingInfo& server_ping_info) WARN_UNUSED_RESULT;
+ common::http::http_protocol GetProtocol() const;
+ common::libev::http::HttpServerInfo GetServerInfo() const;
- common::ErrnoError PongFail(fastotv::protocol::sequance_id_t id, common::Error err) WARN_UNUSED_RESULT;
- common::ErrnoError Pong(fastotv::protocol::sequance_id_t id) WARN_UNUSED_RESULT;
- common::ErrnoError Pong(fastotv::protocol::sequance_id_t id,
- const common::daemon::commands::ServerPingInfo& pong) WARN_UNUSED_RESULT;
+ common::ErrnoError StopMe(const common::uri::GURL& url) WARN_UNUSED_RESULT;
+ common::ErrnoError RestartMe(const common::uri::GURL& url) WARN_UNUSED_RESULT;
- common::ErrnoError ActivateFail(fastotv::protocol::sequance_id_t id, common::Error err) WARN_UNUSED_RESULT;
- common::ErrnoError ActivateSuccess(fastotv::protocol::sequance_id_t id, const std::string& result) WARN_UNUSED_RESULT;
+ common::ErrnoError StopFail(common::http::http_status code, common::Error err) WARN_UNUSED_RESULT;
+ common::ErrnoError StopSuccess() WARN_UNUSED_RESULT;
- common::ErrnoError GetLogServiceFail(fastotv::protocol::sequance_id_t id, common::Error err) WARN_UNUSED_RESULT;
- common::ErrnoError GetLogServiceSuccess(fastotv::protocol::sequance_id_t id) WARN_UNUSED_RESULT;
+ common::ErrnoError RestartFail(common::http::http_status code, common::Error err) WARN_UNUSED_RESULT;
+ common::ErrnoError RestartSuccess() WARN_UNUSED_RESULT;
- common::ErrnoError GetLogStreamFail(fastotv::protocol::sequance_id_t id, common::Error err) WARN_UNUSED_RESULT;
- common::ErrnoError GetLogStreamSuccess(fastotv::protocol::sequance_id_t id) WARN_UNUSED_RESULT;
+ common::ErrnoError GetHardwareHashFail(common::http::http_status code, common::Error err) WARN_UNUSED_RESULT;
+ common::ErrnoError GetHardwareHashSuccess(const common::daemon::commands::HardwareHashInfo& params)
+ WARN_UNUSED_RESULT;
- common::ErrnoError GetPipeStreamFail(fastotv::protocol::sequance_id_t id, common::Error err) WARN_UNUSED_RESULT;
- common::ErrnoError GetPipeStreamSuccess(fastotv::protocol::sequance_id_t id) WARN_UNUSED_RESULT;
+ common::ErrnoError GetStatsFail(common::http::http_status code, common::Error err) WARN_UNUSED_RESULT;
+ common::ErrnoError GetStatsSuccess(const service::FullServiceInfo& stats) WARN_UNUSED_RESULT;
- common::ErrnoError StartStreamFail(fastotv::protocol::sequance_id_t id, common::Error err) WARN_UNUSED_RESULT;
- common::ErrnoError StartStreamSuccess(fastotv::protocol::sequance_id_t id) WARN_UNUSED_RESULT;
+ common::ErrnoError GetLogServiceFail(common::http::http_status code, common::Error err) WARN_UNUSED_RESULT;
+ common::ErrnoError GetLogServiceSuccess(const std::string& path) WARN_UNUSED_RESULT;
- common::ErrnoError ReStartStreamFail(fastotv::protocol::sequance_id_t id, common::Error err) WARN_UNUSED_RESULT;
- common::ErrnoError ReStartStreamSuccess(fastotv::protocol::sequance_id_t id) WARN_UNUSED_RESULT;
+ common::ErrnoError GetLogStreamFail(common::http::http_status code, common::Error err) WARN_UNUSED_RESULT;
+ common::ErrnoError GetLogStreamSuccess(const std::string& path) WARN_UNUSED_RESULT;
- common::ErrnoError StopStreamFail(fastotv::protocol::sequance_id_t id, common::Error err) WARN_UNUSED_RESULT;
- common::ErrnoError StopStreamSuccess(fastotv::protocol::sequance_id_t id) WARN_UNUSED_RESULT;
+ common::ErrnoError GetPipeStreamFail(common::http::http_status code, common::Error err) WARN_UNUSED_RESULT;
+ common::ErrnoError GetPipeStreamSuccess(const std::string& path) WARN_UNUSED_RESULT;
- common::ErrnoError UnknownMethodError(fastotv::protocol::sequance_id_t id,
- const std::string& method) WARN_UNUSED_RESULT;
+ common::ErrnoError StartStreamFail(common::http::http_status code, common::Error err) WARN_UNUSED_RESULT;
+ common::ErrnoError StartStreamSuccess() WARN_UNUSED_RESULT;
+
+ common::ErrnoError ReStartStreamFail(common::http::http_status code, common::Error err) WARN_UNUSED_RESULT;
+ common::ErrnoError ReStartStreamSuccess() WARN_UNUSED_RESULT;
+
+ common::ErrnoError StopStreamFail(common::http::http_status code, common::Error err) WARN_UNUSED_RESULT;
+ common::ErrnoError StopStreamSuccess() WARN_UNUSED_RESULT;
+
+ common::ErrnoError GetConfigStreamFail(common::http::http_status code, common::Error err) WARN_UNUSED_RESULT;
+ common::ErrnoError GetConfigStreamSuccess(const std::string& path) WARN_UNUSED_RESULT;
+
+ common::ErrnoError UnknownMethodError(common::http::http_method method, const std::string& route) WARN_UNUSED_RESULT;
+
+ common::ErrnoError Broadcast(const common::json::WsDataJson& request) WARN_UNUSED_RESULT;
+
+ private:
+ common::ErrnoError SendDataJson(common::http::http_status code, const common::json::DataJson& data);
+ common::ErrnoError SendErrorJson(common::http::http_status code, common::Error err);
+ common::ErrnoError SendHeadersInternal(common::http::http_status code,
+ const char* mime_type,
+ size_t* length,
+ time_t* mod);
+
+ bool is_verified_;
+ common::time64_t exp_time_;
};
} // namespace server
diff --git a/src/server/daemon/commands.cpp b/src/server/daemon/commands.cpp
index 32f489d..5bb2627 100644
--- a/src/server/daemon/commands.cpp
+++ b/src/server/daemon/commands.cpp
@@ -14,36 +14,39 @@
#include "server/daemon/commands.h"
+// broadcast
+#define SERVICE_STATISTIC_SERVICE "statistic_service"
+
namespace fastocloud {
namespace server {
-common::Error ChangedSourcesStreamBroadcast(const ChangedSouresInfo& params, fastotv::protocol::request_t* req) {
+common::Error ChangedSourcesStreamBroadcast(const ChangedSouresInfo& params, common::json::WsDataJson* req) {
if (!req) {
return common::make_error_inval();
}
- std::string changed_json;
- common::Error err_ser = params.SerializeToString(&changed_json);
+ json_object* result_json = nullptr;
+ common::Error err_ser = params.Serialize(&result_json);
if (err_ser) {
return err_ser;
}
- *req = fastotv::protocol::request_t::MakeNotification(STREAM_CHANGED_SOURCES_STREAM, changed_json);
+ *req = common::json::WsDataJson(BROADCAST_CHANGED_SOURCES_STREAM, result_json);
return common::Error();
}
-common::Error StatisitcStreamBroadcast(const StatisticInfo& params, fastotv::protocol::request_t* req) {
+common::Error StatisitcStreamBroadcast(const StatisticInfo& params, common::json::WsDataJson* req) {
if (!req) {
return common::make_error_inval();
}
- std::string stat_json;
- common::Error err_ser = params.SerializeToString(&stat_json);
+ json_object* result_json = nullptr;
+ common::Error err_ser = params.Serialize(&result_json);
if (err_ser) {
return err_ser;
}
- *req = fastotv::protocol::request_t::MakeNotification(STREAM_STATISTIC_STREAM, stat_json);
+ *req = common::json::WsDataJson(BROADCAST_STATISTIC_STREAM, result_json);
return common::Error();
}
@@ -65,28 +68,33 @@ common::Error MlNotificationStreamBroadcast(const fastotv::commands_info::ml::No
}
#endif
-common::Error StatisitcServiceBroadcast(fastotv::protocol::serializet_params_t params,
- fastotv::protocol::request_t* req) {
+common::Error StatisitcServiceBroadcast(const service::ServerInfo& params, common::json::WsDataJson* req) {
if (!req) {
return common::make_error_inval();
}
- *req = fastotv::protocol::request_t::MakeNotification(STREAM_STATISTIC_SERVICE, params);
- return common::Error();
-}
-
-common::Error QuitStatusStreamBroadcast(const stream::QuitStatusInfo& params, fastotv::protocol::request_t* req) {
- if (!req) {
- return common::make_error_inval();
- }
-
- std::string quit_json;
- common::Error err_ser = params.SerializeToString(&quit_json);
+ json_object* stat_json = nullptr;
+ common::Error err_ser = params.Serialize(&stat_json);
if (err_ser) {
return err_ser;
}
- *req = fastotv::protocol::request_t::MakeNotification(STREAM_QUIT_STATUS_STREAM, quit_json);
+ *req = common::json::WsDataJson(SERVICE_STATISTIC_SERVICE, stat_json);
+ return common::Error();
+}
+
+common::Error QuitStatusStreamBroadcast(const stream::QuitStatusInfo& params, common::json::WsDataJson* req) {
+ if (!req) {
+ return common::make_error_inval();
+ }
+
+ json_object* quit_json = nullptr;
+ common::Error err_ser = params.Serialize(&quit_json);
+ if (err_ser) {
+ return err_ser;
+ }
+
+ *req = common::json::WsDataJson(STREAM_QUIT_STATUS_STREAM, quit_json);
return common::Error();
}
diff --git a/src/server/daemon/commands.h b/src/server/daemon/commands.h
index e53d9c4..2873127 100644
--- a/src/server/daemon/commands.h
+++ b/src/server/daemon/commands.h
@@ -14,15 +14,19 @@
#pragma once
+#include
+
#include
-#include
+#include
#if defined(MACHINE_LEARNING)
#include
#endif
+#include "server/daemon/commands_info/service/server_info.h"
#include "server/daemon/commands_info/stream/quit_status_info.h"
+
#include "stream_commands/commands_info/changed_sources_info.h"
#include "stream_commands/commands_info/statistic_info.h"
@@ -34,20 +38,17 @@
#define DAEMON_RESTART_STREAM "restart_stream"
#define DAEMON_GET_LOG_STREAM "get_log_stream"
#define DAEMON_GET_PIPELINE_STREAM "get_pipeline_stream"
+#define DAEMON_GET_CONFIG_JSON_STREAM "get_config_json_stream"
-#define DAEMON_ACTIVATE "activate_request" // {"key": "XXXXXXXXXXXXXXXXXX"}
#define DAEMON_STOP_SERVICE "stop_service" // {"delay": 0 }
#define DAEMON_RESTART_SERVICE "restart_service" // {"delay": 0 }
+#define DAEMON_STATS_SERVICE "stats"
-#define DAEMON_GET_CONFIG_JSON_STREAM "get_config_json_stream"
-#define DAEMON_PING_SERVICE "ping_service"
#define DAEMON_GET_LOG_SERVICE "get_log_service" // {"path":"http://localhost/service/id"}
-#define DAEMON_SERVER_PING "ping_client"
-
// Broadcast
-#define STREAM_CHANGED_SOURCES_STREAM "changed_source_stream"
-#define STREAM_STATISTIC_STREAM "statistic_stream"
+#define BROADCAST_CHANGED_SOURCES_STREAM "changed_source_stream"
+#define BROADCAST_STATISTIC_STREAM "statistic_stream"
#define STREAM_QUIT_STATUS_STREAM "quit_status_stream"
#if defined(MACHINE_LEARNING)
#define STREAM_ML_NOTIFICATION_STREAM "ml_notification_stream"
@@ -58,15 +59,17 @@ namespace fastocloud {
namespace server {
// Broadcast
-common::Error ChangedSourcesStreamBroadcast(const ChangedSouresInfo& params, fastotv::protocol::request_t* req);
-common::Error StatisitcStreamBroadcast(const StatisticInfo& params, fastotv::protocol::request_t* req);
+common::Error ChangedSourcesStreamBroadcast(const ChangedSouresInfo& params,
+ common::json::WsDataJson* req) WARN_UNUSED_RESULT;
+common::Error StatisitcStreamBroadcast(const StatisticInfo& params, common::json::WsDataJson* req) WARN_UNUSED_RESULT;
#if defined(MACHINE_LEARNING)
common::Error MlNotificationStreamBroadcast(const fastotv::commands_info::ml::NotificationInfo& params,
fastotv::protocol::request_t* req);
#endif
-common::Error StatisitcServiceBroadcast(fastotv::protocol::serializet_params_t params,
- fastotv::protocol::request_t* req);
-common::Error QuitStatusStreamBroadcast(const stream::QuitStatusInfo& params, fastotv::protocol::request_t* req);
+common::Error StatisitcServiceBroadcast(const service::ServerInfo& params,
+ common::json::WsDataJson* req) WARN_UNUSED_RESULT;
+
+common::Error QuitStatusStreamBroadcast(const stream::QuitStatusInfo& params, common::json::WsDataJson* req) WARN_UNUSED_RESULT;
} // namespace server
} // namespace fastocloud
diff --git a/src/server/daemon/commands_factory.cpp b/src/server/daemon/commands_factory.cpp
deleted file mode 100644
index 02b1ea5..0000000
--- a/src/server/daemon/commands_factory.cpp
+++ /dev/null
@@ -1,311 +0,0 @@
-/* Copyright (C) 2014-2022 FastoGT. All right reserved.
- This file is part of fastocloud.
- fastocloud is free software: you can redistribute it and/or modify
- it under the terms of the GNU General Public License as published by
- the Free Software Foundation, either version 3 of the License, or
- (at your option) any later version.
- fastocloud is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU General Public License for more details.
- You should have received a copy of the GNU General Public License
- along with fastocloud. If not, see .
-*/
-
-#include "server/daemon/commands_factory.h"
-
-#include
-
-#include "server/daemon/commands.h"
-
-namespace fastocloud {
-namespace server {
-
-common::Error StopServiceRequest(fastotv::protocol::sequance_id_t id,
- const common::daemon::commands::StopInfo& params,
- fastotv::protocol::request_t* req) {
- if (!req) {
- return common::make_error_inval();
- }
-
- std::string req_str;
- common::Error err_ser = params.SerializeToString(&req_str);
- if (err_ser) {
- return err_ser;
- }
-
- fastotv::protocol::request_t lreq;
- lreq.id = id;
- lreq.method = DAEMON_STOP_SERVICE;
- lreq.params = req_str;
- *req = lreq;
- return common::Error();
-}
-
-common::Error RestartServiceRequest(fastotv::protocol::sequance_id_t id,
- const common::daemon::commands::RestartInfo& params,
- fastotv::protocol::request_t* req) {
- if (!req) {
- return common::make_error_inval();
- }
-
- std::string req_str;
- common::Error err_ser = params.SerializeToString(&req_str);
- if (err_ser) {
- return err_ser;
- }
-
- fastotv::protocol::request_t lreq;
- lreq.id = id;
- lreq.method = DAEMON_RESTART_SERVICE;
- lreq.params = req_str;
- *req = lreq;
- return common::Error();
-}
-
-common::Error PingRequest(fastotv::protocol::sequance_id_t id,
- const common::daemon::commands::ClientPingInfo& params,
- fastotv::protocol::request_t* req) {
- if (!req) {
- return common::make_error_inval();
- }
-
- std::string req_str;
- common::Error err_ser = params.SerializeToString(&req_str);
- if (err_ser) {
- return err_ser;
- }
-
- fastotv::protocol::request_t lreq;
- lreq.id = id;
- lreq.method = DAEMON_SERVER_PING;
- lreq.params = req_str;
- *req = lreq;
- return common::Error();
-}
-
-common::Error StopServiceResponseSuccess(fastotv::protocol::sequance_id_t id, fastotv::protocol::response_t* resp) {
- if (!resp) {
- return common::make_error_inval();
- }
-
- *resp =
- fastotv::protocol::response_t::MakeMessage(id, common::protocols::json_rpc::JsonRPCMessage::MakeSuccessMessage());
- return common::Error();
-}
-
-common::Error StopServiceResponseFail(fastotv::protocol::sequance_id_t id,
- const std::string& error_text,
- fastotv::protocol::response_t* resp) {
- if (!resp) {
- return common::make_error_inval();
- }
-
- *resp = fastotv::protocol::response_t::MakeError(
- id, common::protocols::json_rpc::JsonRPCError::MakeServerErrorFromText(error_text));
- return common::Error();
-}
-
-common::Error GetLogServiceResponseSuccess(fastotv::protocol::sequance_id_t id, fastotv::protocol::response_t* resp) {
- if (!resp) {
- return common::make_error_inval();
- }
-
- *resp =
- fastotv::protocol::response_t::MakeMessage(id, common::protocols::json_rpc::JsonRPCMessage::MakeSuccessMessage());
- return common::Error();
-}
-
-common::Error GetLogServiceResponseFail(fastotv::protocol::sequance_id_t id,
- const std::string& error_text,
- fastotv::protocol::response_t* resp) {
- if (!resp) {
- return common::make_error_inval();
- }
-
- *resp = fastotv::protocol::response_t::MakeError(
- id, common::protocols::json_rpc::JsonRPCError::MakeServerErrorFromText(error_text));
- return common::Error();
-}
-
-common::Error ActivateResponseSuccess(fastotv::protocol::sequance_id_t id,
- const std::string& result,
- fastotv::protocol::response_t* resp) {
- if (!resp) {
- return common::make_error_inval();
- }
-
- *resp = fastotv::protocol::response_t::MakeMessage(
- id, common::protocols::json_rpc::JsonRPCMessage::MakeSuccessMessage(result));
- return common::Error();
-}
-
-common::Error ActivateResponseFail(fastotv::protocol::sequance_id_t id,
- const std::string& error_text,
- fastotv::protocol::response_t* resp) {
- if (!resp) {
- return common::make_error_inval();
- }
-
- *resp = fastotv::protocol::response_t::MakeError(
- id, common::protocols::json_rpc::JsonRPCError::MakeServerErrorFromText(error_text));
- return common::Error();
-}
-
-common::Error StartStreamResponseSuccess(fastotv::protocol::sequance_id_t id, fastotv::protocol::response_t* resp) {
- if (!resp) {
- return common::make_error_inval();
- }
-
- *resp =
- fastotv::protocol::response_t::MakeMessage(id, common::protocols::json_rpc::JsonRPCMessage::MakeSuccessMessage());
- return common::Error();
-}
-
-common::Error StartStreamResponseFail(fastotv::protocol::sequance_id_t id,
- const std::string& error_text,
- fastotv::protocol::response_t* resp) {
- if (!resp) {
- return common::make_error_inval();
- }
-
- *resp = fastotv::protocol::response_t::MakeError(
- id, common::protocols::json_rpc::JsonRPCError::MakeServerErrorFromText(error_text));
- return common::Error();
-}
-
-common::Error StopStreamResponseSuccess(fastotv::protocol::sequance_id_t id, fastotv::protocol::response_t* resp) {
- if (!resp) {
- return common::make_error_inval();
- }
-
- *resp =
- fastotv::protocol::response_t::MakeMessage(id, common::protocols::json_rpc::JsonRPCMessage::MakeSuccessMessage());
- return common::Error();
-}
-
-common::Error StopStreamResponseFail(fastotv::protocol::sequance_id_t id,
- const std::string& error_text,
- fastotv::protocol::response_t* resp) {
- if (!resp) {
- return common::make_error_inval();
- }
-
- *resp = fastotv::protocol::response_t::MakeError(
- id, common::protocols::json_rpc::JsonRPCError::MakeServerErrorFromText(error_text));
- return common::Error();
-}
-
-common::Error RestartStreamResponseSuccess(fastotv::protocol::sequance_id_t id, fastotv::protocol::response_t* resp) {
- if (!resp) {
- return common::make_error_inval();
- }
-
- *resp =
- fastotv::protocol::response_t::MakeMessage(id, common::protocols::json_rpc::JsonRPCMessage::MakeSuccessMessage());
- return common::Error();
-}
-
-common::Error RestartStreamResponseFail(fastotv::protocol::sequance_id_t id,
- const std::string& error_text,
- fastotv::protocol::response_t* resp) {
- if (!resp) {
- return common::make_error_inval();
- }
-
- *resp = fastotv::protocol::response_t::MakeError(
- id, common::protocols::json_rpc::JsonRPCError::MakeServerErrorFromText(error_text));
- return common::Error();
-}
-
-common::Error GetLogStreamResponseSuccess(fastotv::protocol::sequance_id_t id, fastotv::protocol::response_t* resp) {
- if (!resp) {
- return common::make_error_inval();
- }
-
- *resp =
- fastotv::protocol::response_t::MakeMessage(id, common::protocols::json_rpc::JsonRPCMessage::MakeSuccessMessage());
- return common::Error();
-}
-
-common::Error GetLogStreamResponseFail(fastotv::protocol::sequance_id_t id,
- const std::string& error_text,
- fastotv::protocol::response_t* resp) {
- if (!resp) {
- return common::make_error_inval();
- }
-
- *resp = fastotv::protocol::response_t::MakeError(
- id, common::protocols::json_rpc::JsonRPCError::MakeServerErrorFromText(error_text));
-
- return common::Error();
-}
-
-common::Error GetPipeStreamResponseSuccess(fastotv::protocol::sequance_id_t id, fastotv::protocol::response_t* resp) {
- if (!resp) {
- return common::make_error_inval();
- }
-
- *resp =
- fastotv::protocol::response_t::MakeMessage(id, common::protocols::json_rpc::JsonRPCMessage::MakeSuccessMessage());
- return common::Error();
-}
-
-common::Error GetPipeStreamResponseFail(fastotv::protocol::sequance_id_t id,
- const std::string& error_text,
- fastotv::protocol::response_t* resp) {
- if (!resp) {
- return common::make_error_inval();
- }
-
- *resp = fastotv::protocol::response_t::MakeError(
- id, common::protocols::json_rpc::JsonRPCError::MakeServerErrorFromText(error_text));
-
- return common::Error();
-}
-
-common::Error PingServiceResponseSuccess(fastotv::protocol::sequance_id_t id,
- const common::daemon::commands::ServerPingInfo& ping,
- fastotv::protocol::response_t* resp) {
- if (!resp) {
- return common::make_error_inval();
- }
-
- std::string ping_server_json;
- common::Error err_ser = ping.SerializeToString(&ping_server_json);
- if (err_ser) {
- return err_ser;
- }
-
- *resp = fastotv::protocol::response_t::MakeMessage(
- id, common::protocols::json_rpc::JsonRPCMessage::MakeSuccessMessage(ping_server_json));
- return common::Error();
-}
-
-common::Error PingServiceResponseFail(fastotv::protocol::sequance_id_t id,
- const std::string& error_text,
- fastotv::protocol::response_t* resp) {
- if (!resp) {
- return common::make_error_inval();
- }
-
- *resp = fastotv::protocol::response_t::MakeError(
- id, common::protocols::json_rpc::JsonRPCError::MakeServerErrorFromText(error_text));
- return common::Error();
-}
-
-common::Error UnknownMethodResponse(fastotv::protocol::sequance_id_t id,
- const std::string& method,
- fastotv::protocol::response_t* resp) {
- if (!resp) {
- return common::make_error_inval();
- }
-
- *resp =
- fastotv::protocol::response_t::MakeError(id, common::protocols::json_rpc::JsonRPCError::MakeServerErrorFromText(
- common::MemSPrintf("Unknown method: %s", method)));
- return common::Error();
-}
-
-} // namespace server
-} // namespace fastocloud
diff --git a/src/server/daemon/commands_factory.h b/src/server/daemon/commands_factory.h
deleted file mode 100644
index dc3571b..0000000
--- a/src/server/daemon/commands_factory.h
+++ /dev/null
@@ -1,96 +0,0 @@
-/* Copyright (C) 2014-2022 FastoGT. All right reserved.
- This file is part of fastocloud.
- fastocloud is free software: you can redistribute it and/or modify
- it under the terms of the GNU General Public License as published by
- the Free Software Foundation, either version 3 of the License, or
- (at your option) any later version.
- fastocloud is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU General Public License for more details.
- You should have received a copy of the GNU General Public License
- along with fastocloud. If not, see .
-*/
-
-#pragma once
-
-#include
-
-#include
-#include
-#include
-#include
-
-#include
-
-namespace fastocloud {
-namespace server {
-
-// requests
-common::Error StopServiceRequest(fastotv::protocol::sequance_id_t id,
- const common::daemon::commands::StopInfo& params,
- fastotv::protocol::request_t* req);
-common::Error RestartServiceRequest(fastotv::protocol::sequance_id_t id,
- const common::daemon::commands::RestartInfo& params,
- fastotv::protocol::request_t* req);
-common::Error PingRequest(fastotv::protocol::sequance_id_t id,
- const common::daemon::commands::ClientPingInfo& params,
- fastotv::protocol::request_t* req);
-
-// responses service
-common::Error StopServiceResponseSuccess(fastotv::protocol::sequance_id_t id, fastotv::protocol::response_t* resp);
-common::Error StopServiceResponseFail(fastotv::protocol::sequance_id_t id,
- const std::string& error_text,
- fastotv::protocol::response_t* resp);
-
-common::Error GetLogServiceResponseSuccess(fastotv::protocol::sequance_id_t id, fastotv::protocol::response_t* resp);
-common::Error GetLogServiceResponseFail(fastotv::protocol::sequance_id_t id,
- const std::string& error_text,
- fastotv::protocol::response_t* resp);
-
-common::Error ActivateResponseSuccess(fastotv::protocol::sequance_id_t id,
- const std::string& result,
- fastotv::protocol::response_t* resp);
-common::Error ActivateResponseFail(fastotv::protocol::sequance_id_t id,
- const std::string& error_text,
- fastotv::protocol::response_t* resp);
-
-common::Error PingServiceResponseSuccess(fastotv::protocol::sequance_id_t id,
- const common::daemon::commands::ServerPingInfo& ping,
- fastotv::protocol::response_t* resp);
-common::Error PingServiceResponseFail(fastotv::protocol::sequance_id_t id,
- const std::string& error_text,
- fastotv::protocol::response_t* resp);
-
-common::Error UnknownMethodResponse(fastotv::protocol::sequance_id_t id,
- const std::string& method,
- fastotv::protocol::response_t* resp) WARN_UNUSED_RESULT;
-
-// responces streams
-common::Error StartStreamResponseSuccess(fastotv::protocol::sequance_id_t id, fastotv::protocol::response_t* resp);
-common::Error StartStreamResponseFail(fastotv::protocol::sequance_id_t id,
- const std::string& error_text,
- fastotv::protocol::response_t* resp);
-
-common::Error StopStreamResponseSuccess(fastotv::protocol::sequance_id_t id, fastotv::protocol::response_t* resp);
-common::Error StopStreamResponseFail(fastotv::protocol::sequance_id_t id,
- const std::string& error_text,
- fastotv::protocol::response_t* resp);
-
-common::Error RestartStreamResponseSuccess(fastotv::protocol::sequance_id_t id, fastotv::protocol::response_t* resp);
-common::Error RestartStreamResponseFail(fastotv::protocol::sequance_id_t id,
- const std::string& error_text,
- fastotv::protocol::response_t* resp);
-
-common::Error GetLogStreamResponseSuccess(fastotv::protocol::sequance_id_t id, fastotv::protocol::response_t* resp);
-common::Error GetLogStreamResponseFail(fastotv::protocol::sequance_id_t id,
- const std::string& error_text,
- fastotv::protocol::response_t* resp);
-
-common::Error GetPipeStreamResponseSuccess(fastotv::protocol::sequance_id_t id, fastotv::protocol::response_t* resp);
-common::Error GetPipeStreamResponseFail(fastotv::protocol::sequance_id_t id,
- const std::string& error_text,
- fastotv::protocol::response_t* resp);
-
-} // namespace server
-} // namespace fastocloud
diff --git a/src/server/daemon/commands_info/stream/get_log_info.cpp b/src/server/daemon/commands_info/stream/get_log_info.cpp
index 17f68e6..a3dd64d 100644
--- a/src/server/daemon/commands_info/stream/get_log_info.cpp
+++ b/src/server/daemon/commands_info/stream/get_log_info.cpp
@@ -16,21 +16,16 @@
#include
-#define PATH_FIELD "path"
#define FEEDBACK_DIR_FIELD "feedback_directory"
namespace fastocloud {
namespace server {
namespace stream {
-GetLogInfo::GetLogInfo() : base_class(), feedback_dir_(), path_() {}
+GetLogInfo::GetLogInfo() : base_class(), feedback_dir_() {}
-GetLogInfo::GetLogInfo(const fastotv::stream_id_t& stream_id, const std::string& feedback_dir, const url_t& path)
- : base_class(stream_id), feedback_dir_(feedback_dir), path_(path) {}
-
-GetLogInfo::url_t GetLogInfo::GetLogPath() const {
- return path_;
-}
+GetLogInfo::GetLogInfo(const fastotv::stream_id_t& stream_id, const std::string& feedback_dir)
+ : base_class(stream_id), feedback_dir_(feedback_dir) {}
std::string GetLogInfo::GetFeedbackDir() const {
return feedback_dir_;
@@ -50,20 +45,11 @@ common::Error GetLogInfo::DoDeSerialize(json_object* serialized) {
}
inf.feedback_dir_ = feedback_dir;
- std::string path;
- err = GetStringField(serialized, PATH_FIELD, &path);
- if (err) {
- return err;
- }
- inf.path_ = url_t(path);
-
*this = inf;
return common::Error();
}
common::Error GetLogInfo::SerializeFields(json_object* out) const {
- const std::string path_str = path_.spec();
- ignore_result(SetStringField(out, PATH_FIELD, path_str));
ignore_result(SetStringField(out, FEEDBACK_DIR_FIELD, feedback_dir_));
return base_class::SerializeFields(out);
}
diff --git a/src/server/daemon/commands_info/stream/get_log_info.h b/src/server/daemon/commands_info/stream/get_log_info.h
index a0e7fd5..4e5d69e 100644
--- a/src/server/daemon/commands_info/stream/get_log_info.h
+++ b/src/server/daemon/commands_info/stream/get_log_info.h
@@ -14,10 +14,10 @@
#pragma once
-#include
-
#include
+#include
+
#include "server/daemon/commands_info/stream/stream_info.h"
namespace fastocloud {
@@ -27,12 +27,10 @@ namespace stream {
class GetLogInfo : public StreamInfo {
public:
typedef StreamInfo base_class;
- typedef common::uri::GURL url_t;
GetLogInfo();
- explicit GetLogInfo(const fastotv::stream_id_t& stream_id, const std::string& feedback_dir, const url_t& log_path);
+ explicit GetLogInfo(const fastotv::stream_id_t& stream_id, const std::string& feedback_dir);
- url_t GetLogPath() const;
std::string GetFeedbackDir() const;
protected:
@@ -41,7 +39,6 @@ class GetLogInfo : public StreamInfo {
private:
std::string feedback_dir_;
- url_t path_;
};
typedef GetLogInfo GetPipelineInfo;
diff --git a/src/server/daemon/commands_info/stream/restart_info.cpp b/src/server/daemon/commands_info/stream/restart_info.cpp
index 126969d..b36ba46 100644
--- a/src/server/daemon/commands_info/stream/restart_info.cpp
+++ b/src/server/daemon/commands_info/stream/restart_info.cpp
@@ -16,12 +16,6 @@
namespace fastocloud {
namespace server {
-namespace stream {
-
-RestartInfo::RestartInfo() : base_class() {}
-
-RestartInfo::RestartInfo(const fastotv::stream_id_t& stream_id) : base_class(stream_id) {}
-
-} // namespace stream
+namespace stream {} // namespace stream
} // namespace server
} // namespace fastocloud
diff --git a/src/server/daemon/commands_info/stream/restart_info.h b/src/server/daemon/commands_info/stream/restart_info.h
index bc74894..9476cd7 100644
--- a/src/server/daemon/commands_info/stream/restart_info.h
+++ b/src/server/daemon/commands_info/stream/restart_info.h
@@ -20,13 +20,7 @@ namespace fastocloud {
namespace server {
namespace stream {
-class RestartInfo : public StreamInfo {
- public:
- typedef StreamInfo base_class;
-
- RestartInfo();
- explicit RestartInfo(const fastotv::stream_id_t& stream_id);
-};
+typedef StreamInfo RestartInfo;
} // namespace stream
} // namespace server
diff --git a/src/server/daemon/commands_info/stream/stream_info.h b/src/server/daemon/commands_info/stream/stream_info.h
index 3b8d8f9..81e5c3d 100644
--- a/src/server/daemon/commands_info/stream/stream_info.h
+++ b/src/server/daemon/commands_info/stream/stream_info.h
@@ -14,12 +14,11 @@
#pragma once
-#include
-
#include
-
#include
+#include
+
namespace fastocloud {
namespace server {
namespace stream {
diff --git a/src/server/daemon/server.h b/src/server/daemon/server.h
index eb73ebd..7f96052 100644
--- a/src/server/daemon/server.h
+++ b/src/server/daemon/server.h
@@ -14,14 +14,14 @@
#pragma once
-#include
+#include
namespace fastocloud {
namespace server {
-class DaemonServer : public common::daemon::DaemonServer {
+class DaemonServer : public common::libev::tcp::TcpServer {
public:
- typedef common::daemon::DaemonServer base_class;
+ typedef common::libev::tcp::TcpServer base_class;
explicit DaemonServer(const common::net::HostAndPort& host, common::libev::IoLoopObserver* observer = nullptr);
private:
diff --git a/src/server/daemon_slave.cpp b/src/server/daemon_slave.cpp
index 15899f5..031e1e6 100644
--- a/src/server/daemon_slave.cpp
+++ b/src/server/daemon_slave.cpp
@@ -76,7 +76,7 @@ int main(int argc, char** argv, char** envp) {
return EXIT_FAILURE;
}
- err = fastocloud::server::ProcessSlaveWrapper::SendStopDaemonRequest(config);
+ err = fastocloud::server::ProcessSlaveWrapper::SendStopDaemonRequest(config.host);
sleep(fastocloud::server::ProcessSlaveWrapper::cleanup_seconds + 1);
if (err) {
std::cerr << "Stop command failed error: " << err->GetDescription() << std::endl;
@@ -92,7 +92,7 @@ int main(int argc, char** argv, char** envp) {
return EXIT_FAILURE;
}
- err = fastocloud::server::ProcessSlaveWrapper::SendRestartDaemonRequest(config);
+ err = fastocloud::server::ProcessSlaveWrapper::SendRestartDaemonRequest(config.host);
sleep(fastocloud::server::ProcessSlaveWrapper::cleanup_seconds + 1);
if (err) {
std::cerr << "Reload command failed error: " << err->GetDescription() << std::endl;
diff --git a/src/server/http/client.h b/src/server/http/client.h
index 4f6642b..8790ee6 100644
--- a/src/server/http/client.h
+++ b/src/server/http/client.h
@@ -21,18 +21,18 @@ typedef struct ssl_st SSL;
namespace fastocloud {
namespace server {
-class HttpClient : public common::libev::http::HttpClient {
+class HttpClient : public common::libev::http::HttpServerClient {
public:
- typedef common::libev::http::HttpClient base_class;
+ typedef common::libev::http::HttpServerClient base_class;
HttpClient(common::libev::IoLoop* server, const common::net::socket_info& info);
const char* ClassName() const override;
};
-class HttpsClient : public common::libev::http::HttpClient {
+class HttpsClient : public common::libev::http::HttpServerClient {
public:
- typedef common::libev::http::HttpClient base_class;
+ typedef common::libev::http::HttpServerClient base_class;
HttpsClient(common::libev::IoLoop* server, const common::net::socket_info& info, SSL* ssl);
diff --git a/src/server/http/handler.cpp b/src/server/http/handler.cpp
index 15c144a..2dc7ba2 100644
--- a/src/server/http/handler.cpp
+++ b/src/server/http/handler.cpp
@@ -112,8 +112,9 @@ void HttpHandler::ProcessReceived(HttpClient* hclient, const char* request, size
// keep alive
common::http::header_t connection_field;
- bool is_find_connection = hrequest.FindHeaderByKey("Connection", false, &connection_field);
- bool IsKeepAlive = is_find_connection ? common::EqualsASCII(connection_field.value, "Keep-Alive", false) : false;
+ // bool is_find_connection = hrequest.FindHeaderByKey("Connection", false, &connection_field);
+ bool IsKeepAlive =
+ false; // is_find_connection ? common::EqualsASCII(connection_field.value, "Keep-Alive", false) : false;
const common::http::http_protocol protocol = hrequest.GetProtocol();
const common::http::http_method method = hrequest.GetMethod();
if (method == common::http::http_method::HM_GET || method == common::http::http_method::HM_HEAD) {
@@ -181,7 +182,7 @@ void HttpHandler::ProcessReceived(HttpClient* hclient, const char* request, size
}
if (hrequest.GetMethod() == common::http::http_method::HM_GET) {
- common::ErrnoError err = hclient->SendFileByFd(protocol, file, sb.st_size);
+ common::ErrnoError err = hclient->SendFileByFd(protocol, file);
if (err) {
DEBUG_MSG_ERROR(err, common::logging::LOG_LEVEL_ERR);
} else {
diff --git a/src/server/pipe/client.h b/src/server/pipe/client.h
index a3d4abc..333d9c7 100644
--- a/src/server/pipe/client.h
+++ b/src/server/pipe/client.h
@@ -14,10 +14,15 @@
#pragma once
-#include
-
#include
+namespace common {
+namespace libev {
+class PipeReadClient;
+class PipeWriteClient;
+} // namespace libev
+} // namespace common
+
namespace fastocloud {
namespace server {
namespace pipe {
diff --git a/src/server/process_slave_wrapper.cpp b/src/server/process_slave_wrapper.cpp
index cad6958..9dc4d14 100644
--- a/src/server/process_slave_wrapper.cpp
+++ b/src/server/process_slave_wrapper.cpp
@@ -56,17 +56,13 @@
#undef SetPort
#endif
+#define API_KEY_PARAM "API-KEY"
+
+#define WS_UPDATES "updates"
+
namespace {
-common::Error PostHttpOrHttpsFile(const common::file_system::ascii_file_string_path& file_path,
- const common::uri::GURL& url) {
- struct timeval tv = {1, 0};
- const common::http::headers_t extra = {};
- if (url.SchemeIs("http")) {
- return common::net::PostHttpFile(file_path, url, extra, &tv);
- }
- return common::net::PostHttpsFile(file_path, url, extra, &tv);
-}
+const auto kClosedDaemon = common::make_errno_error("Connection closed", EAGAIN);
common::Optional MakeStreamConfigJsonPath(
const std::string& feedback_dir) {
@@ -90,22 +86,42 @@ namespace fastocloud {
namespace server {
namespace {
template
-common::ErrnoError ParseClientRequest(ProtocoledDaemonClient* dclient,
- const fastotv::protocol::request_t* req,
- common::serializer::JsonSerializer* result,
- bool access = true) {
- if (!req->params || !dclient || !req || !result) {
+common::ErrnoError ParsePostClientRequest(ProtocoledDaemonClient* dclient,
+ const common::http::HttpRequest& req,
+ common::serializer::JsonSerializer* result,
+ bool access = true) {
+ if (!dclient || !result) {
return common::make_errno_error_inval();
}
if (access) {
+ common::http::header_t found_key_in_headers;
+ if (!req.FindHeaderByKey(API_KEY_PARAM, false, &found_key_in_headers)) {
+ return common::make_errno_error("Don't have permissions", EACCES);
+ }
+
+ const auto expire_key = common::license::make_license(found_key_in_headers.value);
+ if (!expire_key) {
+ return common::make_errno_error("Invalid access key", EACCES);
+ }
+
+ common::time64_t tm;
+ bool is_valid = common::license::GetExpireTimeFromKey(PROJECT_NAME_LOWERCASE, *expire_key, &tm);
+ if (!is_valid) {
+ common::Error err = common::make_error("Invalid expire key");
+ return common::make_errno_error(err->GetDescription(), EINVAL);
+ }
+
+ dclient->SetVerified(true, tm);
+
if (!dclient->HaveFullAccess()) {
return common::make_errno_error("Don't have permissions", EACCES);
}
}
- const char* params_ptr = req->params->c_str();
- json_object* jinfo = json_tokener_parse(params_ptr);
+ const auto params_ptr = req.GetBody();
+ const char* data = params_ptr.data();
+ json_object* jinfo = json_tokener_parse(data);
if (!jinfo) {
return common::make_errno_error("Can't parse request", EINVAL);
}
@@ -119,6 +135,65 @@ common::ErrnoError ParseClientRequest(ProtocoledDaemonClient* dclient,
return common::ErrnoError();
}
+common::ErrnoError ParseGetClientRequest(ProtocoledDaemonClient* dclient,
+ const common::http::HttpRequest& req,
+ bool access = true) {
+ if (!dclient) {
+ return common::make_errno_error_inval();
+ }
+
+ if (access) {
+ common::http::header_t found_key_in_headers;
+ if (!req.FindHeaderByKey(API_KEY_PARAM, false, &found_key_in_headers)) {
+ return common::make_errno_error("Don't have permissions", EACCES);
+ }
+
+ const auto expire_key = common::license::make_license(found_key_in_headers.value);
+ if (!expire_key) {
+ return common::make_errno_error("Invalid access key", EACCES);
+ }
+
+ common::time64_t tm;
+ bool is_valid = common::license::GetExpireTimeFromKey(PROJECT_NAME_LOWERCASE, *expire_key, &tm);
+ if (!is_valid) {
+ common::Error err = common::make_error("Invalid expire key");
+ return common::make_errno_error(err->GetDescription(), EINVAL);
+ }
+
+ dclient->SetVerified(true, tm);
+
+ if (!dclient->HaveFullAccess()) {
+ return common::make_errno_error("Don't have permissions", EACCES);
+ }
+ }
+
+ return common::ErrnoError();
+}
+
+template
+common::ErrnoError ParseStreamRequest(ProcessSlaveWrapper::stream_client_t* pclient,
+ const fastotv::protocol::request_t* req,
+ common::serializer::JsonSerializer* result) {
+ if (!req->params || !pclient || !req || !result) {
+ return common::make_errno_error_inval();
+ }
+
+ const char* params_ptr = req->params->c_str();
+ json_object* jinfo = json_tokener_parse(params_ptr);
+ if (!jinfo) {
+ return common::make_errno_error_inval();
+ }
+
+ common::Error err_des = result->DeSerialize(jinfo);
+ json_object_put(jinfo);
+ if (err_des) {
+ const std::string err_str = err_des->GetDescription();
+ return common::make_errno_error(err_str, EAGAIN);
+ }
+
+ return common::ErrnoError();
+}
+
typedef HttpHandler VodsHandler;
typedef HttpServer VodsServer;
typedef VodsHandler CodsHandler;
@@ -251,19 +326,16 @@ ProcessSlaveWrapper::ProcessSlaveWrapper(const Config& config)
cods_server_->SetName("cods_server");
}
-common::ErrnoError ProcessSlaveWrapper::SendRestartDaemonRequest(const Config& config) {
- if (!config.IsValid()) {
- return common::make_errno_error_inval();
- }
-
+common::ErrnoError ProcessSlaveWrapper::SendStopDaemonRequest(const common::net::HostAndPort& host) {
common::net::socket_info client_info;
- common::ErrnoError err = common::net::connect(config.host, common::net::ST_SOCK_STREAM, nullptr, &client_info);
+ common::ErrnoError err = common::net::connect(host, common::net::ST_SOCK_STREAM, nullptr, &client_info);
if (err) {
return err;
}
std::unique_ptr connection(new ProtocoledDaemonClient(nullptr, client_info));
- err = connection->RestartMe();
+ auto url = common::uri::GURL("http://" + common::ConvertToString(host) + "/" + DAEMON_STOP_SERVICE);
+ err = connection->StopMe(url);
if (err) {
ignore_result(connection->Close());
return err;
@@ -273,19 +345,16 @@ common::ErrnoError ProcessSlaveWrapper::SendRestartDaemonRequest(const Config& c
return common::ErrnoError();
}
-common::ErrnoError ProcessSlaveWrapper::SendStopDaemonRequest(const Config& config) {
- if (!config.IsValid()) {
- return common::make_errno_error_inval();
- }
-
+common::ErrnoError ProcessSlaveWrapper::SendRestartDaemonRequest(const common::net::HostAndPort& host) {
common::net::socket_info client_info;
- common::ErrnoError err = common::net::connect(config.host, common::net::ST_SOCK_STREAM, nullptr, &client_info);
+ common::ErrnoError err = common::net::connect(host, common::net::ST_SOCK_STREAM, nullptr, &client_info);
if (err) {
return err;
}
std::unique_ptr connection(new ProtocoledDaemonClient(nullptr, client_info));
- err = connection->StopMe();
+ auto url = common::uri::GURL("http://" + common::ConvertToString(host) + "/" + DAEMON_RESTART_SERVICE);
+ err = connection->RestartMe(url);
if (err) {
ignore_result(connection->Close());
return err;
@@ -357,10 +426,6 @@ common::ErrnoError ProcessSlaveWrapper::Prepare() {
return common::ErrnoError();
}
-bool ProcessSlaveWrapper::IsStoped() const {
- return stoped_;
-}
-
int ProcessSlaveWrapper::Exec(int argc, char** argv) {
process_argc_ = argc;
process_argv_ = argv;
@@ -373,7 +438,7 @@ int ProcessSlaveWrapper::Exec(int argc, char** argv) {
// gpu statistic monitor
HttpServer* http_server = static_cast(http_server_);
- std::thread http_thread = std::thread([http_server] {
+ auto http_thread = std::thread([http_server] {
common::ErrnoError err = http_server->Bind(true);
if (err) {
ERROR_LOG() << "HttpServer bind error: " << err->GetDescription();
@@ -386,13 +451,13 @@ int ProcessSlaveWrapper::Exec(int argc, char** argv) {
return;
}
- INFO_LOG() << "HttpServer have started";
+ INFO_LOG() << "HttpServer have started address: " << common::ConvertToString(http_server->GetHost());
int res = http_server->Exec();
INFO_LOG() << "HttpServer have finished: " << res;
});
HttpServer* vods_server = static_cast(vods_server_);
- std::thread vods_thread = std::thread([vods_server] {
+ auto vods_thread = std::thread([vods_server] {
common::ErrnoError err = vods_server->Bind(true);
if (err) {
ERROR_LOG() << "VodsServer bind error: " << err->GetDescription();
@@ -405,13 +470,13 @@ int ProcessSlaveWrapper::Exec(int argc, char** argv) {
return;
}
- INFO_LOG() << "VodsServer have started";
+ INFO_LOG() << "VodsServer have started address: " << common::ConvertToString(vods_server->GetHost());
int res = vods_server->Exec();
INFO_LOG() << "VodsServer have finished: " << res;
});
HttpServer* cods_server = static_cast(cods_server_);
- std::thread cods_thread = std::thread([cods_server] {
+ auto cods_thread = std::thread([cods_server] {
common::ErrnoError err = cods_server->Bind(true);
if (err) {
ERROR_LOG() << "CodsServer bind error: " << err->GetDescription();
@@ -424,7 +489,7 @@ int ProcessSlaveWrapper::Exec(int argc, char** argv) {
return;
}
- INFO_LOG() << "CodsServer have started";
+ INFO_LOG() << "CodsServer have started address: " << common::ConvertToString(cods_server->GetHost());
int res = cods_server->Exec();
INFO_LOG() << "CodsServer have finished: " << res;
});
@@ -454,6 +519,7 @@ int ProcessSlaveWrapper::Exec(int argc, char** argv) {
node_stats_->prev_nshot = service::GetMachineNetShot();
node_stats_->timestamp = common::time::current_utc_mstime();
+ INFO_LOG() << "DaemonServer have started address: " << common::ConvertToString(server->GetHost());
res = server->Exec();
finished:
@@ -470,6 +536,10 @@ finished:
return res;
}
+bool ProcessSlaveWrapper::IsStoped() const {
+ return stoped_;
+}
+
void ProcessSlaveWrapper::PreLooped(common::libev::IoLoop* server) {
ping_client_timer_ = server->CreateTimer(ping_timeout_clients_seconds, true);
check_cods_vods_timer_ = server->CreateTimer(config_.cods_ttl / 2, true);
@@ -488,7 +558,13 @@ void ProcessSlaveWrapper::Moved(common::libev::IoLoop* server, common::libev::Io
}
void ProcessSlaveWrapper::Closed(common::libev::IoClient* client) {
- UNUSED(client);
+ ProtocoledDaemonClient* dclient = dynamic_cast(client);
+ if (dclient && dclient->IsVerified()) {
+ auto step = dclient->Step();
+ if (step != common::libev::websocket::ZERO) {
+ ignore_result(dclient->SendEOS());
+ }
+ }
}
void ProcessSlaveWrapper::TimerEmited(common::libev::IoLoop* server, common::libev::timer_id_t id) {
@@ -508,24 +584,14 @@ void ProcessSlaveWrapper::TimerEmited(common::libev::IoLoop* server, common::lib
}
continue;
}
-
- common::ErrnoError err = dclient->Ping();
- if (err) {
- DEBUG_MSG_ERROR(err, common::logging::LOG_LEVEL_ERR);
- ignore_result(dclient->Close());
- delete dclient;
- } else {
- INFO_LOG() << "Sent ping to client[" << client->GetFormatedName() << "], from server["
- << server->GetFormatedName() << "], " << online_clients.size() << " client(s) connected.";
- }
}
}
} else if (check_cods_vods_timer_ == id) {
fastotv::timestamp_t current_time = common::time::current_utc_mstime();
auto childs = server->GetChilds();
for (auto* child : childs) {
- auto channel = static_cast(child);
- if (channel->IsCOD()) {
+ auto channel = dynamic_cast(child);
+ if (channel && channel->IsCOD()) {
fastotv::timestamp_t cod_last_update = channel->GetLastUpdate();
fastotv::timestamp_t ts_diff = current_time - cod_last_update;
if (ts_diff > config_.cods_ttl * 1000) {
@@ -540,8 +606,8 @@ void ProcessSlaveWrapper::TimerEmited(common::libev::IoLoop* server, common::lib
RemoveOldFilesByTime(folder, max_life_time, "*" CHUNK_EXT, true);
}
} else if (node_stats_timer_ == id) {
- const std::string node_stats = MakeServiceStats(common::Optional());
- fastotv::protocol::request_t req;
+ const auto node_stats = MakeServiceInfoStats();
+ common::json::WsDataJson req;
common::Error err_ser = StatisitcServiceBroadcast(node_stats, &req);
if (err_ser) {
return;
@@ -577,7 +643,7 @@ void ProcessSlaveWrapper::ChildStatusChanged(common::libev::IoChild* child, int
delete channel;
stream::QuitStatusInfo ch_status_info(sid, status, signal);
- fastotv::protocol::request_t req;
+ common::json::WsDataJson req;
common::Error err_ser = QuitStatusStreamBroadcast(ch_status_info, &req);
if (err_ser) {
return;
@@ -615,12 +681,12 @@ ChildStream* ProcessSlaveWrapper::FindChildByID(fastotv::stream_id_t cid) const
return nullptr;
}
-void ProcessSlaveWrapper::BroadcastClients(const fastotv::protocol::request_t& req) {
+void ProcessSlaveWrapper::BroadcastClients(const common::json::WsDataJson& req) {
std::vector clients = loop_->GetClients();
for (size_t i = 0; i < clients.size(); ++i) {
ProtocoledDaemonClient* dclient = dynamic_cast(clients[i]);
if (dclient && dclient->IsVerified()) {
- common::ErrnoError err = dclient->WriteRequest(req);
+ common::ErrnoError err = dclient->Broadcast(req);
if (err) {
WARNING_LOG() << "BroadcastClients error: " << err->GetDescription();
}
@@ -628,47 +694,115 @@ void ProcessSlaveWrapper::BroadcastClients(const fastotv::protocol::request_t& r
}
}
-common::ErrnoError ProcessSlaveWrapper::DaemonDataReceived(ProtocoledDaemonClient* dclient) {
- CHECK(loop_->IsLoopThread());
- std::string input_command;
- common::ErrnoError err = dclient->ReadCommand(&input_command);
- if (err) {
- return err; // i don't want handle spam, comand must be foramated according protocol
- }
+common::ErrnoError ProcessSlaveWrapper::ProcessReceived(ProtocoledDaemonClient* hclient,
+ const char* request,
+ size_t req_len) {
+ static const common::libev::http::HttpServerInfo hinf(PROJECT_NAME_TITLE "/" PROJECT_VERSION, PROJECT_DOMAIN);
+ common::http::HttpRequest hrequest;
+ const auto result = common::http::parse_http_request(request, req_len, &hrequest);
+ DEBUG_LOG() << "Http request:\n" << request;
- fastotv::protocol::request_t* req = nullptr;
- fastotv::protocol::response_t* resp = nullptr;
- common::Error err_parse = common::protocols::json_rpc::ParseJsonRPC(input_command, &req, &resp);
- if (err_parse) {
- const std::string err_str = err_parse->GetDescription();
+ common::http::headers_t extra_headers = {{"Access-Control-Allow-Origin", "*"}};
+ if (result.second) {
+ const std::string err_str = result.second->GetDescription();
return common::make_errno_error(err_str, EAGAIN);
}
- if (req) {
- if (req->IsNotification()) {
- delete req;
- return common::make_errno_error("Don't handle notificastions for now.", EINVAL);
+ auto url = hrequest.GetURL();
+ auto route = url.path();
+ auto method = hrequest.GetMethod();
+ if (method == common::http::HM_POST) {
+ if (route == "/" DAEMON_STOP_SERVICE) { // +
+ return HandleRequestClientStopService(hclient, hrequest);
+ } else if (route == "/" DAEMON_RESTART_SERVICE) { // +
+ return HandleRequestClientRestartService(hclient, hrequest);
+ } else if (route == "/" DAEMON_START_STREAM) { // +
+ return HandleRequestClientStartStream(hclient, hrequest);
+ } else if (route == "/" DAEMON_RESTART_STREAM) { // +
+ return HandleRequestClientRestartStream(hclient, hrequest);
+ } else if (route == "/" DAEMON_STOP_STREAM) { // +
+ return HandleRequestClientStopStream(hclient, hrequest);
+ } else if (route == "/" DAEMON_GET_PIPELINE_STREAM) { // +
+ return HandleRequestClientGetPipelineStream(hclient, hrequest);
+ } else if (route == "/" DAEMON_GET_LOG_STREAM) { // +
+ return HandleRequestClientGetLogStream(hclient, hrequest);
+ } else if (route == "/" DAEMON_GET_CONFIG_JSON_STREAM) { // +
+ return HandleRequestClientGetConfigJsonStream(hclient, hrequest);
}
- DEBUG_LOG() << "Received daemon request: " << input_command;
- err = HandleRequestServiceCommand(dclient, req);
- if (err) {
- DEBUG_MSG_ERROR(err, common::logging::LOG_LEVEL_ERR);
+ } else if (method == common::http::http_method::HM_GET || method == common::http::http_method::HM_HEAD) {
+ if (method == common::http::http_method::HM_GET) {
+ if (route == "/" DAEMON_STATS_SERVICE) { // +
+ return HandleRequestClientGetStats(hclient, hrequest);
+ } else if (route == "/" DAEMON_GET_LOG_SERVICE) { // +
+ return HandleRequestClientGetLogService(hclient, hrequest);
+ } else if (route == "/" WS_UPDATES) {
+ common::http::header_t head;
+ if (!hrequest.FindHeaderByKey("Sec-WebSocket-Key", false, &head)) {
+ return common::make_errno_error("not found Sec-WebSocket-Key header", EAGAIN);
+ }
+
+ common::http::header_t found_key_in_headers;
+ if (!hrequest.FindHeaderByKey(API_KEY_PARAM, false, &found_key_in_headers)) {
+ auto query_str = url.query();
+ common::uri::Component key, value;
+ common::uri::Component query(0, query_str.length());
+ common::Optional found_key_in_params;
+ while (common::uri::ExtractQueryKeyValue(query_str.c_str(), &query, &key, &value)) {
+ std::string key_string(query_str.substr(key.begin, key.len));
+ std::string param_text(query_str.substr(value.begin, value.len));
+ if (common::EqualsASCII(key_string, API_KEY_PARAM, false)) {
+ found_key_in_params = param_text;
+ break;
+ }
+ }
+
+ if (!found_key_in_params) {
+ return common::make_errno_error("Don't have permissions", EACCES);
+ }
+ found_key_in_headers.value = *found_key_in_params;
+ }
+
+ const auto expire_key =
+ common::license::make_license(found_key_in_headers.value);
+ if (!expire_key) {
+ common::Error err = common::make_error("Invalid expire key");
+ return common::make_errno_error(err->GetDescription(), EINVAL);
+ }
+
+ common::time64_t tm;
+ bool is_valid = common::license::GetExpireTimeFromKey(PROJECT_NAME_LOWERCASE, *expire_key, &tm);
+ if (!is_valid) {
+ common::Error err = common::make_error("Failed to get expire key time");
+ return common::make_errno_error(err->GetDescription(), EINVAL);
+ }
+
+ hclient->SetVerified(true, tm);
+ if (!hclient->HaveFullAccess()) {
+ return common::make_errno_error("Don't have permissions", EACCES);
+ }
+
+ common::http::headers_t extra_headers = {{"Access-Control-Allow-Origin", "*"}};
+ return hclient->SendSwitchProtocolsResponse(head.value, extra_headers, hclient->GetServerInfo());
+ }
}
- delete req;
- return common::ErrnoError();
- } else if (resp) {
- DEBUG_LOG() << "Received daemon responce: " << input_command;
- err = HandleResponceServiceCommand(dclient, resp);
- if (err) {
- DEBUG_MSG_ERROR(err, common::logging::LOG_LEVEL_ERR);
+ }
+ return hclient->UnknownMethodError(method, route);
+}
+
+common::ErrnoError ProcessSlaveWrapper::DaemonDataReceived(ProtocoledDaemonClient* dclient) {
+ CHECK(loop_->IsLoopThread());
+ char buff[BUF_SIZE] = {0};
+ size_t nread = 0;
+ common::ErrnoError errn = dclient->SingleRead(buff, BUF_SIZE - 1, &nread);
+ if (errn || nread == 0) {
+ if (nread == 0) {
+ return kClosedDaemon;
}
- delete resp;
- return common::ErrnoError();
+ return errn;
}
- WARNING_LOG() << "Received unknown daemon message: " << input_command;
- return common::make_errno_error("Invalid command type.", EINVAL);
+ return ProcessReceived(dclient, buff, nread);
}
common::ErrnoError ProcessSlaveWrapper::StreamDataReceived(stream_client_t* pipe_client) {
@@ -711,11 +845,17 @@ common::ErrnoError ProcessSlaveWrapper::StreamDataReceived(stream_client_t* pipe
void ProcessSlaveWrapper::DataReceived(common::libev::IoClient* client) {
if (ProtocoledDaemonClient* dclient = dynamic_cast(client)) {
- common::ErrnoError err = DaemonDataReceived(dclient);
+ auto err = DaemonDataReceived(dclient);
if (err) {
- DEBUG_MSG_ERROR(err, common::logging::LOG_LEVEL_ERR);
+ ERROR_LOG() << "DataReceived daemon error: " << err->GetDescription();
ignore_result(dclient->Close());
delete dclient;
+ } else {
+ auto step = dclient->Step();
+ if (step == common::libev::websocket::ZERO) {
+ ignore_result(dclient->Close());
+ delete dclient;
+ }
}
} else if (stream_client_t* pipe_client = dynamic_cast(client)) {
common::ErrnoError err = StreamDataReceived(pipe_client);
@@ -776,108 +916,62 @@ void ProcessSlaveWrapper::PostLooped(common::libev::IoLoop* server) {
}
common::ErrnoError ProcessSlaveWrapper::HandleRequestClientRestartService(ProtocoledDaemonClient* dclient,
- const fastotv::protocol::request_t* req) {
+ const common::http::HttpRequest& req) {
CHECK(loop_->IsLoopThread());
- if (!dclient->IsVerified()) {
- const auto info = dclient->GetInfo();
- if (!common::net::IsLocalHost(info.host())) {
- common::Error err(common::MemSPrintf("Skipped stop request from: %s", info.host()));
- ignore_result(dclient->StopFail(req->id, err));
- return common::make_errno_error(err->GetDescription(), EINVAL);
- }
+ const auto info = dclient->GetInfo();
+ if (!common::net::IsLocalHost(info.host())) {
+ common::Error err(common::MemSPrintf("Skipped restart request from: %s", info.host()));
+ ignore_result(dclient->RestartFail(common::http::HS_FORBIDDEN, err));
+ return common::make_errno_error(err->GetDescription(), EINVAL);
+ };
+
+ common::daemon::commands::RestartInfo restart_info;
+ common::ErrnoError errn = ParsePostClientRequest(dclient, req, &restart_info, false);
+ if (errn) {
+ ignore_result(dclient->RestartFail(common::http::HS_BAD_REQUEST, common::make_error_from_errno(errn)));
+ return errn;
}
- if (req->params) {
- const char* params_ptr = req->params->c_str();
- json_object* jstop = json_tokener_parse(params_ptr);
- if (!jstop) {
- return common::make_errno_error_inval();
- }
-
- common::daemon::commands::RestartInfo stop_info;
- common::Error err_des = stop_info.DeSerialize(jstop);
- json_object_put(jstop);
- if (err_des) {
- const std::string err_str = err_des->GetDescription();
- return common::make_errno_error(err_str, EAGAIN);
- }
-
- if (quit_cleanup_timer_ != INVALID_TIMER_ID) {
- // in progress
- return dclient->StopFail(req->id, common::make_error("Restart service in progress..."));
- }
-
- StopImpl();
- return dclient->StopSuccess(req->id);
+ if (quit_cleanup_timer_ != INVALID_TIMER_ID) {
+ // in progress
+ common::Error err = common::make_error("Restart service in progress...");
+ ignore_result(dclient->RestartFail(common::http::HS_INTERNAL_ERROR, err));
+ return common::make_errno_error(err->GetDescription(), EAGAIN);
}
- return common::make_errno_error_inval();
+ StopImpl();
+ return dclient->RestartSuccess();
}
common::ErrnoError ProcessSlaveWrapper::HandleRequestClientStopService(ProtocoledDaemonClient* dclient,
- const fastotv::protocol::request_t* req) {
+ const common::http::HttpRequest& req) {
CHECK(loop_->IsLoopThread());
- if (!dclient->IsVerified()) {
- const auto info = dclient->GetInfo();
- if (!common::net::IsLocalHost(info.host())) {
- common::Error err(common::MemSPrintf("Skipped stop request from: %s", info.host()));
- ignore_result(dclient->StopFail(req->id, err));
- return common::make_errno_error(err->GetDescription(), EINVAL);
- }
+ const auto info = dclient->GetInfo();
+ if (!common::net::IsLocalHost(info.host())) {
+ common::Error err(common::MemSPrintf("Skipped restart request from: %s", info.host()));
+ ignore_result(dclient->StopFail(common::http::HS_FORBIDDEN, err));
+ return common::make_errno_error(err->GetDescription(), EINVAL);
}
- if (req->params) {
- const char* params_ptr = req->params->c_str();
- json_object* jstop = json_tokener_parse(params_ptr);
- if (!jstop) {
- return common::make_errno_error_inval();
- }
-
- common::daemon::commands::StopInfo stop_info;
- common::Error err_des = stop_info.DeSerialize(jstop);
- json_object_put(jstop);
- if (err_des) {
- const std::string err_str = err_des->GetDescription();
- return common::make_errno_error(err_str, EAGAIN);
- }
-
- if (quit_cleanup_timer_ != INVALID_TIMER_ID) {
- // in progress
- return dclient->StopFail(req->id, common::make_error("Stop service in progress..."));
- }
-
- StopImpl();
- stoped_ = true;
- return dclient->StopSuccess(req->id);
+ common::daemon::commands::StopInfo stop_info;
+ common::ErrnoError errn = ParsePostClientRequest(dclient, req, &stop_info, false);
+ if (errn) {
+ ignore_result(dclient->StopFail(common::http::HS_BAD_REQUEST, common::make_error_from_errno(errn)));
+ return errn;
}
- return common::make_errno_error_inval();
-}
-
-common::ErrnoError ProcessSlaveWrapper::HandleResponcePingService(ProtocoledDaemonClient* dclient,
- const fastotv::protocol::response_t* resp) {
- UNUSED(dclient);
- CHECK(loop_->IsLoopThread());
-
- if (resp->IsMessage()) {
- const char* params_ptr = resp->message->result.c_str();
- json_object* jclient_ping = json_tokener_parse(params_ptr);
- if (!jclient_ping) {
- return common::make_errno_error_inval();
- }
-
- common::daemon::commands::ClientPingInfo client_ping_info;
- common::Error err_des = client_ping_info.DeSerialize(jclient_ping);
- json_object_put(jclient_ping);
- if (err_des) {
- const std::string err_str = err_des->GetDescription();
- return common::make_errno_error(err_str, EAGAIN);
- }
- return common::ErrnoError();
+ if (quit_cleanup_timer_ != INVALID_TIMER_ID) {
+ // in progress
+ common::Error err = common::make_error("Stop service in progress...");
+ ignore_result(dclient->StopFail(common::http::HS_INTERNAL_ERROR, err));
+ return common::make_errno_error(err->GetDescription(), EAGAIN);
}
- return common::ErrnoError();
+
+ StopImpl();
+ stoped_ = true;
+ return dclient->StopSuccess();
}
common::ErrnoError ProcessSlaveWrapper::CreateChildStream(const serialized_stream_t& config_args) {
@@ -925,68 +1019,44 @@ common::ErrnoError ProcessSlaveWrapper::StopChildStreamImpl(fastotv::stream_id_t
common::ErrnoError ProcessSlaveWrapper::HandleRequestChangedSourcesStream(stream_client_t* pclient,
const fastotv::protocol::request_t* req) {
- UNUSED(pclient);
CHECK(loop_->IsLoopThread());
- if (req->params) {
- const char* params_ptr = req->params->c_str();
- json_object* jrequest_changed_sources = json_tokener_parse(params_ptr);
- if (!jrequest_changed_sources) {
- return common::make_errno_error_inval();
- }
- ChangedSouresInfo ch_sources_info;
- common::Error err_des = ch_sources_info.DeSerialize(jrequest_changed_sources);
- json_object_put(jrequest_changed_sources);
- if (err_des) {
- const std::string err_str = err_des->GetDescription();
- return common::make_errno_error(err_str, EAGAIN);
- }
-
- fastotv::protocol::request_t req;
- common::Error err_ser = ChangedSourcesStreamBroadcast(ch_sources_info, &req);
- if (err_ser) {
- const std::string err_str = err_ser->GetDescription();
- return common::make_errno_error(err_str, EAGAIN);
- }
-
- BroadcastClients(req);
- return common::ErrnoError();
+ ChangedSouresInfo ch_sources_info;
+ common::ErrnoError errn = ParseStreamRequest(pclient, req, &ch_sources_info);
+ if (errn) {
+ return errn;
}
- return common::make_errno_error_inval();
+ common::json::WsDataJson breq;
+ common::Error err_ser = ChangedSourcesStreamBroadcast(ch_sources_info, &breq);
+ if (err_ser) {
+ const std::string err_str = err_ser->GetDescription();
+ return common::make_errno_error(err_str, EAGAIN);
+ }
+
+ BroadcastClients(breq);
+ return common::ErrnoError();
}
common::ErrnoError ProcessSlaveWrapper::HandleRequestStatisticStream(stream_client_t* pclient,
const fastotv::protocol::request_t* req) {
- UNUSED(pclient);
CHECK(loop_->IsLoopThread());
- if (req->params) {
- const char* params_ptr = req->params->c_str();
- json_object* jrequest_stat = json_tokener_parse(params_ptr);
- if (!jrequest_stat) {
- return common::make_errno_error_inval();
- }
- StatisticInfo stat;
- common::Error err_des = stat.DeSerialize(jrequest_stat);
- json_object_put(jrequest_stat);
- if (err_des) {
- const std::string err_str = err_des->GetDescription();
- return common::make_errno_error(err_str, EAGAIN);
- }
-
- fastotv::protocol::request_t req;
- common::Error err_ser = StatisitcStreamBroadcast(stat, &req);
- if (err_ser) {
- const std::string err_str = err_ser->GetDescription();
- return common::make_errno_error(err_str, EAGAIN);
- }
-
- BroadcastClients(req);
- return common::ErrnoError();
+ StatisticInfo stat;
+ common::ErrnoError errn = ParseStreamRequest(pclient, req, &stat);
+ if (errn) {
+ return errn;
}
- return common::make_errno_error_inval();
+ common::json::WsDataJson breq;
+ common::Error err_ser = StatisitcStreamBroadcast(stat, &breq);
+ if (err_ser) {
+ const std::string err_str = err_ser->GetDescription();
+ return common::make_errno_error(err_str, EAGAIN);
+ }
+
+ BroadcastClients(breq);
+ return common::ErrnoError();
}
#if defined(MACHINE_LEARNING)
@@ -1024,369 +1094,170 @@ common::ErrnoError ProcessSlaveWrapper::HandleRequestMlNotificationStream(stream
}
#endif
-common::ErrnoError ProcessSlaveWrapper::HandleRequestClientStartStream(ProtocoledDaemonClient* dclient,
- const fastotv::protocol::request_t* req) {
+common::ErrnoError ProcessSlaveWrapper::RestartChildStreamImpl(fastotv::stream_id_t sid) {
CHECK(loop_->IsLoopThread());
- if (!dclient->HaveFullAccess()) {
- return common::make_errno_error("Don't have permissions", EINTR);
+
+ auto stream = FindChildByID(sid);
+ if (!stream) {
+ return common::make_errno_error(common::MemSPrintf("Stream with id: %s does not exist, skip request.", sid),
+ EINVAL);
}
- if (req->params) {
- const char* params_ptr = req->params->c_str();
- json_object* jstart_info = json_tokener_parse(params_ptr);
- if (!jstart_info) {
- return common::make_errno_error_inval();
- }
-
- stream::StartInfo start_info;
- common::Error err_des = start_info.DeSerialize(jstart_info);
- json_object_put(jstart_info);
- if (err_des) {
- const std::string err_str = err_des->GetDescription();
- return common::make_errno_error(err_str, EAGAIN);
- }
-
- common::ErrnoError errn = CreateChildStream(start_info.GetConfig());
- if (errn) {
- DEBUG_MSG_ERROR(errn, common::logging::LOG_LEVEL_WARNING);
- ignore_result(dclient->StartStreamFail(req->id, common::make_error_from_errno(errn)));
- return errn;
- }
-
- return dclient->StartStreamSuccess(req->id);
- }
-
- return common::make_errno_error_inval();
-}
-
-common::ErrnoError ProcessSlaveWrapper::HandleRequestClientStopStream(ProtocoledDaemonClient* dclient,
- const fastotv::protocol::request_t* req) {
- CHECK(loop_->IsLoopThread());
- if (!dclient->HaveFullAccess()) {
- return common::make_errno_error("Don't have permissions", EINTR);
- }
-
- if (req->params) {
- const char* params_ptr = req->params->c_str();
- json_object* jstop_info = json_tokener_parse(params_ptr);
- if (!jstop_info) {
- return common::make_errno_error_inval();
- }
-
- stream::StopInfo stop_info;
- common::Error err_des = stop_info.DeSerialize(jstop_info);
- json_object_put(jstop_info);
- if (err_des) {
- const std::string err_str = err_des->GetDescription();
- return common::make_errno_error(err_str, EAGAIN);
- }
-
- common::ErrnoError errn = StopChildStreamImpl(stop_info.GetStreamID(), stop_info.GetForce());
- if (errn) {
- return dclient->StopFail(req->id, common::make_error_from_errno(errn));
- }
-
- return dclient->StopStreamSuccess(req->id);
- }
-
- return common::make_errno_error_inval();
+ return stream->Restart();
}
common::ErrnoError ProcessSlaveWrapper::HandleRequestClientRestartStream(ProtocoledDaemonClient* dclient,
- const fastotv::protocol::request_t* req) {
- CHECK(loop_->IsLoopThread());
- if (!dclient->HaveFullAccess()) {
- return common::make_errno_error("Don't have permissions", EINTR);
- }
-
- if (req->params) {
- const char* params_ptr = req->params->c_str();
- json_object* jrestart_info = json_tokener_parse(params_ptr);
- if (!jrestart_info) {
- return common::make_errno_error_inval();
- }
-
- stream::RestartInfo restart_info;
- common::Error err_des = restart_info.DeSerialize(jrestart_info);
- json_object_put(jrestart_info);
- if (err_des) {
- const std::string err_str = err_des->GetDescription();
- return common::make_errno_error(err_str, EAGAIN);
- }
-
- Child* chan = FindChildByID(restart_info.GetStreamID());
- if (!chan) {
- return dclient->ReStartStreamFail(req->id, common::make_error("Stream not found"));
- }
-
- ignore_result(chan->Restart());
- return dclient->ReStartStreamSuccess(req->id);
- }
-
- return common::make_errno_error_inval();
-}
-
-common::ErrnoError ProcessSlaveWrapper::HandleRequestClientGetLogStream(ProtocoledDaemonClient* dclient,
- const fastotv::protocol::request_t* req) {
+ const common::http::HttpRequest& req) {
CHECK(loop_->IsLoopThread());
- stream::GetLogInfo log_info;
- common::ErrnoError errn = ParseClientRequest(dclient, req, &log_info);
+ stream::RestartInfo restart_info;
+ common::ErrnoError errn = ParsePostClientRequest(dclient, req, &restart_info);
if (errn) {
- ignore_result(dclient->GetLogStreamFail(req->id, common::make_error_from_errno(errn)));
+ ignore_result(dclient->ReStartStreamFail(common::http::HS_BAD_REQUEST, common::make_error_from_errno(errn)));
return errn;
}
- const auto remote_log_path = log_info.GetLogPath();
- if (!remote_log_path.SchemeIsHTTPOrHTTPS()) {
- common::ErrnoError errn = common::make_errno_error("Not supported protocol", EAGAIN);
- ignore_result(dclient->GetLogStreamFail(req->id, common::make_error_from_errno(errn)));
+ errn = RestartChildStreamImpl(restart_info.GetStreamID());
+ if (errn) {
+ ignore_result(dclient->ReStartStreamFail(common::http::HS_INTERNAL_ERROR, common::make_error_from_errno(errn)));
+ return errn;
+ }
+
+ return dclient->ReStartStreamSuccess();
+}
+
+common::ErrnoError ProcessSlaveWrapper::HandleRequestClientGetLogService(ProtocoledDaemonClient* dclient,
+ const common::http::HttpRequest& req) {
+ CHECK(loop_->IsLoopThread());
+
+ common::ErrnoError errn = ParseGetClientRequest(dclient, req);
+ if (errn) {
+ ignore_result(dclient->GetLogServiceFail(common::http::HS_BAD_REQUEST, common::make_error_from_errno(errn)));
+ return errn;
+ }
+
+ return dclient->GetLogServiceSuccess(config_.log_path);
+}
+
+common::ErrnoError ProcessSlaveWrapper::HandleRequestClientGetLogStream(ProtocoledDaemonClient* dclient,
+ const common::http::HttpRequest& req) {
+ CHECK(loop_->IsLoopThread());
+
+ stream::GetLogInfo log_info;
+ common::ErrnoError errn = ParsePostClientRequest(dclient, req, &log_info);
+ if (errn) {
+ ignore_result(dclient->GetLogStreamFail(common::http::HS_BAD_REQUEST, common::make_error_from_errno(errn)));
return errn;
}
const auto stream_log_file = MakeStreamLogPath(log_info.GetFeedbackDir());
if (!stream_log_file) {
common::ErrnoError errn = common::make_errno_error("Can't generate log stream path", EAGAIN);
- ignore_result(dclient->GetLogStreamFail(req->id, common::make_error_from_errno(errn)));
+ ignore_result(dclient->GetLogStreamFail(common::http::HS_INTERNAL_ERROR, common::make_error_from_errno(errn)));
return errn;
}
- common::Error err = PostHttpOrHttpsFile(*stream_log_file, remote_log_path);
- if (err) {
- ignore_result(dclient->GetLogStreamFail(req->id, err));
- const std::string err_str = err->GetDescription();
- return common::make_errno_error(err_str, EAGAIN);
- }
-
- ignore_result(dclient->GetLogStreamSuccess(req->id));
- return common::ErrnoError();
+ return dclient->GetLogStreamSuccess(stream_log_file->GetPath());
}
common::ErrnoError ProcessSlaveWrapper::HandleRequestClientGetPipelineStream(ProtocoledDaemonClient* dclient,
- const fastotv::protocol::request_t* req) {
+ const common::http::HttpRequest& req) {
CHECK(loop_->IsLoopThread());
stream::GetPipelineInfo pipeline_info;
- common::ErrnoError errn = ParseClientRequest(dclient, req, &pipeline_info);
+ common::ErrnoError errn = ParsePostClientRequest(dclient, req, &pipeline_info);
if (errn) {
- ignore_result(dclient->GetPipeStreamFail(req->id, common::make_error_from_errno(errn)));
- return errn;
- }
-
- const auto remote_log_path = pipeline_info.GetLogPath();
- if (!remote_log_path.SchemeIsHTTPOrHTTPS()) {
- common::ErrnoError errn = common::make_errno_error("Not supported protocol", EAGAIN);
- ignore_result(dclient->GetPipeStreamFail(req->id, common::make_error_from_errno(errn)));
+ ignore_result(dclient->GetPipeStreamFail(common::http::HS_BAD_REQUEST, common::make_error_from_errno(errn)));
return errn;
}
const auto pipe_file = MakeStreamPipelinePath(pipeline_info.GetFeedbackDir());
if (!pipe_file) {
common::ErrnoError errn = common::make_errno_error("Can't generate pipeline stream path", EAGAIN);
- ignore_result(dclient->GetPipeStreamFail(req->id, common::make_error_from_errno(errn)));
+ ignore_result(dclient->GetPipeStreamFail(common::http::HS_INTERNAL_ERROR, common::make_error_from_errno(errn)));
return errn;
}
- common::Error err = PostHttpOrHttpsFile(*pipe_file, remote_log_path);
- if (err) {
- ignore_result(dclient->GetPipeStreamFail(req->id, err));
- const std::string err_str = err->GetDescription();
- return common::make_errno_error(err_str, EAGAIN);
- }
-
- ignore_result(dclient->GetPipeStreamSuccess(req->id));
- return common::ErrnoError();
+ return dclient->GetPipeStreamSuccess(pipe_file->GetPath());
}
-common::ErrnoError ProcessSlaveWrapper::HandleRequestClientGetConfigJsonStream(
- ProtocoledDaemonClient* dclient,
- const fastotv::protocol::request_t* req) {
+common::ErrnoError ProcessSlaveWrapper::HandleRequestClientGetConfigJsonStream(ProtocoledDaemonClient* dclient,
+ const common::http::HttpRequest& req) {
CHECK(loop_->IsLoopThread());
stream::GetConfigJsonInfo config_json;
- common::ErrnoError errn = ParseClientRequest(dclient, req, &config_json);
+ common::ErrnoError errn = ParsePostClientRequest(dclient, req, &config_json);
if (errn) {
- ignore_result(dclient->GetPipeStreamFail(req->id, common::make_error_from_errno(errn)));
- return errn;
- }
-
- const auto remote_log_path = config_json.GetLogPath();
- if (!remote_log_path.SchemeIsHTTPOrHTTPS()) {
- common::ErrnoError errn = common::make_errno_error("Not supported protocol", EAGAIN);
- ignore_result(dclient->GetPipeStreamFail(req->id, common::make_error_from_errno(errn)));
+ ignore_result(dclient->GetConfigStreamFail(common::http::HS_BAD_REQUEST, common::make_error_from_errno(errn)));
return errn;
}
const auto pipe_file = MakeStreamConfigJsonPath(config_json.GetFeedbackDir());
if (!pipe_file) {
common::ErrnoError errn = common::make_errno_error("Can't generate config.json stream path", EAGAIN);
- ignore_result(dclient->GetPipeStreamFail(req->id, common::make_error_from_errno(errn)));
+ ignore_result(dclient->GetConfigStreamFail(common::http::HS_INTERNAL_ERROR, common::make_error_from_errno(errn)));
return errn;
}
- common::Error err = PostHttpOrHttpsFile(*pipe_file, remote_log_path);
- if (err) {
- ignore_result(dclient->GetPipeStreamFail(req->id, err));
- const std::string err_str = err->GetDescription();
- return common::make_errno_error(err_str, EAGAIN);
- }
-
- ignore_result(dclient->GetPipeStreamSuccess(req->id));
- return common::ErrnoError();
+ return dclient->GetConfigStreamSuccess(pipe_file->GetPath());
}
-common::ErrnoError ProcessSlaveWrapper::HandleRequestClientActivate(ProtocoledDaemonClient* dclient,
- const fastotv::protocol::request_t* req) {
- CHECK(loop_->IsLoopThread());
- if (req->params) {
- const char* params_ptr = req->params->c_str();
- json_object* jactivate = json_tokener_parse(params_ptr);
- if (!jactivate) {
- return common::make_errno_error_inval();
- }
-
- common::daemon::commands::ActivateInfo activate_info;
- common::Error err_des = activate_info.DeSerialize(jactivate);
- json_object_put(jactivate);
- if (err_des) {
- ignore_result(dclient->ActivateFail(req->id, err_des));
- return common::make_errno_error(err_des->GetDescription(), EAGAIN);
- }
-
- const auto expire_key = activate_info.GetLicense();
- common::time64_t tm;
- bool is_valid = common::license::GetExpireTimeFromKey(PROJECT_NAME_LOWERCASE, *expire_key, &tm);
- if (!is_valid) {
- common::Error err = common::make_error("Invalid expire key");
- ignore_result(dclient->ActivateFail(req->id, err));
- return common::make_errno_error(err->GetDescription(), EINVAL);
- }
-
- const std::string node_stats = MakeServiceStats(tm);
- common::ErrnoError err_ser = dclient->ActivateSuccess(req->id, node_stats);
- if (err_ser) {
- return err_ser;
- }
-
- dclient->SetVerified(true, tm);
- return common::ErrnoError();
- }
-
- return common::make_errno_error_inval();
-}
-
-common::ErrnoError ProcessSlaveWrapper::HandleRequestClientPingService(ProtocoledDaemonClient* dclient,
- const fastotv::protocol::request_t* req) {
- CHECK(loop_->IsLoopThread());
- if (!dclient->IsVerified()) {
- return common::make_errno_error_inval();
- }
-
- if (req->params) {
- const char* params_ptr = req->params->c_str();
- json_object* jstop = json_tokener_parse(params_ptr);
- if (!jstop) {
- return common::make_errno_error_inval();
- }
-
- common::daemon::commands::ClientPingInfo client_ping_info;
- common::Error err_des = client_ping_info.DeSerialize(jstop);
- json_object_put(jstop);
- if (err_des) {
- const std::string err_str = err_des->GetDescription();
- return common::make_errno_error(err_str, EAGAIN);
- }
-
- return dclient->Pong(req->id);
- }
-
- return common::make_errno_error_inval();
-}
-
-common::ErrnoError ProcessSlaveWrapper::HandleRequestClientGetLogService(ProtocoledDaemonClient* dclient,
- const fastotv::protocol::request_t* req) {
+common::ErrnoError ProcessSlaveWrapper::HandleRequestClientStopStream(ProtocoledDaemonClient* dclient,
+ const common::http::HttpRequest& req) {
CHECK(loop_->IsLoopThread());
- common::daemon::commands::GetLogInfo get_log_info;
- common::ErrnoError errn = ParseClientRequest(dclient, req, &get_log_info);
+ stream::StopInfo stop_info;
+ common::ErrnoError errn = ParsePostClientRequest(dclient, req, &stop_info);
if (errn) {
- ignore_result(dclient->PongFail(req->id, common::make_error_from_errno(errn)));
+ ignore_result(dclient->StopStreamFail(common::http::HS_BAD_REQUEST, common::make_error_from_errno(errn)));
return errn;
}
- const auto remote_log_path = get_log_info.GetLogPath();
- if (!remote_log_path.SchemeIsHTTPOrHTTPS()) {
- common::ErrnoError errn = common::make_errno_error("Not supported protocol", EAGAIN);
- ignore_result(dclient->GetLogServiceFail(req->id, common::make_error_from_errno(errn)));
+ errn = StopChildStreamImpl(stop_info.GetStreamID(), stop_info.GetForce());
+ if (errn) {
+ ignore_result(dclient->StopStreamFail(common::http::HS_INTERNAL_ERROR, common::make_error_from_errno(errn)));
return errn;
}
- common::Error err =
- PostHttpOrHttpsFile(common::file_system::ascii_file_string_path(config_.log_path), remote_log_path);
- if (err) {
- ignore_result(dclient->GetLogServiceFail(req->id, err));
- const std::string err_str = err->GetDescription();
- return common::make_errno_error(err_str, EAGAIN);
- }
-
- ignore_result(dclient->GetLogServiceSuccess(req->id));
- return common::ErrnoError();
+ return dclient->StopStreamSuccess();
}
-common::ErrnoError ProcessSlaveWrapper::HandleRequestServiceCommand(ProtocoledDaemonClient* dclient,
- const fastotv::protocol::request_t* req) {
- if (req->method == DAEMON_START_STREAM) {
- return HandleRequestClientStartStream(dclient, req);
- } else if (req->method == DAEMON_STOP_STREAM) {
- return HandleRequestClientStopStream(dclient, req);
- } else if (req->method == DAEMON_RESTART_STREAM) {
- return HandleRequestClientRestartStream(dclient, req);
- } else if (req->method == DAEMON_GET_LOG_STREAM) {
- return HandleRequestClientGetLogStream(dclient, req);
- } else if (req->method == DAEMON_GET_PIPELINE_STREAM) {
- return HandleRequestClientGetPipelineStream(dclient, req);
- } else if (req->method == DAEMON_GET_CONFIG_JSON_STREAM) {
- return HandleRequestClientGetConfigJsonStream(dclient, req);
- } else if (req->method == DAEMON_RESTART_SERVICE) {
- return HandleRequestClientRestartService(dclient, req);
- } else if (req->method == DAEMON_STOP_SERVICE) {
- return HandleRequestClientStopService(dclient, req);
- } else if (req->method == DAEMON_ACTIVATE) {
- return HandleRequestClientActivate(dclient, req);
- } else if (req->method == DAEMON_PING_SERVICE) {
- return HandleRequestClientPingService(dclient, req);
- } else if (req->method == DAEMON_GET_LOG_SERVICE) {
- return HandleRequestClientGetLogService(dclient, req);
- }
-
- return dclient->UnknownMethodError(req->id, req->method);
-}
-
-common::ErrnoError ProcessSlaveWrapper::HandleResponceServiceCommand(ProtocoledDaemonClient* dclient,
- const fastotv::protocol::response_t* resp) {
+common::ErrnoError ProcessSlaveWrapper::HandleRequestClientStartStream(ProtocoledDaemonClient* dclient,
+ const common::http::HttpRequest& req) {
CHECK(loop_->IsLoopThread());
- if (!dclient->IsVerified()) {
- return common::make_errno_error_inval();
+
+ stream::StartInfo start_info;
+ common::ErrnoError errn = ParsePostClientRequest(dclient, req, &start_info);
+ if (errn) {
+ ignore_result(dclient->StartStreamFail(common::http::HS_BAD_REQUEST, common::make_error_from_errno(errn)));
+ return errn;
}
- fastotv::protocol::request_t req;
- const auto sid = resp->id;
- if (dclient->PopRequestByID(sid, &req)) {
- if (req.method == DAEMON_SERVER_PING) {
- ignore_result(HandleResponcePingService(dclient, resp));
- } else {
- WARNING_LOG() << "HandleResponceServiceCommand not handled responce id: " << *sid << ", command: " << req.method;
- }
- } else {
- WARNING_LOG() << "HandleResponceServiceCommand not found responce id: " << *sid;
+ errn = CreateChildStream(start_info.GetConfig());
+ if (errn) {
+ ignore_result(dclient->StartStreamFail(common::http::HS_INTERNAL_ERROR, common::make_error_from_errno(errn)));
+ return errn;
}
- return common::ErrnoError();
+ return dclient->StartStreamSuccess();
+}
+
+common::ErrnoError ProcessSlaveWrapper::HandleRequestClientGetStats(ProtocoledDaemonClient* dclient,
+ const common::http::HttpRequest& req) {
+ CHECK(loop_->IsLoopThread());
+
+ common::ErrnoError errn = ParseGetClientRequest(dclient, req);
+ if (errn) {
+ ignore_result(dclient->GetStatsFail(common::http::HS_BAD_REQUEST, common::make_error_from_errno(errn)));
+ return errn;
+ }
+
+ return dclient->GetStatsSuccess(MakeServiceStats(dclient->GetExpTime()));
}
common::ErrnoError ProcessSlaveWrapper::HandleRequestStreamsCommand(stream_client_t* pclient,
const fastotv::protocol::request_t* req) {
- if (req->method == CHANGED_SOURCES_STREAM) {
+ if (req->method == BROADCAST_CHANGED_SOURCES_STREAM) {
return HandleRequestChangedSourcesStream(pclient, req);
- } else if (req->method == STATISTIC_STREAM) {
+ } else if (req->method == BROADCAST_STATISTIC_STREAM) {
return HandleRequestStatisticStream(pclient, req);
}
#if defined(MACHINE_LEARNING)
@@ -1402,12 +1273,15 @@ common::ErrnoError ProcessSlaveWrapper::HandleRequestStreamsCommand(stream_clien
common::ErrnoError ProcessSlaveWrapper::HandleResponceStreamsCommand(stream_client_t* pclient,
const fastotv::protocol::response_t* resp) {
fastotv::protocol::request_t req;
- if (pclient->PopRequestByID(resp->id, &req)) {
- if (req.method == STOP_STREAM) {
- } else if (req.method == RESTART_STREAM) {
+ const auto sid = resp->id;
+ if (pclient->PopRequestByID(sid, &req)) {
+ if (req.method == REQUEST_STOP_STREAM) {
+ } else if (req.method == REQUEST_RESTART_STREAM) {
} else {
- WARNING_LOG() << "HandleResponceStreamsCommand not handled command: " << req.method;
+ WARNING_LOG() << "HandleResponceStreamsCommand not handled responce id: " << *sid << ", command: " << req.method;
}
+ } else {
+ WARNING_LOG() << "HandleResponceStreamsCommand not found responce id: " << *sid;
}
return common::ErrnoError();
}
@@ -1427,7 +1301,20 @@ void ProcessSlaveWrapper::CheckLicenseExpired(common::libev::IoLoop* server) {
}
}
-std::string ProcessSlaveWrapper::MakeServiceStats(common::Optional expiration_time) const {
+service::FullServiceInfo ProcessSlaveWrapper::MakeServiceStats(common::time64_t expiration_time) const {
+ common::uri::Replacements replacements;
+ replacements.SetHost(config_.alias.data(), common::uri::Component(0, static_cast(config_.alias.length())));
+ auto stabled_hls_host = config_.hls_host.ReplaceComponents(replacements);
+ auto stabled_vods_host = config_.vods_host.ReplaceComponents(replacements);
+ auto stabled_cods_host = config_.cods_host.ReplaceComponents(replacements);
+ service::FullServiceInfo fstat(stabled_hls_host, stabled_vods_host, stabled_cods_host, expiration_time,
+ config_.hls_dir, config_.vods_dir, config_.cods_dir, config_.timeshifts_dir,
+ config_.feedback_dir, config_.proxy_dir, config_.data_dir, MakeServiceInfoStats());
+
+ return fstat;
+}
+
+service::ServerInfo ProcessSlaveWrapper::MakeServiceInfoStats() const {
service::CpuShot next = service::GetMachineCpuShot();
double cpu_load = service::GetCpuMachineLoad(node_stats_->prev, next);
node_stats_->prev = next;
@@ -1464,29 +1351,7 @@ std::string ProcessSlaveWrapper::MakeServiceStats(common::Optional replacements;
- replacements.SetHost(config_.alias.data(), common::uri::Component(0, static_cast(config_.alias.length())));
- auto stabled_hls_host = config_.hls_host.ReplaceComponents(replacements);
- auto stabled_vods_host = config_.vods_host.ReplaceComponents(replacements);
- auto stabled_cods_host = config_.cods_host.ReplaceComponents(replacements);
- service::FullServiceInfo fstat(stabled_hls_host, stabled_vods_host, stabled_cods_host, *expiration_time,
- config_.hls_dir, config_.vods_dir, config_.cods_dir, config_.timeshifts_dir,
- config_.feedback_dir, config_.proxy_dir, config_.data_dir, stat);
- common::Error err_ser = fstat.SerializeToString(&node_stats);
- if (err_ser) {
- const std::string err_str = err_ser->GetDescription();
- WARNING_LOG() << "Failed to generate node full statistic: " << err_str;
- }
- } else {
- common::Error err_ser = stat.SerializeToString(&node_stats);
- if (err_ser) {
- const std::string err_str = err_ser->GetDescription();
- WARNING_LOG() << "Failed to generate node statistic: " << err_str;
- }
- }
- return node_stats;
+ return stat;
}
} // namespace server
diff --git a/src/server/process_slave_wrapper.h b/src/server/process_slave_wrapper.h
index cc7f865..69bd70c 100644
--- a/src/server/process_slave_wrapper.h
+++ b/src/server/process_slave_wrapper.h
@@ -14,10 +14,8 @@
#pragma once
-#include