Compare commits
17 Commits
0f9cb4b684
..
main
| Author | SHA1 | Date | |
|---|---|---|---|
| 5996fbdc30 | |||
|
8f4824f576
|
|||
|
edc2166011
|
|||
|
31ecf67ca7
|
|||
| ac4863b0d4 | |||
| b65f36bb2d | |||
| 410f8882ee | |||
| 1da7816741 | |||
| 31178bace8 | |||
| fb468e83c2 | |||
| b20e8dfc3a | |||
| 9fe9a6576f | |||
| d405bf7caf | |||
| 4bbc0fa97e | |||
|
a0f8055328
|
|||
| 323451428b | |||
|
81c7757546
|
@@ -5,6 +5,31 @@ All notable changes to RigDoctor are recorded here. Format follows
|
||||
(`MAJOR.MINOR.PATCH`, pre-1.0). `__version__` and `pyproject.toml` must match the git
|
||||
release tag (so the auto-updater, D18, can compare versions).
|
||||
|
||||
## [0.43.0] - 2026-05-29
|
||||
### Added
|
||||
- **GPU stress test + close thermal monitoring** (`rigdoctor stress`, and a "Stress test…" button
|
||||
on System Health). Runs a GPU load and samples sensors at a high rate (default 0.5 s), then
|
||||
reports per-metric min/avg/**peak**, how long the core spent above each temperature threshold,
|
||||
power vs the limit, throttling (decoded from the NVML clocks-event bitmask), and any GPU **fault**
|
||||
(Xid / VA-space freeze / a query-timeout hang) that hit during the window — the on-demand way to
|
||||
reproduce load-correlated crashes. The load comes from an explicit `--command` (a game or a tool
|
||||
like gpu-burn), an auto-detected loader (gpu-burn/vkmark/glmark2/vkcube), or **monitor-only** when
|
||||
none is found (you launch the game; it tracks temps while you play).
|
||||
- **Drive health & wear in the health report.** A new `core/drives.py` parses the full
|
||||
`smartctl --json` for every drive into prioritized findings: the SMART verdict, a derived
|
||||
**life-left %** (NVMe `percentage_used` or the SATA wear-leveling attribute), **power-on hours**,
|
||||
data written (TBW), temperature, and the early-failure predictors (reallocated / pending /
|
||||
offline-uncorrectable sectors, NVMe media errors, low available spare). Replaces the old
|
||||
pass/fail-only SMART check; flows through the same elevated path (GUI launch / `sudo rigdoctor
|
||||
report`), degrading to per-drive "needs root" notes unprivileged.
|
||||
|
||||
### Fixed
|
||||
- **GUI "Add game…" can now link a launcher.** The dialog only asked for a name, so a custom
|
||||
game (e.g. SPT) couldn't be given its launch command or log folder from the app — those were
|
||||
CLI-only, leaving it unlaunchable from the GUI. It's now a proper form: name + an optional
|
||||
launch command/script (with a **Browse…** file picker) + an optional log folder (auto-detected
|
||||
from the script's folder when left blank).
|
||||
|
||||
## [0.42.0] - 2026-05-29
|
||||
### Added
|
||||
- **Detect hard freezes that log no Xid.** The kernel-log scanner caught Xid codes, OOM, panic,
|
||||
|
||||
@@ -30,6 +30,16 @@ freeze are usually lost. RigDoctor pulls it together and keeps the evidence.
|
||||
or share a live **terminal session** for remote help.
|
||||
- **Self-updating** — `apt upgrade`, or the in-app updater.
|
||||
|
||||
## Screenshots
|
||||
|
||||
| Dashboard | Inventory |
|
||||
|---|---|
|
||||
|  |  |
|
||||
|
||||
**Share** — a read-only or interactive terminal session over the relay, for remote help:
|
||||
|
||||

|
||||
|
||||
## Install
|
||||
|
||||
### Debian / Ubuntu — `.deb`
|
||||
@@ -42,30 +52,20 @@ apt pulls the GUI dependencies (PySide6, pyte) automatically:
|
||||
sudo apt install ./rigdoctor_*_all.deb # CLI only: add --no-install-recommends
|
||||
```
|
||||
|
||||
**Or add the apt repository** for `apt install` + automatic updates. The registry is public and
|
||||
GPG-signed — no token needed; just add the signing key and a deb822 source:
|
||||
**Or add the apt repository** for `apt install` + automatic updates (the registry is public and
|
||||
GPG-signed — no token needed):
|
||||
|
||||
```bash
|
||||
# signing key → dearmored into the keyring
|
||||
sudo install -d -m 0755 /etc/apt/keyrings
|
||||
curl -fsSL https://git.jesseyvanofferen.com/api/packages/jessey/debian/repository.key \
|
||||
| sudo gpg --dearmor -o /etc/apt/keyrings/gitea-jessey.gpg
|
||||
|
||||
# the source (modern deb822 format, GPG-verified, all-arch)
|
||||
sudo tee /etc/apt/sources.list.d/rigdoctor.sources >/dev/null <<'EOF'
|
||||
Types: deb
|
||||
URIs: https://git.jesseyvanofferen.com/api/packages/jessey/debian
|
||||
Suites: stable
|
||||
Components: main
|
||||
Architectures: all
|
||||
Signed-By: /etc/apt/keyrings/gitea-jessey.gpg
|
||||
EOF
|
||||
|
||||
sudo apt update && sudo apt install rigdoctor
|
||||
sudo curl https://git.jesseyvanofferen.com/api/packages/jessey/debian/repository.key -o /etc/apt/keyrings/gitea-jessey.asc
|
||||
echo "deb [arch=all signed-by=/etc/apt/keyrings/gitea-jessey.asc] https://git.jesseyvanofferen.com/api/packages/jessey/debian stable main" | sudo tee /etc/apt/sources.list.d/gitea.list
|
||||
sudo apt update
|
||||
sudo apt install rigdoctor
|
||||
```
|
||||
|
||||
Then `sudo apt upgrade` keeps it current.
|
||||
|
||||
Then `sudo apt upgrade` keeps it current.
|
||||
|
||||
### Any distro — self-extracting `.run` (no root)
|
||||
|
||||
Download **`rigdoctor-<version>-installer.run`** from the releases page and run it. It installs
|
||||
|
||||
Binary file not shown.
|
After Width: | Height: | Size: 171 KiB |
Binary file not shown.
|
After Width: | Height: | Size: 141 KiB |
Binary file not shown.
|
After Width: | Height: | Size: 78 KiB |
+1
-1
@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
|
||||
|
||||
[project]
|
||||
name = "rigdoctor"
|
||||
version = "0.42.0"
|
||||
version = "0.43.0"
|
||||
description = "Modular hardware monitoring & crash diagnostics for Linux gamers."
|
||||
readme = "README.md"
|
||||
requires-python = ">=3.11"
|
||||
|
||||
@@ -1,3 +1,3 @@
|
||||
"""RigDoctor — modular hardware monitoring & crash diagnostics for Linux gamers."""
|
||||
|
||||
__version__ = "0.42.0"
|
||||
__version__ = "0.43.0"
|
||||
|
||||
+51
-2
@@ -298,10 +298,10 @@ def cmd_collect_priv(args) -> int:
|
||||
"""Internal: emit root-only data (SMART + dmidecode) as JSON, run via pkexec at launch."""
|
||||
from dataclasses import asdict
|
||||
|
||||
from .core.health import check_smart
|
||||
from .core import drives
|
||||
from .core.inventory import _dmidecode
|
||||
|
||||
data = {"smart": [asdict(f) for f in check_smart()], "dmidecode": _dmidecode()}
|
||||
data = {"drives": [asdict(d) for d in drives.collect()], "dmidecode": _dmidecode()}
|
||||
print(json.dumps(data))
|
||||
return 0
|
||||
|
||||
@@ -640,6 +640,47 @@ def cmd_games_remove(args) -> int:
|
||||
return 1
|
||||
|
||||
|
||||
def cmd_stress(args) -> int:
|
||||
import shlex as _shlex
|
||||
|
||||
from .core import stress
|
||||
from .render import format_raw, render_stress
|
||||
|
||||
command = _shlex.split(args.command) if args.command else None
|
||||
if not args.json:
|
||||
loaders = stress.available_loaders()
|
||||
if command:
|
||||
print(f"Stressing with: {' '.join(command)}")
|
||||
elif loaders:
|
||||
print(f"Stressing with auto-detected loader: {loaders[0]}")
|
||||
else:
|
||||
print("No GPU load tool found and no --command given — MONITOR-ONLY mode.")
|
||||
print(f" Launch the game/app now; I'll closely track temps for up to {int(args.duration)}s.")
|
||||
print(f" Sampling every {args.interval:g}s. Press Ctrl-C to stop early.\n")
|
||||
|
||||
def _tick(sample, elapsed) -> None:
|
||||
by = {r.key: r for r in sample.readings}
|
||||
bits = [f"{elapsed:5.0f}s"]
|
||||
for key, tag in (("gpu.temp", "core"), ("gpu.power", "pwr"),
|
||||
("gpu.util", "util"), ("gpu.clock.core", "clk")):
|
||||
r = by.get(key)
|
||||
if r is not None and r.value is not None:
|
||||
bits.append(f"{tag} {format_raw(r.value, r.unit)}")
|
||||
print(" " + " ".join(bits) + " ", end="\r", flush=True)
|
||||
|
||||
result = stress.run(duration=args.duration, interval=args.interval, command=command,
|
||||
on_tick=None if args.json else _tick)
|
||||
if not args.json:
|
||||
print() # end the live line
|
||||
|
||||
if args.json:
|
||||
from dataclasses import asdict
|
||||
print(json.dumps(asdict(result), indent=2, ensure_ascii=False))
|
||||
else:
|
||||
print(render_stress(result))
|
||||
return 0 if result.severity in ("ok", "info") else 1
|
||||
|
||||
|
||||
def build_parser() -> argparse.ArgumentParser:
|
||||
p = argparse.ArgumentParser(
|
||||
prog="rigdoctor",
|
||||
@@ -657,6 +698,14 @@ def build_parser() -> argparse.ArgumentParser:
|
||||
mp.add_argument("--plain", action="store_true", help="plain redraw instead of the curses UI")
|
||||
mp.set_defaults(func=cmd_monitor)
|
||||
|
||||
st = sub.add_parser("stress", help="GPU stress + close thermal monitoring (repro load crashes)")
|
||||
st.add_argument("-d", "--duration", type=float, default=120.0, help="run for this many seconds (default 120)")
|
||||
st.add_argument("-n", "--interval", type=float, default=0.5, help="sampling interval in seconds (default 0.5)")
|
||||
st.add_argument("--command", default=None,
|
||||
help="load generator to run (e.g. a game or 'gpu-burn 60'); omit to auto-detect or monitor-only")
|
||||
st.add_argument("--json", action="store_true", help="output JSON")
|
||||
st.set_defaults(func=cmd_stress)
|
||||
|
||||
sub.add_parser("gui", help="launch the desktop GUI (needs PySide6)").set_defaults(func=cmd_gui)
|
||||
sub.add_parser("sources", help="list detected sensor sources").set_defaults(func=cmd_sources)
|
||||
|
||||
|
||||
@@ -0,0 +1,229 @@
|
||||
"""Drive health & wear (M-drives): per-disk SMART stats parsed from smartctl JSON.
|
||||
|
||||
Unlike a GPU, storage exposes a real health/wear story, so this reads it in full: the overall
|
||||
SMART verdict, a derived **life-left %** (NVMe ``percentage_used`` or the SATA wear-leveling
|
||||
attribute), **power-on hours** (the drive's runtime), data written (TBW), temperature, and the
|
||||
early-failure predictors (reallocated / pending / offline-uncorrectable sectors, NVMe media
|
||||
errors, available spare). Turned into prioritized health findings.
|
||||
|
||||
smartctl needs root, so collection runs through the same elevated path as the other root-only
|
||||
checks (``rigdoctor collect-priv`` via pkexec at GUI launch, or ``sudo rigdoctor report``).
|
||||
Parsing is JSON-based (smartctl ``--json``), which is stable across drive types. Stdlib only;
|
||||
degrades gracefully — no smartctl, no root, or an unparseable device yields an info finding.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import shutil
|
||||
import subprocess
|
||||
from dataclasses import dataclass
|
||||
|
||||
from .health import CRITICAL, INFO, OK, WARNING, Finding
|
||||
|
||||
# NVMe writes are counted in 512-KB "data units"; 1 unit = 1000 * 512 bytes.
|
||||
_NVME_UNIT_BYTES = 512_000
|
||||
_LBA_BYTES = 512 # SATA Total_LBAs_Written counts 512-byte sectors
|
||||
|
||||
|
||||
@dataclass
|
||||
class DriveHealth:
|
||||
device: str
|
||||
model: str = ""
|
||||
kind: str = "" # "nvme" | "sata" | "scsi"
|
||||
passed: bool | None = None # SMART overall verdict; None if unknown / needs root
|
||||
needs_root: bool = False
|
||||
health_pct: int | None = None # derived life-left %
|
||||
percent_used: int | None = None # NVMe wear used %
|
||||
power_on_hours: int | None = None
|
||||
temp_c: int | None = None
|
||||
data_written_tb: float | None = None
|
||||
reallocated: int | None = None # SATA reallocated sectors (id 5)
|
||||
pending: int | None = None # SATA current-pending sectors (id 197)
|
||||
offline_uncorrectable: int | None = None # SATA id 198
|
||||
available_spare: int | None = None # NVMe %
|
||||
available_spare_threshold: int | None = None
|
||||
media_errors: int | None = None # NVMe
|
||||
|
||||
|
||||
# --- collection (root) ----------------------------------------------------------------
|
||||
|
||||
def _scan_devices() -> list[str]:
|
||||
try:
|
||||
proc = subprocess.run(["smartctl", "--scan"], capture_output=True, text=True, timeout=10)
|
||||
except (subprocess.SubprocessError, OSError):
|
||||
return []
|
||||
return [ln.split()[0] for ln in proc.stdout.splitlines() if ln.strip().startswith("/dev/")]
|
||||
|
||||
|
||||
def _smartctl_json(device: str) -> dict | None:
|
||||
try:
|
||||
proc = subprocess.run(
|
||||
["smartctl", "--json=c", "-H", "-A", "-i", device],
|
||||
capture_output=True, text=True, timeout=20,
|
||||
)
|
||||
except (subprocess.SubprocessError, OSError):
|
||||
return None
|
||||
try:
|
||||
return json.loads(proc.stdout)
|
||||
except (ValueError, TypeError):
|
||||
return None
|
||||
|
||||
|
||||
def _ata_attr(data: dict, attr_id: int) -> int | None:
|
||||
for row in data.get("ata_smart_attributes", {}).get("table", []):
|
||||
if row.get("id") == attr_id:
|
||||
raw = row.get("raw", {})
|
||||
return raw.get("value")
|
||||
return None
|
||||
|
||||
|
||||
def _ata_norm_value(data: dict, attr_id: int) -> int | None:
|
||||
"""The normalized 'value' (100→0 life indicator) for an ATA attribute."""
|
||||
for row in data.get("ata_smart_attributes", {}).get("table", []):
|
||||
if row.get("id") == attr_id:
|
||||
return row.get("value")
|
||||
return None
|
||||
|
||||
|
||||
def parse(device: str, data: dict | None) -> DriveHealth:
|
||||
"""Build a DriveHealth from smartctl JSON (pure-ish; no IO of its own)."""
|
||||
d = DriveHealth(device=device)
|
||||
if not data:
|
||||
d.needs_root = True
|
||||
return d
|
||||
|
||||
d.model = data.get("model_name") or data.get("scsi_model_name") or ""
|
||||
proto = (data.get("device", {}).get("protocol") or "").lower()
|
||||
d.kind = "nvme" if "nvme" in proto else ("sata" if "ata" in proto else (proto or ""))
|
||||
|
||||
status = data.get("smart_status")
|
||||
if isinstance(status, dict) and "passed" in status:
|
||||
d.passed = bool(status["passed"])
|
||||
else:
|
||||
# No verdict and a non-zero exit usually means we couldn't open the device (needs root).
|
||||
if data.get("smartctl", {}).get("exit_status", 0) and not status:
|
||||
d.needs_root = True
|
||||
|
||||
temp = data.get("temperature", {}).get("current")
|
||||
d.temp_c = int(temp) if isinstance(temp, (int, float)) else None
|
||||
poh = data.get("power_on_time", {}).get("hours")
|
||||
d.power_on_hours = int(poh) if isinstance(poh, (int, float)) else None
|
||||
|
||||
if d.kind == "nvme":
|
||||
log = data.get("nvme_smart_health_information_log", {})
|
||||
d.percent_used = log.get("percentage_used")
|
||||
d.available_spare = log.get("available_spare")
|
||||
d.available_spare_threshold = log.get("available_spare_threshold")
|
||||
d.media_errors = log.get("media_errors")
|
||||
if d.temp_c is None and isinstance(log.get("temperature"), (int, float)):
|
||||
d.temp_c = int(log["temperature"])
|
||||
units = log.get("data_units_written")
|
||||
if isinstance(units, (int, float)):
|
||||
d.data_written_tb = round(units * _NVME_UNIT_BYTES / 1e12, 2)
|
||||
if isinstance(d.percent_used, (int, float)):
|
||||
d.health_pct = max(0, 100 - int(d.percent_used))
|
||||
else: # SATA / ATA
|
||||
d.reallocated = _ata_attr(data, 5)
|
||||
d.pending = _ata_attr(data, 197)
|
||||
d.offline_uncorrectable = _ata_attr(data, 198)
|
||||
lbas = _ata_attr(data, 241) # Total_LBAs_Written
|
||||
if isinstance(lbas, (int, float)) and lbas > 0:
|
||||
d.data_written_tb = round(lbas * _LBA_BYTES / 1e12, 2)
|
||||
wear = _ata_norm_value(data, 177) # Wear_Leveling_Count (Samsung): normalized = life left
|
||||
if wear is None:
|
||||
wear = _ata_norm_value(data, 231) # SSD_Life_Left on some drives
|
||||
if isinstance(wear, int):
|
||||
d.health_pct = wear
|
||||
return d
|
||||
|
||||
|
||||
def collect() -> list[DriveHealth]:
|
||||
"""Per-drive health for every SMART-capable device (needs root for real data)."""
|
||||
if shutil.which("smartctl") is None:
|
||||
return []
|
||||
return [parse(dev, _smartctl_json(dev)) for dev in _scan_devices()]
|
||||
|
||||
|
||||
def from_dicts(rows: list[dict]) -> list[DriveHealth]:
|
||||
"""Rebuild DriveHealth objects from the privileged collector's JSON."""
|
||||
out: list[DriveHealth] = []
|
||||
for r in rows:
|
||||
if isinstance(r, dict) and r.get("device"):
|
||||
fields = {k: r.get(k) for k in DriveHealth.__dataclass_fields__}
|
||||
out.append(DriveHealth(**fields))
|
||||
return out
|
||||
|
||||
|
||||
# --- findings -------------------------------------------------------------------------
|
||||
|
||||
def _stats_line(d: DriveHealth) -> str:
|
||||
parts: list[str] = []
|
||||
if d.health_pct is not None:
|
||||
parts.append(f"{d.health_pct}% life left")
|
||||
elif d.percent_used is not None:
|
||||
parts.append(f"{d.percent_used}% used")
|
||||
if d.power_on_hours is not None:
|
||||
parts.append(f"{d.power_on_hours:,} h powered on")
|
||||
if d.data_written_tb is not None:
|
||||
parts.append(f"{d.data_written_tb:g} TB written")
|
||||
if d.temp_c is not None:
|
||||
parts.append(f"{d.temp_c}°C")
|
||||
if d.available_spare is not None:
|
||||
parts.append(f"spare {d.available_spare}%")
|
||||
return " · ".join(parts)
|
||||
|
||||
|
||||
def to_findings(drives: list[DriveHealth]) -> list[Finding]:
|
||||
if not drives:
|
||||
if shutil.which("smartctl") is None:
|
||||
return [Finding(INFO, "Storage", "SMART not checked (smartmontools missing)",
|
||||
"Disk self-health couldn't be read.",
|
||||
"Install it: `sudo apt install smartmontools`")]
|
||||
return []
|
||||
findings: list[Finding] = []
|
||||
for d in drives:
|
||||
name = d.model or d.device
|
||||
if d.needs_root:
|
||||
findings.append(Finding(INFO, "Storage", f"{name}: SMART needs root",
|
||||
"Reading drive health requires elevated access.",
|
||||
"Run: `sudo rigdoctor report` (or launch the GUI, which asks once)."))
|
||||
continue
|
||||
|
||||
stats = _stats_line(d)
|
||||
# Severity from the failure predictors, worst first.
|
||||
bad = []
|
||||
if d.passed is False:
|
||||
bad.append("SMART overall self-assessment FAILED")
|
||||
for label, val in (("reallocated sectors", d.reallocated),
|
||||
("pending sectors", d.pending),
|
||||
("offline-uncorrectable sectors", d.offline_uncorrectable),
|
||||
("NVMe media errors", d.media_errors)):
|
||||
if isinstance(val, int) and val > 0:
|
||||
bad.append(f"{val} {label}")
|
||||
spare_low = (isinstance(d.available_spare, int) and isinstance(d.available_spare_threshold, int)
|
||||
and d.available_spare < d.available_spare_threshold)
|
||||
worn = isinstance(d.percent_used, int) and d.percent_used >= 90
|
||||
hot = isinstance(d.temp_c, int) and d.temp_c >= 70
|
||||
|
||||
if d.passed is False or bad:
|
||||
findings.append(Finding(
|
||||
CRITICAL, "Storage", f"{name}: failing ({stats})" if stats else f"{name}: failing",
|
||||
"; ".join(bad) + ".",
|
||||
"Back up this drive now and plan to replace it."))
|
||||
elif spare_low or worn:
|
||||
findings.append(Finding(
|
||||
WARNING, "Storage", f"{name}: worn ({stats})",
|
||||
("Available spare below the drive's threshold." if spare_low else
|
||||
f"NVMe wear at {d.percent_used}% used — near end of rated life."),
|
||||
"Back up important data and budget for a replacement."))
|
||||
elif hot:
|
||||
findings.append(Finding(
|
||||
WARNING, "Storage", f"{name}: hot ({stats})",
|
||||
f"Drive temperature is {d.temp_c}°C.",
|
||||
"Improve case/M.2 airflow; sustained heat shortens SSD life."))
|
||||
else:
|
||||
findings.append(Finding(
|
||||
OK, "Storage", f"{name}: healthy" + (f" ({stats})" if stats else ""),
|
||||
"SMART self-assessment passed." if d.passed else ""))
|
||||
return findings
|
||||
@@ -260,47 +260,19 @@ def check_nvidia_module() -> list[Finding]:
|
||||
)]
|
||||
|
||||
|
||||
def _smart_devices() -> list[str]:
|
||||
try:
|
||||
proc = subprocess.run(["smartctl", "--scan"], capture_output=True, text=True, timeout=10)
|
||||
except (subprocess.SubprocessError, OSError):
|
||||
return []
|
||||
devices = []
|
||||
for line in proc.stdout.splitlines():
|
||||
line = line.strip()
|
||||
if line.startswith("/dev/"):
|
||||
devices.append(line.split()[0])
|
||||
return devices
|
||||
def check_drives() -> list[Finding]:
|
||||
"""Per-drive SMART health + wear/runtime stats (see core/drives.py).
|
||||
|
||||
Uses the session's elevated collection when present (GUI launch / pkexec), else reads
|
||||
smartctl directly — which only returns real data as root, so the unprivileged case yields
|
||||
'needs root' info findings pointing at `sudo rigdoctor report`.
|
||||
"""
|
||||
from . import drives, elevation
|
||||
|
||||
def check_smart() -> list[Finding]:
|
||||
if shutil.which("smartctl") is None:
|
||||
return [Finding(
|
||||
INFO, "Storage", "SMART not checked (smartmontools missing)",
|
||||
"Disk self-health couldn't be read.",
|
||||
"Install it for disk health checks: `sudo apt install smartmontools`",
|
||||
)]
|
||||
devices = _smart_devices()
|
||||
if not devices:
|
||||
return [Finding(
|
||||
INFO, "Storage", "SMART: couldn't enumerate drives",
|
||||
"Reading SMART usually needs root.",
|
||||
"Run: `sudo rigdoctor report`",
|
||||
)]
|
||||
findings: list[Finding] = []
|
||||
for dev in devices:
|
||||
try:
|
||||
proc = subprocess.run(["smartctl", "-H", dev], capture_output=True, text=True, timeout=15)
|
||||
except (subprocess.SubprocessError, OSError):
|
||||
continue
|
||||
combined = proc.stdout + proc.stderr
|
||||
if "Permission denied" in combined or "requires root" in combined.lower():
|
||||
findings.append(Finding(INFO, "Storage", f"SMART for {dev} needs root", "", "Run: `sudo rigdoctor report`"))
|
||||
elif "PASSED" in combined:
|
||||
findings.append(Finding(OK, "Storage", f"SMART OK: {dev}", "Overall-health self-assessment passed."))
|
||||
elif "FAILED" in combined or "FAILING_NOW" in combined:
|
||||
findings.append(Finding(CRITICAL, "Storage", f"SMART FAILED: {dev}", "The drive reports failing health.", "Back up now and replace the drive."))
|
||||
return findings
|
||||
priv = elevation.privileged()
|
||||
if priv is not None and priv.get("drives") is not None:
|
||||
return drives.to_findings(drives.from_dicts(priv["drives"]))
|
||||
return drives.to_findings(drives.collect())
|
||||
|
||||
|
||||
def check_live_temps() -> list[Finding]:
|
||||
@@ -398,25 +370,19 @@ def check_memory_speed() -> list[Finding]:
|
||||
def run_health_checks(include_journal: bool = True) -> list[Finding]:
|
||||
"""Run all checks and return findings sorted by severity (worst first).
|
||||
|
||||
SMART needs root; if the session collected it via launch elevation, use that
|
||||
instead of re-running smartctl (which would just report "needs root").
|
||||
Drive SMART and RAM speed need root; if the session collected them via launch elevation,
|
||||
those checks use the cached data instead of re-running (which would just report "needs root").
|
||||
|
||||
`include_journal=False` skips the 7-day kernel-journal scan — used by the crash
|
||||
analysis, which scans the previous (crashed) boot specifically instead.
|
||||
"""
|
||||
from . import elevation
|
||||
|
||||
findings: list[Finding] = []
|
||||
findings += check_nvidia_driver()
|
||||
findings += check_nvidia_module()
|
||||
if include_journal:
|
||||
findings += check_journal()
|
||||
findings += check_journal_persistence()
|
||||
priv = elevation.privileged()
|
||||
if priv is not None and priv.get("smart") is not None:
|
||||
findings += [Finding(**d) for d in priv["smart"]]
|
||||
else:
|
||||
findings += check_smart()
|
||||
findings += check_drives()
|
||||
findings += check_live_temps()
|
||||
findings += check_pcie_links()
|
||||
findings += check_displays()
|
||||
|
||||
@@ -0,0 +1,322 @@
|
||||
"""GPU stress + close thermal monitoring — the repro tool for load-correlated crashes.
|
||||
|
||||
Run a GPU load and sample sensors at a high rate, then report peak/sustained temperatures,
|
||||
how long the GPU spent above each temperature threshold, power headroom vs the limit, whether
|
||||
it throttled, and any GPU fault (Xid / VA-space / a query timeout) that hit during the window.
|
||||
This is the on-demand way to reproduce the "only under load / only certain games" freezes
|
||||
instead of waiting for a game to trigger them.
|
||||
|
||||
The load comes from, in order: an explicit ``command`` (your game, or a loader like gpu-burn),
|
||||
an auto-detected loader on PATH (gpu-burn / vkmark / glmark2 / vkcube), or **monitor-only** when
|
||||
none is found — then you generate the load yourself (launch the game) while this closely tracks
|
||||
temps for the duration.
|
||||
|
||||
Stdlib only. Degrades gracefully: no nvidia-smi → no GPU stats; a loader that won't start →
|
||||
monitor-only with a note; missing journal access → no fault scan, just the telemetry.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import shutil
|
||||
import subprocess
|
||||
import time
|
||||
from collections.abc import Callable
|
||||
from dataclasses import dataclass, field
|
||||
|
||||
from . import health
|
||||
from .sample import Sample
|
||||
from .sampler import Sampler
|
||||
from .sources import available_sources
|
||||
|
||||
# Default temperature dwell thresholds (°C). 83 is Ampere's typical thermal-throttle point;
|
||||
# 90+ is hot; sustained 95+ on the core (or 100+ on GDDR6 memory) is a cooling problem.
|
||||
DEFAULT_THRESHOLDS = (80, 85, 90, 95)
|
||||
|
||||
# Known GPU load generators, best (heaviest / most deterministic) first. argv builder takes the
|
||||
# remaining duration so a self-terminating loader (gpu-burn) bounds itself; the windowed
|
||||
# benchmarks loop until we kill them. None are required — detection is best-effort.
|
||||
_LOADERS: list[tuple[str, Callable[[float], list[str]]]] = [
|
||||
("gpu-burn", lambda secs: ["gpu-burn", str(max(1, int(secs)))]),
|
||||
("vkmark", lambda _s: ["vkmark", "--run-forever"]),
|
||||
("glmark2", lambda _s: ["glmark2", "--run-forever"]),
|
||||
("vkcube", lambda _s: ["vkcube"]),
|
||||
]
|
||||
|
||||
# NVML clocks-event bits that mean the clocks are being *held back* (a throttle), decoded from
|
||||
# the active-reasons bitmask so we don't depend on per-field name differences across drivers.
|
||||
_THROTTLE_BITS = {
|
||||
0x008: "HW slowdown",
|
||||
0x020: "SW thermal slowdown",
|
||||
0x040: "HW thermal slowdown",
|
||||
0x080: "HW power-brake slowdown",
|
||||
}
|
||||
_POWERCAP_BIT = 0x004 # hitting the power limit — expected under load, reported separately
|
||||
|
||||
|
||||
@dataclass
|
||||
class MetricStat:
|
||||
key: str # e.g. "gpu.temp", "gpu.power", "gpu.clock.core"
|
||||
label: str # human label for the report
|
||||
unit: str
|
||||
min: float
|
||||
avg: float
|
||||
max: float
|
||||
samples: int
|
||||
|
||||
|
||||
@dataclass
|
||||
class _Tick:
|
||||
dt: float # seconds this tick represents (for dwell-time weighting)
|
||||
values: dict[str, float] # reading key -> value across all sources (Nones dropped)
|
||||
throttle: list[str] # active throttle reasons this tick
|
||||
power_capped: bool
|
||||
lost: bool # query timeout / no GPU response this tick
|
||||
|
||||
|
||||
@dataclass
|
||||
class StressResult:
|
||||
load: str # "command: …" | "auto: gpu-burn" | "monitor-only"
|
||||
duration: float # seconds actually monitored
|
||||
samples: int
|
||||
interval: float
|
||||
stats: list[MetricStat] = field(default_factory=list)
|
||||
peak_temp: float | None = None
|
||||
peak_mem_temp: float | None = None
|
||||
avg_temp: float | None = None
|
||||
time_above: dict[int, float] = field(default_factory=dict) # threshold °C -> seconds at/above
|
||||
max_power: float | None = None
|
||||
power_limit: float | None = None
|
||||
power_capped: bool = False
|
||||
throttled: bool = False
|
||||
throttle_reasons: list[str] = field(default_factory=list)
|
||||
gpu_lost: bool = False
|
||||
faults: list[str] = field(default_factory=list) # Xid/VA-space titles in the window
|
||||
aborted: bool = False # Ctrl-C or the load exited early
|
||||
severity: str = health.OK
|
||||
verdict: str = ""
|
||||
|
||||
|
||||
# --- load resolution ------------------------------------------------------------------
|
||||
|
||||
def available_loaders() -> list[str]:
|
||||
"""Known GPU load tools found on PATH (heaviest first)."""
|
||||
return [name for name, _ in _LOADERS if shutil.which(name)]
|
||||
|
||||
|
||||
def _start_load(command: list[str] | None, duration: float) -> tuple[subprocess.Popen | None, str]:
|
||||
"""Start the load process and return (proc, description). proc is None for monitor-only."""
|
||||
if command:
|
||||
try:
|
||||
proc = subprocess.Popen(command, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
|
||||
return proc, "command: " + " ".join(command)
|
||||
except (OSError, ValueError) as exc:
|
||||
return None, f"monitor-only (command failed to start: {exc})"
|
||||
for name, build in _LOADERS:
|
||||
if shutil.which(name):
|
||||
try:
|
||||
proc = subprocess.Popen(build(duration), stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
|
||||
return proc, f"auto: {name}"
|
||||
except (OSError, ValueError):
|
||||
continue
|
||||
return None, "monitor-only"
|
||||
|
||||
|
||||
def _stop_load(proc: subprocess.Popen | None) -> None:
|
||||
if proc is None or proc.poll() is not None:
|
||||
return
|
||||
proc.terminate()
|
||||
try:
|
||||
proc.wait(timeout=5)
|
||||
except subprocess.TimeoutExpired:
|
||||
proc.kill()
|
||||
|
||||
|
||||
# --- throttle / fault probes ----------------------------------------------------------
|
||||
|
||||
def _throttle_state() -> tuple[list[str], bool]:
|
||||
"""(active throttle reasons, power-capped) decoded from the clocks-event bitmask."""
|
||||
if shutil.which("nvidia-smi") is None:
|
||||
return [], False
|
||||
raw = ""
|
||||
for field_name in ("clocks_event_reasons.active", "clocks_throttle_reasons.active"):
|
||||
try:
|
||||
proc = subprocess.run(
|
||||
["nvidia-smi", f"--query-gpu={field_name}", "--format=csv,noheader"],
|
||||
capture_output=True, text=True, timeout=5,
|
||||
)
|
||||
except (subprocess.SubprocessError, OSError):
|
||||
continue
|
||||
raw = proc.stdout.strip().splitlines()[0].strip() if proc.stdout.strip() else ""
|
||||
if raw and raw.lower() not in ("n/a", "not supported", "[n/a]"):
|
||||
break
|
||||
try:
|
||||
bits = int(raw, 16)
|
||||
except ValueError:
|
||||
return [], False
|
||||
reasons = [name for bit, name in _THROTTLE_BITS.items() if bits & bit]
|
||||
return reasons, bool(bits & _POWERCAP_BIT)
|
||||
|
||||
|
||||
def _faults_since(start_ts: float) -> list[str]:
|
||||
"""Titles of GPU/PCIe/hardware faults logged to the kernel journal since the run began."""
|
||||
out = health._journalctl(["-k", "--no-pager", "-o", "cat", "--since", f"@{int(start_ts)}"])
|
||||
if not out:
|
||||
return []
|
||||
return [f.title for f in health.scan_journal_text(out)
|
||||
if f.category in ("GPU", "PCIe", "Hardware", "Kernel")]
|
||||
|
||||
|
||||
def _tick_values(sample: Sample) -> tuple[dict[str, float], bool]:
|
||||
"""Reading key -> value across all sources (Nones dropped), plus whether the GPU
|
||||
failed to respond (an nvidia-smi query timeout — a hang/lost signal)."""
|
||||
values: dict[str, float] = {}
|
||||
lost = False
|
||||
for r in sample.readings:
|
||||
if r.source == "gpu" and r.metric == "status" and r.label == "query-timeout":
|
||||
lost = True
|
||||
if r.value is not None:
|
||||
values[r.key] = r.value
|
||||
return values, lost
|
||||
|
||||
|
||||
# --- pure analysis (unit-testable, no IO) ---------------------------------------------
|
||||
|
||||
_REPORT_KEYS = {
|
||||
"gpu.temp": ("GPU core temp", "°C"),
|
||||
"gpu.temp.memory": ("GPU memory temp", "°C"),
|
||||
"gpu.power": ("GPU power", "W"),
|
||||
"gpu.util": ("GPU utilization", "%"),
|
||||
"gpu.mem_util": ("VRAM controller util", "%"),
|
||||
"gpu.clock.core": ("Core clock", "MHz"),
|
||||
"gpu.clock.memory": ("Memory clock", "MHz"),
|
||||
"gpu.fan": ("Fan", "%"),
|
||||
"gpu.mem_used": ("VRAM used", "MiB"),
|
||||
"cpu.temp": ("CPU temp", "°C"),
|
||||
}
|
||||
|
||||
|
||||
def summarize(ticks: list[_Tick], *, load: str, interval: float, faults: list[str],
|
||||
thresholds=DEFAULT_THRESHOLDS) -> StressResult:
|
||||
"""Build a StressResult from collected ticks — pure, so it's tested with synthetic input."""
|
||||
duration = sum(t.dt for t in ticks)
|
||||
result = StressResult(load=load, duration=round(duration, 1), samples=len(ticks),
|
||||
interval=interval, faults=faults)
|
||||
|
||||
series: dict[str, list[float]] = {}
|
||||
throttle_seen: set[str] = set()
|
||||
time_above = {th: 0.0 for th in thresholds}
|
||||
for t in ticks:
|
||||
for key, value in t.values.items():
|
||||
series.setdefault(key, []).append(value)
|
||||
throttle_seen.update(t.throttle)
|
||||
if t.power_capped:
|
||||
result.power_capped = True
|
||||
if t.lost:
|
||||
result.gpu_lost = True
|
||||
core = t.values.get("gpu.temp")
|
||||
if core is not None:
|
||||
for th in thresholds:
|
||||
if core >= th:
|
||||
time_above[th] += t.dt
|
||||
|
||||
for key, (label, unit) in _REPORT_KEYS.items():
|
||||
vals = series.get(key)
|
||||
if not vals:
|
||||
continue
|
||||
stat = MetricStat(key, label, unit, round(min(vals), 1),
|
||||
round(sum(vals) / len(vals), 1), round(max(vals), 1), len(vals))
|
||||
result.stats.append(stat)
|
||||
if key == "gpu.temp":
|
||||
result.peak_temp, result.avg_temp = stat.max, stat.avg
|
||||
elif key == "gpu.temp.memory":
|
||||
result.peak_mem_temp = stat.max
|
||||
elif key == "gpu.power":
|
||||
result.max_power = stat.max
|
||||
|
||||
# power_limit isn't a reported metric (it's ~constant); pull it from the raw series.
|
||||
if "gpu.power_limit" in series:
|
||||
result.power_limit = max(series["gpu.power_limit"])
|
||||
|
||||
result.throttle_reasons = sorted(throttle_seen)
|
||||
result.throttled = bool(throttle_seen)
|
||||
result.time_above = {th: round(secs, 1) for th, secs in time_above.items() if secs > 0}
|
||||
|
||||
_verdict(result)
|
||||
return result
|
||||
|
||||
|
||||
def _verdict(r: StressResult) -> None:
|
||||
"""Set severity + a plain-language conclusion from the gathered signals."""
|
||||
peak = f"{r.peak_temp:.0f}°C" if r.peak_temp is not None else "?"
|
||||
if r.gpu_lost or any(t for t in r.faults):
|
||||
r.severity = health.CRITICAL
|
||||
cause = "; ".join(r.faults) if r.faults else "the GPU stopped responding (query timeout)"
|
||||
r.verdict = (f"GPU fault during the stress run: {cause}. This reproduces the crash under "
|
||||
f"load — capture/keep these logs. Peak core temp {peak}.")
|
||||
return
|
||||
if r.throttled:
|
||||
r.severity = health.WARNING
|
||||
r.verdict = (f"Thermal/HW throttling detected ({', '.join(r.throttle_reasons)}) — the GPU "
|
||||
f"held clocks back to stay safe. Peak core temp {peak}. Improve cooling/airflow.")
|
||||
return
|
||||
if r.peak_temp is not None and r.peak_temp >= 90:
|
||||
r.severity = health.WARNING
|
||||
r.verdict = (f"No fault, but the core peaked at {peak} — hot. Watch GDDR6/VRM cooling; "
|
||||
"sustained high temps shorten the card's life and precede instability.")
|
||||
return
|
||||
if r.peak_temp is None:
|
||||
r.severity = health.INFO
|
||||
r.verdict = "No GPU telemetry was captured (nvidia-smi unavailable?)."
|
||||
return
|
||||
capped = " (power-limited — hitting the cap, which is normal)" if r.power_capped else ""
|
||||
r.verdict = f"Stable: peaked at {peak} with no faults or throttling{capped}."
|
||||
|
||||
|
||||
# --- the run loop (IO) ----------------------------------------------------------------
|
||||
|
||||
def run(duration: float = 120.0, interval: float = 0.5, command: list[str] | None = None,
|
||||
thresholds=DEFAULT_THRESHOLDS, on_tick: Callable[[Sample, float], None] | None = None,
|
||||
should_stop: Callable[[], bool] | None = None) -> StressResult:
|
||||
"""Drive a GPU load for ``duration`` seconds, sampling every ``interval``, and report.
|
||||
|
||||
Stops early on Ctrl-C, if a GPU query times out (likely hang), if the load process exits, or
|
||||
when ``should_stop()`` returns True (the GUI's Stop button). ``on_tick(sample, elapsed)`` is
|
||||
called each tick for live display.
|
||||
"""
|
||||
sampler = Sampler(available_sources())
|
||||
proc, load_desc = _start_load(command, duration)
|
||||
start = time.monotonic()
|
||||
start_ts = time.time()
|
||||
ticks: list[_Tick] = []
|
||||
last = start
|
||||
aborted = False
|
||||
try:
|
||||
while True:
|
||||
sample = sampler.sample()
|
||||
now = time.monotonic()
|
||||
dt = now - last
|
||||
last = now
|
||||
values, lost = _tick_values(sample)
|
||||
reasons, capped = _throttle_state()
|
||||
ticks.append(_Tick(dt=dt, values=values, throttle=reasons, power_capped=capped, lost=lost))
|
||||
if on_tick is not None:
|
||||
on_tick(sample, now - start)
|
||||
if lost: # GPU stopped responding — stop now, it may be hung/lost
|
||||
break
|
||||
if should_stop is not None and should_stop(): # GUI Stop button
|
||||
aborted = True
|
||||
break
|
||||
if proc is not None and proc.poll() is not None: # the load finished/exited
|
||||
break
|
||||
if (now - start) >= duration:
|
||||
break
|
||||
time.sleep(max(0.0, interval - (time.monotonic() - now)))
|
||||
except KeyboardInterrupt:
|
||||
aborted = True
|
||||
finally:
|
||||
_stop_load(proc)
|
||||
|
||||
faults = _faults_since(start_ts)
|
||||
result = summarize(ticks, load=load_desc, interval=interval, faults=faults, thresholds=thresholds)
|
||||
result.aborted = aborted or (proc is not None and command is not None and result.duration < duration - interval)
|
||||
return result
|
||||
@@ -430,19 +430,78 @@ class GamesPage(QWidget):
|
||||
self._banner.hide()
|
||||
|
||||
def _add_custom_game(self) -> None:
|
||||
"""Manually add a game no launcher reports (e.g. SPT), then rescan to show it."""
|
||||
from PySide6.QtWidgets import QInputDialog
|
||||
|
||||
"""Manually add a game no launcher reports (e.g. SPT): name + an optional launch
|
||||
command/script (so it can be launched under crash-capture) and log folder."""
|
||||
from ..core import customgames
|
||||
|
||||
name, ok = QInputDialog.getText(
|
||||
self, "Add game", "Game name (e.g. SPT) — for titles no launcher reports:")
|
||||
if not ok:
|
||||
dlg = QDialog(self)
|
||||
dlg.setWindowTitle("Add game")
|
||||
dlg.setMinimumWidth(560)
|
||||
v = QVBoxLayout(dlg)
|
||||
v.setContentsMargins(20, 18, 20, 16)
|
||||
v.setSpacing(10)
|
||||
|
||||
intro = QLabel(
|
||||
"Add a game no launcher reports — a standalone mod launcher like SPT, an itch.io "
|
||||
"download, or any hand-installed game.")
|
||||
intro.setWordWrap(True)
|
||||
v.addWidget(intro)
|
||||
|
||||
name_edit = QLineEdit()
|
||||
name_edit.setPlaceholderText("SPT")
|
||||
v.addWidget(QLabel("Game name"))
|
||||
v.addWidget(name_edit)
|
||||
|
||||
cmd_edit = QLineEdit()
|
||||
cmd_edit.setPlaceholderText("e.g. /run/media/.../Escape-From-Tarkov/tarkov.sh")
|
||||
cmd_row = QHBoxLayout()
|
||||
cmd_row.addWidget(cmd_edit, 1)
|
||||
cmd_browse = QPushButton("Browse…")
|
||||
cmd_row.addWidget(cmd_browse, 0)
|
||||
v.addWidget(QLabel("Launch command / script (optional — enables launch + auto-capture)"))
|
||||
v.addLayout(cmd_row)
|
||||
|
||||
log_edit = QLineEdit()
|
||||
log_edit.setPlaceholderText("auto-detected from the script's folder (its logs/ subfolder)")
|
||||
log_row = QHBoxLayout()
|
||||
log_row.addWidget(log_edit, 1)
|
||||
log_browse = QPushButton("Browse…")
|
||||
log_row.addWidget(log_browse, 0)
|
||||
v.addWidget(QLabel("Log folder (optional — read into crash diagnostics)"))
|
||||
v.addLayout(log_row)
|
||||
|
||||
def _pick_command() -> None:
|
||||
path, _ = QFileDialog.getOpenFileName(dlg, "Select the launch script/executable")
|
||||
if path:
|
||||
cmd_edit.setText(path)
|
||||
|
||||
def _pick_logdir() -> None:
|
||||
path = QFileDialog.getExistingDirectory(dlg, "Select the game's log folder")
|
||||
if path:
|
||||
log_edit.setText(path)
|
||||
|
||||
cmd_browse.clicked.connect(_pick_command)
|
||||
log_browse.clicked.connect(_pick_logdir)
|
||||
|
||||
buttons = QHBoxLayout()
|
||||
buttons.addStretch(1)
|
||||
cancel = QPushButton("Cancel")
|
||||
cancel.clicked.connect(dlg.reject)
|
||||
buttons.addWidget(cancel)
|
||||
add = QPushButton("Add")
|
||||
add.setObjectName("PrimaryButton")
|
||||
add.setDefault(True)
|
||||
add.clicked.connect(dlg.accept)
|
||||
buttons.addWidget(add)
|
||||
v.addLayout(buttons)
|
||||
|
||||
if dlg.exec() != QDialog.DialogCode.Accepted:
|
||||
return
|
||||
name = name.strip()
|
||||
name = name_edit.text().strip()
|
||||
if not name:
|
||||
return
|
||||
if customgames.add(name):
|
||||
if customgames.add(name, command=cmd_edit.text().strip() or None,
|
||||
logdir=log_edit.text().strip() or None):
|
||||
self.refresh()
|
||||
else:
|
||||
QMessageBox.information(self, "Add game", f"'{name}' is already in your games.")
|
||||
|
||||
@@ -39,6 +39,9 @@ class HealthPage(QWidget):
|
||||
self._status = QLabel("")
|
||||
self._status.setObjectName("Muted")
|
||||
header.addWidget(self._status)
|
||||
self._stress_btn = QPushButton("Stress test…")
|
||||
self._stress_btn.clicked.connect(self._open_stress)
|
||||
header.addWidget(self._stress_btn)
|
||||
self._run_btn = QPushButton("Run health report")
|
||||
self._run_btn.setObjectName("PrimaryButton")
|
||||
self._run_btn.clicked.connect(self._run)
|
||||
@@ -59,6 +62,11 @@ class HealthPage(QWidget):
|
||||
|
||||
QTimer.singleShot(300, self._run) # auto-run shortly after the window opens
|
||||
|
||||
def _open_stress(self) -> None:
|
||||
from .stress_dialog import StressDialog
|
||||
|
||||
StressDialog(self).exec()
|
||||
|
||||
def _run(self) -> None:
|
||||
self._run_btn.setEnabled(False)
|
||||
self._status.setText("Scanning logs, SMART, and driver…")
|
||||
|
||||
@@ -0,0 +1,157 @@
|
||||
"""GPU stress + thermal-monitor dialog (GUI front-end for core/stress.py).
|
||||
|
||||
Runs the stress monitor in a background thread, streams a live one-line readout, and shows the
|
||||
rendered result (telemetry stats + verdict) when it finishes. A Stop button ends the run early
|
||||
via a cooperative flag; closing the dialog mid-run stops it too.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import threading
|
||||
|
||||
from PySide6.QtCore import Qt, Signal
|
||||
from PySide6.QtGui import QFont
|
||||
from PySide6.QtWidgets import (
|
||||
QDialog,
|
||||
QHBoxLayout,
|
||||
QLabel,
|
||||
QLineEdit,
|
||||
QPushButton,
|
||||
QSpinBox,
|
||||
QTextEdit,
|
||||
QVBoxLayout,
|
||||
)
|
||||
|
||||
|
||||
class StressDialog(QDialog):
|
||||
_tick = Signal(str) # live one-line readout (worker thread -> GUI)
|
||||
_done = Signal(object) # stress.StressResult when the run finishes
|
||||
|
||||
def __init__(self, parent=None) -> None:
|
||||
super().__init__(parent)
|
||||
self._stop = threading.Event()
|
||||
self._running = False
|
||||
self._tick.connect(self._on_tick)
|
||||
self._done.connect(self._on_done)
|
||||
self.setWindowTitle("GPU stress + thermal monitor")
|
||||
self.resize(640, 460)
|
||||
|
||||
root = QVBoxLayout(self)
|
||||
root.setContentsMargins(20, 18, 20, 16)
|
||||
root.setSpacing(12)
|
||||
|
||||
intro = QLabel(
|
||||
"Run a GPU load and closely watch temps. Reports peak/sustained temps, time spent "
|
||||
"hot, throttling, and any GPU fault (Xid / driver freeze) during the run.")
|
||||
intro.setWordWrap(True)
|
||||
root.addWidget(intro)
|
||||
|
||||
from ..core import stress
|
||||
loaders = stress.available_loaders()
|
||||
self._mode = QLabel(
|
||||
f"Load tool detected: {loaders[0]} — it'll drive the load." if loaders else
|
||||
"No GPU load tool installed → MONITOR-ONLY: start this, then launch your game; "
|
||||
"it tracks temps while you play. (Or give a command below.)")
|
||||
self._mode.setObjectName("Muted")
|
||||
self._mode.setWordWrap(True)
|
||||
root.addWidget(self._mode)
|
||||
|
||||
form = QHBoxLayout()
|
||||
form.addWidget(QLabel("Duration (s):"))
|
||||
self._duration = QSpinBox()
|
||||
self._duration.setRange(5, 3600)
|
||||
self._duration.setValue(120)
|
||||
form.addWidget(self._duration)
|
||||
form.addSpacing(12)
|
||||
form.addWidget(QLabel("Command (optional):"))
|
||||
self._command = QLineEdit()
|
||||
self._command.setPlaceholderText("e.g. /…/tarkov.sh or gpu-burn 60")
|
||||
form.addWidget(self._command, 1)
|
||||
root.addLayout(form)
|
||||
|
||||
self._live = QLabel("—")
|
||||
self._live.setFont(QFont("monospace"))
|
||||
self._live.setStyleSheet("background: #0d0f13; color: #cfd3da; border: 1px solid #2a2f39; "
|
||||
"border-radius: 8px; padding: 8px;")
|
||||
root.addWidget(self._live)
|
||||
|
||||
self._report = QTextEdit()
|
||||
self._report.setReadOnly(True)
|
||||
self._report.setFont(QFont("monospace"))
|
||||
self._report.setVisible(False)
|
||||
root.addWidget(self._report, 1)
|
||||
|
||||
buttons = QHBoxLayout()
|
||||
buttons.addStretch(1)
|
||||
self._stop_btn = QPushButton("Stop")
|
||||
self._stop_btn.setEnabled(False)
|
||||
self._stop_btn.clicked.connect(self._on_stop)
|
||||
buttons.addWidget(self._stop_btn)
|
||||
self._start_btn = QPushButton("Start")
|
||||
self._start_btn.setObjectName("PrimaryButton")
|
||||
self._start_btn.clicked.connect(self._on_start)
|
||||
buttons.addWidget(self._start_btn)
|
||||
root.addLayout(buttons)
|
||||
|
||||
def _on_start(self) -> None:
|
||||
if self._running:
|
||||
return
|
||||
self._running = True
|
||||
self._stop.clear()
|
||||
self._start_btn.setEnabled(False)
|
||||
self._stop_btn.setEnabled(True)
|
||||
self._report.setVisible(False)
|
||||
self._live.setText("starting…")
|
||||
duration = float(self._duration.value())
|
||||
command_text = self._command.text().strip()
|
||||
threading.Thread(target=self._work, args=(duration, command_text), daemon=True).start()
|
||||
|
||||
def _work(self, duration: float, command_text: str) -> None:
|
||||
import shlex
|
||||
|
||||
from ..core import stress
|
||||
|
||||
command = shlex.split(command_text) if command_text else None
|
||||
|
||||
def _tick(sample, elapsed) -> None:
|
||||
by = {r.key: r for r in sample.readings}
|
||||
from ..render import format_raw
|
||||
bits = [f"{elapsed:5.0f}s"]
|
||||
for key, tag in (("gpu.temp", "core"), ("gpu.power", "pwr"),
|
||||
("gpu.util", "util"), ("gpu.clock.core", "clk"),
|
||||
("gpu.temp.memory", "vram")):
|
||||
r = by.get(key)
|
||||
if r is not None and r.value is not None:
|
||||
bits.append(f"{tag} {format_raw(r.value, r.unit)}")
|
||||
self._tick.emit(" ".join(bits))
|
||||
|
||||
try:
|
||||
result = stress.run(duration=duration, interval=0.5, command=command,
|
||||
on_tick=_tick, should_stop=self._stop.is_set)
|
||||
except Exception as exc: # never let a worker crash take down the dialog
|
||||
result = exc
|
||||
self._done.emit(result)
|
||||
|
||||
def _on_tick(self, text: str) -> None:
|
||||
self._live.setText(text)
|
||||
|
||||
def _on_done(self, result) -> None:
|
||||
from ..render import render_stress
|
||||
|
||||
self._running = False
|
||||
self._start_btn.setEnabled(True)
|
||||
self._stop_btn.setEnabled(False)
|
||||
if isinstance(result, Exception):
|
||||
self._report.setPlainText(f"Stress run failed: {result}")
|
||||
else:
|
||||
self._report.setPlainText(render_stress(result))
|
||||
self._report.setVisible(True)
|
||||
|
||||
def _on_stop(self) -> None:
|
||||
self._stop.set()
|
||||
self._stop_btn.setEnabled(False)
|
||||
self._live.setText("stopping…")
|
||||
|
||||
def closeEvent(self, event) -> None: # stop the run if the dialog is closed mid-flight
|
||||
self._stop.set()
|
||||
super().closeEvent(event)
|
||||
@@ -118,6 +118,32 @@ def render_health(findings: list, title: str = "Health report") -> str:
|
||||
return "\n".join(lines).rstrip()
|
||||
|
||||
|
||||
def render_stress(result) -> str:
|
||||
"""Render a stress.StressResult: telemetry stats, temp dwell time, and the verdict."""
|
||||
lines = ["GPU stress + thermal monitor", ""]
|
||||
lines.append(f" Load : {result.load}")
|
||||
lines.append(f" Duration : {_fmt_duration(result.duration)} · {result.samples} samples "
|
||||
f"@ {result.interval:g}s" + (" (stopped early)" if result.aborted else ""))
|
||||
if result.stats:
|
||||
lines += ["", f" {'Metric':<22}{'min':>12}{'avg':>12}{'max':>12}"]
|
||||
for s in result.stats:
|
||||
u = s.unit
|
||||
lines.append(f" {s.label:<22}{format_raw(s.min, u):>12}{format_raw(s.avg, u):>12}"
|
||||
f"{format_raw(s.max, u):>12}")
|
||||
if result.time_above:
|
||||
spans = " ".join(f"≥{th}°C: {_fmt_duration(secs)}" for th, secs in sorted(result.time_above.items()))
|
||||
lines += ["", f" Time at temp (core): {spans}"]
|
||||
if result.max_power is not None and result.power_limit:
|
||||
cap = " — hit the power cap" if result.power_capped else ""
|
||||
lines.append(f" Power peak: {result.max_power:.0f} W of {result.power_limit:.0f} W limit{cap}")
|
||||
if result.throttle_reasons:
|
||||
lines.append(f" Throttling: {', '.join(result.throttle_reasons)}")
|
||||
if result.faults:
|
||||
lines.append(f" Faults : {'; '.join(result.faults)}")
|
||||
lines += ["", f"[{_SEV_LABEL.get(result.severity, '?')}] {result.verdict}"]
|
||||
return "\n".join(lines)
|
||||
|
||||
|
||||
def render_summary(summary: Summary, log_path=None) -> str:
|
||||
if summary.samples == 0 and not summary.events:
|
||||
where = f" ({log_path})" if log_path else ""
|
||||
|
||||
@@ -0,0 +1,99 @@
|
||||
"""Tests for drive health parsing & findings (synthetic smartctl JSON)."""
|
||||
|
||||
import unittest
|
||||
from dataclasses import asdict
|
||||
|
||||
from rigdoctor.core import drives
|
||||
from rigdoctor.core.health import CRITICAL, INFO, OK, WARNING
|
||||
|
||||
_NVME_OK = {
|
||||
"model_name": "Samsung SSD 980 PRO 1TB",
|
||||
"device": {"protocol": "NVMe"},
|
||||
"smart_status": {"passed": True},
|
||||
"temperature": {"current": 41},
|
||||
"power_on_time": {"hours": 1234},
|
||||
"nvme_smart_health_information_log": {
|
||||
"percentage_used": 3, "available_spare": 100, "available_spare_threshold": 10,
|
||||
"media_errors": 0, "data_units_written": 200_000_000, # ~102 TB
|
||||
},
|
||||
}
|
||||
|
||||
_NVME_WORN = {
|
||||
"model_name": "Worn NVMe",
|
||||
"device": {"protocol": "NVMe"},
|
||||
"smart_status": {"passed": True},
|
||||
"nvme_smart_health_information_log": {"percentage_used": 96, "available_spare": 100,
|
||||
"available_spare_threshold": 10},
|
||||
}
|
||||
|
||||
_SATA_FAILING = {
|
||||
"model_name": "Samsung SSD 870 QVO 1TB",
|
||||
"device": {"protocol": "ATA"},
|
||||
"smart_status": {"passed": False},
|
||||
"temperature": {"current": 35},
|
||||
"power_on_time": {"hours": 5000},
|
||||
"ata_smart_attributes": {"table": [
|
||||
{"id": 5, "name": "Reallocated_Sector_Ct", "value": 80, "raw": {"value": 12}},
|
||||
{"id": 177, "name": "Wear_Leveling_Count", "value": 88, "raw": {"value": 300}},
|
||||
{"id": 241, "name": "Total_LBAs_Written", "value": 99, "raw": {"value": 2_000_000_000}},
|
||||
]},
|
||||
}
|
||||
|
||||
|
||||
class ParseTests(unittest.TestCase):
|
||||
def test_nvme_parse(self):
|
||||
d = drives.parse("/dev/nvme0", _NVME_OK)
|
||||
self.assertEqual(d.kind, "nvme")
|
||||
self.assertTrue(d.passed)
|
||||
self.assertEqual(d.percent_used, 3)
|
||||
self.assertEqual(d.health_pct, 97) # 100 - percentage_used
|
||||
self.assertEqual(d.power_on_hours, 1234)
|
||||
self.assertEqual(d.temp_c, 41)
|
||||
self.assertAlmostEqual(d.data_written_tb, 102.4, places=1)
|
||||
|
||||
def test_sata_parse(self):
|
||||
d = drives.parse("/dev/sda", _SATA_FAILING)
|
||||
self.assertEqual(d.kind, "sata")
|
||||
self.assertFalse(d.passed)
|
||||
self.assertEqual(d.reallocated, 12) # raw value
|
||||
self.assertEqual(d.health_pct, 88) # normalized wear-leveling value
|
||||
self.assertAlmostEqual(d.data_written_tb, 1.02, places=1)
|
||||
|
||||
def test_needs_root_when_no_data(self):
|
||||
d = drives.parse("/dev/sda", None)
|
||||
self.assertTrue(d.needs_root)
|
||||
|
||||
def test_roundtrip_through_dicts(self):
|
||||
d = drives.parse("/dev/nvme0", _NVME_OK)
|
||||
back = drives.from_dicts([asdict(d)])
|
||||
self.assertEqual(len(back), 1)
|
||||
self.assertEqual(back[0].model, d.model)
|
||||
self.assertEqual(back[0].health_pct, d.health_pct)
|
||||
|
||||
|
||||
class FindingTests(unittest.TestCase):
|
||||
def test_healthy_nvme_is_ok_with_stats(self):
|
||||
f = drives.to_findings([drives.parse("/dev/nvme0", _NVME_OK)])[0]
|
||||
self.assertEqual(f.severity, OK)
|
||||
self.assertIn("97% life left", f.title)
|
||||
self.assertIn("1,234 h", f.title)
|
||||
|
||||
def test_failing_sata_is_critical(self):
|
||||
f = drives.to_findings([drives.parse("/dev/sda", _SATA_FAILING)])[0]
|
||||
self.assertEqual(f.severity, CRITICAL)
|
||||
self.assertIn("FAILED", f.detail)
|
||||
self.assertIn("reallocated sectors", f.detail)
|
||||
|
||||
def test_worn_nvme_is_warning(self):
|
||||
f = drives.to_findings([drives.parse("/dev/nvme1", _NVME_WORN)])[0]
|
||||
self.assertEqual(f.severity, WARNING)
|
||||
self.assertIn("worn", f.title)
|
||||
|
||||
def test_needs_root_is_info(self):
|
||||
f = drives.to_findings([drives.parse("/dev/sda", None)])[0]
|
||||
self.assertEqual(f.severity, INFO)
|
||||
self.assertIn("needs root", f.title)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
@@ -0,0 +1,77 @@
|
||||
"""Tests for the GPU stress + thermal-monitor analysis (synthetic ticks, no real GPU)."""
|
||||
|
||||
import unittest
|
||||
|
||||
from rigdoctor.core import stress
|
||||
from rigdoctor.core.health import CRITICAL, OK, WARNING
|
||||
|
||||
|
||||
def _tick(temp=None, power=None, throttle=(), capped=False, lost=False, dt=1.0, **extra):
|
||||
values = {}
|
||||
if temp is not None:
|
||||
values["gpu.temp"] = temp
|
||||
if power is not None:
|
||||
values["gpu.power"] = power
|
||||
values.update(extra)
|
||||
return stress._Tick(dt=dt, values=values, throttle=list(throttle), power_capped=capped, lost=lost)
|
||||
|
||||
|
||||
class SummarizeTests(unittest.TestCase):
|
||||
def test_stable_run_is_ok(self):
|
||||
ticks = [_tick(temp=t, power=200, **{"gpu.power_limit": 280}) for t in (60, 65, 70, 72)]
|
||||
r = stress.summarize(ticks, load="monitor-only", interval=1.0, faults=[])
|
||||
self.assertEqual(r.severity, OK)
|
||||
self.assertEqual(r.peak_temp, 72)
|
||||
self.assertEqual(r.max_power, 200)
|
||||
self.assertEqual(r.power_limit, 280)
|
||||
self.assertFalse(r.throttled)
|
||||
self.assertIn("Stable", r.verdict)
|
||||
|
||||
def test_dwell_time_above_thresholds(self):
|
||||
# 3 ticks of 2s each at 82/86/92 °C → ≥80 for all 6s, ≥85 for 4s, ≥90 for 2s.
|
||||
ticks = [_tick(temp=82, dt=2.0), _tick(temp=86, dt=2.0), _tick(temp=92, dt=2.0)]
|
||||
r = stress.summarize(ticks, load="x", interval=2.0, faults=[])
|
||||
self.assertEqual(r.time_above[80], 6.0)
|
||||
self.assertEqual(r.time_above[85], 4.0)
|
||||
self.assertEqual(r.time_above[90], 2.0)
|
||||
self.assertNotIn(95, r.time_above) # never reached → omitted
|
||||
|
||||
def test_throttling_is_a_warning(self):
|
||||
ticks = [_tick(temp=88, throttle=["HW thermal slowdown"])]
|
||||
r = stress.summarize(ticks, load="x", interval=1.0, faults=[])
|
||||
self.assertEqual(r.severity, WARNING)
|
||||
self.assertTrue(r.throttled)
|
||||
self.assertIn("HW thermal slowdown", r.throttle_reasons)
|
||||
|
||||
def test_high_temp_without_throttle_is_a_warning(self):
|
||||
r = stress.summarize([_tick(temp=93)], load="x", interval=1.0, faults=[])
|
||||
self.assertEqual(r.severity, WARNING)
|
||||
self.assertIn("hot", r.verdict.lower())
|
||||
|
||||
def test_gpu_lost_is_critical(self):
|
||||
ticks = [_tick(temp=70), _tick(lost=True)]
|
||||
r = stress.summarize(ticks, load="x", interval=1.0, faults=[])
|
||||
self.assertEqual(r.severity, CRITICAL)
|
||||
self.assertTrue(r.gpu_lost)
|
||||
|
||||
def test_journal_fault_is_critical(self):
|
||||
r = stress.summarize([_tick(temp=70)], load="x", interval=1.0,
|
||||
faults=["NVIDIA Xid 79 ×1"])
|
||||
self.assertEqual(r.severity, CRITICAL)
|
||||
self.assertIn("Xid 79", r.verdict)
|
||||
|
||||
def test_no_telemetry_is_info(self):
|
||||
r = stress.summarize([_tick()], load="monitor-only", interval=1.0, faults=[])
|
||||
self.assertEqual(r.severity, "info")
|
||||
self.assertIsNone(r.peak_temp)
|
||||
|
||||
|
||||
class ThrottleDecodeTests(unittest.TestCase):
|
||||
def test_throttle_bits_map_to_reasons(self):
|
||||
# the constants used by _throttle_state decode the NVML active-reasons bitmask
|
||||
self.assertIn("HW thermal slowdown", stress._THROTTLE_BITS.values())
|
||||
self.assertIn("SW thermal slowdown", stress._THROTTLE_BITS.values())
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
Reference in New Issue
Block a user