Files
rigdoctor/src/rigdoctor/cli.py
T
jessey edc2166011 feat(health): GPU stress monitor + per-drive SMART health/wear
Two diagnostics for the load-correlated GPU crashes and for storage wear.

GPU stress (`rigdoctor stress` + a System Health "Stress test…" dialog): drive a GPU
load and sample sensors at high rate, then report per-metric min/avg/peak, time spent
above each temp threshold, power vs limit, throttling (decoded from the NVML
clocks-event bitmask), and any GPU fault (Xid / VA-space freeze / query-timeout hang)
in the window. Load source: explicit --command, an auto-detected loader, or
monitor-only (you launch the game). Analysis is a pure, unit-tested function.

Drive health (core/drives.py): parse full `smartctl --json` per drive into prioritized
findings — SMART verdict, derived life-left % (NVMe percentage_used or SATA
wear-leveling), power-on hours, TBW, temperature, and failure predictors
(reallocated/pending/offline sectors, NVMe media errors, low spare). Replaces the old
pass/fail-only check_smart; runs through the same elevated path (collect-priv / sudo),
degrading to "needs root" notes unprivileged.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-05-29 16:59:06 +02:00

851 lines
33 KiB
Python

"""RigDoctor command-line interface."""
from __future__ import annotations
import argparse
import json
import os
import signal
import sys
import time
from pathlib import Path
from . import __version__, config
from .config import load_config
from .core import reccontrol
from .core.sampler import Sampler
from .core.sources import available_sources
from .render import format_headline, render_snapshot, render_summary
def _sampler() -> Sampler:
    """Build a Sampler over whatever ``available_sources()`` reports."""
    detected = available_sources()
    return Sampler(detected)
def cmd_sources(args) -> int:
    """Print each available sensor source with its class name.

    Returns 1 when no sources are available, otherwise 0.
    """
    detected = available_sources()
    if detected:
        print("Detected sources:")
        for source in detected:
            print(f" - {source.name} ({type(source).__name__})")
        return 0
    print("No sensor sources detected.")
    return 1
def cmd_snapshot(args) -> int:
    """Take a single sample and print it as JSON (``--json``) or rendered text."""
    reading = _sampler().sample()
    if not args.json:
        print(render_snapshot(reading))
        return 0
    document = json.dumps(
        {"ts": reading.ts, "readings": reading.to_rows()},
        indent=2,
        ensure_ascii=False,
    )
    print(document)
    return 0
def cmd_monitor(args) -> int:
    """Run the live monitor, falling back to the configured interval when none is given."""
    from .tui import run as run_tui

    interval = args.interval
    if not interval:
        interval = load_config()["interval"]
    plain = getattr(args, "plain", False)
    return run_tui(interval, plain=plain)
def cmd_gui(args) -> int:
    """Launch the desktop GUI, or print install hints when its import fails.

    Returns the GUI entry point's result, or 2 when the GUI module cannot be
    imported.
    """
    try:
        from .gui.app import main as launch_gui
    except ImportError as err:
        hint_lines = (
            "The GUI needs PySide6, which isn't installed.",
            " Ubuntu/Debian: sudo apt install python3-pyside6.qtwidgets "
            "python3-pyside6.qtgui python3-pyside6.qtwebsockets python3-pyside6.qtsvg python3-pyte",
            " pip: pip install 'rigdoctor[gui]'",
            f" ({err})",
        )
        for line in hint_lines:
            print(line)
        return 2
    program = sys.argv[0]
    return launch_gui([program])
# --- M3 crash-capture logger ---------------------------------------------------
def cmd_record_run(args) -> int:
    """Run the capture loop in the foreground until it is stopped.

    Writes this process's PID to ``config.PID_FILE`` for the duration of the
    run and always removes it again, including when recorder setup fails.
    SIGTERM and SIGINT are routed to ``recorder.stop()``.

    Args:
        args: parsed CLI namespace; reads ``interval``, ``out`` and
            (optionally) ``game``.

    Returns:
        0 after the recorder loop ends.
    """
    # Import before touching any state so a broken install leaves nothing behind.
    from .core.recorder import Recorder

    cfg = load_config()
    interval = args.interval or cfg["interval"]
    log_path = Path(args.out) if args.out else config.LOG_FILE

    config.STATE_DIR.mkdir(parents=True, exist_ok=True)
    config.PID_FILE.write_text(str(os.getpid()))
    try:
        # Everything after the PID file is written lives inside the try, so a
        # failure while constructing the recorder cannot leave a stale PID file.
        recorder = Recorder(
            interval=interval,
            log_path=log_path,
            max_bytes=cfg["log_max_bytes"],
            backups=cfg["log_backups"],
            status_path=config.STATUS_FILE,
            game=getattr(args, "game", None),
        )

        def _handle(_sig, _frame):
            recorder.stop()

        signal.signal(signal.SIGTERM, _handle)
        signal.signal(signal.SIGINT, _handle)
        print(f"Recording to {log_path} every {interval:g}s — stop with Ctrl-C or `rigdoctor record stop`.")
        recorder.run()
    finally:
        try:
            config.PID_FILE.unlink()
        except OSError:
            # Best effort: the file may already be gone.
            pass
    print(f"Stopped after {recorder.samples} samples.")
    return 0
