1
0
Fork 0
mirror of https://github.com/fastogt/fastocloud_admin.git synced 2025-03-09 23:38:52 +00:00

Subscriber client

This commit is contained in:
topilski 2019-09-03 10:53:34 -04:00
parent 020091e227
commit a840fdcfb4
2 changed files with 98 additions and 68 deletions

View file

@ -1,7 +1,8 @@
from app.common.service.entry import ServiceSettings from app.common.service.entry import ServiceSettings
from app.service.service import Service from app.service.service import Service
from app.common.subscriber.entry import Subscriber, Device from app.service.subscriber_client import SubscriberConnection
from pyfastocloud.subscriber_client import SubscriberClient, Commands from app.common.subscriber.entry import Subscriber
from pyfastocloud.subscriber_client import Commands
from pyfastocloud.client import make_utc_timestamp from pyfastocloud.client import make_utc_timestamp
from pyfastocloud.client_handler import IClientHandler, Request, Response, ClientStatus from pyfastocloud.client_handler import IClientHandler, Request, Response, ClientStatus
@ -9,61 +10,6 @@ from gevent import socket
from gevent import select from gevent import select
class SubscriberConnection(SubscriberClient):
def __init__(self, sock, addr, handler):
super(SubscriberConnection, self).__init__(sock, addr, handler)
self._info = None
self._current_stream_id = str()
self._device = None
self._last_ping_ts = make_utc_timestamp() / 1000
self._request_id = 0
@property
def info(self) -> Subscriber:
return self._info
@info.setter
def info(self, value):
self._info = value
@property
def current_stream_id(self) -> str:
return self._current_stream_id
@current_stream_id.setter
def current_stream_id(self, value):
self._current_stream_id = value
@property
def device(self) -> Device:
return self._device
@device.setter
def device(self, value):
self._device = value
@property
def last_ping_ts(self) -> float:
return self._last_ping_ts
@last_ping_ts.setter
def last_ping_ts(self, value):
self._last_ping_ts = value
def recv_data(self) -> bool:
data = self.read_command()
if not data:
return False
self.process_commands(data)
return True
def gen_request_id(self) -> int:
current_value = self._request_id
self._request_id += 1
return current_value
class ServiceManager(IClientHandler): class ServiceManager(IClientHandler):
SUBSCRIBER_PORT = 6000 SUBSCRIBER_PORT = 6000
BANDWIDTH_PORT = 5000 BANDWIDTH_PORT = 5000
@ -129,7 +75,7 @@ class ServiceManager(IClientHandler):
for client in self._subscribers: for client in self._subscribers:
if ts_sec - client.last_ping_ts > ServiceManager.PING_SUBSCRIBERS_SEC: if ts_sec - client.last_ping_ts > ServiceManager.PING_SUBSCRIBERS_SEC:
client.ping(client.gen_request_id()) client.ping()
client.last_ping_ts = ts_sec client.last_ping_ts = ts_sec
def process_response(self, client, req: Request, resp: Response): def process_response(self, client, req: Request, resp: Response):
@ -151,7 +97,7 @@ class ServiceManager(IClientHandler):
elif req.method == Commands.GET_CHANNELS: elif req.method == Commands.GET_CHANNELS:
self._handle_get_channels(client, req.id, req.params) self._handle_get_channels(client, req.id, req.params)
elif req.method == Commands.GET_RUNTIME_CHANNEL_INFO: elif req.method == Commands.GET_RUNTIME_CHANNEL_INFO:
self._handle_get_runtiime_channel_info(client, req.id, req.params) self._handle_get_runtime_channel_info(client, req.id, req.params)
else: else:
pass pass
@ -217,12 +163,7 @@ class ServiceManager(IClientHandler):
client.get_server_info_success(cid, '{0}:{1}'.format(self._host, ServiceManager.BANDWIDTH_PORT)) client.get_server_info_success(cid, '{0}:{1}'.format(self._host, ServiceManager.BANDWIDTH_PORT))
def _handle_client_ping(self, client, cid: str, params: dict): def _handle_client_ping(self, client, cid: str, params: dict):
if not self._check_is_auth_client(client): pass
client.check_activate_fail(cid, 'User not active')
client.disconnect()
return
client.pong(cid)
def _handle_get_channels(self, client, cid: str, params: dict): def _handle_get_channels(self, client, cid: str, params: dict):
if not self._check_is_auth_client(client): if not self._check_is_auth_client(client):
@ -230,10 +171,9 @@ class ServiceManager(IClientHandler):
client.disconnect() client.disconnect()
return return
channels = client.info.get_streams() client.get_channels_success(cid)
client.get_channels_success(cid, channels)
def _handle_get_runtiime_channel_info(self, client, cid: str, params: dict): def _handle_get_runtime_channel_info(self, client, cid: str, params: dict):
if not self._check_is_auth_client(client): if not self._check_is_auth_client(client):
client.check_activate_fail(cid, 'User not active') client.check_activate_fail(cid, 'User not active')
client.disconnect() client.disconnect()

View file

@ -0,0 +1,90 @@
from app.common.subscriber.entry import Subscriber, Device
from pyfastocloud.subscriber_client import SubscriberClient
from pyfastocloud.client import make_utc_timestamp
class SubscriberConnection(object):
def __init__(self, sock, addr, handler):
self._client = SubscriberClient(sock, addr, handler)
self._info = None
self._current_stream_id = str()
self._device = None
self._last_ping_ts = make_utc_timestamp() / 1000
self._request_id = 0
@property
def info(self) -> Subscriber:
return self._info
@info.setter
def info(self, value):
self._info = value
@property
def current_stream_id(self) -> str:
return self._current_stream_id
@current_stream_id.setter
def current_stream_id(self, value):
self._current_stream_id = value
@property
def device(self) -> Device:
return self._device
@device.setter
def device(self, value):
self._device = value
@property
def last_ping_ts(self) -> float:
return self._last_ping_ts
@last_ping_ts.setter
def last_ping_ts(self, value):
self._last_ping_ts = value
def recv_data(self) -> bool:
data = self._client.read_command()
if not data:
return False
self._client.process_commands(data)
return True
def socket(self):
return self._client.socket()
def ping(self):
return self._client.ping(self._gen_request_id())
def disconnect(self):
return self._client.disconnect()
def is_active(self):
return self._client.is_active()
def activate_fail(self, cid: str, error: str):
return self._client.activate_fail(cid, error)
def activate_success(self, cid: str):
return self._client.activate_success(cid)
def check_activate_fail(self, command_id: str, error: str):
return self._client.check_activate_fail(command_id, error)
def get_runtime_channel_info_success(self, command_id, sid: str, watchers: int):
return self._client.get_runtime_channel_info_success(command_id, sid, watchers)
def get_channels_success(self, cid: str):
channels = self.info.get_streams()
return self._client.get_channels_success(cid, channels)
def get_server_info_success(self, cid: str, bandwidth_host: str):
return self._client.get_server_info_success(cid, bandwidth_host)
# private
def _gen_request_id(self) -> int:
current_value = self._request_id
self._request_id += 1
return current_value