Release 0.0.5: health report (M4), installer (M9), update check (M13)
release / release (push) Successful in 13s
release / release (push) Successful in 13s
M4 — health report (the 0.0.4 CHANGELOG entry, folded into this release):
- core/health.py: scan journalctl (Xid/panic/OOM/MCE/AER/thermal), SMART,
NVIDIA driver mismatch, journald persistence, live temps -> findings
- CLI `rigdoctor report` (text/JSON); GUI Health tab; scanner tests
M9 — installer (first cut):
- core/{catalog,sysenv,installer}.py; `rigdoctor install [--check] [-y]`
- GUI Setup tab: detect distro/GPU, show optional components, one-click
install of missing apt packages via pkexec/sudo
M13 — update check (check half):
- core/updates.py; sidebar shows up-to-date / "Update to v…" / unavailable
Plus tests, version bump to 0.0.5, CHANGELOG, and doc status updates.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -1,3 +1,3 @@
|
||||
"""RigDoctor — modular hardware monitoring & crash diagnostics for Linux gamers."""
|
||||
|
||||
# Package version. Only the current release value is kept; the earlier
# "0.0.3" assignment was dead (immediately overwritten) and is removed.
__version__ = "0.0.5"
|
||||
|
||||
+67
-3
@@ -164,9 +164,66 @@ def cmd_record_report(args) -> int:
|
||||
return 0
|
||||
|
||||
|
||||
def cmd_install(args) -> int:
    """Show optional-component status and optionally install what is missing (M9).

    Prints the detected distro, package manager and GPU vendors, then one line
    per catalog component. ``args.check`` limits the command to reporting and
    ``args.yes`` skips the confirmation prompt.

    Returns:
        0 when nothing is missing or ``--check`` was given, 1 when apt is not
        available or the user declines, otherwise the installer's exit code.
    """
    from .core import installer, sysenv

    print(f"Distro: {sysenv.distro_name()}")
    manager = sysenv.package_manager()
    print(f"Package manager: {manager or 'none (only apt is supported)'}")
    print(f"GPU: {', '.join(sysenv.gpu_vendors()) or 'unknown'}\n")

    rows = installer.component_status()
    print("Optional components:")
    absent = []
    for item, installed in rows:
        mark = "✗" if not installed else "✓"
        print(f" [{mark}] {item.name:<22} — {item.enables}")
        if installed:
            continue
        absent.append(item)
        print(f" apt: {' '.join(item.apt)}")

    if not absent:
        print("\nAll optional components are installed. ✔")
        return 0

    packages = installer.missing_packages(absent)
    print(f"\nMissing packages: {' '.join(packages)}")

    # --check is a pure status report: stop before touching the system.
    if args.check:
        return 0

    if manager != "apt":
        print(f"Automatic install needs apt. Install manually:\n sudo apt install {' '.join(packages)}")
        return 1

    if not args.yes:
        try:
            answer = input(f"\nInstall {len(packages)} package(s) now? [y/N] ").strip().lower()
        except EOFError:
            # No stdin (e.g. piped/non-interactive): treat as a refusal.
            answer = "n"
        if answer not in {"y", "yes"}:
            print("Aborted.")
            return 1

    print("Installing (you may be prompted for your password)…")
    exit_code, output = installer.install_packages(packages)
    # Only the tail of the installer output is shown to keep the terminal readable.
    print(output[-2000:])
    if exit_code != 0:
        print(f"\nInstall failed (exit {exit_code}).")
        return exit_code

    # Re-probe so the user sees what (if anything) is still absent.
    leftover = [c.name for c, ok in installer.component_status() if not ok]
    print("\nStill missing: " + (", ".join(leftover) if leftover else "none ✔"))
    return exit_code
|
||||
|
||||
|
||||
def cmd_report(args) -> int:
    """Run the health checks (M4) and print the findings.

    With ``args.json`` set the findings are printed as a JSON array of objects
    (one per finding); otherwise they are rendered as text via ``render_health``.

    The old "not implemented yet" stub (which returned 2 before any of this
    code could run) is removed so the real report is reachable.

    Returns:
        Always 0.
    """
    # Imported locally so this function does not depend on a module-level
    # ``json`` import that is not visible from here.
    import json
    from dataclasses import asdict

    from .core.health import run_health_checks
    from .render import render_health

    findings = run_health_checks()
    if args.json:
        print(json.dumps([asdict(f) for f in findings], indent=2, ensure_ascii=False))
    else:
        print(render_health(findings))
    return 0