def cmd_record_start(args) -> int:
    """Start the recorder in the background unless one is already running.

    Args:
        args: parsed CLI namespace; reads ``interval`` and ``out``.

    Returns:
        0 when a recorder is (now) running, 1 when the spawned process is not
        alive after the one-second grace period.
    """
    # Query once: asking twice could report a different pid (or None) in the
    # message than the one that was actually tested.
    existing = reccontrol.running_pid()
    if existing:
        print(f"Recorder already running (pid {existing}).")
        return 0
    pid = reccontrol.start_background(args.interval, args.out)
    time.sleep(1.0)  # let it come up
    if pid and reccontrol.pid_alive(pid):
        print(f"Recording started in the background (pid {pid}).")
        print(f" log: {args.out or config.LOG_FILE}")
        print(" status: rigdoctor record status · stop: rigdoctor record stop")
        return 0
    print(f"Recorder failed to start; see {config.SPAWN_LOG}")
    return 1
def cmd_record_stop(args) -> int:
    """Stop the background recorder and wait briefly for it to exit.

    Returns:
        0 when no recorder was running or it exited; 1 when the stop request
        failed or the process is still alive after the wait.
    """
    pid = reccontrol.running_pid()
    if not pid:
        print("Recorder is not running.")
        return 0
    if not reccontrol.stop_background():
        print(f"Could not stop recorder (pid {pid}).")
        return 1
    # Poll for up to ~5 s (50 x 0.1 s) for the process to go away.
    for _ in range(50):
        if not reccontrol.pid_alive(pid):
            break
        time.sleep(0.1)
    else:
        # The loop ran out without seeing the process exit. Check once more
        # after the final sleep instead of reporting success unconditionally.
        if reccontrol.pid_alive(pid):
            print(f"Recorder (pid {pid}) is still running after 5s.")
            return 1
    print(f"Recorder stopped (pid {pid}).")
    return 0
def cmd_record_status(args) -> int:
    """Print whether a recorder is running plus the fields of its status record."""
    pid = reccontrol.running_pid()
    status = reccontrol.read_status()
    if pid:
        print(f"● recording (pid {pid})")
    else:
        print("○ not recording")
    if not status:
        return 0

    print(f" log: {status.get('log')}")
    print(f" samples: {status.get('samples')}")
    started = status.get("started")
    if started:
        stamp = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(started))
        print(f" started: {stamp}")
    updated = status.get("updated")
    if updated:
        stamp = time.strftime('%H:%M:%S', time.localtime(updated))
        print(f" updated: {stamp}")
    if status.get("gpu_lost"):
        print(" ⚠ a GPU-lost event was recorded this session")
    latest = status.get("latest")
    if latest:
        print(f" latest: {format_headline(latest)}")
    return 0
def cmd_record_report(args) -> int:
    """Summarize a capture log (``--log`` or the default log file) and print it."""
    from .core.crashlog import summarize as summarize_log

    if args.log:
        target = Path(args.log)
    else:
        target = config.LOG_FILE
    digest = summarize_log(target, last_n=args.last)
    print(render_summary(digest, log_path=target))
    return 0
def cmd_install(args) -> int:
    """Report optional-component status and, unless ``--check``, install what is missing.

    Args:
        args: parsed CLI namespace; reads ``check`` and ``yes``.

    Returns:
        0 when nothing is missing or only a check was requested; 1 when apt is
        not the package manager or the user declines; otherwise the exit code
        returned by ``installer.install_packages``.
    """
    from .core import installer, sysenv

    print(f"Distro: {sysenv.distro_name()}")
    pm = sysenv.package_manager()
    print(f"Package manager: {pm or 'none (only apt is supported)'}")
    print(f"GPU: {', '.join(sysenv.gpu_vendors()) or 'unknown'}\n")

    # Materialize once: the status pairs are walked twice below.
    status = list(installer.component_status())
    print("Optional components:")
    for component, present in status:
        # Distinct markers for installed vs. missing, in the same "[x]" / "[ ]"
        # convention the library listing uses.
        mark = "x" if present else " "
        print(f" [{mark}] {component.name:<22}{component.enables}")
        if not present:
            print(f" apt: {' '.join(component.apt)}")

    missing = [component for component, present in status if not present]
    if not missing:
        print("\nAll optional components are installed. ✔")
        return 0

    packages = installer.missing_packages(missing)
    print(f"\nMissing packages: {' '.join(packages)}")
    if args.check:
        return 0
    if pm != "apt":
        print(f"Automatic install needs apt. Install manually:\n sudo apt install {' '.join(packages)}")
        return 1
    if not args.yes:
        try:
            reply = input(f"\nInstall {len(packages)} package(s) now? [y/N] ").strip().lower()
        except EOFError:
            # No stdin (e.g. piped or closed): treat it as "no".
            reply = "n"
        if reply not in ("y", "yes"):
            print("Aborted.")
            return 1

    print("Installing (you may be prompted for your password)…")
    rc, out = installer.install_packages(packages)
    print(out[-2000:])  # only the tail of the installer output
    if rc == 0:
        still = [c.name for c, present in installer.component_status() if not present]
        print("\nStill missing: " + (", ".join(still) if still else "none ✔"))
    else:
        print(f"\nInstall failed (exit {rc}).")
    return rc
