feat: add hdds/temps/version to display

* Write a mock ustorage tool to parse disk info for ulcmd
* Pass system temp info from unvr-fan-daemon to mock-ubnt-api via tmp file
* add mock ubnt-tools to make ulcmd happy and report our SN
* code cleanup and documentation update
This commit is contained in:
Chris Blake 2024-05-26 14:58:01 -05:00
parent a4f7f862c2
commit 9c08f287fc
7 changed files with 268 additions and 58 deletions

View file

@ -86,14 +86,12 @@ To restore back to the factory UNVR-Pro firmware, you can do the following steps
* Installation is Hard * Installation is Hard
* Need to simplify the install process, this should be much easier once I can get latest GPL kernel source (no more uboot env stuff) * Need to simplify the install process, this should be much easier once I can get latest GPL kernel source (no more uboot env stuff)
* Touchscreen
* Disks do not populate ATM, because there is no grpc service mocking/replacing ustated, which ulcmd uses to get disk info.
* OpenMediaVault * OpenMediaVault
* BTRFS does not work, period * BTRFS does not work, period
* No kernel module in UBNT kernel, need new kernel source and we can make so many things better... * No kernel module in UBNT kernel, need new kernel source and we can make so many things better...
* Might try to build an out-of-tree module for this, more research needed * Might try to build an out-of-tree module for this, more research needed
* Reset Button * Reset Button
* Does literally nothing ATM, not sure if it's worth having it do something or not * Only works to reboot the system
## Disclaimer ## Disclaimer

View file

@ -1,16 +1,39 @@
#!/usr/bin/python3 #!/usr/bin/python3
import os
from flask import Flask, jsonify from flask import Flask, jsonify
from functools import lru_cache
import socket import socket
def __get_system_temp():
try:
with open("/tmp/.unvr_temp", "r") as f:
return float(f.read())
except (IOError, OSError, PermissionError) as e:
print(f"Warning: Unable to get device temp!")
return None
@lru_cache(None)
def __get_omv_version():
return os.popen("dpkg-query -W -f='${Version}' openmediavault").read()
app = Flask(__name__) app = Flask(__name__)
@app.route("/api/info") @app.route("/api/info")
def api_info(): def api_info():
print(socket.gethostname())
payload = { payload = {
"isSetup": True, "isSetup": True,
"hostname": socket.gethostname(), "hostname": socket.gethostname(),
"hardware": {
"firmwareVersion": f"OMV {__get_omv_version()}", # OMV version
},
"cpu": {
"temperature": __get_system_temp(),
},
} }
return jsonify(payload) return jsonify(payload)
@ -18,7 +41,7 @@ def api_info():
# No controllers for you # No controllers for you
@app.route("/api/controllers") @app.route("/api/controllers")
def api_controllers(): def api_controllers():
return jsonify({}) return jsonify([])
if __name__ == "__main__": if __name__ == "__main__":

View file

@ -0,0 +1,13 @@
#!/bin/bash
if [ "$1" == "id" ]; then
BOARD_SYSID=$(grep systemid /proc/ubnthal/system.info | sed 's|systemid=||g')
BOARD_SERIALNO=$(grep serialno /proc/ubnthal/system.info | sed 's|serialno=||g')
# BOM is in SPI on EEPROM part, offset D000 is start of TlvInfo. BOM starts with 113-
BOARD_BOM=$(dd if=/dev/mtd4 bs=64 skip=832 count=1 status=none | strings | grep 113-)
echo "board.sysid=0x${BOARD_SYSID}"
echo "board.serialno=${BOARD_SERIALNO}"
echo "board.bom=${BOARD_BOM}"
else
echo "Unknown ubnt-tools cmd: $@" >> /tmp/ubnt-tools-unknown.log
fi

View file