|
||||
|
||||
|
||||
def build_parser() -> argparse.ArgumentParser:
|
||||
@@ -188,6 +245,11 @@ def build_parser() -> argparse.ArgumentParser:
|
||||
sub.add_parser("gui", help="launch the desktop GUI (needs PySide6)").set_defaults(func=cmd_gui)
|
||||
sub.add_parser("sources", help="list detected sensor sources").set_defaults(func=cmd_sources)
|
||||
|
||||
inst = sub.add_parser("install", help="set up optional system dependencies (M9)")
|
||||
inst.add_argument("--check", action="store_true", help="report status only; install nothing")
|
||||
inst.add_argument("-y", "--yes", action="store_true", help="install without confirmation")
|
||||
inst.set_defaults(func=cmd_install)
|
||||
|
||||
rec = sub.add_parser("record", help="crash-capture logger (M3)")
|
||||
rec_sub = rec.add_subparsers(dest="record_cmd", required=True)
|
||||
|
||||
@@ -209,7 +271,9 @@ def build_parser() -> argparse.ArgumentParser:
|
||||
report_p.add_argument("--log", default=None, help="path to a capture log")
|
||||
report_p.set_defaults(func=cmd_record_report)
|
||||
|
||||
sub.add_parser("report", help="health report (coming soon)").set_defaults(func=cmd_report)
|
||||
rep = sub.add_parser("report", help="health report (M4): scan logs/SMART/driver for issues")
|
||||
rep.add_argument("--json", action="store_true", help="output JSON instead of text")
|
||||
rep.set_defaults(func=cmd_report)
|
||||
return p
|
||||
|
||||
|
||||
|
||||
@@ -0,0 +1,44 @@
|
||||
"""Installable component catalog (M9): optional system tools and what they enable.
|
||||
|
||||
apt-only (D15). Core monitoring (M1/M3/M4) needs no packages — these are optional
|
||||
enrichments the installer can add. Each component is detected by a representative
|
||||
command (present == usable).
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from dataclasses import dataclass
|
||||
|
||||
|
||||
@dataclass(frozen=True)
class Component:
    """An optional system tool listed in the installer catalog (immutable)."""

    # Short identifier of the component.
    id: str
    # Display name shown to the user.
    name: str
    # Group label the component is listed under — presumably a UI grouping; verify against callers.
    bundle: str
    # Capability unlocked when the component is present.
    enables: str
    # apt package name(s) that provide the component.
    apt: tuple[str, ...]
    # Command whose presence is used to detect the component.
    command: str
|
||||
|
||||
|
||||
# The installer catalog: every optional component, in display order.
COMPONENTS: tuple[Component, ...] = (
    Component(
        id="smartmontools",
        name="SMART disk health",
        bundle="Diagnostics",
        enables="Disk health (SMART) in the health report (M4)",
        apt=("smartmontools",),
        command="smartctl",
    ),
    Component(
        id="lm-sensors",
        name="lm-sensors",
        bundle="Diagnostics",
        enables="Extra motherboard / voltage sensors",
        apt=("lm-sensors",),
        command="sensors",
    ),
    Component(
        id="dmidecode",
        name="dmidecode",
        bundle="Diagnostics",
        enables="Motherboard / BIOS / RAM details for system inventory (M5)",
        apt=("dmidecode",),
        command="dmidecode",
    ),
    Component(
        id="pciutils",
        name="pciutils",
        bundle="Diagnostics",
        enables="PCIe topology + GPU detection (lspci)",
        apt=("pciutils",),
        command="lspci",
    ),
    Component(
        id="libnotify",
        name="Desktop notifications",
        bundle="Monitoring",
        enables="Desktop alert notifications (M8)",
        apt=("libnotify-bin",),
        command="notify-send",
    ),
)
|
||||
@@ -0,0 +1,245 @@
|
||||
"""Health report (M4): scan kernel logs + SMART + driver/library state into a
|
||||
prioritized, plain-language findings list with suggested fixes (read-only, D9).
|
||||
|
||||
Stdlib-only. Every check degrades gracefully — a missing tool/permission yields an
|
||||
info finding, never an exception.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import re
|
||||
import shutil
|
||||
import subprocess
|
||||
from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
|
||||
# Severity levels attached to every finding.
CRITICAL = "critical"
WARNING = "warning"
INFO = "info"
OK = "ok"

# Sort rank per severity: lower rank sorts first, so the worst findings lead.
_ORDER = {level: rank for rank, level in enumerate((CRITICAL, WARNING, INFO, OK))}
|
||||
|
||||
|
||||
@dataclass
class Finding:
    """One result of a health check: what was found and what to do about it."""

    # One of: critical | warning | info | ok
    severity: str
    # Area the finding belongs to: GPU, Kernel, Memory, Storage, Thermal, Driver, PCIe, Logs
    category: str
    # Short headline for the finding.
    title: str
    # Optional longer explanation (empty when there is nothing to add).
    detail: str = ""
    # Optional suggested fix (empty when there is none).
    suggestion: str = ""
|
||||
|
||||
|
||||
# --- NVIDIA Xid knowledge (the seed crash is Xid 79) --------------------------

# Xid code -> (severity, plain-language description). Codes not listed here are
# reported by the scanner with a generic WARNING description.
_XID_INFO: dict[int, tuple[str, str]] = {
    13: (WARNING, "Graphics engine exception (often an app/driver bug or unstable overclock)"),
    31: (WARNING, "GPU memory page fault (usually a driver or application bug)"),
    43: (WARNING, "GPU stopped processing a task (application error)"),
    45: (INFO, "Preemptive channel removal (often a side-effect of another error or a reboot)"),
    48: (CRITICAL, "Double-bit ECC error — VRAM hardware fault"),
    62: (CRITICAL, "Internal microcontroller halt (often follows instability)"),
    79: (CRITICAL, "GPU has fallen off the bus — hardware: power delivery, PCIe link, or thermals"),
    94: (CRITICAL, "Contained ECC error"),
    95: (CRITICAL, "Uncontained ECC error"),
    119: (CRITICAL, "GSP RPC timeout — GPU System Processor hang"),
    120: (CRITICAL, "GSP error — GPU System Processor fault"),
}

# Xid code -> code-specific suggested fix. Codes without an entry get a generic
# "look it up" suggestion from the scanner.
_XID_SUGGEST: dict[int, str] = {
    79: (
        "Check PSU/power cables and reseat the GPU/riser; test a lower power limit "
        "(`sudo nvidia-smi -pl <watts>`) and capture a session with `rigdoctor record`."
    ),
    48: "Persistent VRAM ECC errors mean failing memory — RMA the card if it recurs.",
    119: "GSP hangs are often driver-version specific — try a different driver branch.",
    120: "GSP errors are often driver-version specific — try a different driver branch.",
}