def cmd_login(args) -> int:
    """Save an update token (from ``--token`` or a prompt) and report the check result.

    Returns 1 when no token was supplied or the reported state is
    ``updates.AUTH``; 0 in every other case.
    """
    from getpass import getpass

    from .core import updates

    token = args.token
    if not token:
        print(f"Create a token (scope read:repository) at: {updates.TOKEN_PAGE}")
        try:
            token = getpass("Paste token: ").strip()
        except (EOFError, KeyboardInterrupt):
            token = ""
        if not token:
            print("No token provided.")
            return 1

    config.save_token(token)
    state, latest, _ = updates.update_state()
    if state == updates.AUTH:
        print("Token saved, but the server rejected it (check scope/permissions).")
        return 1
    verified = state in (updates.UP_TO_DATE, updates.AVAILABLE)
    if verified:
        print(f"Token saved and verified. Latest release: {latest}.")
    else:
        print("Token saved (couldn't reach the server to verify right now).")
    return 0
def cmd_logout(args) -> int:
    """Clear the saved update token and confirm on stdout."""
    config.clear_token()
    confirmation = "Update token removed."
    print(confirmation)
    return 0
def cmd_update(args) -> int:
    """Check for a newer release and, unless ``--check``, apply it for pip installs.

    Returns 1 for the no-token, rejected-token and network states, 0 when up
    to date, after a check-only run, or after printing a manual update hint;
    otherwise the exit code of ``updates.apply_update``.
    """
    from .core import updates

    state, tag, notes = updates.update_state()
    if state == updates.NO_TOKEN:
        print("No update token. Run `rigdoctor login` after creating one at:")
        print(f" {updates.TOKEN_PAGE}")
        return 1
    single_line_failures = (
        (updates.AUTH, "The update server rejected your token (check scope/permissions)."),
        (updates.NETWORK, "Couldn't reach the update server."),
    )
    for failure, message in single_line_failures:
        if state == failure:
            print(message)
            return 1
    if state == updates.UP_TO_DATE:
        print(f"Up to date (v{__version__}).")
        return 0

    # Every remaining state is handled as AVAILABLE.
    print(f"Update available: {tag} (current v{__version__}).")
    if notes:
        indented = "\n".join(" " + line for line in notes.splitlines())
        print("\nWhat's new:\n" + indented + "\n")
    if args.check:
        return 0

    kind = updates.install_kind()
    if kind != "pip":
        # apt/source installs aren't pip-updatable — show the right command
        print(updates.update_hint(kind))
        return 0

    print(f"Installing {tag}")
    rc, out = updates.apply_update(tag)
    print(out[-2000:])
    if rc != 0:
        print(f"\nUpdate failed (exit {rc}).")
        return rc
    print(f"\nUpdated to {tag}. Restart RigDoctor to use the new version.")
    return 0
def cmd_uninstall(args) -> int:
    """Confirm (unless ``--yes``) and then call the uninstaller.

    Returns 1 when the user does not answer "y"/"yes", otherwise 0.
    """
    from .core import uninstall as uninstaller

    if args.purge:
        scope = "everything (app + settings, token, and logs)"
    else:
        scope = "the app (settings/logs kept)"

    def _confirmed() -> bool:
        # A closed stdin counts as "no".
        try:
            answer = input(f"Uninstall RigDoctor — remove {scope}? [y/N] ").strip().lower()
        except EOFError:
            answer = "n"
        return answer in ("y", "yes")

    if not args.yes and not _confirmed():
        print("Aborted.")
        return 1
    uninstaller.uninstall(purge=args.purge)
    print("Uninstalling… RigDoctor will be removed momentarily.")
    return 0
def cmd_collect_priv(args) -> int:
    """Internal: emit root-only data (SMART + dmidecode) as JSON, run via pkexec at launch."""
    from dataclasses import asdict

    from .core import drives
    from .core.inventory import _dmidecode

    drive_rows = [asdict(drive) for drive in drives.collect()]
    payload = {"drives": drive_rows, "dmidecode": _dmidecode()}
    print(json.dumps(payload))
    return 0
def cmd_inventory(args) -> int:
    """Collect the system inventory and emit it as JSON, Markdown, or plain text.

    Args:
        args: parsed CLI namespace; reads ``json``, ``markdown`` and ``output``.
            ``--json`` takes precedence over ``--markdown``.

    Returns:
        0 always.
    """
    from .core import inventory

    sections = inventory.collect()
    if args.json:
        text = inventory.render_json(sections)
    elif args.markdown:
        text = inventory.render_markdown(sections)
    else:
        text = inventory.render_text(sections)

    if args.output:
        # Explicit UTF-8: the locale default can fail on, or mis-encode,
        # non-ASCII text in the rendered inventory.
        Path(args.output).write_text(text, encoding="utf-8")
        print(f"Wrote {args.output}")
    else:
        print(text)
    return 0
def cmd_report(args) -> int:
    """Run the health checks and print the findings as JSON (``--json``) or text."""
    from dataclasses import asdict

    from .core.health import run_health_checks
    from .render import render_health

    findings = run_health_checks()
    if not args.json:
        print(render_health(findings))
        return 0
    rows = [asdict(finding) for finding in findings]
    print(json.dumps(rows, indent=2, ensure_ascii=False))
    return 0
