Compare commits

..

3 Commits

Author SHA1 Message Date
jessey 8f4824f576 chore(release): v0.43.0
tests / core (pull_request) Successful in 13s
tests / gui-smoke (pull_request) Successful in 31s
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-05-29 16:59:13 +02:00
jessey edc2166011 feat(health): GPU stress monitor + per-drive SMART health/wear
Two diagnostics for the load-correlated GPU crashes and for storage wear.

GPU stress (`rigdoctor stress` + a System Health "Stress test…" dialog): drive a GPU
load and sample sensors at high rate, then report per-metric min/avg/peak, time spent
above each temp threshold, power vs limit, throttling (decoded from the NVML
clocks-event bitmask), and any GPU fault (Xid / VA-space freeze / query-timeout hang)
in the window. Load source: explicit --command, an auto-detected loader, or
monitor-only (you launch the game). Analysis is a pure, unit-tested function.

Drive health (core/drives.py): parse full `smartctl --json` per drive into prioritized
findings — SMART verdict, derived life-left % (NVMe percentage_used or SATA
wear-leveling), power-on hours, TBW, temperature, and failure predictors
(reallocated/pending/offline sectors, NVMe media errors, low spare). Replaces the old
pass/fail-only check_smart; runs through the same elevated path (collect-priv / sudo),
degrading to "needs root" notes unprivileged.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-05-29 16:59:06 +02:00
jessey 31ecf67ca7 fix(games): let the GUI Add-game dialog link a launcher & log folder
The "Add game…" button only prompted for a name (single-field QInputDialog), so a
custom game couldn't be given its launch command or log dir from the GUI. Replace it
with a proper dialog: name + an optional launch command/script (with a file browser)
+ an optional log folder (auto-detected from the script's folder when left blank).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-05-29 16:58:56 +02:00
13 changed files with 1077 additions and 60 deletions
+25
View File
@@ -5,6 +5,31 @@ All notable changes to RigDoctor are recorded here. Format follows
(`MAJOR.MINOR.PATCH`, pre-1.0). `__version__` and `pyproject.toml` must match the git
release tag (so the auto-updater, D18, can compare versions).
## [0.43.0] - 2026-05-29
### Added
- **GPU stress test + close thermal monitoring** (`rigdoctor stress`, and a "Stress test…" button
on System Health). Runs a GPU load and samples sensors at a high rate (default 0.5 s), then
reports per-metric min/avg/**peak**, how long the core spent above each temperature threshold,
power vs the limit, throttling (decoded from the NVML clocks-event bitmask), and any GPU **fault**
(Xid / VA-space freeze / a query-timeout hang) that hit during the window — the on-demand way to
reproduce load-correlated crashes. The load comes from an explicit `--command` (a game or a tool
like gpu-burn), an auto-detected loader (gpu-burn/vkmark/glmark2/vkcube), or **monitor-only** when
none is found (you launch the game; it tracks temps while you play).
- **Drive health & wear in the health report.** A new `core/drives.py` parses the full
`smartctl --json` for every drive into prioritized findings: the SMART verdict, a derived
**life-left %** (NVMe `percentage_used` or the SATA wear-leveling attribute), **power-on hours**,
data written (TBW), temperature, and the early-failure predictors (reallocated / pending /
offline-uncorrectable sectors, NVMe media errors, low available spare). Replaces the old
pass/fail-only SMART check; flows through the same elevated path (GUI launch / `sudo rigdoctor
report`), degrading to per-drive "needs root" notes unprivileged.
### Fixed
- **GUI "Add game…" can now link a launcher.** The dialog only asked for a name, so a custom
game (e.g. SPT) couldn't be given its launch command or log folder from the app — those were
CLI-only, leaving it unlaunchable from the GUI. It's now a proper form: name + an optional
launch command/script (with a **Browse…** file picker) + an optional log folder (auto-detected
from the script's folder when left blank).
## [0.42.0] - 2026-05-29
### Added
- **Detect hard freezes that log no Xid.** The kernel-log scanner caught Xid codes, OOM, panic,
+1 -1
View File
@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
[project]
name = "rigdoctor"
version = "0.42.0"
version = "0.43.0"
description = "Modular hardware monitoring & crash diagnostics for Linux gamers."
readme = "README.md"
requires-python = ">=3.11"
+1 -1
View File
@@ -1,3 +1,3 @@
"""RigDoctor — modular hardware monitoring & crash diagnostics for Linux gamers."""
__version__ = "0.42.0"
__version__ = "0.43.0"
+51 -2
View File
@@ -298,10 +298,10 @@ def cmd_collect_priv(args) -> int:
"""Internal: emit root-only data (SMART + dmidecode) as JSON, run via pkexec at launch."""
from dataclasses import asdict
from .core.health import check_smart
from .core import drives
from .core.inventory import _dmidecode
data = {"smart": [asdict(f) for f in check_smart()], "dmidecode": _dmidecode()}
data = {"drives": [asdict(d) for d in drives.collect()], "dmidecode": _dmidecode()}
print(json.dumps(data))
return 0
@@ -640,6 +640,47 @@ def cmd_games_remove(args) -> int:
return 1
def cmd_stress(args) -> int:
    """Run the GPU stress / thermal monitor and print its report.

    Reads ``args.command``, ``args.duration``, ``args.interval`` and ``args.json`` (see the
    ``stress`` sub-parser). In text mode a one-line live readout is redrawn on every tick and
    the rendered report follows; in JSON mode only the serialized result is printed.

    Returns 0 when the result's severity is "ok" or "info", 1 for any other severity, and 2
    when ``--command`` cannot be split into an argv (e.g. an unbalanced quote).
    """
    import shlex as _shlex
    import sys as _sys

    # Validate the user's command line first: shlex raises ValueError on unbalanced quotes, and
    # that should be a clean usage error rather than a traceback.
    try:
        command = _shlex.split(args.command) if args.command else None
    except ValueError as exc:
        print(f"rigdoctor stress: invalid --command: {exc}", file=_sys.stderr)
        return 2

    from .core import stress
    from .render import format_raw, render_stress

    if not args.json:
        loaders = stress.available_loaders()
        if command:
            print(f"Stressing with: {' '.join(command)}")
        elif loaders:
            print(f"Stressing with auto-detected loader: {loaders[0]}")
        else:
            print("No GPU load tool found and no --command given — MONITOR-ONLY mode.")
            print(f" Launch the game/app now; I'll closely track temps for up to {int(args.duration)}s.")
        print(f" Sampling every {args.interval:g}s. Press Ctrl-C to stop early.\n")

    def _tick(sample, elapsed) -> None:
        """Redraw the single live status line for this sample."""
        by = {r.key: r for r in sample.readings}
        bits = [f"{elapsed:5.0f}s"]
        for key, tag in (("gpu.temp", "core"), ("gpu.power", "pwr"),
                         ("gpu.util", "util"), ("gpu.clock.core", "clk")):
            r = by.get(key)
            if r is not None and r.value is not None:
                bits.append(f"{tag} {format_raw(r.value, r.unit)}")
        # "\r" keeps the cursor on the same line so the next tick overwrites it.
        print(" " + " ".join(bits) + " ", end="\r", flush=True)

    result = stress.run(duration=args.duration, interval=args.interval, command=command,
                        on_tick=None if args.json else _tick)
    if not args.json:
        print()  # end the live line
    if args.json:
        from dataclasses import asdict
        print(json.dumps(asdict(result), indent=2, ensure_ascii=False))
    else:
        print(render_stress(result))
    return 0 if result.severity in ("ok", "info") else 1
def build_parser() -> argparse.ArgumentParser:
p = argparse.ArgumentParser(
prog="rigdoctor",
@@ -657,6 +698,14 @@ def build_parser() -> argparse.ArgumentParser:
mp.add_argument("--plain", action="store_true", help="plain redraw instead of the curses UI")
mp.set_defaults(func=cmd_monitor)
st = sub.add_parser("stress", help="GPU stress + close thermal monitoring (repro load crashes)")
st.add_argument("-d", "--duration", type=float, default=120.0, help="run for this many seconds (default 120)")
st.add_argument("-n", "--interval", type=float, default=0.5, help="sampling interval in seconds (default 0.5)")
st.add_argument("--command", default=None,
help="load generator to run (e.g. a game or 'gpu-burn 60'); omit to auto-detect or monitor-only")
st.add_argument("--json", action="store_true", help="output JSON")
st.set_defaults(func=cmd_stress)
sub.add_parser("gui", help="launch the desktop GUI (needs PySide6)").set_defaults(func=cmd_gui)
sub.add_parser("sources", help="list detected sensor sources").set_defaults(func=cmd_sources)
+229
View File
@@ -0,0 +1,229 @@
"""Drive health & wear (M-drives): per-disk SMART stats parsed from smartctl JSON.
Unlike a GPU, storage exposes a real health/wear story, so this reads it in full: the overall
SMART verdict, a derived **life-left %** (NVMe ``percentage_used`` or the SATA wear-leveling
attribute), **power-on hours** (the drive's runtime), data written (TBW), temperature, and the
early-failure predictors (reallocated / pending / offline-uncorrectable sectors, NVMe media
errors, available spare). Turned into prioritized health findings.
smartctl needs root, so collection runs through the same elevated path as the other root-only
checks (``rigdoctor collect-priv`` via pkexec at GUI launch, or ``sudo rigdoctor report``).
Parsing is JSON-based (smartctl ``--json``), which is stable across drive types. Stdlib only;
degrades gracefully — no smartctl, no root, or an unparseable device yields an info finding.
"""
from __future__ import annotations
import json
import shutil
import subprocess
from dataclasses import dataclass
from .health import CRITICAL, INFO, OK, WARNING, Finding
# NVMe reports writes in "data units" of 1000 x 512 bytes, i.e. 512,000 bytes per unit.
_NVME_UNIT_BYTES = 512_000
_LBA_BYTES = 512  # SATA Total_LBAs_Written counts 512-byte sectors
@dataclass
class DriveHealth:
    """Health/wear snapshot for one drive, as filled in by ``parse``.

    Only ``device`` is required; every other field stays at its default when smartctl did
    not report it. The NVMe-only and SATA-only fields are left as None for the other kind.
    """

    device: str
    model: str = ""
    kind: str = ""  # "nvme" | "sata" | "scsi"
    passed: bool | None = None  # SMART overall verdict; None if unknown / needs root
    needs_root: bool = False
    health_pct: int | None = None  # derived life-left %
    percent_used: int | None = None  # NVMe wear used %
    power_on_hours: int | None = None
    temp_c: int | None = None
    data_written_tb: float | None = None
    reallocated: int | None = None  # SATA reallocated sectors (id 5)
    pending: int | None = None  # SATA current-pending sectors (id 197)
    offline_uncorrectable: int | None = None  # SATA id 198
    available_spare: int | None = None  # NVMe %
    available_spare_threshold: int | None = None
    media_errors: int | None = None  # NVMe
# --- collection (root) ----------------------------------------------------------------
def _scan_devices() -> list[str]:
    """Return the device paths listed by ``smartctl --scan`` (empty if it cannot be run)."""
    try:
        scan = subprocess.run(["smartctl", "--scan"], capture_output=True, text=True, timeout=10)
    except (subprocess.SubprocessError, OSError):
        return []
    devices: list[str] = []
    for line in scan.stdout.splitlines():
        # Each useful line starts with the device path; the rest is type info and a comment.
        if line.strip().startswith("/dev/"):
            devices.append(line.split()[0])
    return devices
def _smartctl_json(device: str) -> dict | None:
    """Run smartctl in JSON mode for ``device`` and return the decoded output.

    Returns None when smartctl cannot be run / times out or its stdout is not valid JSON.
    """
    argv = ["smartctl", "--json=c", "-H", "-A", "-i", device]
    try:
        completed = subprocess.run(argv, capture_output=True, text=True, timeout=20)
    except (subprocess.SubprocessError, OSError):
        return None
    try:
        payload = json.loads(completed.stdout)
    except (ValueError, TypeError):
        payload = None
    return payload
def _ata_attr(data: dict, attr_id: int) -> int | None:
for row in data.get("ata_smart_attributes", {}).get("table", []):
if row.get("id") == attr_id:
raw = row.get("raw", {})
return raw.get("value")
return None
def _ata_norm_value(data: dict, attr_id: int) -> int | None:
"""The normalized 'value' (100→0 life indicator) for an ATA attribute."""
for row in data.get("ata_smart_attributes", {}).get("table", []):
if row.get("id") == attr_id:
return row.get("value")
return None
def parse(device: str, data: dict | None) -> DriveHealth:
    """Translate one smartctl JSON document into a DriveHealth record (no IO).

    Missing/empty ``data`` yields a record flagged ``needs_root``. NVMe drives are read from
    the NVMe health log; everything else goes through the ATA attribute table.
    """
    drive = DriveHealth(device=device)
    if not data:
        drive.needs_root = True
        return drive

    drive.model = data.get("model_name") or data.get("scsi_model_name") or ""
    protocol = (data.get("device", {}).get("protocol") or "").lower()
    if "nvme" in protocol:
        drive.kind = "nvme"
    elif "ata" in protocol:
        drive.kind = "sata"
    else:
        drive.kind = protocol

    verdict = data.get("smart_status")
    if isinstance(verdict, dict) and "passed" in verdict:
        drive.passed = bool(verdict["passed"])
    elif data.get("smartctl", {}).get("exit_status", 0) and not verdict:
        # No verdict and a non-zero exit usually means we couldn't open the device (needs root).
        drive.needs_root = True

    current_temp = data.get("temperature", {}).get("current")
    if isinstance(current_temp, (int, float)):
        drive.temp_c = int(current_temp)
    hours = data.get("power_on_time", {}).get("hours")
    if isinstance(hours, (int, float)):
        drive.power_on_hours = int(hours)

    if drive.kind == "nvme":
        health_log = data.get("nvme_smart_health_information_log", {})
        drive.percent_used = health_log.get("percentage_used")
        drive.available_spare = health_log.get("available_spare")
        drive.available_spare_threshold = health_log.get("available_spare_threshold")
        drive.media_errors = health_log.get("media_errors")
        log_temp = health_log.get("temperature")
        if drive.temp_c is None and isinstance(log_temp, (int, float)):
            drive.temp_c = int(log_temp)
        written_units = health_log.get("data_units_written")
        if isinstance(written_units, (int, float)):
            drive.data_written_tb = round(written_units * _NVME_UNIT_BYTES / 1e12, 2)
        if isinstance(drive.percent_used, (int, float)):
            # Clamp at 0: the used figure can exceed 100.
            drive.health_pct = max(0, 100 - int(drive.percent_used))
        return drive

    # SATA / ATA
    drive.reallocated = _ata_attr(data, 5)
    drive.pending = _ata_attr(data, 197)
    drive.offline_uncorrectable = _ata_attr(data, 198)
    lbas_written = _ata_attr(data, 241)  # Total_LBAs_Written
    if isinstance(lbas_written, (int, float)) and lbas_written > 0:
        drive.data_written_tb = round(lbas_written * _LBA_BYTES / 1e12, 2)
    life = _ata_norm_value(data, 177)  # Wear_Leveling_Count (Samsung): normalized = life left
    if life is None:
        life = _ata_norm_value(data, 231)  # SSD_Life_Left on some drives
    if isinstance(life, int):
        drive.health_pct = life
    return drive
def collect() -> list[DriveHealth]:
    """Gather a DriveHealth for each scanned device; empty when smartctl isn't installed.

    Real data needs root — unprivileged runs come back flagged ``needs_root`` by ``parse``.
    """
    if shutil.which("smartctl") is None:
        return []
    results: list[DriveHealth] = []
    for dev in _scan_devices():
        results.append(parse(dev, _smartctl_json(dev)))
    return results
def from_dicts(rows: list[dict]) -> list[DriveHealth]:
    """Rebuild DriveHealth objects from the privileged collector's JSON.

    Rows that are not dicts or carry no ``device`` are skipped. Only keys actually present in
    a row are passed on, so a field missing from the payload keeps its dataclass default
    (``model=""``, ``needs_root=False``, …) instead of being overwritten with None. Unknown
    keys are ignored. A ``rows`` of None is treated as empty.
    """
    known = DriveHealth.__dataclass_fields__
    out: list[DriveHealth] = []
    for row in rows or []:
        if not isinstance(row, dict) or not row.get("device"):
            continue
        fields = {key: row[key] for key in known if key in row}
        out.append(DriveHealth(**fields))
    return out
# --- findings -------------------------------------------------------------------------
def _stats_line(d: DriveHealth) -> str:
parts: list[str] = []
if d.health_pct is not None:
parts.append(f"{d.health_pct}% life left")
elif d.percent_used is not None:
parts.append(f"{d.percent_used}% used")
if d.power_on_hours is not None:
parts.append(f"{d.power_on_hours:,} h powered on")
if d.data_written_tb is not None:
parts.append(f"{d.data_written_tb:g} TB written")
if d.temp_c is not None:
parts.append(f"{d.temp_c}°C")
if d.available_spare is not None:
parts.append(f"spare {d.available_spare}%")
return " · ".join(parts)
def to_findings(drives: list[DriveHealth]) -> list[Finding]:
    """Turn per-drive health records into report findings, one per drive.

    With no drives, returns a single INFO finding when smartctl is not installed and nothing
    otherwise. Per drive, the first matching rule wins:

    * ``needs_root``                                  -> INFO
    * failed SMART verdict or any non-zero predictor  -> CRITICAL
    * spare below threshold, or worn                  -> WARNING
    * temperature >= 70 °C                            -> WARNING
    * otherwise                                       -> OK

    "Worn" means NVMe ``percent_used`` >= 90 or a derived life-left of 10 % or less, so SATA
    SSDs (which have no ``percent_used``) are covered as well.
    """
    if not drives:
        if shutil.which("smartctl") is None:
            return [Finding(INFO, "Storage", "SMART not checked (smartmontools missing)",
                            "Disk self-health couldn't be read.",
                            "Install it: `sudo apt install smartmontools`")]
        return []

    findings: list[Finding] = []
    for d in drives:
        name = d.model or d.device
        if d.needs_root:
            findings.append(Finding(INFO, "Storage", f"{name}: SMART needs root",
                                    "Reading drive health requires elevated access.",
                                    "Run: `sudo rigdoctor report` (or launch the GUI, which asks once)."))
            continue
        stats = _stats_line(d)

        # Severity from the failure predictors, worst first.
        bad = []
        if d.passed is False:
            bad.append("SMART overall self-assessment FAILED")
        for label, val in (("reallocated sectors", d.reallocated),
                           ("pending sectors", d.pending),
                           ("offline-uncorrectable sectors", d.offline_uncorrectable),
                           ("NVMe media errors", d.media_errors)):
            if isinstance(val, int) and val > 0:
                bad.append(f"{val} {label}")

        spare_low = (isinstance(d.available_spare, int) and isinstance(d.available_spare_threshold, int)
                     and d.available_spare < d.available_spare_threshold)
        nvme_worn = isinstance(d.percent_used, int) and d.percent_used >= 90
        # SATA drives never set percent_used; their wear is only visible in the derived
        # life-left figure, so check that too (10 % left mirrors the NVMe 90 % used cut-off).
        life_low = isinstance(d.health_pct, int) and d.health_pct <= 10
        worn = nvme_worn or life_low
        hot = isinstance(d.temp_c, int) and d.temp_c >= 70

        if bad:  # a failed verdict always contributes an entry to `bad`
            findings.append(Finding(
                CRITICAL, "Storage", f"{name}: failing ({stats})" if stats else f"{name}: failing",
                "; ".join(bad) + ".",
                "Back up this drive now and plan to replace it."))
        elif spare_low or worn:
            if spare_low:
                detail = "Available spare below the drive's threshold."
            elif nvme_worn:
                detail = f"NVMe wear at {d.percent_used}% used — near end of rated life."
            else:
                detail = f"Only {d.health_pct}% of rated life left — near end of rated life."
            findings.append(Finding(
                WARNING, "Storage", f"{name}: worn ({stats})",
                detail,
                "Back up important data and budget for a replacement."))
        elif hot:
            findings.append(Finding(
                WARNING, "Storage", f"{name}: hot ({stats})",
                f"Drive temperature is {d.temp_c}°C.",
                "Improve case/M.2 airflow; sustained heat shortens SSD life."))
        else:
            findings.append(Finding(
                OK, "Storage", f"{name}: healthy" + (f" ({stats})" if stats else ""),
                "SMART self-assessment passed." if d.passed else ""))
    return findings
+14 -48
View File
@@ -260,47 +260,19 @@ def check_nvidia_module() -> list[Finding]:
)]
def _smart_devices() -> list[str]:
try:
proc = subprocess.run(["smartctl", "--scan"], capture_output=True, text=True, timeout=10)
except (subprocess.SubprocessError, OSError):
return []
devices = []
for line in proc.stdout.splitlines():
line = line.strip()
if line.startswith("/dev/"):
devices.append(line.split()[0])
return devices
def check_drives() -> list[Finding]:
"""Per-drive SMART health + wear/runtime stats (see core/drives.py).
Uses the session's elevated collection when present (GUI launch / pkexec), else reads
smartctl directly — which only returns real data as root, so the unprivileged case yields
'needs root' info findings pointing at `sudo rigdoctor report`.
"""
from . import drives, elevation
def check_smart() -> list[Finding]:
if shutil.which("smartctl") is None:
return [Finding(
INFO, "Storage", "SMART not checked (smartmontools missing)",
"Disk self-health couldn't be read.",
"Install it for disk health checks: `sudo apt install smartmontools`",
)]
devices = _smart_devices()
if not devices:
return [Finding(
INFO, "Storage", "SMART: couldn't enumerate drives",
"Reading SMART usually needs root.",
"Run: `sudo rigdoctor report`",
)]
findings: list[Finding] = []
for dev in devices:
try:
proc = subprocess.run(["smartctl", "-H", dev], capture_output=True, text=True, timeout=15)
except (subprocess.SubprocessError, OSError):
continue
combined = proc.stdout + proc.stderr
if "Permission denied" in combined or "requires root" in combined.lower():
findings.append(Finding(INFO, "Storage", f"SMART for {dev} needs root", "", "Run: `sudo rigdoctor report`"))
elif "PASSED" in combined:
findings.append(Finding(OK, "Storage", f"SMART OK: {dev}", "Overall-health self-assessment passed."))
elif "FAILED" in combined or "FAILING_NOW" in combined:
findings.append(Finding(CRITICAL, "Storage", f"SMART FAILED: {dev}", "The drive reports failing health.", "Back up now and replace the drive."))
return findings
priv = elevation.privileged()
if priv is not None and priv.get("drives") is not None:
return drives.to_findings(drives.from_dicts(priv["drives"]))
return drives.to_findings(drives.collect())
def check_live_temps() -> list[Finding]:
@@ -398,25 +370,19 @@ def check_memory_speed() -> list[Finding]:
def run_health_checks(include_journal: bool = True) -> list[Finding]:
"""Run all checks and return findings sorted by severity (worst first).
SMART needs root; if the session collected it via launch elevation, use that
instead of re-running smartctl (which would just report "needs root").
Drive SMART and RAM speed need root; if the session collected them via launch elevation,
those checks use the cached data instead of re-running (which would just report "needs root").
`include_journal=False` skips the 7-day kernel-journal scan — used by the crash
analysis, which scans the previous (crashed) boot specifically instead.
"""
from . import elevation
findings: list[Finding] = []
findings += check_nvidia_driver()
findings += check_nvidia_module()
if include_journal:
findings += check_journal()
findings += check_journal_persistence()
priv = elevation.privileged()
if priv is not None and priv.get("smart") is not None:
findings += [Finding(**d) for d in priv["smart"]]
else:
findings += check_smart()
findings += check_drives()
findings += check_live_temps()
findings += check_pcie_links()
findings += check_displays()
+322
View File
@@ -0,0 +1,322 @@
"""GPU stress + close thermal monitoring — the repro tool for load-correlated crashes.
Run a GPU load and sample sensors at a high rate, then report peak/sustained temperatures,
how long the GPU spent above each temperature threshold, power headroom vs the limit, whether
it throttled, and any GPU fault (Xid / VA-space / a query timeout) that hit during the window.
This is the on-demand way to reproduce the "only under load / only certain games" freezes
instead of waiting for a game to trigger them.
The load comes from, in order: an explicit ``command`` (your game, or a loader like gpu-burn),
an auto-detected loader on PATH (gpu-burn / vkmark / glmark2 / vkcube), or **monitor-only** when
none is found — then you generate the load yourself (launch the game) while this closely tracks
temps for the duration.
Stdlib only. Degrades gracefully: no nvidia-smi → no GPU stats; a loader that won't start →
monitor-only with a note; missing journal access → no fault scan, just the telemetry.
"""
from __future__ import annotations
import shutil
import subprocess
import time
from collections.abc import Callable
from dataclasses import dataclass, field
from . import health
from .sample import Sample
from .sampler import Sampler
from .sources import available_sources
# Default temperature dwell thresholds (°C). 83 is Ampere's typical thermal-throttle point;
# 90+ is hot; sustained 95+ on the core (or 100+ on GDDR6 memory) is a cooling problem.
DEFAULT_THRESHOLDS = (80, 85, 90, 95)
# Known GPU load generators, best (heaviest / most deterministic) first. argv builder takes the
# remaining duration so a self-terminating loader (gpu-burn) bounds itself; the windowed
# benchmarks loop until we kill them. None are required — detection is best-effort.
# Each entry is (executable name looked up on PATH, duration -> argv).
_LOADERS: list[tuple[str, Callable[[float], list[str]]]] = [
    ("gpu-burn", lambda secs: ["gpu-burn", str(max(1, int(secs)))]),
    ("vkmark", lambda _s: ["vkmark", "--run-forever"]),
    ("glmark2", lambda _s: ["glmark2", "--run-forever"]),
    ("vkcube", lambda _s: ["vkcube"]),
]
# NVML clocks-event bits that mean the clocks are being *held back* (a throttle), decoded from
# the active-reasons bitmask so we don't depend on per-field name differences across drivers.
_THROTTLE_BITS = {
    0x008: "HW slowdown",
    0x020: "SW thermal slowdown",
    0x040: "HW thermal slowdown",
    0x080: "HW power-brake slowdown",
}
_POWERCAP_BIT = 0x004  # hitting the power limit — expected under load, reported separately
@dataclass
class MetricStat:
    """Min/avg/max summary of one reading key over a stress run."""

    key: str  # e.g. "gpu.temp", "gpu.power", "gpu.clock.core"
    label: str  # human label for the report
    unit: str
    min: float
    avg: float
    max: float
    samples: int  # how many values went into min/avg/max
@dataclass
class _Tick:
    """One sampling step of the run loop, as consumed by ``summarize``."""

    dt: float  # seconds this tick represents (for dwell-time weighting)
    values: dict[str, float]  # reading key -> value across all sources (Nones dropped)
    throttle: list[str]  # active throttle reasons this tick
    power_capped: bool
    lost: bool  # query timeout / no GPU response this tick
@dataclass
class StressResult:
    """Outcome of a stress run: telemetry summary, throttle/fault signals and a verdict.

    Built by ``summarize``; ``severity`` and ``verdict`` are filled in by ``_verdict`` and
    ``aborted`` by ``run``.
    """

    load: str  # "command: …" | "auto: gpu-burn" | "monitor-only"
    duration: float  # seconds actually monitored
    samples: int
    interval: float
    stats: list[MetricStat] = field(default_factory=list)
    peak_temp: float | None = None
    peak_mem_temp: float | None = None
    avg_temp: float | None = None
    time_above: dict[int, float] = field(default_factory=dict)  # threshold °C -> seconds at/above
    max_power: float | None = None
    power_limit: float | None = None
    power_capped: bool = False
    throttled: bool = False
    throttle_reasons: list[str] = field(default_factory=list)
    gpu_lost: bool = False
    faults: list[str] = field(default_factory=list)  # Xid/VA-space titles in the window
    aborted: bool = False  # Ctrl-C or the load exited early
    severity: str = health.OK
    verdict: str = ""
# --- load resolution ------------------------------------------------------------------
def available_loaders() -> list[str]:
    """List the known GPU load tools present on PATH, in ``_LOADERS`` order (heaviest first)."""
    found: list[str] = []
    for name, _build in _LOADERS:
        if shutil.which(name):
            found.append(name)
    return found
def _start_load(command: list[str] | None, duration: float) -> tuple[subprocess.Popen | None, str]:
"""Start the load process and return (proc, description). proc is None for monitor-only."""
if command:
try:
proc = subprocess.Popen(command, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
return proc, "command: " + " ".join(command)
except (OSError, ValueError) as exc:
return None, f"monitor-only (command failed to start: {exc})"
for name, build in _LOADERS:
if shutil.which(name):
try:
proc = subprocess.Popen(build(duration), stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
return proc, f"auto: {name}"
except (OSError, ValueError):
continue
return None, "monitor-only"
def _stop_load(proc: subprocess.Popen | None) -> None:
if proc is None or proc.poll() is not None:
return
proc.terminate()
try:
proc.wait(timeout=5)
except subprocess.TimeoutExpired:
proc.kill()
# --- throttle / fault probes ----------------------------------------------------------
def _throttle_state() -> tuple[list[str], bool]:
    """(active throttle reasons, power-capped) decoded from the clocks-event bitmask.

    Queries nvidia-smi for the active-reasons bitmask, trying the current field name first
    and the older ``clocks_throttle_reasons`` name second. The first answer that parses as a
    hex number wins. Returns ``([], False)`` when nvidia-smi is missing or neither field
    yields a usable bitmask. Only the first output line (first GPU) is read.
    """
    if shutil.which("nvidia-smi") is None:
        return [], False

    bits: int | None = None
    for field_name in ("clocks_event_reasons.active", "clocks_throttle_reasons.active"):
        try:
            proc = subprocess.run(
                ["nvidia-smi", f"--query-gpu={field_name}", "--format=csv,noheader"],
                capture_output=True, text=True, timeout=5,
            )
        except (subprocess.SubprocessError, OSError):
            continue
        lines = proc.stdout.strip().splitlines()
        raw = lines[0].strip() if lines else ""
        try:
            bits = int(raw, 16)
        except ValueError:
            # Empty, "N/A"/"Not Supported", or some other non-numeric answer (presumably an
            # error text when this driver doesn't know the field name) — try the next name
            # rather than giving up.
            continue
        break

    if bits is None:
        return [], False
    reasons = [name for bit, name in _THROTTLE_BITS.items() if bits & bit]
    return reasons, bool(bits & _POWERCAP_BIT)
def _faults_since(start_ts: float) -> list[str]:
    """Collect the titles of kernel-journal findings logged since ``start_ts`` (epoch seconds).

    Only findings in the GPU, PCIe, Hardware or Kernel categories are kept; no journal
    output means no faults.
    """
    journal = health._journalctl(["-k", "--no-pager", "-o", "cat", "--since", f"@{int(start_ts)}"])
    if not journal:
        return []
    wanted = ("GPU", "PCIe", "Hardware", "Kernel")
    titles: list[str] = []
    for finding in health.scan_journal_text(journal):
        if finding.category in wanted:
            titles.append(finding.title)
    return titles
def _tick_values(sample: Sample) -> tuple[dict[str, float], bool]:
"""Reading key -> value across all sources (Nones dropped), plus whether the GPU
failed to respond (an nvidia-smi query timeout — a hang/lost signal)."""
values: dict[str, float] = {}
lost = False
for r in sample.readings:
if r.source == "gpu" and r.metric == "status" and r.label == "query-timeout":
lost = True
if r.value is not None:
values[r.key] = r.value
return values, lost
# --- pure analysis (unit-testable, no IO) ---------------------------------------------
# Reading keys that get a min/avg/max row in the report: key -> (human label, unit).
# Insertion order is the order the rows are emitted in by summarize().
_REPORT_KEYS = {
    "gpu.temp": ("GPU core temp", "°C"),
    "gpu.temp.memory": ("GPU memory temp", "°C"),
    "gpu.power": ("GPU power", "W"),
    "gpu.util": ("GPU utilization", "%"),
    "gpu.mem_util": ("VRAM controller util", "%"),
    "gpu.clock.core": ("Core clock", "MHz"),
    "gpu.clock.memory": ("Memory clock", "MHz"),
    "gpu.fan": ("Fan", "%"),
    "gpu.mem_used": ("VRAM used", "MiB"),
    "cpu.temp": ("CPU temp", "°C"),
}
def summarize(ticks: list[_Tick], *, load: str, interval: float, faults: list[str],
              thresholds=DEFAULT_THRESHOLDS) -> StressResult:
    """Fold the collected ticks into a StressResult (pure: no IO, testable with synthetic ticks).

    Aggregates per-key min/avg/max for the keys in ``_REPORT_KEYS``, the seconds the core
    temperature spent at or above each threshold (weighted by each tick's ``dt``), the
    throttle reasons seen, and the power-cap / GPU-lost flags, then sets the verdict.
    """
    total = sum(tick.dt for tick in ticks)
    result = StressResult(load=load, duration=round(total, 1), samples=len(ticks),
                          interval=interval, faults=faults)

    by_key: dict[str, list[float]] = {}
    reasons: set[str] = set()
    dwell = dict.fromkeys(thresholds, 0.0)

    for tick in ticks:
        for key, value in tick.values.items():
            by_key.setdefault(key, []).append(value)
        reasons.update(tick.throttle)
        if tick.power_capped:
            result.power_capped = True
        if tick.lost:
            result.gpu_lost = True
        core_temp = tick.values.get("gpu.temp")
        if core_temp is None:
            continue
        for limit in thresholds:
            if core_temp >= limit:
                dwell[limit] += tick.dt

    for key, (label, unit) in _REPORT_KEYS.items():
        vals = by_key.get(key)
        if not vals:
            continue
        low = round(min(vals), 1)
        mean = round(sum(vals) / len(vals), 1)
        high = round(max(vals), 1)
        result.stats.append(MetricStat(key, label, unit, low, mean, high, len(vals)))
        if key == "gpu.temp":
            result.peak_temp, result.avg_temp = high, mean
        elif key == "gpu.temp.memory":
            result.peak_mem_temp = high
        elif key == "gpu.power":
            result.max_power = high

    # power_limit isn't a reported metric (it's ~constant); pull it from the raw series.
    limit_series = by_key.get("gpu.power_limit")
    if limit_series is not None:
        result.power_limit = max(limit_series)

    result.throttle_reasons = sorted(reasons)
    result.throttled = bool(reasons)
    # Only thresholds that were actually reached appear in the report.
    result.time_above = {limit: round(secs, 1) for limit, secs in dwell.items() if secs > 0}
    _verdict(result)
    return result
def _verdict(r: StressResult) -> None:
    """Fill in ``r.severity`` and ``r.verdict`` from the signals already stored on ``r``.

    Checked in order: fault / lost GPU (critical), throttling (warning), peak core temp of
    90 °C or more (warning), no telemetry (info), otherwise stable with the severity untouched.
    """
    peak = "?" if r.peak_temp is None else f"{r.peak_temp:.0f}°C"

    if r.gpu_lost or any(r.faults):
        cause = "; ".join(r.faults) if r.faults else "the GPU stopped responding (query timeout)"
        r.severity = health.CRITICAL
        r.verdict = (
            f"GPU fault during the stress run: {cause}. This reproduces the crash under "
            f"load — capture/keep these logs. Peak core temp {peak}."
        )
    elif r.throttled:
        r.severity = health.WARNING
        r.verdict = (
            f"Thermal/HW throttling detected ({', '.join(r.throttle_reasons)}) — the GPU "
            f"held clocks back to stay safe. Peak core temp {peak}. Improve cooling/airflow."
        )
    elif r.peak_temp is not None and r.peak_temp >= 90:
        r.severity = health.WARNING
        r.verdict = (
            f"No fault, but the core peaked at {peak} — hot. Watch GDDR6/VRM cooling; "
            "sustained high temps shorten the card's life and precede instability."
        )
    elif r.peak_temp is None:
        r.severity = health.INFO
        r.verdict = "No GPU telemetry was captured (nvidia-smi unavailable?)."
    else:
        capped = " (power-limited — hitting the cap, which is normal)" if r.power_capped else ""
        r.verdict = f"Stable: peaked at {peak} with no faults or throttling{capped}."
# --- the run loop (IO) ----------------------------------------------------------------
def run(duration: float = 120.0, interval: float = 0.5, command: list[str] | None = None,
        thresholds=DEFAULT_THRESHOLDS, on_tick: Callable[[Sample, float], None] | None = None,
        should_stop: Callable[[], bool] | None = None) -> StressResult:
    """Drive a GPU load for ``duration`` seconds, sampling every ``interval``, and report.

    Stops early on Ctrl-C, if a GPU query times out (likely hang), if the load process exits, or
    when ``should_stop()`` returns True (the GUI's Stop button). ``on_tick(sample, elapsed)`` is
    called each tick for live display.

    Args:
        duration: upper bound on the monitoring window, in seconds.
        interval: target seconds between samples.
        command: argv of an explicit load generator; None lets ``_start_load`` pick one
            (or fall back to monitor-only).
        thresholds: core-temperature thresholds passed through to ``summarize``.
        on_tick: optional per-sample callback receiving the sample and elapsed seconds.
        should_stop: optional predicate polled once per tick.

    Returns:
        The summarized StressResult, with ``aborted`` set for Ctrl-C / ``should_stop`` or when
        an explicit command's run ended more than one interval short of ``duration``.
    """
    sampler = Sampler(available_sources())
    proc, load_desc = _start_load(command, duration)
    start = time.monotonic()  # monotonic clock for elapsed/dt arithmetic
    start_ts = time.time()  # wall clock, used only for the journal "--since" query
    ticks: list[_Tick] = []
    last = start
    aborted = False
    try:
        while True:
            sample = sampler.sample()
            now = time.monotonic()
            dt = now - last  # time covered by this tick (first tick: time since start)
            last = now
            values, lost = _tick_values(sample)
            reasons, capped = _throttle_state()
            ticks.append(_Tick(dt=dt, values=values, throttle=reasons, power_capped=capped, lost=lost))
            if on_tick is not None:
                on_tick(sample, now - start)
            if lost:  # GPU stopped responding — stop now, it may be hung/lost
                break
            if should_stop is not None and should_stop():  # GUI Stop button
                aborted = True
                break
            if proc is not None and proc.poll() is not None:  # the load finished/exited
                break
            if (now - start) >= duration:
                break
            # Sleep only for what is left of the interval after this tick's own work.
            time.sleep(max(0.0, interval - (time.monotonic() - now)))
    except KeyboardInterrupt:
        aborted = True
    finally:
        # Always stop the load, including on Ctrl-C or an unexpected error.
        _stop_load(proc)
    faults = _faults_since(start_ts)
    result = summarize(ticks, load=load_desc, interval=interval, faults=faults, thresholds=thresholds)
    # An explicit command whose run ended noticeably before the requested duration also
    # counts as aborted; auto-detected loaders are not checked here.
    result.aborted = aborted or (proc is not None and command is not None and result.duration < duration - interval)
    return result
+67 -8
View File
@@ -430,19 +430,78 @@ class GamesPage(QWidget):
self._banner.hide()
def _add_custom_game(self) -> None:
    """Manually add a game no launcher reports (e.g. SPT).

    Shows a modal dialog asking for the game name plus an optional launch
    command/script (so it can be launched under crash-capture) and an optional
    log folder, then stores the entry via ``customgames.add`` and refreshes the
    page. Does nothing if the dialog is cancelled or the name is blank; shows an
    information box when ``customgames.add`` reports the game was not added.
    """
    from ..core import customgames

    dlg = QDialog(self)
    dlg.setWindowTitle("Add game")
    dlg.setMinimumWidth(560)
    v = QVBoxLayout(dlg)
    v.setContentsMargins(20, 18, 20, 16)
    v.setSpacing(10)

    intro = QLabel(
        "Add a game no launcher reports — a standalone mod launcher like SPT, an itch.io "
        "download, or any hand-installed game.")
    intro.setWordWrap(True)
    v.addWidget(intro)

    # Required field: the display name of the game.
    name_edit = QLineEdit()
    name_edit.setPlaceholderText("SPT")
    v.addWidget(QLabel("Game name"))
    v.addWidget(name_edit)

    # Optional launch command, typed by hand or picked with the file browser.
    cmd_edit = QLineEdit()
    cmd_edit.setPlaceholderText("e.g. /run/media/.../Escape-From-Tarkov/tarkov.sh")
    cmd_row = QHBoxLayout()
    cmd_row.addWidget(cmd_edit, 1)
    cmd_browse = QPushButton("Browse…")
    cmd_row.addWidget(cmd_browse, 0)
    v.addWidget(QLabel("Launch command / script (optional — enables launch + auto-capture)"))
    v.addLayout(cmd_row)

    # Optional log folder; left blank it is passed on as None.
    log_edit = QLineEdit()
    log_edit.setPlaceholderText("auto-detected from the script's folder (its logs/ subfolder)")
    log_row = QHBoxLayout()
    log_row.addWidget(log_edit, 1)
    log_browse = QPushButton("Browse…")
    log_row.addWidget(log_browse, 0)
    v.addWidget(QLabel("Log folder (optional — read into crash diagnostics)"))
    v.addLayout(log_row)

    def _pick_command() -> None:
        path, _ = QFileDialog.getOpenFileName(dlg, "Select the launch script/executable")
        if path:
            cmd_edit.setText(path)

    def _pick_logdir() -> None:
        path = QFileDialog.getExistingDirectory(dlg, "Select the game's log folder")
        if path:
            log_edit.setText(path)

    cmd_browse.clicked.connect(_pick_command)
    log_browse.clicked.connect(_pick_logdir)

    buttons = QHBoxLayout()
    buttons.addStretch(1)
    cancel = QPushButton("Cancel")
    cancel.clicked.connect(dlg.reject)
    buttons.addWidget(cancel)
    add = QPushButton("Add")
    add.setObjectName("PrimaryButton")
    add.setDefault(True)
    # A blank name cannot be added, so keep "Add" disabled until one is typed;
    # otherwise accepting the dialog would silently drop the command/log entries.
    add.setEnabled(False)
    name_edit.textChanged.connect(lambda text: add.setEnabled(bool(text.strip())))
    add.clicked.connect(dlg.accept)
    buttons.addWidget(add)
    v.addLayout(buttons)

    if dlg.exec() != QDialog.DialogCode.Accepted:
        return
    name = name_edit.text().strip()
    if not name:
        return
    if customgames.add(name, command=cmd_edit.text().strip() or None,
                       logdir=log_edit.text().strip() or None):
        self.refresh()
    else:
        QMessageBox.information(self, "Add game", f"'{name}' is already in your games.")
+8
View File
@@ -39,6 +39,9 @@ class HealthPage(QWidget):
self._status = QLabel("")
self._status.setObjectName("Muted")
header.addWidget(self._status)
self._stress_btn = QPushButton("Stress test…")
self._stress_btn.clicked.connect(self._open_stress)
header.addWidget(self._stress_btn)
self._run_btn = QPushButton("Run health report")
self._run_btn.setObjectName("PrimaryButton")
self._run_btn.clicked.connect(self._run)
@@ -59,6 +62,11 @@ class HealthPage(QWidget):
QTimer.singleShot(300, self._run) # auto-run shortly after the window opens
def _open_stress(self) -> None:
    """Open the GPU stress-test dialog, parented to this page, and block until it closes."""
    from .stress_dialog import StressDialog

    dialog = StressDialog(self)
    dialog.exec()
def _run(self) -> None:
self._run_btn.setEnabled(False)
self._status.setText("Scanning logs, SMART, and driver…")
+157
View File
@@ -0,0 +1,157 @@
"""GPU stress + thermal-monitor dialog (GUI front-end for core/stress.py).
Runs the stress monitor in a background thread, streams a live one-line readout, and shows the
rendered result (telemetry stats + verdict) when it finishes. A Stop button ends the run early
via a cooperative flag; closing the dialog mid-run stops it too.
"""
from __future__ import annotations
import threading
from PySide6.QtCore import Qt, Signal
from PySide6.QtGui import QFont
from PySide6.QtWidgets import (
QDialog,
QHBoxLayout,
QLabel,
QLineEdit,
QPushButton,
QSpinBox,
QTextEdit,
QVBoxLayout,
)
class StressDialog(QDialog):
    """Dialog that runs the GPU stress monitor on a worker thread and shows its report.

    The worker talks to the GUI only through the two signals below; the Stop
    button, closing the window, and rejecting the dialog (e.g. Esc) all set a
    shared ``threading.Event`` that the stress run polls via ``should_stop``.
    """

    _tick = Signal(str)     # live one-line readout (worker thread -> GUI)
    _done = Signal(object)  # stress.StressResult when the run finishes (or the Exception raised)

    def __init__(self, parent=None) -> None:
        """Build the widgets; nothing runs until Start is clicked."""
        super().__init__(parent)
        self._stop = threading.Event()
        self._running = False
        self._tick.connect(self._on_tick)
        self._done.connect(self._on_done)

        self.setWindowTitle("GPU stress + thermal monitor")
        self.resize(640, 460)
        root = QVBoxLayout(self)
        root.setContentsMargins(20, 18, 20, 16)
        root.setSpacing(12)

        intro = QLabel(
            "Run a GPU load and closely watch temps. Reports peak/sustained temps, time spent "
            "hot, throttling, and any GPU fault (Xid / driver freeze) during the run.")
        intro.setWordWrap(True)
        root.addWidget(intro)

        from ..core import stress

        # Tell the user up front whether a load tool will drive the GPU or this is monitor-only.
        loaders = stress.available_loaders()
        self._mode = QLabel(
            f"Load tool detected: {loaders[0]} — it'll drive the load." if loaders else
            "No GPU load tool installed → MONITOR-ONLY: start this, then launch your game; "
            "it tracks temps while you play. (Or give a command below.)")
        self._mode.setObjectName("Muted")
        self._mode.setWordWrap(True)
        root.addWidget(self._mode)

        form = QHBoxLayout()
        form.addWidget(QLabel("Duration (s):"))
        self._duration = QSpinBox()
        self._duration.setRange(5, 3600)
        self._duration.setValue(120)
        form.addWidget(self._duration)
        form.addSpacing(12)
        form.addWidget(QLabel("Command (optional):"))
        self._command = QLineEdit()
        self._command.setPlaceholderText("e.g. /…/tarkov.sh or gpu-burn 60")
        form.addWidget(self._command, 1)
        root.addLayout(form)

        # Live one-line readout, updated from the worker through the _tick signal.
        self._live = QLabel("")
        self._live.setFont(QFont("monospace"))
        self._live.setStyleSheet("background: #0d0f13; color: #cfd3da; border: 1px solid #2a2f39; "
                                 "border-radius: 8px; padding: 8px;")
        root.addWidget(self._live)

        # Final report; hidden until a run has finished.
        self._report = QTextEdit()
        self._report.setReadOnly(True)
        self._report.setFont(QFont("monospace"))
        self._report.setVisible(False)
        root.addWidget(self._report, 1)

        buttons = QHBoxLayout()
        buttons.addStretch(1)
        self._stop_btn = QPushButton("Stop")
        self._stop_btn.setEnabled(False)
        self._stop_btn.clicked.connect(self._on_stop)
        buttons.addWidget(self._stop_btn)
        self._start_btn = QPushButton("Start")
        self._start_btn.setObjectName("PrimaryButton")
        self._start_btn.clicked.connect(self._on_start)
        buttons.addWidget(self._start_btn)
        root.addLayout(buttons)

    def _on_start(self) -> None:
        """Start a run on a daemon thread unless one is already in progress."""
        if self._running:
            return
        self._running = True
        self._stop.clear()
        self._start_btn.setEnabled(False)
        self._stop_btn.setEnabled(True)
        self._report.setVisible(False)
        self._live.setText("starting…")
        duration = float(self._duration.value())
        command_text = self._command.text().strip()
        threading.Thread(target=self._work, args=(duration, command_text), daemon=True).start()

    def _work(self, duration: float, command_text: str) -> None:
        """Worker-thread body: run the stress monitor and always emit ``_done``.

        ``_done`` carries the stress result on success or the raised exception on
        failure. Everything that can fail sits inside the ``try`` so the GUI is
        never left waiting in the "running" state.
        """
        try:
            import shlex

            from ..core import stress
            from ..render import format_raw

            # shlex.split raises ValueError on malformed input (e.g. an unclosed
            # quote); inside the try it is reported like any other failure.
            command = shlex.split(command_text) if command_text else None

            def _emit_tick(sample, elapsed) -> None:
                by_key = {r.key: r for r in sample.readings}
                bits = [f"{elapsed:5.0f}s"]
                for key, tag in (("gpu.temp", "core"), ("gpu.power", "pwr"),
                                 ("gpu.util", "util"), ("gpu.clock.core", "clk"),
                                 ("gpu.temp.memory", "vram")):
                    reading = by_key.get(key)
                    if reading is not None and reading.value is not None:
                        bits.append(f"{tag} {format_raw(reading.value, reading.unit)}")
                self._tick.emit(" ".join(bits))

            result = stress.run(duration=duration, interval=0.5, command=command,
                                on_tick=_emit_tick, should_stop=self._stop.is_set)
        except Exception as exc:  # never let a worker crash take down the dialog
            result = exc
        self._done.emit(result)

    def _on_tick(self, text: str) -> None:
        """Show the latest live readout line."""
        self._live.setText(text)

    def _on_done(self, result) -> None:
        """Re-enable Start and show the rendered report, or the failure message."""
        from ..render import render_stress

        self._running = False
        self._start_btn.setEnabled(True)
        self._stop_btn.setEnabled(False)
        if isinstance(result, Exception):
            self._report.setPlainText(f"Stress run failed: {result}")
        else:
            self._report.setPlainText(render_stress(result))
        self._report.setVisible(True)

    def _on_stop(self) -> None:
        """Ask the worker to stop; the run ends the next time it polls the flag."""
        self._stop.set()
        self._stop_btn.setEnabled(False)
        self._live.setText("stopping…")

    def reject(self) -> None:
        """Stop the run when the dialog is dismissed via reject() (e.g. the Esc key)."""
        self._stop.set()
        super().reject()

    def closeEvent(self, event) -> None:  # stop the run if the dialog is closed mid-flight
        self._stop.set()
        super().closeEvent(event)
+26
View File
@@ -118,6 +118,32 @@ def render_health(findings: list, title: str = "Health report") -> str:
return "\n".join(lines).rstrip()
def render_stress(result) -> str:
    """Format a stress.StressResult as a plain-text report.

    Sections, in order: load and duration header, a min/avg/max table (when stats
    exist), time spent at or above each temperature threshold, power peak vs. limit,
    throttle reasons, faults, and finally the severity-tagged verdict line.
    Optional sections are omitted when their data is empty.
    """
    out = [
        "GPU stress + thermal monitor",
        "",
        f" Load : {result.load}",
        f" Duration : {_fmt_duration(result.duration)} · {result.samples} samples "
        f"@ {result.interval:g}s" + (" (stopped early)" if result.aborted else ""),
    ]

    if result.stats:
        out.append("")
        out.append(f" {'Metric':<22}{'min':>12}{'avg':>12}{'max':>12}")
        for stat in result.stats:
            cells = "".join(
                f"{format_raw(value, stat.unit):>12}"
                for value in (stat.min, stat.avg, stat.max)
            )
            out.append(f" {stat.label:<22}{cells}")

    if result.time_above:
        dwell = [
            f"{limit}°C: {_fmt_duration(seconds)}"
            for limit, seconds in sorted(result.time_above.items())
        ]
        out.append("")
        out.append(f" Time at temp (core): {' '.join(dwell)}")

    if result.max_power is not None and result.power_limit:
        suffix = " — hit the power cap" if result.power_capped else ""
        out.append(
            f" Power peak: {result.max_power:.0f} W of {result.power_limit:.0f} W limit{suffix}")

    if result.throttle_reasons:
        out.append(f" Throttling: {', '.join(result.throttle_reasons)}")

    if result.faults:
        out.append(f" Faults : {'; '.join(result.faults)}")

    out.append("")
    out.append(f"[{_SEV_LABEL.get(result.severity, '?')}] {result.verdict}")
    return "\n".join(out)
def render_summary(summary: Summary, log_path=None) -> str:
if summary.samples == 0 and not summary.events:
where = f" ({log_path})" if log_path else ""
+99
View File
@@ -0,0 +1,99 @@
"""Tests for drive health parsing & findings (synthetic smartctl JSON)."""
import unittest
from dataclasses import asdict
from rigdoctor.core import drives
from rigdoctor.core.health import CRITICAL, INFO, OK, WARNING
# Synthetic smartctl-JSON-shaped payloads fed to drives.parse() by the tests below.

# Healthy NVMe drive: SMART passed, 3% used, full spare, no media errors.
_NVME_OK = {
"model_name": "Samsung SSD 980 PRO 1TB",
"device": {"protocol": "NVMe"},
"smart_status": {"passed": True},
"temperature": {"current": 41},
"power_on_time": {"hours": 1234},
"nvme_smart_health_information_log": {
"percentage_used": 3, "available_spare": 100, "available_spare_threshold": 10,
"media_errors": 0, "data_units_written": 200_000_000, # ~102 TB
},
}
# NVMe drive that still passes SMART but reports 96% used; no temperature,
# power-on-time or write counters are supplied.
_NVME_WORN = {
"model_name": "Worn NVMe",
"device": {"protocol": "NVMe"},
"smart_status": {"passed": True},
"nvme_smart_health_information_log": {"percentage_used": 96, "available_spare": 100,
"available_spare_threshold": 10},
}
# ATA-protocol SSD with a failed SMART status; attribute 5 carries a raw count of
# 12 reallocated sectors and attribute 177 a normalized wear-leveling value of 88.
_SATA_FAILING = {
"model_name": "Samsung SSD 870 QVO 1TB",
"device": {"protocol": "ATA"},
"smart_status": {"passed": False},
"temperature": {"current": 35},
"power_on_time": {"hours": 5000},
"ata_smart_attributes": {"table": [
{"id": 5, "name": "Reallocated_Sector_Ct", "value": 80, "raw": {"value": 12}},
{"id": 177, "name": "Wear_Leveling_Count", "value": 88, "raw": {"value": 300}},
{"id": 241, "name": "Total_LBAs_Written", "value": 99, "raw": {"value": 2_000_000_000}},
]},
}
class ParseTests(unittest.TestCase):
    """drives.parse() and drives.from_dicts() on synthetic smartctl payloads."""

    def test_nvme_parse(self):
        """A healthy NVMe payload yields the expected kind, wear and usage fields."""
        drive = drives.parse("/dev/nvme0", _NVME_OK)
        self.assertEqual(drive.kind, "nvme")
        self.assertTrue(drive.passed)
        expected = {
            "percent_used": 3,
            "health_pct": 97,  # 100 - percentage_used
            "power_on_hours": 1234,
            "temp_c": 41,
        }
        for attr, want in expected.items():
            self.assertEqual(getattr(drive, attr), want)
        self.assertAlmostEqual(drive.data_written_tb, 102.4, places=1)

    def test_sata_parse(self):
        """A failing SATA payload exposes raw reallocated sectors and normalized wear."""
        drive = drives.parse("/dev/sda", _SATA_FAILING)
        self.assertEqual(drive.kind, "sata")
        self.assertFalse(drive.passed)
        self.assertEqual(drive.reallocated, 12)  # raw value
        self.assertEqual(drive.health_pct, 88)  # normalized wear-leveling value
        self.assertAlmostEqual(drive.data_written_tb, 1.02, places=1)

    def test_needs_root_when_no_data(self):
        """No smartctl data at all marks the drive as needing root."""
        drive = drives.parse("/dev/sda", None)
        self.assertTrue(drive.needs_root)

    def test_roundtrip_through_dicts(self):
        """asdict() -> from_dicts() preserves the model and health percentage."""
        original = drives.parse("/dev/nvme0", _NVME_OK)
        restored = drives.from_dicts([asdict(original)])
        self.assertEqual(len(restored), 1)
        first = restored[0]
        self.assertEqual(first.model, original.model)
        self.assertEqual(first.health_pct, original.health_pct)
class FindingTests(unittest.TestCase):
    """drives.to_findings() severity and wording for single-drive inputs."""

    @staticmethod
    def _first_finding(device, payload):
        """Parse one payload and return the first finding produced for it."""
        parsed = drives.parse(device, payload)
        return drives.to_findings([parsed])[0]

    def test_healthy_nvme_is_ok_with_stats(self):
        finding = self._first_finding("/dev/nvme0", _NVME_OK)
        self.assertEqual(finding.severity, OK)
        for fragment in ("97% life left", "1,234 h"):
            self.assertIn(fragment, finding.title)

    def test_failing_sata_is_critical(self):
        finding = self._first_finding("/dev/sda", _SATA_FAILING)
        self.assertEqual(finding.severity, CRITICAL)
        for fragment in ("FAILED", "reallocated sectors"):
            self.assertIn(fragment, finding.detail)

    def test_worn_nvme_is_warning(self):
        finding = self._first_finding("/dev/nvme1", _NVME_WORN)
        self.assertEqual(finding.severity, WARNING)
        self.assertIn("worn", finding.title)

    def test_needs_root_is_info(self):
        finding = self._first_finding("/dev/sda", None)
        self.assertEqual(finding.severity, INFO)
        self.assertIn("needs root", finding.title)
if __name__ == "__main__":
unittest.main()
+77
View File
@@ -0,0 +1,77 @@
"""Tests for the GPU stress + thermal-monitor analysis (synthetic ticks, no real GPU)."""
import unittest
from rigdoctor.core import stress
from rigdoctor.core.health import CRITICAL, OK, WARNING
def _tick(temp=None, power=None, throttle=(), capped=False, lost=False, dt=1.0, **extra):
    """Build a synthetic stress._Tick for the tests.

    ``temp`` and ``power`` are stored under "gpu.temp" / "gpu.power" only when
    given; any ``extra`` keyword is merged into the values afterwards.
    """
    named = {"gpu.temp": temp, "gpu.power": power}
    readings = {key: value for key, value in named.items() if value is not None}
    readings.update(extra)
    return stress._Tick(
        dt=dt,
        values=readings,
        throttle=list(throttle),
        power_capped=capped,
        lost=lost,
    )
class SummarizeTests(unittest.TestCase):
    """stress.summarize() statistics and verdicts for hand-built tick sequences."""

    def test_stable_run_is_ok(self):
        limit = {"gpu.power_limit": 280}
        ticks = [_tick(temp=celsius, power=200, **limit) for celsius in (60, 65, 70, 72)]
        report = stress.summarize(ticks, load="monitor-only", interval=1.0, faults=[])
        self.assertEqual(report.severity, OK)
        self.assertEqual(report.peak_temp, 72)
        self.assertEqual(report.max_power, 200)
        self.assertEqual(report.power_limit, 280)
        self.assertFalse(report.throttled)
        self.assertIn("Stable", report.verdict)

    def test_dwell_time_above_thresholds(self):
        # 3 ticks of 2s each at 82/86/92 °C → ≥80 for all 6s, ≥85 for 4s, ≥90 for 2s.
        ticks = [_tick(temp=celsius, dt=2.0) for celsius in (82, 86, 92)]
        report = stress.summarize(ticks, load="x", interval=2.0, faults=[])
        for threshold, seconds in ((80, 6.0), (85, 4.0), (90, 2.0)):
            self.assertEqual(report.time_above[threshold], seconds)
        self.assertNotIn(95, report.time_above)  # never reached → omitted

    def test_throttling_is_a_warning(self):
        throttled_tick = _tick(temp=88, throttle=["HW thermal slowdown"])
        report = stress.summarize([throttled_tick], load="x", interval=1.0, faults=[])
        self.assertEqual(report.severity, WARNING)
        self.assertTrue(report.throttled)
        self.assertIn("HW thermal slowdown", report.throttle_reasons)

    def test_high_temp_without_throttle_is_a_warning(self):
        hot_tick = _tick(temp=93)
        report = stress.summarize([hot_tick], load="x", interval=1.0, faults=[])
        self.assertEqual(report.severity, WARNING)
        self.assertIn("hot", report.verdict.lower())

    def test_gpu_lost_is_critical(self):
        sequence = [_tick(temp=70), _tick(lost=True)]
        report = stress.summarize(sequence, load="x", interval=1.0, faults=[])
        self.assertEqual(report.severity, CRITICAL)
        self.assertTrue(report.gpu_lost)

    def test_journal_fault_is_critical(self):
        journal = ["NVIDIA Xid 79 ×1"]
        report = stress.summarize([_tick(temp=70)], load="x", interval=1.0, faults=journal)
        self.assertEqual(report.severity, CRITICAL)
        self.assertIn("Xid 79", report.verdict)

    def test_no_telemetry_is_info(self):
        empty_tick = _tick()
        report = stress.summarize([empty_tick], load="monitor-only", interval=1.0, faults=[])
        self.assertEqual(report.severity, "info")
        self.assertIsNone(report.peak_temp)
class ThrottleDecodeTests(unittest.TestCase):
    """The throttle-reason table exposes the thermal slowdown labels."""

    def test_throttle_bits_map_to_reasons(self):
        # the constants used by _throttle_state decode the NVML active-reasons bitmask
        for label in ("HW thermal slowdown", "SW thermal slowdown"):
            self.assertIn(label, stress._THROTTLE_BITS.values())
if __name__ == "__main__":
unittest.main()