# Captures the numeric code after "Xid", allowing an optional parenthesised
# group (e.g. a PCI address) and an optional colon in between.
_XID_RE = re.compile(r"Xid(?:\s*\([^)]*\))?:?\s*(\d+)")
|
||||
|
||||
|
||||
def scan_journal_text(text: str) -> list[Finding]:
    """Parse kernel-log text into findings (separated from IO so it's testable).

    Looks for, in this order: NVIDIA Xid errors (one finding per distinct code,
    with an occurrence count), out-of-memory kills, kernel panics, machine-check
    / hardware errors, PCIe AER errors, thermal events and amdgpu resets.

    Args:
        text: Raw kernel log text, one message per line.

    Returns:
        The findings in detection order (not sorted by severity); an empty list
        when nothing notable is present.
    """
    lines = text.splitlines()
    low = [ln.lower() for ln in lines]
    findings: list[Finding] = []

    # --- NVIDIA Xid: tally occurrences per code --------------------------------
    xids: dict[int, int] = {}
    for line in lines:
        if "Xid" not in line:
            continue
        m = _XID_RE.search(line)
        if m:
            code = int(m.group(1))
            xids[code] = xids.get(code, 0) + 1
    for code in sorted(xids):
        severity, desc = _XID_INFO.get(code, (WARNING, f"NVIDIA GPU error (Xid {code})"))
        suggest = _XID_SUGGEST.get(code, "Look up this Xid code in NVIDIA's Xid error documentation.")
        findings.append(Finding(severity, "GPU", f"NVIDIA Xid {code} ×{xids[code]}", desc, suggest))

    # --- OOM --------------------------------------------------------------------
    # A single OOM kill produces several kernel lines (the oom-killer invocation,
    # the "oom-kill:" summary, the "Killed process" line, the oom_reaper line).
    # Counting every matching line inflated the "kills" figure, so count only the
    # "Killed process" lines; if the log shows OOM activity but no such line
    # (e.g. truncated log), report at least one.
    if any("Out of memory" in ln or "oom-kill" in ln or "oom_reaper" in ln for ln in lines):
        kills = sum(1 for ln in lines if "Killed process" in ln)
        oom = max(kills, 1)
        findings.append(Finding(
            WARNING, "Memory", f"Out-of-memory kills ×{oom}",
            "The kernel killed processes to reclaim RAM.",
            "Close memory-heavy apps, add zram/swap, or investigate a leak.",
        ))

    if any("Kernel panic" in ln for ln in lines):
        findings.append(Finding(
            CRITICAL, "Kernel", "Kernel panic recorded",
            "The kernel hit an unrecoverable error.",
            "Note the panic message; review recent driver/kernel updates and hardware.",
        ))

    # --- MCE --------------------------------------------------------------------
    # A bare "mce:" prefix is not evidence of an error: kernels also use it for
    # informational boot messages (e.g. thermal-monitoring / MCE-bank banners).
    # Require an explicit machine-check or hardware-error marker instead.
    if any("machine check" in ln or "hardware error" in ln for ln in low):
        findings.append(Finding(
            CRITICAL, "Hardware", "Machine Check Exception (MCE)",
            "The CPU reported a hardware error.",
            "Run memtest86 for RAM, check CPU temps/voltages, and review the MCE detail.",
        ))

    if any("AER:" in ln or "PCIe Bus Error" in ln or ("pcieport" in ln and "error" in ln.lower()) for ln in lines):
        findings.append(Finding(
            WARNING, "PCIe", "PCIe bus errors (AER)",
            "Correctable/uncorrectable PCIe errors were logged.",
            "Reseat the device and check risers/cabling; AER storms can precede a GPU drop.",
        ))

    if any(("thermal" in ln and ("critical" in ln or "throttl" in ln)) or "temperature above threshold" in ln for ln in low):
        findings.append(Finding(
            WARNING, "Thermal", "Thermal events logged",
            "The system logged thermal throttling / critical-temperature events.",
            "Improve airflow/cooling and check fan curves; watch live temps on the dashboard.",
        ))

    if any("amdgpu" in ln and "reset" in ln for ln in low):
        findings.append(Finding(
            CRITICAL, "GPU", "AMD GPU reset (amdgpu)",
            "The AMD GPU was reset after a hang.",
            "Check power/thermals/driver; capture a session with `rigdoctor record`.",
        ))

    return findings
|
||||
|
||||
|
||||
def _journalctl(args: list[str]) -> str | None:
    """Run ``journalctl`` with *args* and return its stdout, or None on failure.

    None is returned when journalctl is not installed, cannot be executed,
    times out (25 s), or exits non-zero without producing any output. The last
    case matters: previously a failed run returned an empty string, which the
    caller could not tell apart from "the journal contains no errors".
    """
    if shutil.which("journalctl") is None:
        return None
    try:
        proc = subprocess.run(["journalctl", *args], capture_output=True, text=True, timeout=25)
    except (subprocess.SubprocessError, OSError):
        return None
    # Keep partial output from a non-zero exit, but treat "failed and printed
    # nothing" as unreadable rather than as an empty (clean) journal.
    if proc.returncode != 0 and not proc.stdout.strip():
        return None
    return proc.stdout
|
||||
|
||||
|
||||
def check_journal() -> list[Finding]:
    """Scan the last 7 days of the kernel journal and return the findings.

    Yields a single INFO finding when the journal can't be read, and a single
    OK finding when the scan turns up nothing.
    """
    journal_text = _journalctl(["-k", "--no-pager", "-o", "cat", "--since", "-7 days"])
    if journal_text is None:
        unreadable = Finding(
            INFO, "Logs", "Couldn't read the kernel journal",
            "journalctl is unavailable or not readable.",
            "Ensure systemd/journald is present and your user is in the 'systemd-journal' or 'adm' group.",
        )
        return [unreadable]

    results = scan_journal_text(journal_text)
    if results:
        return results

    # Nothing matched: say so explicitly instead of returning an empty list.
    results.append(Finding(
        OK, "Logs", "No notable kernel errors (last 7 days)",
        "No Xid, panic, OOM, MCE, PCIe AER, or thermal events found.",
    ))
    return results
|
||||
|
||||
|
||||
def check_journal_persistence() -> list[Finding]:
    """Warn when ``/var/log/journal`` is not a directory (journal not kept on disk).

    Returns an empty list when the directory exists, otherwise one WARNING
    finding with the command to enable persistent logging.
    """
    persistent = Path("/var/log/journal").is_dir()
    if not persistent:
        return [Finding(
            WARNING, "Logs", "journald isn't persistent across reboots",
            "Crash-boot kernel logs are discarded on reboot, so a hard freeze's evidence can vanish.",
            "Enable persistent logging: `sudo mkdir -p /var/log/journal && sudo systemctl restart systemd-journald`",
        )]
    return []
|
||||
|
||||
|
||||
def check_nvidia_driver() -> list[Finding]:
    """Detect an NVIDIA kernel-module / userspace-library version mismatch.

    Runs ``nvidia-smi`` (when installed) and looks for its
    "Driver/library version mismatch" message in stdout or stderr. Returns one
    CRITICAL finding when the message is present, otherwise an empty list
    (including when nvidia-smi is missing, fails to start, or times out).
    """
    if shutil.which("nvidia-smi") is None:
        return []

    try:
        result = subprocess.run(["nvidia-smi"], capture_output=True, text=True, timeout=10)
    except (subprocess.SubprocessError, OSError):
        return []

    combined = result.stdout + result.stderr
    if "Driver/library version mismatch" not in combined:
        return []

    return [Finding(
        CRITICAL, "Driver", "NVIDIA driver/library version mismatch",
        "The loaded kernel module and the userspace NVIDIA libraries differ — GPU monitoring will fail until resolved.",
        "Reboot to load the matching module (or finish the interrupted driver update).",
    )]
|
||||
|
||||
|
||||
def _smart_devices() -> list[str]:
    """Return the device paths listed by ``smartctl --scan``.

    Only output lines that start with ``/dev/`` are considered; the first
    whitespace-separated token of each is the device path. Returns an empty
    list when smartctl can't be run or times out.
    """
    try:
        scan = subprocess.run(["smartctl", "--scan"], capture_output=True, text=True, timeout=10)
    except (subprocess.SubprocessError, OSError):
        return []

    stripped = (raw.strip() for raw in scan.stdout.splitlines())
    return [entry.split()[0] for entry in stripped if entry.startswith("/dev/")]