def _resolve_game(args) -> str | None:
"""Game name from --game, or looked up from --appid via the Steam scan."""
if getattr(args, "game", None):
return args.game
if getattr(args, "appid", None):
from .core import steam
for g in steam.scan_games(steam.selected_library_paths()):
if g.appid == str(args.appid):
return g.name
return None
return None
def _diagnose_start(args) -> int:
    """Start a focused capture for the game named by --game/--appid."""
    from .core import diagnostic, reccontrol, steam

    if reccontrol.running_pid():
        print("A capture is already running — finish it with: rigdoctor diagnose finish")
        return 1

    game = _resolve_game(args)
    if game is None:
        if args.game or args.appid:
            # The user named a game, but it could not be resolved.
            print("Couldn't match that game in your selected Steam libraries.")
            return 1
        # Nothing was named: list the candidates so the user can re-run with --game.
        games = steam.cached_games() or steam.scan_games(steam.selected_library_paths())
        if games:
            print("Pick a game to focus on, then re-run with --game:")
            for g in games:
                print(f" --game {g.name!r}")
        else:
            print("No games detected. Select a library: rigdoctor games libraries --all")
        return 1

    pid = diagnostic.start(game=game, interval=args.interval)
    time.sleep(1.0)  # give the capture process a moment before checking it
    if pid and reccontrol.pid_alive(pid):
        print(f"Diagnostic capture started for {game!r} (pid {pid}).")
        print(" Play your game. When you're done (or after a crash + reboot):")
        print(" rigdoctor diagnose finish")
        return 0
    print(f"Capture failed to start; see {config.SPAWN_LOG}")
    return 1


def _diagnose_status() -> int:
    """Print a one-line summary of the in-progress capture, if any."""
    from .core import diagnostic

    status = diagnostic.active()
    if not status:
        print("No diagnostic capture is running.")
        return 0
    game = status.get("game") or ""
    print(f"Capturing for {game!r}: {status.get('samples', 0)} samples"
          + (" · GPU-lost seen" if status.get("gpu_lost") else ""))
    return 0


def _diagnose_finish(args) -> int:
    """Stop the capture, analyze it, and print the summary and findings."""
    from .core import diagnostic, reccontrol
    from .render import render_health, render_summary

    # With no running capture and no log on disk there is nothing to analyze.
    if not reccontrol.running_pid() and not config.DIAG_LOG.exists():
        print("No diagnostic to analyze. Start one with: rigdoctor diagnose start --game <name>")
        return 1
    print("Stopping capture and analyzing…\n")
    result = diagnostic.finish(last_n=args.last)
    if result.game:
        print(f"Diagnostic — {result.game}\n")
    print(render_summary(result.summary, log_path=config.DIAG_LOG))
    print("\n" + render_health(result.findings, title="Findings"))
    return 0


def cmd_diagnose(args) -> int:
    """Dispatch ``rigdoctor diagnose``: ``start``, ``status`` (the default), or ``finish``.

    Args:
        args: parsed CLI namespace; ``diagnose_cmd`` selects the action and
            falls back to "status" when unset. Any value other than "start"
            or "status" is handled as "finish".

    Returns:
        The selected action's exit status (0 on success, 1 on failure).
    """
    sub = args.diagnose_cmd or "status"
    if sub == "start":
        return _diagnose_start(args)
    if sub == "status":
        return _diagnose_status()
    return _diagnose_finish(args)
def cmd_wrap(args) -> int:
    """Hand the wrapped game command to ``wrap.run`` and return its result."""
    from .core import wrap as wrapper

    game_command = args.command
    return wrapper.run(game_command)
def cmd_watch(args) -> int:
    """Run the game watcher with a poll interval of at least two seconds."""
    from .core import watcher

    requested = args.interval
    if not requested:
        requested = load_config().get("interval", 1.0)
    print("Watching for a running Steam game (Ctrl-C to stop)…")
    # The interval passed to the watcher is never below 2.0.
    poll = max(2.0, requested)
    return watcher.watch(interval=poll)
def cmd_service(args) -> int:
    """Set the trigger mode (``mode``) or print the current mode and service state."""
    from .core import service

    action = args.service_cmd or "status"
    if action == "mode":
        ok, note = service.apply_mode(args.mode)
        print(f"Trigger mode set to '{args.mode}'.")
        if note and not ok:
            print(f" note: {note}")
        if ok:
            return 0
        # A failed apply only counts as an error when the service layer is available.
        return 1 if service.available() else 0

    info = service.status()
    print(f"Trigger mode: {info['mode']}")
    print(f"systemd --user: {'available' if info['available'] else 'not available'}")
    if info["available"]:
        recorder_state = 'active' if info.get('recorder_active') else 'inactive'
        print(f" recorder service: {recorder_state}")
        watcher_state = 'active' if info.get('watch_active') else 'inactive'
        print(f" watcher service: {watcher_state}")
    return 0
def cmd_ai(args) -> int:
    """AI assistant (M14) — opt-in; only contacts a provider on `test`/`explain`."""
    from .core import ai

    def _ask(prompt) -> int:
        # Send the prompt, print the reply, and map success to an exit code.
        ok, reply = ai.explain(prompt)
        print(reply)
        return 0 if ok else 1

    action = args.ai_cmd or "status"
    if action == "status":
        print(f"Provider: {ai.provider() or 'not configured'}")
        if not ai.provider():
            print(" Configure it in the GUI: Settings → AI assistant.")
        else:
            print(f" {ai.provider_label()}")
            print(f" ready: {'yes' if ai.is_configured() else 'no'}")
        return 0

    # Every action other than "status" needs a configured provider.
    if not ai.is_configured():
        print("AI is not configured. Set it up in the GUI (Settings → AI assistant).")
        return 1

    if action == "test":
        return _ask("Connectivity test — reply exactly: RigDoctor AI is working.")

    if action == "dump":
        # Parse a Windows .dmp minidump (e.g. from a Proton game crash) and explain it.
        from .core import minidump

        report = minidump.parse(args.file)
        if not report.ok:
            print(f"Couldn't analyze the dump — {report.error}")
            return 1
        print(minidump.to_text(report))
        print(f"\nAsking {ai.provider_label()} to explain {os.path.basename(args.file)}\n")
        return _ask(minidump.to_ai_text(report))

    # explain: gather the current health findings and ask the provider to explain them.
    from .core import health

    findings = health.run_health_checks()
    prompt = ai.format_findings(findings)
    print(f"Asking {ai.provider_label()} to explain the current health findings…\n")
    return _ask(prompt)
