From b33a40b7aafd824253a6264d42617efaf7a309ad Mon Sep 17 00:00:00 2001 From: topilski Date: Wed, 4 Sep 2019 23:48:01 -0400 Subject: [PATCH] Div services --- app/__init__.py | 6 +- app/config/config.py | 3 +- app/service/service_manager.py | 191 +----------------- app/service/subscribers_service_manager.py | 215 +++++++++++++++++++++ app/templates/provider/dashboard.html | 2 + 5 files changed, 225 insertions(+), 192 deletions(-) create mode 100644 app/service/subscribers_service_manager.py diff --git a/app/__init__.py b/app/__init__.py index 2aadea8..2abb296 100644 --- a/app/__init__.py +++ b/app/__init__.py @@ -10,6 +10,7 @@ from flask_socketio import SocketIO from werkzeug.contrib.fixers import ProxyFix from app.service.service_manager import ServiceManager +from app.service.subscribers_service_manager import SubscribersServiceManager def get_app_folder(): @@ -67,7 +68,10 @@ def init_project(static_folder, *args): host = sn_host or _host port = int(sn_port or _port) - servers_manager = ServiceManager(host, port, socketio) + support_subscribers = app.config.get('SUBSCRIBERS_SUPPORT') + servers_manager = SubscribersServiceManager(host, port, socketio) if support_subscribers else ServiceManager(host, + port, + socketio) return app, bootstrap, babel, db, mail, login_manager, servers_manager diff --git a/app/config/config.py b/app/config/config.py index aba1804..387f783 100644 --- a/app/config/config.py +++ b/app/config/config.py @@ -2,4 +2,5 @@ SECRET_KEY = '1d4bb560a7644fa48852a92ce52d6e08' SERVER_NAME_FOR_POST = '0.0.0.0:8080' PREFERRED_URL_SCHEME = 'http' -BOOTSTRAP_SERVE_LOCAL = True \ No newline at end of file +BOOTSTRAP_SERVE_LOCAL = True +SUBSCRIBERS_SUPPORT = False \ No newline at end of file diff --git a/app/service/service_manager.py b/app/service/service_manager.py index ef62562..2bba4cd 100644 --- a/app/service/service_manager.py +++ b/app/service/service_manager.py @@ -1,39 +1,17 @@ from app.common.service.entry import ServiceSettings from app.service.service import Service -from app.service.subscriber_client import SubscriberConnection -from app.common.subscriber.entry import Subscriber -from app.common.constants import PlayerMessage -from pyfastocloud.subscriber_client import Commands -from pyfastocloud.client import make_utc_timestamp -from pyfastocloud.client_handler import IClientHandler, Request, Response, ClientStatus from gevent import socket from gevent import select -def check_is_auth_client(client) -> bool: - if not client: - return False - - return client.is_active() - - -class ServiceManager(IClientHandler): - SUBSCRIBER_PORT = 6000 - BANDWIDTH_PORT = 5000 - PING_SUBSCRIBERS_SEC = 60 - +class ServiceManager(object): def __init__(self, host: str, port: int, socketio): self._host = host self._port = port self._socketio = socketio self._stop_listen = False self._servers_pool = [] - serversock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - serversock.bind((host, ServiceManager.SUBSCRIBER_PORT)) - serversock.listen(10) - self._subscribers_server_socket = serversock - self._subscribers = [] def stop(self): self._stop_listen = True @@ -50,185 +28,18 @@ class ServiceManager(IClientHandler): def refresh(self): while not self._stop_listen: rsockets = [] - rsockets.append(self._subscribers_server_socket) - for client in self._subscribers: - rsockets.append(client.socket()) for server in self._servers_pool: if server.is_connected(): rsockets.append(server.socket()) readable, writeable, _ = select.select(rsockets, [], [], 1) - ts_sec = make_utc_timestamp() / 1000 for read in readable: - # income subscriber connection - if self._subscribers_server_socket == read: - csock, addr = read.accept() - subs = SubscriberConnection(csock, addr, self) - self.__add_subscriber(subs) - continue - - # subscriber read - for client in self._subscribers: - if client.socket() == read: - res = client.recv_data() - if not res: - self.__close_subscriber(client) - break for server in self._servers_pool: if server.socket() == read: server.recv_data() break - for client in self._subscribers: - if ts_sec - client.last_ping_ts > ServiceManager.PING_SUBSCRIBERS_SEC: - if client.is_active(): - client.ping(client.gen_request_id()) - client.last_ping_ts = ts_sec - - def process_response(self, client, req: Request, resp: Response): - if req.method == Commands.SERVER_PING_COMMAND: - self._handle_server_ping_command(client, resp) - elif req.method == Commands.SERVER_GET_CLIENT_INFO_COMMAND: - self._handle_server_get_client_info(client, resp) - - def process_request(self, client, req: Request): - if not req: - return - - result = False - if req.method == Commands.ACTIVATE_COMMAND: - result = self._handle_activate_subscriber(client, req.id, req.params) - elif req.method == Commands.GET_SERVER_INFO_COMMAND: - result = self._handle_get_server_info(client, req.id, req.params) - elif req.method == Commands.CLIENT_PING_COMMAND: - result = self._handle_client_ping(client, req.id, req.params) - elif req.method == Commands.GET_CHANNELS: - result = self._handle_get_channels(client, req.id, req.params) - elif req.method == Commands.GET_RUNTIME_CHANNEL_INFO: - result = self._handle_get_runtime_channel_info(client, req.id, req.params) - else: - pass - - if not result: - self.__close_subscriber(client) - - def on_client_state_changed(self, client, status: ClientStatus): - pass - - # protected - - def _handle_server_ping_command(self, client, resp: Response): - pass - - def _handle_server_get_client_info(self, client, resp: Response): - pass - - def _handle_activate_subscriber(self, client, cid: str, params: dict) -> bool: - login = params[Subscriber.EMAIL_FIELD] - password_hash = params[Subscriber.PASSWORD_FIELD] - device_id = params['device_id'] - - check_user = Subscriber.objects(email=login, class_check=False).first() - if not check_user: - client.activate_fail(cid, 'User not found') - return False - - if check_user.status == Subscriber.Status.NOT_ACTIVE: - client.activate_fail(cid, 'User not active') - return False - - if check_user.status == Subscriber.Status.BANNED: - client.activate_fail(cid, 'Banned user') - return False - - if check_user[Subscriber.PASSWORD_FIELD] != password_hash: - client.activate_fail(cid, 'User invalid password') - return False - - found_device = check_user.find_device(device_id) - if not found_device: - client.activate_fail(cid, 'Device not found') - return False - - user_connections = self.get_user_connections_by_email(login) - for conn in user_connections: - if conn.device == found_device: - client.activate_fail(cid, 'Device in use') - return False - - client.activate_success(cid) - client.info = check_user - client.device = found_device - return True - - def _handle_get_server_info(self, client, cid: str, params: dict) -> bool: - if not check_is_auth_client(client): - client.check_activate_fail(cid, 'User not active') - return False - - client.get_server_info_success(cid, '{0}:{1}'.format(self._host, ServiceManager.BANDWIDTH_PORT)) - return True - - def _handle_client_ping(self, client, cid: str, params: dict) -> bool: - if not check_is_auth_client(client): - client.check_activate_fail(cid, 'User not active') - return False - - client.pong(cid) - return True - - def _handle_get_channels(self, client, cid: str, params: dict) -> bool: - if not check_is_auth_client(client): - client.check_activate_fail(cid, 'User not active') - return False - - channels = client.info.get_streams() - client.get_channels_success(cid, channels) - return True - - def _handle_get_runtime_channel_info(self, client, cid: str, params: dict) -> bool: - if not check_is_auth_client(client): - client.check_activate_fail(cid, 'User not active') - return False - - sid = params['id'] - watchers = self.get_watchers_by_stream_id(sid) - client.current_stream_id = sid - client.get_runtime_channel_info_success(cid, sid, watchers) - return True - - def get_watchers_by_stream_id(self, sid: str): - total = 0 - for user in self._subscribers: - if user.current_stream_id == sid: - total += 1 - - return total - - def get_user_connections_by_email(self, email) -> list: - connections = [] - for user in self._subscribers: - if user.info and user.info.email == email: - connections.append(user) - - return connections - - def send_message(self, email: str, message: PlayerMessage): - for user in self._subscribers: - if user.info and user.info.email == email: - user.send_message(user.gen_request_id(), message.message, message.type, message.ttl * 1000) - # private - def __close_subscriber(self, subs: SubscriberConnection): - self.__remove_subscriber(subs) - subs.disconnect() - def __add_server(self, server: Service): self._servers_pool.append(server) - - def __add_subscriber(self, subs: SubscriberConnection): - self._subscribers.append(subs) - - def __remove_subscriber(self, subs: SubscriberConnection): - self._subscribers.remove(subs) diff --git a/app/service/subscribers_service_manager.py b/app/service/subscribers_service_manager.py new file mode 100644 index 0000000..5d03192 --- /dev/null +++ b/app/service/subscribers_service_manager.py @@ -0,0 +1,215 @@ +from app.service.service_manager import ServiceManager + +from app.service.subscriber_client import SubscriberConnection +from app.common.subscriber.entry import Subscriber +from app.common.constants import PlayerMessage +from pyfastocloud.subscriber_client import Commands +from pyfastocloud.client import make_utc_timestamp +from pyfastocloud.client_handler import IClientHandler, Request, Response, ClientStatus + +from gevent import socket +from gevent import select + + +def check_is_auth_client(client) -> bool: + if not client: + return False + + return client.is_active() + + +class SubscribersServiceManager(ServiceManager, IClientHandler): + SUBSCRIBER_PORT = 6000 + BANDWIDTH_PORT = 5000 + PING_SUBSCRIBERS_SEC = 60 + + def __init__(self, host: str, port: int, socketio): + super(SubscribersServiceManager, self).__init__(host, port, socketio) + serversock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + serversock.bind((host, SubscribersServiceManager.SUBSCRIBER_PORT)) + serversock.listen(10) + self._subscribers_server_socket = serversock + self._subscribers = [] + + def refresh(self): + while not self._stop_listen: + rsockets = [] + rsockets.append(self._subscribers_server_socket) + for client in self._subscribers: + rsockets.append(client.socket()) + for server in self._servers_pool: + if server.is_connected(): + rsockets.append(server.socket()) + + readable, writeable, _ = select.select(rsockets, [], [], 1) + ts_sec = make_utc_timestamp() / 1000 + for read in readable: + # income subscriber connection + if self._subscribers_server_socket == read: + csock, addr = read.accept() + subs = SubscriberConnection(csock, addr, self) + self.__add_subscriber(subs) + continue + + # subscriber read + for client in self._subscribers: + if client.socket() == read: + res = client.recv_data() + if not res: + self.__close_subscriber(client) + break + + for server in self._servers_pool: + if server.socket() == read: + server.recv_data() + break + + for client in self._subscribers: + if ts_sec - client.last_ping_ts > SubscribersServiceManager.PING_SUBSCRIBERS_SEC: + if client.is_active(): + client.ping(client.gen_request_id()) + client.last_ping_ts = ts_sec + + def process_response(self, client, req: Request, resp: Response): + if req.method == Commands.SERVER_PING_COMMAND: + self._handle_server_ping_command(client, resp) + elif req.method == Commands.SERVER_GET_CLIENT_INFO_COMMAND: + self._handle_server_get_client_info(client, resp) + + def process_request(self, client, req: Request): + if not req: + return + + result = False + if req.method == Commands.ACTIVATE_COMMAND: + result = self._handle_activate_subscriber(client, req.id, req.params) + elif req.method == Commands.GET_SERVER_INFO_COMMAND: + result = self._handle_get_server_info(client, req.id, req.params) + elif req.method == Commands.CLIENT_PING_COMMAND: + result = self._handle_client_ping(client, req.id, req.params) + elif req.method == Commands.GET_CHANNELS: + result = self._handle_get_channels(client, req.id, req.params) + elif req.method == Commands.GET_RUNTIME_CHANNEL_INFO: + result = self._handle_get_runtime_channel_info(client, req.id, req.params) + else: + pass + + if not result: + self.__close_subscriber(client) + + def on_client_state_changed(self, client, status: ClientStatus): + pass + + # protected + + def _handle_server_ping_command(self, client, resp: Response): + pass + + def _handle_server_get_client_info(self, client, resp: Response): + pass + + def _handle_activate_subscriber(self, client, cid: str, params: dict) -> bool: + login = params[Subscriber.EMAIL_FIELD] + password_hash = params[Subscriber.PASSWORD_FIELD] + device_id = params['device_id'] + + check_user = Subscriber.objects(email=login, class_check=False).first() + if not check_user: + client.activate_fail(cid, 'User not found') + return False + + if check_user.status == Subscriber.Status.NOT_ACTIVE: + client.activate_fail(cid, 'User not active') + return False + + if check_user.status == Subscriber.Status.BANNED: + client.activate_fail(cid, 'Banned user') + return False + + if check_user[Subscriber.PASSWORD_FIELD] != password_hash: + client.activate_fail(cid, 'User invalid password') + return False + + found_device = check_user.find_device(device_id) + if not found_device: + client.activate_fail(cid, 'Device not found') + return False + + user_connections = self.get_user_connections_by_email(login) + for conn in user_connections: + if conn.device == found_device: + client.activate_fail(cid, 'Device in use') + return False + + client.activate_success(cid) + client.info = check_user + client.device = found_device + return True + + def _handle_get_server_info(self, client, cid: str, params: dict) -> bool: + if not check_is_auth_client(client): + client.check_activate_fail(cid, 'User not active') + return False + + client.get_server_info_success(cid, '{0}:{1}'.format(self._host, SubscribersServiceManager.BANDWIDTH_PORT)) + return True + + def _handle_client_ping(self, client, cid: str, params: dict) -> bool: + if not check_is_auth_client(client): + client.check_activate_fail(cid, 'User not active') + return False + + client.pong(cid) + return True + + def _handle_get_channels(self, client, cid: str, params: dict) -> bool: + if not check_is_auth_client(client): + client.check_activate_fail(cid, 'User not active') + return False + + channels = client.info.get_streams() + client.get_channels_success(cid, channels) + return True + + def _handle_get_runtime_channel_info(self, client, cid: str, params: dict) -> bool: + if not check_is_auth_client(client): + client.check_activate_fail(cid, 'User not active') + return False + + sid = params['id'] + watchers = self.get_watchers_by_stream_id(sid) + client.current_stream_id = sid + client.get_runtime_channel_info_success(cid, sid, watchers) + return True + + def get_watchers_by_stream_id(self, sid: str): + total = 0 + for user in self._subscribers: + if user.current_stream_id == sid: + total += 1 + + return total + + def get_user_connections_by_email(self, email) -> list: + connections = [] + for user in self._subscribers: + if user.info and user.info.email == email: + connections.append(user) + + return connections + + def send_message(self, email: str, message: PlayerMessage): + for user in self._subscribers: + if user.info and user.info.email == email: + user.send_message(user.gen_request_id(), message.message, message.type, message.ttl * 1000) + + # private + def __close_subscriber(self, subs: SubscriberConnection): + self.__remove_subscriber(subs) + subs.disconnect() + + def __add_subscriber(self, subs: SubscriberConnection): + self._subscribers.append(subs) + + def __remove_subscriber(self, subs: SubscriberConnection): + self._subscribers.remove(subs) diff --git a/app/templates/provider/dashboard.html b/app/templates/provider/dashboard.html index 9fc2ea2..187bf2d 100644 --- a/app/templates/provider/dashboard.html +++ b/app/templates/provider/dashboard.html @@ -167,12 +167,14 @@ Dashboard | {{ config['PUBLIC_CONFIG'].site.title }} {% trans %}Providers{% endtrans %} + {% if (config['SUBSCRIBERS_SUPPORT']) %}
{% trans %}Subscribers{% endtrans %}
+ {% endif %} {% if (role == 2) %}