Compare commits

..

6 Commits

Author SHA1 Message Date
jessey 7804893054 Merge pull request 'feat(m9): systemd --user trigger modes + game-launch watcher — 0.23.0' (#19) from feat/m9-installer into main
release / release (push) Successful in 14s
Reviewed-on: #19
2026-05-22 07:55:47 +00:00
jessey bf3ac4af1a feat(m9): systemd --user trigger modes + game-launch watcher — 0.23.0
D6 trigger modes, no root:
- core/service.py: write/enable `systemd --user` units; apply_mode(manual/
  always-on/game-launch) reconciles the recorder + watcher services; status().
- core/watcher.py + `rigdoctor watch`: poll Steam RunningAppID, auto-bracket a
  focused capture (D12 zero-config fallback; wrapper stays primary).
- CLI `rigdoctor service status|mode`; config `trigger_mode`.
- GUI Settings: "Recording trigger" dropdown (Apply runs apply_mode off-thread).
- Tests for unit generation, mode reconciliation, watcher transitions/parse.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-22 09:55:36 +02:00
jessey e4a37176e1 Merge pull request 'feat(m6): PowerMizer + Wine/Steam versions + non-Steam launchers — 0.22.0' (#18) from feat/m6-leftovers into main
release / release (push) Successful in 14s
Reviewed-on: #18
2026-05-22 07:47:26 +00:00
jessey 67665974dc feat(m6): PowerMizer + Wine/Steam versions + non-Steam launchers — 0.22.0
M6 leftovers (the watcher defers to M9's trigger-mode work):
- gameenv: check_gpu_powermizer (NVIDIA, X; degrades when the gpu target won't
  resolve), check_wine (wine --version), check_steam_client (dpkg package version);
  steam.client_version() helper.
- core/launchers.py: detect Lutris (read-only SQLite pga.db) and Heroic (Epic
  legendary + GOG JSON) installed games; Game gained a `launcher` field.
- Games page + `rigdoctor games` list non-Steam games alongside Steam, tagged by
  launcher; Run Diagnostic works on them (auto-launch stays Steam-only).
- Tests for launchers (synthetic Lutris db + Heroic json).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-22 09:46:42 +02:00
jessey 51b7ed69bd Merge pull request 'feat: live monitor TUI (M2) — 0.21.0' (#17) from feat/m11-tray into main
release / release (push) Successful in 15s
Reviewed-on: #17
2026-05-22 07:38:17 +00:00
jessey 6fca2c9aba feat: live monitor TUI (M2) — 0.21.0
Upgrade `rigdoctor monitor` from a basic redraw to a stdlib curses dashboard
(tui.py): current / session-min / session-max per sensor, grouped by subsystem,
with temperature & utilization color bands (GPU-lost flagged red). q quits,
r resets min/max. Plain full-screen redraw fallback on a non-TTY (--plain forces
it). Pure track()/band() helpers are unit-tested; curses path verified in a pty.

Completes the Monitoring bundle (M2 + M8).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-22 09:37:57 +02:00
19 changed files with 1007 additions and 51 deletions
+32
View File
@@ -5,6 +5,38 @@ All notable changes to RigDoctor are recorded here. Format follows
(`MAJOR.MINOR.PATCH`, pre-1.0). `__version__` and `pyproject.toml` must match the git
release tag (so the auto-updater, D18, can compare versions).
## [0.23.0] - 2026-05-22
### Added
- **Crash-logger trigger modes (M9 / D6)** via `systemd --user`, no root: **manual**,
**always-on** (a background service records continuously), and **game-launch** (auto-records
while a Steam game runs). Set it from **Settings → Recording trigger** or
`rigdoctor service mode <manual|always-on|game-launch>`; `rigdoctor service status` shows it.
`core/service.py` writes/enables the user units.
- **Zero-config game-launch watcher** (`core/watcher.py`, `rigdoctor watch`) — polls Steam's
RunningAppID and brackets a focused capture around the running game (the D12 fallback for users
who don't add the `wrap` launch option; the wrapper stays the precise primary path).
## [0.22.0] - 2026-05-22
### Added
- **M6 breadth.** Environment checks now also report **GPU PowerMizer** mode (NVIDIA, X — flags
Adaptive/Auto and suggests Prefer-Max-Performance), the **Wine** version, and the **Steam
client** version.
- **Non-Steam launchers.** Lutris (its SQLite library) and Heroic (Epic + GOG JSON stores) are
detected (`core/launchers.py`) and listed on the Games page and `rigdoctor games`, tagged by
launcher. You can Run Diagnostic on them too (records while you play; auto-launch stays
Steam-only).
### Notes
- The zero-config game watcher (D12 fallback) is deferred to the M9 trigger-mode work, where the
service integration lives.
## [0.21.0] - 2026-05-22
### Added
- **Live monitor TUI (M2).** `rigdoctor monitor` is now a proper **curses** dashboard:
current / session-min / session-max per sensor, grouped by subsystem, with temperature and
utilization **color bands** (and GPU-lost flagged red). `q` quits, `r` resets the session
min/max. Falls back to a plain full-screen redraw on a non-TTY (`--plain` forces it). The
terminal face of the same live data the GUI dashboard graphs. Completes the Monitoring bundle.
## [0.20.0] - 2026-05-22
### Changed
- **Reorganized navigation** into grouped sidebar sections — **Monitor** (Dashboard) ·
+9 -4
View File
@@ -11,7 +11,7 @@ Status: ⬜ not started · 🟦 designing · 🟨 in progress · ✅ done
| M1 | Sensor core | Essential | none (nvidia-smi, sysfs) | all (NVIDIA first) | P0 | ✅ |
| M3 | Crash-capture logger | Essential | none (opt: smartmontools) | all (NVIDIA first) | P0 | ✅ |
| M4 | Health report (log scan) | Essential | none (opt: smartmontools) | all (NVIDIA first) | P0 | ✅ |
| M2 | Live monitor (TUI) | Monitoring | none (stdlib curses) | all | P1 | |
| M2 | Live monitor (TUI) | Monitoring | none (stdlib curses) | all | P1 | |
| M8 | Alerting | Monitoring | libnotify (opt) | all | P2 | ✅ |
| M5 | System inventory | Diagnostics | none (opt: lm-sensors, dmidecode) | all | P1 | ✅ |
| M6 | Gaming env checks | Diagnostics | none | all | P2 | 🟨 |
@@ -41,7 +41,10 @@ Status: ⬜ not started · 🟦 designing · 🟨 in progress · ✅ done
findings (see SPEC §4). *Implemented:* journalctl scan (Xid/panic/OOM/MCE/AER/thermal/amdgpu),
SMART, NVIDIA driver-mismatch, journald-persistence + live-temp checks; `rigdoctor report`
(text/JSON) + GUI Health tab. GPU-firmware verification deferred.
- **M2 Live monitor** — depends on M1; the terminal "HWMonitor for Linux" face. Stdlib-only.
- **M2 Live monitor** — the terminal "HWMonitor for Linux" face. *Implemented (`tui.py`):*
`rigdoctor monitor` is a stdlib **curses** dashboard — current / session-min / session-max
per sensor, grouped by subsystem, with temperature & utilization color bands; `q` quits,
`r` resets the min/max. Falls back to a plain redraw on a non-TTY (`--plain` forces it).
- **M5 / M6 Diagnostics** — inventory export + gaming-env checks; M6 flags risky settings and
suggests the fix command but does not apply it (D9). *M6 implemented (Steam detection first —
the D12 "pick a game" foundation):* discovers Steam installs + all library folders
@@ -64,8 +67,10 @@ Status: ⬜ not started · 🟦 designing · 🟨 in progress · ✅ done
banner → results dialog). **Auto-capture** via the D12 wrapper (`rigdoctor wrap %command%`,
`core/wrap.py`; GUI "Auto-capture…" helper). **Hard crashes are detected** (capture left
without a clean stop) and flagged on next launch with a crash-boot kernel-log analysis
(`pending_crash`/`analyze_crash` + `health.check_previous_boot`). *Pending:* non-Steam
launchers (Lutris/Heroic), GPU power-profile (PowerMizer) checks, and the zero-config watcher.
(`pending_crash`/`analyze_crash` + `health.check_previous_boot`). **Non-Steam launchers**
(Lutris SQLite + Heroic JSON, `core/launchers.py`) are detected and listed alongside Steam
games; env checks also cover **GPU PowerMizer** (X), **Wine** and **Steam-client** versions.
*Pending:* the zero-config watcher (D12 fallback) — landing with M9's trigger-mode work.
- **M8 Alerting** — threshold/event notifications; integrates with the tray applet (M11).
- **M10 Desktop GUI** — PySide6 graphical front-end over the core engine. Optional; adds the
Qt dependency. Dark-themed window with a **grouped sidebar** (Monitor / Diagnose / System /
+10 -5
View File
@@ -22,7 +22,8 @@ Ubuntu + NVIDIA first; `.deb` distribution (see `DECISIONS.md`).
last readings + a plausible cause.
## Phase 2 — Live monitor (terminal)
- [ ] M2 TUI dashboard (current/min/max, grouped, throttle highlighting)
- [x] M2 TUI dashboard (`rigdoctor monitor`, `tui.py`): curses, current/min/max grouped by
subsystem with temp/usage color bands; q quit / r reset; plain-redraw fallback on non-TTY
- [ ] M8 basic alerting (overheat/throttle/GPU-lost notifications)
## Phase 3 — Diagnostics breadth
@@ -33,7 +34,9 @@ Ubuntu + NVIDIA first; `.deb` distribution (see `DECISIONS.md`).
This is also the D12 "pick a game" foundation. *Env-check engine done* (`rigdoctor gameenv`
+ GUI Environment page): PCIe ASPM, NVIDIA persistence, CPU governor, GameMode, MangoHud,
swappiness, shader cache, THP, mitigations, Proton versions — read-only with fix commands.
*Pending:* non-Steam launchers (Lutris/Heroic) + GPU power-profile (PowerMizer) checks.
Also: GPU PowerMizer (X), Wine + Steam-client versions, and non-Steam launchers
(Lutris/Heroic, `core/launchers.py`). *Pending:* the zero-config watcher (D12 fallback,
lands with M9's trigger-mode work).
- [ ] SMART integration (smartmontools if present)
## Phase 4 — Desktop UI & installer
@@ -58,9 +61,11 @@ Ubuntu + NVIDIA first; `.deb` distribution (see `DECISIONS.md`).
watcher (Steam RunningAppID + /proc), GameMode hook, and the always-on `systemd --user`
service.
- [~] M9 interactive installer — *done:* distro/GPU detection + optional-dependency install
(`rigdoctor install`, GUI Setup tab); **user-local `install.sh` + self-extracting `.run`**
(no-root venv install, handles python3-venv prereq, CI-built). *Pending:* module-selection
config + `systemd --user` service enable + trigger-mode pick.
(`rigdoctor install`, GUI Settings); **user-local `install.sh` + self-extracting `.run`**
(no-root venv install, handles python3-venv prereq, CI-built); **`systemd --user` trigger
modes** (`core/service.py`, `rigdoctor service mode manual|always-on|game-launch` + GUI
Settings "Recording trigger") incl. the zero-config **game-launch watcher**
(`core/watcher.py`, `rigdoctor watch`). *Pending:* module-selection config during install.
- [ ] `.deb` packaging (D8) declaring per-bundle deps incl. python3-pyside6 for Desktop UI
## Phase 5 — Breadth (later)
+1 -1
View File
@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
[project]
name = "rigdoctor"
version = "0.20.0"
version = "0.23.0"
description = "Modular hardware monitoring & crash diagnostics for Linux gamers."
readme = "README.md"
requires-python = ">=3.11"
+1 -1
View File
@@ -1,3 +1,3 @@
"""RigDoctor — modular hardware monitoring & crash diagnostics for Linux gamers."""
__version__ = "0.20.0"
__version__ = "0.23.0"
+73 -32
View File
@@ -44,17 +44,10 @@ def cmd_snapshot(args) -> int:
def cmd_monitor(args) -> int:
from .tui import run
interval = args.interval or load_config()["interval"]
try:
for sample in _sampler().stream(interval=interval):
# Basic full-screen redraw; the rich TUI (M2) comes later.
print("\033[2J\033[H", end="")
print(f"RigDoctor — live (every {interval:g}s, Ctrl-C to quit)\n")
print(render_snapshot(sample))
sys.stdout.flush()
except KeyboardInterrupt:
print()
return 0
return run(interval, plain=getattr(args, "plain", False))
def cmd_gui(args) -> int:
@@ -423,6 +416,34 @@ def cmd_wrap(args) -> int:
return wrap.run(args.command)
def cmd_watch(args) -> int:
from .core import watcher
interval = args.interval or load_config().get("interval", 1.0)
print("Watching for a running Steam game (Ctrl-C to stop)…")
return watcher.watch(interval=max(2.0, interval))
def cmd_service(args) -> int:
from .core import service
sub = args.service_cmd or "status"
if sub == "mode":
ok, msg = service.apply_mode(args.mode)
print(f"Trigger mode set to '{args.mode}'.")
if not ok and msg:
print(f" note: {msg}")
return 0 if ok or not service.available() else 1
info = service.status()
print(f"Trigger mode: {info['mode']}")
print(f"systemd --user: {'available' if info['available'] else 'not available'}")
if info["available"]:
print(f" recorder service: {'active' if info.get('recorder_active') else 'inactive'}")
print(f" watcher service: {'active' if info.get('watch_active') else 'inactive'}")
return 0
def cmd_gameenv(args) -> int:
from dataclasses import asdict
@@ -438,34 +459,41 @@ def cmd_gameenv(args) -> int:
def cmd_games(args) -> int:
from .core import steam
from dataclasses import asdict
from .core import launchers, steam
selected = steam.selected_library_paths()
if not selected:
print("No Steam libraries selected to scan.")
print(" See them with: rigdoctor games libraries")
print(" Then enable one: rigdoctor games libraries --enable <path> (or --all)")
return 1
result = steam.rescan()
if args.json:
from dataclasses import asdict
result = steam.rescan() if selected else None
steam_games = result.games if result else []
extra = launchers.scan() # non-Steam (Lutris/Heroic)
all_games = list(steam_games) + list(extra)
if args.json:
print(json.dumps({
"scanned_at": result.scanned_at,
"new_appids": result.new_appids,
"games": [asdict(g) for g in result.games],
"scanned_at": result.scanned_at if result else None,
"new_appids": result.new_appids if result else [],
"games": [asdict(g) for g in all_games],
}, indent=2, ensure_ascii=False))
return 0
if not result.games:
print("No games found in the selected Steam libraries.")
if not all_games:
if not selected:
print("No Steam libraries selected and no non-Steam games found.")
print(" Pick a Steam library: rigdoctor games libraries --enable <path> (or --all)")
return 1
print("No games found.")
return 0
new = set(result.new_appids)
print(f"{len(result.games)} game(s) across {len(selected)} librar(y/ies):\n")
for g in result.games:
flag = " NEW" if g.appid in new else ""
print(f" {g.name:<48} {steam.human_size(g.size_bytes):>9}{flag}")
if new:
print(f"\n{len(new)} newly-installed since the last scan.")
new = set(result.new_appids) if result else set()
print(f"{len(all_games)} game(s):\n")
for g in all_games:
tag = " NEW" if g.appid in new else ""
src = "" if g.launcher == "steam" else f" [{g.launcher}]"
size = steam.human_size(g.size_bytes) if g.size_bytes else ""
print(f" {g.name:<46}{src:<10} {size:>9}{tag}")
if not selected:
print("\n(no Steam libraries selected — `rigdoctor games libraries --all` to add them)")
return 0
@@ -516,8 +544,9 @@ def build_parser() -> argparse.ArgumentParser:
sp.add_argument("--json", action="store_true", help="output JSON instead of text")
sp.set_defaults(func=cmd_snapshot)
mp = sub.add_parser("monitor", help="live-refreshing sensor view")
mp = sub.add_parser("monitor", help="live monitor TUI (current/min/max, M2)")
mp.add_argument("-n", "--interval", type=float, default=None, help="refresh interval (s)")
mp.add_argument("--plain", action="store_true", help="plain redraw instead of the curses UI")
mp.set_defaults(func=cmd_monitor)
sub.add_parser("gui", help="launch the desktop GUI (needs PySide6)").set_defaults(func=cmd_gui)
@@ -617,6 +646,18 @@ def build_parser() -> argparse.ArgumentParser:
wrap_p.add_argument("command", nargs=argparse.REMAINDER,
help="the game command — use `rigdoctor wrap %%command%%` in Steam")
wrap_p.set_defaults(func=cmd_wrap)
watch_p = sub.add_parser("watch", help="auto-capture while a Steam game runs (game-launch trigger)")
watch_p.add_argument("-n", "--interval", type=float, default=None, help="poll interval (s)")
watch_p.set_defaults(func=cmd_watch)
svc_p = sub.add_parser("service", help="crash-logger trigger mode + systemd --user service (M9/D6)")
svc_sub = svc_p.add_subparsers(dest="service_cmd")
svc_sub.add_parser("status", help="show the trigger mode and service state").set_defaults(func=cmd_service)
mode_p = svc_sub.add_parser("mode", help="set the trigger mode")
mode_p.add_argument("mode", choices=("manual", "always-on", "game-launch"))
mode_p.set_defaults(func=cmd_service)
svc_p.set_defaults(func=cmd_service, service_cmd=None)
return p
+1
View File
@@ -154,6 +154,7 @@ DEFAULTS: dict = {
"cpu_temp_alert": 95.0, # °C — alert when CPU reaches this
"relay_url": "wss://rigdoctor.jesseyvanofferen.com", # session-sharing relay (M12)
"steam_libraries": [], # Steam library paths to scan for games (M6); empty = none picked yet
"trigger_mode": "manual", # crash-logger trigger (D6): manual | always-on | game-launch
}
+57
View File
@@ -71,6 +71,32 @@ def check_pcie_aspm() -> list[Finding]:
# --- NVIDIA persistence mode (seed-case relevant) -------------------------------------
def check_gpu_powermizer() -> list[Finding]:
"""NVIDIA PowerMizer preferred-performance mode (X only, via nvidia-settings)."""
if shutil.which("nvidia-settings") is None or not os.environ.get("DISPLAY"):
return []
try:
proc = subprocess.run(
["nvidia-settings", "-q", "[gpu:0]/GPUPowerMizerMode", "-t"],
capture_output=True, text=True, timeout=10,
)
except (subprocess.SubprocessError, OSError):
return []
raw = proc.stdout.strip().splitlines()[0].strip() if proc.stdout.strip() else ""
if not raw.isdigit(): # no X target / Wayland / query failed — skip quietly
return []
names = {0: "Adaptive", 1: "Prefer Maximum Performance", 2: "Auto"}
name = names.get(int(raw), f"mode {raw}")
if int(raw) == 1:
return [Finding(OK, "GPU", f"GPU PowerMizer: {name}", "The GPU prefers maximum performance.")]
return [Finding(
INFO, "GPU", f"GPU PowerMizer: {name}",
"Adaptive/Auto can downclock the GPU between load spikes, hurting frame consistency.",
"Prefer max performance (X only, resets on reboot): "
"`nvidia-settings -a '[gpu:0]/GPUPowerMizerMode=1'`.",
)]
def check_gpu_persistence() -> list[Finding]:
if shutil.which("nvidia-smi") is None:
return []
@@ -235,6 +261,34 @@ def check_mitigations() -> list[Finding]:
# --- Proton versions (informational) --------------------------------------------------
def check_wine() -> list[Finding]:
"""System Wine version (used by Lutris / non-Proton games)."""
if shutil.which("wine") is None:
return []
try:
proc = subprocess.run(["wine", "--version"], capture_output=True, text=True, timeout=10)
except (subprocess.SubprocessError, OSError):
return []
ver = proc.stdout.strip().split()[0] if proc.stdout.strip() else ""
if not ver:
return []
return [Finding(
INFO, "Tools", f"Wine: {ver}",
"System Wine — used by Lutris and non-Proton titles.",
"Steam games generally run best on Proton; keep Wine current for native/Lutris use.",
)]
def check_steam_client() -> list[Finding]:
"""Installed Steam client package version."""
from . import steam
ver = steam.client_version()
if not ver:
return []
return [Finding(INFO, "Tools", f"Steam client: {ver}", "The installed Steam package version.")]
def check_proton() -> list[Finding]:
from . import steam
@@ -259,6 +313,7 @@ def run_gameenv_checks() -> list[Finding]:
findings: list[Finding] = []
findings += check_pcie_aspm()
findings += check_gpu_persistence()
findings += check_gpu_powermizer()
findings += check_cpu_governor()
findings += check_gamemode()
findings += check_mangohud()
@@ -267,5 +322,7 @@ def run_gameenv_checks() -> list[Finding]:
findings += check_thp()
findings += check_mitigations()
findings += check_proton()
findings += check_wine()
findings += check_steam_client()
findings.sort(key=lambda f: _ORDER.get(f.severity, 9))
return findings
+89
View File
@@ -0,0 +1,89 @@
"""Non-Steam game detection (M6): Lutris + Heroic installed games.
Reads each launcher's own install records (Lutris' SQLite library, Heroic's JSON stores),
returning the same `steam.Game` shape tagged with the launcher. Stdlib only; every reader
degrades to [] if the launcher isn't installed or its files can't be parsed.
"""
from __future__ import annotations
import json
import os
import sqlite3
from pathlib import Path
from .steam import Game
LUTRIS_DB = Path(os.path.expanduser("~/.local/share/lutris/pga.db"))
HEROIC_DIR = Path(os.path.expanduser("~/.config/heroic"))
def _lutris_games() -> list[Game]:
db = LUTRIS_DB
if not db.exists():
return []
games: list[Game] = []
try:
con = sqlite3.connect(f"file:{db}?mode=ro", uri=True) # read-only
try:
rows = con.execute(
"SELECT name, slug FROM games WHERE installed = 1 AND name IS NOT NULL"
).fetchall()
finally:
con.close()
except (sqlite3.Error, OSError):
return []
for name, slug in rows:
if name:
games.append(Game(appid=slug or "", name=str(name), library="", installdir="",
launcher="lutris"))
return games
def _read_json(path: Path):
try:
return json.loads(path.read_text())
except (OSError, ValueError):
return None
def _heroic_games() -> list[Game]:
base = HEROIC_DIR
if not base.is_dir():
return []
games: list[Game] = []
# Epic / Legendary: {app_name: {"title": ..., ...}}
epic = _read_json(base / "legendaryConfig" / "legendary" / "installed.json")
if isinstance(epic, dict):
for app_name, info in epic.items():
if isinstance(info, dict):
games.append(Game(appid=str(app_name), name=info.get("title") or str(app_name),
library="", installdir="", launcher="heroic"))
# GOG: {"installed": [{"appName", "install_path", "title"?}]}
gog = _read_json(base / "gog_store" / "installed.json")
entries = gog.get("installed") if isinstance(gog, dict) else None
if isinstance(entries, list):
for e in entries:
if not isinstance(e, dict):
continue
install_path = e.get("install_path") or ""
title = e.get("title") or os.path.basename(install_path.rstrip("/")) or str(e.get("appName", ""))
if title:
games.append(Game(appid=str(e.get("appName", "")), name=title, library="",
installdir="", launcher="heroic"))
return games
def scan() -> list[Game]:
"""Installed non-Steam games (Lutris + Heroic), de-duplicated, sorted by name."""
seen: set[tuple[str, str]] = set()
out: list[Game] = []
for game in _lutris_games() + _heroic_games():
key = (game.launcher, game.name)
if key in seen:
continue
seen.add(key)
out.append(game)
return sorted(out, key=lambda g: g.name.lower())
+118
View File
@@ -0,0 +1,118 @@
"""`systemd --user` services for the crash logger + game watcher (M9 / D6 trigger modes).
Three trigger modes (D6): **manual** (no service — start/stop by hand), **always-on** (a user
service samples continuously, bounded by log rotation), and **game-launch** (a watcher service
auto-brackets a capture around each game). No root: everything is a `systemd --user` unit in
``~/.config/systemd/user``. Degrades gracefully when systemd isn't available.
"""
from __future__ import annotations
import os
import shutil
import subprocess
import sys
from pathlib import Path
from .. import config
UNIT_DIR = Path(os.path.expanduser("~/.config/systemd/user"))
RECORDER_UNIT = "rigdoctor-recorder.service"
WATCH_UNIT = "rigdoctor-watch.service"
MODES = ("manual", "always-on", "game-launch")
_UNITS = {
RECORDER_UNIT: ("RigDoctor crash-capture recorder (always-on)", ["record", "run"]),
WATCH_UNIT: ("RigDoctor game-launch watcher", ["watch"]),
}
def available() -> bool:
return shutil.which("systemctl") is not None
def _rigdoctor_bin() -> str:
exe = Path(sys.executable).with_name("rigdoctor") # next to the venv python
if exe.exists():
return str(exe)
return shutil.which("rigdoctor") or "rigdoctor"
def _systemctl(*args: str) -> tuple[int, str]:
try:
proc = subprocess.run(["systemctl", "--user", *args],
capture_output=True, text=True, timeout=20)
return proc.returncode, (proc.stdout + proc.stderr).strip()
except (OSError, subprocess.SubprocessError) as exc:
return 1, str(exc)
def unit_text(description: str, args: list[str]) -> str:
exec_cmd = " ".join([_rigdoctor_bin(), *args])
return (
"[Unit]\n"
f"Description={description}\n\n"
"[Service]\n"
"Type=simple\n"
f"ExecStart={exec_cmd}\n"
"Restart=on-failure\n"
"RestartSec=5\n\n"
"[Install]\n"
"WantedBy=default.target\n"
)
def install_units() -> None:
"""Write/refresh both unit files and reload systemd (idempotent)."""
UNIT_DIR.mkdir(parents=True, exist_ok=True)
for name, (desc, args) in _UNITS.items():
(UNIT_DIR / name).write_text(unit_text(desc, args))
_systemctl("daemon-reload")
def is_active(name: str) -> bool:
return _systemctl("is-active", name)[0] == 0
def is_enabled(name: str) -> bool:
return _systemctl("is-enabled", name)[0] == 0
def _enable(name: str) -> tuple[int, str]:
return _systemctl("enable", "--now", name)
def _disable(name: str) -> tuple[int, str]:
return _systemctl("disable", "--now", name)
def apply_mode(mode: str) -> tuple[bool, str]:
"""Reconcile the user services to `mode` and persist it. Returns (ok, message)."""
if mode not in MODES:
return False, f"Unknown trigger mode: {mode}"
if not available():
config.update_config(trigger_mode=mode)
return False, "systemd --user isn't available — mode saved, but no service was changed."
install_units()
if mode == "always-on":
_disable(WATCH_UNIT)
rc, out = _enable(RECORDER_UNIT)
elif mode == "game-launch":
_disable(RECORDER_UNIT)
rc, out = _enable(WATCH_UNIT)
else: # manual
_disable(RECORDER_UNIT)
_disable(WATCH_UNIT)
rc, out = 0, ""
config.update_config(trigger_mode=mode)
return rc == 0, out
def status() -> dict:
"""Current trigger mode (config) + live service states (best-effort)."""
cfg = config.load_config()
info = {"available": available(), "mode": cfg.get("trigger_mode", "manual")}
if info["available"]:
info["recorder_active"] = is_active(RECORDER_UNIT)
info["watch_active"] = is_active(WATCH_UNIT)
return info
+19 -2
View File
@@ -58,10 +58,11 @@ class SteamLibrary:
class Game:
appid: str
name: str
library: str # library path the game lives in
library: str # library path the game lives in (Steam)
installdir: str # folder name under <library>/steamapps/common
size_bytes: int = 0
last_updated: int = 0 # epoch seconds (acf LastUpdated), 0 if unknown
launcher: str = "steam" # "steam" | "lutris" | "heroic"
# --- VDF (Valve Data Format) parsing --------------------------------------------------
@@ -313,7 +314,8 @@ def cached_games() -> list[Game]:
cache = load_cache()
if not cache:
return []
return [Game(**{k: g.get(k) for k in Game.__dataclass_fields__}) for g in cache.get("games", [])]
# Only pass keys present in the record so dataclass defaults fill any new fields.
return [Game(**{k: g[k] for k in Game.__dataclass_fields__ if k in g}) for g in cache.get("games", [])]
def rescan(cfg: dict | None = None) -> ScanResult:
@@ -353,6 +355,21 @@ def acknowledge_new() -> None:
# --- formatting -----------------------------------------------------------------------
def client_version() -> str | None:
"""The installed Steam package version (apt), or None — best-effort, offline."""
if shutil.which("dpkg-query") is None:
return None
for pkg in ("steam-installer", "steam-launcher", "steam"):
try:
proc = subprocess.run(["dpkg-query", "-W", "-f=${Version}", pkg],
capture_output=True, text=True, timeout=10)
except (subprocess.SubprocessError, OSError):
continue
if proc.returncode == 0 and proc.stdout.strip():
return proc.stdout.strip()
return None
def launch_game(appid: str) -> bool:
"""Best-effort: ask Steam to launch a game by appid (steam:// URL). Non-blocking."""
if not appid:
+107
View File
@@ -0,0 +1,107 @@
"""Zero-config game-launch watcher (D12 fallback): poll Steam's RunningAppID and
auto-bracket a focused capture around the running game.
For users who won't add the `rigdoctor wrap %command%` launch option. Less precise than the
wrapper (it depends on Steam writing RunningAppID to registry.vdf, and only covers Steam), so
the wrapper stays the primary mechanism. Stdlib only; safe to run as a `systemd --user` service
(the game-launch trigger mode).
"""
from __future__ import annotations
import os
import signal
import time
from pathlib import Path
from . import reccontrol, steam
from .steam import _parse_vdf
_REGISTRY_CANDIDATES = ("~/.steam/registry.vdf", "~/.steam/steam/registry.vdf")
def _registry_path() -> Path | None:
for cand in _REGISTRY_CANDIDATES:
p = Path(os.path.expanduser(cand))
if p.exists():
return p
return None
def _find_key(data: dict, key: str):
"""Recursively find a (case-insensitive) scalar key in nested VDF dicts."""
target = key.lower()
for k, v in data.items():
if isinstance(v, dict):
found = _find_key(v, key)
if found is not None:
return found
elif k.lower() == target:
return v
return None
def running_appid() -> int:
"""The Steam appid currently running (0 if none / unknown)."""
path = _registry_path()
if path is None:
return 0
try:
data = _parse_vdf(path.read_text(encoding="utf-8", errors="replace"))
except OSError:
return 0
raw = _find_key(data, "RunningAppID")
try:
return int(raw)
except (TypeError, ValueError):
return 0
def transition(prev: int, current: int) -> str | None:
"""'start' when a game begins, 'stop' when it ends, else None."""
if current and not prev:
return "start"
if prev and not current:
return "stop"
return None
def _name_for(appid: int) -> str:
target = str(appid)
for g in steam.cached_games() or steam.scan_games(steam.selected_library_paths()):
if g.appid == target:
return g.name
return f"Steam app {appid}"
def watch(interval: float = 5.0) -> int:
"""Poll for a running Steam game and bracket a capture around it. Blocks until signalled."""
from . import diagnostic
stop = {"flag": False}
def _on_signal(_sig, _frame):
stop["flag"] = True
signal.signal(signal.SIGTERM, _on_signal)
signal.signal(signal.SIGINT, _on_signal)
prev = 0
started = False
while not stop["flag"]:
current = running_appid()
action = transition(prev, current)
if action == "start" and not reccontrol.running_pid():
started = diagnostic.start(game=_name_for(current)) is not None
elif action == "stop" and started:
reccontrol.stop_background()
started = False
prev = current
# Sleep in small slices so a stop signal is handled promptly.
slept = 0.0
while slept < interval and not stop["flag"]:
time.sleep(min(0.25, interval - slept))
slept += 0.25
if started:
reccontrol.stop_background()
return 0
+17 -5
View File
@@ -88,6 +88,7 @@ class GamesPage(QWidget):
self._diag_done.connect(self._on_diag_done)
self._busy = False
self._new_appids: set[str] = set()
self._extra_games: list = [] # non-Steam (Lutris/Heroic), appended after a scan
self._diag_game: str | None = None
root = QVBoxLayout(self)
@@ -213,7 +214,7 @@ class GamesPage(QWidget):
threading.Thread(target=self._work, daemon=True).start()
def _work(self) -> None:
from ..core import steam
from ..core import launchers, steam
try:
selected = {os.path.realpath(p) for p in steam.selected_library_paths()}
@@ -223,6 +224,10 @@ class GamesPage(QWidget):
for lib in steam.discover_libraries()
]
self._libraries_ready.emit(libs)
try:
self._extra_games = launchers.scan() # Lutris / Heroic (non-Steam)
except Exception:
self._extra_games = []
self._scanned.emit(steam.rescan())
except Exception:
self._scanned.emit(None)
@@ -265,11 +270,13 @@ class GamesPage(QWidget):
self._status.setText("scan failed")
return
self._new_appids = set(result.new_appids)
self._populate_games(result.games, self._new_appids)
games = list(result.games) + list(self._extra_games)
self._populate_games(games, self._new_appids)
new = len(self._new_appids)
suffix = f" · {new} new" if new else ""
non_steam = f" · {len(self._extra_games)} non-Steam" if self._extra_games else ""
self._status.setText(
f"{len(result.games)} games · {time.strftime('%H:%M:%S')}{suffix}"
f"{len(games)} games · {time.strftime('%H:%M:%S')}{suffix}{non_steam}"
)
self.new_count_changed.emit(new)
@@ -293,12 +300,17 @@ class GamesPage(QWidget):
return
for g in games:
launcher = getattr(g, "launcher", "steam")
if launcher != "steam":
sublabel, appid = launcher.title(), "" # non-Steam: can't steam:// launch it
else:
sublabel, appid = (os.path.basename(g.library.rstrip("/")) or g.library), g.appid
self._list.addWidget(_game_row(
g.name,
os.path.basename(g.library.rstrip("/")) or g.library,
sublabel,
steam.human_size(g.size_bytes),
g.appid in new_appids,
appid=g.appid,
appid=appid,
on_diagnose=self._start_diagnostic,
))
self._list.addStretch(1)
+51 -1
View File
@@ -9,6 +9,7 @@ from PySide6.QtGui import QDesktopServices
from PySide6.QtWidgets import (
QApplication,
QCheckBox,
QComboBox,
QDoubleSpinBox,
QFrame,
QGridLayout,
@@ -24,7 +25,7 @@ from PySide6.QtWidgets import (
)
from .. import config
from ..core import alerts, installer, sysenv, uninstall, updates
from ..core import alerts, installer, service, sysenv, uninstall, updates
from .theme import GOOD, MUTED, WARN
@@ -52,6 +53,7 @@ _BACKEND_DESC = {
class SetupPage(QWidget):
_installed = Signal(int, str)
_upd_state = Signal(object)
_mode_applied = Signal(object) # (mode, ok, message) from a trigger-mode change
changed = Signal() # alert settings saved — main window re-applies them live
def __init__(self) -> None:
@@ -59,6 +61,7 @@ class SetupPage(QWidget):
self.setObjectName("Page")
self._installed.connect(self._on_installed)
self._upd_state.connect(self._on_upd_state)
self._mode_applied.connect(self._on_mode_applied)
root = QVBoxLayout(self)
root.setContentsMargins(20, 18, 20, 18)
@@ -123,6 +126,35 @@ class SetupPage(QWidget):
alerts_layout.addLayout(alerts_buttons)
root.addWidget(alerts_card)
# Recording trigger (M9 / D6): when the crash logger runs.
trig_card, trig_layout = _panel("Recording trigger")
trig_desc = QLabel(
"When the crash logger runs (uses a systemd --user service):\n"
"• Manual — you start/stop it yourself.\n"
"• Always-on — a background service records continuously.\n"
"• Game-launch — auto-records while a Steam game is running."
)
trig_desc.setObjectName("Muted")
trig_desc.setWordWrap(True)
trig_layout.addWidget(trig_desc)
trig_row = QHBoxLayout()
self._trigger = QComboBox()
self._trigger.addItems(list(service.MODES))
apply_trigger = QPushButton("Apply")
apply_trigger.setObjectName("PrimaryButton")
apply_trigger.clicked.connect(self._apply_trigger)
trig_row.addWidget(self._trigger, 1)
trig_row.addWidget(apply_trigger)
trig_layout.addLayout(trig_row)
self._trigger_status = QLabel("")
self._trigger_status.setObjectName("Muted")
self._trigger_status.setWordWrap(True)
trig_layout.addWidget(self._trigger_status)
if not service.available():
apply_trigger.setEnabled(False)
self._trigger_status.setText("systemd --user isn't available on this system.")
root.addWidget(trig_card)
# Account access (M13/M12): one Gitea token gates updates and session sharing.
upd_card, upd_layout = _panel("Account access")
hint = QLabel("A Gitea access token unlocks updates and session sharing. "
@@ -167,8 +199,26 @@ class SetupPage(QWidget):
self._refresh()
self._load_alerts()
self._trigger.setCurrentText(config.load_config().get("trigger_mode", "manual"))
self._refresh_update_status()
# --- recording trigger (M9) -----------------------------------------------
def _apply_trigger(self) -> None:
mode = self._trigger.currentText()
self._trigger_status.setText(f"Applying “{mode}”… (may take a moment)")
threading.Thread(target=self._work_trigger, args=(mode,), daemon=True).start()
def _work_trigger(self, mode: str) -> None:
ok, msg = service.apply_mode(mode)
self._mode_applied.emit((mode, ok, msg))
def _on_mode_applied(self, result) -> None:
mode, ok, msg = result
if ok:
self._trigger_status.setText(f"Recording trigger set to “{mode}”.")
else:
self._trigger_status.setText(f"{mode}” saved. {msg}")
# --- alerts (M8) ----------------------------------------------------------
@staticmethod
def _spin() -> QDoubleSpinBox:
+170
View File
@@ -0,0 +1,170 @@
"""Live monitor TUI (M2): a curses HWMonitor-style terminal dashboard.
Shows current / session-min / session-max per sensor, grouped by subsystem, with
temperature and utilization color bands. stdlib `curses` only; falls back to a plain
full-screen redraw when stdout isn't a TTY (piped/SSH-without-tty). Keys: q quit, r reset
the session min/max. The terminal face of the same live data the GUI dashboard graphs.
"""
from __future__ import annotations
import curses
import sys
import time
from .core.sample import Reading, Sample
from .core.sampler import Sampler
from .core.sources import available_sources
from .render import _GROUP_ORDER, _GROUP_TITLES, format_raw, metric_label, render_snapshot
# Color-band thresholds (mirror the GUI dashboard so both faces agree).
TEMP_COLD, TEMP_WARN, TEMP_CRIT = 50.0, 78.0, 88.0
USAGE_WARN, USAGE_CRIT = 85.0, 95.0
_USAGE_METRICS = {"util", "used_pct", "mem_util", "load"}
def band(r: Reading) -> str:
"""Color band for a reading: cold | good | warn | crit | normal | na."""
if r.source == "gpu" and r.metric == "status": # GPU-lost / query timeout
return "crit"
if r.value is None:
return "na"
if r.unit == "°C":
if r.value >= TEMP_CRIT:
return "crit"
if r.value >= TEMP_WARN:
return "warn"
if r.value >= TEMP_COLD:
return "good"
return "cold"
if r.unit == "%" and r.metric in _USAGE_METRICS:
if r.value >= USAGE_CRIT:
return "crit"
if r.value >= USAGE_WARN:
return "warn"
return "good"
return "normal"
def track(stats: dict[str, tuple[float, float]], sample: Sample) -> None:
"""Fold a sample's readings into {key: (min, max)} session extremes."""
for r in sample.readings:
if r.value is None:
continue
lo, hi = stats.get(r.key, (r.value, r.value))
stats[r.key] = (min(lo, r.value), max(hi, r.value))
# --- curses front-end -----------------------------------------------------------------
_BAND_PAIR = {"cold": 1, "good": 2, "warn": 3, "crit": 4}
def _init_colors() -> None:
try:
curses.start_color()
curses.use_default_colors()
curses.init_pair(1, curses.COLOR_CYAN, -1)
curses.init_pair(2, curses.COLOR_GREEN, -1)
curses.init_pair(3, curses.COLOR_YELLOW, -1)
curses.init_pair(4, curses.COLOR_RED, -1)
except curses.error:
pass
def _attr(band_name: str) -> int:
pair = _BAND_PAIR.get(band_name)
if not pair:
return curses.A_NORMAL
attr = curses.color_pair(pair)
return attr | curses.A_BOLD if band_name == "crit" else attr
def _draw(stdscr, sample: Sample, stats: dict, interval: float) -> None:
stdscr.erase()
height, width = stdscr.getmaxyx()
def put(y: int, x: int, text: str, attr: int = curses.A_NORMAL) -> None:
if 0 <= y < height and 0 <= x < width:
try:
stdscr.addnstr(y, x, text, max(0, width - x - 1), attr)
except curses.error:
pass
put(0, 0, f"RigDoctor — live monitor every {interval:g}s", curses.A_BOLD)
put(1, 0, "q quit r reset min/max", curses.A_DIM)
groups = sample.by_source()
order = [k for k in _GROUP_ORDER if k in groups] + [k for k in groups if k not in _GROUP_ORDER]
name_w, col_w = 24, 11
y = 3
for key in order:
if y >= height:
break
put(y, 0, _GROUP_TITLES.get(key, key.title()), curses.A_BOLD)
y += 1
put(y, 2, f"{'sensor':<{name_w}}{'current':>{col_w}}{'min':>{col_w}}{'max':>{col_w}}", curses.A_DIM)
y += 1
for r in groups[key]:
if y >= height:
break
if r.metric == "name": # device identity line
put(y, 2, str(r.label), curses.A_DIM)
y += 1
continue
lo, hi = stats.get(r.key, (r.value, r.value))
put(y, 2, f"{metric_label(r):<{name_w}}")
put(y, 2 + name_w, f"{format_raw(r.value, r.unit):>{col_w}}", _attr(band(r)))
put(y, 2 + name_w + col_w, f"{format_raw(lo, r.unit):>{col_w}}", curses.A_DIM)
put(y, 2 + name_w + 2 * col_w, f"{format_raw(hi, r.unit):>{col_w}}", curses.A_DIM)
y += 1
y += 1
stdscr.refresh()
def _loop(stdscr, sampler: Sampler, interval: float) -> None:
curses.curs_set(0)
stdscr.nodelay(True)
_init_colors()
stats: dict[str, tuple[float, float]] = {}
latest = sampler.sample()
track(stats, latest)
next_sample = time.monotonic() + interval
while True:
ch = stdscr.getch()
if ch in (ord("q"), ord("Q")):
return
if ch in (ord("r"), ord("R")):
stats.clear()
track(stats, latest)
now = time.monotonic()
if now >= next_sample:
latest = sampler.sample()
track(stats, latest)
next_sample = now + interval
_draw(stdscr, latest, stats, interval)
time.sleep(0.05) # keep key handling responsive without busy-spinning
def _run_plain(sampler: Sampler, interval: float) -> int:
"""Fallback for non-TTY output: clear + reprint each tick (no curses)."""
try:
for sample in sampler.stream(interval=interval):
print("\033[2J\033[H", end="")
print(f"RigDoctor — live (every {interval:g}s, Ctrl-C to quit)\n")
print(render_snapshot(sample))
sys.stdout.flush()
except KeyboardInterrupt:
print()
return 0
def run(interval: float, plain: bool = False) -> int:
sampler = Sampler(available_sources())
if plain or not sys.stdout.isatty():
return _run_plain(sampler, interval)
try:
curses.wrapper(_loop, sampler, interval)
except curses.error: # terminal can't do curses — degrade gracefully
return _run_plain(sampler, interval)
return 0
+67
View File
@@ -0,0 +1,67 @@
"""Tests for M6 non-Steam game detection (Lutris SQLite + Heroic JSON)."""
import json
import sqlite3
import tempfile
import unittest
from pathlib import Path
from unittest import mock
from rigdoctor.core import launchers
class LutrisTests(unittest.TestCase):
def test_reads_installed_games_only(self):
with tempfile.TemporaryDirectory() as d:
db = Path(d) / "pga.db"
con = sqlite3.connect(db)
con.execute("CREATE TABLE games (id INTEGER, name TEXT, slug TEXT, installed INTEGER)")
con.executemany(
"INSERT INTO games VALUES (?, ?, ?, ?)",
[(1, "Hades", "hades", 1), (2, "Hollow Knight", "hollow-knight", 1), (3, "Old Game", "old", 0)],
)
con.commit()
con.close()
with mock.patch.object(launchers, "LUTRIS_DB", db), \
mock.patch.object(launchers, "HEROIC_DIR", Path(d) / "nope"):
games = launchers.scan()
names = {g.name for g in games}
self.assertEqual(names, {"Hades", "Hollow Knight"})
self.assertTrue(all(g.launcher == "lutris" for g in games))
def test_missing_db_is_empty(self):
with tempfile.TemporaryDirectory() as d:
with mock.patch.object(launchers, "LUTRIS_DB", Path(d) / "absent.db"), \
mock.patch.object(launchers, "HEROIC_DIR", Path(d) / "nope"):
self.assertEqual(launchers.scan(), [])
class HeroicTests(unittest.TestCase):
def test_epic_and_gog(self):
with tempfile.TemporaryDirectory() as d:
base = Path(d) / "heroic"
(base / "legendaryConfig" / "legendary").mkdir(parents=True)
(base / "gog_store").mkdir(parents=True)
(base / "legendaryConfig" / "legendary" / "installed.json").write_text(
json.dumps({"abc123": {"title": "Control"}}))
(base / "gog_store" / "installed.json").write_text(
json.dumps({"installed": [{"appName": "777", "title": "The Witcher 3"}]}))
with mock.patch.object(launchers, "LUTRIS_DB", Path(d) / "nope.db"), \
mock.patch.object(launchers, "HEROIC_DIR", base):
names = {g.name for g in launchers.scan()}
self.assertEqual(names, {"Control", "The Witcher 3"})
def test_gog_title_falls_back_to_install_path(self):
with tempfile.TemporaryDirectory() as d:
base = Path(d) / "heroic"
(base / "gog_store").mkdir(parents=True)
(base / "gog_store" / "installed.json").write_text(
json.dumps({"installed": [{"appName": "9", "install_path": "/games/Stardew Valley"}]}))
with mock.patch.object(launchers, "LUTRIS_DB", Path(d) / "nope.db"), \
mock.patch.object(launchers, "HEROIC_DIR", base):
names = {g.name for g in launchers.scan()}
self.assertEqual(names, {"Stardew Valley"})
if __name__ == "__main__":
unittest.main()
+58
View File
@@ -0,0 +1,58 @@
"""Tests for the M9 systemd --user trigger-mode service manager."""
import unittest
from unittest import mock
from rigdoctor.core import service
class UnitTextTests(unittest.TestCase):
def test_unit_text_has_required_sections(self):
txt = service.unit_text("RigDoctor recorder", ["record", "run"])
self.assertIn("[Unit]", txt)
self.assertIn("[Service]", txt)
self.assertIn("ExecStart=", txt)
self.assertIn("record run", txt)
self.assertIn("WantedBy=default.target", txt)
class ApplyModeTests(unittest.TestCase):
def test_unknown_mode_rejected(self):
ok, msg = service.apply_mode("turbo")
self.assertFalse(ok)
self.assertIn("Unknown", msg)
def test_no_systemd_saves_mode_but_reports(self):
with mock.patch.object(service, "available", return_value=False), \
mock.patch.object(service.config, "update_config") as update:
ok, msg = service.apply_mode("always-on")
self.assertFalse(ok)
self.assertIn("available", msg.lower())
update.assert_called_once_with(trigger_mode="always-on")
def test_always_on_enables_recorder_disables_watch(self):
calls = []
with mock.patch.object(service, "available", return_value=True), \
mock.patch.object(service, "install_units"), \
mock.patch.object(service, "_enable", side_effect=lambda n: calls.append(("enable", n)) or (0, "")), \
mock.patch.object(service, "_disable", side_effect=lambda n: calls.append(("disable", n)) or (0, "")), \
mock.patch.object(service.config, "update_config"):
ok, _ = service.apply_mode("always-on")
self.assertTrue(ok)
self.assertIn(("enable", service.RECORDER_UNIT), calls)
self.assertIn(("disable", service.WATCH_UNIT), calls)
def test_manual_disables_both(self):
disabled = []
with mock.patch.object(service, "available", return_value=True), \
mock.patch.object(service, "install_units"), \
mock.patch.object(service, "_enable", return_value=(0, "")), \
mock.patch.object(service, "_disable", side_effect=lambda n: disabled.append(n) or (0, "")), \
mock.patch.object(service.config, "update_config"):
ok, _ = service.apply_mode("manual")
self.assertTrue(ok)
self.assertEqual(set(disabled), {service.RECORDER_UNIT, service.WATCH_UNIT})
if __name__ == "__main__":
unittest.main()
+58
View File
@@ -0,0 +1,58 @@
"""Tests for the M2 live-monitor TUI logic (min/max tracking + color bands)."""
import unittest
from rigdoctor import tui
from rigdoctor.core.sample import Reading, Sample
def _temp(v):
return Reading("gpu", "temp", v, "°C", "")
class TrackTests(unittest.TestCase):
def test_tracks_min_and_max(self):
stats: dict = {}
for v in (60.0, 80.0, 70.0, 55.0):
tui.track(stats, Sample(0.0, [_temp(v)]))
self.assertEqual(stats["gpu.temp"], (55.0, 80.0))
def test_ignores_none_values(self):
stats: dict = {}
tui.track(stats, Sample(0.0, [_temp(None)]))
self.assertEqual(stats, {})
def test_keys_separate_by_label(self):
stats: dict = {}
tui.track(stats, Sample(0.0, [
Reading("cpu", "temp", 50.0, "°C", "Core 0"),
Reading("cpu", "temp", 70.0, "°C", "Core 1"),
]))
self.assertEqual(stats["cpu.temp.Core 0"], (50.0, 50.0))
self.assertEqual(stats["cpu.temp.Core 1"], (70.0, 70.0))
class BandTests(unittest.TestCase):
def test_temperature_bands(self):
self.assertEqual(tui.band(_temp(40.0)), "cold")
self.assertEqual(tui.band(_temp(60.0)), "good")
self.assertEqual(tui.band(_temp(80.0)), "warn")
self.assertEqual(tui.band(_temp(90.0)), "crit")
def test_usage_bands(self):
self.assertEqual(tui.band(Reading("gpu", "util", 50.0, "%")), "good")
self.assertEqual(tui.band(Reading("gpu", "util", 88.0, "%")), "warn")
self.assertEqual(tui.band(Reading("memory", "used_pct", 96.0, "%")), "crit")
def test_non_metric_percentage_is_normal(self):
self.assertEqual(tui.band(Reading("gpu", "fan", 100.0, "%")), "normal")
def test_gpu_lost_is_crit(self):
self.assertEqual(tui.band(Reading("gpu", "status", None, "", "query-timeout")), "crit")
def test_missing_value_is_na(self):
self.assertEqual(tui.band(Reading("gpu", "power", None, "W")), "na")
if __name__ == "__main__":
unittest.main()
+69
View File
@@ -0,0 +1,69 @@
"""Tests for the M9/D12 game-launch watcher (RunningAppID parse + transitions)."""
import tempfile
import unittest
from pathlib import Path
from unittest import mock
from rigdoctor.core import watcher
_REGISTRY = """"Registry"
{
\t"HKCU"
\t{
\t\t"Software"
\t\t{
\t\t\t"Valve"
\t\t\t{
\t\t\t\t"Steam"
\t\t\t\t{
\t\t\t\t\t"RunningAppID"\t\t"%s"
\t\t\t\t}
\t\t\t}
\t\t}
\t}
}
"""
class TransitionTests(unittest.TestCase):
def test_transitions(self):
self.assertEqual(watcher.transition(0, 570), "start")
self.assertEqual(watcher.transition(570, 0), "stop")
self.assertIsNone(watcher.transition(570, 570))
self.assertIsNone(watcher.transition(0, 0))
class FindKeyTests(unittest.TestCase):
def test_case_insensitive_nested(self):
data = {"Registry": {"HKCU": {"steam": {"runningappid": "42"}}}}
self.assertEqual(watcher._find_key(data, "RunningAppID"), "42")
def test_missing(self):
self.assertIsNone(watcher._find_key({"a": {"b": "c"}}, "RunningAppID"))
class RunningAppIdTests(unittest.TestCase):
def _with_registry(self, content):
d = tempfile.mkdtemp()
path = Path(d) / "registry.vdf"
path.write_text(content)
return path
def test_reads_running_appid(self):
path = self._with_registry(_REGISTRY % "570")
with mock.patch.object(watcher, "_registry_path", return_value=path):
self.assertEqual(watcher.running_appid(), 570)
def test_zero_when_idle(self):
path = self._with_registry(_REGISTRY % "0")
with mock.patch.object(watcher, "_registry_path", return_value=path):
self.assertEqual(watcher.running_appid(), 0)
def test_zero_when_no_registry(self):
with mock.patch.object(watcher, "_registry_path", return_value=None):
self.assertEqual(watcher.running_appid(), 0)
if __name__ == "__main__":
unittest.main()