|
||||
|
||||
|
||||
def check_smart() -> list[Finding]:
    """Query ``smartctl -H`` for every scanned drive and report its health.

    Returns a single INFO finding when smartctl is missing or no drives could
    be enumerated. Otherwise returns one finding per drive whose output was
    recognised: INFO (needs root), OK (PASSED) or CRITICAL (FAILED /
    FAILING_NOW). Drives whose query fails or whose output matches none of
    these produce no finding.
    """
    if shutil.which("smartctl") is None:
        return [Finding(
            INFO, "Storage", "SMART not checked (smartmontools missing)",
            "Disk self-health couldn't be read.",
            "Install it for disk health checks: `sudo apt install smartmontools`",
        )]

    drives = _smart_devices()
    if not drives:
        return [Finding(
            INFO, "Storage", "SMART: couldn't enumerate drives",
            "Reading SMART usually needs root.",
            "Run: `sudo rigdoctor report`",
        )]

    results: list[Finding] = []
    for dev in drives:
        try:
            health = subprocess.run(["smartctl", "-H", dev], capture_output=True, text=True, timeout=15)
        except (subprocess.SubprocessError, OSError):
            continue

        report = health.stdout + health.stderr
        needs_root = "Permission denied" in report or "requires root" in report.lower()
        # Order matters: a permission problem wins over any PASSED/FAILED text.
        if needs_root:
            results.append(Finding(INFO, "Storage", f"SMART for {dev} needs root", "", "Run: `sudo rigdoctor report`"))
        elif "PASSED" in report:
            results.append(Finding(OK, "Storage", f"SMART OK: {dev}", "Overall-health self-assessment passed."))
        elif "FAILED" in report or "FAILING_NOW" in report:
            results.append(Finding(
                CRITICAL, "Storage", f"SMART FAILED: {dev}",
                "The drive reports failing health.", "Back up now and replace the drive.",
            ))
    return results
|
||||
|
||||
|
||||
def check_live_temps() -> list[Finding]:
    """Take one live sample and warn about any temperature reading of 90 °C or more.

    Returns an empty list when no reading is that hot; otherwise one WARNING
    finding whose title carries the highest value and whose detail lists every
    hot reading.
    """
    from .sampler import Sampler
    from .sources import available_sources

    snapshot = Sampler(available_sources()).sample()

    hot = []
    for reading in snapshot.readings:
        is_temperature = reading.unit == "°C"
        if is_temperature and reading.value is not None and reading.value >= 90:
            # Fall back to the metric name when the reading has no label.
            hot.append((reading.source, reading.label or reading.metric, reading.value))

    if not hot:
        return []

    peak = max(value for _source, _label, value in hot)
    listing = "; ".join(f"{s} {label} {v:.0f}°C" for s, label, v in hot)
    return [Finding(
        WARNING, "Thermal", f"High temperature right now ({peak:.0f}°C)",
        listing, "Check cooling/airflow and reduce load.",
    )]
|
||||
|
||||
|
||||
def run_health_checks() -> list[Finding]:
    """Run all checks and return findings sorted by severity (worst first).

    Findings of equal severity keep the order in which the checks produced
    them; an unknown severity sorts after all known ones.
    """
    checks = (
        check_nvidia_driver,
        check_journal,
        check_journal_persistence,
        check_smart,
        check_live_temps,
    )
    collected: list[Finding] = []
    for check in checks:
        collected.extend(check())
    return sorted(collected, key=lambda finding: _ORDER.get(finding.severity, 9))
|
||||
@@ -0,0 +1,58 @@
|
||||
"""Optional-dependency installer (M9): figure out what's missing and install it.
|
||||
|
||||
apt-only (D15). Installs run via pkexec/sudo so a normal user gets a single auth
|
||||
prompt; nothing is installed without an explicit confirmation by the caller.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import os
|
||||
import shlex
|
||||
import shutil
|
||||
import subprocess
|
||||
from collections.abc import Callable
|
||||
|
||||
from . import sysenv
|
||||
from .catalog import COMPONENTS, Component
|
||||
|
||||
|
||||
def component_status(present: Callable[[str], bool] | None = None) -> list[tuple[Component, bool]]:
    """Pair each catalog component with whether it's installed (command present).

    Args:
        present: Optional probe taking a command name and returning whether it
            is available; defaults to ``sysenv.has_command``.
    """
    probe = present if present else sysenv.has_command
    pairs: list[tuple[Component, bool]] = []
    for component in COMPONENTS:
        pairs.append((component, probe(component.command)))
    return pairs
|
||||
|
||||
|
||||
def missing_packages(components: list[Component]) -> list[str]:
    """De-duplicated apt package list for the given components, order preserved.

    A package named by several components appears once, at the position of its
    first occurrence.
    """
    # dict keys are unique and keep insertion order, which gives an
    # order-preserving de-duplication in one pass.
    every_package = (pkg for component in components for pkg in component.apt)
    return list(dict.fromkeys(every_package))
|
||||
|
||||
|
||||
def apt_install_command(packages: list[str]) -> list[str]:
    """Build an `apt-get update && install` command, elevated if we're not root.

    The update+install pair runs through ``/bin/sh -c`` so a single elevation
    covers both steps; every package name is shell-quoted. When not running as
    root the command is prefixed with ``pkexec`` if available, else ``sudo``.
    """
    quoted = " ".join(shlex.quote(name) for name in packages)
    shell_cmd = ["/bin/sh", "-c", "apt-get update && apt-get install -y " + quoted]

    if os.geteuid() == 0:
        return shell_cmd

    for elevator in ("pkexec", "sudo"):
        if shutil.which(elevator):
            return [elevator, *shell_cmd]

    # No privilege escalation available — will likely fail, surfaced to the caller.
    return shell_cmd
|
||||
|
||||
|
||||
def install_packages(packages: list[str]) -> tuple[int, str]:
    """Install the given packages. Returns (exit_code, combined_output).

    An empty list is a no-op reported as success. A command that cannot be
    started or exceeds the 900 s timeout is reported as exit code 1 with the
    exception text as output.
    """
    if not packages:
        return (0, "Nothing to install.")

    command = apt_install_command(packages)
    try:
        completed = subprocess.run(command, capture_output=True, text=True, timeout=900)
    except (subprocess.SubprocessError, OSError) as exc:
        return (1, str(exc))
    return (completed.returncode, completed.stdout + completed.stderr)
|
||||
@@ -0,0 +1,49 @@
|
||||
"""Environment detection for the installer (M9)."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import shutil
|
||||
import subprocess
|
||||
|
||||
|
||||
def package_manager() -> str | None:
    """Only apt is supported (D15); return 'apt' if present, else None.

    apt counts as present when either ``apt-get`` or ``apt`` is on PATH.
    """
    apt_found = any(shutil.which(tool) for tool in ("apt-get", "apt"))
    return "apt" if apt_found else None
