diff --git a/app/common b/app/common index 551514f..485e7e7 160000 --- a/app/common +++ b/app/common @@ -1 +1 @@ -Subproject commit 551514f61e364783439981435f7a3fc9bf52d7c0 +Subproject commit 485e7e717c64601470fb03fadfdcf2e1f6ebdfd2 diff --git a/app/config/config.py b/app/config/config.py index 387f783..dc1b979 100644 --- a/app/config/config.py +++ b/app/config/config.py @@ -3,4 +3,4 @@ SERVER_NAME_FOR_POST = '0.0.0.0:8080' PREFERRED_URL_SCHEME = 'http' BOOTSTRAP_SERVE_LOCAL = True -SUBSCRIBERS_SUPPORT = False \ No newline at end of file +SUBSCRIBERS_SUPPORT = True \ No newline at end of file diff --git a/app/service/service_client.py b/app/service/service_client.py index f5a0cf8..1f783e6 100644 --- a/app/service/service_client.py +++ b/app/service/service_client.py @@ -1,9 +1,9 @@ from bson.objectid import ObjectId -from pyfastocloud.fastocloud_client import FastoCloudClient, Fields +from pyfastocloud.fastocloud_client import FastoCloudClient, Fields, Commands from pyfastocloud.client_handler import IClientHandler from pyfastocloud.json_rpc import Request, Response -from pyfastocloud.client_constants import Commands, ClientStatus +from pyfastocloud.client_constants import ClientStatus from app.service.stream_handler import IStreamHandler import app.common.constants as constants @@ -58,9 +58,13 @@ class ServiceClient(IClientHandler): def socket(self): return self._client.socket() - def recv_data(self): + def recv_data(self) -> bool: data = self._client.read_command() + if not data: + return False + self._client.process_commands(data) + return True def status(self) -> ClientStatus: return self._client.status() @@ -72,7 +76,7 @@ class ServiceClient(IClientHandler): return self._client.activate(self._gen_request_id(), license_key) def ping_service(self): - return self._client.ping_service(self._gen_request_id()) + return self._client.ping(self._gen_request_id()) def stop_service(self, delay: int): return self._client.stop_service(self._gen_request_id(), delay) @@ -107,12 +111,7 @@ class ServiceClient(IClientHandler): stream.set_server_settings(settings) streams.append(stream.config()) - subscribers = [] - for subs in settings.subscribers: - conf = subs.to_service(settings) - subscribers.append(conf) - - return self._client.sync_service(self._gen_request_id(), streams, subscribers) + return self._client.sync_service(self._gen_request_id(), streams) def prepare_service(self, settings): if not settings: @@ -152,7 +151,7 @@ class ServiceClient(IClientHandler): return self._version # handler - def process_response(self, req: Request, resp: Response): + def process_response(self, client, req: Request, resp: Response): if not req: return @@ -175,7 +174,7 @@ class ServiceClient(IClientHandler): self._vods_in = directory[Fields.VODS_IN_DIRECTORY]['content'] break - def process_request(self, req: Request): + def process_request(self, client, req: Request): if not req: return @@ -193,7 +192,7 @@ class ServiceClient(IClientHandler): elif req.method == Commands.CLIENT_PING_COMMAND: self._handler.on_ping_received(req.params) - def on_client_state_changed(self, status: ClientStatus): + def on_client_state_changed(self, client, status: ClientStatus): if status != ClientStatus.ACTIVE: self._set_runtime_fields() if self._handler: diff --git a/app/service/service_manager.py b/app/service/service_manager.py index 1b20ee0..ce71ebc 100644 --- a/app/service/service_manager.py +++ b/app/service/service_manager.py @@ -1,14 +1,78 @@ from app.common.service.entry import ServiceSettings from app.service.service import Service +from app.common.subscriber.entry import Subscriber, Device +from pyfastocloud.subscriber_client import SubscriberClient, Commands, make_utc_timestamp +from pyfastocloud.client_handler import IClientHandler, Request, Response, ClientStatus + +from gevent import socket +from gevent import select -class ServiceManager(object): +class SubscriberConnection(SubscriberClient): + def __init__(self, sock, addr, handler): + super(SubscriberConnection, self).__init__(sock, addr, handler) + self._info = None + self._current_stream_id = str() + self._device = None + self._last_ping_ts = make_utc_timestamp() / 1000 + + @property + def info(self) -> Subscriber: + return self._info + + @info.setter + def info(self, value): + self._info = value + + @property + def current_stream_id(self) -> str: + return self._current_stream_id + + @current_stream_id.setter + def current_stream_id(self, value): + self._current_stream_id = value + + @property + def device(self) -> Device: + return self._device + + @device.setter + def device(self, value): + self._device = value + + @property + def last_ping_ts(self) -> float: + return self._last_ping_ts + + @last_ping_ts.setter + def last_ping_ts(self, value): + self._last_ping_ts = value + + def recv_data(self) -> bool: + data = self.read_command() + if not data: + return False + + self.process_commands(data) + return True + + +class ServiceManager(IClientHandler): + SUBSCRIBER_PORT = 6000 + BANDWIDTH_PORT = 5000 + PING_SUBSCRIBERS_SEC = 60 + def __init__(self, host: str, port: int, socketio): self._host = host self._port = port self._socketio = socketio self._stop_listen = False self._servers_pool = [] + serversock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + serversock.bind((host, ServiceManager.SUBSCRIBER_PORT)) + serversock.listen(10) + self._subscribers_server_socket = serversock + self._subscribers = [] def stop(self): self._stop_listen = True @@ -23,20 +87,183 @@ class ServiceManager(object): return server def refresh(self): - from gevent import select while not self._stop_listen: - sockets = [] + rsockets = [] + rsockets.append(self._subscribers_server_socket) + for client in self._subscribers: + rsockets.append(client.socket()) for server in self._servers_pool: if server.is_connected(): - sockets.append(server.socket()) + rsockets.append(server.socket()) - readable, _, _ = select.select(sockets, [], [], 1) + readable, writeable, _ = select.select(rsockets, [], [], 1) + ts_sec = make_utc_timestamp() / 1000 for read in readable: + # income subscriber connection + if self._subscribers_server_socket == read: + csock, addr = read.accept() + subs = SubscriberConnection(csock, addr, self) + self.__add_subscriber(subs) + continue + + # subscriber read + for client in self._subscribers: + if client.socket() == read: + res = client.recv_data() + if not res: + self.__remove_subscriber(client) + client.disconnect() + break + for server in self._servers_pool: if server.socket() == read: server.recv_data() break + for client in self._subscribers: + if ts_sec - client.last_ping_ts > ServiceManager.PING_SUBSCRIBERS_SEC: + client.ping() + client.last_ping_ts = ts_sec + + def process_response(self, client, req: Request, resp: Response): + if req.method == Commands.SERVER_PING_COMMAND: + self._handle_server_ping_command(client, resp) + elif req.method == Commands.SERVER_GET_CLIENT_INFO_COMMAND: + self._handle_server_get_client_info(client, resp) + + def process_request(self, client, req: Request): + if not req: + return + + if req.method == Commands.ACTIVATE_COMMAND: + self._handle_activate_subscriber(client, req.id, req.params) + elif req.method == Commands.GET_SERVER_INFO_COMMAND: + self._handle_get_server_info(client, req.id, req.params) + elif req.method == Commands.CLIENT_PING_COMMAND: + self._handle_client_ping(client, req.id, req.params) + elif req.method == Commands.GET_CHANNELS: + self._handle_get_channels(client, req.id, req.params) + elif req.method == Commands.GET_RUNTIME_CHANNEL_INFO: + self._handle_get_runtiime_channel_info(client, req.id, req.params) + else: + pass + + def on_client_state_changed(self, client, status: ClientStatus): + pass + + # protected + def _check_is_auth_client(self, client) -> bool: + if not client: + return False + + return client.is_active() + + def _handle_server_ping_command(self, client, resp: Response): + pass + + def _handle_server_get_client_info(self, client, resp: Response): + pass + + def _handle_activate_subscriber(self, client, cid: str, params: dict): + login = params[Subscriber.EMAIL_FIELD] + password_hash = params[Subscriber.PASSWORD_FIELD] + device_id = params['device_id'] + + check_user = Subscriber.objects(email=login, class_check=False).first() + if not check_user: + client.activate_fail(cid, 'User not found') + return + + if check_user.status == Subscriber.Status.NOT_ACTIVE: + client.activate_fail(cid, 'User not active') + return + + if check_user.status == Subscriber.Status.BANNED: + client.activate_fail(cid, 'Banned user') + return + + if check_user[Subscriber.PASSWORD_FIELD] != password_hash: + client.activate_fail(cid, 'User invalid password') + return + + found_device = None + for device in check_user.devices: + if device.id == device_id: + found_device = device + break + + if not found_device: + client.activate_fail(cid, 'Device not found') + return + + user_connections = self.get_user_connections_by_email(login) + for conn in user_connections: + if conn.device == found_device: + client.activate_fail(cid, 'Device in use') + return + + client.activate_success(cid) + client.info = check_user + client.device = found_device + + def _handle_get_server_info(self, client, cid: str, params: dict): + if not self._check_is_auth_client(client): + client.check_activate_fail(cid, 'User not active') + client.disconnect() + return + + client.get_server_info_success(cid, '{0}:{1}'.format(self._host, ServiceManager.BANDWIDTH_PORT)) + + def _handle_client_ping(self, client, cid: str, params: dict): + if not self._check_is_auth_client(client): + client.check_activate_fail(cid, 'User not active') + client.disconnect() + return + + client.pong() + + def _handle_get_channels(self, client, cid: str, params: dict): + if not self._check_is_auth_client(client): + client.check_activate_fail(cid, 'User not active') + client.disconnect() + return + + channels = client.info.get_streams() + client.get_channels_success(cid, channels) + + def _handle_get_runtiime_channel_info(self, client, cid: str, params: dict): + if not self._check_is_auth_client(client): + client.check_activate_fail(cid, 'User not active') + client.disconnect() + return + + sid = params['id'] + watchers = self.get_watchers_by_stream_id(sid) + client.current_stream_id = sid + client.get_runtime_channel_info_success(cid, sid, watchers) + # private def __add_server(self, server: Service): self._servers_pool.append(server) + + def __add_subscriber(self, subs: SubscriberConnection): + self._subscribers.append(subs) + + def __remove_subscriber(self, subs: SubscriberConnection): + self._subscribers.remove(subs) + + def get_watchers_by_stream_id(self, sid: str): + total = 0 + for user in self._subscribers: + if user.current_stream_id == sid: + total += 1 + + return total + + def get_user_connections_by_email(self, email) -> list: + connections = [] + for user in self._subscribers: + if user.info.email == email: + connections.append(user) + + return connections