#!/usr/bin/python3
"""Report per-bay disk information for a UniFi device as JSON.

Invoked as ``<script> disk inspect``.  For every drive bay listed in
DEVICE_DISK_INFO for the running device, the script checks sysfs for a disk
at the bay's SCSI address, queries smartctl for health data, and prints a
JSON array with one object per bay.
"""
import sys
import os
import re
import json

from ubnthelpers import get_ubnt_shortname

# Per-device table mapping the bay ("slot") number to the SCSI address whose
# directory is looked up under /sys/class/scsi_disk.
DEVICE_DISK_INFO = {
    "UNVRPRO": {
        "scsi_map": {
            1: "7:0:0:0",
            2: "5:0:0:0",
            3: "3:0:0:0",
            4: "6:0:0:0",
            5: "4:0:0:0",
            6: "0:0:0:0",
            7: "2:0:0:0",
        }
    },
}

SMARTCTL_PATH = "/usr/sbin/smartctl"

# /sys/block/<dev>/size is always expressed in 512-byte units, independent of
# the drive's logical block size.
SYSFS_SECTOR_BYTES = 512

# One row of the `smartctl -A` attribute table:
#   ID# ATTRIBUTE_NAME FLAG VALUE WORST THRESH TYPE UPDATED WHEN_FAILED RAW_VALUE
# smartctl right-aligns ID# in a 3-character column, so IDs below 100 are
# preceded by spaces; the pattern therefore accepts any leading padding.
# Group 1 is the leading integer of RAW_VALUE; an optional trailing
# parenthesised annotation (e.g. "(Min/Max 20/45)") is tolerated.
__SMART_ATTR_ROW = (
    r"^[ \t]*{attr_id} [\w-]+\s+0x\d+\s+\d+\s+\d+\s+\d+\s+[\w-]+\s+\w+\s+\S+\s+"
    r"(\d+)(?:\s[\(][^)]*[\)])?$"
)


def __read_file(path: str):
    """Return the contents of *path* with trailing whitespace removed."""
    with open(path) as f:
        return f.read().rstrip()


def __parse_smartctl(input: str, regex: str):
    """Return group 1 of the first multiline match of *regex* in *input*.

    Returns None when the pattern does not match.
    """
    match = re.search(regex, input, re.MULTILINE)
    return match.group(1) if match else None


def __smart_attr_raw(smart_output: str, attr_id: int):
    """Return the integer raw value of SMART attribute *attr_id*, or None.

    None means the attribute row is absent from *smart_output* (the drive
    does not report it, or smartctl produced no attribute table).
    """
    raw = __parse_smartctl(smart_output, __SMART_ATTR_ROW.format(attr_id=attr_id))
    return int(raw) if raw is not None else None


def __inspect_disk(port: int, scsi_id: str, disk_node: str):
    """Build the JSON-ready description of the disk in bay *port*.

    *scsi_id* is the bay's SCSI address and *disk_node* its kernel block
    device name (e.g. "sda").
    """
    # Close the pipe deterministically instead of leaking it until GC.
    with os.popen(f"{SMARTCTL_PATH} -iHA /dev/{disk_node}") as smartctl:
        disk_smartdata = smartctl.read()

    # NOTE(review): attributes a drive does not report are emitted as JSON
    # null rather than crashing the whole inspection; confirm the consumer
    # tolerates null for "temperature" and "bad_sector".
    disk_temp = __smart_attr_raw(disk_smartdata, 194)
    disk_bad_sectors = __smart_attr_raw(disk_smartdata, 5)

    # The sysfs size is counted in 512-byte units, so scaling by the logical
    # block size would overstate 4Kn drives by a factor of eight.
    disk_size = int(__read_file(f"/sys/block/{disk_node}/size")) * SYSFS_SECTOR_BYTES

    # Do we pass SMART testing? A missing verdict is treated as a failure.
    health = __parse_smartctl(
        disk_smartdata,
        r"SMART overall-health self-assessment test result:\s*(.*)",
    )
    disk_state = "normal" if health is not None and "PASSED" in health else "failed"

    # Are we an SSD?
    is_ssd = __parse_smartctl(
        disk_smartdata, r"Rotation Rate:\s+(Solid State Device)"
    )
    if is_ssd:
        disk_type = "SSD"
        # SSD disks also need to report their life_span, derived from SMART
        # attribute 231; assume 100 when the drive does not expose it.
        wear = __smart_attr_raw(disk_smartdata, 231)
        life_span = 100 if wear is None else 100 - wear
    else:
        disk_type = "HDD"
        life_span = None

    return {
        "bad_sector": disk_bad_sectors,
        "estimate": None,  # No idea what this is for, maybe rebuilds?
        "model": __read_file(f"/sys/class/scsi_disk/{scsi_id}/device/model"),
        "node": disk_node,
        "size": disk_size,
        "slot": port,
        "state": disk_state,
        "temperature": disk_temp,
        "type": disk_type,
        "life_span": life_span,
    }


def __find_and_map_disks():
    """Return a JSON string describing every drive bay of this device.

    Bays without a disk are reported as ``{"state": "nodisk", ...}``.

    Raises:
        Exception: if the device's short name has no DEVICE_DISK_INFO entry.

    Exits the process with status 1 (message on stderr) when a SCSI disk is
    present but no ``sd*`` block device can be found for it.
    """
    # Are we supported?
    shortname = get_ubnt_shortname()
    if shortname not in DEVICE_DISK_INFO:
        raise Exception(
            f"Error: Your Unifi device of {shortname} is not yet supported by ustorage! Exiting..."
        )

    # For each of our scsi IDs, see if we exist in sysfs (aka a disk is there)
    ustorage_response = []
    for port, scsi_id in DEVICE_DISK_INFO[shortname]["scsi_map"].items():
        scsi_dir = f"/sys/class/scsi_disk/{scsi_id}"
        if not os.path.exists(scsi_dir):
            # Disk doesn't exist, add the offline entry.
            ustorage_response.append(
                {"healthy": "none", "reason": [], "slot": port, "state": "nodisk"}
            )
            continue

        # Disk is here, now find out what sd device it is so we can get
        # drive deets. A missing block directory is handled like an empty one.
        try:
            blkdirlist = os.listdir(f"{scsi_dir}/device/block")
        except FileNotFoundError:
            blkdirlist = []

        if not blkdirlist or not blkdirlist[0].startswith("sd"):
            # Keep stdout clean for the JSON consumer; diagnostics go to stderr.
            print(
                f"Error: Unable to find block device name for disk at SCSI ID {scsi_id}! Exiting...",
                file=sys.stderr,
            )
            sys.exit(1)

        ustorage_response.append(__inspect_disk(port, scsi_id, blkdirlist[0]))

    return json.dumps(ustorage_response)


if __name__ == "__main__":
    # Yes this is dirty and lazy, but this should be the only cmd ulcmd calls
    if sys.argv[1:] == ["disk", "inspect"]:
        print(__find_and_map_disks())