#!/usr/bin/python3 import datetime import json import os import re import sys from ubnthelpers import get_ubnt_shortname DEVICE_DISK_INFO = { "UNVRPRO": { "scsi_map": { 1: "7:0:0:0", 2: "5:0:0:0", 3: "3:0:0:0", 4: "6:0:0:0", 5: "4:0:0:0", 6: "0:0:0:0", 7: "2:0:0:0", } }, } CACHE_FILE = "/tmp/.ustorage_cache" SMARTCTL_PATH = "/usr/sbin/smartctl" class UNVRDiskInfo: def __init__(self, disk_slot: int, scsi_id: str): self.disk_slot = disk_slot self.scsi_id = scsi_id self.blk_device = None self.__smartctl_output = None # Default no disk response self.__resp = { "healthy": "none", "reason": [], "slot": self.disk_slot, "state": "nodisk", } # Trigger a disk scan, and update smartctl if a disk exists if self.__scan_for_disk(): self.__smartctl_output = self.__get_smartctl_data() # Since we exist, build our return payload :) self.__resp = { "bad_sector": self.__parse_bad_sector(), "estimate": None, # Not sure what this is used for sadly "model": self.__read_file( f"/sys/class/scsi_disk/{self.scsi_id}/device/model" ), "node": self.blk_device, "size": self.__parse_disk_size(), "slot": self.disk_slot, "state": self.__parse_disk_state(), "temperature": self.__parse_disk_temp(), "type": self.__parse_disk_type(), "life_span": self.__parse_ssd_life_span(), } def __scan_for_disk(self): # Our path for the SCSI ID should always exist, but play it safe if os.path.exists(f"/sys/class/scsi_disk/{self.scsi_id}"): # Now, do we have a block device attached? blkdirlist = os.listdir(f"/sys/class/scsi_disk/{self.scsi_id}/device/block") if len(blkdirlist) > 0 and blkdirlist[0].startswith("sd"): # We found our disk, it has a /dev/sd* entry self.blk_device = blkdirlist[0] return True # No disk, return false return False def __get_smartctl_data(self): # Get our response from smartctl for the device for us to parse later return os.popen(f"{SMARTCTL_PATH} -iHA /dev/{self.blk_device}").read() def __parse_smartctl(self, input: str, regex: str): # Used to assist in parsing smartctl output search = re.search( regex, input, re.MULTILINE, ) if bool(search): return search.group(1) else: return None def __read_file(self, path: str): with open(path) as f: s = f.read().rstrip("\n").rstrip() return s def __parse_bad_sector(self): try: return int( self.__parse_smartctl( self.__smartctl_output, r"^ 5 [\w-]+\s+0x\d+\s+\d+\s+\d+\s+\d+\s+[\w-]+\s+\w+\s+\S+\s+(\d+)(?:\s[\(][^)]*[\)])?$", ) ) except: return None def __parse_disk_size(self): try: return int(self.__read_file(f"/sys/block/{self.blk_device}/size")) * int( self.__read_file( f"/sys/block/{self.blk_device}/queue/logical_block_size" ) ) except: return None def __parse_disk_state(self): # Do we pass SMART testing? if "PASSED" in self.__parse_smartctl( self.__smartctl_output, r"SMART overall-health self-assessment test result:\s*(.*)", ): return "normal" else: return "failed" def __parse_disk_temp(self): try: return int( self.__parse_smartctl( self.__smartctl_output, r"^194 [\w-]+\s+0x\d+\s+\d+\s+\d+\s+\d+\s+[\w-]+\s+\w+\s+\S+\s+(\d+)(?:\s[\(][^)]*[\)])?$", ) ) except: return None def __parse_disk_type(self): if self.__parse_smartctl( self.__smartctl_output, r"Rotation Rate:\s+Solid State Device*(.)" ): return "SSD" else: return "HDD" def __parse_ssd_life_span(self): if self.__parse_disk_type() == "SSD": disk_span_raw = self.__parse_smartctl( self.__smartctl_output, r"^231 [\w-]+\s+0x\d+\s+\d+\s+\d+\s+\d+\s+[\w-]+\s+\w+\s+\S+\s+(\d+)(?:\s[\(][^)]*[\)])?$", ) # Did we have SMART value 231? if disk_span_raw: return 100 - int(disk_span_raw) # Return None if we are HDD, or can't get SSD life return None def get_payload(self): # Return our disk info return self.__resp def run_main(): # Are we supported? if get_ubnt_shortname() not in DEVICE_DISK_INFO: raise Exception( f"Error: Your Unifi device of {get_ubnt_shortname()} is not yet supported by ustorage! Exiting..." ) # Before we do all this work, have we ran before? If so, load in last run data and see if we can use it cache_data = None if os.path.isfile(CACHE_FILE): with open(CACHE_FILE, "r") as f: cache_data = json.loads(f.read()) # Get current list of block devices current_block_devs = ( os.popen( f"{SMARTCTL_PATH}" + " --scan | grep 'dev' | awk '{print $1}' | sed -e 's|/dev/||'" ) .read() .splitlines() ) # If we have a cache, do block devices match, and are we not expired? if so, return # the cached result instead of regenerating. if cache_data: if sorted(cache_data["block_devices"]) == sorted(current_block_devs) and ( datetime.datetime.now().timestamp() < cache_data["expiration"] ): return json.dumps(cache_data["response"]) # For each of our scsi IDs/ports, get disk info using our class and stash it in our response list ustorage_response = [] for port, scsi_id in DEVICE_DISK_INFO[get_ubnt_shortname()]["scsi_map"].items(): # Load and append our data ustorage_response.append(UNVRDiskInfo(port, scsi_id).get_payload()) # Now build our data to save save_data = { "block_devices": current_block_devs, # TODO: Figure out the right timeframe for this expiration "expiration": ( datetime.datetime.now() + datetime.timedelta(minutes=2) ).timestamp(), "response": ustorage_response, } # Save before we return... with open(CACHE_FILE, "w") as f: f.write(json.dumps(save_data)) # And were done here! return json.dumps(save_data["response"]) if __name__ == "__main__": # Only work if disk inspect is called if len(sys.argv) == 3 and sys.argv[1] == "disk" and sys.argv[2] == "inspect": print(run_main())