def cmd_bundle(args) -> int:
    """Zip the latest stored diagnostic into a report bundle (M15) — needs logging enabled."""
    from .core import diagstore

    if diagstore.enabled():
        latest = diagstore.latest_dir()
        if latest is None:
            print("No stored diagnostics yet — run a diagnostic first.")
            return 1
        bundle_path = diagstore.make_report(latest)
        print(f"Report written: {bundle_path}")
        return 0

    print("Logging is off. Enable it (Settings → Logging, or set logging_enabled) so "
          "diagnostics are stored and can be reported.")
    return 1
def cmd_gameenv(args) -> int:
    """Run the gaming-environment checks and print them as JSON (``--json``) or text."""
    from dataclasses import asdict

    from .core.gameenv import run_gameenv_checks
    from .render import render_health

    findings = run_gameenv_checks()
    if not args.json:
        print(render_health(findings, title="Gaming environment"))
        return 0
    rows = [asdict(finding) for finding in findings]
    print(json.dumps(rows, indent=2, ensure_ascii=False))
    return 0
def cmd_games(args) -> int:
    """List games from the selected Steam libraries, other launchers, and custom entries.

    Returns 1 only when no games were found and no Steam library is selected.
    """
    from dataclasses import asdict

    from .core import customgames, launchers, steam

    selected = steam.selected_library_paths()
    # Steam is only rescanned when at least one library is selected.
    scan = steam.rescan() if selected else None
    from_steam = scan.games if scan else []
    from_launchers = launchers.scan()  # non-Steam (Lutris/Heroic)
    # + user-added (SPT etc.)
    catalogue = list(from_steam) + list(from_launchers) + customgames.scan()

    if args.json:
        document = {
            "scanned_at": scan.scanned_at if scan else None,
            "new_appids": scan.new_appids if scan else [],
            "games": [asdict(game) for game in catalogue],
        }
        print(json.dumps(document, indent=2, ensure_ascii=False))
        return 0

    if not catalogue:
        if selected:
            print("No games found.")
            return 0
        print("No Steam libraries selected and no non-Steam games found.")
        print(" Pick a Steam library: rigdoctor games libraries --enable <path> (or --all)")
        return 1

    fresh = set(scan.new_appids) if scan else set()
    print(f"{len(catalogue)} game(s):\n")
    for game in catalogue:
        tag = " NEW" if game.appid in fresh else ""
        src = "" if game.launcher == "steam" else f" [{game.launcher}]"
        size = steam.human_size(game.size_bytes) if game.size_bytes else ""
        print(f" {game.name:<46}{src:<10} {size:>9}{tag}")
    if not selected:
        print("\n(no Steam libraries selected — `rigdoctor games libraries --all` to add them)")
    return 0
def cmd_games_libraries(args) -> int:
    """List the discovered Steam libraries, optionally updating which ones are selected.

    The selection is kept as a set of ``os.path.realpath``-resolved paths, and
    every comparison against a discovered library resolves that library's
    path the same way, so a library reached through a symlink is matched
    consistently by ``--all``, ``--enable``, ``--disable`` and the listing.

    Args:
        args: parsed CLI namespace; reads ``all``, ``enable``, ``disable``
            and ``json``.

    Returns:
        1 when no Steam library was discovered, otherwise 0.
    """
    from .core import steam

    def _canonical(raw: str) -> str:
        # User-supplied paths may contain "~" and symlinks.
        return os.path.realpath(os.path.expanduser(raw))

    discovered = steam.discover_libraries()
    selected = {os.path.realpath(p) for p in steam.selected_library_paths()}

    # --all / --enable / --disable adjust the selection, then we list the result.
    if args.all or args.enable or args.disable:
        if args.all:
            # Resolve here too, so a later --disable (which is resolved) can
            # actually remove an entry that --all added.
            selected = {os.path.realpath(lib.path) for lib in discovered}
        for raw in args.enable or []:
            selected.add(_canonical(raw))
        for raw in args.disable or []:
            selected.discard(_canonical(raw))
        config.update_config(steam_libraries=sorted(selected))

    if not discovered:
        print("No Steam libraries detected (is Steam installed?).")
        return 1

    def _is_selected(lib) -> bool:
        return os.path.realpath(lib.path) in selected

    if args.json:
        print(json.dumps([
            {"path": lib.path, "label": lib.label, "selected": _is_selected(lib),
             "games": len(steam.scan_library(lib.path))}
            for lib in discovered
        ], indent=2, ensure_ascii=False))
        return 0

    print("Steam libraries (checked = scanned for games):\n")
    for lib in discovered:
        mark = "x" if _is_selected(lib) else " "
        count = len(steam.scan_library(lib.path))
        label = f" [{lib.label}]" if lib.label else ""
        print(f" [{mark}] {lib.path}{label} ({count} games)")
    return 0
