Compare commits

..

18 Commits

Author SHA1 Message Date
jessey 5cd51beadf Merge pull request 'feat(gui): Run Diagnostic flow on the Games page — 0.12.0' (#7) from feat/m6-steam-detection into main
release / release (push) Successful in 14s
Reviewed-on: #7
2026-05-22 06:32:30 +00:00
jessey 934b489fec feat(gui): Run Diagnostic flow on the Games page — 0.12.0
Brings the guided diagnostic (0.11.0 core/CLI) into the GUI:
- Each game row gets a "Run Diagnostic" button → starts a focused, game-tagged
  capture and shows a recording banner (live sample count + GPU-lost indicator)
  with Finish & analyze / Discard.
- Finishing runs core.diagnostic.finish() off the UI thread and opens a results
  dialog (gui/diagnostic_dialog.py): window-scoped capture summary + findings
  cards (reusing render_summary + finding_card).
- Banner restores on showEvent if a capture is still running (navigate away/back).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-22 08:32:04 +02:00
jessey 7a283dc338 Merge pull request 'feat: guided diagnostic session (CLI) — pick a game, capture, analyze — 0.11.0' (#6) from feat/m6-steam-detection into main
release / release (push) Successful in 15s
Reviewed-on: #6
2026-05-22 06:28:21 +00:00
jessey 5682878f22 feat: guided diagnostic session (CLI) — pick a game, capture, analyze — 0.11.0
The seed use case end to end, orchestrating M3 + M4 (ARCHITECTURE §7.1).

- core/diagnostic.py: start(game) runs a focused, game-tagged capture into a
  dedicated diagnostic log (window-scoped report, separate from the always-on
  crash log); finish() stops it and combines the capture summary (M3) with the
  health findings (M4). Game recorded as a log event so it survives crash+reboot.
- CLI: rigdoctor diagnose start --game/--appid | status | finish.
- recorder/record run gained an optional --game tag; reccontrol passes it through.
- Tests for game recovery + the finish() combination.

GUI/tray "Run Diagnostic" button and auto start/stop (D12 wrapper) come next.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-22 08:27:53 +02:00
jessey 5a584c08d5 Merge pull request 'fix(gui): readable Environment dropdowns and action buttons — 0.10.1' (#5) from feat/m6-steam-detection into main
release / release (push) Successful in 14s
Reviewed-on: #5
2026-05-22 06:22:40 +00:00
jessey 8b1083a29b fix(gui): show the real reason an Environment Apply/Install failed — 0.10.2
Thread the command output through to the status line and classify it: cancelled
at the password prompt vs. the system rejecting the change (e.g. a BIOS/kernel-
locked PCIe ASPM policy), instead of a vague "cancelled, or needs privileges".

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-22 08:21:01 +02:00
jessey 25b7a58e3c fix(gui): readable Environment dropdowns and action buttons — 0.10.1
- Style the QComboBox popup (QAbstractItemView) — it's a separate widget the
  theme didn't cover, so the drop-down list rendered light-on-light.
- Install/Apply finding buttons used PrimaryButton (accent fill + dark text),
  whose fill didn't paint reliably inside the finding cards, leaving dim
  dark-on-dark text. New outlined ActionButton style: bright accent text on the
  dark card, fills accent on hover, with a min-height so the row can't crush it.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-22 08:16:31 +02:00
jessey 1ec8675fa0 Merge pull request 'feat(m6): one-click install + apply controls on Environment page — 0.10.0' (#4) from feat/m6-steam-detection into main
release / release (push) Successful in 14s
Reviewed-on: #4
2026-05-22 06:05:23 +00:00
jessey 9c30c9824e feat(m6): one-click install + apply controls on Environment page — 0.10.0
Make the environment report actionable, not just advisory.

Install (reuses M9 installer):
- Add GameMode, MangoHud, cpupower to the component catalog (so they also show
  on the Setup page); catalog.by_id() lookup.
- "tool not installed" findings (GameMode/MangoHud) get an Install button.

Apply runtime-reversible tunables (D22, realizing the D9 consent-gated milestone):
- core/fixes.py: dropdown of live options + Apply for CPU governor, NVIDIA
  persistence, PCIe ASPM policy, vm.swappiness, THP. One pkexec command each,
  no reboot, reverts on reboot; chosen value validated against live options;
  writes go to sysfs/procfs/nvidia-smi, never GRUB. GRUB/mitigations stay
  suggestion-only.
- Finding gained optional action (install) + fix (apply) ids; shared
  finding_card renders the matching control; Environment page wires both and
  re-checks after a change.

Tests for fixes (parse, command builders, value validation, gameenv wiring).
Docs: D22 added (amends D9); SPEC/MODULES/ROADMAP updated. 0.9.0 -> 0.10.0.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-22 08:05:03 +02:00
jessey 596b3ec8c4 Merge pull request 'feat: gaming environment checks engine (M6) + notification icon — 0.9.0' (#3) from feat/m6-steam-detection into main
release / release (push) Successful in 14s
Reviewed-on: #3
2026-05-22 05:53:55 +00:00
jessey 392ea76347 Merge branch 'main' into feat/m6-steam-detection 2026-05-22 05:53:49 +00:00
jessey 29f4a45df8 feat: gaming environment checks engine (M6) + notification icon — 0.9.0
The evaluate-and-suggest half of M6: a read-only findings report (D9) over
system settings that affect gaming stability/performance, each with the exact
fix command.

- core/gameenv.py: PCIe ASPM, NVIDIA persistence mode, CPU governor (the three
  seed-case contributors to GPU bus-drop / Xid 79), GameMode, MangoHud,
  vm.swappiness, shader disk cache, THP, CPU mitigations, Proton versions.
  Pure evaluate_* helpers split from IO for testing; reuses the M4 Finding model.
- steam.proton_versions(): surfaces installed Proton builds for the report.
- CLI: rigdoctor gameenv (text / --json); render_health() gained a title arg.
- GUI: new Environment page; extracted a shared finding_card widget and switched
  the Health page to it.
- Tests for the pure evaluators + aggregate.

Also fix: desktop notifications now use the RigDoctor icon (installed theme copy
-> bundled asset -> stock fallback) instead of a generic stock icon, matching
the app/dock icon.

Docs (MODULES/ROADMAP) updated; version 0.8.0 -> 0.9.0.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-22 07:53:06 +02:00
jessey d7f07dd7c0 Merge pull request 'feat: Steam game & library detection (M6) — 0.8.0' (#2) from feat/m6-steam-detection into main
release / release (push) Successful in 15s
Reviewed-on: #2
2026-05-22 05:44:42 +00:00
jessey 0642eb4712 feat: Steam game & library detection (M6) — 0.8.0
The first slice of M6 (gaming-environment checks): detect a user's Steam
libraries and the games installed in each — also the D12 "pick a game"
foundation.

- core/steam.py: multi-install/library discovery (libraryfolders.vdf, symlink
  dedupe, native/Flatpak/Snap), appmanifest_*.acf scan with runtime/Proton/
  redist filtering, scan cache + new-game diff. Stdlib only. VDF keys read
  case-insensitively (e.g. lastupdated vs SizeOnDisk).
- Libraries are opt-in (config steam_libraries); the flat TOML writer now
  emits list/array values.
- GUI Games page: library checkboxes with per-library counts, game list,
  background rescan on every launch, NEW badge + sidebar count for games
  installed since the last scan (acknowledged when viewed).
- CLI: rigdoctor games / games libraries [--enable|--disable|--all|--json]
  (headless-complete, D17).
- Tests for VDF parse, scan, tool filter, cache diff, config list round-trip.
- Docs (MODULES/ROADMAP) updated; version 0.7.3 -> 0.8.0.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-22 07:43:31 +02:00
jessey f25ac939cc fix(share): terminal scrollback for large output
release / release (push) Successful in 14s
Render with pyte.HistoryScreen and show scrollback + screen, so large output
(ls -la, cat, etc.) can be scrolled up to read. Auto-scroll to the bottom only
when already at the bottom; preserve position when the user has scrolled up.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-21 20:27:51 +02:00
jessey b47006bc22 fix(share): terminal caret position; remove GUI Inventory tab (use CLI)
release / release (push) Successful in 14s
- The shared terminal caret now sits at the real cursor (row and column) instead
  of the start of the line.
- Remove the GUI Inventory tab; `rigdoctor inventory` (CLI) covers it. Inventory is
  still collected for the relay guest view (so a remote helper sees the host's
  hardware) and via launch elevation. Deletes gui/inventory_page.py.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-21 20:25:52 +02:00
jessey 00394c287c fix(share): terminal access for late joiners, auto-scroll, inventory scroll
release / release (push) Successful in 14s
- A guest who joined after the host enabled the terminal stayed read-only; the
  host now re-sends the terminal state on join (req_full), so the terminal works.
- The shared terminal follows the cursor to the bottom as output arrives (ls -la)
  instead of staying scrolled up.
- The Inventory page preserves scroll position on refresh (and skips re-rendering
  unchanged data), so it no longer jumps to the top while you're reading.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-21 20:22:25 +02:00
jessey 2f6cab72c4 feat: shared PTY terminal (M12 Tier 3) + readable form controls
release / release (push) Successful in 14s
- feat(share): host-consented interactive terminal over the relay. The host shares
  a real PTY shell (core/pty_session.py); the guest renders it with pyte and sends
  keystrokes (gui/terminal_widget.py) — vim/top/tab-completion/Ctrl-C work. Runs as
  the host's user (never root). The host reads along live and can type too, e.g. a
  sudo password, which stays local and is never sent to the guest. Off by default.
  Guest also pulls inventory on join (req_full).
- fix(gui): style all form controls (QLineEdit/QPlainTextEdit/spin boxes/combo/
  terminals) dark-on-light-text — Fusion defaulted them to unreadable light-on-light.
- replaces the command/response shell with the full PTY; adds pyte to the gui extra.

Verified end-to-end against the deployed relay (guest keystroke ran on host PTY).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-21 20:16:22 +02:00
36 changed files with 2785 additions and 264 deletions
+116
View File
@@ -5,6 +5,122 @@ All notable changes to RigDoctor are recorded here. Format follows
(`MAJOR.MINOR.PATCH`, pre-1.0). `__version__` and `pyproject.toml` must match the git (`MAJOR.MINOR.PATCH`, pre-1.0). `__version__` and `pyproject.toml` must match the git
release tag (so the auto-updater, D18, can compare versions). release tag (so the auto-updater, D18, can compare versions).
## [0.12.0] - 2026-05-22
### Added
- **Guided diagnostic in the GUI.** Each game on the **Games** page now has a **Run Diagnostic**
button → a focused, game-tagged capture starts and a recording banner appears (live sample
count, GPU-lost indicator) with **Finish & analyze** / **Discard**. Finishing opens a results
dialog: the window-scoped capture summary (peak temps/power, events, last samples) plus the
health findings as cards. The banner persists/restores if you navigate away and back while a
capture is running. Shares `core/diagnostic.py` with the CLI (one flow, three front-ends).
## [0.11.0] - 2026-05-22
### Added
- **Guided diagnostic session (CLI) — the seed use case, end to end.** `rigdoctor diagnose
start --game "<name>"` runs a **focused crash-capture tagged with that game** (its own
diagnostic log, so the report is scoped to just that session), `diagnose status` shows
progress, and `diagnose finish` stops it and prints a combined report: the **capture
summary** (peak temps/power, GPU-lost events, last samples — M3) plus the **health findings**
(Xid/SMART/driver/etc. — M4). The game can be given by `--game` or `--appid` (resolved from
the Steam scan), and is recorded as a log event so it survives a crash + reboot.
- Shared orchestration lives in `core/diagnostic.py` (one callable for CLI/GUI/tray, per
ARCHITECTURE §7.1); the recorder/`record run` gained an optional `--game` tag.
## [0.10.2] - 2026-05-22
### Changed
- When an Environment **Apply**/**Install** fails, the status now shows the **real reason**
(cancelled at the password prompt vs. the system rejecting the change, e.g. a BIOS/kernel-
locked PCIe ASPM policy) instead of a vague "cancelled, or needs privileges".
## [0.10.1] - 2026-05-22
### Fixed
- **Environment-page contrast.** The combo-box **drop-down list** was rendering light-on-light
(the popup view is a separate widget the theme didn't cover) — now dark with readable text.
- The **Install / Apply** buttons on findings were hard to read (the accent fill didn't paint
reliably inside the finding cards, leaving dim dark-on-dark text). They're now an outlined
style — bright accent text on the dark card, filling accent on hover — readable regardless,
and given a minimum height so the row can't crush them.
## [0.10.0] - 2026-05-22
### Added
- **Actionable Environment page (M6) — install & apply, not just advice.** Findings that
recommend a tool or a setting are now one-click:
- **Install buttons** for GameMode, MangoHud, and cpupower (added to the M9 component catalog,
so they also appear on the **Setup** page with the existing installer).
- **Apply controls** for runtime-reversible tunables — a dropdown of the live options + Apply,
via a single pkexec prompt, no reboot: **CPU governor**, **NVIDIA persistence mode**,
**PCIe ASPM policy**, **vm.swappiness**, **Transparent HugePages** (`core/fixes.py`). The
chosen value is validated against the live options before anything runs.
- This is the consent-gated apply milestone D9 anticipated, scoped to safe settings (**D22**).
GRUB-based fixes and CPU mitigations stay suggestion-only; `rigdoctor gameenv` still prints
the exact commands for headless use.
### Changed
- The `Finding` model gained optional `action` (installable component) and `fix` (applyable
tunable) fields; the shared `finding_card` widget renders the matching control.
## [0.9.0] - 2026-05-22
### Added
- **Gaming environment checks (M6) — the evaluate-and-suggest engine.** A new read-only report
(D9) that flags system settings which hurt gaming stability/performance and gives the exact fix
command. Checks: **PCIe ASPM**, **NVIDIA persistence mode**, **CPU governor** (the three that
map to the seed-case GPU bus-drop / Xid 79), GameMode, MangoHud, `vm.swappiness`, shader disk
cache, Transparent HugePages, CPU mitigations, and installed Proton versions.
- **CLI:** `rigdoctor gameenv` (text or `--json`).
- **GUI:** a new **Environment** page (findings cards, auto-runs on open), reusing the M4
health-report card style via a shared `finding_card` widget.
### Fixed
- **Notification icon** now uses the RigDoctor icon (matching the app/dock) instead of a generic
stock icon — resolved from the installed icon theme, the bundled asset, then a stock fallback.
## [0.8.0] - 2026-05-22
### Added
- **Gaming environment checks (M6) — Steam game detection.** RigDoctor now finds your Steam
libraries (across multiple drives, via `libraryfolders.vdf`) and the games installed in each
(parsing `appmanifest_*.acf` — stdlib only, no Steam tooling needed). Runtimes, Proton builds,
and redistributables are filtered out.
- **Opt-in libraries:** detected libraries are listed with a per-library game count; you check
the ones to scan. Nothing is scanned until you pick a library.
- **Background scan on every launch:** the GUI rescans the selected libraries in the background
when it opens and flags games installed since the last scan with a **NEW** badge plus a count
on the **Games** sidebar item (cleared when you view the page). Results are cached
(`~/.local/state/rigdoctor/games.json`) so the list shows instantly.
- **CLI:** `rigdoctor games` lists detected games; `rigdoctor games libraries
[--enable PATH | --disable PATH | --all]` lists/selects libraries (headless-complete, D17).
- Config now supports list values (TOML arrays); `steam_libraries` records the selected libraries.
## [0.7.3] - 2026-05-21
### Fixed
- Shared terminal now has **scrollback** — large output (e.g. `ls -la`) can be scrolled up to
read; it keeps a history buffer and only auto-scrolls to the bottom when you're already there.
## [0.7.2] - 2026-05-21
### Changed
- Removed the GUI **Inventory** tab — use the CLI `rigdoctor inventory` instead. (Inventory is
still collected for the relay guest view, so a remote helper still sees the host's hardware.)
### Fixed
- Shared terminal caret now sits at the real cursor position (row **and** column) instead of
the start of the line.
## [0.7.1] - 2026-05-21
### Fixed
- Shared terminal: a guest who joined **after** the host enabled the terminal stayed read-only.
The host now re-sends the terminal state when a guest joins, so the terminal is available.
- Inventory page no longer jumps back to the top when it refreshes (e.g. when elevated data
arrives) — scroll position is preserved and unchanged data isn't re-rendered.
- Shared terminal now follows the cursor to the bottom as output arrives (e.g. `ls -la`),
instead of staying scrolled up.
## [0.7.0] - 2026-05-21
### Added
- **Shared terminal (M12, Tier 3)**: when the host enables it, the session shares a real **PTY**
shell — the guest gets an interactive terminal (vim, top, tab-completion, Ctrl-C) running on
the host as the host's user. The host **reads along** live and can type too, e.g. a `sudo`
password — which stays local and is never sent to the guest. Off by default, host-consented.
The guest also pulls the host's inventory on join.
### Fixed
- **Input contrast**: all form controls (text fields, spin boxes, combo boxes, terminals) now
use the dark theme with readable text (Fusion defaulted them to light-on-light).
## [0.6.0] - 2026-05-21 ## [0.6.0] - 2026-05-21
### Added ### Added
- **Session sharing over the relay (M12)**: a **Share** tab — *Start shared session* (host) - **Session sharing over the relay (M12)**: a **Share** tab — *Start shared session* (host)
+17 -1
View File
@@ -223,9 +223,25 @@ The next version is **determined by the Conventional Commit types** since the la
`packaging/bump.sh` writes it into `__init__.py` + `pyproject.toml`. Rules live in `packaging/bump.sh` writes it into `__init__.py` + `pyproject.toml`. Rules live in
`cliff.toml [bump]` (pre-1.0: `breaking_always_bump_major = false`). `cliff.toml [bump]` (pre-1.0: `breaking_always_bump_major = false`).
### D22 — Limited live apply of fixes (M6) — *DECIDED 2026-05-22; realizes the D9 milestone*
D9 deferred auto-applying fixes to "a deliberate later milestone, gated behind explicit user
consent." That milestone lands here, **scoped tightly to stay safe**:
- **Only runtime-reversible settings** are applyable from the gaming-environment report (M6):
**CPU governor, NVIDIA persistence mode, PCIe ASPM policy, vm.swappiness, Transparent
HugePages.** Each takes effect immediately, needs **no reboot**, and reverts on reboot.
- **How:** a dropdown of the live options + an Apply button per finding (`core/fixes.py`).
Applying runs a **single pkexec-elevated command** (one auth prompt); the chosen value is
validated against the live options first; writes target **sysfs/procfs or `nvidia-smi`** —
never the GRUB cmdline or a persistent config file.
- **Still suggestion-only** (the read-only stance holds for these): GRUB-based `pcie_aspm=off`,
CPU **mitigations** changes (security-sensitive, need a reboot), and the shader-cache env var.
- Everything remains **CLI-discoverable** (`rigdoctor gameenv` still prints the exact commands);
the apply UI is an additive convenience in the GUI, not the only path. Installing optional
tools (GameMode/MangoHud/cpupower) reuses the M9 installer and is likewise one-click.
## Open ## Open
None currently — all tracked decisions (D1D21) are resolved. New questions will be added None currently — all tracked decisions (D1D22) are resolved. New questions will be added
here as they arise. Remaining detail to flesh out during build: the tray's supporting-action here as they arise. Remaining detail to flesh out during build: the tray's supporting-action
set (D13), per-module apt package names, M12's tunnel/token specifics, and M13's set (D13), per-module apt package names, M12's tunnel/token specifics, and M13's
update mechanism (APT repo vs. self-installed `.deb`). update mechanism (APT repo vs. self-installed `.deb`).
+18 -2
View File
@@ -14,7 +14,7 @@ Status: ⬜ not started · 🟦 designing · 🟨 in progress · ✅ done
| M2 | Live monitor (TUI) | Monitoring | none (stdlib curses) | all | P1 | ⬜ | | M2 | Live monitor (TUI) | Monitoring | none (stdlib curses) | all | P1 | ⬜ |
| M8 | Alerting | Monitoring | libnotify (opt) | all | P2 | 🟨 | | M8 | Alerting | Monitoring | libnotify (opt) | all | P2 | 🟨 |
| M5 | System inventory | Diagnostics | none (opt: lm-sensors, dmidecode) | all | P1 | 🟨 | | M5 | System inventory | Diagnostics | none (opt: lm-sensors, dmidecode) | all | P1 | 🟨 |
| M6 | Gaming env checks | Diagnostics | none | all | P2 | | | M6 | Gaming env checks | Diagnostics | none | all | P2 | 🟨 |
| M10 | Desktop GUI | Desktop UI | **python3-pyside6** | all | P2 | 🟨 | | M10 | Desktop GUI | Desktop UI | **python3-pyside6** | all | P2 | 🟨 |
| M11 | Tray / menu-bar applet | Desktop UI | **python3-pyside6** (+ AppIndicator on GNOME) | all | P2 | ⬜ | | M11 | Tray / menu-bar applet | Desktop UI | **python3-pyside6** (+ AppIndicator on GNOME) | all | P2 | ⬜ |
| M9 | Installer | (meta) | none | all | P1 | 🟨 | | M9 | Installer | (meta) | none | all | P1 | 🟨 |
@@ -41,7 +41,23 @@ Status: ⬜ not started · 🟦 designing · 🟨 in progress · ✅ done
(text/JSON) + GUI Health tab. GPU-firmware verification deferred. (text/JSON) + GUI Health tab. GPU-firmware verification deferred.
- **M2 Live monitor** — depends on M1; the terminal "HWMonitor for Linux" face. Stdlib-only. - **M2 Live monitor** — depends on M1; the terminal "HWMonitor for Linux" face. Stdlib-only.
- **M5 / M6 Diagnostics** — inventory export + gaming-env checks; M6 flags risky settings and - **M5 / M6 Diagnostics** — inventory export + gaming-env checks; M6 flags risky settings and
suggests the fix command but does not apply it (D9). suggests the fix command but does not apply it (D9). *M6 implemented (Steam detection first —
the D12 "pick a game" foundation):* discovers Steam installs + all library folders
(`libraryfolders.vdf`, multi-drive) and the games in each (`appmanifest_*.acf`), filtering
runtimes/Proton/redistributables — stdlib only. **Libraries are opt-in** (`steam_libraries`
config); the GUI **Games** page lists them with per-library counts and rescans in the
background on every launch, badging games installed since the last scan (cached in
`state/games.json`). CLI: `rigdoctor games` / `games libraries [--enable|--disable|--all]`.
*Env-check engine implemented* (`core/gameenv.py`): a read-only findings report (reusing the
M4 `Finding` model) over PCIe ASPM, NVIDIA persistence mode, CPU governor (the three seed-case
contributors to GPU bus-drop / Xid 79), GameMode, MangoHud, swappiness, shader cache, THP, CPU
mitigations, and installed Proton versions — each with the suggested fix command. CLI
`rigdoctor gameenv`; GUI **Environment** page. Per **D22**, the GUI adds **one-click apply**
for the runtime-reversible tunables (governor / NVIDIA persistence / PCIe ASPM / swappiness /
THP — dropdown + Apply via a single pkexec prompt, `core/fixes.py`) and **one-click install**
of optional tools (GameMode / MangoHud / cpupower, now in the M9 catalog). GRUB/mitigations
stay suggestion-only. *Pending:* non-Steam launchers (Lutris/Heroic) and GPU power-profile
(PowerMizer) checks.
- **M8 Alerting** — threshold/event notifications; integrates with the tray applet (M11). - **M8 Alerting** — threshold/event notifications; integrates with the tray applet (M11).
- **M10 Desktop GUI** — PySide6 graphical front-end over the core engine (dashboard, log - **M10 Desktop GUI** — PySide6 graphical front-end over the core engine (dashboard, log
browser, report viewer, logger controls). Optional; adds the Qt dependency. *Bootstrapped browser, report viewer, logger controls). Optional; adds the Qt dependency. *Bootstrapped
+20 -7
View File
@@ -27,15 +27,26 @@ Ubuntu + NVIDIA first; `.deb` distribution (see `DECISIONS.md`).
## Phase 3 — Diagnostics breadth ## Phase 3 — Diagnostics breadth
- [ ] M5 system inventory + exportable report - [ ] M5 system inventory + exportable report
- [ ] M6 gaming environment checks (suggest-only) - [~] M6 gaming environment checks (suggest-only) — *Steam game/library detection done*
(multi-library `libraryfolders.vdf` discovery + `appmanifest` scan, opt-in libraries,
launch-time background rescan with new-game badge; CLI `rigdoctor games`, GUI Games page).
This is also the D12 "pick a game" foundation. *Env-check engine done* (`rigdoctor gameenv`
+ GUI Environment page): PCIe ASPM, NVIDIA persistence, CPU governor, GameMode, MangoHud,
swappiness, shader cache, THP, mitigations, Proton versions — read-only with fix commands.
*Pending:* non-Steam launchers (Lutris/Heroic) + GPU power-profile (PowerMizer) checks.
- [ ] SMART integration (smartmontools if present) - [ ] SMART integration (smartmontools if present)
## Phase 4 — Desktop UI & installer ## Phase 4 — Desktop UI & installer
- [ ] M10 desktop GUI (PySide6: dashboard, log browser, report viewer, logger controls) - [ ] M10 desktop GUI (PySide6: dashboard, log browser, report viewer, logger controls)
- [ ] M11 tray / menu-bar applet (QSystemTrayIcon: live M1 readouts + Run Diagnostic + - [ ] M11 tray / menu-bar applet (QSystemTrayIcon: live M1 readouts + Run Diagnostic +
supporting actions — D13) supporting actions — D13)
- [ ] Guided diagnostic session (pick game → focused M3 capture → M4 scan → findings), - [~] Guided diagnostic session (pick game → focused M3 capture → M4 scan → findings),
shared by tray/GUI/CLI shared by tray/GUI/CLI*core + CLI + GUI done* (`core/diagnostic.py`, `rigdoctor
diagnose start/status/finish`, and a **Run Diagnostic** button per game on the GUI Games
page → recording banner → results dialog with the capture summary + findings). Tags a
focused capture with the chosen game (own diagnostic log, window-scoped report) and
combines the capture summary with the M4 findings. *Pending:* the tray (M11) entry point,
and auto start/stop via the D12 wrapper/watcher.
- [ ] Logger trigger modes: always-on + game-launch (D12 — wrapper first: - [ ] Logger trigger modes: always-on + game-launch (D12 — wrapper first:
`rigdoctor wrap %command%` + global Steam compat-tool; zero-config watcher `rigdoctor wrap %command%` + global Steam compat-tool; zero-config watcher
(Steam RunningAppID + /proc) and GameMode hook follow) (Steam RunningAppID + /proc) and GameMode hook follow)
@@ -51,8 +62,10 @@ Ubuntu + NVIDIA first; `.deb` distribution (see `DECISIONS.md`).
- [x] M13 auto-update (D18) — launch-time version check (GUI sidebar) + no-root self-update - [x] M13 auto-update (D18) — launch-time version check (GUI sidebar) + no-root self-update
apply (`rigdoctor update` / sidebar button → authenticated pip upgrade), token-gated. apply (`rigdoctor update` / sidebar button → authenticated pip upgrade), token-gated.
Restart-after-update is manual for now. Restart-after-update is manual for now.
- [ ] (Later, separate milestone) Optional auto-apply of suggested fixes behind explicit - [~] Optional auto-apply of suggested fixes behind explicit consent (D9 milestone) — *first
consent — currently out of scope (D9) cut shipped for M6 (D22):* one-click apply of runtime-reversible tunables (CPU governor,
NVIDIA persistence, PCIe ASPM, swappiness, THP) via a single pkexec prompt, no reboot.
GRUB-based fixes + CPU mitigations remain suggestion-only.
## Phase 6 — Session sharing / remote assist (M12, D16) ## Phase 6 — Session sharing / remote assist (M12, D16)
Escalating ladder, built in order: Escalating ladder, built in order:
@@ -60,8 +73,8 @@ Escalating ladder, built in order:
it in RigDoctor. One-way, safest. it in RigDoctor. One-way, safest.
- [x] Tier 2: live read-only view — `rigdoctor share serve` (stdlib HTTP, token-gated: - [x] Tier 2: live read-only view — `rigdoctor share serve` (stdlib HTTP, token-gated:
sensors + health + inventory). Remote = user-chosen tunnel; GUI controls still to add. sensors + health + inventory). Remote = user-chosen tunnel; GUI controls still to add.
- [ ] Tier 3: gated interactive terminal (wrap tmate/sshx; read-only default, read-write on - [x] Tier 3: host-consented interactive terminal — a real PTY shell shared over the relay
explicit consent), with session audit log. (own `pty`, pyte-rendered guest), off by default; host reads along + can type (sudo).
> **Out of scope:** stress/repro module (D7); multi-distro support and packaging beyond > **Out of scope:** stress/repro module (D7); multi-distro support and packaging beyond
> Ubuntu/apt + `.deb` (D15) — a thin seam is kept but not built out. > Ubuntu/apt + `.deb` (D15) — a thin seam is kept but not built out.
+10 -5
View File
@@ -43,9 +43,12 @@ RigDoctor's crash-safe logger is designed to fix exactly that.
- **Not a stress-test / load-generator** — explicitly out of scope (D7). Users can run - **Not a stress-test / load-generator** — explicitly out of scope (D7). Users can run
existing tools (gpu-burn, vkmark, stress-ng) alongside the logger if they want. existing tools (gpu-burn, vkmark, stress-ng) alongside the logger if they want.
- Not an overclocking utility. - Not an overclocking utility.
- **Not (yet) an auto-fixer.** RigDoctor is **read-only**: it diagnoses and *suggests* - **Read-only by default, with a narrow consent-gated exception.** RigDoctor diagnoses and
actions (with the exact command where possible) but does not apply changes itself in this *suggests* actions (with the exact command where possible). It does **not** apply changes
stage. Auto-apply is a deliberate later milestone behind explicit consent. (D9) itself — **except** a small set of **runtime-reversible** gaming tunables (M6: CPU governor,
NVIDIA persistence, PCIe ASPM policy, swappiness, THP) that can be applied from the GUI via a
single pkexec prompt, no reboot, revert on reboot (D22, realizing the D9 milestone). Risky/
persistent fixes (GRUB cmdline, CPU mitigations) remain suggestion-only.
## 3. Target users & platforms ## 3. Target users & platforms
@@ -96,8 +99,10 @@ PCIe topology. Exportable (Markdown/JSON) to paste into forum/bug reports.
### M6 — Gaming environment checks ### M6 — Gaming environment checks
Detects & evaluates: GPU power profile / persistence mode, CPU governor, Proton/Wine/Steam Detects & evaluates: GPU power profile / persistence mode, CPU governor, Proton/Wine/Steam
versions, GameMode, MangoHud, shader cache, swappiness, hugepages, CPU mitigations, versions, GameMode, MangoHud, shader cache, swappiness, hugepages, CPU mitigations,
PCIe ASPM. Flags settings that hurt stability/performance and **suggests** the fix command PCIe ASPM. Flags settings that hurt stability/performance and **suggests** the fix command.
(read-only per D9). Also includes Steam library/game detection (the D12 "pick a game" foundation) and, per D22,
a **one-click apply** for the runtime-reversible tunables (governor, persistence, ASPM,
swappiness, THP) plus one-click install of optional tools (GameMode/MangoHud/cpupower).
### M8 — Alerting ### M8 — Alerting
Threshold + event alerts (desktop notification / sound / log) on overheat, throttle, Threshold + event alerts (desktop notification / sound / log) on overheat, throttle,
+2 -2
View File
@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
[project] [project]
name = "rigdoctor" name = "rigdoctor"
version = "0.6.0" version = "0.12.0"
description = "Modular hardware monitoring & crash diagnostics for Linux gamers." description = "Modular hardware monitoring & crash diagnostics for Linux gamers."
readme = "README.md" readme = "README.md"
requires-python = ">=3.11" requires-python = ">=3.11"
@@ -13,7 +13,7 @@ requires-python = ">=3.11"
dependencies = [] dependencies = []
[project.optional-dependencies] [project.optional-dependencies]
gui = ["PySide6"] gui = ["PySide6", "pyte"]
[project.scripts] [project.scripts]
rigdoctor = "rigdoctor.cli:main" rigdoctor = "rigdoctor.cli:main"
+1 -1
View File
@@ -1,3 +1,3 @@
"""RigDoctor — modular hardware monitoring & crash diagnostics for Linux gamers.""" """RigDoctor — modular hardware monitoring & crash diagnostics for Linux gamers."""
__version__ = "0.6.0" __version__ = "0.12.0"
+182
View File
@@ -86,6 +86,7 @@ def cmd_record_run(args) -> int:
max_bytes=cfg["log_max_bytes"], max_bytes=cfg["log_max_bytes"],
backups=cfg["log_backups"], backups=cfg["log_backups"],
status_path=config.STATUS_FILE, status_path=config.STATUS_FILE,
game=getattr(args, "game", None),
) )
def _handle(_sig, _frame): def _handle(_sig, _frame):
@@ -345,6 +346,158 @@ def cmd_report(args) -> int:
return 0 return 0
def _resolve_game(args) -> str | None:
"""Game name from --game, or looked up from --appid via the Steam scan."""
if getattr(args, "game", None):
return args.game
if getattr(args, "appid", None):
from .core import steam
for g in steam.scan_games(steam.selected_library_paths()):
if g.appid == str(args.appid):
return g.name
return None
return None
def cmd_diagnose(args) -> int:
from .core import diagnostic, reccontrol, steam
sub = args.diagnose_cmd or "status"
if sub == "start":
if reccontrol.running_pid():
print("A capture is already running — finish it with: rigdoctor diagnose finish")
return 1
game = _resolve_game(args)
if game is None and (args.game or args.appid):
print("Couldn't match that game in your selected Steam libraries.")
return 1
if game is None:
games = steam.cached_games() or steam.scan_games(steam.selected_library_paths())
if games:
print("Pick a game to focus on, then re-run with --game:")
for g in games:
print(f" --game {g.name!r}")
else:
print("No games detected. Select a library: rigdoctor games libraries --all")
return 1
pid = diagnostic.start(game=game, interval=args.interval)
time.sleep(1.0)
if pid and reccontrol.pid_alive(pid):
print(f"Diagnostic capture started for {game!r} (pid {pid}).")
print(" Play your game. When you're done (or after a crash + reboot):")
print(" rigdoctor diagnose finish")
return 0
print(f"Capture failed to start; see {config.SPAWN_LOG}")
return 1
if sub == "status":
status = diagnostic.active()
if not status:
print("No diagnostic capture is running.")
return 0
game = status.get("game") or ""
print(f"Capturing for {game!r}: {status.get('samples', 0)} samples"
+ (" · GPU-lost seen" if status.get("gpu_lost") else ""))
return 0
# finish
if not reccontrol.running_pid() and not config.DIAG_LOG.exists():
print("No diagnostic to analyze. Start one with: rigdoctor diagnose start --game <name>")
return 1
print("Stopping capture and analyzing…\n")
result = diagnostic.finish(last_n=args.last)
from .render import render_health, render_summary
if result.game:
print(f"Diagnostic — {result.game}\n")
print(render_summary(result.summary, log_path=config.DIAG_LOG))
print("\n" + render_health(result.findings, title="Findings"))
return 0
def cmd_gameenv(args) -> int:
from dataclasses import asdict
from .core.gameenv import run_gameenv_checks
from .render import render_health
findings = run_gameenv_checks()
if args.json:
print(json.dumps([asdict(f) for f in findings], indent=2, ensure_ascii=False))
else:
print(render_health(findings, title="Gaming environment"))
return 0
def cmd_games(args) -> int:
from .core import steam
selected = steam.selected_library_paths()
if not selected:
print("No Steam libraries selected to scan.")
print(" See them with: rigdoctor games libraries")
print(" Then enable one: rigdoctor games libraries --enable <path> (or --all)")
return 1
result = steam.rescan()
if args.json:
from dataclasses import asdict
print(json.dumps({
"scanned_at": result.scanned_at,
"new_appids": result.new_appids,
"games": [asdict(g) for g in result.games],
}, indent=2, ensure_ascii=False))
return 0
if not result.games:
print("No games found in the selected Steam libraries.")
return 0
new = set(result.new_appids)
print(f"{len(result.games)} game(s) across {len(selected)} librar(y/ies):\n")
for g in result.games:
flag = " NEW" if g.appid in new else ""
print(f" {g.name:<48} {steam.human_size(g.size_bytes):>9}{flag}")
if new:
print(f"\n{len(new)} newly-installed since the last scan.")
return 0
def cmd_games_libraries(args) -> int:
from .core import steam
discovered = steam.discover_libraries()
selected = {os.path.realpath(p) for p in steam.selected_library_paths()}
# --all / --enable / --disable adjust the selection, then we list the result.
if args.all or args.enable or args.disable:
if args.all:
selected = {lib.path for lib in discovered}
for raw in args.enable or []:
selected.add(os.path.realpath(os.path.expanduser(raw)))
for raw in args.disable or []:
selected.discard(os.path.realpath(os.path.expanduser(raw)))
config.update_config(steam_libraries=sorted(selected))
if not discovered:
print("No Steam libraries detected (is Steam installed?).")
return 1
if args.json:
print(json.dumps([
{"path": lib.path, "label": lib.label, "selected": lib.path in selected,
"games": len(steam.scan_library(lib.path))}
for lib in discovered
], indent=2, ensure_ascii=False))
return 0
print("Steam libraries (checked = scanned for games):\n")
for lib in discovered:
mark = "x" if lib.path in selected else " "
count = len(steam.scan_library(lib.path))
label = f" [{lib.label}]" if lib.label else ""
print(f" [{mark}] {lib.path}{label} ({count} games)")
return 0
def build_parser() -> argparse.ArgumentParser: def build_parser() -> argparse.ArgumentParser:
p = argparse.ArgumentParser( p = argparse.ArgumentParser(
prog="rigdoctor", prog="rigdoctor",
@@ -389,6 +542,7 @@ def build_parser() -> argparse.ArgumentParser:
run_p = rec_sub.add_parser("run", help="run the capture loop in the foreground (systemd-friendly)") run_p = rec_sub.add_parser("run", help="run the capture loop in the foreground (systemd-friendly)")
run_p.add_argument("-n", "--interval", type=float, default=None, help="sampling interval (s)") run_p.add_argument("-n", "--interval", type=float, default=None, help="sampling interval (s)")
run_p.add_argument("-o", "--out", default=None, help="log file path") run_p.add_argument("-o", "--out", default=None, help="log file path")
run_p.add_argument("--game", default=None, help="tag the capture with a game name (M6/diagnose)")
run_p.set_defaults(func=cmd_record_run) run_p.set_defaults(func=cmd_record_run)
start_p = rec_sub.add_parser("start", help="start recording in the background") start_p = rec_sub.add_parser("start", help="start recording in the background")
@@ -423,6 +577,34 @@ def build_parser() -> argparse.ArgumentParser:
inv.add_argument("--markdown", action="store_true", help="output Markdown (for forum/bug reports)") inv.add_argument("--markdown", action="store_true", help="output Markdown (for forum/bug reports)")
inv.add_argument("-o", "--output", default=None, help="write to a file instead of stdout") inv.add_argument("-o", "--output", default=None, help="write to a file instead of stdout")
inv.set_defaults(func=cmd_inventory) inv.set_defaults(func=cmd_inventory)
games_p = sub.add_parser("games", help="Steam game & library detection (M6)")
games_p.add_argument("--json", action="store_true", help="output JSON")
games_p.set_defaults(func=cmd_games)
games_sub = games_p.add_subparsers(dest="games_cmd")
lib_p = games_sub.add_parser("libraries", help="list/select Steam libraries to scan")
lib_p.add_argument("--enable", action="append", metavar="PATH", help="scan this library (repeatable)")
lib_p.add_argument("--disable", action="append", metavar="PATH", help="stop scanning this library (repeatable)")
lib_p.add_argument("--all", action="store_true", help="scan all detected libraries")
lib_p.add_argument("--json", action="store_true", help="output JSON")
lib_p.set_defaults(func=cmd_games_libraries)
env_p = sub.add_parser("gameenv", help="gaming environment checks (M6): flag stability/perf settings")
env_p.add_argument("--json", action="store_true", help="output JSON instead of text")
env_p.set_defaults(func=cmd_gameenv)
diag_p = sub.add_parser("diagnose", help="guided diagnostic: capture while gaming, then analyze")
diag_sub = diag_p.add_subparsers(dest="diagnose_cmd")
diag_start = diag_sub.add_parser("start", help="start a focused capture for a game")
diag_start.add_argument("--game", default=None, help="game name to focus on")
diag_start.add_argument("--appid", default=None, help="Steam appid to focus on (resolved to a name)")
diag_start.add_argument("-n", "--interval", type=float, default=None, help="sampling interval (s)")
diag_start.set_defaults(func=cmd_diagnose)
diag_sub.add_parser("status", help="show the in-progress diagnostic").set_defaults(func=cmd_diagnose)
diag_finish = diag_sub.add_parser("finish", help="stop the capture and analyze it")
diag_finish.add_argument("--last", type=int, default=10, help="recent samples to show")
diag_finish.set_defaults(func=cmd_diagnose)
diag_p.set_defaults(func=cmd_diagnose, diagnose_cmd=None, last=10)
return p return p
+10
View File
@@ -23,10 +23,17 @@ CONFIG_FILE = CONFIG_DIR / "config.toml"
# Crash-capture logger (M3) # Crash-capture logger (M3)
LOG_FILE = LOG_DIR / "capture.jsonl" LOG_FILE = LOG_DIR / "capture.jsonl"
# Guided diagnostic (M6/D12): a focused capture writes here, separate from the always-on
# crash log, so its report covers only that session's window.
DIAG_LOG = LOG_DIR / "diagnostic.jsonl"
STATUS_FILE = STATE_DIR / "recorder.json" STATUS_FILE = STATE_DIR / "recorder.json"
PID_FILE = STATE_DIR / "recorder.pid" PID_FILE = STATE_DIR / "recorder.pid"
SPAWN_LOG = STATE_DIR / "recorder.out" SPAWN_LOG = STATE_DIR / "recorder.out"
# Gaming environment / game detection (M6) — cached Steam game scan (mutable state,
# not config: refreshed by the background scan on every launch).
GAMES_FILE = STATE_DIR / "games.json"
# Update access token (M13) — gates updates to Gitea account holders (D18). # Update access token (M13) — gates updates to Gitea account holders (D18).
# Stored in the OS keyring (Secret Service / GNOME Keyring) via `secret-tool` when # Stored in the OS keyring (Secret Service / GNOME Keyring) via `secret-tool` when
# available — encrypted at rest, unlocked with the login session — else a 0600 file. # available — encrypted at rest, unlocked with the login session — else a 0600 file.
@@ -143,6 +150,7 @@ DEFAULTS: dict = {
"gpu_temp_alert": 90.0, # °C — alert when GPU reaches this "gpu_temp_alert": 90.0, # °C — alert when GPU reaches this
"cpu_temp_alert": 95.0, # °C — alert when CPU reaches this "cpu_temp_alert": 95.0, # °C — alert when CPU reaches this
"relay_url": "wss://rigdoctor.jesseyvanofferen.com", # session-sharing relay (M12) "relay_url": "wss://rigdoctor.jesseyvanofferen.com", # session-sharing relay (M12)
"steam_libraries": [], # Steam library paths to scan for games (M6); empty = none picked yet
} }
@@ -165,6 +173,8 @@ def _toml_value(value) -> str:
return "true" if value else "false" return "true" if value else "false"
if isinstance(value, (int, float)): if isinstance(value, (int, float)):
return repr(value) return repr(value)
if isinstance(value, (list, tuple)):
return "[" + ", ".join(_toml_value(v) for v in value) + "]"
return '"' + str(value).replace("\\", "\\\\").replace('"', '\\"') + '"' return '"' + str(value).replace("\\", "\\\\").replace('"', '\\"') + '"'
+20 -2
View File
@@ -10,24 +10,42 @@ from __future__ import annotations
import shutil import shutil
import subprocess import subprocess
import time import time
from pathlib import Path
from ..config import DATA_DIR
from .sample import Sample from .sample import Sample
APP_NAME = "RigDoctor" APP_NAME = "RigDoctor"
_ICON = "utilities-system-monitor" _STOCK_ICON = "utilities-system-monitor"
# The RigDoctor icon, so notifications match the app/dock icon. Prefer the copy that
# desktop integration installs into the icon theme (~/.local/share/icons/...); fall back to
# the bundled asset for source/dev runs, then to a stock icon if neither is present.
_INSTALLED_ICON = DATA_DIR.parent / "icons" / "hicolor" / "scalable" / "apps" / "rigdoctor.svg"
_BUNDLED_ICON = Path(__file__).parents[1] / "gui" / "assets" / "rigdoctor.svg"
def available() -> bool: def available() -> bool:
return shutil.which("notify-send") is not None return shutil.which("notify-send") is not None
def _icon() -> str:
"""Resolve the notification icon at call time (the themed copy may be installed late)."""
for path in (_INSTALLED_ICON, _BUNDLED_ICON):
try:
if path.exists():
return str(path)
except OSError:
pass
return _STOCK_ICON
def notify(title: str, message: str, urgency: str = "normal") -> bool: def notify(title: str, message: str, urgency: str = "normal") -> bool:
"""Send a desktop notification (best-effort). urgency: low|normal|critical.""" """Send a desktop notification (best-effort). urgency: low|normal|critical."""
if not available(): if not available():
return False return False
try: try:
subprocess.run( subprocess.run(
["notify-send", "-a", APP_NAME, "-u", urgency, "-i", _ICON, title, message], ["notify-send", "-a", APP_NAME, "-u", urgency, "-i", _icon(), title, message],
timeout=10, timeout=10,
check=False, check=False,
) )
+19
View File
@@ -45,4 +45,23 @@ COMPONENTS: tuple[Component, ...] = (
"libsecret", "Encrypted token storage", "Updates", "libsecret", "Encrypted token storage", "Updates",
"Store the update token in the OS keyring, encrypted", ("libsecret-tools",), "secret-tool", "Store the update token in the OS keyring, encrypted", ("libsecret-tools",), "secret-tool",
), ),
Component(
"gamemode", "Feral GameMode", "Gaming",
"Auto-applies performance tweaks (CPU governor, scheduling) while a game runs",
("gamemode",), "gamemoderun",
),
Component(
"mangohud", "MangoHud", "Gaming",
"In-game overlay for FPS, frame times, and temperatures", ("mangohud",), "mangohud",
),
Component(
"cpupower", "cpupower", "Gaming",
"Read/set the CPU frequency governor (e.g. performance for gaming)",
("linux-tools-common", "linux-tools-generic"), "cpupower",
),
) )
def by_id(component_id: str) -> Component | None:
"""Look up a catalog component by its id (None if unknown)."""
return next((c for c in COMPONENTS if c.id == component_id), None)
+84
View File
@@ -0,0 +1,84 @@
"""Guided diagnostic session (SPEC §4 / ARCHITECTURE §7.1): orchestrate M3 + M4.
The seed use case, one flow: **pick a game** **focused crash-capture** scoped to that
session (M3, tagged with the game) on **finish**, **scan & analyze** (M4 health report)
over the captured window + system logs return a prioritized result. This is not a new
module it's a single shared callable so the CLI, GUI, and tray run the identical flow.
The capture is **manually bracketed** (start/finish) for now; auto start/stop on game launch
(the D12 wrapper/watcher) plugs in here later without changing the result shape.
"""
from __future__ import annotations
import time
from dataclasses import dataclass
from .. import config
from . import reccontrol
from .crashlog import Summary, summarize
from .health import Finding
@dataclass
class DiagnosticResult:
game: str | None
summary: Summary # capture window: peak temps/power, events, last samples (M3)
findings: list[Finding] # health findings: Xid/SMART/driver/etc. (M4)
def _clear_diag_log() -> None:
"""Each diagnostic is a fresh focused capture — drop any previous session + segments."""
base = config.DIAG_LOG
for p in [base, *base.parent.glob(base.name + ".*")]:
try:
p.unlink()
except OSError:
pass
def start(game: str | None = None, interval: float | None = None) -> int | None:
"""Begin a focused capture, tagged with the game, into the dedicated diagnostic log.
Returns the pid, or None if a capture is already running."""
if reccontrol.running_pid():
return None
_clear_diag_log()
return reccontrol.start_background(interval=interval, out=str(config.DIAG_LOG), game=game)
def is_running() -> bool:
return reccontrol.running_pid() is not None
def active() -> dict | None:
"""Status of the in-progress session (running flag, game, samples), or None if idle."""
if not is_running():
return None
return reccontrol.read_status()
def _await_stopped(timeout: float = 6.0) -> None:
deadline = time.monotonic() + timeout
while reccontrol.running_pid() and time.monotonic() < deadline:
time.sleep(0.1)
def _game_from_summary(summary: Summary) -> str | None:
"""Recover the focused game from the log's 'game' event (survives a crash + reboot)."""
for _ts, kind, detail in reversed(summary.events):
if kind == "game" and detail:
return detail
return None
def finish(last_n: int = 10, log_path=None) -> DiagnosticResult:
"""Stop the capture (if running), summarize the window, and run the health report."""
from .health import run_health_checks
reccontrol.stop_background()
_await_stopped()
path = log_path or config.DIAG_LOG
summary = summarize(path, last_n=last_n)
game = _game_from_summary(summary) or (reccontrol.read_status() or {}).get("game")
findings = run_health_checks()
return DiagnosticResult(game=game, summary=summary, findings=findings)
+177
View File
@@ -0,0 +1,177 @@
"""Apply runtime-reversible system tunables (M6) — a limited, consent-gated exception to
the read-only stance (D9, amended by D22).
Only safe settings that take effect immediately, need no reboot, and revert on reboot are
applyable here: CPU governor, NVIDIA persistence mode, PCIe ASPM policy, vm.swappiness, and
Transparent HugePages. Each is set by a single privileged command (one pkexec prompt). The
chosen value is validated against the live options before building the command, and writes go
to sysfs / procfs (or `nvidia-smi`) never the GRUB cmdline or a persistent config file.
Riskier fixes (GRUB-based PCIe ASPM-off, CPU mitigations) stay suggestion-only.
"""
from __future__ import annotations
import os
import shlex
import shutil
import subprocess
from collections.abc import Callable
from dataclasses import dataclass
from pathlib import Path
@dataclass
class Tunable:
id: str
label: str # e.g. "CPU governor"
options: list[str] # selectable values (live, from the system)
current: str | None # the value in effect now (preselect this in the dropdown)
note: str = "" # caveat shown by the control, e.g. "resets on reboot"
def _read(path: str) -> str | None:
try:
return Path(path).read_text()
except OSError:
return None
def _bracketed(text: str) -> tuple[list[str], str | None]:
"""Parse a sysfs 'a [b] c' enum into (options, active)."""
options = [tok.strip("[]") for tok in text.split()]
active = next((tok.strip("[]") for tok in text.split() if tok.startswith("[")), None)
return options, active
# --- individual tunables: a state reader + a command builder per id -------------------
_GOV = "/sys/devices/system/cpu"
def _cpu_governor() -> Tunable | None:
cur = _read(f"{_GOV}/cpu0/cpufreq/scaling_governor")
if cur is None:
return None
avail = _read(f"{_GOV}/cpu0/cpufreq/scaling_available_governors")
options = avail.split() if avail and avail.strip() else ["performance", "powersave", "schedutil"]
return Tunable("cpu_governor", "CPU governor", options, cur.strip(), "applies now; resets on reboot")
def _cpu_governor_cmd(value: str) -> list[str]:
return ["/bin/sh", "-c",
f'for f in {_GOV}/cpu*/cpufreq/scaling_governor; do echo {shlex.quote(value)} > "$f"; done']
def _nvidia_persistence() -> Tunable | None:
if shutil.which("nvidia-smi") is None:
return None
try:
proc = subprocess.run(
["nvidia-smi", "--query-gpu=persistence_mode", "--format=csv,noheader"],
capture_output=True, text=True, timeout=10,
)
except (subprocess.SubprocessError, OSError):
return None
state = proc.stdout.strip().splitlines()[0].strip().lower() if proc.stdout.strip() else ""
current = "Enabled" if state.startswith("enabled") else ("Disabled" if state.startswith("disabled") else None)
return Tunable("nvidia_persistence", "NVIDIA persistence mode", ["Enabled", "Disabled"], current,
"resets on reboot (enable nvidia-persistenced to persist)")
def _nvidia_persistence_cmd(value: str) -> list[str]:
return ["nvidia-smi", "-pm", "1" if value == "Enabled" else "0"]
def _pcie_aspm() -> Tunable | None:
text = _read("/sys/module/pcie_aspm/parameters/policy")
if not text:
return None
options, active = _bracketed(text)
return Tunable("pcie_aspm", "PCIe ASPM policy", options, active, "applies now; resets on reboot")
def _pcie_aspm_cmd(value: str) -> list[str]:
return ["/bin/sh", "-c", f'echo {shlex.quote(value)} > /sys/module/pcie_aspm/parameters/policy']
def _swappiness() -> Tunable | None:
text = _read("/proc/sys/vm/swappiness")
if text is None or not text.strip().isdigit():
return None
cur = text.strip()
options = ["0", "10", "30", "60", "100"]
if cur not in options:
options = sorted(set(options) | {cur}, key=int)
return Tunable("swappiness", "vm.swappiness", options, cur, "applies now; resets on reboot")
def _swappiness_cmd(value: str) -> list[str]:
return ["/bin/sh", "-c", f'echo {shlex.quote(value)} > /proc/sys/vm/swappiness']
def _thp() -> Tunable | None:
text = _read("/sys/kernel/mm/transparent_hugepage/enabled")
if not text:
return None
options, active = _bracketed(text)
return Tunable("thp", "Transparent HugePages", options, active, "applies now; resets on reboot")
def _thp_cmd(value: str) -> list[str]:
return ["/bin/sh", "-c", f'echo {shlex.quote(value)} > /sys/kernel/mm/transparent_hugepage/enabled']
_TUNABLES: dict[str, tuple[Callable[[], Tunable | None], Callable[[str], list[str]]]] = {
"cpu_governor": (_cpu_governor, _cpu_governor_cmd),
"nvidia_persistence": (_nvidia_persistence, _nvidia_persistence_cmd),
"pcie_aspm": (_pcie_aspm, _pcie_aspm_cmd),
"swappiness": (_swappiness, _swappiness_cmd),
"thp": (_thp, _thp_cmd),
}
# --- public API -----------------------------------------------------------------------
def get_tunable(fix_id: str) -> Tunable | None:
"""Live state (options + current value) for a fix id, or None if not applicable here."""
fns = _TUNABLES.get(fix_id)
return fns[0]() if fns else None
def apply_command(fix_id: str, value: str) -> list[str] | None:
"""The privileged command to set fix_id=value, or None if unknown/invalid.
The value is validated against the *live* options, so only a real, currently-available
setting can ever be turned into a command.
"""
fns = _TUNABLES.get(fix_id)
if not fns:
return None
state = fns[0]()
if state is None or value not in state.options:
return None
return fns[1](value)
def _elevate(cmd: list[str]) -> list[str]:
prog = shutil.which(cmd[0]) or cmd[0] # pkexec needs an absolute program path
cmd = [prog, *cmd[1:]]
if os.geteuid() == 0:
return cmd
if shutil.which("pkexec"):
return ["pkexec", *cmd]
if shutil.which("sudo"):
return ["sudo", *cmd]
return cmd # no escalation available — will likely fail, surfaced to the caller
def apply(fix_id: str, value: str) -> tuple[int, str]:
"""Apply fix_id=value via a single elevated command. Returns (exit_code, output)."""
cmd = apply_command(fix_id, value)
if cmd is None:
return (1, f"Unknown or unavailable setting: {fix_id}={value}")
try:
proc = subprocess.run(_elevate(cmd), capture_output=True, text=True, timeout=120)
return (proc.returncode, proc.stdout + proc.stderr)
except (subprocess.SubprocessError, OSError) as exc:
return (1, str(exc))
+271
View File
@@ -0,0 +1,271 @@
"""Gaming environment checks (M6): evaluate system settings that affect gaming
stability/performance and suggest the fix command read-only (D9).
Stdlib-only. Each check degrades gracefully (a missing file/tool yields no finding or an
info finding, never an exception). The pure ``evaluate_*`` helpers are split from the IO
that reads sysfs / runs tools, so they're unit-testable.
Several checks target the seed case directly: an RTX 3070 falling off the PCIe bus under
load (Xid 79). PCIe ASPM power-saving, NVIDIA persistence mode, and a power-saving CPU
governor are the usual contributors to that class of drop-off / stutter.
"""
from __future__ import annotations
import os
import re
import shutil
import subprocess
from pathlib import Path
from .health import INFO, OK, WARNING, Finding
_ORDER = {"critical": 0, WARNING: 1, INFO: 2, OK: 3}
def _read(path: str) -> str | None:
try:
return Path(path).read_text()
except OSError:
return None
# --- PCIe ASPM (seed-case relevant) ---------------------------------------------------
def _active_aspm(policy_text: str) -> str | None:
"""The active ASPM policy is the bracketed token, e.g. '[default] performance ...'."""
m = re.search(r"\[(\w+)\]", policy_text)
return m.group(1) if m else None
def evaluate_aspm(policy_text: str | None) -> Finding | None:
if not policy_text:
return None
active = _active_aspm(policy_text)
if active is None:
return None
if active in ("powersave", "powersupersave"):
return Finding(
WARNING, "PCIe", f"PCIe ASPM is in power-saving mode ({active})",
"Aggressive PCIe Active-State Power Management can cause the GPU to drop off the "
"bus under load (Xid 79) or stutter — the seed-case failure mode.",
"Set the policy to performance below (live), or for a permanent change add "
"`pcie_aspm=off` in GRUB, then `sudo update-grub` and reboot.",
fix="pcie_aspm",
)
if active == "performance":
return Finding(OK, "PCIe", "PCIe ASPM set to performance", "ASPM power-saving is disabled.",
fix="pcie_aspm")
return Finding(
INFO, "PCIe", f"PCIe ASPM policy: {active}",
"ASPM is left to the kernel/BIOS default.",
"If you see GPU bus-drop events (Xid 79), set the policy to performance below.",
fix="pcie_aspm",
)
def check_pcie_aspm() -> list[Finding]:
f = evaluate_aspm(_read("/sys/module/pcie_aspm/parameters/policy"))
return [f] if f else []
# --- NVIDIA persistence mode (seed-case relevant) -------------------------------------
def check_gpu_persistence() -> list[Finding]:
if shutil.which("nvidia-smi") is None:
return []
try:
proc = subprocess.run(
["nvidia-smi", "--query-gpu=persistence_mode", "--format=csv,noheader"],
capture_output=True, text=True, timeout=10,
)
except (subprocess.SubprocessError, OSError):
return []
state = proc.stdout.strip().splitlines()[0].strip() if proc.stdout.strip() else ""
if state.lower().startswith("disabled"):
return [Finding(
INFO, "GPU", "NVIDIA persistence mode is off",
"The driver unloads when no client is attached, adding latency on first GPU "
"access and churning state between game launches.",
"Enable it below (per-boot), or enable the `nvidia-persistenced` service to "
"make it permanent.",
fix="nvidia_persistence",
)]
if state.lower().startswith("enabled"):
return [Finding(OK, "GPU", "NVIDIA persistence mode on", "The driver stays resident.",
fix="nvidia_persistence")]
return []
# --- CPU governor ---------------------------------------------------------------------
def evaluate_governor(governors: set[str]) -> Finding | None:
if not governors:
return None
shown = ", ".join(sorted(governors))
if governors == {"performance"}:
return Finding(OK, "CPU", "CPU governor: performance", "CPUs run at full clocks under load.",
fix="cpu_governor")
if "powersave" in governors:
return Finding(
WARNING, "CPU", f"CPU governor set to power-saving ({shown})",
"A powersave governor caps CPU frequency and can bottleneck frame times.",
"Set it to performance below (or install GameMode to switch it per-game).",
fix="cpu_governor",
)
return Finding(
INFO, "CPU", f"CPU governor: {shown}",
"A dynamic governor scales with load; usually fine.",
"For the most consistent frame pacing, set performance below (or use GameMode).",
fix="cpu_governor",
)
def check_cpu_governor() -> list[Finding]:
govs: set[str] = set()
for p in Path("/sys/devices/system/cpu").glob("cpu*/cpufreq/scaling_governor"):
text = _read(str(p))
if text and text.strip():
govs.add(text.strip())
f = evaluate_governor(govs)
return [f] if f else []
# --- GameMode / MangoHud --------------------------------------------------------------
def check_gamemode() -> list[Finding]:
if shutil.which("gamemoderun") or shutil.which("gamemoded"):
return [Finding(
OK, "Tools", "Feral GameMode installed",
"GameMode can apply the performance governor and other tweaks while a game runs.",
)]
return [Finding(
INFO, "Tools", "GameMode not installed",
"GameMode auto-applies performance tweaks (governor, scheduling) for the duration of a game.",
"Install it: `sudo apt install gamemode`, then launch games with `gamemoderun %command%` "
"(or use a global Steam launch option).",
action="gamemode",
)]
def check_mangohud() -> list[Finding]:
if shutil.which("mangohud"):
return [Finding(OK, "Tools", "MangoHud available", "In-game FPS/temps/frametime overlay is installed.")]
return [Finding(
INFO, "Tools", "MangoHud not installed",
"MangoHud overlays live FPS, frame times, and temps in-game — handy for spotting stutter.",
"Install it: `sudo apt install mangohud`, then launch with `mangohud %command%`.",
action="mangohud",
)]
# --- vm.swappiness --------------------------------------------------------------------
def evaluate_swappiness(value: int) -> Finding:
if value > 10:
return Finding(
INFO, "Memory", f"vm.swappiness is high ({value})",
"A high swappiness lets the kernel swap out memory eagerly, which can cause "
"hitching during gaming on systems with ample RAM.",
"Lower it below (e.g. 10); applies immediately.",
fix="swappiness",
)
return Finding(OK, "Memory", f"vm.swappiness is {value}", "Swapping is conservative.",
fix="swappiness")
def check_swappiness() -> list[Finding]:
text = _read("/proc/sys/vm/swappiness")
if text is None or not text.strip().isdigit():
return []
return [evaluate_swappiness(int(text.strip()))]
# --- shader cache ---------------------------------------------------------------------
def evaluate_shader_cache(env: dict) -> Finding:
disabled = (
env.get("__GL_SHADER_DISK_CACHE") == "0"
or env.get("MESA_SHADER_CACHE_DISABLE", "").lower() in ("1", "true")
or env.get("MESA_GLSL_CACHE_DISABLE", "").lower() in ("1", "true")
)
if disabled:
return Finding(
WARNING, "GPU", "Shader disk cache is disabled",
"With the shader cache off, shaders recompile every run — a common cause of "
"in-game stutter, especially on first encounters.",
"Unset the disabling variable (e.g. remove `__GL_SHADER_DISK_CACHE=0` / "
"`MESA_SHADER_CACHE_DISABLE`) from your environment / launch options.",
)
return Finding(OK, "GPU", "Shader disk cache enabled", "Compiled shaders are cached between runs (default).")
def check_shader_cache() -> list[Finding]:
return [evaluate_shader_cache(os.environ)]
# --- transparent hugepages / CPU mitigations (only when notable) ----------------------
def check_thp() -> list[Finding]:
text = _read("/sys/kernel/mm/transparent_hugepage/enabled")
if not text:
return []
active = _active_aspm(text) # same '[token]' format
if active == "never":
return [Finding(
INFO, "Memory", "Transparent HugePages disabled (never)",
"Some workloads benefit from THP; 'madvise' lets apps opt in without the downsides of 'always'.",
"Optional: set 'madvise' below; applies immediately.",
fix="thp",
)]
return []
def check_mitigations() -> list[Finding]:
cmdline = _read("/proc/cmdline") or ""
if "mitigations=off" in cmdline:
return [Finding(
INFO, "CPU", "CPU security mitigations are disabled",
"`mitigations=off` recovers some CPU performance at the cost of CPU-vulnerability "
"protections — a deliberate trade-off, noted here for awareness.",
"Remove `mitigations=off` from the kernel cmdline to restore protections.",
)]
return []
# --- Proton versions (informational) --------------------------------------------------
def check_proton() -> list[Finding]:
from . import steam
try:
versions = steam.proton_versions()
except Exception:
versions = []
if not versions:
return []
return [Finding(
INFO, "Tools", f"Proton: {len(versions)} version(s) installed",
", ".join(versions),
"Steam picks the Proton version per game (Properties → Compatibility); "
"Proton Experimental often has the latest fixes.",
)]
# --- aggregate ------------------------------------------------------------------------
def run_gameenv_checks() -> list[Finding]:
"""Run all environment checks, sorted by severity (worst first)."""
findings: list[Finding] = []
findings += check_pcie_aspm()
findings += check_gpu_persistence()
findings += check_cpu_governor()
findings += check_gamemode()
findings += check_mangohud()
findings += check_swappiness()
findings += check_shader_cache()
findings += check_thp()
findings += check_mitigations()
findings += check_proton()
findings.sort(key=lambda f: _ORDER.get(f.severity, 9))
return findings
+2
View File
@@ -27,6 +27,8 @@ class Finding:
title: str title: str
detail: str = "" detail: str = ""
suggestion: str = "" suggestion: str = ""
action: str = "" # optional: id of an installable catalog component (for an Install button)
fix: str = "" # optional: id of an applyable runtime tunable (for an Apply dropdown, M6)
# --- NVIDIA Xid knowledge (the seed crash is Xid 79) -------------------------- # --- NVIDIA Xid knowledge (the seed crash is Xid 79) --------------------------
+59
View File
@@ -0,0 +1,59 @@
"""A pseudo-terminal running the host's shell (M12, Tier 3 — host side).
Spawns the user's login shell in a real PTY so interactive programs work over a shared
session: vim, top, tab-completion, colours, Ctrl-C, and `sudo` (which prompts inside the
PTY the host types that password locally, so it's never sent to the guest). Runs as the
host's own user — never elevated. Linux-only (uses `pty`/`termios`).
"""
from __future__ import annotations
import fcntl
import os
import pty
import signal
import struct
import termios
class PtySession:
def __init__(self, rows: int = 24, cols: int = 80):
self.pid, self.master_fd = pty.fork()
if self.pid == 0: # child: become the shell
os.environ["TERM"] = "xterm-256color"
shell = os.environ.get("SHELL", "/bin/bash")
try:
os.execvp(shell, [shell])
finally:
os._exit(1)
os.set_blocking(self.master_fd, False)
self.set_size(rows, cols)
def set_size(self, rows: int, cols: int) -> None:
try:
fcntl.ioctl(self.master_fd, termios.TIOCSWINSZ, struct.pack("HHHH", rows, cols, 0, 0))
except OSError:
pass
def write(self, data: bytes) -> None:
try:
os.write(self.master_fd, data)
except OSError:
pass
def read(self, size: int = 65536) -> bytes:
try:
return os.read(self.master_fd, size)
except (BlockingIOError, OSError):
return b""
def close(self) -> None:
try:
os.close(self.master_fd)
except OSError:
pass
try:
os.kill(self.pid, signal.SIGHUP)
os.waitpid(self.pid, os.WNOHANG)
except (OSError, ChildProcessError, ProcessLookupError):
pass
+5 -1
View File
@@ -38,7 +38,9 @@ def read_status() -> dict | None:
return None return None
def start_background(interval: float | None = None, out: str | None = None) -> int | None: def start_background(
interval: float | None = None, out: str | None = None, game: str | None = None
) -> int | None:
"""Spawn a detached `record run`. Returns the child pid, or None if already running.""" """Spawn a detached `record run`. Returns the child pid, or None if already running."""
if running_pid(): if running_pid():
return None return None
@@ -48,6 +50,8 @@ def start_background(interval: float | None = None, out: str | None = None) -> i
cmd += ["--interval", str(interval)] cmd += ["--interval", str(interval)]
if out: if out:
cmd += ["--out", out] cmd += ["--out", out]
if game:
cmd += ["--game", game]
out_fh = open(config.SPAWN_LOG, "a") out_fh = open(config.SPAWN_LOG, "a")
proc = subprocess.Popen( proc = subprocess.Popen(
cmd, cmd,
+5
View File
@@ -27,12 +27,14 @@ class Recorder:
backups: int = 10, backups: int = 10,
status_path=None, status_path=None,
sampler: Sampler | None = None, sampler: Sampler | None = None,
game: str | None = None,
) -> None: ) -> None:
self.interval = interval self.interval = interval
self.sampler = sampler or Sampler(available_sources()) self.sampler = sampler or Sampler(available_sources())
self.writer = CrashLogWriter(log_path, max_bytes, backups) self.writer = CrashLogWriter(log_path, max_bytes, backups)
self.log_path = Path(log_path) self.log_path = Path(log_path)
self.status_path = Path(status_path) if status_path else None self.status_path = Path(status_path) if status_path else None
self.game = game or None
self.samples = 0 self.samples = 0
self._stop = threading.Event() self._stop = threading.Event()
self._gpu_lost = False self._gpu_lost = False
@@ -43,6 +45,8 @@ class Recorder:
def run(self) -> None: def run(self) -> None:
self.writer.write_event("session-start", f"interval={self.interval:g}s") self.writer.write_event("session-start", f"interval={self.interval:g}s")
if self.game:
self.writer.write_event("game", self.game) # tag the focused-diagnostic target
self._write_status(running=True) self._write_status(running=True)
try: try:
while not self._stop.is_set(): while not self._stop.is_set():
@@ -81,6 +85,7 @@ class Recorder:
"samples": self.samples, "samples": self.samples,
"updated": time.time(), "updated": time.time(),
"gpu_lost": self._gpu_lost, "gpu_lost": self._gpu_lost,
"game": self.game,
} }
if sample is not None: if sample is not None:
data["latest"] = headline(sample) data["latest"] = headline(sample)
+362
View File
@@ -0,0 +1,362 @@
"""Steam library & game detection (M6, the Steam piece of D12 game detection).
Discovers a user's Steam installs, the library folders they've configured (Steam tracks
them all in ``libraryfolders.vdf``, so multiple libraries on multiple drives are covered),
and the games installed in each (one ``appmanifest_<appid>.acf`` per app). Stdlib only
no Steam tooling required, every probe degrades gracefully.
The set of libraries actually scanned is user-chosen (config ``steam_libraries``); nothing
is scanned until the user opts a library in. Scan results are cached in ``games.json`` so the
GUI can show the list instantly and the launch-time background scan can diff against it to
flag newly-installed games.
"""
from __future__ import annotations
import json
import os
import time
from dataclasses import asdict, dataclass
from pathlib import Path
from ..config import GAMES_FILE, load_config
# Steam "apps" that aren't games: runtimes, Proton builds, redistributables. Filtered out of
# scans by appid (known IDs) or by name prefix (covers future Proton/runtime versions).
_TOOL_APPIDS = {
"228980", # Steamworks Common Redistributables
"1070560", # Steam Linux Runtime 1.0 (scout)
"1391110", # Steam Linux Runtime 2.0 (soldier)
"1628350", # Steam Linux Runtime 3.0 (sniper)
"1493710", # Proton Experimental
"2180100", # Proton Hotfix
"1826330", # Proton EasyAntiCheat Runtime
"1161040", # Proton BattlEye Runtime
}
_TOOL_NAME_PREFIXES = ("Proton", "Steam Linux Runtime", "Steamworks Common")
# Where Steam may be installed (native + Flatpak + Snap). Symlinks (~/.steam/steam) are
# resolved and de-duplicated by real path.
_ROOT_CANDIDATES = (
"~/.steam/steam",
"~/.steam/root",
"~/.local/share/Steam",
"~/.var/app/com.valvesoftware.Steam/data/Steam", # Flatpak
"~/snap/steam/common/.local/share/Steam", # Snap
)
@dataclass
class SteamLibrary:
path: str # the library root (contains a steamapps/ dir)
label: str = "" # Steam's label for the folder, if any
@dataclass
class Game:
appid: str
name: str
library: str # library path the game lives in
installdir: str # folder name under <library>/steamapps/common
size_bytes: int = 0
last_updated: int = 0 # epoch seconds (acf LastUpdated), 0 if unknown
# --- VDF (Valve Data Format) parsing --------------------------------------------------
# Minimal text-VDF reader: quoted "key" "value" pairs and "key" { ... } nesting. Enough
# for libraryfolders.vdf and appmanifest_*.acf; ignores #base/#include and unquoted tokens.
def _parse_vdf(text: str) -> dict:
pos = 0
n = len(text)
def skip_ws() -> None:
nonlocal pos
while pos < n:
c = text[pos]
if c in " \t\r\n":
pos += 1
elif c == "/" and pos + 1 < n and text[pos + 1] == "/": # // line comment
while pos < n and text[pos] != "\n":
pos += 1
else:
break
def read_string() -> str:
nonlocal pos
pos += 1 # opening quote
out = []
while pos < n:
c = text[pos]
if c == "\\" and pos + 1 < n:
nxt = text[pos + 1]
out.append({"n": "\n", "t": "\t", "\\": "\\", '"': '"'}.get(nxt, nxt))
pos += 2
continue
if c == '"':
pos += 1
break
out.append(c)
pos += 1
return "".join(out)
def parse_obj() -> dict:
nonlocal pos
obj: dict = {}
while True:
skip_ws()
if pos >= n or text[pos] == "}":
pos += 1 # consume closing brace (or run off the end)
return obj
if text[pos] != '"': # skip unquoted/unsupported tokens defensively
pos += 1
continue
key = read_string()
skip_ws()
if pos < n and text[pos] == "{":
pos += 1
obj[key] = parse_obj()
elif pos < n and text[pos] == '"':
obj[key] = read_string()
else: # malformed; bail on this key
obj[key] = ""
return obj
skip_ws()
if pos < n and text[pos] == '"':
root_key = read_string()
skip_ws()
if pos < n and text[pos] == "{":
pos += 1
return {root_key: parse_obj()}
return {}
def _read_vdf(path: Path) -> dict:
try:
return _parse_vdf(path.read_text(encoding="utf-8", errors="replace"))
except OSError:
return {}
# --- discovery ------------------------------------------------------------------------
def steam_roots() -> list[Path]:
"""Existing Steam install roots, de-duplicated by resolved path."""
seen: set[Path] = set()
roots: list[Path] = []
for cand in _ROOT_CANDIDATES:
p = Path(os.path.expanduser(cand))
if not p.exists():
continue
real = p.resolve()
if real in seen:
continue
seen.add(real)
roots.append(real)
return roots
def _libraryfolders_vdf(root: Path) -> Path | None:
for rel in ("steamapps/libraryfolders.vdf", "config/libraryfolders.vdf"):
p = root / rel
if p.exists():
return p
return None
def discover_libraries() -> list[SteamLibrary]:
"""Every Steam library folder configured on this machine, de-duplicated by real path.
Reads each install's ``libraryfolders.vdf`` (which lists all drives/folders), and
always includes the install root itself as a fallback.
"""
seen: set[Path] = set()
libs: list[SteamLibrary] = []
def add(path: Path, label: str = "") -> None:
if not (path / "steamapps").is_dir():
return
real = path.resolve()
if real in seen:
return
seen.add(real)
libs.append(SteamLibrary(path=str(real), label=label))
for root in steam_roots():
vdf = _libraryfolders_vdf(root)
folders = _read_vdf(vdf).get("libraryfolders", {}) if vdf else {}
if isinstance(folders, dict):
for entry in folders.values():
if isinstance(entry, dict) and entry.get("path"):
add(Path(entry["path"]), entry.get("label", ""))
add(root) # the install root is itself a library
return libs
# --- game scanning --------------------------------------------------------------------
def is_tool(appid: str, name: str) -> bool:
"""True for non-game Steam apps (runtimes, Proton, redistributables)."""
if appid in _TOOL_APPIDS:
return True
return name.startswith(_TOOL_NAME_PREFIXES)
def scan_library(library: str) -> list[Game]:
"""Games installed in one library, parsed from its appmanifest_*.acf files."""
steamapps = Path(library) / "steamapps"
games: list[Game] = []
try:
manifests = sorted(steamapps.glob("appmanifest_*.acf"))
except OSError:
return games
for manifest in manifests:
state = _read_vdf(manifest).get("AppState", {})
if not isinstance(state, dict):
continue
# Steam treats VDF keys case-insensitively (e.g. "SizeOnDisk" but "lastupdated").
state = {k.lower(): v for k, v in state.items()}
appid = state.get("appid", "")
name = state.get("name", "").strip()
if not appid or not name or is_tool(appid, name):
continue
games.append(Game(
appid=appid,
name=name,
library=str(library),
installdir=state.get("installdir", ""),
size_bytes=_int(state.get("sizeondisk")),
last_updated=_int(state.get("lastupdated")),
))
return games
def scan_games(libraries: list[str]) -> list[Game]:
"""All games across the given libraries, de-duplicated by appid, sorted by name."""
by_appid: dict[str, Game] = {}
for lib in libraries:
for game in scan_library(lib):
by_appid.setdefault(game.appid, game) # first library wins on duplicates
return sorted(by_appid.values(), key=lambda g: g.name.lower())
def _int(value) -> int:
try:
return int(value)
except (TypeError, ValueError):
return 0
def proton_versions() -> list[str]:
"""Installed Proton compatibility-tool versions across all discovered libraries.
Proton builds are the appmanifests we filter out of game scans; here we surface them
for the M6 environment report. Returns unique names, newest-looking last.
"""
names: set[str] = set()
for lib in discover_libraries():
try:
manifests = sorted((Path(lib.path) / "steamapps").glob("appmanifest_*.acf"))
except OSError:
continue
for manifest in manifests:
state = _read_vdf(manifest).get("AppState", {})
if isinstance(state, dict):
state = {k.lower(): v for k, v in state.items()}
name = state.get("name", "").strip()
if name.startswith("Proton"):
names.add(name)
return sorted(names)
# --- config-driven selection ----------------------------------------------------------
def selected_library_paths(cfg: dict | None = None) -> list[str]:
"""Library paths the user has opted in to scanning (config ``steam_libraries``)."""
cfg = cfg or load_config()
paths = cfg.get("steam_libraries") or []
return [str(p) for p in paths]
# --- scan cache + new-game detection --------------------------------------------------
@dataclass
class ScanResult:
games: list[Game]
new_appids: list[str] # newly-installed since the last scan (badge fuel)
scanned_at: float
def load_cache() -> dict | None:
try:
return json.loads(GAMES_FILE.read_text())
except (OSError, ValueError):
return None
def _save_cache(games: list[Game], known: set[str], new: list[str], when: float) -> None:
GAMES_FILE.parent.mkdir(parents=True, exist_ok=True)
data = {
"scanned_at": when,
"known_appids": sorted(known),
"new_appids": new,
"games": [asdict(g) for g in games],
}
GAMES_FILE.write_text(json.dumps(data, indent=2, ensure_ascii=False))
def cached_games() -> list[Game]:
"""Games from the last scan (for instant display before a rescan finishes)."""
cache = load_cache()
if not cache:
return []
return [Game(**{k: g.get(k) for k in Game.__dataclass_fields__}) for g in cache.get("games", [])]
def rescan(cfg: dict | None = None) -> ScanResult:
"""Scan the selected libraries, diff against the cache, and persist the result.
Newly-installed games (appids never seen before) are reported in ``new_appids``. The
very first scan reports nothing as new (so the whole library isn't flagged at once);
unacknowledged new games carry forward until they're acknowledged or uninstalled.
"""
games = scan_games(selected_library_paths(cfg))
current = {g.appid for g in games}
prev = load_cache()
if prev is None:
known: set[str] = set(current) # first run: everything is "known", nothing new
new = []
else:
known = set(prev.get("known_appids", []))
carried = set(prev.get("new_appids", [])) & current # still-unacknowledged & installed
new = sorted((current - known) | carried)
known |= current
when = time.time()
_save_cache(games, known, new, when)
return ScanResult(games=games, new_appids=new, scanned_at=when)
def acknowledge_new() -> None:
"""Clear the new-game badge (called when the user views the games list)."""
cache = load_cache()
if not cache or not cache.get("new_appids"):
return
cache["new_appids"] = []
try:
GAMES_FILE.write_text(json.dumps(cache, indent=2, ensure_ascii=False))
except OSError:
pass
# --- formatting -----------------------------------------------------------------------
def human_size(num_bytes: int) -> str:
if num_bytes <= 0:
return ""
size = float(num_bytes)
for unit in ("B", "KB", "MB", "GB", "TB"):
if size < 1024 or unit == "TB":
return f"{size:.0f} {unit}" if unit in ("B", "KB") else f"{size:.1f} {unit}"
size /= 1024
return f"{size:.1f} TB"
+81
View File
@@ -0,0 +1,81 @@
"""Results view for a guided diagnostic session (M6/D12): capture summary + findings."""
from __future__ import annotations
from PySide6.QtCore import Qt
from PySide6.QtGui import QFont
from PySide6.QtWidgets import (
QDialog,
QFrame,
QHBoxLayout,
QLabel,
QPushButton,
QScrollArea,
QVBoxLayout,
QWidget,
)
from ..render import render_summary
from .widgets import finding_card
class DiagnosticDialog(QDialog):
def __init__(self, result, parent=None) -> None:
super().__init__(parent)
self.setWindowTitle(f"Diagnostic — {result.game}" if result.game else "Diagnostic")
self.resize(660, 680)
root = QVBoxLayout(self)
root.setContentsMargins(20, 18, 20, 16)
root.setSpacing(14)
title = QLabel(f"Diagnostic — {result.game}" if result.game else "Diagnostic")
title.setObjectName("PageTitle")
root.addWidget(title)
scroll = QScrollArea()
scroll.setWidgetResizable(True)
scroll.setFrameShape(QFrame.Shape.NoFrame)
scroll.setStyleSheet("background: transparent;")
body = QWidget()
col = QVBoxLayout(body)
col.setContentsMargins(0, 0, 0, 0)
col.setSpacing(10)
col.setAlignment(Qt.AlignmentFlag.AlignTop)
# Capture window summary (peaks / events / last samples) — monospace for the columns.
cap_head = QLabel("Capture")
cap_head.setStyleSheet("font-weight: 700; background: transparent;")
col.addWidget(cap_head)
summary = QLabel(render_summary(result.summary))
summary.setObjectName("Report")
summary.setFont(QFont("monospace"))
summary.setTextInteractionFlags(Qt.TextInteractionFlag.TextSelectableByMouse)
summary.setWordWrap(False)
summary.setStyleSheet(
"background: #0d0f13; color: #cfd3da; border: 1px solid #2a2f39; "
"border-radius: 8px; padding: 10px;"
)
col.addWidget(summary)
find_head = QLabel(f"Findings ({len(result.findings)})")
find_head.setStyleSheet("font-weight: 700; background: transparent;")
col.addWidget(find_head)
if result.findings:
for finding in result.findings:
col.addWidget(finding_card(finding))
else:
none = QLabel("No findings.")
none.setObjectName("Muted")
col.addWidget(none)
scroll.setWidget(body)
root.addWidget(scroll, 1)
buttons = QHBoxLayout()
buttons.addStretch(1)
close = QPushButton("Close")
close.setObjectName("PrimaryButton")
close.clicked.connect(self.accept)
buttons.addWidget(close)
root.addLayout(buttons)
+156
View File
@@ -0,0 +1,156 @@
"""Environment page (M6 in the GUI): runs the gaming-environment checks as findings cards."""
from __future__ import annotations
import threading
import time
from PySide6.QtCore import Qt, QTimer, Signal
from PySide6.QtWidgets import (
QFrame,
QHBoxLayout,
QLabel,
QPushButton,
QScrollArea,
QVBoxLayout,
QWidget,
)
from .widgets import finding_card
def _fail_reason(out: str) -> str:
"""Turn the failed command's output into a short, human reason."""
low = (out or "").lower()
if "not authorized" in low or "dismissed" in low or "authentication" in low:
return "cancelled at the password prompt"
if "operation not permitted" in low or "invalid argument" in low or "permission denied" in low:
return "the system rejected the change (it may be locked by BIOS/kernel)"
last = next((ln.strip() for ln in reversed((out or "").splitlines()) if ln.strip()), "")
return (last[:80] or "no privileges, or cancelled")
class EnvironmentPage(QWidget):
_result = Signal(object) # list[Finding]
_action_done = Signal(object) # (label, rc, output) — install or apply finished
def __init__(self) -> None:
super().__init__()
self.setObjectName("Page")
self._result.connect(self._render_findings)
self._action_done.connect(self._on_action_done)
self._busy = False
root = QVBoxLayout(self)
root.setContentsMargins(20, 18, 20, 18)
root.setSpacing(16)
header = QHBoxLayout()
title = QLabel("Environment")
title.setObjectName("PageTitle")
header.addWidget(title)
header.addStretch(1)
self._status = QLabel("")
self._status.setObjectName("Muted")
header.addWidget(self._status)
self._run_btn = QPushButton("Run checks")
self._run_btn.setObjectName("PrimaryButton")
self._run_btn.clicked.connect(self._run)
header.addWidget(self._run_btn)
root.addLayout(header)
intro = QLabel(
"System settings that affect gaming stability and performance, with the suggested "
"fix command. RigDoctor only reports — it never changes anything."
)
intro.setObjectName("Muted")
intro.setWordWrap(True)
root.addWidget(intro)
scroll = QScrollArea()
scroll.setWidgetResizable(True)
scroll.setFrameShape(QFrame.Shape.NoFrame)
scroll.setStyleSheet("background: transparent;")
self._container = QWidget()
self._list = QVBoxLayout(self._container)
self._list.setContentsMargins(0, 0, 0, 0)
self._list.setSpacing(10)
self._list.setAlignment(Qt.AlignmentFlag.AlignTop)
scroll.setWidget(self._container)
root.addWidget(scroll, 1)
QTimer.singleShot(350, self._run) # auto-run shortly after the window opens
def _run(self) -> None:
self._run_btn.setEnabled(False)
self._status.setText("Checking environment…")
threading.Thread(target=self._work, daemon=True).start()
def _work(self) -> None:
from ..core.gameenv import run_gameenv_checks
try:
findings = run_gameenv_checks()
except Exception:
findings = None
self._result.emit(findings)
def _render_findings(self, findings) -> None:
self._run_btn.setEnabled(True)
if findings is None: # check failed — keep current results
self._status.setText("check failed")
return
while self._list.count():
item = self._list.takeAt(0)
w = item.widget()
if w is not None:
w.deleteLater()
crit = sum(1 for f in findings if f.severity == "critical")
warn = sum(1 for f in findings if f.severity == "warning")
self._status.setText(
f"{crit} critical · {warn} warning · {len(findings)} checks · "
f"{time.strftime('%H:%M:%S')}"
)
for finding in findings:
self._list.addWidget(finding_card(finding, on_install=self._install, on_apply=self._apply))
self._list.addStretch(1)
def _install(self, component) -> None:
if self._busy:
return
self._busy = True
self._run_btn.setEnabled(False)
self._status.setText(f"Installing {component.name}… (may prompt for your password)")
threading.Thread(target=self._work_install, args=(component,), daemon=True).start()
def _work_install(self, component) -> None:
from ..core import installer
rc, out = installer.install_packages(list(component.apt))
self._action_done.emit((component.name, rc, out))
def _apply(self, fix_id: str, value: str) -> None:
if self._busy:
return
self._busy = True
self._run_btn.setEnabled(False)
self._status.setText(f"Applying {value}… (may prompt for your password)")
threading.Thread(target=self._work_apply, args=(fix_id, value), daemon=True).start()
def _work_apply(self, fix_id: str, value: str) -> None:
from ..core import fixes
rc, out = fixes.apply(fix_id, value)
self._action_done.emit((value, rc, out))
def _on_action_done(self, result) -> None:
label, rc, out = result
self._busy = False
if rc == 0:
self._status.setText(f"{label} applied — re-checking…")
self._run() # re-run so the finding reflects the new state
else:
self._run_btn.setEnabled(True)
self._status.setText(f"'{label}' failed — {_fail_reason(out)}")
+360
View File
@@ -0,0 +1,360 @@
"""Games page (M6 in the GUI): pick Steam libraries and browse detected games.
Libraries are opt-in the user checks which ones to scan. The list is loaded from the
cache instantly, then a background rescan refreshes it and flags games installed since the
last scan (a "NEW" badge here + a count on the sidebar nav).
"""
from __future__ import annotations
import os
import threading
import time
from PySide6.QtCore import Qt, QTimer, Signal
from PySide6.QtWidgets import (
QCheckBox,
QFrame,
QHBoxLayout,
QLabel,
QMessageBox,
QPushButton,
QScrollArea,
QVBoxLayout,
QWidget,
)
from ..config import load_config, update_config
from .diagnostic_dialog import DiagnosticDialog
from .theme import ACCENT, GOOD, MUTED
def _game_row(name: str, sublabel: str, size: str, is_new: bool, on_diagnose=None) -> QFrame:
card = QFrame()
card.setObjectName("Card")
h = QHBoxLayout(card)
h.setContentsMargins(16, 10, 16, 10)
h.setSpacing(10)
left = QVBoxLayout()
left.setSpacing(2)
title = QLabel(name)
title.setStyleSheet("font-weight: 600; background: transparent;")
title.setWordWrap(True)
left.addWidget(title)
if sublabel:
sub = QLabel(sublabel)
sub.setObjectName("Muted")
left.addWidget(sub)
h.addLayout(left, 1)
if is_new:
badge = QLabel("NEW")
badge.setStyleSheet(
f"color: {GOOD}; border: 1px solid {GOOD}; border-radius: 6px; "
f"padding: 1px 6px; font-weight: 700; background: transparent;"
)
h.addWidget(badge, 0, Qt.AlignmentFlag.AlignVCenter)
size_label = QLabel(size)
size_label.setObjectName("Muted")
size_label.setMinimumWidth(80)
size_label.setAlignment(Qt.AlignmentFlag.AlignRight | Qt.AlignmentFlag.AlignVCenter)
h.addWidget(size_label, 0)
if on_diagnose is not None:
diag_btn = QPushButton("Run Diagnostic")
diag_btn.setObjectName("ActionButton")
diag_btn.setCursor(Qt.CursorShape.PointingHandCursor)
diag_btn.clicked.connect(lambda: on_diagnose(name))
h.addWidget(diag_btn, 0)
return card
class GamesPage(QWidget):
_libraries_ready = Signal(object) # list[dict(path, label, count, selected)]
_scanned = Signal(object) # steam.ScanResult
new_count_changed = Signal(int) # newly-installed game count (for the nav badge)
_diag_done = Signal(object) # DiagnosticResult — focused capture analyzed
def __init__(self) -> None:
super().__init__()
self.setObjectName("Page")
self._libraries_ready.connect(self._render_libraries)
self._scanned.connect(self._render_games)
self._diag_done.connect(self._on_diag_done)
self._busy = False
self._new_appids: set[str] = set()
self._diag_game: str | None = None
root = QVBoxLayout(self)
root.setContentsMargins(20, 18, 20, 18)
root.setSpacing(16)
header = QHBoxLayout()
title = QLabel("Games")
title.setObjectName("PageTitle")
header.addWidget(title)
header.addStretch(1)
self._status = QLabel("")
self._status.setObjectName("Muted")
header.addWidget(self._status)
self._rescan_btn = QPushButton("Rescan")
self._rescan_btn.setObjectName("PrimaryButton")
self._rescan_btn.clicked.connect(self.refresh)
header.addWidget(self._rescan_btn)
root.addLayout(header)
# In-progress diagnostic banner (hidden until a focused capture is running).
self._banner = QFrame()
self._banner.setObjectName("Card")
self._banner.setStyleSheet(f"#Card {{ border: 1px solid {ACCENT}; }}")
banner_h = QHBoxLayout(self._banner)
banner_h.setContentsMargins(16, 10, 16, 10)
banner_h.setSpacing(10)
self._banner_label = QLabel("")
self._banner_label.setStyleSheet(f"color: {ACCENT}; font-weight: 700; background: transparent;")
banner_h.addWidget(self._banner_label, 1)
self._finish_btn = QPushButton("Finish & analyze")
self._finish_btn.setObjectName("ActionButton")
self._finish_btn.clicked.connect(self._finish_diagnostic)
banner_h.addWidget(self._finish_btn)
self._discard_btn = QPushButton("Discard")
self._discard_btn.clicked.connect(self._discard_diagnostic)
banner_h.addWidget(self._discard_btn)
self._banner.hide()
root.addWidget(self._banner)
self._diag_timer = QTimer(self)
self._diag_timer.setInterval(1000)
self._diag_timer.timeout.connect(self._poll_diag)
# Libraries (opt-in checkboxes)
lib_card = QFrame()
lib_card.setObjectName("Card")
lib_v = QVBoxLayout(lib_card)
lib_v.setContentsMargins(16, 12, 16, 12)
lib_v.setSpacing(6)
lib_head = QLabel("Steam libraries")
lib_head.setStyleSheet("font-weight: 700; background: transparent;")
lib_v.addWidget(lib_head)
self._lib_box = QVBoxLayout()
self._lib_box.setSpacing(6)
lib_v.addLayout(self._lib_box)
self._lib_hint = QLabel("Looking for Steam libraries…")
self._lib_hint.setObjectName("Muted")
self._lib_hint.setWordWrap(True)
lib_v.addWidget(self._lib_hint)
root.addWidget(lib_card)
# Games list
scroll = QScrollArea()
scroll.setWidgetResizable(True)
scroll.setFrameShape(QFrame.Shape.NoFrame)
scroll.setStyleSheet("background: transparent;")
self._container = QWidget()
self._list = QVBoxLayout(self._container)
self._list.setContentsMargins(0, 0, 0, 0)
self._list.setSpacing(8)
self._list.setAlignment(Qt.AlignmentFlag.AlignTop)
scroll.setWidget(self._container)
root.addWidget(scroll, 1)
self._load_cached() # instant display from the last scan
QTimer.singleShot(400, self.refresh) # then rescan in the background on launch
# --- loading ----------------------------------------------------------------------
def _load_cached(self) -> None:
from ..core import steam
cache = steam.load_cache() or {}
self._new_appids = set(cache.get("new_appids", []))
games = steam.cached_games()
if games:
self._populate_games(games, self._new_appids)
self.new_count_changed.emit(len(self._new_appids))
def refresh(self) -> None:
if self._busy:
return
self._busy = True
self._rescan_btn.setEnabled(False)
self._status.setText("Scanning Steam libraries…")
threading.Thread(target=self._work, daemon=True).start()
def _work(self) -> None:
from ..core import steam
try:
selected = {os.path.realpath(p) for p in steam.selected_library_paths()}
libs = [
{"path": lib.path, "label": lib.label, "selected": lib.path in selected,
"count": len(steam.scan_library(lib.path))}
for lib in steam.discover_libraries()
]
self._libraries_ready.emit(libs)
self._scanned.emit(steam.rescan())
except Exception:
self._scanned.emit(None)
# --- rendering --------------------------------------------------------------------
def _render_libraries(self, libs) -> None:
while self._lib_box.count():
item = self._lib_box.takeAt(0)
w = item.widget()
if w is not None:
w.deleteLater()
if not libs:
self._lib_hint.setText("No Steam libraries detected. Is Steam installed?")
self._lib_hint.show()
return
self._lib_hint.hide()
for lib in libs:
label = lib["path"]
if lib["label"]:
label += f" [{lib['label']}]"
cb = QCheckBox(f"{label} · {lib['count']} games")
cb.setChecked(lib["selected"])
cb.toggled.connect(lambda checked, p=lib["path"]: self._toggle_library(p, checked))
self._lib_box.addWidget(cb)
def _toggle_library(self, path: str, checked: bool) -> None:
selected = {os.path.realpath(p) for p in (load_config().get("steam_libraries") or [])}
if checked:
selected.add(os.path.realpath(path))
else:
selected.discard(os.path.realpath(path))
update_config(steam_libraries=sorted(selected))
self.refresh()
def _render_games(self, result) -> None:
self._busy = False
self._rescan_btn.setEnabled(True)
if result is None:
self._status.setText("scan failed")
return
self._new_appids = set(result.new_appids)
self._populate_games(result.games, self._new_appids)
new = len(self._new_appids)
suffix = f" · {new} new" if new else ""
self._status.setText(
f"{len(result.games)} games · {time.strftime('%H:%M:%S')}{suffix}"
)
self.new_count_changed.emit(new)
def _populate_games(self, games, new_appids: set[str]) -> None:
from ..core import steam
while self._list.count():
item = self._list.takeAt(0)
w = item.widget()
if w is not None:
w.deleteLater()
if not games:
empty = QLabel(
"No games to show yet — check a Steam library above to scan it for games."
)
empty.setObjectName("Muted")
empty.setWordWrap(True)
self._list.addWidget(empty)
self._list.addStretch(1)
return
for g in games:
self._list.addWidget(_game_row(
g.name,
os.path.basename(g.library.rstrip("/")) or g.library,
steam.human_size(g.size_bytes),
g.appid in new_appids,
on_diagnose=self._start_diagnostic,
))
self._list.addStretch(1)
# --- guided diagnostic (M6/D12) ---------------------------------------------------
def _start_diagnostic(self, name: str) -> None:
from ..core import diagnostic
if diagnostic.is_running():
QMessageBox.information(
self, "RigDoctor",
"A capture is already running — finish or discard it first.")
return
if diagnostic.start(game=name) is None:
QMessageBox.warning(self, "RigDoctor", "Couldn't start the capture.")
return
self._diag_game = name
self._banner_label.setText(f"● Recording — {name} · starting…")
self._finish_btn.setEnabled(True)
self._discard_btn.setEnabled(True)
self._banner.show()
self._diag_timer.start()
def _poll_diag(self) -> None:
from ..core import diagnostic
status = diagnostic.active()
if not status:
self._diag_timer.stop() # recorder exited on its own
return
samples = status.get("samples", 0)
lost = " · GPU-lost seen" if status.get("gpu_lost") else ""
game = status.get("game") or self._diag_game or ""
self._banner_label.setText(f"● Recording — {game} · {samples} samples{lost}")
def _finish_diagnostic(self) -> None:
self._diag_timer.stop()
self._finish_btn.setEnabled(False)
self._discard_btn.setEnabled(False)
self._banner_label.setText("Analyzing… (running the health report)")
threading.Thread(target=self._work_finish, daemon=True).start()
def _work_finish(self) -> None:
from ..core import diagnostic
try:
result = diagnostic.finish()
except Exception:
result = None
self._diag_done.emit(result)
def _on_diag_done(self, result) -> None:
self._banner.hide()
self._finish_btn.setEnabled(True)
self._discard_btn.setEnabled(True)
if result is None:
QMessageBox.warning(self, "RigDoctor", "The diagnostic couldn't be analyzed.")
return
DiagnosticDialog(result, self).exec()
def _discard_diagnostic(self) -> None:
from ..core import reccontrol
self._diag_timer.stop()
reccontrol.stop_background()
self._banner.hide()
# --- nav badge integration --------------------------------------------------------
def showEvent(self, event) -> None: # noqa: N802 (Qt override)
# Viewing the list acknowledges the new games: clear the sidebar badge. The NEW
# tags stay on the rows for this session so the user can still spot them.
super().showEvent(event)
if self._new_appids:
from ..core import steam
threading.Thread(target=steam.acknowledge_new, daemon=True).start()
self.new_count_changed.emit(0)
# Reflect a capture that's still running (e.g. started earlier, navigated back).
from ..core import diagnostic
if diagnostic.is_running():
status = diagnostic.active() or {}
self._diag_game = status.get("game") or self._diag_game
self._banner.show()
if not self._diag_timer.isActive():
self._diag_timer.start()
+2 -35
View File
@@ -16,40 +16,7 @@ from PySide6.QtWidgets import (
QWidget, QWidget,
) )
from .theme import ACCENT, CRIT, GOOD, MUTED, WARN from .widgets import finding_card
_SEV = {
"critical": ("CRITICAL", CRIT),
"warning": ("WARNING", WARN),
"info": ("INFO", MUTED),
"ok": ("OK", GOOD),
}
def _finding_widget(finding) -> QFrame:
label, color = _SEV.get(finding.severity, ("?", MUTED))
card = QFrame()
card.setObjectName("Card")
v = QVBoxLayout(card)
v.setContentsMargins(16, 12, 16, 12)
v.setSpacing(4)
head = QLabel(f"{label} · {finding.category}: {finding.title}")
head.setStyleSheet(f"color: {color}; font-weight: 700; background: transparent;")
head.setWordWrap(True)
v.addWidget(head)
if finding.detail:
detail = QLabel(finding.detail)
detail.setObjectName("Muted")
detail.setWordWrap(True)
v.addWidget(detail)
if finding.suggestion:
suggestion = QLabel(f"{finding.suggestion}")
suggestion.setStyleSheet(f"color: {ACCENT}; background: transparent;")
suggestion.setWordWrap(True)
v.addWidget(suggestion)
return card
class HealthPage(QWidget): class HealthPage(QWidget):
@@ -125,5 +92,5 @@ class HealthPage(QWidget):
f"{time.strftime('%H:%M:%S')}" f"{time.strftime('%H:%M:%S')}"
) )
for finding in findings: for finding in findings:
self._list.addWidget(_finding_widget(finding)) self._list.addWidget(finding_card(finding))
self._list.addStretch(1) self._list.addStretch(1)
-145
View File
@@ -1,145 +0,0 @@
"""Inventory page (M5 in the GUI): system inventory with copy/save + admin re-collect."""
from __future__ import annotations
import os
import threading
from PySide6.QtCore import Qt, QTimer, Signal
from PySide6.QtWidgets import (
QApplication,
QFileDialog,
QFrame,
QGridLayout,
QHBoxLayout,
QLabel,
QPushButton,
QScrollArea,
QVBoxLayout,
QWidget,
)
from ..core import inventory
from .theme import MUTED
def _section_card(section) -> QFrame:
card = QFrame()
card.setObjectName("Card")
layout = QVBoxLayout(card)
layout.setContentsMargins(16, 12, 16, 12)
layout.setSpacing(6)
title = QLabel(section.title)
title.setStyleSheet("font-weight: 700; background: transparent;")
layout.addWidget(title)
grid = QGridLayout()
grid.setColumnStretch(1, 1)
grid.setHorizontalSpacing(14)
grid.setVerticalSpacing(4)
for row, (key, value) in enumerate(section.items):
k = QLabel(key)
k.setObjectName("Muted")
v = QLabel(value)
v.setWordWrap(True)
v.setStyleSheet("background: transparent;")
grid.addWidget(k, row, 0)
grid.addWidget(v, row, 1)
layout.addLayout(grid)
return card
class InventoryPage(QWidget):
_result = Signal(object) # list[Section]
def __init__(self) -> None:
super().__init__()
self.setObjectName("Page")
self._sections: list = []
self._result.connect(self._render)
root = QVBoxLayout(self)
root.setContentsMargins(20, 18, 20, 18)
root.setSpacing(16)
header = QHBoxLayout()
title = QLabel("Inventory")
title.setObjectName("PageTitle")
header.addWidget(title)
header.addStretch(1)
self._status = QLabel("")
self._status.setObjectName("Muted")
header.addWidget(self._status)
self._copy_btn = QPushButton("Copy Markdown")
self._copy_btn.clicked.connect(self._copy)
header.addWidget(self._copy_btn)
self._save_btn = QPushButton("Save…")
self._save_btn.clicked.connect(self._save)
header.addWidget(self._save_btn)
self._refresh_btn = QPushButton("Refresh")
self._refresh_btn.setObjectName("PrimaryButton")
self._refresh_btn.clicked.connect(self._run)
header.addWidget(self._refresh_btn)
root.addLayout(header)
scroll = QScrollArea()
scroll.setWidgetResizable(True)
scroll.setFrameShape(QFrame.Shape.NoFrame)
scroll.setStyleSheet("background: transparent;")
self._container = QWidget()
self._list = QVBoxLayout(self._container)
self._list.setContentsMargins(0, 0, 0, 0)
self._list.setSpacing(12)
self._list.setAlignment(Qt.AlignmentFlag.AlignTop)
scroll.setWidget(self._container)
root.addWidget(scroll, 1)
QTimer.singleShot(300, self._run)
def _run(self) -> None:
self._busy("Collecting…")
threading.Thread(target=self._work, daemon=True).start()
def _work(self) -> None:
try:
sections = inventory.collect()
except Exception:
sections = []
self._result.emit(sections)
def _busy(self, text: str) -> None:
self._status.setText(text)
for b in (self._refresh_btn, self._copy_btn, self._save_btn):
b.setEnabled(False)
def _render(self, sections) -> None:
self._refresh_btn.setEnabled(True)
self._copy_btn.setEnabled(True)
self._save_btn.setEnabled(True)
if sections is None: # collection failed — keep current
self._status.setText("collection failed")
return
self._sections = sections
while self._list.count():
item = self._list.takeAt(0)
w = item.widget()
if w is not None:
w.deleteLater()
for section in sections:
self._list.addWidget(_section_card(section))
self._list.addStretch(1)
self._status.setText("")
def _copy(self) -> None:
if self._sections:
QApplication.clipboard().setText(inventory.render_markdown(self._sections))
self._status.setText("copied as Markdown")
def _save(self) -> None:
if not self._sections:
return
path, _ = QFileDialog.getSaveFileName(self, "Save inventory", "rigdoctor-inventory.md", "Markdown (*.md)")
if path:
with open(path, "w", encoding="utf-8") as f:
f.write(inventory.render_markdown(self._sections))
self._status.setText(f"saved {os.path.basename(path)}")
+20 -9
View File
@@ -28,8 +28,9 @@ from .. import __version__
from ..config import load_config from ..config import load_config
from ..core import alerts, elevation, updates from ..core import alerts, elevation, updates
from .dashboard import Dashboard from .dashboard import Dashboard
from .environment_page import EnvironmentPage
from .games_page import GamesPage
from .health_page import HealthPage from .health_page import HealthPage
from .inventory_page import InventoryPage
from .notifications_page import NotificationsPage from .notifications_page import NotificationsPage
from .recorder_page import RecorderPage from .recorder_page import RecorderPage
from .setup_page import SetupPage from .setup_page import SetupPage
@@ -37,7 +38,7 @@ from .share_page import SharePage
from .theme import ACCENT, GOOD, MUTED from .theme import ACCENT, GOOD, MUTED
from .worker import SamplerWorker from .worker import SamplerWorker
_NAV_ITEMS = ["Dashboard", "Logs", "Health", "Setup", "Inventory", "Notifications", "Share"] _NAV_ITEMS = ["Dashboard", "Logs", "Health", "Games", "Environment", "Setup", "Notifications", "Share"]
class MainWindow(QMainWindow): class MainWindow(QMainWindow):
@@ -67,18 +68,21 @@ class MainWindow(QMainWindow):
self.dashboard = Dashboard() self.dashboard = Dashboard()
self.recorder_page = RecorderPage() self.recorder_page = RecorderPage()
self.health_page = HealthPage() self.health_page = HealthPage()
self.games_page = GamesPage()
self.games_page.new_count_changed.connect(self._set_games_badge)
self.environment_page = EnvironmentPage()
self.setup_page = SetupPage() self.setup_page = SetupPage()
self.inventory_page = InventoryPage()
self.notifications_page = NotificationsPage() self.notifications_page = NotificationsPage()
self.notifications_page.changed.connect(self._apply_alert_settings) self.notifications_page.changed.connect(self._apply_alert_settings)
self.share_page = SharePage() self.share_page = SharePage()
self._stack.addWidget(self.dashboard) # 0 Dashboard self._stack.addWidget(self.dashboard) # 0 Dashboard
self._stack.addWidget(self.recorder_page) # 1 Logs self._stack.addWidget(self.recorder_page) # 1 Logs
self._stack.addWidget(self.health_page) # 2 Health self._stack.addWidget(self.health_page) # 2 Health
self._stack.addWidget(self.setup_page) # 3 Setup self._stack.addWidget(self.games_page) # 3 Games
self._stack.addWidget(self.inventory_page) # 4 Inventory self._stack.addWidget(self.environment_page) # 4 Environment
self._stack.addWidget(self.notifications_page) # 5 Notifications self._stack.addWidget(self.setup_page) # 5 Setup
self._stack.addWidget(self.share_page) # 6 Share self._stack.addWidget(self.notifications_page) # 6 Notifications
self._stack.addWidget(self.share_page) # 7 Share
content_layout.addWidget(self._stack) content_layout.addWidget(self._stack)
layout.addWidget(self._build_sidebar()) layout.addWidget(self._build_sidebar())
@@ -138,6 +142,7 @@ class MainWindow(QMainWindow):
group = QButtonGroup(self) group = QButtonGroup(self)
group.setExclusive(True) group.setExclusive(True)
self._nav_buttons: dict[str, QPushButton] = {}
for i, name in enumerate(_NAV_ITEMS): for i, name in enumerate(_NAV_ITEMS):
btn = QPushButton(name) btn = QPushButton(name)
btn.setObjectName("NavButton") btn.setObjectName("NavButton")
@@ -147,6 +152,7 @@ class MainWindow(QMainWindow):
btn.clicked.connect(lambda _checked, idx=i: self._stack.setCurrentIndex(idx)) btn.clicked.connect(lambda _checked, idx=i: self._stack.setCurrentIndex(idx))
group.addButton(btn, i) group.addButton(btn, i)
v.addWidget(btn) v.addWidget(btn)
self._nav_buttons[name] = btn
v.addStretch(1) v.addStretch(1)
live = QLabel(f'<span style="color:{ACCENT};">●</span> <span style="color:{MUTED};">Live</span>') live = QLabel(f'<span style="color:{ACCENT};">●</span> <span style="color:{MUTED};">Live</span>')
@@ -228,9 +234,14 @@ class MainWindow(QMainWindow):
self._elevated.emit() self._elevated.emit()
def _on_elevated(self) -> None: def _on_elevated(self) -> None:
# Re-run Health and Inventory now that root-only data (SMART/dmidecode) is available. # Re-run Health now that root-only SMART data is available. (dmidecode is still
# collected and used by the relay guest view + the CLI `rigdoctor inventory`.)
self.health_page._run() self.health_page._run()
self.inventory_page._run()
def _set_games_badge(self, count: int) -> None:
btn = self._nav_buttons.get("Games")
if btn is not None:
btn.setText(f"Games ● {count}" if count > 0 else "Games")
def _apply_alert_settings(self) -> None: def _apply_alert_settings(self) -> None:
cfg = load_config() cfg = load_config()
+173 -50
View File
@@ -1,12 +1,20 @@
"""Share page (M12, Tier 2 over the relay): host or join a read-only shared session.""" """Share page (M12): host or join a shared session over the relay.
Guest sees the host's live sensors + health + inventory (read-only). If the host enables it,
a full **PTY terminal** is shared: the guest types and the commands run on the host (as the
host's user), the host reads along, and the host can type too — e.g. a sudo password, which
stays local and is never sent to the guest.
"""
from __future__ import annotations from __future__ import annotations
import base64
import json import json
from PySide6.QtCore import Qt, QTimer, QUrl from PySide6.QtCore import Qt, QSocketNotifier, QTimer, QUrl
from PySide6.QtWebSockets import QWebSocket from PySide6.QtWebSockets import QWebSocket
from PySide6.QtWidgets import ( from PySide6.QtWidgets import (
QCheckBox,
QFrame, QFrame,
QHBoxLayout, QHBoxLayout,
QLabel, QLabel,
@@ -19,14 +27,20 @@ from PySide6.QtWidgets import (
from ..config import load_config, load_token from ..config import load_config, load_token
from ..core import share from ..core import share
from ..core.pty_session import PtySession
from ..core.sampler import Sampler from ..core.sampler import Sampler
from ..core.sources import available_sources from ..core.sources import available_sources
from .terminal_widget import TerminalView
def _relay_url() -> str: def _relay_url() -> str:
return load_config().get("relay_url", "wss://rigdoctor.jesseyvanofferen.com").rstrip("/") return load_config().get("relay_url", "wss://rigdoctor.jesseyvanofferen.com").rstrip("/")
def _b64(data: bytes) -> str:
return base64.b64encode(data).decode("ascii")
def _card(title: str) -> tuple[QFrame, QVBoxLayout]: def _card(title: str) -> tuple[QFrame, QVBoxLayout]:
card = QFrame() card = QFrame()
card.setObjectName("Card") card.setObjectName("Card")
@@ -46,6 +60,8 @@ class SharePage(QWidget):
self._sampler = Sampler(available_sources()) self._sampler = Sampler(available_sources())
self._host_ws: QWebSocket | None = None self._host_ws: QWebSocket | None = None
self._guest_ws: QWebSocket | None = None self._guest_ws: QWebSocket | None = None
self._pty: PtySession | None = None
self._pty_notifier: QSocketNotifier | None = None
self._last_report = None self._last_report = None
self._last_inv = None self._last_inv = None
self._timer = QTimer(self) self._timer = QTimer(self)
@@ -54,18 +70,22 @@ class SharePage(QWidget):
root = QVBoxLayout(self) root = QVBoxLayout(self)
root.setContentsMargins(20, 18, 20, 18) root.setContentsMargins(20, 18, 20, 18)
root.setSpacing(16) root.setSpacing(14)
title = QLabel("Share") title = QLabel("Share")
title.setObjectName("PageTitle") title.setObjectName("PageTitle")
root.addWidget(title) root.addWidget(title)
root.addWidget(self._build_host())
root.addWidget(self._build_guest(), 1)
# Host # ------------------------------------------------------------------ host
host_card, hv = _card("Start a shared session") def _build_host(self) -> QFrame:
card, v = _card("Start a shared session")
self._host_status = QLabel("Let someone with an account view your machine, read-only.") self._host_status = QLabel("Let someone with an account view your machine, read-only.")
self._host_status.setObjectName("Muted") self._host_status.setObjectName("Muted")
self._host_status.setWordWrap(True) self._host_status.setWordWrap(True)
hv.addWidget(self._host_status) v.addWidget(self._host_status)
hrow = QHBoxLayout()
row = QHBoxLayout()
self._start_btn = QPushButton("Start shared session") self._start_btn = QPushButton("Start shared session")
self._start_btn.setObjectName("PrimaryButton") self._start_btn.setObjectName("PrimaryButton")
self._start_btn.clicked.connect(self._start_host) self._start_btn.clicked.connect(self._start_host)
@@ -75,45 +95,25 @@ class SharePage(QWidget):
self._code_label = QLabel("") self._code_label = QLabel("")
self._code_label.setStyleSheet("font-weight:700; font-size:18px; color:#38bdf8; background:transparent;") self._code_label.setStyleSheet("font-weight:700; font-size:18px; color:#38bdf8; background:transparent;")
self._code_label.setTextInteractionFlags(Qt.TextInteractionFlag.TextSelectableByMouse) self._code_label.setTextInteractionFlags(Qt.TextInteractionFlag.TextSelectableByMouse)
hrow.addWidget(self._start_btn) row.addWidget(self._start_btn)
hrow.addWidget(self._stop_btn) row.addWidget(self._stop_btn)
hrow.addSpacing(12) row.addSpacing(12)
hrow.addWidget(self._code_label) row.addWidget(self._code_label)
hrow.addStretch(1) row.addStretch(1)
hv.addLayout(hrow) v.addLayout(row)
root.addWidget(host_card)
# Guest self._allow_term = QCheckBox("Allow remote terminal — the guest runs commands as your user (you read along; you can type too, e.g. a sudo password)")
guest_card, gv = _card("Join a shared session") self._allow_term.setStyleSheet("color:#fb923c; background:transparent;")
grow = QHBoxLayout() self._allow_term.toggled.connect(self._toggle_terminal)
self._code_input = QLineEdit() v.addWidget(self._allow_term)
self._code_input.setPlaceholderText("Enter share code")
self._code_input.setMaxLength(6)
self._code_input.setFixedWidth(160)
self._join_btn = QPushButton("Join")
self._join_btn.setObjectName("PrimaryButton")
self._join_btn.clicked.connect(self._join)
self._leave_btn = QPushButton("Leave")
self._leave_btn.setEnabled(False)
self._leave_btn.clicked.connect(self._leave)
grow.addWidget(self._code_input)
grow.addWidget(self._join_btn)
grow.addWidget(self._leave_btn)
grow.addStretch(1)
gv.addLayout(grow)
self._guest_status = QLabel("")
self._guest_status.setObjectName("Muted")
gv.addWidget(self._guest_status)
root.addWidget(guest_card)
self._view = QTextEdit() self._host_term = TerminalView()
self._view.setObjectName("Report") self._host_term.keys.connect(lambda b: self._pty.write(b) if self._pty else None)
self._view.setReadOnly(True) self._host_term.resized.connect(lambda r, c: self._pty.set_size(r, c) if self._pty else None)
self._view.setVisible(False) self._host_term.setVisible(False)
root.addWidget(self._view, 1) v.addWidget(self._host_term)
root.addStretch(0) return card
# --- host ---------------------------------------------------------------
def _start_host(self) -> None: def _start_host(self) -> None:
if not load_token(): if not load_token():
self._host_status.setText("Set a Gitea access token in Setup → Account access first.") self._host_status.setText("Set a Gitea access token in Setup → Account access first.")
@@ -135,13 +135,69 @@ class SharePage(QWidget):
if data.get("error"): if data.get("error"):
self._host_status.setText(f"Rejected: {data['error']}") self._host_status.setText(f"Rejected: {data['error']}")
return return
if "code" in data: if "code" in data: # relay handshake
self._code_label.setText(data["code"]) self._code_label.setText(data["code"])
self._host_status.setText(f"Sharing as {data.get('user', '?')} — give this code to whoever should view your machine.") self._host_status.setText(f"Sharing as {data.get('user', '?')} — give this code to whoever should view your machine.")
self._stop_btn.setEnabled(True) self._stop_btn.setEnabled(True)
self._host_ws.sendTextMessage(share.host_full_frame(self._sampler)) self._host_ws.sendTextMessage(share.host_full_frame(self._sampler))
self._send_terminal_state()
if self._allow_term.isChecked():
self._start_pty()
self._timer.start() self._timer.start()
# guest input also arrives here; ignored (read-only session) return
kind = data.get("type") # frames forwarded from a guest
if kind == "req_full":
# A guest just joined — send a full frame AND the current terminal state, so a
# guest that joins *after* the host enabled the terminal still gets access.
self._host_ws.sendTextMessage(share.host_full_frame(self._sampler))
self._send_terminal_state()
elif kind == "pty_in" and self._pty:
self._pty.write(base64.b64decode(data["data"]))
elif kind == "pty_resize" and self._pty:
self._pty.set_size(int(data["rows"]), int(data["cols"]))
def _toggle_terminal(self, on: bool) -> None:
if on and self._host_ws and self._code_label.text():
self._start_pty()
elif not on:
self._stop_pty()
self._send_terminal_state()
def _send_terminal_state(self) -> None:
if self._host_ws and self._code_label.text():
self._host_ws.sendTextMessage(json.dumps({"type": "terminal", "enabled": self._allow_term.isChecked()}))
def _start_pty(self) -> None:
if self._pty:
return
rows, cols = self._host_term.grid()
self._pty = PtySession(rows=rows, cols=cols)
self._pty_notifier = QSocketNotifier(self._pty.master_fd, QSocketNotifier.Type.Read, self)
self._pty_notifier.activated.connect(self._on_pty_output)
self._host_term.reset()
self._host_term.setVisible(True)
def _on_pty_output(self) -> None:
if not self._pty:
return
data = self._pty.read()
if not data: # shell exited / EOF
self._stop_pty()
self._send_terminal_state()
self._allow_term.setChecked(False)
return
self._host_term.feed(data)
if self._host_ws:
self._host_ws.sendTextMessage(json.dumps({"type": "pty", "data": _b64(data)}))
def _stop_pty(self) -> None:
if self._pty_notifier:
self._pty_notifier.setEnabled(False)
self._pty_notifier = None
if self._pty:
self._pty.close()
self._pty = None
self._host_term.setVisible(False)
def _stream(self) -> None: def _stream(self) -> None:
if self._host_ws: if self._host_ws:
@@ -149,6 +205,7 @@ class SharePage(QWidget):
def _stop_host(self) -> None: def _stop_host(self) -> None:
self._timer.stop() self._timer.stop()
self._stop_pty()
if self._host_ws: if self._host_ws:
self._host_ws.close() self._host_ws.close()
self._host_ws = None self._host_ws = None
@@ -159,13 +216,54 @@ class SharePage(QWidget):
def _host_closed(self) -> None: def _host_closed(self) -> None:
self._timer.stop() self._timer.stop()
self._stop_pty()
self._start_btn.setEnabled(True) self._start_btn.setEnabled(True)
self._stop_btn.setEnabled(False) self._stop_btn.setEnabled(False)
if self._code_label.text(): if self._code_label.text():
self._code_label.setText("") self._code_label.setText("")
self._host_status.setText("Disconnected from the relay.") self._host_status.setText("Disconnected from the relay.")
# --- guest -------------------------------------------------------------- # ----------------------------------------------------------------- guest
def _build_guest(self) -> QFrame:
card, v = _card("Join a shared session")
row = QHBoxLayout()
self._code_input = QLineEdit()
self._code_input.setPlaceholderText("Enter share code")
self._code_input.setMaxLength(6)
self._code_input.setFixedWidth(160)
self._join_btn = QPushButton("Join")
self._join_btn.setObjectName("PrimaryButton")
self._join_btn.clicked.connect(self._join)
self._leave_btn = QPushButton("Leave")
self._leave_btn.setEnabled(False)
self._leave_btn.clicked.connect(self._leave)
row.addWidget(self._code_input)
row.addWidget(self._join_btn)
row.addWidget(self._leave_btn)
row.addStretch(1)
v.addLayout(row)
self._guest_status = QLabel("")
self._guest_status.setObjectName("Muted")
v.addWidget(self._guest_status)
self._view = QTextEdit()
self._view.setObjectName("Report")
self._view.setReadOnly(True)
self._view.setVisible(False)
self._view.setMinimumHeight(200)
v.addWidget(self._view)
self._term_label = QLabel("")
self._term_label.setObjectName("Muted")
self._term_label.setVisible(False)
v.addWidget(self._term_label)
self._guest_term = TerminalView()
self._guest_term.keys.connect(self._guest_key)
self._guest_term.resized.connect(self._guest_resize)
self._guest_term.setVisible(False)
v.addWidget(self._guest_term)
return card
def _join(self) -> None: def _join(self) -> None:
code = self._code_input.text().strip().upper() code = self._code_input.text().strip().upper()
if not load_token(): if not load_token():
@@ -195,19 +293,43 @@ class SharePage(QWidget):
self._guest_status.setText(f"Viewing {data.get('host', '?')}'s machine — read-only.") self._guest_status.setText(f"Viewing {data.get('host', '?')}'s machine — read-only.")
self._leave_btn.setEnabled(True) self._leave_btn.setEnabled(True)
self._view.setVisible(True) self._view.setVisible(True)
self._guest_ws.sendTextMessage(json.dumps({"type": "req_full"}))
return return
kind = data.get("type") kind = data.get("type")
if kind == "full":
self._last_report = data.get("report")
self._last_inv = data.get("inventory")
if kind in ("full", "snapshot"): if kind in ("full", "snapshot"):
if kind == "full":
self._last_report = data.get("report")
self._last_inv = data.get("inventory")
self._view.setHtml(share.guest_html(data.get("snapshot"), self._last_report, self._last_inv)) self._view.setHtml(share.guest_html(data.get("snapshot"), self._last_report, self._last_inv))
elif kind == "terminal":
self._set_terminal_visible(bool(data.get("enabled")))
elif kind == "pty":
self._guest_term.feed(base64.b64decode(data["data"]))
def _set_terminal_visible(self, enabled: bool) -> None:
self._term_label.setVisible(True)
self._term_label.setText("Terminal enabled by host — your keystrokes run on their machine. Click here and type."
if enabled else "Terminal not enabled by the host.")
self._guest_term.setVisible(enabled)
if enabled:
self._guest_term.reset()
self._guest_resize(*self._guest_term.grid())
self._guest_term.setFocus()
def _guest_key(self, data: bytes) -> None:
if self._guest_ws:
self._guest_ws.sendTextMessage(json.dumps({"type": "pty_in", "data": _b64(data)}))
def _guest_resize(self, rows: int, cols: int) -> None:
if self._guest_ws:
self._guest_ws.sendTextMessage(json.dumps({"type": "pty_resize", "rows": rows, "cols": cols}))
def _leave(self) -> None: def _leave(self) -> None:
if self._guest_ws: if self._guest_ws:
self._guest_ws.close() self._guest_ws.close()
self._guest_ws = None self._guest_ws = None
self._view.setVisible(False) for w in (self._view, self._term_label, self._guest_term):
w.setVisible(False)
self._leave_btn.setEnabled(False) self._leave_btn.setEnabled(False)
self._join_btn.setEnabled(True) self._join_btn.setEnabled(True)
self._guest_status.setText("Left the session.") self._guest_status.setText("Left the session.")
@@ -220,6 +342,7 @@ class SharePage(QWidget):
def shutdown(self) -> None: def shutdown(self) -> None:
self._timer.stop() self._timer.stop()
self._stop_pty()
for ws in (self._host_ws, self._guest_ws): for ws in (self._host_ws, self._guest_ws):
if ws: if ws:
ws.close() ws.close()
+98
View File
@@ -0,0 +1,98 @@
"""A minimal terminal view: renders PTY output via pyte and emits keystrokes (M12, Tier 3).
Used by both sides of a shared session the host (mirrors its local PTY, can also type, e.g.
a sudo password) and the guest (renders the streamed PTY, sends keystrokes). Monochrome for
now; cursor addressing / layout (vim, top) work via pyte.
"""
from __future__ import annotations
import pyte
from PySide6.QtCore import Qt, Signal
from PySide6.QtGui import QFontDatabase, QFontMetrics, QTextCursor
from PySide6.QtWidgets import QPlainTextEdit
class TerminalView(QPlainTextEdit):
keys = Signal(bytes) # user keystrokes -> bytes for the PTY
resized = Signal(int, int) # rows, cols
def __init__(self, rows: int = 24, cols: int = 80):
super().__init__()
self.setLineWrapMode(QPlainTextEdit.LineWrapMode.NoWrap)
self.setFont(QFontDatabase.systemFont(QFontDatabase.SystemFont.FixedFont))
self.setUndoRedoEnabled(False)
self.setMinimumHeight(260)
self._rows, self._cols = rows, cols
self._screen = pyte.HistoryScreen(cols, rows, history=1000, ratio=0.5)
self._stream = pyte.ByteStream(self._screen)
def grid(self) -> tuple[int, int]:
return self._rows, self._cols
def feed(self, data: bytes) -> None:
self._stream.feed(data)
self._render()
def reset(self) -> None:
self._screen.reset()
self._render()
def _row_text(self, row) -> str:
return "".join(row[x].data for x in range(self._cols)).rstrip()
def _render(self) -> None:
bar = self.verticalScrollBar()
at_bottom = bar.value() >= bar.maximum() - 2
prev = bar.value()
history = [self._row_text(r) for r in self._screen.history.top] # scrollback
self.setPlainText("\n".join(history + list(self._screen.display)))
if at_bottom: # follow output; place caret at the real (row, col)
cursor = self.textCursor()
cursor.movePosition(QTextCursor.MoveOperation.Start)
cursor.movePosition(QTextCursor.MoveOperation.Down, QTextCursor.MoveMode.MoveAnchor, len(history) + self._screen.cursor.y)
cursor.movePosition(QTextCursor.MoveOperation.Right, QTextCursor.MoveMode.MoveAnchor, self._screen.cursor.x)
self.setTextCursor(cursor)
self.ensureCursorVisible()
else: # user scrolled up to read — keep their place
bar.setValue(prev)
def resizeEvent(self, event): # noqa: N802 (Qt override)
super().resizeEvent(event)
fm = QFontMetrics(self.font())
cw = max(1, fm.horizontalAdvance("M"))
ch = max(1, fm.height())
cols = max(20, self.viewport().width() // cw)
rows = max(6, self.viewport().height() // ch)
if (rows, cols) != (self._rows, self._cols):
self._rows, self._cols = rows, cols
self._screen.resize(rows, cols)
self._render()
self.resized.emit(rows, cols)
def keyPressEvent(self, event): # noqa: N802 (Qt override)
data = self._translate(event)
if data:
self.keys.emit(data)
event.accept() # display comes from PTY output, not local editing
@staticmethod
def _translate(event) -> bytes:
key = event.key()
mod = event.modifiers()
k = Qt.Key
if mod & Qt.KeyboardModifier.ControlModifier and k.Key_A.value <= key <= k.Key_Z.value:
return bytes([key - k.Key_A.value + 1]) # Ctrl-A..Ctrl-Z
special = {
k.Key_Return.value: b"\r", k.Key_Enter.value: b"\r",
k.Key_Backspace.value: b"\x7f", k.Key_Tab.value: b"\t",
k.Key_Escape.value: b"\x1b",
k.Key_Up.value: b"\x1b[A", k.Key_Down.value: b"\x1b[B",
k.Key_Right.value: b"\x1b[C", k.Key_Left.value: b"\x1b[D",
k.Key_Home.value: b"\x1b[H", k.Key_End.value: b"\x1b[F",
k.Key_Delete.value: b"\x1b[3~", k.Key_PageUp.value: b"\x1b[5~", k.Key_PageDown.value: b"\x1b[6~",
}
if key in special:
return special[key]
text = event.text()
return text.encode("utf-8") if text else b""
+30
View File
@@ -14,6 +14,7 @@ CARD_BORDER = "#2a2f39"
TRACK = "#2a2f39" TRACK = "#2a2f39"
TEXT = "#e6e8eb" TEXT = "#e6e8eb"
MUTED = "#8b929c" MUTED = "#8b929c"
INPUT_BG = "#0d0f13" # form-control background (must stay dark — see contrast rule)
ACCENT = "#38bdf8" ACCENT = "#38bdf8"
COLD = "#7dd3fc" # icey-blue COLD = "#7dd3fc" # icey-blue
@@ -103,6 +104,15 @@ QPushButton#PrimaryButton {{ background: {ACCENT}; color: #06222e; border: none;
QPushButton#PrimaryButton:hover {{ background: #5cc8fb; }} QPushButton#PrimaryButton:hover {{ background: #5cc8fb; }}
QPushButton#PrimaryButton:disabled {{ background: #27424f; color: #5f7c8a; }} QPushButton#PrimaryButton:disabled {{ background: #27424f; color: #5f7c8a; }}
/* Inline per-finding action buttons (Install / Apply). Outlined: bright accent text on the
dark card so it stays readable regardless of fill painting; fills accent on hover. */
QPushButton#ActionButton {{
background: transparent; color: {ACCENT}; border: 1px solid {ACCENT};
border-radius: 8px; padding: 6px 16px; font-weight: 700; min-height: 18px;
}}
QPushButton#ActionButton:hover {{ background: {ACCENT}; color: #06222e; }}
QPushButton#ActionButton:disabled {{ color: {MUTED}; border-color: {CARD_BORDER}; }}
QDoubleSpinBox, QSpinBox {{ QDoubleSpinBox, QSpinBox {{
background: #262b34; color: {TEXT}; border: 1px solid {CARD_BORDER}; background: #262b34; color: {TEXT}; border: 1px solid {CARD_BORDER};
border-radius: 6px; padding: 4px 6px; border-radius: 6px; padding: 4px 6px;
@@ -138,4 +148,24 @@ QCheckBox::indicator:checked {{
QDialog {{ background: {BG}; }} QDialog {{ background: {BG}; }}
QMessageBox {{ background: {CARD}; }} QMessageBox {{ background: {CARD}; }}
QDialog QLabel, QMessageBox QLabel {{ color: {TEXT}; background: transparent; }} QDialog QLabel, QMessageBox QLabel {{ color: {TEXT}; background: transparent; }}
/* Form controls: keep dark bg + light text (Fusion defaults to light-on-light here). */
QLineEdit, QPlainTextEdit, QAbstractSpinBox, QComboBox {{
background: {INPUT_BG}; color: {TEXT};
border: 1px solid {CARD_BORDER}; border-radius: 6px; padding: 5px 8px;
selection-background-color: {ACCENT}; selection-color: #06222e;
}}
QLineEdit:focus, QPlainTextEdit:focus, QAbstractSpinBox:focus, QComboBox:focus {{
border: 1px solid {ACCENT};
}}
QLineEdit:disabled, QPlainTextEdit:disabled, QAbstractSpinBox:disabled {{ color: {MUTED}; }}
/* The combo-box drop-down list is a separate popup view unstyled it renders
light-on-light (same Fusion trap as the closed control above). */
QComboBox QAbstractItemView {{
background: {CARD}; color: {TEXT};
border: 1px solid {CARD_BORDER}; outline: 0;
selection-background-color: {ACCENT}; selection-color: #06222e;
}}
QComboBox QAbstractItemView::item {{ padding: 5px 8px; min-height: 22px; }}
""" """
+101 -1
View File
@@ -5,6 +5,7 @@ from __future__ import annotations
from PySide6.QtCore import QRectF, Qt from PySide6.QtCore import QRectF, Qt
from PySide6.QtGui import QColor, QFont, QPainter, QPen from PySide6.QtGui import QColor, QFont, QPainter, QPen
from PySide6.QtWidgets import ( from PySide6.QtWidgets import (
QComboBox,
QFrame, QFrame,
QHBoxLayout, QHBoxLayout,
QLabel, QLabel,
@@ -16,7 +17,106 @@ from PySide6.QtWidgets import (
from ..core.sample import Reading from ..core.sample import Reading
from ..render import format_value from ..render import format_value
from .theme import MUTED, TEXT, TRACK, gauge_color, temp_color from .theme import ACCENT, CRIT, GOOD, MUTED, TEXT, TRACK, WARN, gauge_color, temp_color
_SEV = {
"critical": ("CRITICAL", CRIT),
"warning": ("WARNING", WARN),
"info": ("INFO", MUTED),
"ok": ("OK", GOOD),
}
def finding_card(finding, on_install=None, on_apply=None) -> QFrame:
"""A card for one M4/M6 Finding (severity-colored title, detail, suggested fix).
If the finding names an installable catalog component (``finding.action``) and an
``on_install(component)`` callback is given, an "Install" button is shown so a
"tool not installed" finding becomes one click instead of a copy-pasted apt command.
If the finding names a runtime tunable (``finding.fix``) and an ``on_apply(fix_id,
value)`` callback is given, a dropdown of the live options + an Apply button is shown
(M6 live fixes D22).
"""
label, color = _SEV.get(finding.severity, ("?", MUTED))
card = QFrame()
card.setObjectName("Card")
v = QVBoxLayout(card)
v.setContentsMargins(16, 12, 16, 12)
v.setSpacing(4)
head = QLabel(f"{label} · {finding.category}: {finding.title}")
head.setStyleSheet(f"color: {color}; font-weight: 700; background: transparent;")
head.setWordWrap(True)
v.addWidget(head)
if finding.detail:
detail = QLabel(finding.detail)
detail.setObjectName("Muted")
detail.setWordWrap(True)
v.addWidget(detail)
if finding.suggestion:
suggestion = QLabel(f"{finding.suggestion}")
suggestion.setStyleSheet(f"color: {ACCENT}; background: transparent;")
suggestion.setWordWrap(True)
v.addWidget(suggestion)
component = _installable_component(finding) if on_install else None
if component is not None:
row = QHBoxLayout()
row.addStretch(1)
btn = QPushButton(f"Install {component.name}")
btn.setObjectName("ActionButton")
btn.setCursor(Qt.CursorShape.PointingHandCursor)
btn.clicked.connect(lambda: on_install(component))
row.addWidget(btn)
v.addLayout(row)
tunable = _tunable(finding) if on_apply else None
if tunable is not None and tunable.options:
row = QHBoxLayout()
name = QLabel(f"{tunable.label}:")
name.setObjectName("Muted")
combo = QComboBox()
combo.addItems(tunable.options)
if tunable.current in tunable.options:
combo.setCurrentText(tunable.current)
combo.setCursor(Qt.CursorShape.PointingHandCursor)
apply_btn = QPushButton("Apply")
apply_btn.setObjectName("ActionButton")
apply_btn.setCursor(Qt.CursorShape.PointingHandCursor)
apply_btn.clicked.connect(lambda: on_apply(tunable.id, combo.currentText()))
row.addWidget(name)
row.addWidget(combo, 1)
row.addWidget(apply_btn)
v.addLayout(row)
if tunable.note:
note = QLabel(tunable.note)
note.setObjectName("Muted")
v.addWidget(note)
return card
def _tunable(finding):
"""The runtime tunable a finding can apply, if any."""
fix = getattr(finding, "fix", "")
if not fix:
return None
from ..core import fixes
return fixes.get_tunable(fix)
def _installable_component(finding):
"""The catalog component a finding offers to install, if any and if apt is usable."""
action = getattr(finding, "action", "")
if not action:
return None
from ..core import catalog, sysenv
if sysenv.package_manager() != "apt":
return None # apt-only (D15) — no one-click install elsewhere
return catalog.by_id(action)
class Card(QFrame): class Card(QFrame):
+3 -3
View File
@@ -102,12 +102,12 @@ def _aggregate_peaks(maxima: dict) -> list[tuple[str, str, float, str, float, st
_SEV_LABEL = {"critical": "CRITICAL", "warning": "WARNING", "info": "INFO", "ok": "OK"} _SEV_LABEL = {"critical": "CRITICAL", "warning": "WARNING", "info": "INFO", "ok": "OK"}
def render_health(findings: list) -> str: def render_health(findings: list, title: str = "Health report") -> str:
if not findings: if not findings:
return "Health report: no findings." return f"{title}: no findings."
crit = sum(1 for f in findings if f.severity == "critical") crit = sum(1 for f in findings if f.severity == "critical")
warn = sum(1 for f in findings if f.severity == "warning") warn = sum(1 for f in findings if f.severity == "warning")
lines = ["Health report", "", f" {crit} critical · {warn} warning · {len(findings)} checks", ""] lines = [title, "", f" {crit} critical · {warn} warning · {len(findings)} checks", ""]
for f in findings: for f in findings:
lines.append(f"[{_SEV_LABEL.get(f.severity, '?')}] {f.category}: {f.title}") lines.append(f"[{_SEV_LABEL.get(f.severity, '?')}] {f.category}: {f.title}")
if f.detail: if f.detail:
+10
View File
@@ -19,6 +19,16 @@ class ConfigTests(unittest.TestCase):
self.assertEqual(loaded["gpu_temp_alert"], 88.0) self.assertEqual(loaded["gpu_temp_alert"], 88.0)
self.assertEqual(loaded["update_check_minutes"], 5) self.assertEqual(loaded["update_check_minutes"], 5)
def test_list_value_round_trip(self):
with tempfile.TemporaryDirectory() as d:
cf = Path(d) / "config.toml"
with mock.patch.object(config, "CONFIG_FILE", cf), mock.patch.object(config, "CONFIG_DIR", Path(d)):
paths = ["/home/u/.local/share/Steam", "/mnt/games/SteamLibrary"]
config.update_config(steam_libraries=paths)
self.assertEqual(config.load_config()["steam_libraries"], paths)
config.update_config(steam_libraries=[])
self.assertEqual(config.load_config()["steam_libraries"], [])
def test_update_config_merges_and_keeps_defaults(self): def test_update_config_merges_and_keeps_defaults(self):
with tempfile.TemporaryDirectory() as d: with tempfile.TemporaryDirectory() as d:
cf = Path(d) / "config.toml" cf = Path(d) / "config.toml"
+61
View File
@@ -0,0 +1,61 @@
"""Tests for the guided diagnostic orchestration (M3+M4 glue)."""
import tempfile
import time
import unittest
from pathlib import Path
from unittest import mock
from rigdoctor.core import diagnostic
from rigdoctor.core.crashlog import CrashLogWriter, summarize
from rigdoctor.core.health import Finding
from rigdoctor.core.sample import Reading, Sample
def _write_log(path: str, game: str) -> None:
w = CrashLogWriter(path)
w.write_event("session-start", "interval=1s")
w.write_event("game", game)
for temp in (60.0, 72.0, 81.0):
w.write_sample(Sample(ts=time.time(), readings=[Reading("gpu", "temp", temp, "°C", "")]))
w.write_event("gpu-lost", "nvidia-smi query timed out")
w.close()
class GameRecoveryTests(unittest.TestCase):
def test_game_recovered_from_log_event(self):
with tempfile.TemporaryDirectory() as d:
log = str(Path(d) / "capture.jsonl")
_write_log(log, "Path of Exile 2")
summary = summarize(log)
self.assertEqual(diagnostic._game_from_summary(summary), "Path of Exile 2")
def test_no_game_event_returns_none(self):
with tempfile.TemporaryDirectory() as d:
log = str(Path(d) / "capture.jsonl")
w = CrashLogWriter(log)
w.write_event("session-start")
w.close()
self.assertIsNone(diagnostic._game_from_summary(summarize(log)))
class FinishTests(unittest.TestCase):
def test_finish_combines_summary_and_findings(self):
with tempfile.TemporaryDirectory() as d:
log = Path(d) / "capture.jsonl"
_write_log(str(log), "Satisfactory")
fake = [Finding("warning", "GPU", "NVIDIA Xid 79 ×1", "fell off the bus")]
with mock.patch("rigdoctor.core.health.run_health_checks", return_value=fake), \
mock.patch.object(diagnostic.reccontrol, "stop_background", return_value=False), \
mock.patch.object(diagnostic.reccontrol, "running_pid", return_value=None):
result = diagnostic.finish(log_path=log)
self.assertEqual(result.game, "Satisfactory")
self.assertEqual(result.summary.samples, 3)
self.assertEqual(result.findings, fake)
# peak GPU temp captured in the window, GPU-lost event recorded
self.assertEqual(result.summary.maxima["gpu.temp"][0], 81.0)
self.assertTrue(any(kind == "gpu-lost" for _ts, kind, _d in result.summary.events))
if __name__ == "__main__":
unittest.main()
+63
View File
@@ -0,0 +1,63 @@
"""Tests for M6 runtime tunables (parse, command builders, value validation)."""
import unittest
from unittest import mock
from rigdoctor.core import fixes
from rigdoctor.core.fixes import Tunable
class ParseTests(unittest.TestCase):
def test_bracketed(self):
self.assertEqual(fixes._bracketed("always [madvise] never"), (["always", "madvise", "never"], "madvise"))
def test_bracketed_none_active(self):
self.assertEqual(fixes._bracketed("a b c"), (["a", "b", "c"], None))
class CommandBuilderTests(unittest.TestCase):
def test_governor_cmd_writes_value_to_sysfs(self):
cmd = fixes._cpu_governor_cmd("performance")
self.assertEqual(cmd[:2], ["/bin/sh", "-c"])
self.assertIn("performance", cmd[2])
self.assertIn("scaling_governor", cmd[2])
def test_persistence_cmd(self):
self.assertEqual(fixes._nvidia_persistence_cmd("Enabled"), ["nvidia-smi", "-pm", "1"])
self.assertEqual(fixes._nvidia_persistence_cmd("Disabled"), ["nvidia-smi", "-pm", "0"])
def test_swappiness_cmd_targets_procfs(self):
self.assertIn("/proc/sys/vm/swappiness", fixes._swappiness_cmd("10")[2])
def test_quoting_is_safe(self):
# A value that would be dangerous unquoted stays a single quoted token.
cmd = fixes._pcie_aspm_cmd("performance; rm -rf /")
self.assertIn("'performance; rm -rf /'", cmd[2])
class ApplyValidationTests(unittest.TestCase):
def test_unknown_fix_returns_none(self):
self.assertIsNone(fixes.apply_command("does_not_exist", "x"))
def test_value_validated_against_live_options(self):
fake = Tunable("x", "X", ["a", "b"], "a")
with mock.patch.dict(fixes._TUNABLES, {"x": (lambda: fake, lambda v: ["echo", v])}, clear=False):
self.assertEqual(fixes.apply_command("x", "a"), ["echo", "a"])
self.assertIsNone(fixes.apply_command("x", "not-an-option"))
def test_apply_unknown_is_error(self):
rc, _ = fixes.apply("nope", "x")
self.assertEqual(rc, 1)
class GameenvWiringTests(unittest.TestCase):
def test_findings_reference_known_fix_ids(self):
from rigdoctor.core import gameenv
fix_ids = {f.fix for f in gameenv.run_gameenv_checks() if f.fix}
# Whatever fixes the live system surfaces, each must be a real tunable id.
self.assertTrue(fix_ids.issubset(set(fixes._TUNABLES)))
if __name__ == "__main__":
unittest.main()
+73
View File
@@ -0,0 +1,73 @@
"""Tests for M6 gaming-environment checks (pure evaluators + aggregate smoke test)."""
import unittest
from rigdoctor.core import gameenv
from rigdoctor.core.health import Finding
class AspmTests(unittest.TestCase):
def test_powersave_is_warning(self):
f = gameenv.evaluate_aspm("[powersave] performance powersupersave\n")
self.assertEqual(f.severity, "warning")
self.assertEqual(f.category, "PCIe")
def test_performance_is_ok(self):
self.assertEqual(gameenv.evaluate_aspm("[performance] powersave powersupersave").severity, "ok")
def test_default_is_info(self):
self.assertEqual(gameenv.evaluate_aspm("[default] performance powersave").severity, "info")
def test_missing_is_none(self):
self.assertIsNone(gameenv.evaluate_aspm(None))
self.assertIsNone(gameenv.evaluate_aspm("no brackets here"))
class GovernorTests(unittest.TestCase):
def test_performance_only_is_ok(self):
self.assertEqual(gameenv.evaluate_governor({"performance"}).severity, "ok")
def test_powersave_is_warning(self):
f = gameenv.evaluate_governor({"powersave"})
self.assertEqual(f.severity, "warning")
self.assertEqual(f.fix, "cpu_governor") # offers the live Apply dropdown
def test_dynamic_is_info(self):
self.assertEqual(gameenv.evaluate_governor({"schedutil"}).severity, "info")
def test_empty_is_none(self):
self.assertIsNone(gameenv.evaluate_governor(set()))
class SwappinessTests(unittest.TestCase):
def test_high_is_info_with_suggestion(self):
f = gameenv.evaluate_swappiness(60)
self.assertEqual(f.severity, "info")
self.assertEqual(f.fix, "swappiness") # offers the live Apply dropdown
def test_low_is_ok(self):
self.assertEqual(gameenv.evaluate_swappiness(10).severity, "ok")
class ShaderCacheTests(unittest.TestCase):
def test_disabled_nvidia_is_warning(self):
self.assertEqual(gameenv.evaluate_shader_cache({"__GL_SHADER_DISK_CACHE": "0"}).severity, "warning")
def test_disabled_mesa_is_warning(self):
self.assertEqual(gameenv.evaluate_shader_cache({"MESA_SHADER_CACHE_DISABLE": "true"}).severity, "warning")
def test_default_is_ok(self):
self.assertEqual(gameenv.evaluate_shader_cache({}).severity, "ok")
class AggregateTests(unittest.TestCase):
def test_run_returns_sorted_findings(self):
findings = gameenv.run_gameenv_checks()
self.assertTrue(all(isinstance(f, Finding) for f in findings))
order = {"critical": 0, "warning": 1, "info": 2, "ok": 3}
sevs = [order.get(f.severity, 9) for f in findings]
self.assertEqual(sevs, sorted(sevs)) # worst-first
if __name__ == "__main__":
unittest.main()
+27
View File
@@ -0,0 +1,27 @@
"""Tests for the host PTY session (M12 Tier 3)."""
import time
import unittest
from rigdoctor.core.pty_session import PtySession
class PtySessionTests(unittest.TestCase):
def test_runs_command_and_reads_output(self):
pty = PtySession(rows=24, cols=80)
try:
time.sleep(0.4)
pty.read() # drain the shell prompt
pty.write(b"echo PTY_MARKER_42\n")
deadline = time.time() + 3
buf = ""
while time.time() < deadline and "PTY_MARKER_42" not in buf:
time.sleep(0.1)
buf += pty.read().decode(errors="replace")
self.assertIn("PTY_MARKER_42", buf)
finally:
pty.close()
if __name__ == "__main__":
unittest.main()
+147
View File
@@ -0,0 +1,147 @@
"""Tests for M6 Steam library & game detection (VDF parse, scan, tool filter, cache diff)."""
import tempfile
import unittest
from pathlib import Path
from unittest import mock
from rigdoctor.core import steam
_GAME_ACF = """"AppState"
{{
\t"appid"\t\t"{appid}"
\t"name"\t\t"{name}"
\t"installdir"\t\t"{installdir}"
\t"SizeOnDisk"\t\t"{size}"
\t"LastUpdated"\t\t"{updated}"
}}
"""
_LIBRARYFOLDERS = """"libraryfolders"
{{
\t"0"
\t{{
\t\t"path"\t\t"{path}"
\t\t"label"\t\t"Main"
\t\t"apps"
\t\t{{
\t\t\t"570"\t\t"123"
\t\t}}
\t}}
}}
"""
def _make_library(root: Path, games) -> Path:
"""games: list of (appid, name, installdir, size, updated). Returns the library path."""
steamapps = root / "steamapps"
steamapps.mkdir(parents=True, exist_ok=True)
for appid, name, installdir, size, updated in games:
(steamapps / f"appmanifest_{appid}.acf").write_text(
_GAME_ACF.format(appid=appid, name=name, installdir=installdir, size=size, updated=updated)
)
return root
class VdfTests(unittest.TestCase):
def test_parse_nested_and_pairs(self):
data = steam._parse_vdf(_GAME_ACF.format(
appid="570", name="Dota 2", installdir="dota 2 beta", size="15", updated="1700"))
state = data["AppState"]
self.assertEqual(state["appid"], "570")
self.assertEqual(state["name"], "Dota 2")
self.assertEqual(state["installdir"], "dota 2 beta")
def test_parse_handles_quotes_in_names(self):
acf = _GAME_ACF.format(appid="1", name="Baldur\\'s Gate 3", installdir="bg3", size="1", updated="1")
data = steam._parse_vdf(acf)
self.assertIn("Baldur", data["AppState"]["name"])
def test_parse_garbage_returns_empty(self):
self.assertEqual(steam._parse_vdf("not vdf at all"), {})
class ToolFilterTests(unittest.TestCase):
def test_known_tool_appid(self):
self.assertTrue(steam.is_tool("228980", "Steamworks Common Redistributables"))
def test_proton_name_prefix(self):
self.assertTrue(steam.is_tool("9999999", "Proton 8.0"))
self.assertTrue(steam.is_tool("9999998", "Steam Linux Runtime 3.0 (sniper)"))
def test_real_game_is_not_a_tool(self):
self.assertFalse(steam.is_tool("570", "Dota 2"))
class ScanTests(unittest.TestCase):
def test_scan_library_filters_tools(self):
with tempfile.TemporaryDirectory() as d:
lib = _make_library(Path(d), [
("570", "Dota 2", "dota 2 beta", "15000000000", "1700000000"),
("228980", "Steamworks Common Redistributables", "Steamworks Shared", "0", "0"),
("1493710", "Proton Experimental", "Proton - Experimental", "0", "0"),
])
games = steam.scan_library(str(lib))
names = {g.name for g in games}
self.assertEqual(names, {"Dota 2"})
self.assertEqual(games[0].size_bytes, 15000000000)
def test_scan_games_dedupes_and_sorts(self):
with tempfile.TemporaryDirectory() as d1, tempfile.TemporaryDirectory() as d2:
a = _make_library(Path(d1), [("10", "Zeta", "zeta", "1", "1"), ("20", "Alpha", "alpha", "1", "1")])
b = _make_library(Path(d2), [("20", "Alpha", "alpha", "1", "1")]) # dup appid 20
games = steam.scan_games([str(a), str(b)])
self.assertEqual([g.name for g in games], ["Alpha", "Zeta"]) # sorted, deduped
class DiscoverTests(unittest.TestCase):
def test_discover_reads_libraryfolders(self):
with tempfile.TemporaryDirectory() as d:
root = Path(d) / "Steam"
(root / "steamapps").mkdir(parents=True)
extra = Path(d) / "Extra"
(extra / "steamapps").mkdir(parents=True)
(root / "steamapps" / "libraryfolders.vdf").write_text(
_LIBRARYFOLDERS.format(path=str(extra)))
with mock.patch.object(steam, "steam_roots", return_value=[root]):
libs = steam.discover_libraries()
paths = {lib.path for lib in libs}
self.assertIn(str(root.resolve()), paths) # root itself
self.assertIn(str(extra.resolve()), paths) # the configured extra library
class CacheDiffTests(unittest.TestCase):
def _rescan(self, lib, games_file, cfg):
with mock.patch.object(steam, "GAMES_FILE", games_file):
return steam.rescan(cfg=cfg)
def test_first_scan_has_no_new_then_added_game_is_new(self):
with tempfile.TemporaryDirectory() as d:
lib = _make_library(Path(d) / "lib", [("10", "Alpha", "alpha", "1", "1")])
games_file = Path(d) / "games.json"
cfg = {"steam_libraries": [str(lib)]}
first = self._rescan(lib, games_file, cfg)
self.assertEqual(first.new_appids, []) # first run flags nothing as new
# Install a second game; it should be flagged new on the next scan.
_make_library(lib, [("10", "Alpha", "alpha", "1", "1"), ("20", "Beta", "beta", "1", "1")])
second = self._rescan(lib, games_file, cfg)
self.assertEqual(second.new_appids, ["20"])
self.assertEqual({g.name for g in second.games}, {"Alpha", "Beta"})
def test_acknowledge_clears_new(self):
with tempfile.TemporaryDirectory() as d:
lib = _make_library(Path(d) / "lib", [("10", "Alpha", "alpha", "1", "1")])
games_file = Path(d) / "games.json"
cfg = {"steam_libraries": [str(lib)]}
self._rescan(lib, games_file, cfg)
_make_library(lib, [("10", "Alpha", "alpha", "1", "1"), ("20", "Beta", "beta", "1", "1")])
self._rescan(lib, games_file, cfg)
with mock.patch.object(steam, "GAMES_FILE", games_file):
steam.acknowledge_new()
self.assertEqual(steam.load_cache()["new_appids"], [])
if __name__ == "__main__":
unittest.main()