|
||||
|
||||
|
||||
def has_command(cmd: str) -> bool:
    """Return True when *cmd* resolves to an executable on PATH."""
    resolved = shutil.which(cmd)
    return resolved is not None
|
||||
|
||||
|
||||
def distro_name() -> str:
    """Return a human-readable distro name from os-release, or "Linux" if unknown.

    Reads ``/etc/os-release`` and, when that file can't be opened, falls back
    to ``/usr/lib/os-release``. ``PRETTY_NAME`` is preferred over ``NAME``.

    Compared with a naive parse this: decodes as UTF-8 regardless of the
    process locale (undecodable bytes are replaced instead of raising), skips
    blank and ``#`` comment lines, ignores lines without ``=``, and strips one
    pair of matching single or double quotes around the value.
    """
    for path in ("/etc/os-release", "/usr/lib/os-release"):
        fields: dict[str, str] = {}
        try:
            with open(path, encoding="utf-8", errors="replace") as f:
                for raw in f:
                    line = raw.strip()
                    if not line or line.startswith("#"):
                        continue
                    key, sep, value = line.partition("=")
                    if not sep:
                        continue
                    value = value.strip()
                    if len(value) >= 2 and value[0] == value[-1] and value[0] in "\"'":
                        value = value[1:-1]
                    fields[key.strip()] = value
        except OSError:
            # Missing/unreadable: try the next candidate location.
            continue
        return fields.get("PRETTY_NAME") or fields.get("NAME") or "Linux"
    return "Linux"
|
||||
|
||||
|
||||
def gpu_vendors() -> list[str]:
    """Return the GPU vendors detected on this machine.

    NVIDIA is reported when ``nvidia-smi`` is on PATH. In addition, ``lspci``
    output (when available) is inspected line by line, and only lines that
    describe a display-class device are matched against vendor keywords.

    Matching per display device matters: matching vendor keywords against the
    whole lspci output reports "AMD" or "Intel" on any machine whose chipset,
    CPU host bridge or network card comes from that vendor, even when the only
    GPU is from someone else.

    Returns:
        A list containing any of "NVIDIA", "AMD", "Intel" (in that order, no
        duplicates); empty when nothing was detected.
    """
    found: set[str] = set()
    if shutil.which("nvidia-smi"):
        found.add("NVIDIA")

    out = ""
    if shutil.which("lspci"):
        try:
            out = subprocess.run(["lspci"], capture_output=True, text=True, timeout=10).stdout
        except (subprocess.SubprocessError, OSError):
            out = ""

    # Device-class names lspci prints for graphics adapters.
    display_classes = ("vga compatible controller", "3d controller", "display controller")
    keywords = (
        ("NVIDIA", ("nvidia",)),
        ("AMD", ("amd/ati", "advanced micro devices", "radeon")),
        ("Intel", ("intel",)),
    )
    for line in out.lower().splitlines():
        if not any(cls in line for cls in display_classes):
            continue
        for vendor, needles in keywords:
            if any(needle in line for needle in needles):
                found.add(vendor)

    # Fixed output order, independent of the order devices appear in lspci.
    return [vendor for vendor in ("NVIDIA", "AMD", "Intel") if vendor in found]
