1
0
Fork 0
mirror of https://github.com/fastogt/fastocloud_admin.git synced 2025-03-09 23:38:52 +00:00

Balancer integration

This commit is contained in:
topilski 2019-09-03 08:24:43 -04:00
parent 756a60cbb2
commit 62302dc9e6
4 changed files with 246 additions and 20 deletions

@ -1 +1 @@
Subproject commit 551514f61e364783439981435f7a3fc9bf52d7c0 Subproject commit 485e7e717c64601470fb03fadfdcf2e1f6ebdfd2

View file

@ -3,4 +3,4 @@ SERVER_NAME_FOR_POST = '0.0.0.0:8080'
PREFERRED_URL_SCHEME = 'http' PREFERRED_URL_SCHEME = 'http'
BOOTSTRAP_SERVE_LOCAL = True BOOTSTRAP_SERVE_LOCAL = True
SUBSCRIBERS_SUPPORT = False SUBSCRIBERS_SUPPORT = True

View file

@ -1,9 +1,9 @@
from bson.objectid import ObjectId from bson.objectid import ObjectId
from pyfastocloud.fastocloud_client import FastoCloudClient, Fields from pyfastocloud.fastocloud_client import FastoCloudClient, Fields, Commands
from pyfastocloud.client_handler import IClientHandler from pyfastocloud.client_handler import IClientHandler
from pyfastocloud.json_rpc import Request, Response from pyfastocloud.json_rpc import Request, Response
from pyfastocloud.client_constants import Commands, ClientStatus from pyfastocloud.client_constants import ClientStatus
from app.service.stream_handler import IStreamHandler from app.service.stream_handler import IStreamHandler
import app.common.constants as constants import app.common.constants as constants
@ -58,9 +58,13 @@ class ServiceClient(IClientHandler):
def socket(self): def socket(self):
return self._client.socket() return self._client.socket()
def recv_data(self): def recv_data(self) -> bool:
data = self._client.read_command() data = self._client.read_command()
if not data:
return False
self._client.process_commands(data) self._client.process_commands(data)
return True
def status(self) -> ClientStatus: def status(self) -> ClientStatus:
return self._client.status() return self._client.status()
@ -72,7 +76,7 @@ class ServiceClient(IClientHandler):
return self._client.activate(self._gen_request_id(), license_key) return self._client.activate(self._gen_request_id(), license_key)
def ping_service(self): def ping_service(self):
return self._client.ping_service(self._gen_request_id()) return self._client.ping(self._gen_request_id())
def stop_service(self, delay: int): def stop_service(self, delay: int):
return self._client.stop_service(self._gen_request_id(), delay) return self._client.stop_service(self._gen_request_id(), delay)
@ -107,12 +111,7 @@ class ServiceClient(IClientHandler):
stream.set_server_settings(settings) stream.set_server_settings(settings)
streams.append(stream.config()) streams.append(stream.config())
subscribers = [] return self._client.sync_service(self._gen_request_id(), streams)
for subs in settings.subscribers:
conf = subs.to_service(settings)
subscribers.append(conf)
return self._client.sync_service(self._gen_request_id(), streams, subscribers)
def prepare_service(self, settings): def prepare_service(self, settings):
if not settings: if not settings:
@ -152,7 +151,7 @@ class ServiceClient(IClientHandler):
return self._version return self._version
# handler # handler
def process_response(self, req: Request, resp: Response): def process_response(self, client, req: Request, resp: Response):
if not req: if not req:
return return
@ -175,7 +174,7 @@ class ServiceClient(IClientHandler):
self._vods_in = directory[Fields.VODS_IN_DIRECTORY]['content'] self._vods_in = directory[Fields.VODS_IN_DIRECTORY]['content']
break break
def process_request(self, req: Request): def process_request(self, client, req: Request):
if not req: if not req:
return return
@ -193,7 +192,7 @@ class ServiceClient(IClientHandler):
elif req.method == Commands.CLIENT_PING_COMMAND: elif req.method == Commands.CLIENT_PING_COMMAND:
self._handler.on_ping_received(req.params) self._handler.on_ping_received(req.params)
def on_client_state_changed(self, status: ClientStatus): def on_client_state_changed(self, client, status: ClientStatus):
if status != ClientStatus.ACTIVE: if status != ClientStatus.ACTIVE:
self._set_runtime_fields() self._set_runtime_fields()
if self._handler: if self._handler:

