From 6bfd6b4774e61bbe31224a0d8e3c1ba268bba3bd Mon Sep 17 00:00:00 2001 From: topilski Date: Mon, 10 Jun 2019 07:01:32 -0400 Subject: [PATCH] Add subscribers --- app/service/service.py | 4 +-- app/service/service_client.py | 15 ++++++---- app/service/service_entry.py | 7 +++++ app/service/view.py | 5 ++-- app/stream/stream_entry.py | 47 ++++++++++++++++++++++++++++++ app/subscriber/subscriber_entry.py | 45 ++++++++++++++++------------ 6 files changed, 94 insertions(+), 29 deletions(-) diff --git a/app/service/service.py b/app/service/service.py index 731a92a..03a89b0 100644 --- a/app/service/service.py +++ b/app/service/service.py @@ -54,7 +54,7 @@ class Service(IStreamHandler): self._settings = settings self.__reload_from_db() # other fields - self._client = ServiceClient(self, settings) + self._client = ServiceClient(settings.id, self, settings) self._host = host self._port = port self._socketio = socketio @@ -69,7 +69,7 @@ class Service(IStreamHandler): return self._client.stop_service(delay) def get_log_service(self): - return self._client.get_log_service(self._host, self._port, self.id) + return self._client.get_log_service(self._host, self._port) def ping(self): return self._client.ping_service() diff --git a/app/service/service_client.py b/app/service/service_client.py index 9d0f5ef..0e2fa59 100644 --- a/app/service/service_client.py +++ b/app/service/service_client.py @@ -1,3 +1,5 @@ +from bson.objectid import ObjectId + from app.client.client import Client from app.client.client_handler import IClientHandler from app.client.json_rpc import Request, Response @@ -27,7 +29,8 @@ class ServiceClient(IClientHandler): def get_pipeline_stream_path(host: str, port: int, stream_id: str): return constants.DEFAULT_STREAM_PIPELINE_PATH_TEMPLATE_3SIS.format(host, port, stream_id) - def __init__(self, handler: IStreamHandler, settings: ServiceSettings): + def __init__(self, sid: ObjectId, handler: IStreamHandler, settings: ServiceSettings): + self.id = sid self._request_id = 0 self._handler = handler self._service_settings = settings @@ -52,8 +55,9 @@ class ServiceClient(IClientHandler): def stop_service(self, delay: int): return self._client.stop_service(self._gen_request_id(), delay) - def get_log_service(self, host: str, port: int, sid: str): - return self._client.get_log_service(self._gen_request_id(), ServiceClient.get_log_service_path(host, port, sid)) + def get_log_service(self, host: str, port: int): + return self._client.get_log_service(self._gen_request_id(), + ServiceClient.get_log_service_path(host, port, str(self.id))) def start_stream(self, config: dict): return self._client.start_stream(self._gen_request_id(), config) @@ -78,8 +82,9 @@ class ServiceClient(IClientHandler): streams.append(stream.config()) subscribers = [] - for subscriber in self._service_settings.subscribers: - subscribers.append(subscriber.to_service()) + for subs in self._service_settings.subscribers: + subscribers.append(subs.to_service(self.id)) + return self._client.sync_service(self._gen_request_id(), streams, subscribers) def get_http_host(self) -> str: diff --git a/app/service/service_entry.py b/app/service/service_entry.py index 1ed339d..348e766 100644 --- a/app/service/service_entry.py +++ b/app/service/service_entry.py @@ -61,3 +61,10 @@ class ServiceSettings(Document, ServerSettings): self.subscribers.remove(subscriber) break self.save() + + def find_stream_settings_by_id(self, sid): + for stream in self.streams: + if stream.id == sid: + return stream + + return None diff --git a/app/service/view.py b/app/service/view.py index c8e97d3..c7d8cd6 100644 --- a/app/service/view.py +++ b/app/service/view.py @@ -167,10 +167,9 @@ class ServiceView(FlaskView): server = ServiceSettings.objects(id=sid).first() if server: new_entry = form.make_entry() - new_entry.save() - - server.add_subscriber(new_entry.id) new_entry.add_server(server) + + server.add_subscriber(new_entry) return jsonify(status='ok'), 200 return render_template('service/subscriber/add.html', form=form) diff --git a/app/stream/stream_entry.py b/app/stream/stream_entry.py index 98edaac..aecf015 100644 --- a/app/stream/stream_entry.py +++ b/app/stream/stream_entry.py @@ -65,6 +65,46 @@ class StreamFields: TIMESTAMP = 'timestamp' +class EpgInfo: + ID_FIELD = 'id' + URL_FIELD = 'url' + TITLE_FIELD = 'display_name' + ICON_FIELD = 'icon' + PROGRAMS_FIELD = 'programs' + + id = str + url = str + title = str + icon = str + programs = [] + + def __init__(self, eid: str, url: str, title: str, icon: str, programs=list()): + self.id = eid + self.url = url + self.title = title + self.icon = icon + self.programs = programs + + def to_dict(self) -> dict: + return {EpgInfo.ID_FIELD: self.id, EpgInfo.URL_FIELD: self.url, EpgInfo.TITLE_FIELD: self.title, + EpgInfo.ICON_FIELD: self.icon, EpgInfo.PROGRAMS_FIELD: self.programs} + + +class ChannelInfo: + EPG_FIELD = 'epg' + VIDEO_ENABLE_FIELD = 'video' + AUDIO_ENABLE_FIELD = 'audio' + + def __init__(self, epg: EpgInfo, have_video=True, have_audio=True): + self.have_video = have_video + self.have_audio = have_audio + self.epg = epg + + def to_dict(self) -> dict: + return {ChannelInfo.EPG_FIELD: self.epg.to_dict(), ChannelInfo.VIDEO_ENABLE_FIELD: self.have_video, + ChannelInfo.AUDIO_ENABLE_FIELD: self.have_audio} + + class Stream(EmbeddedDocument): meta = {'allow_inheritance': True, 'auto_create_index': True} @@ -159,6 +199,13 @@ class Stream(EmbeddedDocument): conf[AUDIO_SELECT_FIELD] = audio_select return conf + def to_channel_info(self) -> [ChannelInfo]: + ch = [] + for out in self.output.urls: + epg = EpgInfo(self.get_id(), out.uri, self.name, self.icon) + ch.append(ChannelInfo(epg)) + return ch + def generate_feedback_dir(self): return '{0}/{1}/{2}'.format(self._settings.feedback_directory, self.get_type(), self.get_id()) diff --git a/app/subscriber/subscriber_entry.py b/app/subscriber/subscriber_entry.py index 959eec5..ed9155a 100644 --- a/app/subscriber/subscriber_entry.py +++ b/app/subscriber/subscriber_entry.py @@ -7,7 +7,6 @@ from mongoengine import Document, EmbeddedDocument, StringField, DateTimeField, PULL, ObjectIdField from app.service.service_entry import ServiceSettings -from app.stream.stream_entry import Stream class Device(EmbeddedDocument): @@ -29,7 +28,7 @@ class Subscriber(Document): PASSWORD_FIELD = "password" STATUS_FIELD = "status" DEVICES_FIELD = "devices" - STREAMS_FIELD = "streams" + STREAMS_FIELD = "channels" class Status(IntEnum): NO_ACTIVE = 0 @@ -51,38 +50,46 @@ class Subscriber(Document): type = IntField(default=Type.USER) country = StringField(min_length=2, max_length=3, required=True) servers = ListField(ReferenceField(ServiceSettings, reverse_delete_rule=PULL), default=[]) - devices = ListField(Device, default=[]) - streams = ListField(ReferenceField(Stream, reverse_delete_rule=PULL), default=[]) + devices = ListField(Device(), default=[]) + streams = ListField(ObjectIdField(), default=[]) def add_server(self, server: ServiceSettings): self.servers.append(server) self.save() - def add_stream(self, sid): - self.streams.append(sid) + def add_stream(self, stream): + self.streams.append(stream.id) self.save() - def remove_stream(self, sid): + def remove_stream(self, sid: ObjectId): for stream in self.streams: if stream == sid: self.streams.remove(stream) break self.save() - def to_service(self) -> dict: - devices = [] - for dev in self.devices: - devices.append(str(dev.id)) + def to_service(self, sid: ObjectId) -> dict: + for serv in self.servers: + if serv.id == sid: + devices = [] + for dev in self.devices: + devices.append(str(dev.id)) - streams = [] - for stream in self.streams: - streams.append(str(stream)) + streams = [] + for stream in self.streams: + founded_stream = serv.find_stream_settings_by_id(stream) + if founded_stream: + channels = founded_stream.to_channel_info() + for ch in channels: + streams.append(ch.to_dict()) - conf = { - Subscriber.ID_FIELD: str(self.id), Subscriber.EMAIL_FIELD: self.email, - Subscriber.PASSWORD_FIELD: self.password, Subscriber.STATUS_FIELD: self.status, - Subscriber.DEVICES_FIELD: devices, Subscriber.STREAMS_FIELD: streams} - return conf + conf = { + Subscriber.ID_FIELD: str(self.id), Subscriber.EMAIL_FIELD: self.email, + Subscriber.PASSWORD_FIELD: self.password, Subscriber.STATUS_FIELD: self.status, + Subscriber.DEVICES_FIELD: devices, Subscriber.STREAMS_FIELD: streams} + return conf + + return {} @staticmethod def make_md5_hash_from_password(password: str) -> str: