From 96c6066a9123dfbe0b4f9ea6ff69c143b592c022 Mon Sep 17 00:00:00 2001 From: Bumsik Kim Date: Thu, 28 Jun 2018 10:12:08 -0400 Subject: [PATCH] split single virtscreen.py into submodules --- setup.py | 4 +- virtscreen/__init__.py | 1 - virtscreen/display.py | 108 +++++ virtscreen/main.py | 189 +++++++++ virtscreen/path.py | 38 ++ virtscreen/process.py | 99 +++++ virtscreen/qt_backend.py | 355 ++++++++++++++++ virtscreen/virtscreen.py | 894 --------------------------------------- virtscreen/xrandr.py | 138 ++++++ 9 files changed, 929 insertions(+), 897 deletions(-) create mode 100644 virtscreen/display.py create mode 100755 virtscreen/main.py create mode 100644 virtscreen/path.py create mode 100644 virtscreen/process.py create mode 100644 virtscreen/qt_backend.py delete mode 100755 virtscreen/virtscreen.py create mode 100644 virtscreen/xrandr.py diff --git a/setup.py b/setup.py index c2b7f0c..a6b2abb 100644 --- a/setup.py +++ b/setup.py @@ -136,7 +136,7 @@ setup( # # py_modules=["my_module"], # - packages=['virtscreen'], # Required + packages=find_packages(), # Required # This field lists other packages that your project depends on to run. # Any package you put here will be installed by pip when your project is @@ -195,7 +195,7 @@ setup( # executes the function `main` from this package when invoked: entry_points={ # Optional 'console_scripts': [ - 'virtscreen = virtscreen.virtscreen:main', + 'virtscreen = virtscreen.main:main', ], }, diff --git a/virtscreen/__init__.py b/virtscreen/__init__.py index 8be907d..e69de29 100644 --- a/virtscreen/__init__.py +++ b/virtscreen/__init__.py @@ -1 +0,0 @@ -__all__ = ['virtscreen'] \ No newline at end of file diff --git a/virtscreen/display.py b/virtscreen/display.py new file mode 100644 index 0000000..6a162b2 --- /dev/null +++ b/virtscreen/display.py @@ -0,0 +1,108 @@ +"""Display information data classes""" + +from PyQt5.QtCore import QObject, pyqtProperty + + +class Display(object): + """Display information""" + __slots__ = ['name', 'primary', 'connected', 'active', 'width', 'height', + 'x_offset', 'y_offset'] + + def __init__(self): + self.name: str = None + self.primary: bool = False + self.connected: bool = False + self.active: bool = False + self.width: int = 0 + self.height: int = 0 + self.x_offset: int = 0 + self.y_offset: int = 0 + + def __str__(self) -> str: + ret = f"{self.name}" + if self.connected: + ret += " connected" + else: + ret += " disconnected" + if self.primary: + ret += " primary" + if self.active: + ret += f" {self.width}x{self.height}+{self.x_offset}+{self.y_offset}" + else: + ret += f" not active {self.width}x{self.height}" + return ret + + +class DisplayProperty(QObject): + """Wrapper around Display class for Qt""" + def __init__(self, display: Display, parent=None): + super(DisplayProperty, self).__init__(parent) + self._display = display + + @property + def display(self): + return self._display + + @pyqtProperty(str, constant=True) + def name(self): + return self._display.name + + @name.setter + def name(self, name): + self._display.name = name + + @pyqtProperty(bool, constant=True) + def primary(self): + return self._display.primary + + @primary.setter + def primary(self, primary): + self._display.primary = primary + + @pyqtProperty(bool, constant=True) + def connected(self): + return self._display.connected + + @connected.setter + def connected(self, connected): + self._display.connected = connected + + @pyqtProperty(bool, constant=True) + def active(self): + return self._display.active + + @active.setter + def active(self, active): + self._display.active = active + + @pyqtProperty(int, constant=True) + def width(self): + return self._display.width + + @width.setter + def width(self, width): + self._display.width = width + + @pyqtProperty(int, constant=True) + def height(self): + return self._display.height + + @height.setter + def height(self, height): + self._display.height = height + + @pyqtProperty(int, constant=True) + def x_offset(self): + return self._display.x_offset + + @x_offset.setter + def x_offset(self, x_offset): + self._display.x_offset = x_offset + + @pyqtProperty(int, constant=True) + def y_offset(self): + return self._display.y_offset + + @y_offset.setter + def y_offset(self, y_offset): + self._display.y_offset = y_offset diff --git a/virtscreen/main.py b/virtscreen/main.py new file mode 100755 index 0000000..9b273ae --- /dev/null +++ b/virtscreen/main.py @@ -0,0 +1,189 @@ +#!/usr/bin/python3 + +# Python standard packages +import sys +import os +import signal +import json +import shutil +import argparse +from typing import Callable +import asyncio + +# Import OpenGL library for Nvidia driver +# https://github.com/Ultimaker/Cura/pull/131#issuecomment-176088664 +import ctypes +from ctypes.util import find_library +ctypes.CDLL(find_library('GL'), ctypes.RTLD_GLOBAL) + +from PyQt5.QtWidgets import QApplication +from PyQt5.QtQml import qmlRegisterType, QQmlApplicationEngine +from PyQt5.QtGui import QIcon +from PyQt5.QtCore import Qt, QUrl +from quamash import QEventLoop + +from .display import DisplayProperty +from .xrandr import XRandR +from .qt_backend import Backend, Cursor, Network +from .path import HOME_PATH, ICON_PATH, MAIN_QML_PATH, CONFIG_PATH + + +def main() -> None: + parser = argparse.ArgumentParser( + formatter_class=argparse.RawTextHelpFormatter, + description='Make your iPad/tablet/computer as a secondary monitor on Linux.\n\n' + 'You can start VirtScreen in the following two modes:\n\n' + ' - GUI mode: A system tray icon will appear when no argument passed.\n' + ' You need to use this first to configure a virtual screen.\n' + ' - CLI mode: After configured the virtual screen, you can start VirtScreen\n' + ' in CLI mode if you do not want a GUI, by passing any arguments\n', + epilog='example:\n' + 'virtscreen # GUI mode. You need to use this first\n' + ' to configure the screen\n' + 'virtscreen --auto # CLI mode. Scrren will be created using previous\n' + ' settings (from both GUI mode and CLI mode)\n' + 'virtscreen --left # CLI mode. On the left to the primary monitor\n' + 'virtscreen --below # CLI mode. Below the primary monitor.\n' + 'virtscreen --below --portrait # Below, and portrait mode.\n' + 'virtscreen --below --portrait --hipdi # Below, portrait, HiDPI mode.\n') + parser.add_argument('--auto', action='store_true', + help='create a virtual screen automatically using previous\n' + 'settings (from both GUI mode and CLI mode)') + parser.add_argument('--left', action='store_true', + help='a virtual screen will be created left to the primary\n' + 'monitor') + parser.add_argument('--right', action='store_true', + help='right to the primary monitor') + parser.add_argument('--above', '--up', action='store_true', + help='above the primary monitor') + parser.add_argument('--below', '--down', action='store_true', + help='below the primary monitor') + parser.add_argument('--portrait', action='store_true', + help='Portrait mode. Width and height of the screen are swapped') + parser.add_argument('--hidpi', action='store_true', + help='HiDPI mode. Width and height are doubled') + # Add signal handler + def on_exit(self, signum=None, frame=None): + sys.exit(0) + for sig in [signal.SIGINT, signal.SIGTERM, signal.SIGHUP, signal.SIGQUIT]: + signal.signal(sig, on_exit) + # Start main + args = parser.parse_args() + if any(vars(args).values()): + main_cli(args) + else: + main_gui() + print('Program should not reach here.') + sys.exit(1) + +def check_env(msg: Callable[[str], None]) -> None: + if os.environ['XDG_SESSION_TYPE'].lower() == 'wayland': + msg("Currently Wayland is not supported") + sys.exit(1) + if not HOME_PATH: + msg("Cannot detect home directory.") + sys.exit(1) + if not os.path.exists(HOME_PATH): + try: + os.makedirs(HOME_PATH) + except: + msg("Cannot create ~/.config/virtscreen") + sys.exit(1) + if not shutil.which('x11vnc'): + msg("x11vnc is not installed.") + sys.exit(1) + try: + test = XRandR() + except RuntimeError as e: + msg(str(e)) + sys.exit(1) + +def main_gui(): + QApplication.setAttribute(Qt.AA_EnableHighDpiScaling) + app = QApplication(sys.argv) + loop = QEventLoop(app) + asyncio.set_event_loop(loop) + + # Check environment first + from PyQt5.QtWidgets import QMessageBox, QSystemTrayIcon + def dialog(message: str) -> None: + QMessageBox.critical(None, "VirtScreen", message) + if not QSystemTrayIcon.isSystemTrayAvailable(): + dialog("Cannot detect system tray on this system.") + sys.exit(1) + check_env(dialog) + + # Replace Twisted reactor with qt5reactor + import qt5reactor # pylint: disable=E0401 + qt5reactor.install() + from twisted.internet import reactor # pylint: disable=E0401 + + app.setWindowIcon(QIcon(ICON_PATH)) + os.environ["QT_QUICK_CONTROLS_STYLE"] = "Material" + # os.environ["QT_QUICK_CONTROLS_STYLE"] = "Fusion" + + # Register the Python type. Its URI is 'People', it's v1.0 and the type + # will be called 'Person' in QML. + qmlRegisterType(DisplayProperty, 'VirtScreen.DisplayProperty', 1, 0, 'DisplayProperty') + qmlRegisterType(Backend, 'VirtScreen.Backend', 1, 0, 'Backend') + qmlRegisterType(Cursor, 'VirtScreen.Cursor', 1, 0, 'Cursor') + qmlRegisterType(Network, 'VirtScreen.Network', 1, 0, 'Network') + + # Create a component factory and load the QML script. + engine = QQmlApplicationEngine() + engine.load(QUrl(MAIN_QML_PATH)) + if not engine.rootObjects(): + dialog("Failed to load QML") + sys.exit(1) + sys.exit(app.exec_()) + with loop: + loop.run_forever() + +def main_cli(args: argparse.Namespace): + loop = asyncio.get_event_loop() + for key, value in vars(args).items(): + print(key, ": ", value) + # Check the environment + check_env(print) + if not os.path.exists(CONFIG_PATH): + print("Configuration file does not exist.\n" + "Configure a virtual screen using GUI first.") + sys.exit(1) + # By instantiating the backend, additional verifications of config + # file will be done. + backend = Backend() + # Get settings + with open(CONFIG_PATH, 'r') as f: + config = json.load(f) + # Override settings from arguments + position = '' + if not args.auto: + args_virt = ['portrait', 'hidpi'] + for prop in args_virt: + if vars(args)[prop]: + config['virt'][prop] = True + args_position = ['left', 'right', 'above', 'below'] + tmp_args = {k: vars(args)[k] for k in args_position} + if not any(tmp_args.values()): + print("Choose a position relative to the primary monitor. (e.g. --left)") + sys.exit(1) + for key, value in tmp_args.items(): + if value: + position = key + # Create virtscreen and Start VNC + def handle_error(msg): + print('Error: ', msg) + sys.exit(1) + backend.onError.connect(handle_error) + backend.createVirtScreen(config['virt']['device'], config['virt']['width'], + config['virt']['height'], config['virt']['portrait'], + config['virt']['hidpi'], position) + def handle_vnc_changed(state): + if state is backend.VNCState.OFF: + sys.exit(0) + backend.onVncStateChanged.connect(handle_vnc_changed) + backend.startVNC(config['vnc']['port']) + loop.run_forever() + +if __name__ == '__main__': + main() diff --git a/virtscreen/path.py b/virtscreen/path.py new file mode 100644 index 0000000..52ccc8c --- /dev/null +++ b/virtscreen/path.py @@ -0,0 +1,38 @@ +"""File path definitions""" + +import os +from pathlib import Path + + +# Sanitize environment variables +# https://wiki.sei.cmu.edu/confluence/display/c/ENV03-C.+Sanitize+the+environment+when+invoking+external+programs + +# Delete $HOME env for security reason. This will make +# Path.home() to look up in the password directory (pwd module) +if 'HOME' in os.environ: + del os.environ['HOME'] +os.environ['HOME'] = str(Path.home()) +os.environ['PATH'] = os.confstr("CS_PATH") # Sanitize $PATH + +# Setting home path and base path +# https://www.freedesktop.org/software/systemd/man/file-hierarchy.html +# HOME_PATH will point to ~/.config/virtscreen by default +if 'XDG_CONFIG_HOME' in os.environ and os.environ['XDG_CONFIG_HOME']: + HOME_PATH = os.environ['XDG_CONFIG_HOME'] +else: + HOME_PATH = os.environ['HOME'] + if HOME_PATH is not None: + HOME_PATH = HOME_PATH + "/.config" +if HOME_PATH is not None: + HOME_PATH = HOME_PATH + "/virtscreen" +BASE_PATH = os.path.dirname(__file__) +# Path in ~/.virtscreen +X11VNC_LOG_PATH = HOME_PATH + "/x11vnc_log.txt" +X11VNC_PASSWORD_PATH = HOME_PATH + "/x11vnc_passwd" +CONFIG_PATH = HOME_PATH + "/config.json" +# Path in the program path +ICON_PATH = BASE_PATH + "/icon/icon.png" +ASSETS_PATH = BASE_PATH + "/assets" +DATA_PATH = ASSETS_PATH + "/data.json" +DEFAULT_CONFIG_PATH = ASSETS_PATH + "/config.default.json" +MAIN_QML_PATH = ASSETS_PATH + "/main.qml" diff --git a/virtscreen/process.py b/virtscreen/process.py new file mode 100644 index 0000000..b3b197b --- /dev/null +++ b/virtscreen/process.py @@ -0,0 +1,99 @@ +"""Subprocess wrapper""" + +import subprocess +import asyncio +import signal +import shlex +import os + + +class SubprocessWrapper: + """Subprocess wrapper class""" + def __init__(self): + pass + + def check_output(self, arg) -> None: + return subprocess.check_output(shlex.split(arg), stderr=subprocess.STDOUT).decode('utf-8') + + def run(self, arg: str, input: str = None, check=False) -> str: + if input: + input = input.encode('utf-8') + return subprocess.run(shlex.split(arg), input=input, stdout=subprocess.PIPE, + check=check, stderr=subprocess.STDOUT).stdout.decode('utf-8') + + +class _Protocol(asyncio.SubprocessProtocol): + """SubprocessProtocol implementation""" + + def __init__(self, outer): + self.outer = outer + self.transport: asyncio.SubprocessTransport + + def connection_made(self, transport): + print("connectionMade!") + self.outer.connected() + self.transport = transport + transport.get_pipe_transport(0).close() # No more input + + def pipe_data_received(self, fd, data): + if fd == 1: # stdout + self.outer.out_recevied(data) + if self.outer.logfile is not None: + self.outer.logfile.write(data) + elif fd == 2: # stderr + self.outer.err_recevied(data) + if self.outer.logfile is not None: + self.outer.logfile.write(data) + + def pipe_connection_lost(self, fd, exc): + if fd == 0: # stdin + print("stdin is closed. (we probably did it)") + elif fd == 1: # stdout + print("The child closed their stdout.") + elif fd == 2: # stderr + print("The child closed their stderr.") + + def connection_lost(self, exc): + print("Subprocess connection lost.") + + def process_exited(self): + if self.outer.logfile is not None: + self.outer.logfile.close() + self.transport.close() + return_code = self.transport.get_returncode() + if return_code is None: + print("Unknown exit") + self.outer.ended(1) + return + print("processEnded, status", return_code) + self.outer.ended(return_code) + + +class AsyncSubprocess(): + """Asynchronous subprocess wrapper class""" + + def __init__(self, connected, out_recevied, err_recevied, ended, logfile=None): + self.connected = connected + self.out_recevied = out_recevied + self.err_recevied = err_recevied + self.ended = ended + self.logfile = logfile + self.transport: asyncio.SubprocessTransport + self.protocol: _Protocol + + async def _run(self, arg: str, loop: asyncio.AbstractEventLoop): + self.transport, self.protocol = await loop.subprocess_exec( + lambda: _Protocol(self), *shlex.split(arg), env=os.environ) + + def run(self, arg: str): + """Spawn a process. + + Arguments: + arg {str} -- arguments in string + """ + loop = asyncio.get_event_loop() + loop.create_task(self._run(arg, loop)) + + def close(self): + """Kill a spawned process.""" + self.transport.send_signal(signal.SIGINT) diff --git a/virtscreen/qt_backend.py b/virtscreen/qt_backend.py new file mode 100644 index 0000000..1c7e465 --- /dev/null +++ b/virtscreen/qt_backend.py @@ -0,0 +1,355 @@ +"""GUI backend""" + +import json +import re +import subprocess +import os +import shutil +import atexit +import time + +from PyQt5.QtCore import QObject, pyqtProperty, pyqtSlot, pyqtSignal, Q_ENUMS +from PyQt5.QtGui import QCursor +from PyQt5.QtQml import QQmlListProperty +from PyQt5.QtWidgets import QApplication +from netifaces import interfaces, ifaddresses, AF_INET + +from .display import DisplayProperty +from .xrandr import XRandR +from .process import AsyncSubprocess, SubprocessWrapper +from .path import (DATA_PATH, CONFIG_PATH, DEFAULT_CONFIG_PATH, + X11VNC_PASSWORD_PATH, X11VNC_LOG_PATH) + + +class Backend(QObject): + """ Backend class for QML frontend """ + + class VNCState: + """ Enum to indicate a state of the VNC server """ + OFF = 0 + ERROR = 1 + WAITING = 2 + CONNECTED = 3 + + Q_ENUMS(VNCState) + + # Signals + onVirtScreenCreatedChanged = pyqtSignal(bool) + onVncUsePasswordChanged = pyqtSignal(bool) + onVncStateChanged = pyqtSignal(VNCState) + onDisplaySettingClosed = pyqtSignal() + onError = pyqtSignal(str) + + def __init__(self, parent=None): + super(Backend, self).__init__(parent) + # Virtual screen properties + self.xrandr: XRandR = XRandR() + self._virtScreenCreated: bool = False + # VNC server properties + self._vncUsePassword: bool = False + self._vncState: self.VNCState = self.VNCState.OFF + # Primary screen and mouse posistion + self.vncServer: AsyncSubprocess + # Check config file + # and initialize if needed + need_init = False + if not os.path.exists(CONFIG_PATH): + shutil.copy(DEFAULT_CONFIG_PATH, CONFIG_PATH) + need_init = True + # Version check + file_match = True + with open(CONFIG_PATH, 'r') as f_config, open(DATA_PATH, 'r') as f_data: + config = json.load(f_config) + data = json.load(f_data) + if config['version'] != data['version']: + file_match = False + # Override config with default when version doesn't match + if not file_match: + shutil.copy(DEFAULT_CONFIG_PATH, CONFIG_PATH) + need_init = True + # initialize config file + if need_init: + # 1. Available x11vnc options + # Get available x11vnc options from x11vnc first + p = SubprocessWrapper() + arg = 'x11vnc -opts' + ret = p.run(arg) + options = tuple(m.group(1) for m in re.finditer("\s*(-\w+)\s+", ret)) + # Set/unset available x11vnc options flags in config + with open(CONFIG_PATH, 'r') as f, open(DATA_PATH, 'r') as f_data: + config = json.load(f) + data = json.load(f_data) + for key, value in config["x11vncOptions"].items(): + if key in options: + value["available"] = True + else: + value["available"] = False + # Default Display settings app for a Desktop Environment + desktop_environ = os.environ['XDG_CURRENT_DESKTOP'].lower() + for key, value in data['displaySettingApps'].items(): + for de in value['XDG_CURRENT_DESKTOP']: + if de in desktop_environ: + config["displaySettingApp"] = key + # Save the new config + with open(CONFIG_PATH, 'w') as f: + f.write(json.dumps(config, indent=4, sort_keys=True)) + + # Qt properties + @pyqtProperty(str, constant=True) + def settings(self): + with open(CONFIG_PATH, "r") as f: + return f.read() + + @settings.setter + def settings(self, json_str): + with open(CONFIG_PATH, "w") as f: + f.write(json_str) + + @pyqtProperty(bool, notify=onVirtScreenCreatedChanged) + def virtScreenCreated(self): + return self._virtScreenCreated + + @virtScreenCreated.setter + def virtScreenCreated(self, value): + self._virtScreenCreated = value + self.onVirtScreenCreatedChanged.emit(value) + + @pyqtProperty(QQmlListProperty, constant=True) + def screens(self): + try: + return QQmlListProperty(DisplayProperty, self, [DisplayProperty(x) for x in self.xrandr.screens]) + except RuntimeError as e: + self.onError.emit(str(e)) + return QQmlListProperty(DisplayProperty, self, []) + + @pyqtProperty(bool, notify=onVncUsePasswordChanged) + def vncUsePassword(self): + if os.path.isfile(X11VNC_PASSWORD_PATH): + self._vncUsePassword = True + else: + if self._vncUsePassword: + self.vncUsePassword = False + return self._vncUsePassword + + @vncUsePassword.setter + def vncUsePassword(self, use): + self._vncUsePassword = use + self.onVncUsePasswordChanged.emit(use) + + @pyqtProperty(VNCState, notify=onVncStateChanged) + def vncState(self): + return self._vncState + + @vncState.setter + def vncState(self, state): + self._vncState = state + self.onVncStateChanged.emit(self._vncState) + + # Qt Slots + @pyqtSlot(str, int, int, bool, bool) + def createVirtScreen(self, device, width, height, portrait, hidpi, pos=''): + self.xrandr.virt_name = device + print("Creating a Virtual Screen...") + try: + self.xrandr.create_virtual_screen(width, height, portrait, hidpi, pos) + except subprocess.CalledProcessError as e: + self.onError.emit(str(e.cmd) + '\n' + e.stdout.decode('utf-8')) + return + except RuntimeError as e: + self.onError.emit(str(e)) + return + self.virtScreenCreated = True + + @pyqtSlot() + def deleteVirtScreen(self): + print("Deleting the Virtual Screen...") + if self.vncState is not self.VNCState.OFF: + self.onError.emit("Turn off the VNC server first") + self.virtScreenCreated = True + return + try: + self.xrandr.delete_virtual_screen() + except RuntimeError as e: + self.onError.emit(str(e)) + return + self.virtScreenCreated = False + + @pyqtSlot(str) + def createVNCPassword(self, password): + if password: + password += '\n' + password + '\n\n' # verify + confirm + p = SubprocessWrapper() + try: + p.run(f"x11vnc -storepasswd {X11VNC_PASSWORD_PATH}", input=password, check=True) + except subprocess.CalledProcessError as e: + self.onError.emit(str(e.cmd) + '\n' + e.stdout.decode('utf-8')) + return + self.vncUsePassword = True + else: + self.onError.emit("Empty password") + + @pyqtSlot() + def deleteVNCPassword(self): + if os.path.isfile(X11VNC_PASSWORD_PATH): + os.remove(X11VNC_PASSWORD_PATH) + self.vncUsePassword = False + else: + self.onError.emit("Failed deleting the password file") + + @pyqtSlot(int) + def startVNC(self, port): + # Check if a virtual screen created + if not self.virtScreenCreated: + self.onError.emit("Virtual Screen not crated.") + return + if self.vncState is not self.VNCState.OFF: + self.onError.emit("VNC Server is already running.") + return + # regex used in callbacks + patter_connected = re.compile(r"^.*Got connection from client.*$", re.M) + patter_disconnected = re.compile(r"^.*client_count: 0*$", re.M) + + # define callbacks + def _connected(): + print("VNC started.") + self.vncState = self.VNCState.WAITING + + def _received(data): + data = data.decode("utf-8") + if (self._vncState is not self.VNCState.CONNECTED) and patter_connected.search(data): + print("VNC connected.") + self.vncState = self.VNCState.CONNECTED + if (self._vncState is self.VNCState.CONNECTED) and patter_disconnected.search(data): + print("VNC disconnected.") + self.vncState = self.VNCState.WAITING + + def _ended(exitCode): + if exitCode is not 0: + self.vncState = self.VNCState.ERROR + self.onError.emit('X11VNC: Error occurred.\n' + 'Double check if the port is already used.') + self.vncState = self.VNCState.OFF # TODO: better handling error state + else: + self.vncState = self.VNCState.OFF + print("VNC Exited.") + atexit.unregister(self.stopVNC) + # load settings + with open(CONFIG_PATH, 'r') as f: + config = json.load(f) + options = '' + if config['customX11vncArgs']['enabled']: + options = config['customX11vncArgs']['value'] + else: + for key, value in config['x11vncOptions'].items(): + if value['available'] and value['enabled']: + options += key + ' ' + if value['arg'] is not None: + options += str(value['arg']) + ' ' + # Sart x11vnc, turn settings object into VNC arguments format + logfile = open(X11VNC_LOG_PATH, "wb") + self.vncServer = AsyncSubprocess(_connected, _received, _received, _ended, logfile) + try: + virt = self.xrandr.get_virtual_screen() + except RuntimeError as e: + self.onError.emit(str(e)) + return + clip = f"{virt.width}x{virt.height}+{virt.x_offset}+{virt.y_offset}" + arg = f"x11vnc -rfbport {port} -clip {clip} {options}" + if self.vncUsePassword: + arg += f" -rfbauth {X11VNC_PASSWORD_PATH}" + self.vncServer.run(arg) + # auto stop on exit + atexit.register(self.stopVNC, force=True) + + @pyqtSlot(str) + def openDisplaySetting(self, app: str = "arandr"): + # define callbacks + def _connected(): + print("External Display Setting opened.") + + def _received(data): + pass + + def _ended(exitCode): + print("External Display Setting closed.") + self.onDisplaySettingClosed.emit() + if exitCode is not 0: + self.onError.emit(f'Error opening "{running_program}".') + with open(DATA_PATH, 'r') as f: + data = json.load(f)['displaySettingApps'] + if app not in data: + self.onError.emit('Wrong display settings program') + return + program_list = [data[app]['args'], "arandr"] + program = AsyncSubprocess(_connected, _received, _received, _ended, None) + running_program = '' + for arg in program_list: + if not shutil.which(arg.split()[0]): + continue + running_program = arg + program.run(arg) + return + self.onError.emit('Failed to find a display settings program.\n' + 'Please install ARandR package.\n' + '(e.g. sudo apt-get install arandr)\n' + 'Please issue a feature request\n' + 'if you wish to add a display settings\n' + 'program for your Desktop Environment.') + + @pyqtSlot() + def stopVNC(self, force=False): + if force: + # Usually called from atexit(). + self.vncServer.close() + time.sleep(3) # Make sure X11VNC shutdown before execute next atexit(). + if self._vncState in (self.VNCState.WAITING, self.VNCState.CONNECTED): + self.vncServer.close() + else: + self.onError.emit("stopVNC called while it is not running") + + @pyqtSlot() + def clearCache(self): + # engine.clearComponentCache() + pass + + @pyqtSlot() + def quitProgram(self): + self.blockSignals(True) # This will prevent invoking auto-restart or etc. + QApplication.instance().quit() + + +class Cursor(QObject): + """ Global mouse cursor position """ + + def __init__(self, parent=None): + super(Cursor, self).__init__(parent) + + @pyqtProperty(int) + def x(self): + cursor = QCursor().pos() + return cursor.x() + + @pyqtProperty(int) + def y(self): + cursor = QCursor().pos() + return cursor.y() + + +class Network(QObject): + """ Backend class for network interfaces """ + onIPAddressesChanged = pyqtSignal() + + def __init__(self, parent=None): + super(Network, self).__init__(parent) + + @pyqtProperty('QStringList', notify=onIPAddressesChanged) + def ipAddresses(self): + for interface in interfaces(): + if interface == 'lo': + continue + addresses = ifaddresses(interface).get(AF_INET, None) + if addresses is None: + continue + for link in addresses: + if link is not None: + yield link['addr'] diff --git a/virtscreen/virtscreen.py b/virtscreen/virtscreen.py deleted file mode 100755 index 5262587..0000000 --- a/virtscreen/virtscreen.py +++ /dev/null @@ -1,894 +0,0 @@ -#!/usr/bin/python3 - -# Python standard packages -import sys -import os -import subprocess -import signal -import re -import atexit -import time -import json -import shutil -import argparse -import shlex -from pathlib import Path -from enum import Enum -from typing import List, Dict, Callable -import asyncio - -# Import OpenGL library for Nvidia driver -# https://github.com/Ultimaker/Cura/pull/131#issuecomment-176088664 -import ctypes -from ctypes.util import find_library -ctypes.CDLL(find_library('GL'), ctypes.RTLD_GLOBAL) - -# PyQt5 packages -from PyQt5.QtWidgets import QApplication -from PyQt5.QtCore import QObject, QUrl, Qt, pyqtProperty, pyqtSlot, pyqtSignal, Q_ENUMS -from PyQt5.QtGui import QIcon, QCursor -from PyQt5.QtQml import qmlRegisterType, QQmlApplicationEngine, QQmlListProperty -from quamash import QEventLoop -from netifaces import interfaces, ifaddresses, AF_INET - -# ------------------------------------------------------------------------------- -# file path definitions -# ------------------------------------------------------------------------------- -# Sanitize environment variables -# https://wiki.sei.cmu.edu/confluence/display/c/ENV03-C.+Sanitize+the+environment+when+invoking+external+programs - -# Delete $HOME env for security reason. This will make -# Path.home() to look up in the password directory (pwd module) -if 'HOME' in os.environ: - del os.environ['HOME'] -os.environ['HOME'] = str(Path.home()) -os.environ['PATH'] = os.confstr("CS_PATH") # Sanitize $PATH - -# Setting home path and base path -# https://www.freedesktop.org/software/systemd/man/file-hierarchy.html -# HOME_PATH will point to ~/.config/virtscreen by default -if 'XDG_CONFIG_HOME' in os.environ and os.environ['XDG_CONFIG_HOME']: - HOME_PATH = os.environ['XDG_CONFIG_HOME'] -else: - HOME_PATH = os.environ['HOME'] - if HOME_PATH is not None: - HOME_PATH = HOME_PATH + "/.config" -if HOME_PATH is not None: - HOME_PATH = HOME_PATH + "/virtscreen" -BASE_PATH = os.path.dirname(__file__) -# Path in ~/.virtscreen -X11VNC_LOG_PATH = HOME_PATH + "/x11vnc_log.txt" -X11VNC_PASSWORD_PATH = HOME_PATH + "/x11vnc_passwd" -CONFIG_PATH = HOME_PATH + "/config.json" -# Path in the program path -ICON_PATH = BASE_PATH + "/icon/icon.png" -ASSETS_PATH = BASE_PATH + "/assets" -DATA_PATH = ASSETS_PATH + "/data.json" -DEFAULT_CONFIG_PATH = ASSETS_PATH + "/config.default.json" -MAIN_QML_PATH = ASSETS_PATH + "/main.qml" - - -# ------------------------------------------------------------------------------- -# Subprocess wrapper -# ------------------------------------------------------------------------------- -class SubprocessWrapper: - def __init__(self): - pass - - def check_output(self, arg) -> None: - return subprocess.check_output(shlex.split(arg), stderr=subprocess.STDOUT).decode('utf-8') - - def run(self, arg: str, input: str = None, check=False) -> str: - if input: - input = input.encode('utf-8') - return subprocess.run(shlex.split(arg), input=input, stdout=subprocess.PIPE, - check=check, stderr=subprocess.STDOUT).stdout.decode('utf-8') - - -# ------------------------------------------------------------------------------- -# Asynchronous subprocess wrapper class -# ------------------------------------------------------------------------------- -class AsyncSubprocess(): - class Protocol(asyncio.SubprocessProtocol): - def __init__(self, outer): - self.outer = outer - self.transport: asyncio.SubprocessTransport - - def connection_made(self, transport): - print("connectionMade!") - self.outer.connected() - self.transport = transport - transport.get_pipe_transport(0).close() # No more input - - def pipe_data_received(self, fd, data): - if fd == 1: # stdout - self.outer.out_recevied(data) - if self.outer.logfile is not None: - self.outer.logfile.write(data) - elif fd == 2: # stderr - self.outer.err_recevied(data) - if self.outer.logfile is not None: - self.outer.logfile.write(data) - - def pipe_connection_lost(self, fd, exc): - if fd == 0: # stdin - print("stdin is closed. (we probably did it)") - elif fd == 1: # stdout - print("The child closed their stdout.") - elif fd == 2: # stderr - print("The child closed their stderr.") - - def connection_lost(self, exc): - print("Subprocess connection lost.") - - def process_exited(self): - if self.outer.logfile is not None: - self.outer.logfile.close() - self.transport.close() - return_code = self.transport.get_returncode() - if return_code is None: - print("Unknown exit") - self.outer.ended(1) - return - print("processEnded, status", return_code) - self.outer.ended(return_code) - - def __init__(self, connected, out_recevied, err_recevied, ended, logfile=None): - self.connected = connected - self.out_recevied = out_recevied - self.err_recevied = err_recevied - self.ended = ended - self.logfile = logfile - self.transport: asyncio.SubprocessTransport - self.protocol: self.Protocol - - async def _run(self, arg: str, loop: asyncio.AbstractEventLoop): - self.transport, self.protocol = await loop.subprocess_exec( - lambda: self.Protocol(self), *shlex.split(arg), env=os.environ) - - def run(self, arg: str): - """Spawn a process. - - Arguments: - arg {str} -- arguments in string - """ - loop = asyncio.get_event_loop() - loop.create_task(self._run(arg, loop)) - - def close(self): - """Kill a spawned process.""" - self.transport.send_signal(signal.SIGINT) - - -# ------------------------------------------------------------------------------- -# Display properties -# ------------------------------------------------------------------------------- -class Display(object): - __slots__ = ['name', 'primary', 'connected', 'active', 'width', 'height', 'x_offset', 'y_offset'] - - def __init__(self): - self.name: str = None - self.primary: bool = False - self.connected: bool = False - self.active: bool = False - self.width: int = 0 - self.height: int = 0 - self.x_offset: int = 0 - self.y_offset: int = 0 - - def __str__(self): - ret = f"{self.name}" - if self.connected: - ret += " connected" - else: - ret += " disconnected" - if self.primary: - ret += " primary" - if self.active: - ret += f" {self.width}x{self.height}+{self.x_offset}+{self.y_offset}" - else: - ret += f" not active {self.width}x{self.height}" - return ret - - -class DisplayProperty(QObject): - def __init__(self, display: Display, parent=None): - super(DisplayProperty, self).__init__(parent) - self._display = display - - @property - def display(self): - return self._display - - @pyqtProperty(str, constant=True) - def name(self): - return self._display.name - - @name.setter - def name(self, name): - self._display.name = name - - @pyqtProperty(bool, constant=True) - def primary(self): - return self._display.primary - - @primary.setter - def primary(self, primary): - self._display.primary = primary - - @pyqtProperty(bool, constant=True) - def connected(self): - return self._display.connected - - @connected.setter - def connected(self, connected): - self._display.connected = connected - - @pyqtProperty(bool, constant=True) - def active(self): - return self._display.active - - @active.setter - def active(self, active): - self._display.active = active - - @pyqtProperty(int, constant=True) - def width(self): - return self._display.width - - @width.setter - def width(self, width): - self._display.width = width - - @pyqtProperty(int, constant=True) - def height(self): - return self._display.height - - @height.setter - def height(self, height): - self._display.height = height - - @pyqtProperty(int, constant=True) - def x_offset(self): - return self._display.x_offset - - @x_offset.setter - def x_offset(self, x_offset): - self._display.x_offset = x_offset - - @pyqtProperty(int, constant=True) - def y_offset(self): - return self._display.y_offset - - @y_offset.setter - def y_offset(self, y_offset): - self._display.y_offset = y_offset - - -# ------------------------------------------------------------------------------- -# Screen adjustment class -# ------------------------------------------------------------------------------- -class XRandR(SubprocessWrapper): - VIRT_SCREEN_SUFFIX = "_virt" - - def __init__(self): - super(XRandR, self).__init__() - self.mode_name: str - self.screens: List[Display] = [] - self.virt: Display() = None - self.primary: Display() = None - self.virt_name: str = '' - self.virt_idx: int = None - self.primary_idx: int = None - # Primary display - self._update_screens() - - def _update_screens(self) -> None: - output = self.run("xrandr") - self.primary = None - self.virt = None - self.screens = [] - self.virt_idx = None - self.primary_idx = None - pattern = re.compile(r"^(\S*)\s+(connected|disconnected)\s+((primary)\s+)?" - r"((\d+)x(\d+)\+(\d+)\+(\d+)\s+)?.*$", re.M) - for idx, match in enumerate(pattern.finditer(output)): - screen = Display() - screen.name = match.group(1) - if self.virt_name and screen.name == self.virt_name: - self.virt_idx = idx - screen.primary = True if match.group(4) else False - if screen.primary: - self.primary_idx = idx - screen.connected = True if match.group(2) == "connected" else False - screen.active = True if match.group(5) else False - self.screens.append(screen) - if not screen.active: - continue - screen.width = int(match.group(6)) - screen.height = int(match.group(7)) - screen.x_offset = int(match.group(8)) - screen.y_offset = int(match.group(9)) - print("Display information:") - for s in self.screens: - print("\t", s) - if self.primary_idx is None: - raise RuntimeError("There is no primary screen detected.\n" - "Go to display settings and set\n" - "a primary screen\n") - if self.virt_idx == self.primary_idx: - raise RuntimeError("Virtual screen must be selected other than the primary screen") - if self.virt_idx is not None: - self.virt = self.screens[self.virt_idx] - elif self.virt_name and self.virt_idx is None: - raise RuntimeError("No virtual screen name found") - self.primary = self.screens[self.primary_idx] - - def _add_screen_mode(self, width, height, portrait, hidpi) -> None: - if not self.virt or not self.virt_name: - raise RuntimeError("No virtual screen selected.\n" - "Go to Display->Virtual Display->Advaced\n" - "To select a device.") - # Set virtual screen property first - self.virt.width = width - self.virt.height = height - if portrait: - self.virt.width = height - self.virt.height = width - if hidpi: - self.virt.width *= 2 - self.virt.height *= 2 - self.mode_name = str(self.virt.width) + "x" + str(self.virt.height) + self.VIRT_SCREEN_SUFFIX - # Then create using xrandr command - args_addmode = f"xrandr --addmode {self.virt.name} {self.mode_name}" - try: - self.check_output(args_addmode) - except subprocess.CalledProcessError: - # When failed create mode and then add again - output = self.run(f"cvt {self.virt.width} {self.virt.height}") - mode = re.search(r"^.*Modeline\s*\".*\"\s*(.*)$", output, re.M).group(1) - # Create new screen mode - self.check_output(f"xrandr --newmode {self.mode_name} {mode}") - # Add mode again - self.check_output(args_addmode) - # After adding mode the program should delete the mode automatically on exit - atexit.register(self.delete_virtual_screen) - - def get_primary_screen(self) -> Display: - self._update_screens() - return self.primary - - def get_virtual_screen(self) -> Display: - self._update_screens() - return self.virt - - def create_virtual_screen(self, width, height, portrait=False, hidpi=False, pos='') -> None: - self._update_screens() - print("creating: ", self.virt) - self._add_screen_mode(width, height, portrait, hidpi) - arg_pos = ['left', 'right', 'above', 'below'] - xrandr_pos = ['--left-of', '--right-of', '--above', '--below'] - if pos and pos in arg_pos: - # convert pos for xrandr - pos = xrandr_pos[arg_pos.index(pos)] - pos += ' ' + self.primary.name - elif not pos: - pos = '--preferred' - else: - raise RuntimeError("Incorrect position option selected.") - self.check_output(f"xrandr --output {self.virt.name} --mode {self.mode_name}") - self.check_output("sleep 5") - self.check_output(f"xrandr --output {self.virt.name} {pos}") - self._update_screens() - - def delete_virtual_screen(self) -> None: - self._update_screens() - try: - self.virt.name - self.mode_name - except AttributeError: - return - self.run(f"xrandr --output {self.virt.name} --off") - self.run(f"xrandr --delmode {self.virt.name} {self.mode_name}") - atexit.unregister(self.delete_virtual_screen) - self._update_screens() - - -# ------------------------------------------------------------------------------- -# QML Backend class -# ------------------------------------------------------------------------------- -class Backend(QObject): - """ Backend class for QML frontend """ - - class VNCState: - """ Enum to indicate a state of the VNC server """ - OFF = 0 - ERROR = 1 - WAITING = 2 - CONNECTED = 3 - - Q_ENUMS(VNCState) - - # Signals - onVirtScreenCreatedChanged = pyqtSignal(bool) - onVncUsePasswordChanged = pyqtSignal(bool) - onVncStateChanged = pyqtSignal(VNCState) - onDisplaySettingClosed = pyqtSignal() - onError = pyqtSignal(str) - - def __init__(self, parent=None): - super(Backend, self).__init__(parent) - # Virtual screen properties - self.xrandr: XRandR = XRandR() - self._virtScreenCreated: bool = False - # VNC server properties - self._vncUsePassword: bool = False - self._vncState: self.VNCState = self.VNCState.OFF - # Primary screen and mouse posistion - self.vncServer: AsyncSubprocess - # Check config file - # and initialize if needed - need_init = False - if not os.path.exists(CONFIG_PATH): - shutil.copy(DEFAULT_CONFIG_PATH, CONFIG_PATH) - need_init = True - # Version check - file_match = True - with open(CONFIG_PATH, 'r') as f_config, open(DATA_PATH, 'r') as f_data: - config = json.load(f_config) - data = json.load(f_data) - if config['version'] != data['version']: - file_match = False - # Override config with default when version doesn't match - if not file_match: - shutil.copy(DEFAULT_CONFIG_PATH, CONFIG_PATH) - need_init = True - # initialize config file - if need_init: - # 1. Available x11vnc options - # Get available x11vnc options from x11vnc first - p = SubprocessWrapper() - arg = 'x11vnc -opts' - ret = p.run(arg) - options = tuple(m.group(1) for m in re.finditer("\s*(-\w+)\s+", ret)) - # Set/unset available x11vnc options flags in config - with open(CONFIG_PATH, 'r') as f, open(DATA_PATH, 'r') as f_data: - config = json.load(f) - data = json.load(f_data) - for key, value in config["x11vncOptions"].items(): - if key in options: - value["available"] = True - else: - value["available"] = False - # Default Display settings app for a Desktop Environment - desktop_environ = os.environ['XDG_CURRENT_DESKTOP'].lower() - for key, value in data['displaySettingApps'].items(): - for de in value['XDG_CURRENT_DESKTOP']: - if de in desktop_environ: - config["displaySettingApp"] = key - # Save the new config - with open(CONFIG_PATH, 'w') as f: - f.write(json.dumps(config, indent=4, sort_keys=True)) - - # Qt properties - @pyqtProperty(str, constant=True) - def settings(self): - with open(CONFIG_PATH, "r") as f: - return f.read() - - @settings.setter - def settings(self, json_str): - with open(CONFIG_PATH, "w") as f: - f.write(json_str) - - @pyqtProperty(bool, notify=onVirtScreenCreatedChanged) - def virtScreenCreated(self): - return self._virtScreenCreated - - @virtScreenCreated.setter - def virtScreenCreated(self, value): - self._virtScreenCreated = value - self.onVirtScreenCreatedChanged.emit(value) - - @pyqtProperty(QQmlListProperty, constant=True) - def screens(self): - try: - return QQmlListProperty(DisplayProperty, self, [DisplayProperty(x) for x in self.xrandr.screens]) - except RuntimeError as e: - self.onError.emit(str(e)) - return QQmlListProperty(DisplayProperty, self, []) - - @pyqtProperty(bool, notify=onVncUsePasswordChanged) - def vncUsePassword(self): - if os.path.isfile(X11VNC_PASSWORD_PATH): - self._vncUsePassword = True - else: - if self._vncUsePassword: - self.vncUsePassword = False - return self._vncUsePassword - - @vncUsePassword.setter - def vncUsePassword(self, use): - self._vncUsePassword = use - self.onVncUsePasswordChanged.emit(use) - - @pyqtProperty(VNCState, notify=onVncStateChanged) - def vncState(self): - return self._vncState - - @vncState.setter - def vncState(self, state): - self._vncState = state - self.onVncStateChanged.emit(self._vncState) - - # Qt Slots - @pyqtSlot(str, int, int, bool, bool) - def createVirtScreen(self, device, width, height, portrait, hidpi, pos=''): - self.xrandr.virt_name = device - print("Creating a Virtual Screen...") - try: - self.xrandr.create_virtual_screen(width, height, portrait, hidpi, pos) - except subprocess.CalledProcessError as e: - self.onError.emit(str(e.cmd) + '\n' + e.stdout.decode('utf-8')) - return - except RuntimeError as e: - self.onError.emit(str(e)) - return - self.virtScreenCreated = True - - @pyqtSlot() - def deleteVirtScreen(self): - print("Deleting the Virtual Screen...") - if self.vncState is not self.VNCState.OFF: - self.onError.emit("Turn off the VNC server first") - self.virtScreenCreated = True - return - try: - self.xrandr.delete_virtual_screen() - except RuntimeError as e: - self.onError.emit(str(e)) - return - self.virtScreenCreated = False - - @pyqtSlot(str) - def createVNCPassword(self, password): - if password: - password += '\n' + password + '\n\n' # verify + confirm - p = SubprocessWrapper() - try: - p.run(f"x11vnc -storepasswd {X11VNC_PASSWORD_PATH}", input=password, check=True) - except subprocess.CalledProcessError as e: - self.onError.emit(str(e.cmd) + '\n' + e.stdout.decode('utf-8')) - return - self.vncUsePassword = True - else: - self.onError.emit("Empty password") - - @pyqtSlot() - def deleteVNCPassword(self): - if os.path.isfile(X11VNC_PASSWORD_PATH): - os.remove(X11VNC_PASSWORD_PATH) - self.vncUsePassword = False - else: - self.onError.emit("Failed deleting the password file") - - @pyqtSlot(int) - def startVNC(self, port): - # Check if a virtual screen created - if not self.virtScreenCreated: - self.onError.emit("Virtual Screen not crated.") - return - if self.vncState is not self.VNCState.OFF: - self.onError.emit("VNC Server is already running.") - return - # regex used in callbacks - patter_connected = re.compile(r"^.*Got connection from client.*$", re.M) - patter_disconnected = re.compile(r"^.*client_count: 0*$", re.M) - - # define callbacks - def _connected(): - print("VNC started.") - self.vncState = self.VNCState.WAITING - - def _received(data): - data = data.decode("utf-8") - if (self._vncState is not self.VNCState.CONNECTED) and patter_connected.search(data): - print("VNC connected.") - self.vncState = self.VNCState.CONNECTED - if (self._vncState is self.VNCState.CONNECTED) and patter_disconnected.search(data): - print("VNC disconnected.") - self.vncState = self.VNCState.WAITING - - def _ended(exitCode): - if exitCode is not 0: - self.vncState = self.VNCState.ERROR - self.onError.emit('X11VNC: Error occurred.\n' - 'Double check if the port is already used.') - self.vncState = self.VNCState.OFF # TODO: better handling error state - else: - self.vncState = self.VNCState.OFF - print("VNC Exited.") - atexit.unregister(self.stopVNC) - # load settings - with open(CONFIG_PATH, 'r') as f: - config = json.load(f) - options = '' - if config['customX11vncArgs']['enabled']: - options = config['customX11vncArgs']['value'] - else: - for key, value in config['x11vncOptions'].items(): - if value['available'] and value['enabled']: - options += key + ' ' - if value['arg'] is not None: - options += str(value['arg']) + ' ' - # Sart x11vnc, turn settings object into VNC arguments format - logfile = open(X11VNC_LOG_PATH, "wb") - self.vncServer = AsyncSubprocess(_connected, _received, _received, _ended, logfile) - try: - virt = self.xrandr.get_virtual_screen() - except RuntimeError as e: - self.onError.emit(str(e)) - return - clip = f"{virt.width}x{virt.height}+{virt.x_offset}+{virt.y_offset}" - arg = f"x11vnc -rfbport {port} -clip {clip} {options}" - if self.vncUsePassword: - arg += f" -rfbauth {X11VNC_PASSWORD_PATH}" - self.vncServer.run(arg) - # auto stop on exit - atexit.register(self.stopVNC, force=True) - - @pyqtSlot(str) - def openDisplaySetting(self, app: str = "arandr"): - # define callbacks - def _connected(): - print("External Display Setting opened.") - - def _received(data): - pass - - def _ended(exitCode): - print("External Display Setting closed.") - self.onDisplaySettingClosed.emit() - if exitCode is not 0: - self.onError.emit(f'Error opening "{running_program}".') - with open(DATA_PATH, 'r') as f: - data = json.load(f)['displaySettingApps'] - if app not in data: - self.onError.emit('Wrong display settings program') - return - program_list = [data[app]['args'], "arandr"] - program = AsyncSubprocess(_connected, _received, _received, _ended, None) - running_program = '' - for arg in program_list: - if not shutil.which(shlex.split(arg)[0]): - continue - running_program = arg - program.run(arg) - return - self.onError.emit('Failed to find a display settings program.\n' - 'Please install ARandR package.\n' - '(e.g. sudo apt-get install arandr)\n' - 'Please issue a feature request\n' - 'if you wish to add a display settings\n' - 'program for your Desktop Environment.') - - @pyqtSlot() - def stopVNC(self, force=False): - if force: - # Usually called from atexit(). - self.vncServer.close() - time.sleep(3) # Make sure X11VNC shutdown before execute next atexit(). - if self._vncState in (self.VNCState.WAITING, self.VNCState.CONNECTED): - self.vncServer.close() - else: - self.onError.emit("stopVNC called while it is not running") - - @pyqtSlot() - def clearCache(self): - engine.clearComponentCache() - - @pyqtSlot() - def quitProgram(self): - self.blockSignals(True) # This will prevent invoking auto-restart or etc. - QApplication.instance().quit() - - -class Cursor(QObject): - """ Global mouse cursor position """ - - def __init__(self, parent=None): - super(Cursor, self).__init__(parent) - - @pyqtProperty(int) - def x(self): - cursor = QCursor().pos() - return cursor.x() - - @pyqtProperty(int) - def y(self): - cursor = QCursor().pos() - return cursor.y() - - -class Network(QObject): - """ Backend class for network interfaces """ - onIPAddressesChanged = pyqtSignal() - - def __init__(self, parent=None): - super(Network, self).__init__(parent) - - @pyqtProperty('QStringList', notify=onIPAddressesChanged) - def ipAddresses(self): - for interface in interfaces(): - if interface == 'lo': - continue - addresses = ifaddresses(interface).get(AF_INET, None) - if addresses is None: - continue - for link in addresses: - if link is not None: - yield link['addr'] - - -# ------------------------------------------------------------------------------- -# Main Code -# ------------------------------------------------------------------------------- -def main() -> None: - parser = argparse.ArgumentParser( - formatter_class=argparse.RawTextHelpFormatter, - description='Make your iPad/tablet/computer as a secondary monitor on Linux.\n\n' - 'You can start VirtScreen in the following two modes:\n\n' - ' - GUI mode: A system tray icon will appear when no argument passed.\n' - ' You need to use this first to configure a virtual screen.\n' - ' - CLI mode: After configured the virtual screen, you can start VirtScreen\n' - ' in CLI mode if you do not want a GUI, by passing any arguments\n', - epilog='example:\n' - 'virtscreen # GUI mode. You need to use this first\n' - ' to configure the screen\n' - 'virtscreen --auto # CLI mode. Scrren will be created using previous\n' - ' settings (from both GUI mode and CLI mode)\n' - 'virtscreen --left # CLI mode. On the left to the primary monitor\n' - 'virtscreen --below # CLI mode. Below the primary monitor.\n' - 'virtscreen --below --portrait # Below, and portrait mode.\n' - 'virtscreen --below --portrait --hipdi # Below, portrait, HiDPI mode.\n') - parser.add_argument('--auto', action='store_true', - help='create a virtual screen automatically using previous\n' - 'settings (from both GUI mode and CLI mode)') - parser.add_argument('--left', action='store_true', - help='a virtual screen will be created left to the primary\n' - 'monitor') - parser.add_argument('--right', action='store_true', - help='right to the primary monitor') - parser.add_argument('--above', '--up', action='store_true', - help='above the primary monitor') - parser.add_argument('--below', '--down', action='store_true', - help='below the primary monitor') - parser.add_argument('--portrait', action='store_true', - help='Portrait mode. Width and height of the screen are swapped') - parser.add_argument('--hidpi', action='store_true', - help='HiDPI mode. Width and height are doubled') - # Add signal handler - def on_exit(self, signum=None, frame=None): - sys.exit(0) - for sig in [signal.SIGINT, signal.SIGTERM, signal.SIGHUP, signal.SIGQUIT]: - signal.signal(sig, on_exit) - # Start main - args = parser.parse_args() - if any(vars(args).values()): - main_cli(args) - else: - main_gui() - print('Program should not reach here.') - sys.exit(1) - -def check_env(msg: Callable[[str], None]) -> None: - if os.environ['XDG_SESSION_TYPE'].lower() == 'wayland': - msg("Currently Wayland is not supported") - sys.exit(1) - if not HOME_PATH: - msg("Cannot detect home directory.") - sys.exit(1) - if not os.path.exists(HOME_PATH): - try: - os.makedirs(HOME_PATH) - except: - msg("Cannot create ~/.config/virtscreen") - sys.exit(1) - if not shutil.which('x11vnc'): - msg("x11vnc is not installed.") - sys.exit(1) - try: - test = XRandR() - except RuntimeError as e: - msg(str(e)) - sys.exit(1) - -def main_gui(): - QApplication.setAttribute(Qt.AA_EnableHighDpiScaling) - app = QApplication(sys.argv) - loop = QEventLoop(app) - asyncio.set_event_loop(loop) - - # Check environment first - from PyQt5.QtWidgets import QMessageBox, QSystemTrayIcon - def dialog(message: str) -> None: - QMessageBox.critical(None, "VirtScreen", message) - if not QSystemTrayIcon.isSystemTrayAvailable(): - dialog("Cannot detect system tray on this system.") - sys.exit(1) - check_env(dialog) - - # Replace Twisted reactor with qt5reactor - import qt5reactor # pylint: disable=E0401 - qt5reactor.install() - from twisted.internet import reactor # pylint: disable=E0401 - - app.setWindowIcon(QIcon(ICON_PATH)) - os.environ["QT_QUICK_CONTROLS_STYLE"] = "Material" - # os.environ["QT_QUICK_CONTROLS_STYLE"] = "Fusion" - - # Register the Python type. Its URI is 'People', it's v1.0 and the type - # will be called 'Person' in QML. - qmlRegisterType(DisplayProperty, 'VirtScreen.DisplayProperty', 1, 0, 'DisplayProperty') - qmlRegisterType(Backend, 'VirtScreen.Backend', 1, 0, 'Backend') - qmlRegisterType(Cursor, 'VirtScreen.Cursor', 1, 0, 'Cursor') - qmlRegisterType(Network, 'VirtScreen.Network', 1, 0, 'Network') - - # Create a component factory and load the QML script. - engine = QQmlApplicationEngine() - engine.load(QUrl(MAIN_QML_PATH)) - if not engine.rootObjects(): - dialog("Failed to load QML") - sys.exit(1) - sys.exit(app.exec_()) - with loop: - loop.run_forever() - -def main_cli(args: argparse.Namespace): - loop = asyncio.get_event_loop() - for key, value in vars(args).items(): - print(key, ": ", value) - # Check the environment - check_env(print) - if not os.path.exists(CONFIG_PATH): - print("Configuration file does not exist.\n" - "Configure a virtual screen using GUI first.") - sys.exit(1) - # By instantiating the backend, additional verifications of config - # file will be done. - backend = Backend() - # Get settings - with open(CONFIG_PATH, 'r') as f: - config = json.load(f) - # Override settings from arguments - position = '' - if not args.auto: - args_virt = ['portrait', 'hidpi'] - for prop in args_virt: - if vars(args)[prop]: - config['virt'][prop] = True - args_position = ['left', 'right', 'above', 'below'] - tmp_args = {k: vars(args)[k] for k in args_position} - if not any(tmp_args.values()): - print("Choose a position relative to the primary monitor. (e.g. --left)") - sys.exit(1) - for key, value in tmp_args.items(): - if value: - position = key - # Create virtscreen and Start VNC - def handle_error(msg): - print('Error: ', msg) - sys.exit(1) - backend.onError.connect(handle_error) - backend.createVirtScreen(config['virt']['device'], config['virt']['width'], - config['virt']['height'], config['virt']['portrait'], - config['virt']['hidpi'], position) - def handle_vnc_changed(state): - if state is backend.VNCState.OFF: - sys.exit(0) - backend.onVncStateChanged.connect(handle_vnc_changed) - backend.startVNC(config['vnc']['port']) - loop.run_forever() - -if __name__ == '__main__': - main() diff --git a/virtscreen/xrandr.py b/virtscreen/xrandr.py new file mode 100644 index 0000000..d794454 --- /dev/null +++ b/virtscreen/xrandr.py @@ -0,0 +1,138 @@ +"""XRandr parser""" + +import re +import atexit +import subprocess +from typing import List + +from .display import Display +from .process import SubprocessWrapper + + +VIRT_SCREEN_SUFFIX = "_virt" + + +class XRandR(SubprocessWrapper): + """XRandr parser class""" + + def __init__(self): + super(XRandR, self).__init__() + self.mode_name: str + self.screens: List[Display] = [] + self.virt: Display() = None + self.primary: Display() = None + self.virt_name: str = '' + self.virt_idx: int = None + self.primary_idx: int = None + # Primary display + self._update_screens() + + def _update_screens(self) -> None: + output = self.run("xrandr") + self.primary = None + self.virt = None + self.screens = [] + self.virt_idx = None + self.primary_idx = None + pattern = re.compile(r"^(\S*)\s+(connected|disconnected)\s+((primary)\s+)?" + r"((\d+)x(\d+)\+(\d+)\+(\d+)\s+)?.*$", re.M) + for idx, match in enumerate(pattern.finditer(output)): + screen = Display() + screen.name = match.group(1) + if self.virt_name and screen.name == self.virt_name: + self.virt_idx = idx + screen.primary = True if match.group(4) else False + if screen.primary: + self.primary_idx = idx + screen.connected = True if match.group(2) == "connected" else False + screen.active = True if match.group(5) else False + self.screens.append(screen) + if not screen.active: + continue + screen.width = int(match.group(6)) + screen.height = int(match.group(7)) + screen.x_offset = int(match.group(8)) + screen.y_offset = int(match.group(9)) + print("Display information:") + for s in self.screens: + print("\t", s) + if self.primary_idx is None: + raise RuntimeError("There is no primary screen detected.\n" + "Go to display settings and set\n" + "a primary screen\n") + if self.virt_idx == self.primary_idx: + raise RuntimeError("Virtual screen must be selected other than the primary screen") + if self.virt_idx is not None: + self.virt = self.screens[self.virt_idx] + elif self.virt_name and self.virt_idx is None: + raise RuntimeError("No virtual screen name found") + self.primary = self.screens[self.primary_idx] + + def _add_screen_mode(self, width, height, portrait, hidpi) -> None: + if not self.virt or not self.virt_name: + raise RuntimeError("No virtual screen selected.\n" + "Go to Display->Virtual Display->Advaced\n" + "To select a device.") + # Set virtual screen property first + self.virt.width = width + self.virt.height = height + if portrait: + self.virt.width = height + self.virt.height = width + if hidpi: + self.virt.width *= 2 + self.virt.height *= 2 + self.mode_name = str(self.virt.width) + "x" + str(self.virt.height) + VIRT_SCREEN_SUFFIX + # Then create using xrandr command + args_addmode = f"xrandr --addmode {self.virt.name} {self.mode_name}" + try: + self.check_output(args_addmode) + except subprocess.CalledProcessError: + # When failed create mode and then add again + output = self.run(f"cvt {self.virt.width} {self.virt.height}") + mode = re.search(r"^.*Modeline\s*\".*\"\s*(.*)$", output, re.M).group(1) + # Create new screen mode + self.check_output(f"xrandr --newmode {self.mode_name} {mode}") + # Add mode again + self.check_output(args_addmode) + # After adding mode the program should delete the mode automatically on exit + atexit.register(self.delete_virtual_screen) + + def get_primary_screen(self) -> Display: + self._update_screens() + return self.primary + + def get_virtual_screen(self) -> Display: + self._update_screens() + return self.virt + + def create_virtual_screen(self, width, height, portrait=False, hidpi=False, pos='') -> None: + self._update_screens() + print("creating: ", self.virt) + self._add_screen_mode(width, height, portrait, hidpi) + arg_pos = ['left', 'right', 'above', 'below'] + xrandr_pos = ['--left-of', '--right-of', '--above', '--below'] + if pos and pos in arg_pos: + # convert pos for xrandr + pos = xrandr_pos[arg_pos.index(pos)] + pos += ' ' + self.primary.name + elif not pos: + pos = '--preferred' + else: + raise RuntimeError("Incorrect position option selected.") + self.check_output(f"xrandr --output {self.virt.name} --mode {self.mode_name}") + self.check_output("sleep 5") + self.check_output(f"xrandr --output {self.virt.name} {pos}") + self._update_screens() + + def delete_virtual_screen(self) -> None: + self._update_screens() + try: + self.virt.name + self.mode_name + except AttributeError: + return + self.run(f"xrandr --output {self.virt.name} --off") + self.run(f"xrandr --delmode {self.virt.name} {self.mode_name}") + atexit.unregister(self.delete_virtual_screen) + self._update_screens()