diff --git a/app/service/service_manager.py b/app/service/service_manager.py index edcf58f..00579e6 100644 --- a/app/service/service_manager.py +++ b/app/service/service_manager.py @@ -1,7 +1,8 @@ from app.common.service.entry import ServiceSettings from app.service.service import Service -from app.common.subscriber.entry import Subscriber, Device -from pyfastocloud.subscriber_client import SubscriberClient, Commands +from app.service.subscriber_client import SubscriberConnection +from app.common.subscriber.entry import Subscriber +from pyfastocloud.subscriber_client import Commands from pyfastocloud.client import make_utc_timestamp from pyfastocloud.client_handler import IClientHandler, Request, Response, ClientStatus @@ -9,61 +10,6 @@ from gevent import socket from gevent import select -class SubscriberConnection(SubscriberClient): - def __init__(self, sock, addr, handler): - super(SubscriberConnection, self).__init__(sock, addr, handler) - self._info = None - self._current_stream_id = str() - self._device = None - self._last_ping_ts = make_utc_timestamp() / 1000 - self._request_id = 0 - - @property - def info(self) -> Subscriber: - return self._info - - @info.setter - def info(self, value): - self._info = value - - @property - def current_stream_id(self) -> str: - return self._current_stream_id - - @current_stream_id.setter - def current_stream_id(self, value): - self._current_stream_id = value - - @property - def device(self) -> Device: - return self._device - - @device.setter - def device(self, value): - self._device = value - - @property - def last_ping_ts(self) -> float: - return self._last_ping_ts - - @last_ping_ts.setter - def last_ping_ts(self, value): - self._last_ping_ts = value - - def recv_data(self) -> bool: - data = self.read_command() - if not data: - return False - - self.process_commands(data) - return True - - def gen_request_id(self) -> int: - current_value = self._request_id - self._request_id += 1 - return current_value - - class ServiceManager(IClientHandler): SUBSCRIBER_PORT = 6000 BANDWIDTH_PORT = 5000 @@ -129,7 +75,7 @@ class ServiceManager(IClientHandler): for client in self._subscribers: if ts_sec - client.last_ping_ts > ServiceManager.PING_SUBSCRIBERS_SEC: - client.ping(client.gen_request_id()) + client.ping() client.last_ping_ts = ts_sec def process_response(self, client, req: Request, resp: Response): @@ -151,7 +97,7 @@ class ServiceManager(IClientHandler): elif req.method == Commands.GET_CHANNELS: self._handle_get_channels(client, req.id, req.params) elif req.method == Commands.GET_RUNTIME_CHANNEL_INFO: - self._handle_get_runtiime_channel_info(client, req.id, req.params) + self._handle_get_runtime_channel_info(client, req.id, req.params) else: pass @@ -217,12 +163,7 @@ class ServiceManager(IClientHandler): client.get_server_info_success(cid, '{0}:{1}'.format(self._host, ServiceManager.BANDWIDTH_PORT)) def _handle_client_ping(self, client, cid: str, params: dict): - if not self._check_is_auth_client(client): - client.check_activate_fail(cid, 'User not active') - client.disconnect() - return - - client.pong(cid) + pass def _handle_get_channels(self, client, cid: str, params: dict): if not self._check_is_auth_client(client): @@ -230,10 +171,9 @@ class ServiceManager(IClientHandler): client.disconnect() return - channels = client.info.get_streams() - client.get_channels_success(cid, channels) + client.get_channels_success(cid) - def _handle_get_runtiime_channel_info(self, client, cid: str, params: dict): + def _handle_get_runtime_channel_info(self, client, cid: str, params: dict): if not self._check_is_auth_client(client): client.check_activate_fail(cid, 'User not active') client.disconnect() diff --git a/app/service/subscriber_client.py b/app/service/subscriber_client.py new file mode 100644 index 0000000..e35cd00 --- /dev/null +++ b/app/service/subscriber_client.py @@ -0,0 +1,90 @@ +from app.common.subscriber.entry import Subscriber, Device +from pyfastocloud.subscriber_client import SubscriberClient +from pyfastocloud.client import make_utc_timestamp + + +class SubscriberConnection(object): + def __init__(self, sock, addr, handler): + self._client = SubscriberClient(sock, addr, handler) + self._info = None + self._current_stream_id = str() + self._device = None + self._last_ping_ts = make_utc_timestamp() / 1000 + self._request_id = 0 + + @property + def info(self) -> Subscriber: + return self._info + + @info.setter + def info(self, value): + self._info = value + + @property + def current_stream_id(self) -> str: + return self._current_stream_id + + @current_stream_id.setter + def current_stream_id(self, value): + self._current_stream_id = value + + @property + def device(self) -> Device: + return self._device + + @device.setter + def device(self, value): + self._device = value + + @property + def last_ping_ts(self) -> float: + return self._last_ping_ts + + @last_ping_ts.setter + def last_ping_ts(self, value): + self._last_ping_ts = value + + def recv_data(self) -> bool: + data = self._client.read_command() + if not data: + return False + + self._client.process_commands(data) + return True + + def socket(self): + return self._client.socket() + + def ping(self): + return self._client.ping(self._gen_request_id()) + + def disconnect(self): + return self._client.disconnect() + + def is_active(self): + return self._client.is_active() + + def activate_fail(self, cid: str, error: str): + return self._client.activate_fail(cid, error) + + def activate_success(self, cid: str): + return self._client.activate_success(cid) + + def check_activate_fail(self, command_id: str, error: str): + return self._client.check_activate_fail(command_id, error) + + def get_runtime_channel_info_success(self, command_id, sid: str, watchers: int): + return self._client.get_runtime_channel_info_success(command_id, sid, watchers) + + def get_channels_success(self, cid: str): + channels = self.info.get_streams() + return self._client.get_channels_success(cid, channels) + + def get_server_info_success(self, cid: str, bandwidth_host: str): + return self._client.get_server_info_success(cid, bandwidth_host) + + # private + def _gen_request_id(self) -> int: + current_value = self._request_id + self._request_id += 1 + return current_value