|
||||
@@ -0,0 +1,41 @@
|
||||
"""Update check (M13, check half): ask the Gitea releases API for the latest version.
|
||||
|
||||
Stdlib-only (urllib). Self-update isn't built yet; this only *detects* a newer
|
||||
release. Any failure (network, or the instance requiring sign-in for the API)
|
||||
returns None so callers can degrade gracefully.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import urllib.request
|
||||
|
||||
from .. import __version__
|
||||
|
||||
# Gitea instance and repository the update check talks to.
GITEA_BASE = "https://git.jesseyvanofferen.com"
REPO = "jessey/rigdoctor"

# API endpoint queried for the latest release.
LATEST_API = GITEA_BASE + "/api/v1/repos/" + REPO + "/releases/latest"
# Human-facing releases page (opened in the browser by the GUI).
RELEASES_PAGE = GITEA_BASE + "/" + REPO + "/releases"
|
||||
|
||||
|
||||
def _parse(version: str) -> tuple[int, ...]:
    """Turn a version string such as ``"v0.0.5"`` into a tuple of ints.

    A leading ``v``/``V`` is ignored. Each dot-separated component contributes
    its leading run of ASCII digits; parsing stops at the first component that
    has no leading digits or that carries a suffix.

    Dropping non-numeric components entirely (the naive approach) mis-parses
    suffixed versions: ``"0.0.6rc1"`` would become ``(0, 0)`` and compare lower
    than ``"0.0.5"``, and ``"1.beta.2"`` would become ``(1, 2)`` with the last
    component shifted into the wrong position.

    Raises:
        AttributeError: if *version* is not a string.
    """
    numbers: list[int] = []
    for piece in version.strip().lstrip("vV").split("."):
        end = 0
        while end < len(piece) and piece[end] in "0123456789":
            end += 1
        if end == 0:
            break  # not numeric at all: the release number ends here
        numbers.append(int(piece[:end]))
        if end < len(piece):
            break  # numeric with a suffix ("6rc1"): keep the number, stop after it
    return tuple(numbers)
|
||||
|
||||
|
||||
def is_newer(latest: str, current: str = __version__) -> bool:
    """Return True when *latest* is a strictly newer version than *current*.

    Both versions are parsed with ``_parse`` and compared numerically. The
    shorter tuple is padded with zeros first, so ``"1.0"`` and ``"1.0.0"`` are
    treated as the same version (a plain tuple comparison would call the longer
    one newer). Unparseable input yields False.
    """
    try:
        new, cur = _parse(latest), _parse(current)
    except (ValueError, AttributeError):
        return False
    width = max(len(new), len(cur))
    padded_new = new + (0,) * (width - len(new))
    padded_cur = cur + (0,) * (width - len(cur))
    return padded_new > padded_cur
|
||||
|
||||
|
||||
def check_latest(timeout: float = 4.0) -> str | None:
    """Return the latest release tag (e.g. 'v0.0.5'), or None if it can't be determined.

    Args:
        timeout: Seconds to wait for the releases API before giving up.

    Any failure — network error, non-JSON response, unexpected payload shape —
    results in None so callers can degrade gracefully.
    """
    try:
        request = urllib.request.Request(LATEST_API, headers={"Accept": "application/json"})
        with urllib.request.urlopen(request, timeout=timeout) as response:  # noqa: S310 (https only)
            payload = json.load(response)
        tag = payload.get("tag_name")
    except Exception:
        # Deliberately broad: the update check is best-effort and must never raise.
        return None
    # A missing or empty tag is reported the same way as a failed request.
    return tag if tag else None
|
||||
@@ -0,0 +1,125 @@
|
||||
"""Health page (M4 in the GUI): runs the health checks and shows findings as cards."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import threading
|
||||
import time
|
||||
|
||||
from PySide6.QtCore import Qt, QTimer, Signal
|
||||
from PySide6.QtWidgets import (
|
||||
QFrame,
|
||||
QHBoxLayout,
|
||||
QLabel,
|
||||
QPushButton,
|
||||
QScrollArea,
|
||||
QVBoxLayout,
|
||||
QWidget,
|
||||
)
|
||||
|
||||
from .theme import ACCENT, CRIT, GOOD, MUTED, WARN
|
||||
|
||||
# Severity -> (badge text, theme colour) used for the headline of each finding card.
_SEV = dict(
    critical=("CRITICAL", CRIT),
    warning=("WARNING", WARN),
    info=("INFO", MUTED),
    ok=("OK", GOOD),
)
|
||||
|
||||
|
||||
def _finding_widget(finding) -> QFrame:
    """Build the card widget for one finding.

    The card always has a coloured headline (severity badge, category, title);
    a muted detail line and an accent-coloured suggestion line are added only
    when the finding carries that text. Unknown severities get a "?" badge.
    """
    badge, colour = _SEV.get(finding.severity, ("?", MUTED))

    card = QFrame()
    card.setObjectName("Card")
    column = QVBoxLayout(card)
    column.setContentsMargins(16, 12, 16, 12)
    column.setSpacing(4)

    rows = []

    headline = QLabel(f"{badge} · {finding.category}: {finding.title}")
    headline.setStyleSheet(f"color: {colour}; font-weight: 700; background: transparent;")
    rows.append(headline)

    if finding.detail:
        detail_label = QLabel(finding.detail)
        detail_label.setObjectName("Muted")
        rows.append(detail_label)

    if finding.suggestion:
        hint = QLabel(f"→ {finding.suggestion}")
        hint.setStyleSheet(f"color: {ACCENT}; background: transparent;")
        rows.append(hint)

    # Every row wraps, and rows are stacked in the order they were built.
    for row in rows:
        row.setWordWrap(True)
        column.addWidget(row)
    return card
|
||||
|
||||
|
||||
class HealthPage(QWidget):
    """Health tab: runs the health checks off the GUI thread and lists the findings.

    A background thread runs ``run_health_checks`` and hands the result back
    through the ``_result`` signal, which is connected to ``_render_findings``.
    """

    _result = Signal(object)  # list[Finding]

    def __init__(self) -> None:
        super().__init__()
        self.setObjectName("Page")
        self._result.connect(self._render_findings)

        root = QVBoxLayout(self)
        root.setContentsMargins(20, 18, 20, 18)
        root.setSpacing(16)

        # Header row: page title, status text, and the run button.
        header = QHBoxLayout()
        title = QLabel("Health")
        title.setObjectName("PageTitle")
        header.addWidget(title)
        header.addStretch(1)
        self._status = QLabel("")
        self._status.setObjectName("Muted")
        header.addWidget(self._status)
        self._run_btn = QPushButton("Run health report")
        self._run_btn.setObjectName("PrimaryButton")
        self._run_btn.clicked.connect(self._run)
        header.addWidget(self._run_btn)
        root.addLayout(header)

        # Scrollable, top-aligned list that holds one card per finding.
        scroll = QScrollArea()
        scroll.setWidgetResizable(True)
        scroll.setFrameShape(QFrame.Shape.NoFrame)
        scroll.setStyleSheet("background: transparent;")
        self._container = QWidget()
        self._list = QVBoxLayout(self._container)
        self._list.setContentsMargins(0, 0, 0, 0)
        self._list.setSpacing(10)
        self._list.setAlignment(Qt.AlignmentFlag.AlignTop)
        scroll.setWidget(self._container)
        root.addWidget(scroll, 1)

        QTimer.singleShot(300, self._run)  # auto-run shortly after the window opens

    def _run(self) -> None:
        """Start a scan: disable the button, show progress text, spawn the worker thread."""
        self._run_btn.setEnabled(False)
        self._status.setText("Scanning logs, SMART, and driver…")
        threading.Thread(target=self._work, daemon=True).start()

    def _work(self) -> None:
        """Worker-thread body: run the checks and emit the findings.

        A failure inside the checks is turned into a visible finding instead of
        an empty list, so a crashed scan can't be mistaken for a clean
        "0 checks" report.
        """
        from ..core.health import Finding, run_health_checks

        try:
            findings = run_health_checks()
        except Exception as exc:  # thread boundary: report the failure, never raise
            findings = [Finding(
                "warning", "Report", "Health report failed to run",
                f"{type(exc).__name__}: {exc}",
                "Run `rigdoctor report` in a terminal to see the full error.",
            )]
        self._result.emit(findings)

    def _render_findings(self, findings) -> None:
        """Replace the listed cards with *findings* and update the status line."""
        # Drop everything from the previous run (cards and the trailing stretch).
        while self._list.count():
            item = self._list.takeAt(0)
            w = item.widget()
            if w is not None:
                w.deleteLater()

        crit = sum(1 for f in findings if f.severity == "critical")
        warn = sum(1 for f in findings if f.severity == "warning")
        self._status.setText(
            f"{crit} critical · {warn} warning · {len(findings)} checks · "
            f"{time.strftime('%H:%M:%S')}"
        )
        for finding in findings:
            self._list.addWidget(_finding_widget(finding))
        self._list.addStretch(1)
        self._run_btn.setEnabled(True)
