Compare commits
6 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| d7f07dd7c0 | |||
| 0642eb4712 | |||
| f25ac939cc | |||
| b47006bc22 | |||
| 00394c287c | |||
| 2f6cab72c4 |
@@ -5,6 +5,55 @@ All notable changes to RigDoctor are recorded here. Format follows
|
||||
(`MAJOR.MINOR.PATCH`, pre-1.0). `__version__` and `pyproject.toml` must match the git
|
||||
release tag (so the auto-updater, D18, can compare versions).
|
||||
|
||||
## [0.8.0] - 2026-05-22
|
||||
### Added
|
||||
- **Gaming environment checks (M6) — Steam game detection.** RigDoctor now finds your Steam
|
||||
libraries (across multiple drives, via `libraryfolders.vdf`) and the games installed in each
|
||||
(parsing `appmanifest_*.acf` — stdlib only, no Steam tooling needed). Runtimes, Proton builds,
|
||||
and redistributables are filtered out.
|
||||
- **Opt-in libraries:** detected libraries are listed with a per-library game count; you check
|
||||
the ones to scan. Nothing is scanned until you pick a library.
|
||||
- **Background scan on every launch:** the GUI rescans the selected libraries in the background
|
||||
when it opens and flags games installed since the last scan with a **NEW** badge plus a count
|
||||
on the **Games** sidebar item (cleared when you view the page). Results are cached
|
||||
(`~/.local/state/rigdoctor/games.json`) so the list shows instantly.
|
||||
- **CLI:** `rigdoctor games` lists detected games; `rigdoctor games libraries
|
||||
[--enable PATH | --disable PATH | --all]` lists/selects libraries (headless-complete, D17).
|
||||
- Config now supports list values (TOML arrays); `steam_libraries` records the selected libraries.
|
||||
|
||||
## [0.7.3] - 2026-05-21
|
||||
### Fixed
|
||||
- Shared terminal now has **scrollback** — large output (e.g. `ls -la`) can be scrolled up to
|
||||
read; it keeps a history buffer and only auto-scrolls to the bottom when you're already there.
|
||||
|
||||
## [0.7.2] - 2026-05-21
|
||||
### Changed
|
||||
- Removed the GUI **Inventory** tab — use the CLI `rigdoctor inventory` instead. (Inventory is
|
||||
still collected for the relay guest view, so a remote helper still sees the host's hardware.)
|
||||
### Fixed
|
||||
- Shared terminal caret now sits at the real cursor position (row **and** column) instead of
|
||||
the start of the line.
|
||||
|
||||
## [0.7.1] - 2026-05-21
|
||||
### Fixed
|
||||
- Shared terminal: a guest who joined **after** the host enabled the terminal stayed read-only.
|
||||
The host now re-sends the terminal state when a guest joins, so the terminal is available.
|
||||
- Inventory page no longer jumps back to the top when it refreshes (e.g. when elevated data
|
||||
arrives) — scroll position is preserved and unchanged data isn't re-rendered.
|
||||
- Shared terminal now follows the cursor to the bottom as output arrives (e.g. `ls -la`),
|
||||
instead of staying scrolled up.
|
||||
|
||||
## [0.7.0] - 2026-05-21
|
||||
### Added
|
||||
- **Shared terminal (M12, Tier 3)**: when the host enables it, the session shares a real **PTY**
|
||||
shell — the guest gets an interactive terminal (vim, top, tab-completion, Ctrl-C) running on
|
||||
the host as the host's user. The host **reads along** live and can type too, e.g. a `sudo`
|
||||
password — which stays local and is never sent to the guest. Off by default, host-consented.
|
||||
The guest also pulls the host's inventory on join.
|
||||
### Fixed
|
||||
- **Input contrast**: all form controls (text fields, spin boxes, combo boxes, terminals) now
|
||||
use the dark theme with readable text (Fusion defaulted them to light-on-light).
|
||||
|
||||
## [0.6.0] - 2026-05-21
|
||||
### Added
|
||||
- **Session sharing over the relay (M12)**: a **Share** tab — *Start shared session* (host)
|
||||
|
||||
+10
-2
@@ -14,7 +14,7 @@ Status: ⬜ not started · 🟦 designing · 🟨 in progress · ✅ done
|
||||
| M2 | Live monitor (TUI) | Monitoring | none (stdlib curses) | all | P1 | ⬜ |
|
||||
| M8 | Alerting | Monitoring | libnotify (opt) | all | P2 | 🟨 |
|
||||
| M5 | System inventory | Diagnostics | none (opt: lm-sensors, dmidecode) | all | P1 | 🟨 |
|
||||
| M6 | Gaming env checks | Diagnostics | none | all | P2 | ⬜ |
|
||||
| M6 | Gaming env checks | Diagnostics | none | all | P2 | 🟨 |
|
||||
| M10 | Desktop GUI | Desktop UI | **python3-pyside6** | all | P2 | 🟨 |
|
||||
| M11 | Tray / menu-bar applet | Desktop UI | **python3-pyside6** (+ AppIndicator on GNOME) | all | P2 | ⬜ |
|
||||
| M9 | Installer | (meta) | none | all | P1 | 🟨 |
|
||||
@@ -41,7 +41,15 @@ Status: ⬜ not started · 🟦 designing · 🟨 in progress · ✅ done
|
||||
(text/JSON) + GUI Health tab. GPU-firmware verification deferred.
|
||||
- **M2 Live monitor** — depends on M1; the terminal "HWMonitor for Linux" face. Stdlib-only.
|
||||
- **M5 / M6 Diagnostics** — inventory export + gaming-env checks; M6 flags risky settings and
|
||||
suggests the fix command but does not apply it (D9).
|
||||
suggests the fix command but does not apply it (D9). *M6 implemented (Steam detection first —
|
||||
the D12 "pick a game" foundation):* discovers Steam installs + all library folders
|
||||
(`libraryfolders.vdf`, multi-drive) and the games in each (`appmanifest_*.acf`), filtering
|
||||
runtimes/Proton/redistributables — stdlib only. **Libraries are opt-in** (`steam_libraries`
|
||||
config); the GUI **Games** page lists them with per-library counts and rescans in the
|
||||
background on every launch, badging games installed since the last scan (cached in
|
||||
`state/games.json`). CLI: `rigdoctor games` / `games libraries [--enable|--disable|--all]`.
|
||||
*Pending:* the env-check probes (CPU governor, GPU persistence, GameMode/MangoHud, swappiness,
|
||||
hugepages, mitigations, PCIe ASPM) and non-Steam launchers (Lutris/Heroic).
|
||||
- **M8 Alerting** — threshold/event notifications; integrates with the tray applet (M11).
|
||||
- **M10 Desktop GUI** — PySide6 graphical front-end over the core engine (dashboard, log
|
||||
browser, report viewer, logger controls). Optional; adds the Qt dependency. *Bootstrapped
|
||||
|
||||
+7
-3
@@ -27,7 +27,11 @@ Ubuntu + NVIDIA first; `.deb` distribution (see `DECISIONS.md`).
|
||||
|
||||
## Phase 3 — Diagnostics breadth
|
||||
- [ ] M5 system inventory + exportable report
|
||||
- [ ] M6 gaming environment checks (suggest-only)
|
||||
- [~] M6 gaming environment checks (suggest-only) — *Steam game/library detection done*
|
||||
(multi-library `libraryfolders.vdf` discovery + `appmanifest` scan, opt-in libraries,
|
||||
launch-time background rescan with new-game badge; CLI `rigdoctor games`, GUI Games page).
|
||||
This is also the D12 "pick a game" foundation. *Pending:* the env-check probes (governor,
|
||||
GPU persistence, GameMode/MangoHud, swappiness, hugepages, mitigations, PCIe ASPM).
|
||||
- [ ] SMART integration (smartmontools if present)
|
||||
|
||||
## Phase 4 — Desktop UI & installer
|
||||
@@ -60,8 +64,8 @@ Escalating ladder, built in order:
|
||||
it in RigDoctor. One-way, safest.
|
||||
- [x] Tier 2: live read-only view — `rigdoctor share serve` (stdlib HTTP, token-gated:
|
||||
sensors + health + inventory). Remote = user-chosen tunnel; GUI controls still to add.
|
||||
- [ ] Tier 3: gated interactive terminal (wrap tmate/sshx; read-only default, read-write on
|
||||
explicit consent), with session audit log.
|
||||
- [x] Tier 3: host-consented interactive terminal — a real PTY shell shared over the relay
|
||||
(own `pty`, pyte-rendered guest), off by default; host reads along + can type (sudo).
|
||||
|
||||
> **Out of scope:** stress/repro module (D7); multi-distro support and packaging beyond
|
||||
> Ubuntu/apt + `.deb` (D15) — a thin seam is kept but not built out.
|
||||
|
||||
+2
-2
@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
|
||||
|
||||
[project]
|
||||
name = "rigdoctor"
|
||||
version = "0.6.0"
|
||||
version = "0.8.0"
|
||||
description = "Modular hardware monitoring & crash diagnostics for Linux gamers."
|
||||
readme = "README.md"
|
||||
requires-python = ">=3.11"
|
||||
@@ -13,7 +13,7 @@ requires-python = ">=3.11"
|
||||
dependencies = []
|
||||
|
||||
[project.optional-dependencies]
|
||||
gui = ["PySide6"]
|
||||
gui = ["PySide6", "pyte"]
|
||||
|
||||
[project.scripts]
|
||||
rigdoctor = "rigdoctor.cli:main"
|
||||
|
||||
@@ -1,3 +1,3 @@
|
||||
"""RigDoctor — modular hardware monitoring & crash diagnostics for Linux gamers."""
|
||||
|
||||
__version__ = "0.6.0"
|
||||
__version__ = "0.8.0"
|
||||
|
||||
@@ -345,6 +345,73 @@ def cmd_report(args) -> int:
|
||||
return 0
|
||||
|
||||
|
||||
def cmd_games(args) -> int:
|
||||
from .core import steam
|
||||
|
||||
selected = steam.selected_library_paths()
|
||||
if not selected:
|
||||
print("No Steam libraries selected to scan.")
|
||||
print(" See them with: rigdoctor games libraries")
|
||||
print(" Then enable one: rigdoctor games libraries --enable <path> (or --all)")
|
||||
return 1
|
||||
result = steam.rescan()
|
||||
if args.json:
|
||||
from dataclasses import asdict
|
||||
|
||||
print(json.dumps({
|
||||
"scanned_at": result.scanned_at,
|
||||
"new_appids": result.new_appids,
|
||||
"games": [asdict(g) for g in result.games],
|
||||
}, indent=2, ensure_ascii=False))
|
||||
return 0
|
||||
if not result.games:
|
||||
print("No games found in the selected Steam libraries.")
|
||||
return 0
|
||||
new = set(result.new_appids)
|
||||
print(f"{len(result.games)} game(s) across {len(selected)} librar(y/ies):\n")
|
||||
for g in result.games:
|
||||
flag = " NEW" if g.appid in new else ""
|
||||
print(f" {g.name:<48} {steam.human_size(g.size_bytes):>9}{flag}")
|
||||
if new:
|
||||
print(f"\n{len(new)} newly-installed since the last scan.")
|
||||
return 0
|
||||
|
||||
|
||||
def cmd_games_libraries(args) -> int:
|
||||
from .core import steam
|
||||
|
||||
discovered = steam.discover_libraries()
|
||||
selected = {os.path.realpath(p) for p in steam.selected_library_paths()}
|
||||
|
||||
# --all / --enable / --disable adjust the selection, then we list the result.
|
||||
if args.all or args.enable or args.disable:
|
||||
if args.all:
|
||||
selected = {lib.path for lib in discovered}
|
||||
for raw in args.enable or []:
|
||||
selected.add(os.path.realpath(os.path.expanduser(raw)))
|
||||
for raw in args.disable or []:
|
||||
selected.discard(os.path.realpath(os.path.expanduser(raw)))
|
||||
config.update_config(steam_libraries=sorted(selected))
|
||||
|
||||
if not discovered:
|
||||
print("No Steam libraries detected (is Steam installed?).")
|
||||
return 1
|
||||
if args.json:
|
||||
print(json.dumps([
|
||||
{"path": lib.path, "label": lib.label, "selected": lib.path in selected,
|
||||
"games": len(steam.scan_library(lib.path))}
|
||||
for lib in discovered
|
||||
], indent=2, ensure_ascii=False))
|
||||
return 0
|
||||
print("Steam libraries (checked = scanned for games):\n")
|
||||
for lib in discovered:
|
||||
mark = "x" if lib.path in selected else " "
|
||||
count = len(steam.scan_library(lib.path))
|
||||
label = f" [{lib.label}]" if lib.label else ""
|
||||
print(f" [{mark}] {lib.path}{label} ({count} games)")
|
||||
return 0
|
||||
|
||||
|
||||
def build_parser() -> argparse.ArgumentParser:
|
||||
p = argparse.ArgumentParser(
|
||||
prog="rigdoctor",
|
||||
@@ -423,6 +490,17 @@ def build_parser() -> argparse.ArgumentParser:
|
||||
inv.add_argument("--markdown", action="store_true", help="output Markdown (for forum/bug reports)")
|
||||
inv.add_argument("-o", "--output", default=None, help="write to a file instead of stdout")
|
||||
inv.set_defaults(func=cmd_inventory)
|
||||
|
||||
games_p = sub.add_parser("games", help="Steam game & library detection (M6)")
|
||||
games_p.add_argument("--json", action="store_true", help="output JSON")
|
||||
games_p.set_defaults(func=cmd_games)
|
||||
games_sub = games_p.add_subparsers(dest="games_cmd")
|
||||
lib_p = games_sub.add_parser("libraries", help="list/select Steam libraries to scan")
|
||||
lib_p.add_argument("--enable", action="append", metavar="PATH", help="scan this library (repeatable)")
|
||||
lib_p.add_argument("--disable", action="append", metavar="PATH", help="stop scanning this library (repeatable)")
|
||||
lib_p.add_argument("--all", action="store_true", help="scan all detected libraries")
|
||||
lib_p.add_argument("--json", action="store_true", help="output JSON")
|
||||
lib_p.set_defaults(func=cmd_games_libraries)
|
||||
return p
|
||||
|
||||
|
||||
|
||||
@@ -27,6 +27,10 @@ STATUS_FILE = STATE_DIR / "recorder.json"
|
||||
PID_FILE = STATE_DIR / "recorder.pid"
|
||||
SPAWN_LOG = STATE_DIR / "recorder.out"
|
||||
|
||||
# Gaming environment / game detection (M6) — cached Steam game scan (mutable state,
|
||||
# not config: refreshed by the background scan on every launch).
|
||||
GAMES_FILE = STATE_DIR / "games.json"
|
||||
|
||||
# Update access token (M13) — gates updates to Gitea account holders (D18).
|
||||
# Stored in the OS keyring (Secret Service / GNOME Keyring) via `secret-tool` when
|
||||
# available — encrypted at rest, unlocked with the login session — else a 0600 file.
|
||||
@@ -143,6 +147,7 @@ DEFAULTS: dict = {
|
||||
"gpu_temp_alert": 90.0, # °C — alert when GPU reaches this
|
||||
"cpu_temp_alert": 95.0, # °C — alert when CPU reaches this
|
||||
"relay_url": "wss://rigdoctor.jesseyvanofferen.com", # session-sharing relay (M12)
|
||||
"steam_libraries": [], # Steam library paths to scan for games (M6); empty = none picked yet
|
||||
}
|
||||
|
||||
|
||||
@@ -165,6 +170,8 @@ def _toml_value(value) -> str:
|
||||
return "true" if value else "false"
|
||||
if isinstance(value, (int, float)):
|
||||
return repr(value)
|
||||
if isinstance(value, (list, tuple)):
|
||||
return "[" + ", ".join(_toml_value(v) for v in value) + "]"
|
||||
return '"' + str(value).replace("\\", "\\\\").replace('"', '\\"') + '"'
|
||||
|
||||
|
||||
|
||||
@@ -0,0 +1,59 @@
|
||||
"""A pseudo-terminal running the host's shell (M12, Tier 3 — host side).
|
||||
|
||||
Spawns the user's login shell in a real PTY so interactive programs work over a shared
|
||||
session: vim, top, tab-completion, colours, Ctrl-C, and `sudo` (which prompts inside the
|
||||
PTY — the host types that password locally, so it's never sent to the guest). Runs as the
|
||||
host's own user — never elevated. Linux-only (uses `pty`/`termios`).
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import fcntl
|
||||
import os
|
||||
import pty
|
||||
import signal
|
||||
import struct
|
||||
import termios
|
||||
|
||||
|
||||
class PtySession:
|
||||
def __init__(self, rows: int = 24, cols: int = 80):
|
||||
self.pid, self.master_fd = pty.fork()
|
||||
if self.pid == 0: # child: become the shell
|
||||
os.environ["TERM"] = "xterm-256color"
|
||||
shell = os.environ.get("SHELL", "/bin/bash")
|
||||
try:
|
||||
os.execvp(shell, [shell])
|
||||
finally:
|
||||
os._exit(1)
|
||||
os.set_blocking(self.master_fd, False)
|
||||
self.set_size(rows, cols)
|
||||
|
||||
def set_size(self, rows: int, cols: int) -> None:
|
||||
try:
|
||||
fcntl.ioctl(self.master_fd, termios.TIOCSWINSZ, struct.pack("HHHH", rows, cols, 0, 0))
|
||||
except OSError:
|
||||
pass
|
||||
|
||||
def write(self, data: bytes) -> None:
|
||||
try:
|
||||
os.write(self.master_fd, data)
|
||||
except OSError:
|
||||
pass
|
||||
|
||||
def read(self, size: int = 65536) -> bytes:
|
||||
try:
|
||||
return os.read(self.master_fd, size)
|
||||
except (BlockingIOError, OSError):
|
||||
return b""
|
||||
|
||||
def close(self) -> None:
|
||||
try:
|
||||
os.close(self.master_fd)
|
||||
except OSError:
|
||||
pass
|
||||
try:
|
||||
os.kill(self.pid, signal.SIGHUP)
|
||||
os.waitpid(self.pid, os.WNOHANG)
|
||||
except (OSError, ChildProcessError, ProcessLookupError):
|
||||
pass
|
||||
@@ -0,0 +1,340 @@
|
||||
"""Steam library & game detection (M6, the Steam piece of D12 game detection).
|
||||
|
||||
Discovers a user's Steam installs, the library folders they've configured (Steam tracks
|
||||
them all in ``libraryfolders.vdf``, so multiple libraries on multiple drives are covered),
|
||||
and the games installed in each (one ``appmanifest_<appid>.acf`` per app). Stdlib only —
|
||||
no Steam tooling required, every probe degrades gracefully.
|
||||
|
||||
The set of libraries actually scanned is user-chosen (config ``steam_libraries``); nothing
|
||||
is scanned until the user opts a library in. Scan results are cached in ``games.json`` so the
|
||||
GUI can show the list instantly and the launch-time background scan can diff against it to
|
||||
flag newly-installed games.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import os
|
||||
import time
|
||||
from dataclasses import asdict, dataclass
|
||||
from pathlib import Path
|
||||
|
||||
from ..config import GAMES_FILE, load_config
|
||||
|
||||
# Steam "apps" that aren't games: runtimes, Proton builds, redistributables. Filtered out of
|
||||
# scans by appid (known IDs) or by name prefix (covers future Proton/runtime versions).
|
||||
_TOOL_APPIDS = {
|
||||
"228980", # Steamworks Common Redistributables
|
||||
"1070560", # Steam Linux Runtime 1.0 (scout)
|
||||
"1391110", # Steam Linux Runtime 2.0 (soldier)
|
||||
"1628350", # Steam Linux Runtime 3.0 (sniper)
|
||||
"1493710", # Proton Experimental
|
||||
"2180100", # Proton Hotfix
|
||||
"1826330", # Proton EasyAntiCheat Runtime
|
||||
"1161040", # Proton BattlEye Runtime
|
||||
}
|
||||
_TOOL_NAME_PREFIXES = ("Proton", "Steam Linux Runtime", "Steamworks Common")
|
||||
|
||||
# Where Steam may be installed (native + Flatpak + Snap). Symlinks (~/.steam/steam) are
|
||||
# resolved and de-duplicated by real path.
|
||||
_ROOT_CANDIDATES = (
|
||||
"~/.steam/steam",
|
||||
"~/.steam/root",
|
||||
"~/.local/share/Steam",
|
||||
"~/.var/app/com.valvesoftware.Steam/data/Steam", # Flatpak
|
||||
"~/snap/steam/common/.local/share/Steam", # Snap
|
||||
)
|
||||
|
||||
|
||||
@dataclass
|
||||
class SteamLibrary:
|
||||
path: str # the library root (contains a steamapps/ dir)
|
||||
label: str = "" # Steam's label for the folder, if any
|
||||
|
||||
|
||||
@dataclass
|
||||
class Game:
|
||||
appid: str
|
||||
name: str
|
||||
library: str # library path the game lives in
|
||||
installdir: str # folder name under <library>/steamapps/common
|
||||
size_bytes: int = 0
|
||||
last_updated: int = 0 # epoch seconds (acf LastUpdated), 0 if unknown
|
||||
|
||||
|
||||
# --- VDF (Valve Data Format) parsing --------------------------------------------------
|
||||
# Minimal text-VDF reader: quoted "key" "value" pairs and "key" { ... } nesting. Enough
|
||||
# for libraryfolders.vdf and appmanifest_*.acf; ignores #base/#include and unquoted tokens.
|
||||
|
||||
def _parse_vdf(text: str) -> dict:
|
||||
pos = 0
|
||||
n = len(text)
|
||||
|
||||
def skip_ws() -> None:
|
||||
nonlocal pos
|
||||
while pos < n:
|
||||
c = text[pos]
|
||||
if c in " \t\r\n":
|
||||
pos += 1
|
||||
elif c == "/" and pos + 1 < n and text[pos + 1] == "/": # // line comment
|
||||
while pos < n and text[pos] != "\n":
|
||||
pos += 1
|
||||
else:
|
||||
break
|
||||
|
||||
def read_string() -> str:
|
||||
nonlocal pos
|
||||
pos += 1 # opening quote
|
||||
out = []
|
||||
while pos < n:
|
||||
c = text[pos]
|
||||
if c == "\\" and pos + 1 < n:
|
||||
nxt = text[pos + 1]
|
||||
out.append({"n": "\n", "t": "\t", "\\": "\\", '"': '"'}.get(nxt, nxt))
|
||||
pos += 2
|
||||
continue
|
||||
if c == '"':
|
||||
pos += 1
|
||||
break
|
||||
out.append(c)
|
||||
pos += 1
|
||||
return "".join(out)
|
||||
|
||||
def parse_obj() -> dict:
|
||||
nonlocal pos
|
||||
obj: dict = {}
|
||||
while True:
|
||||
skip_ws()
|
||||
if pos >= n or text[pos] == "}":
|
||||
pos += 1 # consume closing brace (or run off the end)
|
||||
return obj
|
||||
if text[pos] != '"': # skip unquoted/unsupported tokens defensively
|
||||
pos += 1
|
||||
continue
|
||||
key = read_string()
|
||||
skip_ws()
|
||||
if pos < n and text[pos] == "{":
|
||||
pos += 1
|
||||
obj[key] = parse_obj()
|
||||
elif pos < n and text[pos] == '"':
|
||||
obj[key] = read_string()
|
||||
else: # malformed; bail on this key
|
||||
obj[key] = ""
|
||||
return obj
|
||||
|
||||
skip_ws()
|
||||
if pos < n and text[pos] == '"':
|
||||
root_key = read_string()
|
||||
skip_ws()
|
||||
if pos < n and text[pos] == "{":
|
||||
pos += 1
|
||||
return {root_key: parse_obj()}
|
||||
return {}
|
||||
|
||||
|
||||
def _read_vdf(path: Path) -> dict:
|
||||
try:
|
||||
return _parse_vdf(path.read_text(encoding="utf-8", errors="replace"))
|
||||
except OSError:
|
||||
return {}
|
||||
|
||||
|
||||
# --- discovery ------------------------------------------------------------------------
|
||||
|
||||
def steam_roots() -> list[Path]:
|
||||
"""Existing Steam install roots, de-duplicated by resolved path."""
|
||||
seen: set[Path] = set()
|
||||
roots: list[Path] = []
|
||||
for cand in _ROOT_CANDIDATES:
|
||||
p = Path(os.path.expanduser(cand))
|
||||
if not p.exists():
|
||||
continue
|
||||
real = p.resolve()
|
||||
if real in seen:
|
||||
continue
|
||||
seen.add(real)
|
||||
roots.append(real)
|
||||
return roots
|
||||
|
||||
|
||||
def _libraryfolders_vdf(root: Path) -> Path | None:
|
||||
for rel in ("steamapps/libraryfolders.vdf", "config/libraryfolders.vdf"):
|
||||
p = root / rel
|
||||
if p.exists():
|
||||
return p
|
||||
return None
|
||||
|
||||
|
||||
def discover_libraries() -> list[SteamLibrary]:
|
||||
"""Every Steam library folder configured on this machine, de-duplicated by real path.
|
||||
|
||||
Reads each install's ``libraryfolders.vdf`` (which lists all drives/folders), and
|
||||
always includes the install root itself as a fallback.
|
||||
"""
|
||||
seen: set[Path] = set()
|
||||
libs: list[SteamLibrary] = []
|
||||
|
||||
def add(path: Path, label: str = "") -> None:
|
||||
if not (path / "steamapps").is_dir():
|
||||
return
|
||||
real = path.resolve()
|
||||
if real in seen:
|
||||
return
|
||||
seen.add(real)
|
||||
libs.append(SteamLibrary(path=str(real), label=label))
|
||||
|
||||
for root in steam_roots():
|
||||
vdf = _libraryfolders_vdf(root)
|
||||
folders = _read_vdf(vdf).get("libraryfolders", {}) if vdf else {}
|
||||
if isinstance(folders, dict):
|
||||
for entry in folders.values():
|
||||
if isinstance(entry, dict) and entry.get("path"):
|
||||
add(Path(entry["path"]), entry.get("label", ""))
|
||||
add(root) # the install root is itself a library
|
||||
return libs
|
||||
|
||||
|
||||
# --- game scanning --------------------------------------------------------------------
|
||||
|
||||
def is_tool(appid: str, name: str) -> bool:
|
||||
"""True for non-game Steam apps (runtimes, Proton, redistributables)."""
|
||||
if appid in _TOOL_APPIDS:
|
||||
return True
|
||||
return name.startswith(_TOOL_NAME_PREFIXES)
|
||||
|
||||
|
||||
def scan_library(library: str) -> list[Game]:
|
||||
"""Games installed in one library, parsed from its appmanifest_*.acf files."""
|
||||
steamapps = Path(library) / "steamapps"
|
||||
games: list[Game] = []
|
||||
try:
|
||||
manifests = sorted(steamapps.glob("appmanifest_*.acf"))
|
||||
except OSError:
|
||||
return games
|
||||
for manifest in manifests:
|
||||
state = _read_vdf(manifest).get("AppState", {})
|
||||
if not isinstance(state, dict):
|
||||
continue
|
||||
# Steam treats VDF keys case-insensitively (e.g. "SizeOnDisk" but "lastupdated").
|
||||
state = {k.lower(): v for k, v in state.items()}
|
||||
appid = state.get("appid", "")
|
||||
name = state.get("name", "").strip()
|
||||
if not appid or not name or is_tool(appid, name):
|
||||
continue
|
||||
games.append(Game(
|
||||
appid=appid,
|
||||
name=name,
|
||||
library=str(library),
|
||||
installdir=state.get("installdir", ""),
|
||||
size_bytes=_int(state.get("sizeondisk")),
|
||||
last_updated=_int(state.get("lastupdated")),
|
||||
))
|
||||
return games
|
||||
|
||||
|
||||
def scan_games(libraries: list[str]) -> list[Game]:
|
||||
"""All games across the given libraries, de-duplicated by appid, sorted by name."""
|
||||
by_appid: dict[str, Game] = {}
|
||||
for lib in libraries:
|
||||
for game in scan_library(lib):
|
||||
by_appid.setdefault(game.appid, game) # first library wins on duplicates
|
||||
return sorted(by_appid.values(), key=lambda g: g.name.lower())
|
||||
|
||||
|
||||
def _int(value) -> int:
|
||||
try:
|
||||
return int(value)
|
||||
except (TypeError, ValueError):
|
||||
return 0
|
||||
|
||||
|
||||
# --- config-driven selection ----------------------------------------------------------
|
||||
|
||||
def selected_library_paths(cfg: dict | None = None) -> list[str]:
|
||||
"""Library paths the user has opted in to scanning (config ``steam_libraries``)."""
|
||||
cfg = cfg or load_config()
|
||||
paths = cfg.get("steam_libraries") or []
|
||||
return [str(p) for p in paths]
|
||||
|
||||
|
||||
# --- scan cache + new-game detection --------------------------------------------------
|
||||
|
||||
@dataclass
|
||||
class ScanResult:
|
||||
games: list[Game]
|
||||
new_appids: list[str] # newly-installed since the last scan (badge fuel)
|
||||
scanned_at: float
|
||||
|
||||
|
||||
def load_cache() -> dict | None:
|
||||
try:
|
||||
return json.loads(GAMES_FILE.read_text())
|
||||
except (OSError, ValueError):
|
||||
return None
|
||||
|
||||
|
||||
def _save_cache(games: list[Game], known: set[str], new: list[str], when: float) -> None:
|
||||
GAMES_FILE.parent.mkdir(parents=True, exist_ok=True)
|
||||
data = {
|
||||
"scanned_at": when,
|
||||
"known_appids": sorted(known),
|
||||
"new_appids": new,
|
||||
"games": [asdict(g) for g in games],
|
||||
}
|
||||
GAMES_FILE.write_text(json.dumps(data, indent=2, ensure_ascii=False))
|
||||
|
||||
|
||||
def cached_games() -> list[Game]:
|
||||
"""Games from the last scan (for instant display before a rescan finishes)."""
|
||||
cache = load_cache()
|
||||
if not cache:
|
||||
return []
|
||||
return [Game(**{k: g.get(k) for k in Game.__dataclass_fields__}) for g in cache.get("games", [])]
|
||||
|
||||
|
||||
def rescan(cfg: dict | None = None) -> ScanResult:
|
||||
"""Scan the selected libraries, diff against the cache, and persist the result.
|
||||
|
||||
Newly-installed games (appids never seen before) are reported in ``new_appids``. The
|
||||
very first scan reports nothing as new (so the whole library isn't flagged at once);
|
||||
unacknowledged new games carry forward until they're acknowledged or uninstalled.
|
||||
"""
|
||||
games = scan_games(selected_library_paths(cfg))
|
||||
current = {g.appid for g in games}
|
||||
prev = load_cache()
|
||||
if prev is None:
|
||||
known: set[str] = set(current) # first run: everything is "known", nothing new
|
||||
new = []
|
||||
else:
|
||||
known = set(prev.get("known_appids", []))
|
||||
carried = set(prev.get("new_appids", [])) & current # still-unacknowledged & installed
|
||||
new = sorted((current - known) | carried)
|
||||
known |= current
|
||||
when = time.time()
|
||||
_save_cache(games, known, new, when)
|
||||
return ScanResult(games=games, new_appids=new, scanned_at=when)
|
||||
|
||||
|
||||
def acknowledge_new() -> None:
|
||||
"""Clear the new-game badge (called when the user views the games list)."""
|
||||
cache = load_cache()
|
||||
if not cache or not cache.get("new_appids"):
|
||||
return
|
||||
cache["new_appids"] = []
|
||||
try:
|
||||
GAMES_FILE.write_text(json.dumps(cache, indent=2, ensure_ascii=False))
|
||||
except OSError:
|
||||
pass
|
||||
|
||||
|
||||
# --- formatting -----------------------------------------------------------------------
|
||||
|
||||
def human_size(num_bytes: int) -> str:
|
||||
if num_bytes <= 0:
|
||||
return "—"
|
||||
size = float(num_bytes)
|
||||
for unit in ("B", "KB", "MB", "GB", "TB"):
|
||||
if size < 1024 or unit == "TB":
|
||||
return f"{size:.0f} {unit}" if unit in ("B", "KB") else f"{size:.1f} {unit}"
|
||||
size /= 1024
|
||||
return f"{size:.1f} TB"
|
||||
@@ -0,0 +1,249 @@
|
||||
"""Games page (M6 in the GUI): pick Steam libraries and browse detected games.
|
||||
|
||||
Libraries are opt-in — the user checks which ones to scan. The list is loaded from the
|
||||
cache instantly, then a background rescan refreshes it and flags games installed since the
|
||||
last scan (a "NEW" badge here + a count on the sidebar nav).
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import os
|
||||
import threading
|
||||
import time
|
||||
|
||||
from PySide6.QtCore import Qt, QTimer, Signal
|
||||
from PySide6.QtWidgets import (
|
||||
QCheckBox,
|
||||
QFrame,
|
||||
QHBoxLayout,
|
||||
QLabel,
|
||||
QPushButton,
|
||||
QScrollArea,
|
||||
QVBoxLayout,
|
||||
QWidget,
|
||||
)
|
||||
|
||||
from ..config import load_config, update_config
|
||||
from .theme import ACCENT, GOOD, MUTED
|
||||
|
||||
|
||||
def _game_row(name: str, sublabel: str, size: str, is_new: bool) -> QFrame:
|
||||
card = QFrame()
|
||||
card.setObjectName("Card")
|
||||
h = QHBoxLayout(card)
|
||||
h.setContentsMargins(16, 10, 16, 10)
|
||||
h.setSpacing(10)
|
||||
|
||||
left = QVBoxLayout()
|
||||
left.setSpacing(2)
|
||||
title = QLabel(name)
|
||||
title.setStyleSheet("font-weight: 600; background: transparent;")
|
||||
title.setWordWrap(True)
|
||||
left.addWidget(title)
|
||||
if sublabel:
|
||||
sub = QLabel(sublabel)
|
||||
sub.setObjectName("Muted")
|
||||
left.addWidget(sub)
|
||||
h.addLayout(left, 1)
|
||||
|
||||
if is_new:
|
||||
badge = QLabel("NEW")
|
||||
badge.setStyleSheet(
|
||||
f"color: {GOOD}; border: 1px solid {GOOD}; border-radius: 6px; "
|
||||
f"padding: 1px 6px; font-weight: 700; background: transparent;"
|
||||
)
|
||||
h.addWidget(badge, 0, Qt.AlignmentFlag.AlignVCenter)
|
||||
|
||||
size_label = QLabel(size)
|
||||
size_label.setObjectName("Muted")
|
||||
size_label.setMinimumWidth(80)
|
||||
size_label.setAlignment(Qt.AlignmentFlag.AlignRight | Qt.AlignmentFlag.AlignVCenter)
|
||||
h.addWidget(size_label, 0)
|
||||
return card
|
||||
|
||||
|
||||
class GamesPage(QWidget):
|
||||
_libraries_ready = Signal(object) # list[dict(path, label, count, selected)]
|
||||
_scanned = Signal(object) # steam.ScanResult
|
||||
new_count_changed = Signal(int) # newly-installed game count (for the nav badge)
|
||||
|
||||
def __init__(self) -> None:
|
||||
super().__init__()
|
||||
self.setObjectName("Page")
|
||||
self._libraries_ready.connect(self._render_libraries)
|
||||
self._scanned.connect(self._render_games)
|
||||
self._busy = False
|
||||
self._new_appids: set[str] = set()
|
||||
|
||||
root = QVBoxLayout(self)
|
||||
root.setContentsMargins(20, 18, 20, 18)
|
||||
root.setSpacing(16)
|
||||
|
||||
header = QHBoxLayout()
|
||||
title = QLabel("Games")
|
||||
title.setObjectName("PageTitle")
|
||||
header.addWidget(title)
|
||||
header.addStretch(1)
|
||||
self._status = QLabel("")
|
||||
self._status.setObjectName("Muted")
|
||||
header.addWidget(self._status)
|
||||
self._rescan_btn = QPushButton("Rescan")
|
||||
self._rescan_btn.setObjectName("PrimaryButton")
|
||||
self._rescan_btn.clicked.connect(self.refresh)
|
||||
header.addWidget(self._rescan_btn)
|
||||
root.addLayout(header)
|
||||
|
||||
# Libraries (opt-in checkboxes)
|
||||
lib_card = QFrame()
|
||||
lib_card.setObjectName("Card")
|
||||
lib_v = QVBoxLayout(lib_card)
|
||||
lib_v.setContentsMargins(16, 12, 16, 12)
|
||||
lib_v.setSpacing(6)
|
||||
lib_head = QLabel("Steam libraries")
|
||||
lib_head.setStyleSheet("font-weight: 700; background: transparent;")
|
||||
lib_v.addWidget(lib_head)
|
||||
self._lib_box = QVBoxLayout()
|
||||
self._lib_box.setSpacing(6)
|
||||
lib_v.addLayout(self._lib_box)
|
||||
self._lib_hint = QLabel("Looking for Steam libraries…")
|
||||
self._lib_hint.setObjectName("Muted")
|
||||
self._lib_hint.setWordWrap(True)
|
||||
lib_v.addWidget(self._lib_hint)
|
||||
root.addWidget(lib_card)
|
||||
|
||||
# Games list
|
||||
scroll = QScrollArea()
|
||||
scroll.setWidgetResizable(True)
|
||||
scroll.setFrameShape(QFrame.Shape.NoFrame)
|
||||
scroll.setStyleSheet("background: transparent;")
|
||||
self._container = QWidget()
|
||||
self._list = QVBoxLayout(self._container)
|
||||
self._list.setContentsMargins(0, 0, 0, 0)
|
||||
self._list.setSpacing(8)
|
||||
self._list.setAlignment(Qt.AlignmentFlag.AlignTop)
|
||||
scroll.setWidget(self._container)
|
||||
root.addWidget(scroll, 1)
|
||||
|
||||
self._load_cached() # instant display from the last scan
|
||||
QTimer.singleShot(400, self.refresh) # then rescan in the background on launch
|
||||
|
||||
# --- loading ----------------------------------------------------------------------
|
||||
|
||||
def _load_cached(self) -> None:
|
||||
from ..core import steam
|
||||
|
||||
cache = steam.load_cache() or {}
|
||||
self._new_appids = set(cache.get("new_appids", []))
|
||||
games = steam.cached_games()
|
||||
if games:
|
||||
self._populate_games(games, self._new_appids)
|
||||
self.new_count_changed.emit(len(self._new_appids))
|
||||
|
||||
def refresh(self) -> None:
|
||||
if self._busy:
|
||||
return
|
||||
self._busy = True
|
||||
self._rescan_btn.setEnabled(False)
|
||||
self._status.setText("Scanning Steam libraries…")
|
||||
threading.Thread(target=self._work, daemon=True).start()
|
||||
|
||||
def _work(self) -> None:
|
||||
from ..core import steam
|
||||
|
||||
try:
|
||||
selected = {os.path.realpath(p) for p in steam.selected_library_paths()}
|
||||
libs = [
|
||||
{"path": lib.path, "label": lib.label, "selected": lib.path in selected,
|
||||
"count": len(steam.scan_library(lib.path))}
|
||||
for lib in steam.discover_libraries()
|
||||
]
|
||||
self._libraries_ready.emit(libs)
|
||||
self._scanned.emit(steam.rescan())
|
||||
except Exception:
|
||||
self._scanned.emit(None)
|
||||
|
||||
# --- rendering --------------------------------------------------------------------
|
||||
|
||||
def _render_libraries(self, libs) -> None:
|
||||
while self._lib_box.count():
|
||||
item = self._lib_box.takeAt(0)
|
||||
w = item.widget()
|
||||
if w is not None:
|
||||
w.deleteLater()
|
||||
if not libs:
|
||||
self._lib_hint.setText("No Steam libraries detected. Is Steam installed?")
|
||||
self._lib_hint.show()
|
||||
return
|
||||
self._lib_hint.hide()
|
||||
for lib in libs:
|
||||
label = lib["path"]
|
||||
if lib["label"]:
|
||||
label += f" [{lib['label']}]"
|
||||
cb = QCheckBox(f"{label} · {lib['count']} games")
|
||||
cb.setChecked(lib["selected"])
|
||||
cb.toggled.connect(lambda checked, p=lib["path"]: self._toggle_library(p, checked))
|
||||
self._lib_box.addWidget(cb)
|
||||
|
||||
def _toggle_library(self, path: str, checked: bool) -> None:
|
||||
selected = {os.path.realpath(p) for p in (load_config().get("steam_libraries") or [])}
|
||||
if checked:
|
||||
selected.add(os.path.realpath(path))
|
||||
else:
|
||||
selected.discard(os.path.realpath(path))
|
||||
update_config(steam_libraries=sorted(selected))
|
||||
self.refresh()
|
||||
|
||||
def _render_games(self, result) -> None:
|
||||
self._busy = False
|
||||
self._rescan_btn.setEnabled(True)
|
||||
if result is None:
|
||||
self._status.setText("scan failed")
|
||||
return
|
||||
self._new_appids = set(result.new_appids)
|
||||
self._populate_games(result.games, self._new_appids)
|
||||
new = len(self._new_appids)
|
||||
suffix = f" · {new} new" if new else ""
|
||||
self._status.setText(
|
||||
f"{len(result.games)} games · {time.strftime('%H:%M:%S')}{suffix}"
|
||||
)
|
||||
self.new_count_changed.emit(new)
|
||||
|
||||
def _populate_games(self, games, new_appids: set[str]) -> None:
|
||||
from ..core import steam
|
||||
|
||||
while self._list.count():
|
||||
item = self._list.takeAt(0)
|
||||
w = item.widget()
|
||||
if w is not None:
|
||||
w.deleteLater()
|
||||
|
||||
if not games:
|
||||
empty = QLabel(
|
||||
"No games to show yet — check a Steam library above to scan it for games."
|
||||
)
|
||||
empty.setObjectName("Muted")
|
||||
empty.setWordWrap(True)
|
||||
self._list.addWidget(empty)
|
||||
self._list.addStretch(1)
|
||||
return
|
||||
|
||||
for g in games:
|
||||
self._list.addWidget(_game_row(
|
||||
g.name,
|
||||
os.path.basename(g.library.rstrip("/")) or g.library,
|
||||
steam.human_size(g.size_bytes),
|
||||
g.appid in new_appids,
|
||||
))
|
||||
self._list.addStretch(1)
|
||||
|
||||
# --- nav badge integration --------------------------------------------------------
|
||||
|
||||
def showEvent(self, event) -> None: # noqa: N802 (Qt override)
|
||||
# Viewing the list acknowledges the new games: clear the sidebar badge. The NEW
|
||||
# tags stay on the rows for this session so the user can still spot them.
|
||||
super().showEvent(event)
|
||||
if self._new_appids:
|
||||
from ..core import steam
|
||||
|
||||
threading.Thread(target=steam.acknowledge_new, daemon=True).start()
|
||||
self.new_count_changed.emit(0)
|
||||
@@ -1,145 +0,0 @@
|
||||
"""Inventory page (M5 in the GUI): system inventory with copy/save + admin re-collect."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import os
|
||||
import threading
|
||||
|
||||
from PySide6.QtCore import Qt, QTimer, Signal
|
||||
from PySide6.QtWidgets import (
|
||||
QApplication,
|
||||
QFileDialog,
|
||||
QFrame,
|
||||
QGridLayout,
|
||||
QHBoxLayout,
|
||||
QLabel,
|
||||
QPushButton,
|
||||
QScrollArea,
|
||||
QVBoxLayout,
|
||||
QWidget,
|
||||
)
|
||||
|
||||
from ..core import inventory
|
||||
from .theme import MUTED
|
||||
|
||||
|
||||
def _section_card(section) -> QFrame:
|
||||
card = QFrame()
|
||||
card.setObjectName("Card")
|
||||
layout = QVBoxLayout(card)
|
||||
layout.setContentsMargins(16, 12, 16, 12)
|
||||
layout.setSpacing(6)
|
||||
title = QLabel(section.title)
|
||||
title.setStyleSheet("font-weight: 700; background: transparent;")
|
||||
layout.addWidget(title)
|
||||
grid = QGridLayout()
|
||||
grid.setColumnStretch(1, 1)
|
||||
grid.setHorizontalSpacing(14)
|
||||
grid.setVerticalSpacing(4)
|
||||
for row, (key, value) in enumerate(section.items):
|
||||
k = QLabel(key)
|
||||
k.setObjectName("Muted")
|
||||
v = QLabel(value)
|
||||
v.setWordWrap(True)
|
||||
v.setStyleSheet("background: transparent;")
|
||||
grid.addWidget(k, row, 0)
|
||||
grid.addWidget(v, row, 1)
|
||||
layout.addLayout(grid)
|
||||
return card
|
||||
|
||||
|
||||
class InventoryPage(QWidget):
|
||||
_result = Signal(object) # list[Section]
|
||||
|
||||
def __init__(self) -> None:
|
||||
super().__init__()
|
||||
self.setObjectName("Page")
|
||||
self._sections: list = []
|
||||
self._result.connect(self._render)
|
||||
|
||||
root = QVBoxLayout(self)
|
||||
root.setContentsMargins(20, 18, 20, 18)
|
||||
root.setSpacing(16)
|
||||
|
||||
header = QHBoxLayout()
|
||||
title = QLabel("Inventory")
|
||||
title.setObjectName("PageTitle")
|
||||
header.addWidget(title)
|
||||
header.addStretch(1)
|
||||
self._status = QLabel("")
|
||||
self._status.setObjectName("Muted")
|
||||
header.addWidget(self._status)
|
||||
self._copy_btn = QPushButton("Copy Markdown")
|
||||
self._copy_btn.clicked.connect(self._copy)
|
||||
header.addWidget(self._copy_btn)
|
||||
self._save_btn = QPushButton("Save…")
|
||||
self._save_btn.clicked.connect(self._save)
|
||||
header.addWidget(self._save_btn)
|
||||
self._refresh_btn = QPushButton("Refresh")
|
||||
self._refresh_btn.setObjectName("PrimaryButton")
|
||||
self._refresh_btn.clicked.connect(self._run)
|
||||
header.addWidget(self._refresh_btn)
|
||||
root.addLayout(header)
|
||||
|
||||
scroll = QScrollArea()
|
||||
scroll.setWidgetResizable(True)
|
||||
scroll.setFrameShape(QFrame.Shape.NoFrame)
|
||||
scroll.setStyleSheet("background: transparent;")
|
||||
self._container = QWidget()
|
||||
self._list = QVBoxLayout(self._container)
|
||||
self._list.setContentsMargins(0, 0, 0, 0)
|
||||
self._list.setSpacing(12)
|
||||
self._list.setAlignment(Qt.AlignmentFlag.AlignTop)
|
||||
scroll.setWidget(self._container)
|
||||
root.addWidget(scroll, 1)
|
||||
|
||||
QTimer.singleShot(300, self._run)
|
||||
|
||||
def _run(self) -> None:
|
||||
self._busy("Collecting…")
|
||||
threading.Thread(target=self._work, daemon=True).start()
|
||||
|
||||
def _work(self) -> None:
|
||||
try:
|
||||
sections = inventory.collect()
|
||||
except Exception:
|
||||
sections = []
|
||||
self._result.emit(sections)
|
||||
|
||||
def _busy(self, text: str) -> None:
|
||||
self._status.setText(text)
|
||||
for b in (self._refresh_btn, self._copy_btn, self._save_btn):
|
||||
b.setEnabled(False)
|
||||
|
||||
def _render(self, sections) -> None:
|
||||
self._refresh_btn.setEnabled(True)
|
||||
self._copy_btn.setEnabled(True)
|
||||
self._save_btn.setEnabled(True)
|
||||
if sections is None: # collection failed — keep current
|
||||
self._status.setText("collection failed")
|
||||
return
|
||||
|
||||
self._sections = sections
|
||||
while self._list.count():
|
||||
item = self._list.takeAt(0)
|
||||
w = item.widget()
|
||||
if w is not None:
|
||||
w.deleteLater()
|
||||
for section in sections:
|
||||
self._list.addWidget(_section_card(section))
|
||||
self._list.addStretch(1)
|
||||
self._status.setText("")
|
||||
|
||||
def _copy(self) -> None:
|
||||
if self._sections:
|
||||
QApplication.clipboard().setText(inventory.render_markdown(self._sections))
|
||||
self._status.setText("copied as Markdown")
|
||||
|
||||
def _save(self) -> None:
|
||||
if not self._sections:
|
||||
return
|
||||
path, _ = QFileDialog.getSaveFileName(self, "Save inventory", "rigdoctor-inventory.md", "Markdown (*.md)")
|
||||
if path:
|
||||
with open(path, "w", encoding="utf-8") as f:
|
||||
f.write(inventory.render_markdown(self._sections))
|
||||
self._status.setText(f"saved {os.path.basename(path)}")
|
||||
@@ -28,8 +28,8 @@ from .. import __version__
|
||||
from ..config import load_config
|
||||
from ..core import alerts, elevation, updates
|
||||
from .dashboard import Dashboard
|
||||
from .games_page import GamesPage
|
||||
from .health_page import HealthPage
|
||||
from .inventory_page import InventoryPage
|
||||
from .notifications_page import NotificationsPage
|
||||
from .recorder_page import RecorderPage
|
||||
from .setup_page import SetupPage
|
||||
@@ -37,7 +37,7 @@ from .share_page import SharePage
|
||||
from .theme import ACCENT, GOOD, MUTED
|
||||
from .worker import SamplerWorker
|
||||
|
||||
_NAV_ITEMS = ["Dashboard", "Logs", "Health", "Setup", "Inventory", "Notifications", "Share"]
|
||||
_NAV_ITEMS = ["Dashboard", "Logs", "Health", "Games", "Setup", "Notifications", "Share"]
|
||||
|
||||
|
||||
class MainWindow(QMainWindow):
|
||||
@@ -67,16 +67,17 @@ class MainWindow(QMainWindow):
|
||||
self.dashboard = Dashboard()
|
||||
self.recorder_page = RecorderPage()
|
||||
self.health_page = HealthPage()
|
||||
self.games_page = GamesPage()
|
||||
self.games_page.new_count_changed.connect(self._set_games_badge)
|
||||
self.setup_page = SetupPage()
|
||||
self.inventory_page = InventoryPage()
|
||||
self.notifications_page = NotificationsPage()
|
||||
self.notifications_page.changed.connect(self._apply_alert_settings)
|
||||
self.share_page = SharePage()
|
||||
self._stack.addWidget(self.dashboard) # 0 Dashboard
|
||||
self._stack.addWidget(self.recorder_page) # 1 Logs
|
||||
self._stack.addWidget(self.health_page) # 2 Health
|
||||
self._stack.addWidget(self.setup_page) # 3 Setup
|
||||
self._stack.addWidget(self.inventory_page) # 4 Inventory
|
||||
self._stack.addWidget(self.games_page) # 3 Games
|
||||
self._stack.addWidget(self.setup_page) # 4 Setup
|
||||
self._stack.addWidget(self.notifications_page) # 5 Notifications
|
||||
self._stack.addWidget(self.share_page) # 6 Share
|
||||
content_layout.addWidget(self._stack)
|
||||
@@ -138,6 +139,7 @@ class MainWindow(QMainWindow):
|
||||
|
||||
group = QButtonGroup(self)
|
||||
group.setExclusive(True)
|
||||
self._nav_buttons: dict[str, QPushButton] = {}
|
||||
for i, name in enumerate(_NAV_ITEMS):
|
||||
btn = QPushButton(name)
|
||||
btn.setObjectName("NavButton")
|
||||
@@ -147,6 +149,7 @@ class MainWindow(QMainWindow):
|
||||
btn.clicked.connect(lambda _checked, idx=i: self._stack.setCurrentIndex(idx))
|
||||
group.addButton(btn, i)
|
||||
v.addWidget(btn)
|
||||
self._nav_buttons[name] = btn
|
||||
|
||||
v.addStretch(1)
|
||||
live = QLabel(f'<span style="color:{ACCENT};">●</span> <span style="color:{MUTED};">Live</span>')
|
||||
@@ -228,9 +231,14 @@ class MainWindow(QMainWindow):
|
||||
self._elevated.emit()
|
||||
|
||||
def _on_elevated(self) -> None:
|
||||
# Re-run Health and Inventory now that root-only data (SMART/dmidecode) is available.
|
||||
# Re-run Health now that root-only SMART data is available. (dmidecode is still
|
||||
# collected and used by the relay guest view + the CLI `rigdoctor inventory`.)
|
||||
self.health_page._run()
|
||||
self.inventory_page._run()
|
||||
|
||||
def _set_games_badge(self, count: int) -> None:
|
||||
btn = self._nav_buttons.get("Games")
|
||||
if btn is not None:
|
||||
btn.setText(f"Games ● {count}" if count > 0 else "Games")
|
||||
|
||||
def _apply_alert_settings(self) -> None:
|
||||
cfg = load_config()
|
||||
|
||||
+173
-50
@@ -1,12 +1,20 @@
|
||||
"""Share page (M12, Tier 2 over the relay): host or join a read-only shared session."""
|
||||
"""Share page (M12): host or join a shared session over the relay.
|
||||
|
||||
Guest sees the host's live sensors + health + inventory (read-only). If the host enables it,
|
||||
a full **PTY terminal** is shared: the guest types and the commands run on the host (as the
|
||||
host's user), the host reads along, and the host can type too — e.g. a sudo password, which
|
||||
stays local and is never sent to the guest.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import base64
|
||||
import json
|
||||
|
||||
from PySide6.QtCore import Qt, QTimer, QUrl
|
||||
from PySide6.QtCore import Qt, QSocketNotifier, QTimer, QUrl
|
||||
from PySide6.QtWebSockets import QWebSocket
|
||||
from PySide6.QtWidgets import (
|
||||
QCheckBox,
|
||||
QFrame,
|
||||
QHBoxLayout,
|
||||
QLabel,
|
||||
@@ -19,14 +27,20 @@ from PySide6.QtWidgets import (
|
||||
|
||||
from ..config import load_config, load_token
|
||||
from ..core import share
|
||||
from ..core.pty_session import PtySession
|
||||
from ..core.sampler import Sampler
|
||||
from ..core.sources import available_sources
|
||||
from .terminal_widget import TerminalView
|
||||
|
||||
|
||||
def _relay_url() -> str:
|
||||
return load_config().get("relay_url", "wss://rigdoctor.jesseyvanofferen.com").rstrip("/")
|
||||
|
||||
|
||||
def _b64(data: bytes) -> str:
|
||||
return base64.b64encode(data).decode("ascii")
|
||||
|
||||
|
||||
def _card(title: str) -> tuple[QFrame, QVBoxLayout]:
|
||||
card = QFrame()
|
||||
card.setObjectName("Card")
|
||||
@@ -46,6 +60,8 @@ class SharePage(QWidget):
|
||||
self._sampler = Sampler(available_sources())
|
||||
self._host_ws: QWebSocket | None = None
|
||||
self._guest_ws: QWebSocket | None = None
|
||||
self._pty: PtySession | None = None
|
||||
self._pty_notifier: QSocketNotifier | None = None
|
||||
self._last_report = None
|
||||
self._last_inv = None
|
||||
self._timer = QTimer(self)
|
||||
@@ -54,18 +70,22 @@ class SharePage(QWidget):
|
||||
|
||||
root = QVBoxLayout(self)
|
||||
root.setContentsMargins(20, 18, 20, 18)
|
||||
root.setSpacing(16)
|
||||
root.setSpacing(14)
|
||||
title = QLabel("Share")
|
||||
title.setObjectName("PageTitle")
|
||||
root.addWidget(title)
|
||||
root.addWidget(self._build_host())
|
||||
root.addWidget(self._build_guest(), 1)
|
||||
|
||||
# Host
|
||||
host_card, hv = _card("Start a shared session")
|
||||
# ------------------------------------------------------------------ host
|
||||
def _build_host(self) -> QFrame:
|
||||
card, v = _card("Start a shared session")
|
||||
self._host_status = QLabel("Let someone with an account view your machine, read-only.")
|
||||
self._host_status.setObjectName("Muted")
|
||||
self._host_status.setWordWrap(True)
|
||||
hv.addWidget(self._host_status)
|
||||
hrow = QHBoxLayout()
|
||||
v.addWidget(self._host_status)
|
||||
|
||||
row = QHBoxLayout()
|
||||
self._start_btn = QPushButton("Start shared session")
|
||||
self._start_btn.setObjectName("PrimaryButton")
|
||||
self._start_btn.clicked.connect(self._start_host)
|
||||
@@ -75,45 +95,25 @@ class SharePage(QWidget):
|
||||
self._code_label = QLabel("")
|
||||
self._code_label.setStyleSheet("font-weight:700; font-size:18px; color:#38bdf8; background:transparent;")
|
||||
self._code_label.setTextInteractionFlags(Qt.TextInteractionFlag.TextSelectableByMouse)
|
||||
hrow.addWidget(self._start_btn)
|
||||
hrow.addWidget(self._stop_btn)
|
||||
hrow.addSpacing(12)
|
||||
hrow.addWidget(self._code_label)
|
||||
hrow.addStretch(1)
|
||||
hv.addLayout(hrow)
|
||||
root.addWidget(host_card)
|
||||
row.addWidget(self._start_btn)
|
||||
row.addWidget(self._stop_btn)
|
||||
row.addSpacing(12)
|
||||
row.addWidget(self._code_label)
|
||||
row.addStretch(1)
|
||||
v.addLayout(row)
|
||||
|
||||
# Guest
|
||||
guest_card, gv = _card("Join a shared session")
|
||||
grow = QHBoxLayout()
|
||||
self._code_input = QLineEdit()
|
||||
self._code_input.setPlaceholderText("Enter share code")
|
||||
self._code_input.setMaxLength(6)
|
||||
self._code_input.setFixedWidth(160)
|
||||
self._join_btn = QPushButton("Join")
|
||||
self._join_btn.setObjectName("PrimaryButton")
|
||||
self._join_btn.clicked.connect(self._join)
|
||||
self._leave_btn = QPushButton("Leave")
|
||||
self._leave_btn.setEnabled(False)
|
||||
self._leave_btn.clicked.connect(self._leave)
|
||||
grow.addWidget(self._code_input)
|
||||
grow.addWidget(self._join_btn)
|
||||
grow.addWidget(self._leave_btn)
|
||||
grow.addStretch(1)
|
||||
gv.addLayout(grow)
|
||||
self._guest_status = QLabel("")
|
||||
self._guest_status.setObjectName("Muted")
|
||||
gv.addWidget(self._guest_status)
|
||||
root.addWidget(guest_card)
|
||||
self._allow_term = QCheckBox("Allow remote terminal — the guest runs commands as your user (you read along; you can type too, e.g. a sudo password)")
|
||||
self._allow_term.setStyleSheet("color:#fb923c; background:transparent;")
|
||||
self._allow_term.toggled.connect(self._toggle_terminal)
|
||||
v.addWidget(self._allow_term)
|
||||
|
||||
self._view = QTextEdit()
|
||||
self._view.setObjectName("Report")
|
||||
self._view.setReadOnly(True)
|
||||
self._view.setVisible(False)
|
||||
root.addWidget(self._view, 1)
|
||||
root.addStretch(0)
|
||||
self._host_term = TerminalView()
|
||||
self._host_term.keys.connect(lambda b: self._pty.write(b) if self._pty else None)
|
||||
self._host_term.resized.connect(lambda r, c: self._pty.set_size(r, c) if self._pty else None)
|
||||
self._host_term.setVisible(False)
|
||||
v.addWidget(self._host_term)
|
||||
return card
|
||||
|
||||
# --- host ---------------------------------------------------------------
|
||||
def _start_host(self) -> None:
|
||||
if not load_token():
|
||||
self._host_status.setText("Set a Gitea access token in Setup → Account access first.")
|
||||
@@ -135,13 +135,69 @@ class SharePage(QWidget):
|
||||
if data.get("error"):
|
||||
self._host_status.setText(f"Rejected: {data['error']}")
|
||||
return
|
||||
if "code" in data:
|
||||
if "code" in data: # relay handshake
|
||||
self._code_label.setText(data["code"])
|
||||
self._host_status.setText(f"Sharing as {data.get('user', '?')} — give this code to whoever should view your machine.")
|
||||
self._stop_btn.setEnabled(True)
|
||||
self._host_ws.sendTextMessage(share.host_full_frame(self._sampler))
|
||||
self._send_terminal_state()
|
||||
if self._allow_term.isChecked():
|
||||
self._start_pty()
|
||||
self._timer.start()
|
||||
# guest input also arrives here; ignored (read-only session)
|
||||
return
|
||||
kind = data.get("type") # frames forwarded from a guest
|
||||
if kind == "req_full":
|
||||
# A guest just joined — send a full frame AND the current terminal state, so a
|
||||
# guest that joins *after* the host enabled the terminal still gets access.
|
||||
self._host_ws.sendTextMessage(share.host_full_frame(self._sampler))
|
||||
self._send_terminal_state()
|
||||
elif kind == "pty_in" and self._pty:
|
||||
self._pty.write(base64.b64decode(data["data"]))
|
||||
elif kind == "pty_resize" and self._pty:
|
||||
self._pty.set_size(int(data["rows"]), int(data["cols"]))
|
||||
|
||||
def _toggle_terminal(self, on: bool) -> None:
|
||||
if on and self._host_ws and self._code_label.text():
|
||||
self._start_pty()
|
||||
elif not on:
|
||||
self._stop_pty()
|
||||
self._send_terminal_state()
|
||||
|
||||
def _send_terminal_state(self) -> None:
|
||||
if self._host_ws and self._code_label.text():
|
||||
self._host_ws.sendTextMessage(json.dumps({"type": "terminal", "enabled": self._allow_term.isChecked()}))
|
||||
|
||||
def _start_pty(self) -> None:
|
||||
if self._pty:
|
||||
return
|
||||
rows, cols = self._host_term.grid()
|
||||
self._pty = PtySession(rows=rows, cols=cols)
|
||||
self._pty_notifier = QSocketNotifier(self._pty.master_fd, QSocketNotifier.Type.Read, self)
|
||||
self._pty_notifier.activated.connect(self._on_pty_output)
|
||||
self._host_term.reset()
|
||||
self._host_term.setVisible(True)
|
||||
|
||||
def _on_pty_output(self) -> None:
|
||||
if not self._pty:
|
||||
return
|
||||
data = self._pty.read()
|
||||
if not data: # shell exited / EOF
|
||||
self._stop_pty()
|
||||
self._send_terminal_state()
|
||||
self._allow_term.setChecked(False)
|
||||
return
|
||||
self._host_term.feed(data)
|
||||
if self._host_ws:
|
||||
self._host_ws.sendTextMessage(json.dumps({"type": "pty", "data": _b64(data)}))
|
||||
|
||||
def _stop_pty(self) -> None:
|
||||
if self._pty_notifier:
|
||||
self._pty_notifier.setEnabled(False)
|
||||
self._pty_notifier = None
|
||||
if self._pty:
|
||||
self._pty.close()
|
||||
self._pty = None
|
||||
self._host_term.setVisible(False)
|
||||
|
||||
def _stream(self) -> None:
|
||||
if self._host_ws:
|
||||
@@ -149,6 +205,7 @@ class SharePage(QWidget):
|
||||
|
||||
def _stop_host(self) -> None:
|
||||
self._timer.stop()
|
||||
self._stop_pty()
|
||||
if self._host_ws:
|
||||
self._host_ws.close()
|
||||
self._host_ws = None
|
||||
@@ -159,13 +216,54 @@ class SharePage(QWidget):
|
||||
|
||||
def _host_closed(self) -> None:
|
||||
self._timer.stop()
|
||||
self._stop_pty()
|
||||
self._start_btn.setEnabled(True)
|
||||
self._stop_btn.setEnabled(False)
|
||||
if self._code_label.text():
|
||||
self._code_label.setText("")
|
||||
self._host_status.setText("Disconnected from the relay.")
|
||||
|
||||
# --- guest --------------------------------------------------------------
|
||||
# ----------------------------------------------------------------- guest
|
||||
def _build_guest(self) -> QFrame:
|
||||
card, v = _card("Join a shared session")
|
||||
row = QHBoxLayout()
|
||||
self._code_input = QLineEdit()
|
||||
self._code_input.setPlaceholderText("Enter share code")
|
||||
self._code_input.setMaxLength(6)
|
||||
self._code_input.setFixedWidth(160)
|
||||
self._join_btn = QPushButton("Join")
|
||||
self._join_btn.setObjectName("PrimaryButton")
|
||||
self._join_btn.clicked.connect(self._join)
|
||||
self._leave_btn = QPushButton("Leave")
|
||||
self._leave_btn.setEnabled(False)
|
||||
self._leave_btn.clicked.connect(self._leave)
|
||||
row.addWidget(self._code_input)
|
||||
row.addWidget(self._join_btn)
|
||||
row.addWidget(self._leave_btn)
|
||||
row.addStretch(1)
|
||||
v.addLayout(row)
|
||||
self._guest_status = QLabel("")
|
||||
self._guest_status.setObjectName("Muted")
|
||||
v.addWidget(self._guest_status)
|
||||
|
||||
self._view = QTextEdit()
|
||||
self._view.setObjectName("Report")
|
||||
self._view.setReadOnly(True)
|
||||
self._view.setVisible(False)
|
||||
self._view.setMinimumHeight(200)
|
||||
v.addWidget(self._view)
|
||||
|
||||
self._term_label = QLabel("")
|
||||
self._term_label.setObjectName("Muted")
|
||||
self._term_label.setVisible(False)
|
||||
v.addWidget(self._term_label)
|
||||
self._guest_term = TerminalView()
|
||||
self._guest_term.keys.connect(self._guest_key)
|
||||
self._guest_term.resized.connect(self._guest_resize)
|
||||
self._guest_term.setVisible(False)
|
||||
v.addWidget(self._guest_term)
|
||||
return card
|
||||
|
||||
def _join(self) -> None:
|
||||
code = self._code_input.text().strip().upper()
|
||||
if not load_token():
|
||||
@@ -195,19 +293,43 @@ class SharePage(QWidget):
|
||||
self._guest_status.setText(f"Viewing {data.get('host', '?')}'s machine — read-only.")
|
||||
self._leave_btn.setEnabled(True)
|
||||
self._view.setVisible(True)
|
||||
self._guest_ws.sendTextMessage(json.dumps({"type": "req_full"}))
|
||||
return
|
||||
kind = data.get("type")
|
||||
if kind == "full":
|
||||
self._last_report = data.get("report")
|
||||
self._last_inv = data.get("inventory")
|
||||
if kind in ("full", "snapshot"):
|
||||
if kind == "full":
|
||||
self._last_report = data.get("report")
|
||||
self._last_inv = data.get("inventory")
|
||||
self._view.setHtml(share.guest_html(data.get("snapshot"), self._last_report, self._last_inv))
|
||||
elif kind == "terminal":
|
||||
self._set_terminal_visible(bool(data.get("enabled")))
|
||||
elif kind == "pty":
|
||||
self._guest_term.feed(base64.b64decode(data["data"]))
|
||||
|
||||
def _set_terminal_visible(self, enabled: bool) -> None:
|
||||
self._term_label.setVisible(True)
|
||||
self._term_label.setText("Terminal enabled by host — your keystrokes run on their machine. Click here and type."
|
||||
if enabled else "Terminal not enabled by the host.")
|
||||
self._guest_term.setVisible(enabled)
|
||||
if enabled:
|
||||
self._guest_term.reset()
|
||||
self._guest_resize(*self._guest_term.grid())
|
||||
self._guest_term.setFocus()
|
||||
|
||||
def _guest_key(self, data: bytes) -> None:
|
||||
if self._guest_ws:
|
||||
self._guest_ws.sendTextMessage(json.dumps({"type": "pty_in", "data": _b64(data)}))
|
||||
|
||||
def _guest_resize(self, rows: int, cols: int) -> None:
|
||||
if self._guest_ws:
|
||||
self._guest_ws.sendTextMessage(json.dumps({"type": "pty_resize", "rows": rows, "cols": cols}))
|
||||
|
||||
def _leave(self) -> None:
|
||||
if self._guest_ws:
|
||||
self._guest_ws.close()
|
||||
self._guest_ws = None
|
||||
self._view.setVisible(False)
|
||||
for w in (self._view, self._term_label, self._guest_term):
|
||||
w.setVisible(False)
|
||||
self._leave_btn.setEnabled(False)
|
||||
self._join_btn.setEnabled(True)
|
||||
self._guest_status.setText("Left the session.")
|
||||
@@ -220,6 +342,7 @@ class SharePage(QWidget):
|
||||
|
||||
def shutdown(self) -> None:
|
||||
self._timer.stop()
|
||||
self._stop_pty()
|
||||
for ws in (self._host_ws, self._guest_ws):
|
||||
if ws:
|
||||
ws.close()
|
||||
|
||||
@@ -0,0 +1,98 @@
|
||||
"""A minimal terminal view: renders PTY output via pyte and emits keystrokes (M12, Tier 3).
|
||||
|
||||
Used by both sides of a shared session — the host (mirrors its local PTY, can also type, e.g.
|
||||
a sudo password) and the guest (renders the streamed PTY, sends keystrokes). Monochrome for
|
||||
now; cursor addressing / layout (vim, top) work via pyte.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import pyte
|
||||
from PySide6.QtCore import Qt, Signal
|
||||
from PySide6.QtGui import QFontDatabase, QFontMetrics, QTextCursor
|
||||
from PySide6.QtWidgets import QPlainTextEdit
|
||||
|
||||
|
||||
class TerminalView(QPlainTextEdit):
|
||||
keys = Signal(bytes) # user keystrokes -> bytes for the PTY
|
||||
resized = Signal(int, int) # rows, cols
|
||||
|
||||
def __init__(self, rows: int = 24, cols: int = 80):
|
||||
super().__init__()
|
||||
self.setLineWrapMode(QPlainTextEdit.LineWrapMode.NoWrap)
|
||||
self.setFont(QFontDatabase.systemFont(QFontDatabase.SystemFont.FixedFont))
|
||||
self.setUndoRedoEnabled(False)
|
||||
self.setMinimumHeight(260)
|
||||
self._rows, self._cols = rows, cols
|
||||
self._screen = pyte.HistoryScreen(cols, rows, history=1000, ratio=0.5)
|
||||
self._stream = pyte.ByteStream(self._screen)
|
||||
|
||||
def grid(self) -> tuple[int, int]:
|
||||
return self._rows, self._cols
|
||||
|
||||
def feed(self, data: bytes) -> None:
|
||||
self._stream.feed(data)
|
||||
self._render()
|
||||
|
||||
def reset(self) -> None:
|
||||
self._screen.reset()
|
||||
self._render()
|
||||
|
||||
def _row_text(self, row) -> str:
|
||||
return "".join(row[x].data for x in range(self._cols)).rstrip()
|
||||
|
||||
def _render(self) -> None:
|
||||
bar = self.verticalScrollBar()
|
||||
at_bottom = bar.value() >= bar.maximum() - 2
|
||||
prev = bar.value()
|
||||
history = [self._row_text(r) for r in self._screen.history.top] # scrollback
|
||||
self.setPlainText("\n".join(history + list(self._screen.display)))
|
||||
if at_bottom: # follow output; place caret at the real (row, col)
|
||||
cursor = self.textCursor()
|
||||
cursor.movePosition(QTextCursor.MoveOperation.Start)
|
||||
cursor.movePosition(QTextCursor.MoveOperation.Down, QTextCursor.MoveMode.MoveAnchor, len(history) + self._screen.cursor.y)
|
||||
cursor.movePosition(QTextCursor.MoveOperation.Right, QTextCursor.MoveMode.MoveAnchor, self._screen.cursor.x)
|
||||
self.setTextCursor(cursor)
|
||||
self.ensureCursorVisible()
|
||||
else: # user scrolled up to read — keep their place
|
||||
bar.setValue(prev)
|
||||
|
||||
def resizeEvent(self, event): # noqa: N802 (Qt override)
|
||||
super().resizeEvent(event)
|
||||
fm = QFontMetrics(self.font())
|
||||
cw = max(1, fm.horizontalAdvance("M"))
|
||||
ch = max(1, fm.height())
|
||||
cols = max(20, self.viewport().width() // cw)
|
||||
rows = max(6, self.viewport().height() // ch)
|
||||
if (rows, cols) != (self._rows, self._cols):
|
||||
self._rows, self._cols = rows, cols
|
||||
self._screen.resize(rows, cols)
|
||||
self._render()
|
||||
self.resized.emit(rows, cols)
|
||||
|
||||
def keyPressEvent(self, event): # noqa: N802 (Qt override)
|
||||
data = self._translate(event)
|
||||
if data:
|
||||
self.keys.emit(data)
|
||||
event.accept() # display comes from PTY output, not local editing
|
||||
|
||||
@staticmethod
|
||||
def _translate(event) -> bytes:
|
||||
key = event.key()
|
||||
mod = event.modifiers()
|
||||
k = Qt.Key
|
||||
if mod & Qt.KeyboardModifier.ControlModifier and k.Key_A.value <= key <= k.Key_Z.value:
|
||||
return bytes([key - k.Key_A.value + 1]) # Ctrl-A..Ctrl-Z
|
||||
special = {
|
||||
k.Key_Return.value: b"\r", k.Key_Enter.value: b"\r",
|
||||
k.Key_Backspace.value: b"\x7f", k.Key_Tab.value: b"\t",
|
||||
k.Key_Escape.value: b"\x1b",
|
||||
k.Key_Up.value: b"\x1b[A", k.Key_Down.value: b"\x1b[B",
|
||||
k.Key_Right.value: b"\x1b[C", k.Key_Left.value: b"\x1b[D",
|
||||
k.Key_Home.value: b"\x1b[H", k.Key_End.value: b"\x1b[F",
|
||||
k.Key_Delete.value: b"\x1b[3~", k.Key_PageUp.value: b"\x1b[5~", k.Key_PageDown.value: b"\x1b[6~",
|
||||
}
|
||||
if key in special:
|
||||
return special[key]
|
||||
text = event.text()
|
||||
return text.encode("utf-8") if text else b""
|
||||
@@ -14,6 +14,7 @@ CARD_BORDER = "#2a2f39"
|
||||
TRACK = "#2a2f39"
|
||||
TEXT = "#e6e8eb"
|
||||
MUTED = "#8b929c"
|
||||
INPUT_BG = "#0d0f13" # form-control background (must stay dark — see contrast rule)
|
||||
|
||||
ACCENT = "#38bdf8"
|
||||
COLD = "#7dd3fc" # icey-blue
|
||||
@@ -138,4 +139,15 @@ QCheckBox::indicator:checked {{
|
||||
QDialog {{ background: {BG}; }}
|
||||
QMessageBox {{ background: {CARD}; }}
|
||||
QDialog QLabel, QMessageBox QLabel {{ color: {TEXT}; background: transparent; }}
|
||||
|
||||
/* Form controls: keep dark bg + light text (Fusion defaults to light-on-light here). */
|
||||
QLineEdit, QPlainTextEdit, QAbstractSpinBox, QComboBox {{
|
||||
background: {INPUT_BG}; color: {TEXT};
|
||||
border: 1px solid {CARD_BORDER}; border-radius: 6px; padding: 5px 8px;
|
||||
selection-background-color: {ACCENT}; selection-color: #06222e;
|
||||
}}
|
||||
QLineEdit:focus, QPlainTextEdit:focus, QAbstractSpinBox:focus, QComboBox:focus {{
|
||||
border: 1px solid {ACCENT};
|
||||
}}
|
||||
QLineEdit:disabled, QPlainTextEdit:disabled, QAbstractSpinBox:disabled {{ color: {MUTED}; }}
|
||||
"""
|
||||
|
||||
@@ -19,6 +19,16 @@ class ConfigTests(unittest.TestCase):
|
||||
self.assertEqual(loaded["gpu_temp_alert"], 88.0)
|
||||
self.assertEqual(loaded["update_check_minutes"], 5)
|
||||
|
||||
def test_list_value_round_trip(self):
|
||||
with tempfile.TemporaryDirectory() as d:
|
||||
cf = Path(d) / "config.toml"
|
||||
with mock.patch.object(config, "CONFIG_FILE", cf), mock.patch.object(config, "CONFIG_DIR", Path(d)):
|
||||
paths = ["/home/u/.local/share/Steam", "/mnt/games/SteamLibrary"]
|
||||
config.update_config(steam_libraries=paths)
|
||||
self.assertEqual(config.load_config()["steam_libraries"], paths)
|
||||
config.update_config(steam_libraries=[])
|
||||
self.assertEqual(config.load_config()["steam_libraries"], [])
|
||||
|
||||
def test_update_config_merges_and_keeps_defaults(self):
|
||||
with tempfile.TemporaryDirectory() as d:
|
||||
cf = Path(d) / "config.toml"
|
||||
|
||||
@@ -0,0 +1,27 @@
|
||||
"""Tests for the host PTY session (M12 Tier 3)."""
|
||||
|
||||
import time
|
||||
import unittest
|
||||
|
||||
from rigdoctor.core.pty_session import PtySession
|
||||
|
||||
|
||||
class PtySessionTests(unittest.TestCase):
|
||||
def test_runs_command_and_reads_output(self):
|
||||
pty = PtySession(rows=24, cols=80)
|
||||
try:
|
||||
time.sleep(0.4)
|
||||
pty.read() # drain the shell prompt
|
||||
pty.write(b"echo PTY_MARKER_42\n")
|
||||
deadline = time.time() + 3
|
||||
buf = ""
|
||||
while time.time() < deadline and "PTY_MARKER_42" not in buf:
|
||||
time.sleep(0.1)
|
||||
buf += pty.read().decode(errors="replace")
|
||||
self.assertIn("PTY_MARKER_42", buf)
|
||||
finally:
|
||||
pty.close()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
@@ -0,0 +1,147 @@
|
||||
"""Tests for M6 Steam library & game detection (VDF parse, scan, tool filter, cache diff)."""
|
||||
|
||||
import tempfile
|
||||
import unittest
|
||||
from pathlib import Path
|
||||
from unittest import mock
|
||||
|
||||
from rigdoctor.core import steam
|
||||
|
||||
_GAME_ACF = """"AppState"
|
||||
{{
|
||||
\t"appid"\t\t"{appid}"
|
||||
\t"name"\t\t"{name}"
|
||||
\t"installdir"\t\t"{installdir}"
|
||||
\t"SizeOnDisk"\t\t"{size}"
|
||||
\t"LastUpdated"\t\t"{updated}"
|
||||
}}
|
||||
"""
|
||||
|
||||
_LIBRARYFOLDERS = """"libraryfolders"
|
||||
{{
|
||||
\t"0"
|
||||
\t{{
|
||||
\t\t"path"\t\t"{path}"
|
||||
\t\t"label"\t\t"Main"
|
||||
\t\t"apps"
|
||||
\t\t{{
|
||||
\t\t\t"570"\t\t"123"
|
||||
\t\t}}
|
||||
\t}}
|
||||
}}
|
||||
"""
|
||||
|
||||
|
||||
def _make_library(root: Path, games) -> Path:
|
||||
"""games: list of (appid, name, installdir, size, updated). Returns the library path."""
|
||||
steamapps = root / "steamapps"
|
||||
steamapps.mkdir(parents=True, exist_ok=True)
|
||||
for appid, name, installdir, size, updated in games:
|
||||
(steamapps / f"appmanifest_{appid}.acf").write_text(
|
||||
_GAME_ACF.format(appid=appid, name=name, installdir=installdir, size=size, updated=updated)
|
||||
)
|
||||
return root
|
||||
|
||||
|
||||
class VdfTests(unittest.TestCase):
|
||||
def test_parse_nested_and_pairs(self):
|
||||
data = steam._parse_vdf(_GAME_ACF.format(
|
||||
appid="570", name="Dota 2", installdir="dota 2 beta", size="15", updated="1700"))
|
||||
state = data["AppState"]
|
||||
self.assertEqual(state["appid"], "570")
|
||||
self.assertEqual(state["name"], "Dota 2")
|
||||
self.assertEqual(state["installdir"], "dota 2 beta")
|
||||
|
||||
def test_parse_handles_quotes_in_names(self):
|
||||
acf = _GAME_ACF.format(appid="1", name="Baldur\\'s Gate 3", installdir="bg3", size="1", updated="1")
|
||||
data = steam._parse_vdf(acf)
|
||||
self.assertIn("Baldur", data["AppState"]["name"])
|
||||
|
||||
def test_parse_garbage_returns_empty(self):
|
||||
self.assertEqual(steam._parse_vdf("not vdf at all"), {})
|
||||
|
||||
|
||||
class ToolFilterTests(unittest.TestCase):
|
||||
def test_known_tool_appid(self):
|
||||
self.assertTrue(steam.is_tool("228980", "Steamworks Common Redistributables"))
|
||||
|
||||
def test_proton_name_prefix(self):
|
||||
self.assertTrue(steam.is_tool("9999999", "Proton 8.0"))
|
||||
self.assertTrue(steam.is_tool("9999998", "Steam Linux Runtime 3.0 (sniper)"))
|
||||
|
||||
def test_real_game_is_not_a_tool(self):
|
||||
self.assertFalse(steam.is_tool("570", "Dota 2"))
|
||||
|
||||
|
||||
class ScanTests(unittest.TestCase):
|
||||
def test_scan_library_filters_tools(self):
|
||||
with tempfile.TemporaryDirectory() as d:
|
||||
lib = _make_library(Path(d), [
|
||||
("570", "Dota 2", "dota 2 beta", "15000000000", "1700000000"),
|
||||
("228980", "Steamworks Common Redistributables", "Steamworks Shared", "0", "0"),
|
||||
("1493710", "Proton Experimental", "Proton - Experimental", "0", "0"),
|
||||
])
|
||||
games = steam.scan_library(str(lib))
|
||||
names = {g.name for g in games}
|
||||
self.assertEqual(names, {"Dota 2"})
|
||||
self.assertEqual(games[0].size_bytes, 15000000000)
|
||||
|
||||
def test_scan_games_dedupes_and_sorts(self):
|
||||
with tempfile.TemporaryDirectory() as d1, tempfile.TemporaryDirectory() as d2:
|
||||
a = _make_library(Path(d1), [("10", "Zeta", "zeta", "1", "1"), ("20", "Alpha", "alpha", "1", "1")])
|
||||
b = _make_library(Path(d2), [("20", "Alpha", "alpha", "1", "1")]) # dup appid 20
|
||||
games = steam.scan_games([str(a), str(b)])
|
||||
self.assertEqual([g.name for g in games], ["Alpha", "Zeta"]) # sorted, deduped
|
||||
|
||||
|
||||
class DiscoverTests(unittest.TestCase):
|
||||
def test_discover_reads_libraryfolders(self):
|
||||
with tempfile.TemporaryDirectory() as d:
|
||||
root = Path(d) / "Steam"
|
||||
(root / "steamapps").mkdir(parents=True)
|
||||
extra = Path(d) / "Extra"
|
||||
(extra / "steamapps").mkdir(parents=True)
|
||||
(root / "steamapps" / "libraryfolders.vdf").write_text(
|
||||
_LIBRARYFOLDERS.format(path=str(extra)))
|
||||
with mock.patch.object(steam, "steam_roots", return_value=[root]):
|
||||
libs = steam.discover_libraries()
|
||||
paths = {lib.path for lib in libs}
|
||||
self.assertIn(str(root.resolve()), paths) # root itself
|
||||
self.assertIn(str(extra.resolve()), paths) # the configured extra library
|
||||
|
||||
|
||||
class CacheDiffTests(unittest.TestCase):
|
||||
def _rescan(self, lib, games_file, cfg):
|
||||
with mock.patch.object(steam, "GAMES_FILE", games_file):
|
||||
return steam.rescan(cfg=cfg)
|
||||
|
||||
def test_first_scan_has_no_new_then_added_game_is_new(self):
|
||||
with tempfile.TemporaryDirectory() as d:
|
||||
lib = _make_library(Path(d) / "lib", [("10", "Alpha", "alpha", "1", "1")])
|
||||
games_file = Path(d) / "games.json"
|
||||
cfg = {"steam_libraries": [str(lib)]}
|
||||
|
||||
first = self._rescan(lib, games_file, cfg)
|
||||
self.assertEqual(first.new_appids, []) # first run flags nothing as new
|
||||
|
||||
# Install a second game; it should be flagged new on the next scan.
|
||||
_make_library(lib, [("10", "Alpha", "alpha", "1", "1"), ("20", "Beta", "beta", "1", "1")])
|
||||
second = self._rescan(lib, games_file, cfg)
|
||||
self.assertEqual(second.new_appids, ["20"])
|
||||
self.assertEqual({g.name for g in second.games}, {"Alpha", "Beta"})
|
||||
|
||||
def test_acknowledge_clears_new(self):
|
||||
with tempfile.TemporaryDirectory() as d:
|
||||
lib = _make_library(Path(d) / "lib", [("10", "Alpha", "alpha", "1", "1")])
|
||||
games_file = Path(d) / "games.json"
|
||||
cfg = {"steam_libraries": [str(lib)]}
|
||||
self._rescan(lib, games_file, cfg)
|
||||
_make_library(lib, [("10", "Alpha", "alpha", "1", "1"), ("20", "Beta", "beta", "1", "1")])
|
||||
self._rescan(lib, games_file, cfg)
|
||||
with mock.patch.object(steam, "GAMES_FILE", games_file):
|
||||
steam.acknowledge_new()
|
||||
self.assertEqual(steam.load_cache()["new_appids"], [])
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
Reference in New Issue
Block a user