Initial commit: docs, decisions, and M1 sensor core

Planning docs (SPEC, ARCHITECTURE, MODULES, ROADMAP, DECISIONS) with
decisions D1-D15 settled: RigDoctor name, Python 3 + Qt/PySide6 stack
(core/CLI/daemon stdlib-only), Ubuntu + NVIDIA first, .deb packaging,
read-only + suggestions, GUI + tray modules, stress module dropped.

First code: the M1 sensor core (stdlib-only) and a CLI.
- core engine: Reading/Sample model, Sampler, hwmon reader
- self-probing sources (NVIDIA first): nvidia-smi GPU, coretemp/k10temp
  CPU, /proc/meminfo + DDR5 SPD memory, NVMe storage
- CLI: snapshot (text/JSON), monitor, sources; record/report stubbed
- stdlib unittest smoke tests

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-05-21 16:40:21 +02:00
commit 305b6c4497
25 changed files with 1286 additions and 0 deletions
+3
View File
@@ -0,0 +1,3 @@
"""RigDoctor — modular hardware monitoring & crash diagnostics for Linux gamers."""
__version__ = "0.1.0"
+7
View File
@@ -0,0 +1,7 @@
"""Allow `python -m rigdoctor`."""
import sys
from .cli import main
sys.exit(main())
+93
View File
@@ -0,0 +1,93 @@
"""RigDoctor command-line interface."""
from __future__ import annotations
import argparse
import json
import sys
from . import __version__
from .config import load_config
from .core.sampler import Sampler
from .core.sources import available_sources
from .render import render_snapshot
def _sampler() -> Sampler:
return Sampler(available_sources())
def cmd_sources(args) -> int:
srcs = available_sources()
if not srcs:
print("No sensor sources detected.")
return 1
print("Detected sources:")
for s in srcs:
print(f" - {s.name} ({type(s).__name__})")
return 0
def cmd_snapshot(args) -> int:
sample = _sampler().sample()
if args.json:
payload = {"ts": sample.ts, "readings": sample.to_rows()}
print(json.dumps(payload, indent=2, ensure_ascii=False))
else:
print(render_snapshot(sample))
return 0
def cmd_monitor(args) -> int:
interval = args.interval or load_config()["interval"]
try:
for sample in _sampler().stream(interval=interval):
# Basic full-screen redraw; the rich TUI (M2) comes later.
print("\033[2J\033[H", end="")
print(f"RigDoctor — live (every {interval:g}s, Ctrl-C to quit)\n")
print(render_snapshot(sample))
sys.stdout.flush()
except KeyboardInterrupt:
print()
return 0
def cmd_record(args) -> int:
print("`record` (M3 crash-capture logger) is not implemented yet — next on the roadmap.")
return 2
def cmd_report(args) -> int:
print("`report` (M4 health report) is not implemented yet — next on the roadmap.")
return 2
def build_parser() -> argparse.ArgumentParser:
p = argparse.ArgumentParser(
prog="rigdoctor",
description="Hardware monitoring & crash diagnostics for Linux gamers.",
)
p.add_argument("-V", "--version", action="version", version=f"rigdoctor {__version__}")
sub = p.add_subparsers(dest="command", required=True)
sp = sub.add_parser("snapshot", help="print a one-shot reading of all sensors")
sp.add_argument("--json", action="store_true", help="output JSON instead of text")
sp.set_defaults(func=cmd_snapshot)
mp = sub.add_parser("monitor", help="live-refreshing sensor view")
mp.add_argument("-n", "--interval", type=float, default=None, help="refresh interval (s)")
mp.set_defaults(func=cmd_monitor)
sub.add_parser("sources", help="list detected sensor sources").set_defaults(func=cmd_sources)
sub.add_parser("record", help="crash-capture logger (coming soon)").set_defaults(func=cmd_record)
sub.add_parser("report", help="health report (coming soon)").set_defaults(func=cmd_report)
return p
def main(argv: list[str] | None = None) -> int:
args = build_parser().parse_args(argv)
return args.func(args)
if __name__ == "__main__":
sys.exit(main())
+38
View File
@@ -0,0 +1,38 @@
"""Paths and configuration defaults (XDG layout, see ARCHITECTURE §10)."""
from __future__ import annotations
import os
from pathlib import Path
APP = "rigdoctor"
def _xdg(env: str, default: str) -> Path:
base = os.environ.get(env) or str(Path.home() / default)
return Path(base) / APP
CONFIG_DIR = _xdg("XDG_CONFIG_HOME", ".config")
DATA_DIR = _xdg("XDG_DATA_HOME", ".local/share")
STATE_DIR = _xdg("XDG_STATE_HOME", ".local/state")
LOG_DIR = DATA_DIR / "logs"
CONFIG_FILE = CONFIG_DIR / "config.toml"
DEFAULTS: dict = {
"interval": 1.0, # sampling interval in seconds (default ≤1 Hz, low overhead — NFR)
}
def load_config() -> dict:
"""Return defaults merged with config.toml if present (best-effort)."""
cfg = dict(DEFAULTS)
try:
import tomllib
if CONFIG_FILE.exists():
with CONFIG_FILE.open("rb") as f:
cfg.update(tomllib.load(f))
except Exception:
pass
return cfg
+1
View File
@@ -0,0 +1 @@
"""Core engine: sources → sampler → samples (stdlib-only)."""
+41
View File
@@ -0,0 +1,41 @@
"""Minimal /sys/class/hwmon reader (stdlib only)."""
from __future__ import annotations
from pathlib import Path
HWMON_ROOT = Path("/sys/class/hwmon")
def _read(path: Path) -> str | None:
try:
return path.read_text().strip()
except OSError:
return None
def find_by_name(name: str) -> list[Path]:
"""Return hwmon dirs whose `name` file equals `name` (e.g. 'coretemp')."""
matches: list[Path] = []
if not HWMON_ROOT.exists():
return matches
for d in sorted(HWMON_ROOT.glob("hwmon*")):
if _read(d / "name") == name:
matches.append(d)
return matches
def read_temps(hwmon_dir: Path) -> list[tuple[str, float]]:
"""Return (label, °C) for each tempN_input in a hwmon dir."""
out: list[tuple[str, float]] = []
for inp in sorted(hwmon_dir.glob("temp*_input")):
raw = _read(inp)
if raw is None:
continue
try:
celsius = int(raw) / 1000.0
except ValueError:
continue
label = _read(inp.with_name(inp.name.replace("_input", "_label")))
out.append((label or inp.name.replace("_input", ""), celsius))
return out
+45
View File
@@ -0,0 +1,45 @@
"""Core data model: a Reading and a Sample (one tick across all sources)."""
from __future__ import annotations
import time
from dataclasses import asdict, dataclass, field
@dataclass(frozen=True)
class Reading:
"""A single normalized sensor value.
`value` is None when the metric is unavailable/N-A, so consumers can render
"N/A" rather than crash (graceful degradation — NFR).
"""
source: str # subsystem id: "gpu", "cpu", "memory", "storage"
metric: str # what it measures: "temp", "power", "util", ...
value: float | None # None == unavailable
unit: str = "" # "°C", "W", "%", "MHz", "GB", "MiB", ...
label: str = "" # optional detail: core name, DIMM, device, "junction"
@property
def key(self) -> str:
suffix = f".{self.label}" if self.label else ""
return f"{self.source}.{self.metric}{suffix}"
@dataclass
class Sample:
"""All readings captured in one sampling tick."""
ts: float = field(default_factory=time.time)
readings: list[Reading] = field(default_factory=list)
def by_source(self) -> dict[str, list[Reading]]:
"""Group readings by subsystem, preserving insertion order."""
groups: dict[str, list[Reading]] = {}
for r in self.readings:
groups.setdefault(r.source, []).append(r)
return groups
def to_rows(self) -> list[dict]:
"""Flatten to plain dicts for CSV/JSON logging."""
return [{"ts": self.ts, **asdict(r)} for r in self.readings]
+37
View File
@@ -0,0 +1,37 @@
"""Sampling loop: poll all sources into Samples."""
from __future__ import annotations
import time
from collections.abc import Iterator
from .sample import Sample
from .sources.base import Source
class Sampler:
"""Polls a set of sources, producing one Sample per tick."""
def __init__(self, sources: list[Source]):
self.sources = sources
def sample(self) -> Sample:
s = Sample()
for src in self.sources:
try:
s.readings.extend(src.read())
except Exception:
# A single misbehaving source must not abort the whole tick.
continue
return s
def stream(self, interval: float = 1.0, count: int | None = None) -> Iterator[Sample]:
"""Yield Samples every `interval` seconds (forever, or `count` times)."""
n = 0
while count is None or n < count:
start = time.monotonic()
yield self.sample()
n += 1
if count is not None and n >= count:
break
time.sleep(max(0.0, interval - (time.monotonic() - start)))
+31
View File
@@ -0,0 +1,31 @@
"""Source discovery. GPU/NVIDIA first (D4), then CPU, memory, storage."""
from __future__ import annotations
from .base import Source
from .cpu import CpuSource
from .memory import MemorySource
from .nvidia import NvidiaSource
from .storage import StorageSource
# Display order: GPU first (the seed use case), then CPU, memory, storage.
ALL_SOURCE_TYPES: list[type[Source]] = [
NvidiaSource,
CpuSource,
MemorySource,
StorageSource,
]
def available_sources() -> list[Source]:
"""Instantiate and return the sources that probe successfully here."""
found: list[Source] = []
for cls in ALL_SOURCE_TYPES:
src = cls()
try:
if src.probe():
found.append(src)
except Exception:
# A misbehaving probe must not hide the other sources.
continue
return found
+24
View File
@@ -0,0 +1,24 @@
"""Source interface."""
from __future__ import annotations
from ..sample import Reading
class Source:
"""A pluggable sensor source for one subsystem.
Subclasses set `name` and implement `probe()` and `read()`. Sources must never
raise on a missing sensor/tool — return an empty list, or Readings with
value=None, so the rest of the system degrades gracefully (NFR).
"""
name: str = "unknown"
def probe(self) -> bool:
"""Return True if this source can produce readings on this machine."""
raise NotImplementedError
def read(self) -> list[Reading]:
"""Return current readings (entries may have value=None)."""
raise NotImplementedError
+32
View File
@@ -0,0 +1,32 @@
"""CPU temperatures (coretemp/k10temp hwmon) + load average."""
from __future__ import annotations
import os
from ..hwmon import find_by_name, read_temps
from ..sample import Reading
from .base import Source
class CpuSource(Source):
name = "cpu"
def _hwmons(self):
# Intel exposes 'coretemp'; AMD exposes 'k10temp'.
return find_by_name("coretemp") or find_by_name("k10temp")
def probe(self) -> bool:
return bool(self._hwmons())
def read(self) -> list[Reading]:
readings: list[Reading] = []
for d in self._hwmons():
for label, celsius in read_temps(d):
readings.append(Reading("cpu", "temp", round(celsius, 1), "°C", label))
try:
load1 = os.getloadavg()[0]
readings.append(Reading("cpu", "load", round(load1, 2), "", "loadavg-1m"))
except (OSError, AttributeError):
pass
return readings
+48
View File
@@ -0,0 +1,48 @@
"""System memory usage (/proc/meminfo) + DDR5 SPD temps (spd5118 hwmon)."""
from __future__ import annotations
from pathlib import Path
from ..hwmon import find_by_name, read_temps
from ..sample import Reading
from .base import Source
MEMINFO = Path("/proc/meminfo")
KB_PER_GB = 1024 * 1024
def _meminfo() -> dict[str, int]:
data: dict[str, int] = {}
try:
for line in MEMINFO.read_text().splitlines():
key, _, rest = line.partition(":")
data[key.strip()] = int(rest.strip().split()[0]) # kB
except (OSError, ValueError, IndexError):
pass
return data
class MemorySource(Source):
name = "memory"
def probe(self) -> bool:
return MEMINFO.exists()
def read(self) -> list[Reading]:
readings: list[Reading] = []
info = _meminfo()
total = info.get("MemTotal")
avail = info.get("MemAvailable")
if total is not None:
readings.append(Reading("memory", "total", round(total / KB_PER_GB, 1), "GB"))
if avail is not None:
used = total - avail
readings.append(Reading("memory", "used", round(used / KB_PER_GB, 1), "GB"))
readings.append(Reading("memory", "available", round(avail / KB_PER_GB, 1), "GB"))
readings.append(Reading("memory", "used_pct", round(100 * used / total, 1), "%"))
# DDR5 module temperatures, if exposed by the SPD hub.
for i, d in enumerate(find_by_name("spd5118")):
for _, celsius in read_temps(d):
readings.append(Reading("memory", "temp", round(celsius, 1), "°C", f"DIMM{i}"))
return readings
+93
View File
@@ -0,0 +1,93 @@
"""NVIDIA GPU readings via nvidia-smi (NVML wrapper)."""
from __future__ import annotations
import shutil
import subprocess
from ..sample import Reading
from .base import Source
# Fields queried from nvidia-smi, in order.
_QUERY = [
"name",
"temperature.gpu",
"temperature.memory",
"utilization.gpu",
"utilization.memory",
"power.draw",
"power.limit",
"clocks.current.graphics",
"clocks.current.memory",
"fan.speed",
"memory.used",
"memory.total",
"pcie.link.gen.current",
"pcie.link.width.current",
]
def _f(token: str) -> float | None:
token = token.strip()
if not token or token.startswith("[") or token.lower() in ("n/a", "not supported"):
return None
try:
return float(token)
except ValueError:
return None
class NvidiaSource(Source):
name = "gpu"
def probe(self) -> bool:
if shutil.which("nvidia-smi") is None:
return False
try:
subprocess.run(["nvidia-smi", "-L"], capture_output=True, timeout=5, check=True)
return True
except (subprocess.SubprocessError, OSError):
return False
def read(self) -> list[Reading]:
try:
proc = subprocess.run(
[
"nvidia-smi",
f"--query-gpu={','.join(_QUERY)}",
"--format=csv,noheader,nounits",
],
capture_output=True,
text=True,
timeout=10,
check=True,
)
except subprocess.TimeoutExpired:
# A query timeout is itself a signal: the GPU may be hung/lost.
return [Reading("gpu", "status", None, "", "query-timeout")]
except (subprocess.SubprocessError, OSError):
return []
readings: list[Reading] = []
for line in proc.stdout.strip().splitlines():
cols = [c.strip() for c in line.split(",")]
if len(cols) != len(_QUERY):
continue
v = dict(zip(_QUERY, cols))
readings += [
Reading("gpu", "name", None, "", v["name"]),
Reading("gpu", "temp", _f(v["temperature.gpu"]), "°C"),
Reading("gpu", "temp", _f(v["temperature.memory"]), "°C", "memory"),
Reading("gpu", "util", _f(v["utilization.gpu"]), "%"),
Reading("gpu", "mem_util", _f(v["utilization.memory"]), "%"),
Reading("gpu", "power", _f(v["power.draw"]), "W"),
Reading("gpu", "power_limit", _f(v["power.limit"]), "W"),
Reading("gpu", "clock", _f(v["clocks.current.graphics"]), "MHz", "core"),
Reading("gpu", "clock", _f(v["clocks.current.memory"]), "MHz", "memory"),
Reading("gpu", "fan", _f(v["fan.speed"]), "%"),
Reading("gpu", "mem_used", _f(v["memory.used"]), "MiB"),
Reading("gpu", "mem_total", _f(v["memory.total"]), "MiB"),
Reading("gpu", "pcie_gen", _f(v["pcie.link.gen.current"]), "", "current"),
Reading("gpu", "pcie_width", _f(v["pcie.link.width.current"]), "x", "current"),
]
return readings
+34
View File
@@ -0,0 +1,34 @@
"""NVMe / SSD temperatures via hwmon."""
from __future__ import annotations
from pathlib import Path
from ..hwmon import find_by_name, read_temps
from ..sample import Reading
from .base import Source
def _device_name(hwmon_dir: Path) -> str:
# /sys/class/hwmon/hwmonX/device -> .../nvme/nvme0 (best-effort label)
try:
return (hwmon_dir / "device").resolve().name
except OSError:
return hwmon_dir.name
class StorageSource(Source):
name = "storage"
def probe(self) -> bool:
return bool(find_by_name("nvme"))
def read(self) -> list[Reading]:
readings: list[Reading] = []
for d in find_by_name("nvme"):
dev = _device_name(d)
for label, celsius in read_temps(d):
readings.append(
Reading("storage", "temp", round(celsius, 1), "°C", f"{dev}:{label}")
)
return readings
+38
View File
@@ -0,0 +1,38 @@
"""Human-readable rendering of a Sample for the terminal."""
from __future__ import annotations
from .core.sample import Reading, Sample
_GROUP_ORDER = ["gpu", "cpu", "memory", "storage"]
_GROUP_TITLES = {"gpu": "GPU", "cpu": "CPU", "memory": "Memory", "storage": "Storage"}
def _fmt_value(r: Reading) -> str:
if r.value is None:
return "N/A"
if r.unit == "°C":
return f"{r.value:.1f} °C"
if r.unit:
return f"{r.value:g} {r.unit}"
return f"{r.value:g}"
def _fmt(r: Reading) -> str:
if r.metric == "name": # GPU/device identity line
return f" {r.label}"
name = f"{r.metric} {r.label}".strip()
return f" {name:<22} {_fmt_value(r)}"
def render_snapshot(sample: Sample) -> str:
groups = sample.by_source()
ordered = [k for k in _GROUP_ORDER if k in groups]
ordered += [k for k in groups if k not in _GROUP_ORDER]
blocks: list[str] = []
for key in ordered:
title = _GROUP_TITLES.get(key, key.title())
lines = [title] + [_fmt(r) for r in groups[key]]
blocks.append("\n".join(lines))
return "\n\n".join(blocks)