def cmd_games_add(args) -> int:
    """Add a custom game and echo the launch command / log directory stored for it.

    Returns 1 when ``customgames.add`` reports that nothing was added.
    """
    from .core import customgames

    name = args.name
    added = customgames.add(name, command=args.command, logdir=args.logdir)
    if not added:
        print(f"'{name}' is blank or already in your custom games.")
        return 1

    print(f"Added '{name}' to your games (custom). It'll show in `rigdoctor games` "
          "and the diagnostic game picker.")
    entry = customgames.get(name) or {}
    launch = entry.get("command")
    if launch:
        print(f" launch: {launch} (run with: rigdoctor games play \"{name}\")")
    logdir = entry.get("logdir")
    if logdir:
        print(f" logs: {logdir} (included in crash diagnostics)")
    return 0
def cmd_games_play(args) -> int:
    """Launch a custom game through ``wrap.run``, tagged with the game's name.

    Returns 1 when the game is unknown or has no launch command; otherwise
    whatever ``wrap.run`` returns.
    """
    from .core import customgames, wrap

    name = args.name
    launch = customgames.command(name)
    if launch is not None:
        print(f"Launching '{name}' with crash-capture… (capture stops cleanly on exit; "
              "a hard freeze is flagged next time you open RigDoctor)")
        return wrap.run(launch, game=name)

    # No command: tell apart "not registered" from "registered without a command".
    if customgames.get(name) is None:
        print(f"'{name}' isn't in your custom games. Add it: "
              f"rigdoctor games add \"{name}\" --command <launch script>")
    else:
        print(f"'{name}' has no launch command. Set one: "
              f"rigdoctor games remove \"{name}\" && rigdoctor games add \"{name}\" "
              "--command <launch script>")
    return 1
def cmd_games_remove(args) -> int:
    """Remove a custom game; on failure, list the names that do exist."""
    from .core import customgames

    name = args.name
    if not customgames.remove(name):
        known = ', '.join(customgames.names()) or '(none)'
        print(f"'{name}' isn't in your custom games. Current: {known}")
        return 1
    print(f"Removed '{name}' from your custom games.")
    return 0
def cmd_stress(args) -> int:
    """Run the GPU stress monitor and print its report.

    Args:
        args: parsed CLI namespace; reads ``command``, ``json``, ``duration``
            and ``interval``.

    Returns:
        2 when ``--command`` cannot be parsed as a shell-style command line;
        otherwise 0 when the result severity is "ok" or "info", else 1.
    """
    import shlex as _shlex

    # Parse the user's command line first: a malformed value (e.g. an
    # unbalanced quote) is a usage error, not a crash.
    command = None
    if args.command:
        try:
            # A whitespace-only value splits to [], which is treated as "no command".
            command = _shlex.split(args.command) or None
        except ValueError as exc:
            # stderr keeps stdout clean for --json consumers.
            print(f"Invalid --command: {exc}", file=sys.stderr)
            return 2

    from .core import stress
    from .render import format_raw, render_stress

    if not args.json:
        loaders = stress.available_loaders()
        if command:
            print(f"Stressing with: {' '.join(command)}")
        elif loaders:
            print(f"Stressing with auto-detected loader: {loaders[0]}")
        else:
            print("No GPU load tool found and no --command given — MONITOR-ONLY mode.")
            print(f" Launch the game/app now; I'll closely track temps for up to {int(args.duration)}s.")
        print(f" Sampling every {args.interval:g}s. Press Ctrl-C to stop early.\n")

    def _tick(sample, elapsed) -> None:
        # Redraw a single status line in place (carriage return, no newline).
        by = {r.key: r for r in sample.readings}
        bits = [f"{elapsed:5.0f}s"]
        for key, tag in (("gpu.temp", "core"), ("gpu.power", "pwr"),
                         ("gpu.util", "util"), ("gpu.clock.core", "clk")):
            r = by.get(key)
            if r is not None and r.value is not None:
                bits.append(f"{tag} {format_raw(r.value, r.unit)}")
        print(" " + " ".join(bits) + " ", end="\r", flush=True)

    result = stress.run(duration=args.duration, interval=args.interval, command=command,
                        on_tick=None if args.json else _tick)
    if args.json:
        from dataclasses import asdict
        print(json.dumps(asdict(result), indent=2, ensure_ascii=False))
    else:
        print()  # end the live line
        print(render_stress(result))
    return 0 if result.severity in ("ok", "info") else 1