@ -1,10 +1,9 @@
#!/usr/bin/python3 #!/usr/bin/python3
from functools import lru_cache
import os import os
import re import re
import time import time
UBNTHAL_PATH = "/proc/ubnthal/system.info" from ubnthelpers import get_ubnt_shortname
SMARTCTL_PATH = "/usr/sbin/smartctl" SMARTCTL_PATH = "/usr/sbin/smartctl"
@ -25,28 +24,15 @@ THERMAL_SYS_PATHS = {
} }
@lru_cache(None)
def __get_ubnt_device():
try:
with open(UBNTHAL_PATH, "r") as f:
ubnthal_model = [i for i in f.readlines() if i.startswith("shortname=")][0]
return ubnthal_model.lstrip("shortname=").rstrip("\n")
except FileNotFoundError:
print(
f"Error: unable to open {UBNTHAL_PATH}; is the ubnthal kernel module loaded?!"
)
raise
def __get_board_temps(): def __get_board_temps():
# Are we supported? # Are we supported?
if __get_ubnt_device() not in THERMAL_SYS_PATHS: if get_ubnt_shortname() not in THERMAL_SYS_PATHS:
raise Exception( raise Exception(
f"Error: Your Unifi device of {__get_ubnt_device()} is not yet supported by unvr-fan-daemon! Exiting..." f"Error: Your Unifi device of {get_ubnt_shortname()} is not yet supported by unvr-fan-daemon! Exiting..."
) )
# For each of our paths, get the temps, and append to single return list # For each of our paths, get the temps, and append to single return list
board_temps = [] board_temps = []
for path in THERMAL_SYS_PATHS[__get_ubnt_device()]["thermal"]: for path in THERMAL_SYS_PATHS[get_ubnt_shortname()]["thermal"]:
try: try:
with open(path, "r") as f: with open(path, "r") as f:
board_temps.append(int(f.readline().rstrip("\n"))) board_temps.append(int(f.readline().rstrip("\n")))
@ -74,6 +60,7 @@ def __get_disk_temps():
# For each disk, get the temp, and append to our list # For each disk, get the temp, and append to our list
disk_temps = [] disk_temps = []
for dev in devices: for dev in devices:
# Sadly this is slow, SMART data pulls are not fast...
dev_temp = re.search( dev_temp = re.search(
r"^194 [\w-]+\s+0x\d+\s+\d+\s+\d+\s+\d+\s+[\w-]+\s+\w+\s+\S+\s+(\d+)(?:\s[\(][^)]*[\)])?$", r"^194 [\w-]+\s+0x\d+\s+\d+\s+\d+\s+\d+\s+[\w-]+\s+\w+\s+\S+\s+(\d+)(?:\s[\(][^)]*[\)])?$",
os.popen(f"{SMARTCTL_PATH} -A {dev}").read(), os.popen(f"{SMARTCTL_PATH} -A {dev}").read(),
@ -104,21 +91,28 @@ def __calculate_fan_speed(temp):
def __set_fan_speed(speed: int): def __set_fan_speed(speed: int):
# Set the fans # Set the fans
for fan in THERMAL_SYS_PATHS[__get_ubnt_device()]["fan_pwms"]: for fan in THERMAL_SYS_PATHS[get_ubnt_shortname()]["fan_pwms"]:
try: try:
with open(fan, "w") as f: with open(fan, "w") as f:
f.write(str(speed)) f.write(str(speed))
except FileNotFoundError: except FileNotFoundError:
print( print(
f"Error: Unable to write to PWM at {path}! Why can't we set fan speed!?" f"Error: Unable to write to PWM at {fan}! Why can't we set fan speed!?"
) )
raise raise
if __name__ == "__main__": def __write_out_temp(temp: int, path: str = "/tmp/.unvr_temp"):
# Trigger our model load so it's cached try:
__get_ubnt_device() with open(path, "w+") as f:
f.write(str(temp))
except (IOError, OSError, PermissionError) as e:
print(
f"Warning: Unable to write to temp file at {path}; ulcmd won't get the system temp! Error was: {e}"
)
if __name__ == "__main__":
# Cache so we only write to PWMs if this changes # Cache so we only write to PWMs if this changes
last_fanspeed = 0 last_fanspeed = 0
@ -126,16 +120,20 @@ if __name__ == "__main__":
# Start with debug write to max speed so we hear it :) # Start with debug write to max speed so we hear it :)
__set_fan_speed(255) __set_fan_speed(255)
time.sleep(1) time.sleep(0.5)
# Start our main loop # Start our main loop
while True: while True:
# Get the fanspeed we wanna set based on temps # Get the fanspeed we wanna set based on temps
temp = ( temp = (
sorted(__get_board_temps() + __get_disk_temps(), reverse=True)[0] // 1000 sorted(__get_board_temps() + __get_disk_temps(), reverse=True)[0] / 1000.0
) # Move temp to C, ignore decimals ) # Move temp to C
fanspeed = __calculate_fan_speed(temp) fanspeed = __calculate_fan_speed(temp)
# Write out for consumption by ulcmd via mock-ubnt-api
__write_out_temp(temp)
# If there's a change in calculated fan speed, set it # If there's a change in calculated fan speed, set it
if last_fanspeed != fanspeed: if last_fanspeed != fanspeed:
print(f"Setting fan PWMs to {fanspeed} due to temp of {temp}C") print(f"Setting fan PWMs to {fanspeed} due to temp of {temp}C")
@ -143,4 +141,4 @@ if __name__ == "__main__":
last_fanspeed = fanspeed last_fanspeed = fanspeed
# Sleep and run again # Sleep and run again
time.sleep(5) time.sleep(10)

View file

@ -0,0 +1,141 @@
#!/usr/bin/python3
import sys
import os
import re
import json
from ubnthelpers import get_ubnt_shortname
DEVICE_DISK_INFO = {
"UNVRPRO": {
"scsi_map": {
1: "7:0:0:0",
2: "5:0:0:0",
3: "3:0:0:0",
4: "6:0:0:0",
5: "4:0:0:0",
6: "0:0:0:0",
7: "2:0:0:0",
}
},
}
SMARTCTL_PATH = "/usr/sbin/smartctl"
def __read_file(path: str):
with open(path) as f:
s = f.read().rstrip("\n").rstrip()
return s
def __parse_smartctl(input: str, regex: str):
search = re.search(
regex,
input,
re.MULTILINE,
)
if bool(search):
return search.group(1)
else:
return None
def __find_and_map_disks():
# Are we supported?
if get_ubnt_shortname() not in DEVICE_DISK_INFO:
raise Exception(
f"Error: Your Unifi device of {get_ubnt_shortname()} is not yet supported by ustorage! Exiting..."
)
# For each of our scsi IDs, see if we exist in proc (aka a disk is there)
ustorage_response = []
for port, scsi_id in DEVICE_DISK_INFO[get_ubnt_shortname()]["scsi_map"].items():
if os.path.exists(f"/sys/class/scsi_disk/{scsi_id}"):
# Disk is here, now find out what sd device it is so we can get drive deets
blkdirlist = os.listdir(f"/sys/class/scsi_disk/{scsi_id}/device/block")
if len(blkdirlist) > 0 and blkdirlist[0].startswith("sd"):
# We found our disk, it has a /dev/sd* entry
# Let's get our smartdata we need
disk_node = blkdirlist[0]
disk_smartdata = os.popen(
f"{SMARTCTL_PATH} -iHA /dev/{disk_node}"
).read()
disk_temp = int(
__parse_smartctl(
disk_smartdata,
r"^194 [\w-]+\s+0x\d+\s+\d+\s+\d+\s+\d+\s+[\w-]+\s+\w+\s+\S+\s+(\d+)(?:\s[\(][^)]*[\)])?$",
)
)
disk_bad_sectors = int(
__parse_smartctl(
disk_smartdata,
r"^ 5 [\w-]+\s+0x\d+\s+\d+\s+\d+\s+\d+\s+[\w-]+\s+\w+\s+\S+\s+(\d+)(?:\s[\(][^)]*[\)])?$",
)
)
disk_size = int(__read_file(f"/sys/block/{disk_node}/size")) * int(
__read_file(f"/sys/block/{disk_node}/queue/logical_block_size")
)
# Do we pass SMART testing?
if "PASSED" in __parse_smartctl(
disk_smartdata,
r"SMART overall-health self-assessment test result:\s*(.*)",
):
disk_state = "normal"
else:
disk_state = "failed"
# Are we an SSD?
if __parse_smartctl(
disk_smartdata, r"Rotation Rate:\s+Solid State Device*(.)"
):
disk_type = "SSD"
# SSD disks also need to report their life_span
disk_span_raw = __parse_smartctl(
disk_smartdata,
r"^231 [\w-]+\s+0x\d+\s+\d+\s+\d+\s+\d+\s+[\w-]+\s+\w+\s+\S+\s+(\d+)(?:\s[\(][^)]*[\)])?$",
)
life_span = 100 # Default assume
# Did we have SMART value 231?
if disk_span_raw:
life_span = 100 - int(disk_span_raw)
else:
disk_type = "HDD"
life_span = None
# Generate and add our disk object
diskdata = {
"bad_sector": disk_bad_sectors,
"estimate": None, # No idea what this is for, maybe rebuilds?
"model": __read_file(
f"/sys/class/scsi_disk/{scsi_id}/device/model"
),
"node": disk_node,
"size": disk_size,
"slot": port,
"state": disk_state,
"temperature": disk_temp,
"type": disk_type,
"life_span": life_span,
}
ustorage_response.append(diskdata)
else:
print(
f"Error: Unable to find block device name for disk at SCSI ID ${scsi_id}! Exiting..."
)
sys.exit(1)
else:
# Disk doesn't exist, add the offline entry.
nodisk = {"healthy": "none", "reason": [], "slot": port, "state": "nodisk"}
ustorage_response.append(nodisk)
return json.dumps(ustorage_response)
if __name__ == "__main__":
# Yes this is dirty and lazy, but this should be the only cmd ulcmd calls
try:
if len(sys.argv) == 3 and sys.argv[1] == "disk" and sys.argv[2] == "inspect":
print(__find_and_map_disks())
except Exception as e:
raise

View file

@ -0,0 +1,21 @@
#!/usr/bin/python3
from functools import lru_cache
"""
A handful of Ubiquiti Unifi device specific functions
"""
UBNTHAL_PATH = "/proc/ubnthal/system.info"
@lru_cache(None)
def get_ubnt_shortname() -> str:
try:
with open(UBNTHAL_PATH, "r") as f:
ubnthal_model = [i for i in f.readlines() if i.startswith("shortname=")][0]
return ubnthal_model.lstrip("shortname=").rstrip("\n")
except FileNotFoundError:
print(
f"Error: unable to open {UBNTHAL_PATH}; is the ubnthal kernel module loaded?!"
)
raise

View file

@ -1,14 +1,12 @@
#!/usr/bin/python3 #!/usr/bin/python3
import argparse import argparse
import binascii
import mmap import mmap
import sys import sys
import zlib import zlib
from io import BytesIO
from pathlib import Path from pathlib import Path
def main(fwfile: str, savedir: str): def main(fwfile: str, savedir: str):
# Does the file exist? # Does the file exist?
if not Path(fwfile).is_file(): if not Path(fwfile).is_file():
@ -21,7 +19,7 @@ def main(fwfile: str, savedir: str):
Path(savedir).mkdir(parents=True) Path(savedir).mkdir(parents=True)
# Start parsing the OTA file # Start parsing the OTA file
with open(fwfile, 'rb+') as f: with open(fwfile, "rb+") as f:
# Read from disk, not memory # Read from disk, not memory
mm = mmap.mmap(f.fileno(), 0) mm = mmap.mmap(f.fileno(), 0)
@ -30,12 +28,16 @@ def main(fwfile: str, savedir: str):
# Do we have the UBNT header? # Do we have the UBNT header?
if ubntheader[0:4].decode("utf-8") != "UBNT": if ubntheader[0:4].decode("utf-8") != "UBNT":
print(f"Error: {fwfile} is missing the UBNT header! Is this the right firmware file?") print(
f"Error: {fwfile} is missing the UBNT header! Is this the right firmware file?"
)
sys.exit(1) sys.exit(1)
# Is the header CRC valid? # Is the header CRC valid?
if zlib.crc32(ubntheader) != ubntheadercrc: if zlib.crc32(ubntheader) != ubntheadercrc:
print(f"Error: {fwfile} has in incorrect CRC for it's header! Please re-download the file!") print(
f"Error: {fwfile} has in incorrect CRC for it's header! Please re-download the file!"
)
sys.exit(1) sys.exit(1)
# If we are here, that's a great sign :) # If we are here, that's a great sign :)
@ -43,10 +45,9 @@ def main(fwfile: str, savedir: str):
print(f"Loaded in firmware file {ubnt_fw_ver_string}") print(f"Loaded in firmware file {ubnt_fw_ver_string}")
# Start parsing out all of the files in the OTA # Start parsing out all of the files in the OTA
#files = []
fcount = 1 fcount = 1
while True: while True:
file_header_offset = mm.find(b'\x46\x49\x4C\x45') # FILE in hex bye string file_header_offset = mm.find(b"\x46\x49\x4C\x45") # FILE in hex bye string
# Are we done scanning the file? # Are we done scanning the file?
if file_header_offset == -1: if file_header_offset == -1:
break break
@ -54,24 +55,37 @@ def main(fwfile: str, savedir: str):
# We found one, seek to it # We found one, seek to it
mm.seek(file_header_offset) mm.seek(file_header_offset)
file_header = mm.read(0x38) # Entire header file_header = mm.read(0x38) # Entire header
file_position = file_header[39] # Increments with files read, can be used to validate this is a file for us file_position = file_header[
file_location = file_header_offset+0x38 # header location - header = data :) 39
] # Increments with files read, can be used to validate this is a file for us
file_location = (
file_header_offset + 0x38
) # header location - header = data :)
# Is this a VALID file?! # Is this a VALID file?!
if fcount == file_position: if fcount == file_position:
file_name = file_header[4:33].decode("utf-8").rstrip('\x00') # Name/type of FILE file_name = (
file_header[4:33].decode("utf-8").rstrip("\x00")
) # Name/type of FILE
file_length = int(file_header[48:52].hex(), 16) file_length = int(file_header[48:52].hex(), 16)
print(f"{file_name} is at offset {"0x%0.2X" % file_location}, {file_length} bytes") print(
f"{file_name} is at offset {"0x%0.2X" % file_location}, {file_length} bytes"
)
# print(int(file_header[52:56].hex(), 16)) # Maybe reserved memory or partition size? We don't use this tho # print(int(file_header[52:56].hex(), 16)) # Maybe reserved memory or partition size? We don't use this tho
#files.extend([(file_position, file_name, file_location, file_length)]) fcount = fcount + 1 # Increment on find!
fcount = fcount+1 # Increment on find!
fcontents = mm.read(file_length) # Read into memory fcontents = mm.read(file_length) # Read into memory
file_footer_crc32 = mm.read(0x8)[0:4] # Read in tailing 8 bytes (crc32) footer, but we only want the first 4 file_footer_crc32 = mm.read(0x8)[
0:4
] # Read in tailing 8 bytes (crc32) footer, but we only want the first 4
# Does our calculated crc32 match the unifi footer in the img? # Does our calculated crc32 match the unifi footer in the img?
if hex(zlib.crc32(file_header + fcontents)).lstrip('0x') != file_footer_crc32.hex().lstrip('0'): if hex(zlib.crc32(file_header + fcontents)).lstrip(
print(f"Error: Contents of {file_name} does not match the Unifi CRC! Please re-download the file!") "0x"
) != file_footer_crc32.hex().lstrip("0"):
print(
f"Error: Contents of {file_name} does not match the Unifi CRC! Please re-download the file!"
)
sys.exit(1) sys.exit(1)
# Write file out since our mmap position is now AT the data, and we parsed the length # Write file out since our mmap position is now AT the data, and we parsed the length
@ -87,6 +101,8 @@ def main(fwfile: str, savedir: str):
if __name__ == "__main__": if __name__ == "__main__":
parser = argparse.ArgumentParser("ubnt-fw-parse for Dream Machines") parser = argparse.ArgumentParser("ubnt-fw-parse for Dream Machines")
parser.add_argument("file", help="The Ubiquiti firmware file to parse", type=str) parser.add_argument("file", help="The Ubiquiti firmware file to parse", type=str)
parser.add_argument("savedir", help="The directory to save the parsed files to", type=str) parser.add_argument(
"savedir", help="The directory to save the parsed files to", type=str
)
args = parser.parse_args() args = parser.parse_args()
main(args.file, args.savedir) main(args.file, args.savedir)