|
||||
@@ -2,7 +2,10 @@
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from PySide6.QtCore import Qt
|
||||
import threading
|
||||
|
||||
from PySide6.QtCore import Qt, QUrl, Signal
|
||||
from PySide6.QtGui import QDesktopServices
|
||||
from PySide6.QtWidgets import (
|
||||
QButtonGroup,
|
||||
QFrame,
|
||||
@@ -16,19 +19,23 @@ from PySide6.QtWidgets import (
|
||||
)
|
||||
|
||||
from .. import __version__
|
||||
from ..core import updates
|
||||
from .dashboard import Dashboard
|
||||
from .health_page import HealthPage
|
||||
from .recorder_page import RecorderPage
|
||||
from .theme import ACCENT, MUTED
|
||||
from .setup_page import SetupPage
|
||||
from .theme import ACCENT, GOOD, MUTED
|
||||
from .worker import SamplerWorker
|
||||
|
||||
# Sidebar entries, in stack order. The earlier four-item assignment (without
# "Setup") was dead — immediately overwritten — and is removed.
_NAV_ITEMS = ["Dashboard", "Logs", "Health", "Setup", "Inventory"]

# Description text for pages that are still placeholders.
_PLACEHOLDERS = {
    "Health": "The health report (M4) — log scan + plain-language findings — lands here.",
    "Inventory": "System inventory (M5) — CPU/GPU/board/RAM/drivers — lands here.",
}
|
||||
|
||||
|
||||
class MainWindow(QMainWindow):
|
||||
_update_checked = Signal(object) # latest tag (str) or None
|
||||
|
||||
def __init__(self, interval: float = 1.0) -> None:
|
||||
super().__init__()
|
||||
self.setWindowTitle("RigDoctor")
|
||||
@@ -48,10 +55,13 @@ class MainWindow(QMainWindow):
|
||||
self._stack = QStackedWidget()
|
||||
self.dashboard = Dashboard()
|
||||
self.recorder_page = RecorderPage()
|
||||
self.health_page = HealthPage()
|
||||
self.setup_page = SetupPage()
|
||||
self._stack.addWidget(self.dashboard) # 0 Dashboard
|
||||
self._stack.addWidget(self.recorder_page) # 1 Logs
|
||||
self._stack.addWidget(self._placeholder_page("Health", _PLACEHOLDERS["Health"])) # 2
|
||||
self._stack.addWidget(self._placeholder_page("Inventory", _PLACEHOLDERS["Inventory"])) # 3
|
||||
self._stack.addWidget(self.health_page) # 2 Health
|
||||
self._stack.addWidget(self.setup_page) # 3 Setup
|
||||
self._stack.addWidget(self._placeholder_page("Inventory", _PLACEHOLDERS["Inventory"])) # 4
|
||||
content_layout.addWidget(self._stack)
|
||||
|
||||
layout.addWidget(self._build_sidebar())
|
||||
@@ -61,6 +71,10 @@ class MainWindow(QMainWindow):
|
||||
self._worker.sampled.connect(self.dashboard.update_sample)
|
||||
self._worker.start()
|
||||
|
||||
# Background update check (M13); result lands in the sidebar.
|
||||
self._update_checked.connect(self._show_update_state)
|
||||
threading.Thread(target=self._check_updates, daemon=True).start()
|
||||
|
||||
def _build_sidebar(self) -> QFrame:
|
||||
bar = QFrame()
|
||||
bar.setObjectName("Sidebar")
|
||||
@@ -95,8 +109,33 @@ class MainWindow(QMainWindow):
|
||||
version = QLabel(f"v{__version__}")
|
||||
version.setObjectName("Muted")
|
||||
v.addWidget(version)
|
||||
|
||||
# Update state (filled in by the background check).
|
||||
self._update_label = QLabel("checking for updates…")
|
||||
self._update_label.setObjectName("Muted")
|
||||
v.addWidget(self._update_label)
|
||||
self._update_btn = QPushButton()
|
||||
self._update_btn.setObjectName("PrimaryButton")
|
||||
self._update_btn.setCursor(Qt.CursorShape.PointingHandCursor)
|
||||
self._update_btn.clicked.connect(lambda: QDesktopServices.openUrl(QUrl(updates.RELEASES_PAGE)))
|
||||
self._update_btn.setVisible(False)
|
||||
v.addWidget(self._update_btn)
|
||||
return bar
|
||||
|
||||
def _check_updates(self) -> None:
|
||||
self._update_checked.emit(updates.check_latest())
|
||||
|
||||
def _show_update_state(self, latest) -> None:
|
||||
if not latest:
|
||||
self._update_label.setText("update check unavailable")
|
||||
return
|
||||
if updates.is_newer(latest, __version__):
|
||||
self._update_label.setText(f'<span style="color:{GOOD};">{latest} available</span>')
|
||||
self._update_btn.setText(f"Update to {latest}")
|
||||
self._update_btn.setVisible(True)
|
||||
else:
|
||||
self._update_label.setText("up-to-date")
|
||||
|
||||
def _placeholder_page(self, title: str, description: str) -> QWidget:
|
||||
page = QWidget()
|
||||
page.setObjectName("Page")
|
||||
|
||||
@@ -0,0 +1,128 @@
|
||||
"""Setup page (M9 in the GUI): show environment + optional components, install missing."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import threading
|
||||
|
||||
from PySide6.QtCore import Qt, Signal
|
||||
from PySide6.QtWidgets import (
|
||||
QFrame,
|
||||
QHBoxLayout,
|
||||
QLabel,
|
||||
QPushButton,
|
||||
QSizePolicy,
|
||||
QTextEdit,
|
||||
QVBoxLayout,
|
||||
QWidget,
|
||||
)
|
||||
|
||||
from ..core import installer, sysenv
|
||||
from .theme import GOOD, MUTED
|
||||
|
||||
|
||||
def _panel(title: str) -> tuple[QFrame, QVBoxLayout]:
    """Create a titled card and return it together with its content layout.

    The card expands horizontally but takes no more vertical space than its
    contents need. Callers add their widgets to the returned layout, below the
    bold title label.
    """
    card = QFrame()
    card.setObjectName("Card")
    card.setSizePolicy(QSizePolicy.Policy.Expanding, QSizePolicy.Policy.Maximum)

    body = QVBoxLayout(card)
    body.setContentsMargins(16, 14, 16, 14)
    body.setSpacing(8)

    heading = QLabel(title)
    heading.setStyleSheet("font-weight: 700; background: transparent;")
    body.addWidget(heading)

    return card, body
|
||||
|
||||
|
||||
class SetupPage(QWidget):
    """Setup tab: show the detected environment and optional components, install missing ones.

    Package installation runs on a background thread. Its result is delivered
    through the ``_installed`` signal and handled in ``_on_installed``.
    """

    # Payload: (installer return code, captured installer output).
    _installed = Signal(int, str)

    def __init__(self) -> None:
        """Build the page widgets and run an initial component check."""
        super().__init__()
        self.setObjectName("Page")
        # Components the last _refresh() reported as not present.
        self._missing: list = []
        # True while a background install is running; stops _refresh() from
        # re-enabling the install button and blocks a second concurrent run.
        self._installing = False
        self._installed.connect(self._on_installed)

        root = QVBoxLayout(self)
        root.setContentsMargins(20, 18, 20, 18)
        root.setSpacing(16)

        title = QLabel("Setup")
        title.setObjectName("PageTitle")
        root.addWidget(title)

        env_card, env_layout = _panel("Environment")
        self._env = QLabel("")
        self._env.setObjectName("Muted")
        env_layout.addWidget(self._env)
        root.addWidget(env_card)

        comp_card, comp_layout = _panel("Optional components")
        self._components = QVBoxLayout()
        self._components.setSpacing(6)
        comp_layout.addLayout(self._components)
        controls = QHBoxLayout()
        self._install_btn = QPushButton("Install missing")
        self._install_btn.setObjectName("PrimaryButton")
        self._install_btn.clicked.connect(self._install)
        self._refresh_btn = QPushButton("Re-check")
        self._refresh_btn.clicked.connect(self._refresh)
        controls.addWidget(self._install_btn)
        controls.addWidget(self._refresh_btn)
        controls.addStretch(1)
        comp_layout.addLayout(controls)
        root.addWidget(comp_card)

        # Installer output pane; hidden until the first install is started.
        self._output = QTextEdit()
        self._output.setObjectName("Report")
        self._output.setReadOnly(True)
        self._output.setMinimumHeight(180)
        self._output.setVisible(False)
        root.addWidget(self._output)
        root.addStretch(1)

        self._refresh()

    def _refresh(self) -> None:
        """Re-detect the environment and rebuild the component rows and button state."""
        pm = sysenv.package_manager()
        self._env.setText(
            f"Distro: {sysenv.distro_name()} "
            f"Package manager: {pm or 'none (apt required)'} "
            f"GPU: {', '.join(sysenv.gpu_vendors()) or 'unknown'}"
        )

        # Drop the rows from the previous check before adding fresh ones.
        while self._components.count():
            item = self._components.takeAt(0)
            w = item.widget()
            if w is not None:
                w.deleteLater()

        status = installer.component_status()
        for component, present in status:
            mark = "✓" if present else "✗"
            color = GOOD if present else MUTED
            row = QLabel(f"<span style='color:{color}'>[{mark}]</span> "
                         f"<b>{component.name}</b> — {component.enables}")
            row.setTextFormat(Qt.TextFormat.RichText)
            row.setWordWrap(True)
            self._components.addWidget(row)

        self._missing = [c for c, present in status if not present]
        if self._installing:
            # An install is in flight: leave the disabled "Installing…" button
            # alone so "Re-check" cannot trigger a second concurrent install.
            return
        # Only apt installs are wired up, so the button stays disabled otherwise.
        self._install_btn.setEnabled(bool(self._missing) and pm == "apt")
        # Set the label on both paths so "All installed ✔" cannot go stale
        # once a component is found missing again.
        self._install_btn.setText("Install missing" if self._missing else "All installed ✔")

    def _install(self) -> None:
        """Start a background install of the packages for the missing components."""
        if self._installing:
            return
        packages = installer.missing_packages(self._missing)
        if not packages:
            return
        self._installing = True
        self._install_btn.setEnabled(False)
        self._install_btn.setText("Installing… (may prompt for password)")
        self._output.setVisible(True)
        self._output.setPlainText(f"Installing: {' '.join(packages)}\n")
        threading.Thread(target=self._work, args=(packages,), daemon=True).start()

    def _work(self, packages: list[str]) -> None:
        """Worker-thread body: run the installer and always report back via ``_installed``."""
        try:
            rc, out = installer.install_packages(packages)
        except Exception as exc:  # thread boundary: never leave the UI stuck on "Installing…"
            rc, out = 1, f"Installer raised {type(exc).__name__}: {exc}"
        self._installed.emit(rc, out)

    def _on_installed(self, rc: int, out: str) -> None:
        """Show the tail of the installer output, flag failures, and re-check components."""
        self._installing = False
        text = out[-4000:]  # keep only the tail of the installer output
        if rc != 0:
            # Make failure explicit even when the installer printed nothing.
            text = f"{text.rstrip()}\n\nInstall failed (exit code {rc}).".lstrip()
        self._output.setPlainText(text)
        # _refresh() restores the button label and enabled state from the new status.
        self._refresh()
|
||||
@@ -99,6 +99,25 @@ def _aggregate_peaks(maxima: dict) -> list[tuple[str, str, float, str, float, st
|
||||
return rows
|
||||
|
||||
|
||||
# Display tag for each known finding severity; anything else renders as "?".
_SEV_LABEL = {
    "critical": "CRITICAL",
    "warning": "WARNING",
    "info": "INFO",
    "ok": "OK",
}


def render_health(findings: list) -> str:
    """Format health findings as a plain-text report.

    Args:
        findings: Objects exposing ``severity``, ``category``, ``title``,
            ``detail`` and ``suggestion`` attributes.

    Returns:
        A fixed one-line message when ``findings`` is empty. Otherwise a
        header with critical/warning/total counts followed by one entry per
        finding, with trailing whitespace stripped.
    """
    if not findings:
        return "Health report: no findings."

    severities = [item.severity for item in findings]
    n_critical = severities.count("critical")
    n_warning = severities.count("warning")

    out = [
        "Health report",
        "",
        f" {n_critical} critical · {n_warning} warning · {len(findings)} checks",
        "",
    ]
    for item in findings:
        tag = _SEV_LABEL.get(item.severity, "?")
        entry = [f"[{tag}] {item.category}: {item.title}"]
        # Detail and suggestion lines are optional; empty values are skipped.
        if item.detail:
            entry.append(f" {item.detail}")
        if item.suggestion:
            entry.append(f" → {item.suggestion}")
        entry.append("")  # blank separator after each finding
        out.extend(entry)

    return "\n".join(out).rstrip()
|
||||
|
||||
|
||||
def render_summary(summary: Summary, log_path=None) -> str:
|
||||
if summary.samples == 0 and not summary.events:
|
||||
where = f" ({log_path})" if log_path else ""
|
||||
|
||||
Reference in New Issue
Block a user