def build_parser() -> argparse.ArgumentParser:
    """Build the ``rigdoctor`` argument parser with every subcommand registered.

    Each subcommand stores its handler in ``func`` via ``set_defaults``, and
    the top-level subcommand is required.

    NOTE(review): the top-level sub-parsers use ``dest="command"``, and the
    ``stress``/``games add`` ``--command`` options and the ``wrap`` positional
    use the same dest. Handlers read ``args.command`` as the option value, so
    this presumably relies on the sub-parser's value replacing the subcommand
    name — confirm against argparse's sub-command behaviour before renaming.
    """
    p = argparse.ArgumentParser(
        prog="rigdoctor",
        description="Hardware monitoring & crash diagnostics for Linux gamers.",
    )
    p.add_argument("-V", "--version", action="version", version=f"rigdoctor {__version__}")
    sub = p.add_subparsers(dest="command", required=True)

    # --- sensors: snapshot / monitor / stress / gui / sources ---
    sp = sub.add_parser("snapshot", help="print a one-shot reading of all sensors")
    sp.add_argument("--json", action="store_true", help="output JSON instead of text")
    sp.set_defaults(func=cmd_snapshot)
    mp = sub.add_parser("monitor", help="live monitor TUI (current/min/max, M2)")
    mp.add_argument("-n", "--interval", type=float, default=None, help="refresh interval (s)")
    mp.add_argument("--plain", action="store_true", help="plain redraw instead of the curses UI")
    mp.set_defaults(func=cmd_monitor)
    st = sub.add_parser("stress", help="GPU stress + close thermal monitoring (repro load crashes)")
    st.add_argument("-d", "--duration", type=float, default=120.0, help="run for this many seconds (default 120)")
    st.add_argument("-n", "--interval", type=float, default=0.5, help="sampling interval in seconds (default 0.5)")
    st.add_argument("--command", default=None,
                    help="load generator to run (e.g. a game or 'gpu-burn 60'); omit to auto-detect or monitor-only")
    st.add_argument("--json", action="store_true", help="output JSON")
    st.set_defaults(func=cmd_stress)
    sub.add_parser("gui", help="launch the desktop GUI (needs PySide6)").set_defaults(func=cmd_gui)
    sub.add_parser("sources", help="list detected sensor sources").set_defaults(func=cmd_sources)

    # --- setup and lifecycle: install / login / logout / update / uninstall ---
    inst = sub.add_parser("install", help="set up optional system dependencies (M9)")
    inst.add_argument("--check", action="store_true", help="report status only; install nothing")
    inst.add_argument("-y", "--yes", action="store_true", help="install without confirmation")
    inst.set_defaults(func=cmd_install)
    login = sub.add_parser("login", help="save a Gitea token for updates (M13)")
    login.add_argument("--token", default=None, help="token (prompted if omitted)")
    login.set_defaults(func=cmd_login)
    sub.add_parser("logout", help="remove the saved update token").set_defaults(func=cmd_logout)
    upd = sub.add_parser("update", help="check for / apply a newer version (M13)")
    upd.add_argument("--check", action="store_true", help="only report, don't apply")
    upd.set_defaults(func=cmd_update)
    unin = sub.add_parser("uninstall", help="remove the user-local install")
    unin.add_argument("--purge", action="store_true", help="also remove settings, token, and logs")
    unin.add_argument("-y", "--yes", action="store_true", help="don't ask for confirmation")
    unin.set_defaults(func=cmd_uninstall)

    # --- record: nested subcommands, one is required ---
    rec = sub.add_parser("record", help="crash-capture logger (M3)")
    rec_sub = rec.add_subparsers(dest="record_cmd", required=True)
    run_p = rec_sub.add_parser("run", help="run the capture loop in the foreground (systemd-friendly)")
    run_p.add_argument("-n", "--interval", type=float, default=None, help="sampling interval (s)")
    run_p.add_argument("-o", "--out", default=None, help="log file path")
    run_p.add_argument("--game", default=None, help="tag the capture with a game name (M6/diagnose)")
    run_p.set_defaults(func=cmd_record_run)
    start_p = rec_sub.add_parser("start", help="start recording in the background")
    start_p.add_argument("-n", "--interval", type=float, default=None, help="sampling interval (s)")
    start_p.add_argument("-o", "--out", default=None, help="log file path")
    start_p.set_defaults(func=cmd_record_start)
    rec_sub.add_parser("stop", help="stop background recording").set_defaults(func=cmd_record_stop)
    rec_sub.add_parser("status", help="show recorder status").set_defaults(func=cmd_record_status)
    report_p = rec_sub.add_parser("report", help="summarize the captured log (post-crash)")
    report_p.add_argument("--last", type=int, default=10, help="recent samples to show")
    report_p.add_argument("--log", default=None, help="path to a capture log")
    report_p.set_defaults(func=cmd_record_report)

    # --- reports: health report / privileged collection / inventory ---
    rep = sub.add_parser("report", help="health report (M4): scan logs/SMART/driver for issues")
    rep.add_argument("--json", action="store_true", help="output JSON instead of text")
    rep.set_defaults(func=cmd_report)
    cp = sub.add_parser("collect-priv", help=argparse.SUPPRESS)  # internal: run via pkexec
    cp.set_defaults(func=cmd_collect_priv)
    inv = sub.add_parser("inventory", help="system inventory (M5): export hardware/OS details")
    inv.add_argument("--json", action="store_true", help="output JSON")
    inv.add_argument("--markdown", action="store_true", help="output Markdown (for forum/bug reports)")
    inv.add_argument("-o", "--output", default=None, help="write to a file instead of stdout")
    inv.set_defaults(func=cmd_inventory)

    # --- games: bare `games` lists; its subcommands are optional ---
    games_p = sub.add_parser("games", help="Steam game & library detection (M6)")
    games_p.add_argument("--json", action="store_true", help="output JSON")
    games_p.set_defaults(func=cmd_games)
    games_sub = games_p.add_subparsers(dest="games_cmd")
    lib_p = games_sub.add_parser("libraries", help="list/select Steam libraries to scan")
    lib_p.add_argument("--enable", action="append", metavar="PATH", help="scan this library (repeatable)")
    lib_p.add_argument("--disable", action="append", metavar="PATH", help="stop scanning this library (repeatable)")
    lib_p.add_argument("--all", action="store_true", help="scan all detected libraries")
    lib_p.add_argument("--json", action="store_true", help="output JSON")
    lib_p.set_defaults(func=cmd_games_libraries)
    add_p = games_sub.add_parser("add", help="add a game no launcher reports (e.g. SPT)")
    add_p.add_argument("name", help="game name, e.g. \"SPT\"")
    add_p.add_argument("--command", default=None,
                       help="launch command/script (e.g. the path to tarkov.sh) — enables `games play`")
    add_p.add_argument("--logdir", default=None,
                       help="the game's own log directory (auto-detected as <command dir>/logs if present)")
    add_p.set_defaults(func=cmd_games_add)
    play_p = games_sub.add_parser("play", help="launch a custom game with crash-capture (e.g. SPT)")
    play_p.add_argument("name", help="game name to launch")
    play_p.set_defaults(func=cmd_games_play)
    rm_p = games_sub.add_parser("remove", help="remove a previously added custom game")
    rm_p.add_argument("name", help="game name to remove")
    rm_p.set_defaults(func=cmd_games_remove)
    env_p = sub.add_parser("gameenv", help="gaming environment checks (M6): flag stability/perf settings")
    env_p.add_argument("--json", action="store_true", help="output JSON instead of text")
    env_p.set_defaults(func=cmd_gameenv)

    # --- diagnose: all three subcommands share one handler ---
    diag_p = sub.add_parser("diagnose", help="guided diagnostic: capture while gaming, then analyze")
    diag_sub = diag_p.add_subparsers(dest="diagnose_cmd")
    diag_start = diag_sub.add_parser("start", help="start a focused capture for a game")
    diag_start.add_argument("--game", default=None, help="game name to focus on")
    diag_start.add_argument("--appid", default=None, help="Steam appid to focus on (resolved to a name)")
    diag_start.add_argument("-n", "--interval", type=float, default=None, help="sampling interval (s)")
    diag_start.set_defaults(func=cmd_diagnose)
    diag_sub.add_parser("status", help="show the in-progress diagnostic").set_defaults(func=cmd_diagnose)
    diag_finish = diag_sub.add_parser("finish", help="stop the capture and analyze it")
    diag_finish.add_argument("--last", type=int, default=10, help="recent samples to show")
    diag_finish.set_defaults(func=cmd_diagnose)
    # Defaults for a bare `diagnose`; cmd_diagnose treats an unset diagnose_cmd as "status".
    diag_p.set_defaults(func=cmd_diagnose, diagnose_cmd=None, last=10)

    # --- capture triggers: wrap / watch / service ---
    wrap_p = sub.add_parser(
        "wrap", help="run a game with automatic crash-capture (Steam launch option, D12)")
    wrap_p.add_argument("command", nargs=argparse.REMAINDER,
                        help="the game command — use `rigdoctor wrap %%command%%` in Steam")
    wrap_p.set_defaults(func=cmd_wrap)
    watch_p = sub.add_parser("watch", help="auto-capture while a Steam game runs (game-launch trigger)")
    watch_p.add_argument("-n", "--interval", type=float, default=None, help="poll interval (s)")
    watch_p.set_defaults(func=cmd_watch)
    svc_p = sub.add_parser("service", help="crash-logger trigger mode + systemd --user service (M9/D6)")
    svc_sub = svc_p.add_subparsers(dest="service_cmd")
    svc_sub.add_parser("status", help="show the trigger mode and service state").set_defaults(func=cmd_service)
    mode_p = svc_sub.add_parser("mode", help="set the trigger mode")
    mode_p.add_argument("mode", choices=("manual", "always-on", "game-launch"))
    mode_p.set_defaults(func=cmd_service)
    # Defaults for a bare `service`; cmd_service treats an unset service_cmd as "status".
    svc_p.set_defaults(func=cmd_service, service_cmd=None)

    # --- AI assistant and report bundle ---
    ai_p = sub.add_parser("ai", help="AI assistant (M14): explain diagnostics — opt-in, never automatic")
    ai_sub = ai_p.add_subparsers(dest="ai_cmd")
    ai_sub.add_parser("status", help="show the configured provider (contacts nothing)").set_defaults(func=cmd_ai)
    ai_sub.add_parser("test", help="send a tiny probe to verify connectivity").set_defaults(func=cmd_ai)
    ai_sub.add_parser("explain", help="explain the current health findings with AI").set_defaults(func=cmd_ai)
    dump_p = ai_sub.add_parser("dump", help="parse a Windows .dmp crash dump and explain it with AI")
    dump_p.add_argument("file", help="path to the .dmp minidump (e.g. from a Proton game crash)")
    dump_p.set_defaults(func=cmd_ai)
    # Defaults for a bare `ai`; cmd_ai treats an unset ai_cmd as "status".
    ai_p.set_defaults(func=cmd_ai, ai_cmd=None)
    bundle_p = sub.add_parser("bundle", help="zip the latest stored diagnostic into a report bundle (M15)")
    bundle_p.set_defaults(func=cmd_bundle)
    return p
def main(argv: list[str] | None = None) -> int:
    """CLI entry point: set up app logging, parse ``argv``, and run the chosen handler.

    Returns whatever the selected ``cmd_*`` handler returns.
    """
    from .core import applog

    # Opt-in app logging (M15); no-op unless logging_enabled.
    applog.setup()
    parser = build_parser()
    namespace = parser.parse_args(argv)
    handler = namespace.func
    return handler(namespace)
# Script entry: exit the process with main()'s return value as the status.
if __name__ == "__main__":
    raise SystemExit(main())