Compare commits

10 Commits

Author SHA1 Message Date
jessey 5996fbdc30 Merge pull request 'fix(games): let the GUI Add-game dialog link a launcher & log folder' (#47) from feat/gpu-stress-and-drive-health into main
release / test (push) Successful in 12s
release / release (push) Successful in 19s
Reviewed-on: #47
2026-05-29 14:59:59 +00:00
jessey 8f4824f576 chore(release): v0.43.0
tests / core (pull_request) Successful in 13s
tests / gui-smoke (pull_request) Successful in 31s
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-05-29 16:59:13 +02:00
jessey edc2166011 feat(health): GPU stress monitor + per-drive SMART health/wear
Two diagnostics for the load-correlated GPU crashes and for storage wear.

GPU stress (`rigdoctor stress` + a System Health "Stress test…" dialog): drive a GPU
load and sample sensors at high rate, then report per-metric min/avg/peak, time spent
above each temp threshold, power vs limit, throttling (decoded from the NVML
clocks-event bitmask), and any GPU fault (Xid / VA-space freeze / query-timeout hang)
in the window. Load source: explicit --command, an auto-detected loader, or
monitor-only (you launch the game). Analysis is a pure, unit-tested function.

Drive health (core/drives.py): parse full `smartctl --json` per drive into prioritized
findings — SMART verdict, derived life-left % (NVMe percentage_used or SATA
wear-leveling), power-on hours, TBW, temperature, and failure predictors
(reallocated/pending/offline sectors, NVMe media errors, low spare). Replaces the old
pass/fail-only check_smart; runs through the same elevated path (collect-priv / sudo),
degrading to "needs root" notes unprivileged.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-05-29 16:59:06 +02:00
jessey 31ecf67ca7 fix(games): let the GUI Add-game dialog link a launcher & log folder
The "Add game…" button only prompted for a name (single-field QInputDialog), so a
custom game couldn't be given its launch command or log dir from the GUI. Replace it
with a proper dialog: name + an optional launch command/script (with a file browser)
+ an optional log folder (auto-detected from the script's folder when left blank).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-05-29 16:58:56 +02:00
jessey ac4863b0d4 Merge pull request 'feat(health): detect no-Xid GPU freezes (open-module VA-space faults)' (#46) from feat/gpu-vaspace-spt into main
release / test (push) Successful in 13s
release / release (push) Successful in 17s
Reviewed-on: #46
2026-05-29 14:10:58 +00:00
jessey b65f36bb2d Merge branch 'main' into feat/gpu-vaspace-spt
tests / core (pull_request) Successful in 12s
tests / gui-smoke (pull_request) Successful in 29s
2026-05-29 14:10:01 +00:00
jessey 0f9cb4b684 chore(release): v0.42.0
tests / core (pull_request) Successful in 17s
tests / gui-smoke (pull_request) Successful in 29s
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-05-29 16:09:02 +02:00
jessey b9bfec961c feat(games): manually add games (e.g. SPT) with launch + own logs
Some titles never show up in a Steam/Lutris/Heroic scan — standalone mod
launchers like SPT (Single-Player Tarkov), itch.io downloads, hand-installed
executables. Add a user-authored custom-games list (core/customgames.py) shown
alongside the other sources in `rigdoctor games` and the GUI.

Each entry can carry a launch command and a log directory:
  - `rigdoctor games add "SPT" --command .../tarkov.sh` (logs/ auto-detected)
  - `rigdoctor games play "SPT"` launches it under the crash-capture wrapper
    (wrap.run gains an explicit game-name override, since there's no SteamAppId)
  - the diagnostic now feeds the game's own logs to the analysis: gamelogs
    .collect(game=...) tails the registered log dir (SPT's server/launcher logs)
    alongside the kernel log, freshness-scoped by mtime.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-05-29 16:07:25 +02:00
jessey b1bc961b79 feat(health): detect no-Xid GPU freezes (open-module VA-space faults)
The kernel-log scanner only caught Xid codes, OOM, panic, MCE, AER, thermal,
and amdgpu resets — so a hard freeze that logs NO Xid slipped through entirely.
Add detection for the NVIDIA open-kernel-module VA-space mapping fault
(gpu_vaspace.c / dmaAllocMapping / NVKMS GEM-allocation failures), which can
storm for minutes and end in a freeze without the GPU ever "falling off the
bus". Also flag when the open kernel module (nvidia-*-open) is loaded — the
context behind these faults — and add an AI-knowledge entry so the assistant
distinguishes it from the Xid 79 hardware drop.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-05-29 16:07:14 +02:00
jessey 410f8882ee Merge pull request 'feat(ai): import & analyze Windows crash dumps (.dmp) — 0.41.0' (#45) from feat/ram-speed into main
release / test (push) Successful in 12s
release / release (push) Successful in 14s
Reviewed-on: #45
2026-05-25 16:41:03 +00:00
23 changed files with 1557 additions and 59 deletions
+45
@@ -5,6 +5,51 @@ All notable changes to RigDoctor are recorded here. Format follows
(`MAJOR.MINOR.PATCH`, pre-1.0). `__version__` and `pyproject.toml` must match the git
release tag (so the auto-updater, D18, can compare versions).
## [0.43.0] - 2026-05-29
### Added
- **GPU stress test + close thermal monitoring** (`rigdoctor stress`, and a "Stress test…" button
on System Health). Runs a GPU load and samples sensors at a high rate (default 0.5 s), then
reports per-metric min/avg/**peak**, how long the core spent above each temperature threshold,
power vs the limit, throttling (decoded from the NVML clocks-event bitmask), and any GPU **fault**
(Xid / VA-space freeze / a query-timeout hang) that hit during the window — the on-demand way to
reproduce load-correlated crashes. The load comes from an explicit `--command` (a game or a tool
like gpu-burn), an auto-detected loader (gpu-burn/vkmark/glmark2/vkcube), or **monitor-only** when
none is found (you launch the game; it tracks temps while you play).
- **Drive health & wear in the health report.** A new `core/drives.py` parses the full
`smartctl --json` for every drive into prioritized findings: the SMART verdict, a derived
**life-left %** (NVMe `percentage_used` or the SATA wear-leveling attribute), **power-on hours**,
data written (TBW), temperature, and the early-failure predictors (reallocated / pending /
offline-uncorrectable sectors, NVMe media errors, low available spare). Replaces the old
pass/fail-only SMART check; flows through the same elevated path (GUI launch / `sudo rigdoctor
report`), degrading to per-drive "needs root" notes unprivileged.
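
The stress-test summary described above (min/avg/peak per metric plus time spent above each temperature threshold) could be computed roughly as in the sketch below. The `summarize` helper, its arguments, and the threshold values are hypothetical and chosen for illustration; they are not taken from RigDoctor's `core/stress.py`.

```python
from statistics import mean


def summarize(samples, interval, thresholds=(80, 90)):
    """Summarize temperature samples taken every `interval` seconds.

    Hypothetical helper: the name, arguments, and thresholds are illustrative only.
    """
    if not samples:
        return None
    return {
        "min": min(samples),
        "avg": round(mean(samples), 1),
        "peak": max(samples),
        # Approximate time above a threshold as (number of samples at or above it) * interval.
        "seconds_above": {t: sum(1 for s in samples if s >= t) * interval for t in thresholds},
    }


report = summarize([62, 71, 84, 91, 88, 79], interval=0.5)
print(report)
```

Keeping the analysis a pure function of the recorded samples is what makes it easy to unit-test without a GPU.
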
### Fixed
- **GUI "Add game…" can now link a launcher.** The dialog only asked for a name, so a custom
game (e.g. SPT) couldn't be given its launch command or log folder from the app — those were
CLI-only, leaving it unlaunchable from the GUI. It's now a proper form: name + an optional
launch command/script (with a **Browse…** file picker) + an optional log folder (auto-detected
from the script's folder when left blank).
## [0.42.0] - 2026-05-29
### Added
- **Detect hard freezes that log no Xid.** The kernel-log scanner caught Xid codes, OOM, panic,
MCE, PCIe AER, thermal events, and amdgpu resets — but a crash that logs *no* Xid slipped
through. It now flags the NVIDIA open-kernel-module **VA-space mapping fault** (`gpu_vaspace.c`
/ `dmaAllocMapping` assertions, NVKMS GEM-allocation failures) — a driver-internal error that
can storm for minutes and end in a freeze without the GPU ever "falling off the bus" (distinct
from Xid 79). A new `check_nvidia_module()` notes when the open module (`nvidia-*-open`) is
loaded — the context behind these faults — and a new `ai_knowledge` entry lets the assistant
tell the no-Xid freeze apart from the Xid 79 hardware drop.
- **Add games no launcher reports (e.g. SPT).** A user-authored custom-games list
(`core/customgames.py`) shows alongside Steam/Lutris/Heroic in `rigdoctor games` and the GUI
("Add game…"), for standalone mod launchers (Single-Player Tarkov), itch.io downloads, or any
hand-installed game. Each entry can carry a launch command and a log directory:
`rigdoctor games add "SPT" --command .../tarkov.sh` (a sibling `logs/` is auto-detected),
`rigdoctor games play "SPT"` launches it under the crash-capture wrapper (tagged with the real
name, not the script's), and the diagnostic now tails the game's *own* logs — SPT's
server/launcher logs — alongside the kernel log so the analysis sees what the game logged
before the freeze.
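
A minimal sketch of how a log scanner might tell the no-Xid VA-space fault apart from an Xid event. The patterns are assumed from the strings named in the entry above, the `classify` function is hypothetical, and the sample log lines are invented; the real scanner may match differently.

```python
import re

# Patterns assumed from the strings named in the entry above; the real scanner may differ.
VASPACE_FAULT = re.compile(
    r"gpu_vaspace\.c|dmaAllocMapping|Failed to allocate NVKMS memory for GEM object"
)
XID = re.compile(r"NVRM: Xid")


def classify(lines):
    """Label a batch of kernel-log lines as 'xid', 'vaspace-fault', or None."""
    if any(XID.search(line) for line in lines):
        return "xid"
    if any(VASPACE_FAULT.search(line) for line in lines):
        return "vaspace-fault"
    return None


# Made-up log lines, for illustration only.
print(classify(["nvidia: assertion failed in gpu_vaspace.c"]))
print(classify(["NVRM: Xid (PCI:0000:01:00): 79, GPU has fallen off the bus."]))
print(classify(["usb 1-2: new high-speed USB device"]))
```
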
## [0.41.0] - 2026-05-25
### Added
- **Import a crash dump (`.dmp`) and explain it with AI.** The **Games** page gains an
+1 -1
@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
 [project]
 name = "rigdoctor"
-version = "0.41.0"
+version = "0.43.0"
 description = "Modular hardware monitoring & crash diagnostics for Linux gamers."
 readme = "README.md"
 requires-python = ">=3.11"
+1 -1
@@ -1,3 +1,3 @@
 """RigDoctor — modular hardware monitoring & crash diagnostics for Linux gamers."""
-__version__ = "0.41.0"
+__version__ = "0.43.0"
+111 -4
@@ -298,10 +298,10 @@ def cmd_collect_priv(args) -> int:
     """Internal: emit root-only data (SMART + dmidecode) as JSON, run via pkexec at launch."""
     from dataclasses import asdict
-    from .core.health import check_smart
+    from .core import drives
     from .core.inventory import _dmidecode
-    data = {"smart": [asdict(f) for f in check_smart()], "dmidecode": _dmidecode()}
+    data = {"drives": [asdict(d) for d in drives.collect()], "dmidecode": _dmidecode()}
     print(json.dumps(data))
     return 0
@@ -525,13 +525,13 @@ def cmd_gameenv(args) -> int:
 def cmd_games(args) -> int:
     from dataclasses import asdict
-    from .core import launchers, steam
+    from .core import customgames, launchers, steam
     selected = steam.selected_library_paths()
     result = steam.rescan() if selected else None
     steam_games = result.games if result else []
     extra = launchers.scan()  # non-Steam (Lutris/Heroic)
-    all_games = list(steam_games) + list(extra)
+    all_games = list(steam_games) + list(extra) + customgames.scan()  # + user-added (SPT etc.)
     if args.json:
         print(json.dumps({
@@ -596,6 +596,91 @@ def cmd_games_libraries(args) -> int:
    return 0


def cmd_games_add(args) -> int:
    from .core import customgames
    if customgames.add(args.name, command=args.command, logdir=args.logdir):
        print(f"Added '{args.name}' to your games (custom). It'll show in `rigdoctor games` "
              "and the diagnostic game picker.")
        entry = customgames.get(args.name) or {}
        if entry.get("command"):
            print(f" launch: {entry['command']} (run with: rigdoctor games play \"{args.name}\")")
        if entry.get("logdir"):
            print(f" logs: {entry['logdir']} (included in crash diagnostics)")
        return 0
    print(f"'{args.name}' is blank or already in your custom games.")
    return 1


def cmd_games_play(args) -> int:
    from .core import customgames, wrap
    command = customgames.command(args.name)
    if command is None:
        if customgames.get(args.name) is None:
            print(f"'{args.name}' isn't in your custom games. Add it: "
                  f"rigdoctor games add \"{args.name}\" --command <launch script>")
        else:
            print(f"'{args.name}' has no launch command. Set one: "
                  f"rigdoctor games remove \"{args.name}\" && rigdoctor games add \"{args.name}\" "
                  "--command <launch script>")
        return 1
    print(f"Launching '{args.name}' with crash-capture… (capture stops cleanly on exit; "
          "a hard freeze is flagged next time you open RigDoctor)")
    return wrap.run(command, game=args.name)


def cmd_games_remove(args) -> int:
    from .core import customgames
    if customgames.remove(args.name):
        print(f"Removed '{args.name}' from your custom games.")
        return 0
    print(f"'{args.name}' isn't in your custom games. Current: {', '.join(customgames.names()) or '(none)'}")
    return 1


def cmd_stress(args) -> int:
    import shlex as _shlex
    from .core import stress
    from .render import format_raw, render_stress
    command = _shlex.split(args.command) if args.command else None
    if not args.json:
        loaders = stress.available_loaders()
        if command:
            print(f"Stressing with: {' '.join(command)}")
        elif loaders:
            print(f"Stressing with auto-detected loader: {loaders[0]}")
        else:
            print("No GPU load tool found and no --command given — MONITOR-ONLY mode.")
            print(f" Launch the game/app now; I'll closely track temps for up to {int(args.duration)}s.")
        print(f" Sampling every {args.interval:g}s. Press Ctrl-C to stop early.\n")

    def _tick(sample, elapsed) -> None:
        # Redraw a single live status line with the latest core temp, power, utilisation, and clock.
        by = {r.key: r for r in sample.readings}
        bits = [f"{elapsed:5.0f}s"]
        for key, tag in (("gpu.temp", "core"), ("gpu.power", "pwr"),
                         ("gpu.util", "util"), ("gpu.clock.core", "clk")):
            r = by.get(key)
            if r is not None and r.value is not None:
                bits.append(f"{tag} {format_raw(r.value, r.unit)}")
        print(" " + " ".join(bits) + " ", end="\r", flush=True)

    result = stress.run(duration=args.duration, interval=args.interval, command=command,
                        on_tick=None if args.json else _tick)
    if not args.json:
        print()  # end the live line
    if args.json:
        from dataclasses import asdict
        print(json.dumps(asdict(result), indent=2, ensure_ascii=False))
    else:
        print(render_stress(result))
    return 0 if result.severity in ("ok", "info") else 1


def build_parser() -> argparse.ArgumentParser:
    p = argparse.ArgumentParser(
        prog="rigdoctor",
@@ -613,6 +698,14 @@ def build_parser() -> argparse.ArgumentParser:
    mp.add_argument("--plain", action="store_true", help="plain redraw instead of the curses UI")
    mp.set_defaults(func=cmd_monitor)
    st = sub.add_parser("stress", help="GPU stress + close thermal monitoring (repro load crashes)")
    st.add_argument("-d", "--duration", type=float, default=120.0, help="run for this many seconds (default 120)")
    st.add_argument("-n", "--interval", type=float, default=0.5, help="sampling interval in seconds (default 0.5)")
    st.add_argument("--command", default=None,
                    help="load generator to run (e.g. a game or 'gpu-burn 60'); omit to auto-detect or monitor-only")
    st.add_argument("--json", action="store_true", help="output JSON")
    st.set_defaults(func=cmd_stress)
    sub.add_parser("gui", help="launch the desktop GUI (needs PySide6)").set_defaults(func=cmd_gui)
    sub.add_parser("sources", help="list detected sensor sources").set_defaults(func=cmd_sources)
@@ -681,6 +774,20 @@ def build_parser() -> argparse.ArgumentParser:
    lib_p.add_argument("--json", action="store_true", help="output JSON")
    lib_p.set_defaults(func=cmd_games_libraries)
    add_p = games_sub.add_parser("add", help="add a game no launcher reports (e.g. SPT)")
    add_p.add_argument("name", help="game name, e.g. \"SPT\"")
    add_p.add_argument("--command", default=None,
                       help="launch command/script (e.g. the path to tarkov.sh) — enables `games play`")
    add_p.add_argument("--logdir", default=None,
                       help="the game's own log directory (auto-detected as <command dir>/logs if present)")
    add_p.set_defaults(func=cmd_games_add)
    play_p = games_sub.add_parser("play", help="launch a custom game with crash-capture (e.g. SPT)")
    play_p.add_argument("name", help="game name to launch")
    play_p.set_defaults(func=cmd_games_play)
    rm_p = games_sub.add_parser("remove", help="remove a previously added custom game")
    rm_p.add_argument("name", help="game name to remove")
    rm_p.set_defaults(func=cmd_games_remove)
    env_p = sub.add_parser("gameenv", help="gaming environment checks (M6): flag stability/perf settings")
    env_p.add_argument("--json", action="store_true", help="output JSON instead of text")
    env_p.set_defaults(func=cmd_gameenv)
+3
@@ -36,6 +36,9 @@ SPAWN_LOG = STATE_DIR / "recorder.out"
# Gaming environment / game detection (M6) — cached Steam game scan (mutable state,
# not config: refreshed by the background scan on every launch).
GAMES_FILE = STATE_DIR / "games.json"
# User-added games that no launcher reports (e.g. SPT/standalone mod launchers). Authored
# by the user (not a refreshable cache), so it lives in DATA_DIR and persists across scans.
CUSTOM_GAMES_FILE = DATA_DIR / "custom-games.json"
# Logging & reports (opt-in via `logging_enabled`). App log: rotating file of app events.
# Each diagnostic is stored under DIAGNOSTICS_DIR/<id>/; "Report" zips one into REPORTS_DIR.
+8
@@ -30,6 +30,14 @@ ENTRIES: list[tuple[tuple[str, ...], str]] = [
    (("xid 8", "xid 62", "xid 63", "xid 64"),
     "These Xid codes commonly indicate VRAM/ECC or memory-training problems — suspect failing "
     "VRAM or an unstable memory overclock."),
    (("va-space mapping", "gpu_vaspace", "dmaallocmapping", "nvkms memory for gem",
      "open kernel module", "nvidia open"),
     "NVIDIA open-kernel-module VA-space mapping errors (gpu_vaspace.c / dmaAllocMapping / "
     "'Failed to allocate NVKMS memory for GEM object') are a driver-internal fault on the open "
     "module (nvidia-*-open). They can storm for minutes and end in a HARD FREEZE with NO Xid "
     "logged — so the GPU never 'falls off the bus', and this is distinct from the Xid 79 "
     "hardware drop. Fix path: switch from the open to the proprietary NVIDIA kernel module and "
     "update to the latest driver branch."),
    (("smart 197", "current_pending_sector", "pending sector"),
     "SMART 197 (Current Pending Sector) > 0 = sectors the drive can't read and is waiting to "
     "reallocate — early sign of a failing disk. Back up now and run an extended self-test."),
+113
@@ -0,0 +1,113 @@
"""User-added games (M6): a manual list for titles no launcher reports.

Some games never show up in a Steam/Lutris/Heroic scan — standalone mod launchers like
**SPT** (Single-Player Tarkov), itch.io downloads, or any hand-installed executable. This
module keeps a small user-authored list so those still appear in the game list and can be
picked for a focused diagnostic, in the same `steam.Game` shape as every other source.

Each entry is a name plus two optionals: a **launch command** (so `rigdoctor games play`
can start it under the auto-capture wrapper) and a **log directory** (so a crash diagnostic
can read the game's own logs — e.g. SPT's `logs/tarkov-latest.log`). Stored as JSON in
`config.CUSTOM_GAMES_FILE`; stdlib only; every reader degrades to [] on a missing/bad file.
"""
from __future__ import annotations

import json
import os
import shlex

from .. import config
from .steam import Game

LAUNCHER = "custom"


def _load() -> list[dict]:
    try:
        data = json.loads(config.CUSTOM_GAMES_FILE.read_text())
    except (OSError, ValueError):
        return []
    games = data.get("games") if isinstance(data, dict) else None
    return [g for g in games if isinstance(g, dict) and g.get("name")] if isinstance(games, list) else []


def _save(games: list[dict]) -> None:
    config.CUSTOM_GAMES_FILE.parent.mkdir(parents=True, exist_ok=True)
    config.CUSTOM_GAMES_FILE.write_text(json.dumps({"games": games}, indent=2, ensure_ascii=False) + "\n")


def names() -> list[str]:
    """Just the stored names (insertion order preserved)."""
    return [str(g["name"]) for g in _load()]


def get(name: str) -> dict | None:
    """The stored entry (name + optional command/logdir) for a game, case-insensitive."""
    name = (name or "").strip().lower()
    return next((g for g in _load() if str(g["name"]).lower() == name), None)


def add(name: str, command: str | None = None, logdir: str | None = None) -> bool:
    """Add a game by name, with an optional launch command and log directory.

    Returns False if the name is blank or already present (case-insensitive). When a command
    is given but no logdir, a sibling `logs/` dir is inferred if it exists (covers SPT's layout).
    """
    name = (name or "").strip()
    if not name:
        return False
    if get(name):
        return False
    entry: dict = {"name": name}
    command = (command or "").strip()
    if command:
        entry["command"] = command
        if not logdir:
            sibling = os.path.join(os.path.dirname(_argv0(command)), "logs")
            if os.path.isdir(sibling):
                logdir = sibling
    logdir = (logdir or "").strip()
    if logdir:
        entry["logdir"] = os.path.expanduser(logdir)
    games = _load()
    games.append(entry)
    _save(games)
    return True


def remove(name: str) -> bool:
    """Remove a game by name (case-insensitive). Returns True if one was removed."""
    name = (name or "").strip().lower()
    games = _load()
    kept = [g for g in games if str(g["name"]).lower() != name]
    if len(kept) == len(games):
        return False
    _save(kept)
    return True


def _argv0(command: str) -> str:
    parts = shlex.split(command)
    return parts[0] if parts else command


def command(name: str) -> list[str] | None:
    """The launch argv for a game (shlex-split), or None if it has no command."""
    entry = get(name)
    cmd = (entry or {}).get("command")
    return shlex.split(cmd) if cmd else None


def log_dir(name: str) -> str | None:
    """The game's own log directory, or None if it isn't set / doesn't exist."""
    entry = get(name)
    path = (entry or {}).get("logdir")
    return path if path and os.path.isdir(path) else None


def scan() -> list[Game]:
    """User-added games as `Game` objects (launcher='custom'), sorted by name."""
    out = [Game(appid="", name=str(g["name"]), library="", installdir="", launcher=LAUNCHER)
           for g in _load()]
    return sorted(out, key=lambda g: g.name.lower())
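
The sibling `logs/` inference used by `add()` can be tried in isolation. The sketch below is a standalone approximation: `infer_logdir` is a hypothetical name, and the temporary directory stands in for a real game folder.

```python
import os
import shlex
import tempfile


def infer_logdir(command):
    """Return <dir of argv[0]>/logs if that directory exists, else None.

    Hypothetical standalone version of the inference rule described above.
    """
    parts = shlex.split(command)
    if not parts:
        return None
    sibling = os.path.join(os.path.dirname(parts[0]), "logs")
    return sibling if os.path.isdir(sibling) else None


with tempfile.TemporaryDirectory() as root:
    os.mkdir(os.path.join(root, "logs"))
    script = os.path.join(root, "tarkov.sh")
    # Quote the path so a directory name containing spaces survives shlex.split.
    found = infer_logdir(f"{shlex.quote(script)} --some-flag")
    missing = infer_logdir("/nonexistent/dir/run.sh")

print(found, missing)
```

Because the command is split with `shlex`, only the first token (the script path) decides where `logs/` is looked up; any trailing arguments are ignored.
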
+1 -1
@@ -75,7 +75,7 @@ def store(result, capture_path=None, since: float | None = None) -> Path | None:
     _write(target / "report.txt", "\n".join(report))
     try:
-        logs = gamelogs.collect(since=since)
+        logs = gamelogs.collect(since=since, game=getattr(result, "game", None))
         if logs:
             _write(target / "gamelogs.txt", logs)
     except OSError:
+229
@@ -0,0 +1,229 @@
"""Drive health & wear (M-drives): per-disk SMART stats parsed from smartctl JSON.

Unlike a GPU, storage exposes a real health/wear story, so this reads it in full: the overall
SMART verdict, a derived **life-left %** (NVMe ``percentage_used`` or the SATA wear-leveling
attribute), **power-on hours** (the drive's runtime), data written (TBW), temperature, and the
early-failure predictors (reallocated / pending / offline-uncorrectable sectors, NVMe media
errors, available spare). Turned into prioritized health findings.

smartctl needs root, so collection runs through the same elevated path as the other root-only
checks (``rigdoctor collect-priv`` via pkexec at GUI launch, or ``sudo rigdoctor report``).
Parsing is JSON-based (smartctl ``--json``), which is stable across drive types. Stdlib only;
degrades gracefully — no smartctl, no root, or an unparseable device yields an info finding.
"""
from __future__ import annotations

import json
import shutil
import subprocess
from dataclasses import dataclass

from .health import CRITICAL, INFO, OK, WARNING, Finding

# NVMe writes are counted in 512-KB "data units"; 1 unit = 1000 * 512 bytes.
_NVME_UNIT_BYTES = 512_000
_LBA_BYTES = 512  # SATA Total_LBAs_Written counts 512-byte sectors


@dataclass
class DriveHealth:
    device: str
    model: str = ""
    kind: str = ""  # "nvme" | "sata" | "scsi"
    passed: bool | None = None  # SMART overall verdict; None if unknown / needs root
    needs_root: bool = False
    health_pct: int | None = None  # derived life-left %
    percent_used: int | None = None  # NVMe wear used %
    power_on_hours: int | None = None
    temp_c: int | None = None
    data_written_tb: float | None = None
    reallocated: int | None = None  # SATA reallocated sectors (id 5)
    pending: int | None = None  # SATA current-pending sectors (id 197)
    offline_uncorrectable: int | None = None  # SATA id 198
    available_spare: int | None = None  # NVMe %
    available_spare_threshold: int | None = None
    media_errors: int | None = None  # NVMe


# --- collection (root) ----------------------------------------------------------------
def _scan_devices() -> list[str]:
    try:
        proc = subprocess.run(["smartctl", "--scan"], capture_output=True, text=True, timeout=10)
    except (subprocess.SubprocessError, OSError):
        return []
    return [ln.split()[0] for ln in proc.stdout.splitlines() if ln.strip().startswith("/dev/")]


def _smartctl_json(device: str) -> dict | None:
    try:
        proc = subprocess.run(
            ["smartctl", "--json=c", "-H", "-A", "-i", device],
            capture_output=True, text=True, timeout=20,
        )
    except (subprocess.SubprocessError, OSError):
        return None
    try:
        return json.loads(proc.stdout)
    except (ValueError, TypeError):
        return None


def _ata_attr(data: dict, attr_id: int) -> int | None:
    for row in data.get("ata_smart_attributes", {}).get("table", []):
        if row.get("id") == attr_id:
            raw = row.get("raw", {})
            return raw.get("value")
    return None


def _ata_norm_value(data: dict, attr_id: int) -> int | None:
    """The normalized 'value' (100→0 life indicator) for an ATA attribute."""
    for row in data.get("ata_smart_attributes", {}).get("table", []):
        if row.get("id") == attr_id:
            return row.get("value")
    return None


def parse(device: str, data: dict | None) -> DriveHealth:
    """Build a DriveHealth from smartctl JSON (pure-ish; no IO of its own)."""
    d = DriveHealth(device=device)
    if not data:
        d.needs_root = True
        return d
    d.model = data.get("model_name") or data.get("scsi_model_name") or ""
    proto = (data.get("device", {}).get("protocol") or "").lower()
    d.kind = "nvme" if "nvme" in proto else ("sata" if "ata" in proto else (proto or ""))
    status = data.get("smart_status")
    if isinstance(status, dict) and "passed" in status:
        d.passed = bool(status["passed"])
    else:
        # No verdict and a non-zero exit usually means we couldn't open the device (needs root).
        if data.get("smartctl", {}).get("exit_status", 0) and not status:
            d.needs_root = True
    temp = data.get("temperature", {}).get("current")
    d.temp_c = int(temp) if isinstance(temp, (int, float)) else None
    poh = data.get("power_on_time", {}).get("hours")
    d.power_on_hours = int(poh) if isinstance(poh, (int, float)) else None
    if d.kind == "nvme":
        log = data.get("nvme_smart_health_information_log", {})
        d.percent_used = log.get("percentage_used")
        d.available_spare = log.get("available_spare")
        d.available_spare_threshold = log.get("available_spare_threshold")
        d.media_errors = log.get("media_errors")
        if d.temp_c is None and isinstance(log.get("temperature"), (int, float)):
            d.temp_c = int(log["temperature"])
        units = log.get("data_units_written")
        if isinstance(units, (int, float)):
            d.data_written_tb = round(units * _NVME_UNIT_BYTES / 1e12, 2)
        if isinstance(d.percent_used, (int, float)):
            d.health_pct = max(0, 100 - int(d.percent_used))
    else:  # SATA / ATA
        d.reallocated = _ata_attr(data, 5)
        d.pending = _ata_attr(data, 197)
        d.offline_uncorrectable = _ata_attr(data, 198)
        lbas = _ata_attr(data, 241)  # Total_LBAs_Written
        if isinstance(lbas, (int, float)) and lbas > 0:
            d.data_written_tb = round(lbas * _LBA_BYTES / 1e12, 2)
        wear = _ata_norm_value(data, 177)  # Wear_Leveling_Count (Samsung): normalized = life left
        if wear is None:
            wear = _ata_norm_value(data, 231)  # SSD_Life_Left on some drives
        if isinstance(wear, int):
            d.health_pct = wear
    return d


def collect() -> list[DriveHealth]:
    """Per-drive health for every SMART-capable device (needs root for real data)."""
    if shutil.which("smartctl") is None:
        return []
    return [parse(dev, _smartctl_json(dev)) for dev in _scan_devices()]


def from_dicts(rows: list[dict]) -> list[DriveHealth]:
    """Rebuild DriveHealth objects from the privileged collector's JSON."""
    out: list[DriveHealth] = []
    for r in rows:
        if isinstance(r, dict) and r.get("device"):
            fields = {k: r.get(k) for k in DriveHealth.__dataclass_fields__}
            out.append(DriveHealth(**fields))
    return out
# --- findings -------------------------------------------------------------------------
def _stats_line(d: DriveHealth) -> str:
parts: list[str] = []
if d.health_pct is not None:
parts.append(f"{d.health_pct}% life left")
elif d.percent_used is not None:
parts.append(f"{d.percent_used}% used")
if d.power_on_hours is not None:
parts.append(f"{d.power_on_hours:,} h powered on")
if d.data_written_tb is not None:
parts.append(f"{d.data_written_tb:g} TB written")
if d.temp_c is not None:
parts.append(f"{d.temp_c}°C")
if d.available_spare is not None:
parts.append(f"spare {d.available_spare}%")
return " · ".join(parts)


def to_findings(drives: list[DriveHealth]) -> list[Finding]:
    if not drives:
        if shutil.which("smartctl") is None:
            return [Finding(INFO, "Storage", "SMART not checked (smartmontools missing)",
                            "Disk self-health couldn't be read.",
                            "Install it: `sudo apt install smartmontools`")]
        return []
    findings: list[Finding] = []
    for d in drives:
        name = d.model or d.device
        if d.needs_root:
            findings.append(Finding(INFO, "Storage", f"{name}: SMART needs root",
                                    "Reading drive health requires elevated access.",
                                    "Run: `sudo rigdoctor report` (or launch the GUI, which asks once)."))
            continue
        stats = _stats_line(d)
        # Severity from the failure predictors, worst first.
        bad = []
        if d.passed is False:
            bad.append("SMART overall self-assessment FAILED")
        for label, val in (("reallocated sectors", d.reallocated),
                           ("pending sectors", d.pending),
                           ("offline-uncorrectable sectors", d.offline_uncorrectable),
                           ("NVMe media errors", d.media_errors)):
            if isinstance(val, int) and val > 0:
                bad.append(f"{val} {label}")
        spare_low = (isinstance(d.available_spare, int) and isinstance(d.available_spare_threshold, int)
                     and d.available_spare < d.available_spare_threshold)
        worn = isinstance(d.percent_used, int) and d.percent_used >= 90
        hot = isinstance(d.temp_c, int) and d.temp_c >= 70
        if d.passed is False or bad:
            findings.append(Finding(
                CRITICAL, "Storage", f"{name}: failing ({stats})" if stats else f"{name}: failing",
                "; ".join(bad) + ".",
                "Back up this drive now and plan to replace it."))
        elif spare_low or worn:
            findings.append(Finding(
                WARNING, "Storage", f"{name}: worn ({stats})",
                ("Available spare below the drive's threshold." if spare_low else
                 f"NVMe wear at {d.percent_used}% used — near end of rated life."),
                "Back up important data and budget for a replacement."))
        elif hot:
            findings.append(Finding(
                WARNING, "Storage", f"{name}: hot ({stats})",
                f"Drive temperature is {d.temp_c}°C.",
                "Improve case/M.2 airflow; sustained heat shortens SSD life."))
        else:
            findings.append(Finding(
                OK, "Storage", f"{name}: healthy" + (f" ({stats})" if stats else ""),
                "SMART self-assessment passed." if d.passed else ""))
    return findings
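
Review note: the precedence that `to_findings` applies (failing, then worn, then hot, then healthy) can be checked in isolation with a small pure function. This is only a sketch: `DriveSnapshot` and `triage` are hypothetical names introduced for illustration, not part of the change, and the 90% wear and 70°C cut-offs are copied from the code above.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class DriveSnapshot:
    """Hypothetical stand-in for DriveHealth: only the fields the triage reads."""
    passed: bool | None = None
    reallocated: int | None = None
    pending: int | None = None
    offline_uncorrectable: int | None = None
    media_errors: int | None = None
    available_spare: int | None = None
    available_spare_threshold: int | None = None
    percent_used: int | None = None
    temp_c: int | None = None


def triage(d: DriveSnapshot) -> str:
    """Return 'critical', 'warning-worn', 'warning-hot' or 'ok', worst first."""
    predictors = (d.reallocated, d.pending, d.offline_uncorrectable, d.media_errors)
    if d.passed is False or any(isinstance(v, int) and v > 0 for v in predictors):
        return "critical"
    spare_low = (isinstance(d.available_spare, int)
                 and isinstance(d.available_spare_threshold, int)
                 and d.available_spare < d.available_spare_threshold)
    worn = isinstance(d.percent_used, int) and d.percent_used >= 90
    if spare_low or worn:
        return "warning-worn"
    if isinstance(d.temp_c, int) and d.temp_c >= 70:
        return "warning-hot"
    return "ok"
```

A drive that is both worn and hot is reported as worn, and any failure predictor outranks both, which matches the `if` / `elif` chain in the diff.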
+35 -2
@@ -81,15 +81,48 @@ def available() -> bool:
     return bool(_proton_logs() or _steam_console())


-def collect(since: float | None = None, max_bytes: int = 8000) -> str:
-    """Recent Proton + Steam log tails as one labelled text block ('' if none).
+def _custom_game_logs(game: str, since: float | None, max_bytes: int) -> list[str]:
+    """Tail the recent ``*.log`` files in a custom game's own log dir (e.g. SPT's
+    ``logs/tarkov-latest.log`` + ``server-latest.log``), newest first, freshness-scoped by mtime.
+
+    Custom-game logs use their own timestamp formats, so we scope by file mtime (like the Proton
+    log) rather than the ``[YYYY-MM-DD …]`` line filter used for the Steam console.
+    """
+    from . import customgames
+
+    directory = customgames.log_dir(game)
+    if not directory:
+        return []
+    try:
+        files = [p for p in Path(directory).glob("*.log") if p.is_file()]
+    except OSError:
+        return []
+    files.sort(key=_mtime, reverse=True)
+    sections: list[str] = []
+    for log in files[:4]:  # a session touches a handful (tarkov/server/launcher latest)
+        if since is not None and _mtime(log) < since:
+            continue
+        tail = _tail(log, max_bytes).strip()
+        if tail:
+            sections.append(f"--- {game} log ({log.name}) ---\n{tail}")
+    return sections
+
+
+def collect(since: float | None = None, max_bytes: int = 8000, game: str | None = None) -> str:
+    """Recent Proton + Steam (+ custom-game) log tails as one labelled text block ('' if none).

     With ``since`` (epoch), scope to that session: skip a Proton log not written during/after
     the session (a stale per-app log from an earlier game), and keep only Steam-console lines
     timestamped at/after ``since`` — so we don't feed the model an unrelated past session.
+
+    ``game`` (the diagnostic's focused title) pulls in that custom game's own logs if it has a
+    registered log dir — e.g. SPT's server/launcher logs, which Steam/Proton never see.
     """
     sections: list[str] = []
+    if game:
+        sections += _custom_game_logs(game, since, max_bytes)
     protons = _proton_logs()
     if protons:
         log = protons[0]
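
Review note: the mtime scoping in `_custom_game_logs` (sort newest first, take a handful, drop anything older than the session start) can be exercised without a real game install. The sketch below is a stand-alone approximation, not the project's code: `recent_logs` and `demo` are hypothetical helpers, and the file names are made up.

```python
from __future__ import annotations

import os
import tempfile
import time
from pathlib import Path


def recent_logs(directory: Path, since: float | None, limit: int = 4) -> list[Path]:
    """Newest-first *.log files; of the newest `limit`, keep those modified at/after `since`."""
    files = [p for p in directory.glob("*.log") if p.is_file()]
    files.sort(key=lambda p: p.stat().st_mtime, reverse=True)
    return [p for p in files[:limit] if since is None or p.stat().st_mtime >= since]


def demo() -> list[str]:
    """Build a throwaway log dir with one stale and one fresh log, then filter it."""
    with tempfile.TemporaryDirectory() as tmp:
        root = Path(tmp)
        now = time.time()
        for name, age in (("old.log", 3600), ("fresh.log", 5)):
            path = root / name
            path.write_text("line\n")
            os.utime(path, (now - age, now - age))  # backdate atime and mtime
        return [p.name for p in recent_logs(root, since=now - 60)]
```

Scoping by mtime rather than by parsing timestamps keeps the helper independent of each game's log format, which is the trade-off the docstring in the diff describes.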
+84 -45
@@ -116,6 +116,31 @@ def scan_journal_text(text: str) -> list[Finding]:
             "Check power/thermals/driver; capture a session with `rigdoctor record`.",
         ))

+    # NVIDIA open-kernel-module VA-space mapping faults: a driver-internal failure that can
+    # storm for minutes and end in a HARD FREEZE with NO Xid logged — the GPU never "falls off
+    # the bus", so the Xid scan above misses it entirely. These code paths live in the open
+    # kernel module (nvidia-*-open); the proprietary module doesn't hit them.
+    nvrm_va = [
+        ln for ln in lines
+        if "gpu_vaspace.c" in ln
+        or "_gvaspaceMappingInsert" in ln
+        or "dmaAllocMapping" in ln
+        or "NVKMS memory for GEM object" in ln
+    ]
+    if nvrm_va:
+        findings.append(Finding(
+            WARNING, "GPU", f"NVIDIA driver VA-space mapping errors ×{len(nvrm_va)}",
+            "The NVIDIA kernel module repeatedly failed to update the GPU's virtual address "
+            "space (gpu_vaspace / dmaAllocMapping assertions, NVKMS GEM-allocation failures). "
+            "This is a driver-internal fault that can recur for minutes and end in a hard freeze "
+            "with NO Xid logged — distinct from an Xid 79 hardware drop. These code paths are "
+            "specific to the open kernel module (nvidia-*-open).",
+            "If you're on the open module, switch to the proprietary NVIDIA driver "
+            "(install `nvidia-driver-###` instead of the `…-open` variant) and update to the "
+            "latest branch, then reboot. Capture a session with `rigdoctor record` to confirm "
+            "the errors precede the freeze.",
+        ))
+
     return findings
@@ -188,47 +213,66 @@ def check_nvidia_driver() -> list[Finding]:
     return []


-def _smart_devices() -> list[str]:
-    try:
-        proc = subprocess.run(["smartctl", "--scan"], capture_output=True, text=True, timeout=10)
-    except (subprocess.SubprocessError, OSError):
-        return []
-    devices = []
-    for line in proc.stdout.splitlines():
-        line = line.strip()
-        if line.startswith("/dev/"):
-            devices.append(line.split()[0])
-    return devices
-
-
-def check_smart() -> list[Finding]:
-    if shutil.which("smartctl") is None:
-        return [Finding(
-            INFO, "Storage", "SMART not checked (smartmontools missing)",
-            "Disk self-health couldn't be read.",
-            "Install it for disk health checks: `sudo apt install smartmontools`",
-        )]
-    devices = _smart_devices()
-    if not devices:
-        return [Finding(
-            INFO, "Storage", "SMART: couldn't enumerate drives",
-            "Reading SMART usually needs root.",
-            "Run: `sudo rigdoctor report`",
-        )]
-    findings: list[Finding] = []
-    for dev in devices:
-        try:
-            proc = subprocess.run(["smartctl", "-H", dev], capture_output=True, text=True, timeout=15)
-        except (subprocess.SubprocessError, OSError):
-            continue
-        combined = proc.stdout + proc.stderr
-        if "Permission denied" in combined or "requires root" in combined.lower():
-            findings.append(Finding(INFO, "Storage", f"SMART for {dev} needs root", "", "Run: `sudo rigdoctor report`"))
-        elif "PASSED" in combined:
-            findings.append(Finding(OK, "Storage", f"SMART OK: {dev}", "Overall-health self-assessment passed."))
-        elif "FAILED" in combined or "FAILING_NOW" in combined:
-            findings.append(Finding(CRITICAL, "Storage", f"SMART FAILED: {dev}", "The drive reports failing health.", "Back up now and replace the drive."))
-    return findings
+def _read_text(path: str) -> str | None:
+    try:
+        return Path(path).read_text()
+    except OSError:
+        return None
+
+
+def _nvidia_module_is_open() -> bool | None:
+    """Whether the *loaded* NVIDIA kernel module is the open-source flavor.
+
+    True = open (nvidia-*-open), False = proprietary, None = can't tell / no NVIDIA module.
+    /proc is authoritative for the loaded module and needs no external tool; modinfo's filename
+    (…/nvidia-###-open/nvidia.ko) is the fallback.
+    """
+    proc = _read_text("/proc/driver/nvidia/version")
+    if proc:
+        low = proc.lower()
+        if "open kernel module" in low:
+            return True
+        if "kernel module" in low:  # proprietary banner: "NVIDIA UNIX … Kernel Module …"
+            return False
+    if shutil.which("modinfo"):
+        try:
+            out = subprocess.run(["modinfo", "nvidia"], capture_output=True, text=True, timeout=10).stdout
+        except (subprocess.SubprocessError, OSError):
+            out = ""
+        for line in out.splitlines():
+            if line.startswith("filename:"):
+                return "-open" in line
+    return None
+
+
+def check_nvidia_module() -> list[Finding]:
+    """Note when the open-source NVIDIA kernel module is loaded — the context behind the no-Xid
+    VA-space freeze signature, which lives in the open module's code paths (suggestion-only)."""
+    if _nvidia_module_is_open() is not True:
+        return []
+    return [Finding(
+        INFO, "Driver", "NVIDIA open kernel module in use",
+        "The loaded NVIDIA driver is the open-source kernel module (nvidia-*-open). It's fine for "
+        "most setups, but on some GeForce cards it hits driver-internal faults (VA-space mapping "
+        "errors, hard freezes with no Xid) that the proprietary module doesn't.",
+        "If you get unexplained hard freezes with no Xid in the logs, try the proprietary NVIDIA "
+        "driver (`nvidia-driver-###` rather than the `…-open` variant) on the latest branch.",
+    )]
+
+
+def check_drives() -> list[Finding]:
+    """Per-drive SMART health + wear/runtime stats (see core/drives.py).
+
+    Uses the session's elevated collection when present (GUI launch / pkexec), else reads
+    smartctl directly — which only returns real data as root, so the unprivileged case yields
+    'needs root' info findings pointing at `sudo rigdoctor report`.
+    """
+    from . import drives, elevation
+
+    priv = elevation.privileged()
+    if priv is not None and priv.get("drives") is not None:
+        return drives.to_findings(drives.from_dicts(priv["drives"]))
+    return drives.to_findings(drives.collect())


 def check_live_temps() -> list[Finding]:
@@ -326,24 +370,19 @@ def check_memory_speed() -> list[Finding]:
 def run_health_checks(include_journal: bool = True) -> list[Finding]:
     """Run all checks and return findings sorted by severity (worst first).

-    SMART needs root; if the session collected it via launch elevation, use that
-    instead of re-running smartctl (which would just report "needs root").
+    Drive SMART and RAM speed need root; if the session collected them via launch elevation,
+    those checks use the cached data instead of re-running (which would just report "needs root").

     `include_journal=False` skips the 7-day kernel-journal scan — used by the crash
     analysis, which scans the previous (crashed) boot specifically instead.
     """
-    from . import elevation
-
     findings: list[Finding] = []
     findings += check_nvidia_driver()
+    findings += check_nvidia_module()
     if include_journal:
         findings += check_journal()
     findings += check_journal_persistence()
-    priv = elevation.privileged()
-    if priv is not None and priv.get("smart") is not None:
-        findings += [Finding(**d) for d in priv["smart"]]
-    else:
-        findings += check_smart()
+    findings += check_drives()
     findings += check_live_temps()
     findings += check_pcie_links()
     findings += check_displays()
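
Review note: the banner test in `_nvidia_module_is_open` depends on checking "open kernel module" before the more general "kernel module". A pure-function sketch makes that ordering testable without `/proc`. `module_flavor` is a hypothetical name, and the two banner strings are illustrative assumptions about the general shape of `/proc/driver/nvidia/version`, with placeholder version numbers.

```python
from __future__ import annotations


def module_flavor(version_text: str) -> bool | None:
    """True = open module, False = proprietary, None = no recognizable banner."""
    low = version_text.lower()
    # Order matters: the open banner also contains the substring "kernel module".
    if "open kernel module" in low:
        return True
    if "kernel module" in low:
        return False
    return None


# Illustrative banners only (assumed shape; version numbers are placeholders).
OPEN_BANNER = "NVRM version: NVIDIA UNIX Open Kernel Module for x86_64  000.00.00"
PROPRIETARY_BANNER = "NVRM version: NVIDIA UNIX x86_64 Kernel Module  000.00.00"
```

Swapping the two `if` tests would classify every open-module banner as proprietary, so a unit test along these lines may be worth adding next to the change.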
+322
@@ -0,0 +1,322 @@
"""GPU stress + close thermal monitoring — the repro tool for load-correlated crashes.

Run a GPU load and sample sensors at a high rate, then report peak/sustained temperatures,
how long the GPU spent above each temperature threshold, power headroom vs the limit, whether
it throttled, and any GPU fault (Xid / VA-space / a query timeout) that hit during the window.
This is the on-demand way to reproduce the "only under load / only certain games" freezes
instead of waiting for a game to trigger them.

The load comes from, in order: an explicit ``command`` (your game, or a loader like gpu-burn),
an auto-detected loader on PATH (gpu-burn / vkmark / glmark2 / vkcube), or **monitor-only** when
none is found — then you generate the load yourself (launch the game) while this closely tracks
temps for the duration.

Stdlib only. Degrades gracefully: no nvidia-smi → no GPU stats; a loader that won't start →
monitor-only with a note; missing journal access → no fault scan, just the telemetry.
"""
from __future__ import annotations

import shutil
import subprocess
import time
from collections.abc import Callable
from dataclasses import dataclass, field

from . import health
from .sample import Sample
from .sampler import Sampler
from .sources import available_sources

# Default temperature dwell thresholds (°C). 83 is Ampere's typical thermal-throttle point;
# 90+ is hot; sustained 95+ on the core (or 100+ on GDDR6 memory) is a cooling problem.
DEFAULT_THRESHOLDS = (80, 85, 90, 95)

# Known GPU load generators, best (heaviest / most deterministic) first. argv builder takes the
# remaining duration so a self-terminating loader (gpu-burn) bounds itself; the windowed
# benchmarks loop until we kill them. None are required — detection is best-effort.
_LOADERS: list[tuple[str, Callable[[float], list[str]]]] = [
    ("gpu-burn", lambda secs: ["gpu-burn", str(max(1, int(secs)))]),
    ("vkmark", lambda _s: ["vkmark", "--run-forever"]),
    ("glmark2", lambda _s: ["glmark2", "--run-forever"]),
    ("vkcube", lambda _s: ["vkcube"]),
]

# NVML clocks-event bits that mean the clocks are being *held back* (a throttle), decoded from
# the active-reasons bitmask so we don't depend on per-field name differences across drivers.
_THROTTLE_BITS = {
    0x008: "HW slowdown",
    0x020: "SW thermal slowdown",
    0x040: "HW thermal slowdown",
    0x080: "HW power-brake slowdown",
}
_POWERCAP_BIT = 0x004  # hitting the power limit — expected under load, reported separately


@dataclass
class MetricStat:
    key: str    # e.g. "gpu.temp", "gpu.power", "gpu.clock.core"
    label: str  # human label for the report
    unit: str
    min: float
    avg: float
    max: float
    samples: int


@dataclass
class _Tick:
    dt: float                 # seconds this tick represents (for dwell-time weighting)
    values: dict[str, float]  # reading key -> value across all sources (Nones dropped)
    throttle: list[str]       # active throttle reasons this tick
    power_capped: bool
    lost: bool                # query timeout / no GPU response this tick


@dataclass
class StressResult:
    load: str        # "command: …" | "auto: gpu-burn" | "monitor-only"
    duration: float  # seconds actually monitored
    samples: int
    interval: float
    stats: list[MetricStat] = field(default_factory=list)
    peak_temp: float | None = None
    peak_mem_temp: float | None = None
    avg_temp: float | None = None
    time_above: dict[int, float] = field(default_factory=dict)  # threshold °C -> seconds at/above
    max_power: float | None = None
    power_limit: float | None = None
    power_capped: bool = False
    throttled: bool = False
    throttle_reasons: list[str] = field(default_factory=list)
    gpu_lost: bool = False
    faults: list[str] = field(default_factory=list)  # Xid/VA-space titles in the window
    aborted: bool = False                            # Ctrl-C or the load exited early
    severity: str = health.OK
    verdict: str = ""


# --- load resolution ------------------------------------------------------------------

def available_loaders() -> list[str]:
    """Known GPU load tools found on PATH (heaviest first)."""
    return [name for name, _ in _LOADERS if shutil.which(name)]


def _start_load(command: list[str] | None, duration: float) -> tuple[subprocess.Popen | None, str]:
    """Start the load process and return (proc, description). proc is None for monitor-only."""
    if command:
        try:
            proc = subprocess.Popen(command, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
            return proc, "command: " + " ".join(command)
        except (OSError, ValueError) as exc:
            return None, f"monitor-only (command failed to start: {exc})"
    for name, build in _LOADERS:
        if shutil.which(name):
            try:
                proc = subprocess.Popen(build(duration), stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
                return proc, f"auto: {name}"
            except (OSError, ValueError):
                continue
    return None, "monitor-only"


def _stop_load(proc: subprocess.Popen | None) -> None:
    if proc is None or proc.poll() is not None:
        return
    proc.terminate()
    try:
        proc.wait(timeout=5)
    except subprocess.TimeoutExpired:
        proc.kill()


# --- throttle / fault probes ----------------------------------------------------------

def _throttle_state() -> tuple[list[str], bool]:
    """(active throttle reasons, power-capped) decoded from the clocks-event bitmask."""
    if shutil.which("nvidia-smi") is None:
        return [], False
    raw = ""
    for field_name in ("clocks_event_reasons.active", "clocks_throttle_reasons.active"):
        try:
            proc = subprocess.run(
                ["nvidia-smi", f"--query-gpu={field_name}", "--format=csv,noheader"],
                capture_output=True, text=True, timeout=5,
            )
        except (subprocess.SubprocessError, OSError):
            continue
        raw = proc.stdout.strip().splitlines()[0].strip() if proc.stdout.strip() else ""
        if raw and raw.lower() not in ("n/a", "not supported", "[n/a]"):
            break
    try:
        bits = int(raw, 16)
    except ValueError:
        return [], False
    reasons = [name for bit, name in _THROTTLE_BITS.items() if bits & bit]
    return reasons, bool(bits & _POWERCAP_BIT)
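
Review note: the bitmask decode at the end of `_throttle_state` is the part worth unit-testing, since the `nvidia-smi` call around it needs real hardware. A minimal sketch, assuming the same bit values as `_THROTTLE_BITS` and `_POWERCAP_BIT` above; `decode` and the sample hex strings are hypothetical and chosen for illustration only.

```python
from __future__ import annotations

# Same bit values as _THROTTLE_BITS / _POWERCAP_BIT in the diff.
THROTTLE_BITS = {
    0x008: "HW slowdown",
    0x020: "SW thermal slowdown",
    0x040: "HW thermal slowdown",
    0x080: "HW power-brake slowdown",
}
POWERCAP_BIT = 0x004


def decode(raw: str) -> tuple[list[str], bool]:
    """Decode a hex bitmask string into (throttle reasons, power-capped)."""
    try:
        bits = int(raw.strip(), 16)  # base 16 accepts an optional "0x" prefix
    except ValueError:               # e.g. "N/A" or an empty string
        return [], False
    reasons = [name for bit, name in THROTTLE_BITS.items() if bits & bit]
    return reasons, bool(bits & POWERCAP_BIT)
```

For example, `decode("0x0000000000000024")` reports a software thermal slowdown together with the power cap, while an unparseable value such as `"N/A"` falls back to "no throttle", mirroring the `ValueError` branch in the diff.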


def _faults_since(start_ts: float) -> list[str]:
    """Titles of GPU/PCIe/hardware faults logged to the kernel journal since the run began."""
    out = health._journalctl(["-k", "--no-pager", "-o", "cat", "--since", f"@{int(start_ts)}"])
    if not out:
        return []
    return [f.title for f in health.scan_journal_text(out)
            if f.category in ("GPU", "PCIe", "Hardware", "Kernel")]


def _tick_values(sample: Sample) -> tuple[dict[str, float], bool]:
    """Reading key -> value across all sources (Nones dropped), plus whether the GPU
    failed to respond (an nvidia-smi query timeout — a hang/lost signal)."""
    values: dict[str, float] = {}
    lost = False
    for r in sample.readings:
        if r.source == "gpu" and r.metric == "status" and r.label == "query-timeout":
            lost = True
        if r.value is not None:
            values[r.key] = r.value
    return values, lost


# --- pure analysis (unit-testable, no IO) ---------------------------------------------

_REPORT_KEYS = {
    "gpu.temp": ("GPU core temp", "°C"),
    "gpu.temp.memory": ("GPU memory temp", "°C"),
    "gpu.power": ("GPU power", "W"),
    "gpu.util": ("GPU utilization", "%"),
    "gpu.mem_util": ("VRAM controller util", "%"),
    "gpu.clock.core": ("Core clock", "MHz"),
    "gpu.clock.memory": ("Memory clock", "MHz"),
    "gpu.fan": ("Fan", "%"),
    "gpu.mem_used": ("VRAM used", "MiB"),
    "cpu.temp": ("CPU temp", "°C"),
}


def summarize(ticks: list[_Tick], *, load: str, interval: float, faults: list[str],
              thresholds=DEFAULT_THRESHOLDS) -> StressResult:
    """Build a StressResult from collected ticks — pure, so it's tested with synthetic input."""
    duration = sum(t.dt for t in ticks)
    result = StressResult(load=load, duration=round(duration, 1), samples=len(ticks),
                          interval=interval, faults=faults)
    series: dict[str, list[float]] = {}
    throttle_seen: set[str] = set()
    time_above = {th: 0.0 for th in thresholds}
    for t in ticks:
        for key, value in t.values.items():
            series.setdefault(key, []).append(value)
        throttle_seen.update(t.throttle)
        if t.power_capped:
            result.power_capped = True
        if t.lost:
            result.gpu_lost = True
        core = t.values.get("gpu.temp")
        if core is not None:
            for th in thresholds:
                if core >= th:
                    time_above[th] += t.dt

    for key, (label, unit) in _REPORT_KEYS.items():
        vals = series.get(key)
        if not vals:
            continue
        stat = MetricStat(key, label, unit, round(min(vals), 1),
                          round(sum(vals) / len(vals), 1), round(max(vals), 1), len(vals))
        result.stats.append(stat)
        if key == "gpu.temp":
            result.peak_temp, result.avg_temp = stat.max, stat.avg
        elif key == "gpu.temp.memory":
            result.peak_mem_temp = stat.max
        elif key == "gpu.power":
            result.max_power = stat.max
    # power_limit isn't a reported metric (it's ~constant); pull it from the raw series.
    if "gpu.power_limit" in series:
        result.power_limit = max(series["gpu.power_limit"])

    result.throttle_reasons = sorted(throttle_seen)
    result.throttled = bool(throttle_seen)
    result.time_above = {th: round(secs, 1) for th, secs in time_above.items() if secs > 0}
    _verdict(result)
    return result


def _verdict(r: StressResult) -> None:
    """Set severity + a plain-language conclusion from the gathered signals."""
    peak = f"{r.peak_temp:.0f}°C" if r.peak_temp is not None else "?"
    if r.gpu_lost or any(t for t in r.faults):
        r.severity = health.CRITICAL
        cause = "; ".join(r.faults) if r.faults else "the GPU stopped responding (query timeout)"
        r.verdict = (f"GPU fault during the stress run: {cause}. This reproduces the crash under "
                     f"load — capture/keep these logs. Peak core temp {peak}.")
        return
    if r.throttled:
        r.severity = health.WARNING
        r.verdict = (f"Thermal/HW throttling detected ({', '.join(r.throttle_reasons)}) — the GPU "
                     f"held clocks back to stay safe. Peak core temp {peak}. Improve cooling/airflow.")
        return
    if r.peak_temp is not None and r.peak_temp >= 90:
        r.severity = health.WARNING
        r.verdict = (f"No fault, but the core peaked at {peak} — hot. Watch GDDR6/VRM cooling; "
                     "sustained high temps shorten the card's life and precede instability.")
        return
    if r.peak_temp is None:
        r.severity = health.INFO
        r.verdict = "No GPU telemetry was captured (nvidia-smi unavailable?)."
        return
    capped = " (power-limited — hitting the cap, which is normal)" if r.power_capped else ""
    r.verdict = f"Stable: peaked at {peak} with no faults or throttling{capped}."


# --- the run loop (IO) ----------------------------------------------------------------

def run(duration: float = 120.0, interval: float = 0.5, command: list[str] | None = None,
        thresholds=DEFAULT_THRESHOLDS, on_tick: Callable[[Sample, float], None] | None = None,
        should_stop: Callable[[], bool] | None = None) -> StressResult:
    """Drive a GPU load for ``duration`` seconds, sampling every ``interval``, and report.

    Stops early on Ctrl-C, if a GPU query times out (likely hang), if the load process exits, or
    when ``should_stop()`` returns True (the GUI's Stop button). ``on_tick(sample, elapsed)`` is
    called each tick for live display.
    """
    sampler = Sampler(available_sources())
    proc, load_desc = _start_load(command, duration)
    start = time.monotonic()
    start_ts = time.time()
    ticks: list[_Tick] = []
    last = start
    aborted = False
    try:
        while True:
            sample = sampler.sample()
            now = time.monotonic()
            dt = now - last
            last = now
            values, lost = _tick_values(sample)
            reasons, capped = _throttle_state()
            ticks.append(_Tick(dt=dt, values=values, throttle=reasons, power_capped=capped, lost=lost))
            if on_tick is not None:
                on_tick(sample, now - start)
            if lost:  # GPU stopped responding — stop now, it may be hung/lost
                break
            if should_stop is not None and should_stop():  # GUI Stop button
                aborted = True
                break
            if proc is not None and proc.poll() is not None:  # the load finished/exited
                break
            if (now - start) >= duration:
                break
            time.sleep(max(0.0, interval - (time.monotonic() - now)))
    except KeyboardInterrupt:
        aborted = True
    finally:
        _stop_load(proc)
    faults = _faults_since(start_ts)
    result = summarize(ticks, load=load_desc, interval=interval, faults=faults, thresholds=thresholds)
    result.aborted = aborted or (proc is not None and command is not None and result.duration < duration - interval)
    return result
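
Review note: the dwell-time accounting in `summarize` weights each tick by its own `dt` rather than assuming a fixed interval, so slow samples still count for the time they covered. A reduced sketch of just that accumulation follows. `time_above` here is a hypothetical free function taking `(dt, core_temp)` pairs, not the project's `_Tick` type, and the default thresholds are copied from `DEFAULT_THRESHOLDS`.

```python
from __future__ import annotations


def time_above(ticks: list[tuple[float, float | None]],
               thresholds: tuple[int, ...] = (80, 85, 90, 95)) -> dict[int, float]:
    """ticks are (dt_seconds, core_temp_or_None); returns threshold -> seconds at/above."""
    totals = {th: 0.0 for th in thresholds}
    for dt, temp in ticks:
        if temp is None:      # no GPU reading this tick: contributes no dwell time
            continue
        for th in thresholds:
            if temp >= th:
                totals[th] += dt
    # Drop thresholds that were never reached, as summarize() does.
    return {th: round(secs, 1) for th, secs in totals.items() if secs > 0}
```

With five half-second ticks at 78, 82, 86, no reading, and 91 °C, the sketch gives 1.5 s at or above 80, 1.0 s at or above 85, and 0.5 s at or above 90; the 95 threshold is omitted because it was never reached.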
+7 -3
@@ -40,16 +40,20 @@ def launch_option() -> str:
     return f"{quoted} wrap %command%"


-def run(command: list[str]) -> int:
+def run(command: list[str], game: str | None = None) -> int:
     """Start a focused capture (unless one's already running), run the game, then stop it.
-    Returns the game's exit code so Steam sees the right status."""
+    Returns the game's exit code so Steam sees the right status.
+
+    `game` overrides name detection — used by `games play` for a custom game (e.g. SPT), where
+    there's no SteamAppId and the bare script name (tarkov.sh) wouldn't tag the capture usefully.
+    """
     from . import diagnostic, reccontrol

     if not command:
         print("usage: rigdoctor wrap %command% (set as a Steam launch option)", file=sys.stderr)
         return 2
-    game = game_name_from_env() or os.path.basename(command[0])
+    game = game or game_name_from_env() or os.path.basename(command[0])
     started = False
     if not reccontrol.running_pid():  # don't disturb an existing capture
         started = diagnostic.start(game=game) is not None
+1 -1
@@ -143,7 +143,7 @@ class DiagnosticDialog(QDialog):
         lines.append("\nCapture summary:\n" + render_summary(summary))
         since = (summary.start - 60) if summary.start else None
-        logs = gamelogs.collect(since=since)  # scoped to this session
+        logs = gamelogs.collect(since=since, game=result.game)  # scoped to this session
         if logs:
             lines.append("\nGame/Proton/Steam logs for this session:\n" + logs)
         sys_logs = syslogs.collect(since=since)  # kernel log + crashed-process records
+84 -1
@@ -115,6 +115,10 @@ class GamesPage(QWidget):
         self._autocap_btn = QPushButton("Auto-capture…")
         self._autocap_btn.clicked.connect(self._show_autocapture)
         header.addWidget(self._autocap_btn)
+        # Add a game no launcher reports (e.g. SPT / standalone mod launchers).
+        self._add_btn = QPushButton("Add game…")
+        self._add_btn.clicked.connect(self._add_custom_game)
+        header.addWidget(self._add_btn)
         self._rescan_btn = QPushButton("Rescan")
         self._rescan_btn.setObjectName("PrimaryButton")
         self._rescan_btn.clicked.connect(self.refresh)
@@ -235,7 +239,9 @@ class GamesPage(QWidget):
         ]
         self._libraries_ready.emit(libs)
         try:
-            self._extra_games = launchers.scan()  # Lutris / Heroic (non-Steam)
+            from ..core import customgames
+            # non-Steam: Lutris/Heroic + user-added games (SPT etc.)
+            self._extra_games = list(launchers.scan()) + customgames.scan()
         except Exception:
             self._extra_games = []
         self._scanned.emit(steam.rescan())
@@ -423,6 +429,83 @@ class GamesPage(QWidget):
         reccontrol.stop_background()
         self._banner.hide()

+    def _add_custom_game(self) -> None:
+        """Manually add a game no launcher reports (e.g. SPT): name + an optional launch
+        command/script (so it can be launched under crash-capture) and log folder."""
+        from ..core import customgames
+
+        dlg = QDialog(self)
+        dlg.setWindowTitle("Add game")
+        dlg.setMinimumWidth(560)
+        v = QVBoxLayout(dlg)
+        v.setContentsMargins(20, 18, 20, 16)
+        v.setSpacing(10)
+        intro = QLabel(
+            "Add a game no launcher reports — a standalone mod launcher like SPT, an itch.io "
+            "download, or any hand-installed game.")
+        intro.setWordWrap(True)
+        v.addWidget(intro)
+
+        name_edit = QLineEdit()
+        name_edit.setPlaceholderText("SPT")
+        v.addWidget(QLabel("Game name"))
+        v.addWidget(name_edit)
+
+        cmd_edit = QLineEdit()
+        cmd_edit.setPlaceholderText("e.g. /run/media/.../Escape-From-Tarkov/tarkov.sh")
+        cmd_row = QHBoxLayout()
+        cmd_row.addWidget(cmd_edit, 1)
+        cmd_browse = QPushButton("Browse…")
+        cmd_row.addWidget(cmd_browse, 0)
+        v.addWidget(QLabel("Launch command / script (optional — enables launch + auto-capture)"))
+        v.addLayout(cmd_row)
+
+        log_edit = QLineEdit()
+        log_edit.setPlaceholderText("auto-detected from the script's folder (its logs/ subfolder)")
+        log_row = QHBoxLayout()
+        log_row.addWidget(log_edit, 1)
+        log_browse = QPushButton("Browse…")
+        log_row.addWidget(log_browse, 0)
+        v.addWidget(QLabel("Log folder (optional — read into crash diagnostics)"))
+        v.addLayout(log_row)
+
+        def _pick_command() -> None:
+            path, _ = QFileDialog.getOpenFileName(dlg, "Select the launch script/executable")
+            if path:
+                cmd_edit.setText(path)
+
+        def _pick_logdir() -> None:
+            path = QFileDialog.getExistingDirectory(dlg, "Select the game's log folder")
+            if path:
+                log_edit.setText(path)
+
+        cmd_browse.clicked.connect(_pick_command)
+        log_browse.clicked.connect(_pick_logdir)
+
+        buttons = QHBoxLayout()
+        buttons.addStretch(1)
+        cancel = QPushButton("Cancel")
+        cancel.clicked.connect(dlg.reject)
+        buttons.addWidget(cancel)
+        add = QPushButton("Add")
+        add.setObjectName("PrimaryButton")
+        add.setDefault(True)
+        add.clicked.connect(dlg.accept)
+        buttons.addWidget(add)
+        v.addLayout(buttons)
+
+        if dlg.exec() != QDialog.DialogCode.Accepted:
+            return
+        name = name_edit.text().strip()
+        if not name:
+            return
+        if customgames.add(name, command=cmd_edit.text().strip() or None,
+                           logdir=log_edit.text().strip() or None):
+            self.refresh()
+        else:
+            QMessageBox.information(self, "Add game", f"'{name}' is already in your games.")
+
     def _show_autocapture(self) -> None:
         from ..core import wrap
+8
@@ -39,6 +39,9 @@ class HealthPage(QWidget):
         self._status = QLabel("")
         self._status.setObjectName("Muted")
         header.addWidget(self._status)
+        self._stress_btn = QPushButton("Stress test…")
+        self._stress_btn.clicked.connect(self._open_stress)
+        header.addWidget(self._stress_btn)
         self._run_btn = QPushButton("Run health report")
         self._run_btn.setObjectName("PrimaryButton")
         self._run_btn.clicked.connect(self._run)
@@ -59,6 +62,11 @@ class HealthPage(QWidget):
         QTimer.singleShot(300, self._run)  # auto-run shortly after the window opens

+    def _open_stress(self) -> None:
+        from .stress_dialog import StressDialog
+
+        StressDialog(self).exec()
+
     def _run(self) -> None:
         self._run_btn.setEnabled(False)
         self._status.setText("Scanning logs, SMART, and driver…")
+157
@@ -0,0 +1,157 @@
"""GPU stress + thermal-monitor dialog (GUI front-end for core/stress.py).
Runs the stress monitor in a background thread, streams a live one-line readout, and shows the
rendered result (telemetry stats + verdict) when it finishes. A Stop button ends the run early
via a cooperative flag; closing the dialog mid-run stops it too.
"""
from __future__ import annotations
import threading
from PySide6.QtCore import Qt, Signal
from PySide6.QtGui import QFont
from PySide6.QtWidgets import (
QDialog,
QHBoxLayout,
QLabel,
QLineEdit,
QPushButton,
QSpinBox,
QTextEdit,
QVBoxLayout,
)
class StressDialog(QDialog):
_tick = Signal(str) # live one-line readout (worker thread -> GUI)
_done = Signal(object) # stress.StressResult when the run finishes
def __init__(self, parent=None) -> None:
super().__init__(parent)
self._stop = threading.Event()
self._running = False
self._tick.connect(self._on_tick)
self._done.connect(self._on_done)
self.setWindowTitle("GPU stress + thermal monitor")
self.resize(640, 460)
root = QVBoxLayout(self)
root.setContentsMargins(20, 18, 20, 16)
root.setSpacing(12)
intro = QLabel(
"Run a GPU load and closely watch temps. Reports peak/sustained temps, time spent "
"hot, throttling, and any GPU fault (Xid / driver freeze) during the run.")
intro.setWordWrap(True)
root.addWidget(intro)
from ..core import stress
loaders = stress.available_loaders()
self._mode = QLabel(
f"Load tool detected: {loaders[0]} — it'll drive the load." if loaders else
"No GPU load tool installed → MONITOR-ONLY: start this, then launch your game; "
"it tracks temps while you play. (Or give a command below.)")
self._mode.setObjectName("Muted")
self._mode.setWordWrap(True)
root.addWidget(self._mode)
form = QHBoxLayout()
form.addWidget(QLabel("Duration (s):"))
self._duration = QSpinBox()
self._duration.setRange(5, 3600)
self._duration.setValue(120)
form.addWidget(self._duration)
form.addSpacing(12)
form.addWidget(QLabel("Command (optional):"))
self._command = QLineEdit()
self._command.setPlaceholderText("e.g. /…/tarkov.sh or gpu-burn 60")
form.addWidget(self._command, 1)
root.addLayout(form)
self._live = QLabel("")
self._live.setFont(QFont("monospace"))
self._live.setStyleSheet("background: #0d0f13; color: #cfd3da; border: 1px solid #2a2f39; "
"border-radius: 8px; padding: 8px;")
root.addWidget(self._live)
self._report = QTextEdit()
self._report.setReadOnly(True)
self._report.setFont(QFont("monospace"))
self._report.setVisible(False)
root.addWidget(self._report, 1)
buttons = QHBoxLayout()
buttons.addStretch(1)
self._stop_btn = QPushButton("Stop")
self._stop_btn.setEnabled(False)
self._stop_btn.clicked.connect(self._on_stop)
buttons.addWidget(self._stop_btn)
self._start_btn = QPushButton("Start")
self._start_btn.setObjectName("PrimaryButton")
self._start_btn.clicked.connect(self._on_start)
buttons.addWidget(self._start_btn)
root.addLayout(buttons)
def _on_start(self) -> None:
if self._running:
return
self._running = True
self._stop.clear()
self._start_btn.setEnabled(False)
self._stop_btn.setEnabled(True)
self._report.setVisible(False)
self._live.setText("starting…")
duration = float(self._duration.value())
command_text = self._command.text().strip()
threading.Thread(target=self._work, args=(duration, command_text), daemon=True).start()
def _work(self, duration: float, command_text: str) -> None:
import shlex
from ..core import stress
command = shlex.split(command_text) if command_text else None
def _tick(sample, elapsed) -> None:
by = {r.key: r for r in sample.readings}
from ..render import format_raw
bits = [f"{elapsed:5.0f}s"]
for key, tag in (("gpu.temp", "core"), ("gpu.power", "pwr"),
("gpu.util", "util"), ("gpu.clock.core", "clk"),
("gpu.temp.memory", "vram")):
r = by.get(key)
if r is not None and r.value is not None:
bits.append(f"{tag} {format_raw(r.value, r.unit)}")
self._tick.emit(" ".join(bits))
try:
result = stress.run(duration=duration, interval=0.5, command=command,
on_tick=_tick, should_stop=self._stop.is_set)
except Exception as exc: # never let a worker crash take down the dialog
result = exc
self._done.emit(result)
def _on_tick(self, text: str) -> None:
self._live.setText(text)
def _on_done(self, result) -> None:
from ..render import render_stress
self._running = False
self._start_btn.setEnabled(True)
self._stop_btn.setEnabled(False)
if isinstance(result, Exception):
self._report.setPlainText(f"Stress run failed: {result}")
else:
self._report.setPlainText(render_stress(result))
self._report.setVisible(True)
def _on_stop(self) -> None:
self._stop.set()
self._stop_btn.setEnabled(False)
self._live.setText("stopping…")
def closeEvent(self, event) -> None: # stop the run if the dialog is closed mid-flight
self._stop.set()
super().closeEvent(event)
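The dialog above runs the load on a worker thread and touches widgets only through the `_tick` / `_done` signals; Stop and `closeEvent` merely set an event that the run loop polls. A minimal, Qt-free sketch of that cooperative-stop contract follows. `run_monitor` is a hypothetical stand-in for `stress.run`, whose real body is not part of this diff; only the `on_tick` and `should_stop` parameter names are taken from the call above.

```python
import threading
import time


def run_monitor(duration, interval, on_tick, should_stop):
    """Hypothetical stand-in for stress.run: sample until time is up or a stop is requested."""
    start = time.monotonic()
    samples = 0
    while True:
        elapsed = time.monotonic() - start
        if elapsed >= duration or should_stop():
            break
        samples += 1
        on_tick(samples, elapsed)      # the dialog forwards this via a Qt signal
        time.sleep(interval)
    return {"samples": samples, "aborted": should_stop()}


stop = threading.Event()
seen = []      # ticks received by the "GUI" side
result = {}    # filled in by the worker when it finishes


def worker():
    result.update(run_monitor(duration=5.0, interval=0.01,
                              on_tick=lambda n, t: seen.append(n),
                              should_stop=stop.is_set))


thread = threading.Thread(target=worker, daemon=True)
thread.start()
time.sleep(0.05)          # let a few samples arrive
stop.set()                # what _on_stop and closeEvent do
thread.join(timeout=2.0)  # the loop notices the flag within one interval
```

Because the loop checks `should_stop()` once per interval, a stop request takes effect within roughly one sampling period, without killing the thread.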
+26
@@ -118,6 +118,32 @@ def render_health(findings: list, title: str = "Health report") -> str:
return "\n".join(lines).rstrip()
def render_stress(result) -> str:
"""Render a stress.StressResult: telemetry stats, temp dwell time, and the verdict."""
lines = ["GPU stress + thermal monitor", ""]
lines.append(f" Load : {result.load}")
lines.append(f" Duration : {_fmt_duration(result.duration)} · {result.samples} samples "
f"@ {result.interval:g}s" + (" (stopped early)" if result.aborted else ""))
if result.stats:
lines += ["", f" {'Metric':<22}{'min':>12}{'avg':>12}{'max':>12}"]
for s in result.stats:
u = s.unit
lines.append(f" {s.label:<22}{format_raw(s.min, u):>12}{format_raw(s.avg, u):>12}"
f"{format_raw(s.max, u):>12}")
if result.time_above:
spans = " ".join(f"{th}°C: {_fmt_duration(secs)}" for th, secs in sorted(result.time_above.items()))
lines += ["", f" Time at temp (core): {spans}"]
if result.max_power is not None and result.power_limit:
cap = " — hit the power cap" if result.power_capped else ""
lines.append(f" Power peak: {result.max_power:.0f} W of {result.power_limit:.0f} W limit{cap}")
if result.throttle_reasons:
lines.append(f" Throttling: {', '.join(result.throttle_reasons)}")
if result.faults:
lines.append(f" Faults : {'; '.join(result.faults)}")
lines += ["", f"[{_SEV_LABEL.get(result.severity, '?')}] {result.verdict}"]
return "\n".join(lines)
def render_summary(summary: Summary, log_path=None) -> str:
if summary.samples == 0 and not summary.events:
where = f" ({log_path})" if log_path else ""
+85
@@ -0,0 +1,85 @@
"""Tests for user-added games (M6): add/remove/scan of titles no launcher reports (e.g. SPT)."""
import tempfile
import unittest
from pathlib import Path
from unittest import mock
from rigdoctor.core import customgames
class CustomGamesTests(unittest.TestCase):
def setUp(self):
self._tmp = tempfile.TemporaryDirectory()
self._file = Path(self._tmp.name) / "custom-games.json"
self._patch = mock.patch.object(customgames.config, "CUSTOM_GAMES_FILE", self._file)
self._patch.start()
def tearDown(self):
self._patch.stop()
self._tmp.cleanup()
def test_missing_file_scans_empty(self):
self.assertEqual(customgames.scan(), [])
self.assertEqual(customgames.names(), [])
def test_add_then_scan_returns_game(self):
self.assertTrue(customgames.add("SPT"))
games = customgames.scan()
self.assertEqual(len(games), 1)
self.assertEqual(games[0].name, "SPT")
self.assertEqual(games[0].launcher, "custom")
self.assertTrue(self._file.exists()) # persisted
def test_add_is_idempotent_case_insensitive(self):
self.assertTrue(customgames.add("SPT"))
self.assertFalse(customgames.add("spt")) # already present
self.assertFalse(customgames.add(" ")) # blank
self.assertEqual(customgames.names(), ["SPT"])
def test_remove(self):
customgames.add("SPT")
customgames.add("Minecraft")
self.assertTrue(customgames.remove("spt")) # case-insensitive
self.assertEqual(customgames.names(), ["Minecraft"])
self.assertFalse(customgames.remove("nope"))
def test_scan_sorted_by_name(self):
for n in ("Zomboid", "Apex", "SPT"):
customgames.add(n)
self.assertEqual([g.name for g in customgames.scan()], ["Apex", "SPT", "Zomboid"])
def test_command_and_logdir_stored_and_resolved(self):
logs = Path(self._tmp.name) / "logs"
logs.mkdir()
sh = Path(self._tmp.name) / "tarkov.sh"
sh.write_text("#!/bin/sh\n")
self.assertTrue(customgames.add("SPT", command=str(sh), logdir=str(logs)))
self.assertEqual(customgames.command("SPT"), [str(sh)])
self.assertEqual(customgames.log_dir("SPT"), str(logs))
def test_logdir_inferred_from_sibling_logs(self):
# A command with a sibling logs/ dir (SPT's layout) → logdir auto-detected.
sh = Path(self._tmp.name) / "tarkov.sh"
sh.write_text("#!/bin/sh\n")
(Path(self._tmp.name) / "logs").mkdir()
self.assertTrue(customgames.add("SPT", command=str(sh)))
self.assertEqual(customgames.log_dir("SPT"), str(Path(self._tmp.name) / "logs"))
def test_no_command_resolves_to_none(self):
customgames.add("SPT")
self.assertIsNone(customgames.command("SPT"))
self.assertIsNone(customgames.command("missing"))
self.assertIsNone(customgames.log_dir("SPT"))
def test_corrupt_file_degrades_to_empty(self):
self._file.parent.mkdir(parents=True, exist_ok=True)
self._file.write_text("{not json")
self.assertEqual(customgames.scan(), [])
# and a subsequent add still works (overwrites the garbage)
self.assertTrue(customgames.add("SPT"))
self.assertEqual(customgames.names(), ["SPT"])
if __name__ == "__main__":
unittest.main()
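The `test_logdir_inferred_from_sibling_logs` case above relies on a log folder being guessed when only a command is given. One way that inference could work is sketched below; `infer_log_dir` is a hypothetical helper, not the actual `customgames` implementation, and it assumes the first token of the command is the launcher path.

```python
import shlex
import tempfile
from pathlib import Path
from typing import Optional


def infer_log_dir(command: Optional[str], explicit: Optional[str] = None) -> Optional[str]:
    """Hypothetical: prefer an explicit logdir, else a logs/ dir next to the launcher."""
    if explicit:
        return explicit
    if not command:
        return None
    launcher = Path(shlex.split(command)[0])   # assumption: first token is the script path
    candidate = launcher.parent / "logs"
    return str(candidate) if candidate.is_dir() else None


# Demo in a throwaway directory that mirrors the layout used by the test.
with tempfile.TemporaryDirectory() as tmp:
    script = Path(tmp) / "tarkov.sh"
    script.write_text("#!/bin/sh\n")
    before = infer_log_dir(str(script))        # no logs/ dir yet
    (Path(tmp) / "logs").mkdir()
    after = infer_log_dir(str(script))         # sibling logs/ now exists
    expected = str(Path(tmp) / "logs")
```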
+99
@@ -0,0 +1,99 @@
"""Tests for drive health parsing & findings (synthetic smartctl JSON)."""
import unittest
from dataclasses import asdict
from rigdoctor.core import drives
from rigdoctor.core.health import CRITICAL, INFO, OK, WARNING
_NVME_OK = {
"model_name": "Samsung SSD 980 PRO 1TB",
"device": {"protocol": "NVMe"},
"smart_status": {"passed": True},
"temperature": {"current": 41},
"power_on_time": {"hours": 1234},
"nvme_smart_health_information_log": {
"percentage_used": 3, "available_spare": 100, "available_spare_threshold": 10,
"media_errors": 0, "data_units_written": 200_000_000, # ~102 TB
},
}
_NVME_WORN = {
"model_name": "Worn NVMe",
"device": {"protocol": "NVMe"},
"smart_status": {"passed": True},
"nvme_smart_health_information_log": {"percentage_used": 96, "available_spare": 100,
"available_spare_threshold": 10},
}
_SATA_FAILING = {
"model_name": "Samsung SSD 870 QVO 1TB",
"device": {"protocol": "ATA"},
"smart_status": {"passed": False},
"temperature": {"current": 35},
"power_on_time": {"hours": 5000},
"ata_smart_attributes": {"table": [
{"id": 5, "name": "Reallocated_Sector_Ct", "value": 80, "raw": {"value": 12}},
{"id": 177, "name": "Wear_Leveling_Count", "value": 88, "raw": {"value": 300}},
{"id": 241, "name": "Total_LBAs_Written", "value": 99, "raw": {"value": 2_000_000_000}},
]},
}
class ParseTests(unittest.TestCase):
def test_nvme_parse(self):
d = drives.parse("/dev/nvme0", _NVME_OK)
self.assertEqual(d.kind, "nvme")
self.assertTrue(d.passed)
self.assertEqual(d.percent_used, 3)
self.assertEqual(d.health_pct, 97) # 100 - percentage_used
self.assertEqual(d.power_on_hours, 1234)
self.assertEqual(d.temp_c, 41)
self.assertAlmostEqual(d.data_written_tb, 102.4, places=1)
def test_sata_parse(self):
d = drives.parse("/dev/sda", _SATA_FAILING)
self.assertEqual(d.kind, "sata")
self.assertFalse(d.passed)
self.assertEqual(d.reallocated, 12) # raw value
self.assertEqual(d.health_pct, 88) # normalized wear-leveling value
self.assertAlmostEqual(d.data_written_tb, 1.02, places=1)
def test_needs_root_when_no_data(self):
d = drives.parse("/dev/sda", None)
self.assertTrue(d.needs_root)
def test_roundtrip_through_dicts(self):
d = drives.parse("/dev/nvme0", _NVME_OK)
back = drives.from_dicts([asdict(d)])
self.assertEqual(len(back), 1)
self.assertEqual(back[0].model, d.model)
self.assertEqual(back[0].health_pct, d.health_pct)
class FindingTests(unittest.TestCase):
def test_healthy_nvme_is_ok_with_stats(self):
f = drives.to_findings([drives.parse("/dev/nvme0", _NVME_OK)])[0]
self.assertEqual(f.severity, OK)
self.assertIn("97% life left", f.title)
self.assertIn("1,234 h", f.title)
def test_failing_sata_is_critical(self):
f = drives.to_findings([drives.parse("/dev/sda", _SATA_FAILING)])[0]
self.assertEqual(f.severity, CRITICAL)
self.assertIn("FAILED", f.detail)
self.assertIn("reallocated sectors", f.detail)
def test_worn_nvme_is_warning(self):
f = drives.to_findings([drives.parse("/dev/nvme1", _NVME_WORN)])[0]
self.assertEqual(f.severity, WARNING)
self.assertIn("worn", f.title)
def test_needs_root_is_info(self):
f = drives.to_findings([drives.parse("/dev/sda", None)])[0]
self.assertEqual(f.severity, INFO)
self.assertIn("needs root", f.title)
if __name__ == "__main__":
unittest.main()
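The expected values in `ParseTests` (102.4 TB, 1.02 TB, 97% life left) follow from simple unit conversions. The sketch below reproduces that arithmetic under two assumptions: NVMe reports `data_units_written` in units of 1000 × 512 bytes, and the SATA `Total_LBAs_Written` raw value counts 512-byte sectors (some drives use a different unit). The function names are illustrative, not those in `core/drives.py`.

```python
NVME_DATA_UNIT_BYTES = 512 * 1000   # one NVMe "data unit" = 1000 blocks of 512 bytes
SATA_LBA_BYTES = 512                # assumption: 512-byte logical sectors
BYTES_PER_TB = 1e12                 # decimal terabytes, as drive vendors quote TBW


def nvme_written_tb(data_units_written: int) -> float:
    return data_units_written * NVME_DATA_UNIT_BYTES / BYTES_PER_TB


def sata_written_tb(total_lbas_written: int, lba_bytes: int = SATA_LBA_BYTES) -> float:
    return total_lbas_written * lba_bytes / BYTES_PER_TB


def nvme_life_left_pct(percentage_used: int) -> int:
    # percentage_used may exceed 100 on a drive past its rated endurance, so clamp at 0.
    return max(0, 100 - percentage_used)


nvme_tb = nvme_written_tb(200_000_000)    # the _NVME_OK fixture
sata_tb = sata_written_tb(2_000_000_000)  # the _SATA_FAILING fixture
```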
+30
@@ -47,6 +47,36 @@ class CollectTests(unittest.TestCase):
self.assertEqual(gamelogs.collect(), "")
class CustomGameLogTests(unittest.TestCase):
def test_collect_includes_custom_game_logs(self):
tmp = Path(tempfile.mkdtemp())
(tmp / "tarkov-latest.log").write_text(">>> Tarkov gone. clean exit")
(tmp / "server-latest.log").write_text("SPT server error: mod failed to load")
with mock.patch.object(gamelogs, "_proton_logs", return_value=[]), \
mock.patch.object(gamelogs, "_steam_console", return_value=None), \
mock.patch("rigdoctor.core.customgames.log_dir", return_value=str(tmp)):
out = gamelogs.collect(game="SPT")
self.assertIn("SPT log", out)
self.assertIn("server-latest.log", out)
self.assertIn("mod failed to load", out)
def test_custom_logs_skipped_when_stale(self):
tmp = Path(tempfile.mkdtemp())
old = tmp / "tarkov-latest.log"
old.write_text("an earlier session")
old_mtime = time.time() - 3600
os.utime(old, (old_mtime, old_mtime))
with mock.patch.object(gamelogs, "_proton_logs", return_value=[]), \
mock.patch.object(gamelogs, "_steam_console", return_value=None), \
mock.patch("rigdoctor.core.customgames.log_dir", return_value=str(tmp)):
self.assertEqual(gamelogs.collect(since=time.time() - 60, game="SPT"), "")
def test_no_game_means_no_custom_logs(self):
with mock.patch.object(gamelogs, "_proton_logs", return_value=[]), \
mock.patch.object(gamelogs, "_steam_console", return_value=None):
self.assertEqual(gamelogs.collect(), "") # game=None → custom lookup skipped
class SinceScopingTests(unittest.TestCase):
def test_since_filter_keeps_window_only(self):
text = (
+30
@@ -11,11 +11,19 @@ from rigdoctor.core.health import (
WARNING,
check_displays,
check_memory_speed,
check_nvidia_module,
check_pcie_links,
run_health_checks,
scan_journal_text,
)
# A real no-Xid freeze: the open-module VA-space storm captured on 2026-05-29.
_VASPACE_LOG = """\
NVRM: nvCheckFailedNoLog: Check failed: 0 == (pMapNode->gpuMask & gpuMask) @ gpu_vaspace.c:4547
NVRM: dmaAllocMapping_GM107: can't update VA space for mapping @vaddr=0x4be00000
[drm:nv_drm_gem_alloc_nvkms_memory_ioctl [nvidia_drm]] *ERROR* Failed to allocate NVKMS memory for GEM object
"""
class HealthScanTests(unittest.TestCase):
def test_xid_79_is_critical(self):
@@ -44,6 +52,28 @@ class HealthScanTests(unittest.TestCase):
def test_clean_text_yields_no_findings(self):
self.assertEqual(scan_journal_text("usb 1-1: new high-speed USB device\nbluetooth: ok"), [])
def test_vaspace_freeze_detected_without_any_xid(self):
findings = scan_journal_text(_VASPACE_LOG)
gpu = [f for f in findings if f.category == "GPU"]
self.assertEqual(len(gpu), 1)
self.assertEqual(gpu[0].severity, WARNING)
self.assertIn("VA-space", gpu[0].title)
# It must NOT be misreported as an Xid finding (the log has no Xid at all).
self.assertNotIn("Xid", gpu[0].title)
self.assertIn("open kernel module", gpu[0].detail.lower())
def test_open_module_finding_when_open_loaded(self):
with mock.patch("rigdoctor.core.health._nvidia_module_is_open", return_value=True):
findings = check_nvidia_module()
self.assertEqual(len(findings), 1)
self.assertEqual(findings[0].severity, INFO)
self.assertEqual(findings[0].category, "Driver")
def test_no_module_finding_when_proprietary_or_absent(self):
for state in (False, None):
with mock.patch("rigdoctor.core.health._nvidia_module_is_open", return_value=state):
self.assertEqual(check_nvidia_module(), [])
def test_run_health_checks_returns_findings(self):
# Runs against the real system; just assert it returns a sorted list of Findings.
findings = run_health_checks()
+77
@@ -0,0 +1,77 @@
"""Tests for the GPU stress + thermal-monitor analysis (synthetic ticks, no real GPU)."""
import unittest
from rigdoctor.core import stress
from rigdoctor.core.health import CRITICAL, OK, WARNING
def _tick(temp=None, power=None, throttle=(), capped=False, lost=False, dt=1.0, **extra):
values = {}
if temp is not None:
values["gpu.temp"] = temp
if power is not None:
values["gpu.power"] = power
values.update(extra)
return stress._Tick(dt=dt, values=values, throttle=list(throttle), power_capped=capped, lost=lost)
class SummarizeTests(unittest.TestCase):
def test_stable_run_is_ok(self):
ticks = [_tick(temp=t, power=200, **{"gpu.power_limit": 280}) for t in (60, 65, 70, 72)]
r = stress.summarize(ticks, load="monitor-only", interval=1.0, faults=[])
self.assertEqual(r.severity, OK)
self.assertEqual(r.peak_temp, 72)
self.assertEqual(r.max_power, 200)
self.assertEqual(r.power_limit, 280)
self.assertFalse(r.throttled)
self.assertIn("Stable", r.verdict)
def test_dwell_time_above_thresholds(self):
# 3 ticks of 2s each at 82/86/92 °C → ≥80 for all 6s, ≥85 for 4s, ≥90 for 2s.
ticks = [_tick(temp=82, dt=2.0), _tick(temp=86, dt=2.0), _tick(temp=92, dt=2.0)]
r = stress.summarize(ticks, load="x", interval=2.0, faults=[])
self.assertEqual(r.time_above[80], 6.0)
self.assertEqual(r.time_above[85], 4.0)
self.assertEqual(r.time_above[90], 2.0)
self.assertNotIn(95, r.time_above) # never reached → omitted
def test_throttling_is_a_warning(self):
ticks = [_tick(temp=88, throttle=["HW thermal slowdown"])]
r = stress.summarize(ticks, load="x", interval=1.0, faults=[])
self.assertEqual(r.severity, WARNING)
self.assertTrue(r.throttled)
self.assertIn("HW thermal slowdown", r.throttle_reasons)
def test_high_temp_without_throttle_is_a_warning(self):
r = stress.summarize([_tick(temp=93)], load="x", interval=1.0, faults=[])
self.assertEqual(r.severity, WARNING)
self.assertIn("hot", r.verdict.lower())
def test_gpu_lost_is_critical(self):
ticks = [_tick(temp=70), _tick(lost=True)]
r = stress.summarize(ticks, load="x", interval=1.0, faults=[])
self.assertEqual(r.severity, CRITICAL)
self.assertTrue(r.gpu_lost)
def test_journal_fault_is_critical(self):
r = stress.summarize([_tick(temp=70)], load="x", interval=1.0,
faults=["NVIDIA Xid 79 ×1"])
self.assertEqual(r.severity, CRITICAL)
self.assertIn("Xid 79", r.verdict)
def test_no_telemetry_is_info(self):
r = stress.summarize([_tick()], load="monitor-only", interval=1.0, faults=[])
self.assertEqual(r.severity, "info")
self.assertIsNone(r.peak_temp)
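The dwell-time expectations in `test_dwell_time_above_thresholds` can be reproduced with a small accumulator: each tick contributes its `dt` to every threshold its temperature meets or exceeds, and thresholds that are never reached are left out. This is a sketch of that idea, not the body of `stress.summarize`; the threshold tuple is inferred from the values the test checks, and `time_above` here is a free function written for illustration.

```python
from typing import Dict, Iterable, Optional, Tuple

THRESHOLDS_C = (80, 85, 90, 95)   # assumption: inferred from the test's expectations


def time_above(ticks: Iterable[Tuple[Optional[float], float]],
               thresholds=THRESHOLDS_C) -> Dict[int, float]:
    """Sum seconds spent at or above each threshold; ticks are (temp_c, dt_seconds)."""
    totals: Dict[int, float] = {}
    for temp, dt in ticks:
        if temp is None:          # a tick with no temperature reading adds nothing
            continue
        for th in thresholds:
            if temp >= th:
                totals[th] = totals.get(th, 0.0) + dt
    return totals


dwell = time_above([(82, 2.0), (86, 2.0), (92, 2.0)])
```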
class ThrottleDecodeTests(unittest.TestCase):
def test_throttle_bits_map_to_reasons(self):
# the constants used by _throttle_state decode the NVML active-reasons bitmask
self.assertIn("HW thermal slowdown", stress._THROTTLE_BITS.values())
self.assertIn("SW thermal slowdown", stress._THROTTLE_BITS.values())
if __name__ == "__main__":
unittest.main()
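`ThrottleDecodeTests` only checks that the labels exist; the decoding itself is a plain bitmask walk. The sketch below shows the technique with a hypothetical table. The bit values are the NVML clocks-throttle-reason constants as I recall them from `nvml.h` and should be verified against the header before being relied on; apart from the two thermal labels that the test names, the label strings are illustrative rather than the contents of `stress._THROTTLE_BITS`.

```python
# Hypothetical table: bit values recalled from nvml.h, to be verified against the header.
THROTTLE_BITS = {
    0x04: "SW power cap",
    0x08: "HW slowdown",
    0x20: "SW thermal slowdown",
    0x40: "HW thermal slowdown",
    0x80: "HW power brake",
}


def decode_throttle(mask: int) -> list:
    """Return the label of every known throttle bit that is set in mask."""
    return [label for bit, label in THROTTLE_BITS.items() if mask & bit]


reasons = decode_throttle(0x04 | 0x40)   # power cap and hardware thermal slowdown together
```

Bits that are not in the table (for example an idle flag) are ignored, so an idle GPU decodes to an empty list instead of a false throttling report.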