UNVR-NAS/overlay/filesystem/usr/bin/ustorage

222 lines
7 KiB
Text
Raw Normal View History

#!/usr/bin/python3
import datetime
import json
import os
import re
import sys
from ubnthelpers import get_ubnt_shortname
DEVICE_DISK_INFO = {
"UNVRPRO": {
"scsi_map": {
1: "7:0:0:0",
2: "5:0:0:0",
3: "3:0:0:0",
4: "6:0:0:0",
5: "4:0:0:0",
6: "0:0:0:0",
7: "2:0:0:0",
}
},
}
CACHE_FILE = "/tmp/.ustorage_cache"
SMARTCTL_PATH = "/usr/sbin/smartctl"
class UNVRDiskInfo:
def __init__(self, disk_slot: int, scsi_id: str):
self.disk_slot = disk_slot
self.scsi_id = scsi_id
self.blk_device = None
self.__smartctl_output = None
# Default no disk response
self.__resp = {
"healthy": "none",
"reason": [],
"slot": self.disk_slot,
"state": "nodisk",
}
# Trigger a disk scan, and update smartctl if a disk exists
if self.__scan_for_disk():
self.__smartctl_output = self.__get_smartctl_data()
# Since we exist, build our return payload :)
self.__resp = {
"bad_sector": self.__parse_bad_sector(),
"estimate": None, # Not sure what this is used for sadly
"model": self.__read_file(
f"/sys/class/scsi_disk/{self.scsi_id}/device/model"
),
"node": self.blk_device,
"size": self.__parse_disk_size(),
"slot": self.disk_slot,
"state": self.__parse_disk_state(),
"temperature": self.__parse_disk_temp(),
"type": self.__parse_disk_type(),
"life_span": self.__parse_ssd_life_span(),
}
def __scan_for_disk(self):
# Our path for the SCSI ID should always exist, but play it safe
if os.path.exists(f"/sys/class/scsi_disk/{self.scsi_id}"):
# Now, do we have a block device attached?
blkdirlist = os.listdir(f"/sys/class/scsi_disk/{self.scsi_id}/device/block")
if len(blkdirlist) > 0 and blkdirlist[0].startswith("sd"):
# We found our disk, it has a /dev/sd* entry
self.blk_device = blkdirlist[0]
return True
# No disk, return false
return False
def __get_smartctl_data(self):
# Get our response from smartctl for the device for us to parse later
return os.popen(f"{SMARTCTL_PATH} -iHA /dev/{self.blk_device}").read()
def __parse_smartctl(self, input: str, regex: str):
# Used to assist in parsing smartctl output
search = re.search(
regex,
input,
re.MULTILINE,
)
if bool(search):
return search.group(1)
else:
return None
def __read_file(self, path: str):
with open(path) as f:
s = f.read().rstrip("\n").rstrip()
return s
def __parse_bad_sector(self):
try:
return int(
self.__parse_smartctl(
self.__smartctl_output,
r"^ 5 [\w-]+\s+0x\d+\s+\d+\s+\d+\s+\d+\s+[\w-]+\s+\w+\s+\S+\s+(\d+)(?:\s[\(][^)]*[\)])?$",
)
)
except:
return None
def __parse_disk_size(self):
try:
return int(self.__read_file(f"/sys/block/{self.blk_device}/size")) * int(
self.__read_file(
f"/sys/block/{self.blk_device}/queue/logical_block_size"
)
)
except:
return None
def __parse_disk_state(self):
# Do we pass SMART testing?
if "PASSED" in self.__parse_smartctl(
self.__smartctl_output,
r"SMART overall-health self-assessment test result:\s*(.*)",
):
return "normal"
else:
return "failed"
def __parse_disk_temp(self):
try:
return int(
self.__parse_smartctl(
self.__smartctl_output,
r"^194 [\w-]+\s+0x\d+\s+\d+\s+\d+\s+\d+\s+[\w-]+\s+\w+\s+\S+\s+(\d+)(?:\s[\(][^)]*[\)])?$",
)
)
except:
return None
def __parse_disk_type(self):
if self.__parse_smartctl(
self.__smartctl_output, r"Rotation Rate:\s+Solid State Device*(.)"
):
return "SSD"
else:
return "HDD"
def __parse_ssd_life_span(self):
if self.__parse_disk_type() == "SSD":
disk_span_raw = self.__parse_smartctl(
self.__smartctl_output,
r"^231 [\w-]+\s+0x\d+\s+\d+\s+\d+\s+\d+\s+[\w-]+\s+\w+\s+\S+\s+(\d+)(?:\s[\(][^)]*[\)])?$",
)
# Did we have SMART value 231?
if disk_span_raw:
return 100 - int(disk_span_raw)
# Return None if we are HDD, or can't get SSD life
return None
def get_payload(self):
# Return our disk info
return self.__resp
def run_main():
# Are we supported?
if get_ubnt_shortname() not in DEVICE_DISK_INFO:
raise Exception(
f"Error: Your Unifi device of {get_ubnt_shortname()} is not yet supported by ustorage! Exiting..."
)
# Before we do all this work, have we ran before? If so, load in last run data and see if we can use it
cache_data = None
if os.path.isfile(CACHE_FILE):
with open(CACHE_FILE, "r") as f:
cache_data = json.loads(f.read())
# Get current list of block devices
current_block_devs = (
os.popen(
f"{SMARTCTL_PATH}"
+ " --scan | grep 'dev' | awk '{print $1}' | sed -e 's|/dev/||'"
)
.read()
.splitlines()
)
# If we have a cache, do block devices match, and are we not expired? if so, return
# the cached result instead of regenerating.
if cache_data:
if sorted(cache_data["block_devices"]) == sorted(current_block_devs) and (
datetime.datetime.now().timestamp() < cache_data["expiration"]
):
return json.dumps(cache_data["response"])
# For each of our scsi IDs/ports, get disk info using our class and stash it in our response list
ustorage_response = []
for port, scsi_id in DEVICE_DISK_INFO[get_ubnt_shortname()]["scsi_map"].items():
# Load and append our data
ustorage_response.append(UNVRDiskInfo(port, scsi_id).get_payload())
# Now build our data to save
save_data = {
"block_devices": current_block_devs,
# TODO: Figure out the right timeframe for this expiration
"expiration": (
datetime.datetime.now() + datetime.timedelta(minutes=2)
).timestamp(),
"response": ustorage_response,
}
# Save before we return...
with open(CACHE_FILE, "w") as f:
f.write(json.dumps(save_data))
# And were done here!
return json.dumps(save_data["response"])
if __name__ == "__main__":
# Only work if disk inspect is called
if len(sys.argv) == 3 and sys.argv[1] == "disk" and sys.argv[2] == "inspect":
print(run_main())