View file

@ -1,14 +1,78 @@
from app.common.service.entry import ServiceSettings from app.common.service.entry import ServiceSettings
from app.service.service import Service from app.service.service import Service
from app.common.subscriber.entry import Subscriber, Device
from pyfastocloud.subscriber_client import SubscriberClient, Commands, make_utc_timestamp
from pyfastocloud.client_handler import IClientHandler, Request, Response, ClientStatus
from gevent import socket
from gevent import select
class ServiceManager(object): class SubscriberConnection(SubscriberClient):
def __init__(self, sock, addr, handler):
super(SubscriberConnection, self).__init__(sock, addr, handler)
self._info = None
self._current_stream_id = str()
self._device = None
self._last_ping_ts = make_utc_timestamp() / 1000
@property
def info(self) -> Subscriber:
return self._info
@info.setter
def info(self, value):
self._info = value
@property
def current_stream_id(self) -> str:
return self._current_stream_id
@current_stream_id.setter
def current_stream_id(self, value):
self._current_stream_id = value
@property
def device(self) -> Device:
return self._device
@device.setter
def device(self, value):
self._device = value
@property
def last_ping_ts(self) -> float:
return self._last_ping_ts
@last_ping_ts.setter
def last_ping_ts(self, value):
self._last_ping_ts = value
def recv_data(self) -> bool:
data = self.read_command()
if not data:
return False
self.process_commands(data)
return True
class ServiceManager(IClientHandler):
SUBSCRIBER_PORT = 6000
BANDWIDTH_PORT = 5000
PING_SUBSCRIBERS_SEC = 60
def __init__(self, host: str, port: int, socketio): def __init__(self, host: str, port: int, socketio):
self._host = host self._host = host
self._port = port self._port = port
self._socketio = socketio self._socketio = socketio
self._stop_listen = False self._stop_listen = False
self._servers_pool = [] self._servers_pool = []
serversock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
serversock.bind((host, ServiceManager.SUBSCRIBER_PORT))
serversock.listen(10)
self._subscribers_server_socket = serversock
self._subscribers = []
def stop(self): def stop(self):
self._stop_listen = True self._stop_listen = True
@ -23,20 +87,183 @@ class ServiceManager(object):
return server return server
def refresh(self): def refresh(self):
from gevent import select
while not self._stop_listen: while not self._stop_listen:
sockets = [] rsockets = []
rsockets.append(self._subscribers_server_socket)
for client in self._subscribers:
rsockets.append(client.socket())
for server in self._servers_pool: for server in self._servers_pool:
if server.is_connected(): if server.is_connected():
sockets.append(server.socket()) rsockets.append(server.socket())
readable, _, _ = select.select(sockets, [], [], 1) readable, writeable, _ = select.select(rsockets, [], [], 1)
ts_sec = make_utc_timestamp() / 1000
for read in readable: for read in readable:
# income subscriber connection
if self._subscribers_server_socket == read:
csock, addr = read.accept()
subs = SubscriberConnection(csock, addr, self)
self.__add_subscriber(subs)
continue
# subscriber read
for client in self._subscribers:
if client.socket() == read:
res = client.recv_data()
if not res:
self.__remove_subscriber(client)
client.disconnect()
break
for server in self._servers_pool: for server in self._servers_pool:
if server.socket() == read: if server.socket() == read:
server.recv_data() server.recv_data()
break break
for client in self._subscribers:
if ts_sec - client.last_ping_ts > ServiceManager.PING_SUBSCRIBERS_SEC:
client.ping()
client.last_ping_ts = ts_sec
def process_response(self, client, req: Request, resp: Response):
if req.method == Commands.SERVER_PING_COMMAND:
self._handle_server_ping_command(client, resp)
elif req.method == Commands.SERVER_GET_CLIENT_INFO_COMMAND:
self._handle_server_get_client_info(client, resp)
def process_request(self, client, req: Request):
if not req:
return
if req.method == Commands.ACTIVATE_COMMAND:
self._handle_activate_subscriber(client, req.id, req.params)
elif req.method == Commands.GET_SERVER_INFO_COMMAND:
self._handle_get_server_info(client, req.id, req.params)
elif req.method == Commands.CLIENT_PING_COMMAND:
self._handle_client_ping(client, req.id, req.params)
elif req.method == Commands.GET_CHANNELS:
self._handle_get_channels(client, req.id, req.params)
elif req.method == Commands.GET_RUNTIME_CHANNEL_INFO:
self._handle_get_runtiime_channel_info(client, req.id, req.params)
else:
pass
def on_client_state_changed(self, client, status: ClientStatus):
pass
# protected
def _check_is_auth_client(self, client) -> bool:
if not client:
return False
return client.is_active()
def _handle_server_ping_command(self, client, resp: Response):
pass
def _handle_server_get_client_info(self, client, resp: Response):
pass
def _handle_activate_subscriber(self, client, cid: str, params: dict):
login = params[Subscriber.EMAIL_FIELD]
password_hash = params[Subscriber.PASSWORD_FIELD]
device_id = params['device_id']
check_user = Subscriber.objects(email=login, class_check=False).first()
if not check_user:
client.activate_fail(cid, 'User not found')
return
if check_user.status == Subscriber.Status.NOT_ACTIVE:
client.activate_fail(cid, 'User not active')
return
if check_user.status == Subscriber.Status.BANNED:
client.activate_fail(cid, 'Banned user')
return
if check_user[Subscriber.PASSWORD_FIELD] != password_hash:
client.activate_fail(cid, 'User invalid password')
return
found_device = None
for device in check_user.devices:
if device.id == device_id:
found_device = device
break
if not found_device:
client.activate_fail(cid, 'Device not found')
return
user_connections = self.get_user_connections_by_email(login)
for conn in user_connections:
if conn.device == found_device:
client.activate_fail(cid, 'Device in use')
return
client.activate_success(cid)
client.info = check_user
client.device = found_device
def _handle_get_server_info(self, client, cid: str, params: dict):
if not self._check_is_auth_client(client):
client.check_activate_fail(cid, 'User not active')
client.disconnect()
return
client.get_server_info_success(cid, '{0}:{1}'.format(self._host, ServiceManager.BANDWIDTH_PORT))
def _handle_client_ping(self, client, cid: str, params: dict):
if not self._check_is_auth_client(client):
client.check_activate_fail(cid, 'User not active')
client.disconnect()
return
client.pong()
def _handle_get_channels(self, client, cid: str, params: dict):
if not self._check_is_auth_client(client):
client.check_activate_fail(cid, 'User not active')
client.disconnect()
return
channels = client.info.get_streams()
client.get_channels_success(cid, channels)
def _handle_get_runtiime_channel_info(self, client, cid: str, params: dict):
if not self._check_is_auth_client(client):
client.check_activate_fail(cid, 'User not active')
client.disconnect()
return
sid = params['id']
watchers = self.get_watchers_by_stream_id(sid)
client.current_stream_id = sid
client.get_runtime_channel_info_success(cid, sid, watchers)
# private # private
def __add_server(self, server: Service): def __add_server(self, server: Service):
self._servers_pool.append(server) self._servers_pool.append(server)
def __add_subscriber(self, subs: SubscriberConnection):
self._subscribers.append(subs)
def __remove_subscriber(self, subs: SubscriberConnection):
self._subscribers.remove(subs)
def get_watchers_by_stream_id(self, sid: str):
total = 0
for user in self._subscribers:
if user.current_stream_id == sid:
total += 1
return total
def get_user_connections_by_email(self, email) -> list:
connections = []
for user in self._subscribers:
if user.info.email == email:
connections.append(user)
return connections