Release 0.0.2: M3 logger (CLI + GUI), GUI-first, CI release workflow
release / release (push) Successful in 2m13s

Crash-capture logger (M3):
- crash-safe JSONL (fsync per sample), size-based rotation, GPU-lost/recovered
  markers, atomic status file
- CLI: record run/start/stop/status/report (run = systemd-ready entrypoint)
- shared core.reccontrol so CLI + GUI drive the same recorder
- crashlog tests (writer, rotation, reader, summary, recorder)

GUI:
- Recording/Logs page: start/stop/interval controls, live status, post-crash report
- shared render helpers (format_raw/headline, render_summary)

Docs/decisions:
- GUI-first (D17); CLI keeps full parity
- D8 revised: user-local self-updating install primary, .deb optional
- planned: M12 session sharing (D16), M13 no-root auto-update from public repo (D18)
- versioning + CHANGELOG convention (D19)

Infra:
- .gitea/workflows/release.yml: build wheel+sdist and publish a Gitea release
  v<version> on push to main
- align version to the 0.0.x release line; bump to 0.0.2

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-05-21 17:16:41 +02:00
parent 2ccf7ca50c
commit ce5f830393
20 changed files with 1157 additions and 60 deletions
+1 -1
View File
@@ -1,3 +1,3 @@
"""RigDoctor — modular hardware monitoring & crash diagnostics for Linux gamers."""
__version__ = "0.1.0"
__version__ = "0.0.2"
+122 -6
View File
@@ -4,13 +4,18 @@ from __future__ import annotations
import argparse
import json
import os
import signal
import sys
import time
from pathlib import Path
from . import __version__
from . import __version__, config
from .config import load_config
from .core import reccontrol
from .core.sampler import Sampler
from .core.sources import available_sources
from .render import render_snapshot
from .render import format_headline, render_snapshot, render_summary
def _sampler() -> Sampler:
@@ -64,9 +69,99 @@ def cmd_gui(args) -> int:
return gui_main([sys.argv[0]])
def cmd_record(args) -> int:
print("`record` (M3 crash-capture logger) is not implemented yet — next on the roadmap.")
return 2
# --- M3 crash-capture logger ---------------------------------------------------
def cmd_record_run(args) -> int:
    """Run the capture loop in the foreground until SIGTERM/SIGINT.

    The process id is written to ``config.PID_FILE`` before recording starts and
    is removed again on every exit path, including a failure to import or
    construct the recorder, so a failed start never leaves a stale PID file.

    Args:
        args: parsed CLI namespace; reads ``interval`` and ``out``.

    Returns:
        0 once the recorder has stopped normally.
    """
    cfg = load_config()
    # A missing (None) or zero --interval falls back to the configured value.
    interval = args.interval or cfg["interval"]
    log_path = Path(args.out) if args.out else config.LOG_FILE

    config.STATE_DIR.mkdir(parents=True, exist_ok=True)
    config.PID_FILE.write_text(str(os.getpid()))
    try:
        # Everything that can fail after the PID file exists lives inside the
        # try block so the finally clause below always cleans the file up.
        from .core.recorder import Recorder

        recorder = Recorder(
            interval=interval,
            log_path=log_path,
            max_bytes=cfg["log_max_bytes"],
            backups=cfg["log_backups"],
            status_path=config.STATUS_FILE,
        )

        def _handle(_sig, _frame):
            # Only request a stop; run() finishes its current iteration itself.
            recorder.stop()

        signal.signal(signal.SIGTERM, _handle)
        signal.signal(signal.SIGINT, _handle)

        print(f"Recording to {log_path} every {interval:g}s — stop with Ctrl-C or `rigdoctor record stop`.")
        recorder.run()
    finally:
        try:
            config.PID_FILE.unlink()
        except OSError:
            pass
    print(f"Stopped after {recorder.samples} samples.")
    return 0
def cmd_record_start(args) -> int:
    """Start the recorder in the background and confirm that it came up.

    Args:
        args: parsed CLI namespace; reads ``interval`` and ``out``.

    Returns:
        0 if a recorder is (already or now) running, 1 if the spawn failed.
    """
    # Query once: asking twice could report "pid None" if the recorder exits
    # between the check and the message.
    existing = reccontrol.running_pid()
    if existing:
        print(f"Recorder already running (pid {existing}).")
        return 0

    pid = reccontrol.start_background(args.interval, args.out)
    time.sleep(1.0)  # let it come up
    if pid and reccontrol.pid_alive(pid):
        print(f"Recording started in the background (pid {pid}).")
        print(f" log: {args.out or config.LOG_FILE}")
        print(" status: rigdoctor record status · stop: rigdoctor record stop")
        return 0

    print(f"Recorder failed to start; see {config.SPAWN_LOG}")
    return 1
def cmd_record_stop(args) -> int:
    """Ask the background recorder to stop and wait for it to exit.

    Args:
        args: parsed CLI namespace (unused).

    Returns:
        0 if no recorder was running or it exited; 1 if the signal could not be
        delivered or the process was still alive after the grace period.
    """
    pid = reccontrol.running_pid()
    if not pid:
        print("Recorder is not running.")
        return 0
    if not reccontrol.stop_background():
        print(f"Could not stop recorder (pid {pid}).")
        return 1

    # Poll for up to ~5 s. Success is only reported once the process is really
    # gone, rather than unconditionally after the wait.
    deadline = time.monotonic() + 5.0
    while reccontrol.pid_alive(pid):
        if time.monotonic() >= deadline:
            print(f"Recorder (pid {pid}) did not exit within 5 s after SIGTERM.")
            return 1
        time.sleep(0.1)

    print(f"Recorder stopped (pid {pid}).")
    return 0
def cmd_record_status(args) -> int:
    """Print whether a recorder is running plus the contents of its status file.

    Always returns 0; the status details are printed only when a status file
    could be read.
    """
    pid = reccontrol.running_pid()
    status = reccontrol.read_status()

    banner = f"● recording (pid {pid})" if pid else "○ not recording"
    print(banner)
    if not status:
        return 0

    def _local(stamp, fmt):
        # Render an epoch timestamp in local time.
        return time.strftime(fmt, time.localtime(stamp))

    print(f" log: {status.get('log')}")
    print(f" samples: {status.get('samples')}")
    if status.get("started"):
        print(f" started: {_local(status['started'], '%Y-%m-%d %H:%M:%S')}")
    if status.get("updated"):
        print(f" updated: {_local(status['updated'], '%H:%M:%S')}")
    if status.get("gpu_lost"):
        print(" ⚠ a GPU-lost event was recorded this session")
    if status.get("latest"):
        print(f" latest: {format_headline(status['latest'])}")
    return 0
def cmd_record_report(args) -> int:
    """Summarize a capture log (``--log`` or the default log) and print the report."""
    from .core.crashlog import summarize

    target = config.LOG_FILE if not args.log else Path(args.log)
    report = render_summary(summarize(target, last_n=args.last), log_path=target)
    print(report)
    return 0
def cmd_report(args) -> int:
@@ -92,7 +187,28 @@ def build_parser() -> argparse.ArgumentParser:
sub.add_parser("gui", help="launch the desktop GUI (needs PySide6)").set_defaults(func=cmd_gui)
sub.add_parser("sources", help="list detected sensor sources").set_defaults(func=cmd_sources)
sub.add_parser("record", help="crash-capture logger (coming soon)").set_defaults(func=cmd_record)
rec = sub.add_parser("record", help="crash-capture logger (M3)")
rec_sub = rec.add_subparsers(dest="record_cmd", required=True)
run_p = rec_sub.add_parser("run", help="run the capture loop in the foreground (systemd-friendly)")
run_p.add_argument("-n", "--interval", type=float, default=None, help="sampling interval (s)")
run_p.add_argument("-o", "--out", default=None, help="log file path")
run_p.set_defaults(func=cmd_record_run)
start_p = rec_sub.add_parser("start", help="start recording in the background")
start_p.add_argument("-n", "--interval", type=float, default=None, help="sampling interval (s)")
start_p.add_argument("-o", "--out", default=None, help="log file path")
start_p.set_defaults(func=cmd_record_start)
rec_sub.add_parser("stop", help="stop background recording").set_defaults(func=cmd_record_stop)
rec_sub.add_parser("status", help="show recorder status").set_defaults(func=cmd_record_status)
report_p = rec_sub.add_parser("report", help="summarize the captured log (post-crash)")
report_p.add_argument("--last", type=int, default=10, help="recent samples to show")
report_p.add_argument("--log", default=None, help="path to a capture log")
report_p.set_defaults(func=cmd_record_report)
sub.add_parser("report", help="health report (coming soon)").set_defaults(func=cmd_report)
return p
+9 -1
View File
@@ -19,8 +19,16 @@ STATE_DIR = _xdg("XDG_STATE_HOME", ".local/state")
LOG_DIR = DATA_DIR / "logs"
CONFIG_FILE = CONFIG_DIR / "config.toml"
# Crash-capture logger (M3)
LOG_FILE = LOG_DIR / "capture.jsonl"
STATUS_FILE = STATE_DIR / "recorder.json"
PID_FILE = STATE_DIR / "recorder.pid"
SPAWN_LOG = STATE_DIR / "recorder.out"
DEFAULTS: dict = {
"interval": 1.0, # sampling interval in seconds (default ≤1 Hz, low overhead — NFR)
"interval": 1.0, # sampling interval in seconds (default ≤1 Hz — NFR)
"log_max_bytes": 20_000_000, # rotate a log segment past this size
"log_backups": 10, # keep this many rotated segments (bounds disk use)
}
+177
View File
@@ -0,0 +1,177 @@
"""Crash-capture log (M3): rotating, fsync-per-sample JSONL writer + reader + summary.
On-disk format is JSON Lines, one record per line:
sample : {"ts": <float>, "readings": [[source, metric, value, unit, label], ...]}
event : {"ts": <float>, "event": <str>, "detail": <str>}
Every line is flushed and fsync'd, so the readings right before a hard lock survive.
A torn final line (interrupted mid-write by a crash) is tolerated on read.
"""
from __future__ import annotations
import json
import os
import time
from collections import deque
from dataclasses import dataclass
from pathlib import Path
from .sample import Reading, Sample
class CrashLogWriter:
    """Append samples/events as JSONL, fsync per line, rotate by size.

    Args:
        path: log file to append to; its parent directory is created if needed.
        max_bytes: rotate once the current segment reaches this size
            (0 disables rotation).
        backups: number of rotated segments (``path.1`` … ``path.N``) to keep.
            With 0 the current segment is truncated on rotation, so disk use
            stays bounded by roughly ``max_bytes``.

    The writer can be used as a context manager; leaving the block closes it.
    """

    def __init__(self, path, max_bytes: int = 20_000_000, backups: int = 10) -> None:
        self.path = Path(path)
        self.max_bytes = int(max_bytes)
        self.backups = int(backups)
        self.path.parent.mkdir(parents=True, exist_ok=True)
        self._fh = open(self.path, "a", encoding="utf-8")

    def __enter__(self):
        return self

    def __exit__(self, *exc_info) -> None:
        self.close()

    def _write(self, obj: dict) -> None:
        """Serialize *obj* as one JSON line, force it to disk, rotate if due."""
        self._fh.write(json.dumps(obj, separators=(",", ":"), ensure_ascii=False) + "\n")
        self._fh.flush()
        os.fsync(self._fh.fileno())  # survive a hard lock
        if self.max_bytes and self._fh.tell() >= self.max_bytes:
            self._rotate()

    def write_sample(self, sample: Sample) -> None:
        """Write one sample record: its timestamp plus a row per reading."""
        rows = [[r.source, r.metric, r.value, r.unit, r.label] for r in sample.readings]
        self._write({"ts": round(sample.ts, 3), "readings": rows})

    def write_event(self, kind: str, detail: str = "") -> None:
        """Write one event record stamped with the current wall-clock time."""
        self._write({"ts": round(time.time(), 3), "event": kind, "detail": detail})

    def _rotate(self) -> None:
        """Start a fresh segment, shifting base.i -> base.i+1 like RotatingFileHandler."""
        self._fh.close()
        mode = "a"
        try:
            if self.backups > 0:
                base = str(self.path)
                # Walk from the oldest kept slot downwards so nothing is
                # overwritten before it has been moved; replace() drops the
                # segment that falls off the end.
                for i in range(self.backups - 1, 0, -1):
                    src = Path(f"{base}.{i}")
                    if src.exists():
                        src.replace(Path(f"{base}.{i + 1}"))
                self.path.replace(Path(f"{base}.1"))
            else:
                # No backups requested: truncate instead of appending forever.
                mode = "w"
        finally:
            # Always end up with an open handle, even if a rename failed, so the
            # next write does not hit a closed file.
            self._fh = open(self.path, mode, encoding="utf-8")

    def close(self) -> None:
        """Close the underlying file; I/O errors while closing are ignored."""
        try:
            self._fh.close()
        except OSError:
            pass
def _segment_files(path) -> list[Path]:
"""All log segments oldest→newest: base.N … base.1, base."""
base = Path(path)
numbered: list[tuple[int, Path]] = []
for p in base.parent.glob(base.name + ".*"):
suffix = p.name[len(base.name) + 1:]
if suffix.isdigit():
numbered.append((int(suffix), p))
numbered.sort(reverse=True) # highest number = oldest
files = [p for _, p in numbered]
if base.exists():
files.append(base)
return files
def iter_records(path, include_backups: bool = True):
    """Yield parsed records oldest→newest, tolerating damaged lines.

    Args:
        path: the current log segment.
        include_backups: also read the rotated segments found next to *path*.

    Yields:
        One dict per well-formed JSON-object line. Blank lines, lines that are
        not valid JSON (for example a final line torn by a crash) and JSON
        values that are not objects are skipped. Unreadable files are skipped.
    """
    files = _segment_files(path) if include_backups else [Path(path)]
    for segment in files:
        try:
            # errors="replace": a write torn in the middle of a multi-byte
            # character must not abort the whole read with UnicodeDecodeError.
            # The damaged line then simply fails JSON parsing below.
            with open(segment, encoding="utf-8", errors="replace") as fh:
                for raw in fh:
                    text = raw.strip()
                    if not text:
                        continue
                    try:
                        record = json.loads(text)
                    except ValueError:
                        continue
                    # Consumers call .get() on records, so only pass objects.
                    if isinstance(record, dict):
                        yield record
        except OSError:
            continue
def record_to_sample(rec: dict) -> Sample:
    """Rebuild a Sample from a parsed log record.

    Each row under ``"readings"`` must unpack into exactly five fields
    (source, metric, value, unit, label). A missing ``"ts"`` becomes 0.0.
    """
    readings = []
    for source, metric, value, unit, label in rec.get("readings", []):
        readings.append(Reading(source, metric, value, unit, label))
    timestamp = rec.get("ts", 0.0)
    return Sample(ts=timestamp, readings=readings)
def headline(sample: Sample) -> dict:
    """Pick the handful of at-a-glance values shown by status/report displays.

    Returns a dict with keys ``gpu_temp``, ``gpu_util``, ``gpu_power``,
    ``cpu_temp`` and ``mem_pct``; a value is None when no matching reading exists.
    """
    readings = sample.readings

    def first_value(source: str, metric: str, label: str | None = None):
        # Value of the first reading matching source/metric (and label, if given).
        hits = (
            r.value
            for r in readings
            if r.source == source and r.metric == metric and (label is None or r.label == label)
        )
        return next(hits, None)

    # CPU temperature: prefer the first package-level sensor (Package/Tctl/Tdie),
    # otherwise fall back to the hottest CPU temperature reading.
    package_temp = None
    all_cpu_temps = []
    for reading in readings:
        if reading.source != "cpu" or reading.metric != "temp" or reading.value is None:
            continue
        all_cpu_temps.append(reading.value)
        name = reading.label.lower()
        is_package = name.startswith("package") or "tctl" in name or "tdie" in name
        if package_temp is None and is_package:
            package_temp = reading.value
    if package_temp is None and all_cpu_temps:
        package_temp = max(all_cpu_temps)

    return {
        "gpu_temp": first_value("gpu", "temp", ""),
        "gpu_util": first_value("gpu", "util"),
        "gpu_power": first_value("gpu", "power"),
        "cpu_temp": package_temp,
        "mem_pct": first_value("memory", "used_pct"),
    }
@dataclass
class Summary:
    """Aggregate view of a capture log, as built by summarize()."""

    # Timestamps of the first and last sample records (None when unavailable).
    start: float | None
    end: float | None
    # Number of sample records counted.
    samples: int
    # reading.key -> (value, unit, ts) of the largest value seen for that key.
    maxima: dict
    # [(ts, kind, detail), ...] in the order the events were read.
    events: list
    # [Sample, ...] the most recent samples, oldest first.
    last: list
def summarize(path, last_n: int = 10) -> Summary:
    """Scan a capture log (rotated segments included) and aggregate it.

    Collects every event, counts the samples, records the first and last sample
    timestamps, tracks the largest value per reading key and keeps the final
    *last_n* samples.
    """
    first_ts = None
    last_ts = None
    total = 0
    peaks: dict = {}
    event_log: list = []
    tail: deque = deque(maxlen=last_n)

    for record in iter_records(path):
        stamp = record.get("ts")

        if "event" in record:
            event_log.append((stamp, record.get("event", ""), record.get("detail", "")))
            continue
        if "readings" not in record:
            continue

        total += 1
        if first_ts is None:
            first_ts = stamp
        last_ts = stamp

        sample = record_to_sample(record)
        tail.append(sample)
        for reading in sample.readings:
            if reading.value is None:
                continue
            best = peaks.get(reading.key)
            # Strictly greater: on a tie the earliest occurrence is kept.
            if best is None or reading.value > best[0]:
                peaks[reading.key] = (reading.value, reading.unit, stamp)

    return Summary(first_ts, last_ts, total, peaks, event_log, list(tail))
+71
View File
@@ -0,0 +1,71 @@
"""Background-process control for the crash-capture recorder (shared by CLI + GUI).
Both front-ends start/stop/inspect the same `systemd`-style detached recorder via the
PID and status files, so behaviour is identical however you drive it.
"""
from __future__ import annotations
import json
import os
import signal
import subprocess
import sys
from .. import config
def pid_alive(pid: int) -> bool:
    """Return True if signal 0 can be delivered to process *pid*.

    Non-positive pids are rejected up front. For ``os.kill`` they address
    process groups (0, < -1) or every process (-1) rather than one process, so
    a corrupt PID file must never be reported as a live recorder.
    """
    if pid <= 0:
        return False
    try:
        os.kill(pid, 0)  # signal 0: existence/permission check only
    except (OSError, OverflowError):
        # OSError: no such process or not permitted to signal it.
        # OverflowError: the value does not fit the platform's pid type.
        return False
    return True
def running_pid() -> int | None:
    """Return the pid stored in the PID file if that process is alive, else None.

    A missing, unreadable or non-numeric PID file also yields None.
    """
    try:
        raw = config.PID_FILE.read_text()
        pid = int(raw.strip())
    except (OSError, ValueError):
        return None
    if pid_alive(pid):
        return pid
    return None
def read_status() -> dict | None:
    """Load the recorder's status file; None if it is missing, unreadable or not valid JSON."""
    try:
        text = config.STATUS_FILE.read_text()
        return json.loads(text)
    except (OSError, ValueError):
        return None
def start_background(interval: float | None = None, out: str | None = None) -> int | None:
    """Spawn a detached `record run`. Returns the child pid, or None if already running.

    Args:
        interval: sampling interval forwarded as ``--interval`` when truthy.
        out: log path forwarded as ``--out`` when truthy.

    The child's stdout/stderr are appended to ``config.SPAWN_LOG``.
    """
    if running_pid():
        return None
    config.STATE_DIR.mkdir(parents=True, exist_ok=True)

    cmd = [sys.executable, "-m", "rigdoctor", "record", "run"]
    if interval:
        cmd += ["--interval", str(interval)]
    if out:
        cmd += ["--out", out]

    # The child gets its own copy of the descriptor, so the parent's handle is
    # closed as soon as the spawn returns. Leaving it open leaked one file
    # descriptor per start in a long-lived caller such as the GUI.
    with open(config.SPAWN_LOG, "a") as out_fh:
        proc = subprocess.Popen(
            cmd,
            stdout=out_fh,
            stderr=subprocess.STDOUT,
            stdin=subprocess.DEVNULL,
            start_new_session=True,  # detach from the caller's session
        )
    return proc.pid
def stop_background() -> bool:
    """Send SIGTERM to the running recorder.

    Returns False when no recorder is running or the signal could not be
    delivered, True once the signal has been sent.
    """
    target = running_pid()
    if target:
        try:
            os.kill(target, signal.SIGTERM)
        except OSError:
            return False
        return True
    return False
+93
View File
@@ -0,0 +1,93 @@
"""Crash-capture recorder (M3): the sampling loop that writes a crash-safe log.
Runs in the foreground (so it works as a `systemd --user` ExecStart and under
manual `record run`). Stop it by calling stop() — typically from a SIGTERM/SIGINT
handler installed by the CLI.
"""
from __future__ import annotations
import json
import os
import threading
import time
from pathlib import Path
from .crashlog import CrashLogWriter, headline
from .sampler import Sampler
from .sources import available_sources
class Recorder:
    """Sampling loop that appends every sample to a crash-safe log.

    Args:
        interval: target seconds between the start of consecutive samples.
        log_path: capture log handed to CrashLogWriter.
        max_bytes: rotation threshold handed to CrashLogWriter.
        backups: rotated-segment count handed to CrashLogWriter.
        status_path: where to publish the JSON status file; None disables it.
        sampler: sampler to use; defaults to one over available_sources().
    """

    def __init__(
        self,
        interval: float,
        log_path,
        max_bytes: int = 20_000_000,
        backups: int = 10,
        status_path=None,
        sampler: Sampler | None = None,
    ) -> None:
        self.interval = interval
        self.sampler = sampler or Sampler(available_sources())
        self.writer = CrashLogWriter(log_path, max_bytes, backups)
        self.log_path = Path(log_path)
        self.status_path = Path(status_path) if status_path else None
        self.samples = 0
        self._stop = threading.Event()
        self._gpu_lost = False
        self._started = time.time()

    def stop(self) -> None:
        """Ask run() to return after its current iteration."""
        self._stop.set()

    def run(self) -> None:
        """Sample, log and publish status until stop() is called.

        A ``session-start`` event is written first and a ``session-stop`` event
        last. The writer is closed and the status file marked not-running even
        if sampling, or the final event write itself, raises.
        """
        self.writer.write_event("session-start", f"interval={self.interval:g}s")
        self._write_status(running=True)
        try:
            while not self._stop.is_set():
                t0 = time.monotonic()
                sample = self.sampler.sample()
                self.writer.write_sample(sample)
                self.samples += 1
                self._detect_gpu_lost(sample)
                self._write_status(running=True, sample=sample)
                # Sleep only for what is left of the interval; wait() returns
                # early as soon as stop() is called.
                self._stop.wait(max(0.0, self.interval - (time.monotonic() - t0)))
        finally:
            try:
                self.writer.write_event("session-stop", f"samples={self.samples}")
            finally:
                # Must run even when the stop event cannot be written (for
                # example on a full disk); otherwise the file stays open and
                # the status file keeps claiming the recorder is running.
                self.writer.close()
                self._write_status(running=False)

    def _detect_gpu_lost(self, sample) -> None:
        """Log a gpu-lost / gpu-recovered event when the GPU query state flips."""
        lost = any(
            r.source == "gpu" and r.metric == "status" and r.label == "query-timeout"
            for r in sample.readings
        )
        if lost and not self._gpu_lost:
            self._gpu_lost = True
            self.writer.write_event("gpu-lost", "nvidia-smi query timed out — GPU may be hung/lost")
        elif not lost and self._gpu_lost:
            self._gpu_lost = False
            self.writer.write_event("gpu-recovered", "GPU responding again")

    def _write_status(self, running: bool, sample=None) -> None:
        """Publish the status file via write-to-temp + rename; best effort."""
        if self.status_path is None:
            return
        data = {
            "running": running,
            "pid": os.getpid(),
            "log": str(self.log_path),
            "started": self._started,
            "samples": self.samples,
            "updated": time.time(),
            "gpu_lost": self._gpu_lost,
        }
        if sample is not None:
            data["latest"] = headline(sample)
        try:
            self.status_path.parent.mkdir(parents=True, exist_ok=True)
            tmp = self.status_path.with_suffix(self.status_path.suffix + ".tmp")
            tmp.write_text(json.dumps(data))
            tmp.replace(self.status_path)  # atomic
        except OSError:
            # Status is informational; never let it interrupt recording.
            pass
+6 -4
View File
@@ -16,12 +16,12 @@ from PySide6.QtWidgets import (
)
from .dashboard import Dashboard
from .recorder_page import RecorderPage
from .theme import ACCENT, MUTED
from .worker import SamplerWorker
_NAV_ITEMS = ["Dashboard", "Logs", "Health", "Inventory"]
_PLACEHOLDERS = {
"Logs": "Captured crash logs will appear here once the logger (M3) lands.",
"Health": "The health report (M4) — log scan + plain-language findings — lands here.",
"Inventory": "System inventory (M5) — CPU/GPU/board/RAM/drivers — lands here.",
}
@@ -46,9 +46,11 @@ class MainWindow(QMainWindow):
content_layout.setContentsMargins(0, 0, 0, 0)
self._stack = QStackedWidget()
self.dashboard = Dashboard()
self._stack.addWidget(self.dashboard)
for name in _NAV_ITEMS[1:]:
self._stack.addWidget(self._placeholder_page(name, _PLACEHOLDERS[name]))
self.recorder_page = RecorderPage()
self._stack.addWidget(self.dashboard) # 0 Dashboard
self._stack.addWidget(self.recorder_page) # 1 Logs
self._stack.addWidget(self._placeholder_page("Health", _PLACEHOLDERS["Health"])) # 2
self._stack.addWidget(self._placeholder_page("Inventory", _PLACEHOLDERS["Inventory"])) # 3
content_layout.addWidget(self._stack)
layout.addWidget(self._build_sidebar())
+185
View File
@@ -0,0 +1,185 @@
"""Recording & Logs page (M3 in the GUI): start/stop/status + post-crash report.
Drives the same background recorder as the CLI via core.reccontrol, so the GUI and
`rigdoctor record …` are interchangeable.
"""
from __future__ import annotations
import time
from PySide6.QtCore import Qt, QTimer, QUrl
from PySide6.QtGui import QDesktopServices, QFont
from PySide6.QtWidgets import (
QDoubleSpinBox,
QFrame,
QHBoxLayout,
QLabel,
QPushButton,
QTextEdit,
QVBoxLayout,
QWidget,
)
from .. import config
from ..core import reccontrol
from ..core.crashlog import summarize
from ..render import format_headline, render_summary
from .theme import GOOD, MUTED, WARN
def _panel(title: str) -> tuple[QFrame, QVBoxLayout]:
    """Build a "Card" frame with a bold title label; return the frame and its layout."""
    card = QFrame()
    card.setObjectName("Card")

    body = QVBoxLayout(card)
    body.setContentsMargins(16, 14, 16, 14)
    body.setSpacing(10)

    heading = QLabel(title)
    heading.setStyleSheet("font-weight: 700; background: transparent;")
    body.addWidget(heading)

    return card, body
def _fmt_time(value, fmt="%Y-%m-%d %H:%M:%S") -> str:
return time.strftime(fmt, time.localtime(value)) if value else ""
class RecorderPage(QWidget):
    """Recording page: recorder status, start/stop controls and the log report.

    The page talks to the background recorder only through ``reccontrol`` and
    re-reads its state once a second.
    """

    def __init__(self) -> None:
        super().__init__()
        self.setObjectName("Page")

        root = QVBoxLayout(self)
        root.setContentsMargins(20, 18, 20, 18)
        root.setSpacing(16)

        heading = QLabel("Recording")
        heading.setObjectName("PageTitle")
        root.addWidget(heading)

        root.addWidget(self._build_status_card())
        root.addWidget(self._build_report_card(), 1)

        # Poll recorder status once a second (reflects CLI-driven sessions too).
        self._timer = QTimer(self)
        self._timer.setInterval(1000)
        self._timer.timeout.connect(self._refresh_status)
        self._timer.start()

        self._refresh_status()
        self._load_report()

    # --- construction ----------------------------------------------------------
    def _build_status_card(self) -> QFrame:
        """Create the status labels and the control row inside a "Status" card."""
        card, column = _panel("Status")

        self._state = QLabel("○ Not recording")
        self._state.setStyleSheet(f"color: {MUTED}; font-weight: 700; background: transparent;")
        column.addWidget(self._state)

        self._info = QLabel("")
        self._info.setObjectName("Muted")
        column.addWidget(self._info)

        self._latest = QLabel("")
        column.addWidget(self._latest)

        self._warn = QLabel("")
        self._warn.setStyleSheet(f"color: {WARN}; font-weight: 600; background: transparent;")
        self._warn.setVisible(False)
        column.addWidget(self._warn)

        row = QHBoxLayout()
        row.setSpacing(8)
        row.addWidget(QLabel("Interval (s)"))

        self._interval = QDoubleSpinBox()
        self._interval.setRange(0.1, 10.0)
        self._interval.setSingleStep(0.1)
        self._interval.setValue(float(config.DEFAULTS["interval"]))
        row.addWidget(self._interval)

        self._start_btn = QPushButton("Start recording")
        self._start_btn.setObjectName("PrimaryButton")
        self._start_btn.clicked.connect(self._on_start)
        self._stop_btn = QPushButton("Stop")
        self._stop_btn.clicked.connect(self._on_stop)
        row.addWidget(self._start_btn)
        row.addWidget(self._stop_btn)
        row.addStretch(1)

        open_folder = QPushButton("Open log folder")
        open_folder.clicked.connect(self._open_folder)
        row.addWidget(open_folder)

        column.addLayout(row)
        return card

    def _build_report_card(self) -> QFrame:
        """Create the card holding the read-only report view and its Refresh button."""
        card = QFrame()
        card.setObjectName("Card")
        column = QVBoxLayout(card)
        column.setContentsMargins(16, 14, 16, 14)
        column.setSpacing(10)

        top = QHBoxLayout()
        caption = QLabel("Post-crash report")
        caption.setStyleSheet("font-weight: 700; background: transparent;")
        top.addWidget(caption)
        top.addStretch(1)
        refresh = QPushButton("Refresh")
        refresh.clicked.connect(self._load_report)
        top.addWidget(refresh)
        column.addLayout(top)

        self._report = QTextEdit()
        self._report.setObjectName("Report")
        self._report.setReadOnly(True)
        self._report.setFont(QFont("monospace", 10))
        self._report.setLineWrapMode(QTextEdit.LineWrapMode.NoWrap)
        column.addWidget(self._report)
        return card

    # --- actions ---------------------------------------------------------------
    def _on_start(self) -> None:
        """Disable Start, spawn the recorder, re-check status shortly after."""
        self._start_btn.setEnabled(False)
        reccontrol.start_background(interval=self._interval.value())
        QTimer.singleShot(600, self._refresh_status)

    def _on_stop(self) -> None:
        """Disable Stop, signal the recorder, then refresh status and report."""
        self._stop_btn.setEnabled(False)
        reccontrol.stop_background()
        QTimer.singleShot(600, self._refresh_status)
        QTimer.singleShot(900, self._load_report)

    def _open_folder(self) -> None:
        """Open the log directory (created if missing) via the desktop's URL handler."""
        config.LOG_DIR.mkdir(parents=True, exist_ok=True)
        QDesktopServices.openUrl(QUrl.fromLocalFile(str(config.LOG_DIR)))

    # --- refresh ---------------------------------------------------------------
    def _refresh_status(self) -> None:
        """Sync labels and button states with the PID and status files."""
        pid = reccontrol.running_pid()
        status = reccontrol.read_status()
        running = pid is not None

        text, color = (f"● Recording (pid {pid})", GOOD) if running else ("○ Not recording", MUTED)
        self._state.setText(text)
        self._state.setStyleSheet(f"color: {color}; font-weight: 700; background: transparent;")

        self._start_btn.setEnabled(not running)
        self._stop_btn.setEnabled(running)
        self._interval.setEnabled(not running)

        if not status:
            self._info.setText("No recording yet. Press “Start recording”.")
            self._latest.setText("")
            self._warn.setVisible(False)
            return

        self._info.setText(
            f"Samples: {status.get('samples', 0)} "
            f"Started: {_fmt_time(status.get('started'))} "
            f"Updated: {_fmt_time(status.get('updated'), '%H:%M:%S')}\n"
            f"Log: {status.get('log', config.LOG_FILE)}"
        )
        latest = status.get("latest")
        self._latest.setText(format_headline(latest) if latest else "")
        if status.get("gpu_lost"):
            self._warn.setText("⚠ A GPU-lost event was recorded this session")
            self._warn.setVisible(True)
        else:
            self._warn.setVisible(False)

    def _load_report(self) -> None:
        """Summarize the default capture log and show the rendered report."""
        report = render_summary(summarize(config.LOG_FILE, last_n=10), log_path=config.LOG_FILE)
        self._report.setPlainText(report)
+19
View File
@@ -88,4 +88,23 @@ QScrollBar::handle:vertical {{ background: {CARD_BORDER}; border-radius: 5px; mi
QScrollBar::handle:vertical:hover {{ background: #3a414d; }}
QScrollBar::add-line:vertical, QScrollBar::sub-line:vertical {{ height: 0; }}
QScrollBar::add-page:vertical, QScrollBar::sub-page:vertical {{ background: transparent; }}
QPushButton {{
background: #262b34; color: {TEXT}; border: 1px solid {CARD_BORDER};
border-radius: 8px; padding: 7px 14px;
}}
QPushButton:hover {{ background: #2f3540; }}
QPushButton:disabled {{ color: #5b626c; background: #1c2026; border-color: #23272f; }}
QPushButton#PrimaryButton {{ background: {ACCENT}; color: #06222e; border: none; font-weight: 700; }}
QPushButton#PrimaryButton:hover {{ background: #5cc8fb; }}
QPushButton#PrimaryButton:disabled {{ background: #27424f; color: #5f7c8a; }}
QDoubleSpinBox, QSpinBox {{
background: #262b34; color: {TEXT}; border: 1px solid {CARD_BORDER};
border-radius: 6px; padding: 4px 6px;
}}
QTextEdit#Report {{
background: #0d0f13; color: #cfd3da; border: 1px solid {CARD_BORDER}; border-radius: 8px;
}}
"""
+103 -7
View File
@@ -2,21 +2,29 @@
from __future__ import annotations
import time
from .core.crashlog import Summary, headline
from .core.sample import Reading, Sample
_GROUP_ORDER = ["gpu", "cpu", "memory", "storage"]
_GROUP_TITLES = {"gpu": "GPU", "cpu": "CPU", "memory": "Memory", "storage": "Storage"}
def format_raw(value: float | None, unit: str) -> str:
    """Render a value with its unit for display.

    None becomes "N/A"; temperatures in °C get one decimal place; everything
    else uses general (``g``) formatting, followed by the unit when there is one.
    """
    if value is None:
        return "N/A"
    if unit == "°C":
        return f"{value:.1f} °C"
    number = f"{value:g}"
    return f"{number} {unit}" if unit else number
def format_value(r: Reading) -> str:
    """Format a reading's value + unit for display (shared by CLI and GUI)."""
    value, unit = r.value, r.unit
    return format_raw(value, unit)
def metric_label(r: Reading) -> str:
@@ -41,3 +49,91 @@ def render_snapshot(sample: Sample) -> str:
lines = [title] + [_fmt(r) for r in groups[key]]
blocks.append("\n".join(lines))
return "\n\n".join(blocks)
def format_headline(h: dict) -> str:
    """Render a headline() dict as one line; missing values are left blank."""

    def cell(key: str, unit: str) -> str:
        value = h.get(key)
        if value is None:
            return ""
        return format_raw(value, unit)

    gpu_part = f"GPU {cell('gpu_temp', '°C')} {cell('gpu_util', '%')} {cell('gpu_power', 'W')}"
    rest = f" · CPU {cell('cpu_temp', '°C')} · MEM {cell('mem_pct', '%')}"
    return gpu_part + rest
def _fmt_duration(seconds: float) -> str:
seconds = int(seconds)
h, rem = divmod(seconds, 3600)
m, s = divmod(rem, 60)
if h:
return f"{h}h {m}m {s}s"
if m:
return f"{m}m {s}s"
return f"{s}s"
# Metrics worth surfacing as session peaks (by metric name within reading.key).
_PEAK_METRICS = ("temp", "power", "util", "mem_util", "fan", "used_pct")
_SOURCE_ORDER = {"gpu": 0, "cpu": 1, "memory": 2, "storage": 3}
def _aggregate_peaks(maxima: dict) -> list[tuple[str, str, float, str, float, str]]:
"""Collapse per-label maxima to the single worst value per (source, metric).
Returns rows of (source, metric, value, unit, ts, label) in display order.
"""
agg: dict[tuple[str, str], tuple[float, str, float, str]] = {}
for key, (value, unit, ts) in maxima.items():
parts = key.split(".")
if len(parts) < 2 or parts[1] not in _PEAK_METRICS:
continue
source, metric = parts[0], parts[1]
label = ".".join(parts[2:])
current = agg.get((source, metric))
if current is None or value > current[0]:
agg[(source, metric)] = (value, unit, ts, label)
rows = [(s, m, v, u, ts, lbl) for (s, m), (v, u, ts, lbl) in agg.items()]
rows.sort(key=lambda r: (_SOURCE_ORDER.get(r[0], 9), r[1]))
return rows
def render_summary(summary: Summary, log_path=None) -> str:
    """Render a Summary as the plain-text crash-capture report.

    Args:
        summary: aggregate produced by summarize().
        log_path: optional log location shown in the header and in the
            "no data" hint.

    Returns:
        The report as one newline-joined string. Sections (Events, Peaks, Last
        samples) appear only when they have content. Missing timestamps are
        shown as ``--:--:--``.
    """
    if summary.samples == 0 and not summary.events:
        where = f" ({log_path})" if log_path else ""
        return f"No capture data found{where}. Start one with: rigdoctor record start"

    def clock(ts) -> str:
        # Local wall-clock time, or a fixed-width placeholder to keep columns aligned.
        return time.strftime("%H:%M:%S", time.localtime(ts)) if ts else "--:--:--"

    lines: list[str] = ["Crash-capture report", ""]
    if summary.start and summary.end:
        start = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(summary.start))
        # The two ends of the window need a visible separator between them.
        lines.append(
            f" Window : {start} → {clock(summary.end)} ({_fmt_duration(summary.end - summary.start)})"
        )
    lines.append(f" Samples : {summary.samples}")
    if log_path:
        lines.append(f" Log : {log_path}")

    if summary.events:
        lines += ["", "Events"]
        for ts, kind, detail in summary.events:
            # One-character marker in both cases so the timestamps line up.
            mark = "⚠" if "lost" in kind else " "
            suffix = f" — {detail}" if detail else ""
            lines.append(f" {mark} {clock(ts)} {kind}{suffix}")

    # Skip the aggregation entirely when there is nothing to aggregate.
    peaks = _aggregate_peaks(summary.maxima) if summary.maxima else []
    if peaks:
        lines += ["", "Peaks (session maximum)"]
        for source, metric, value, unit, ts, label in peaks:
            detail = f" ({label})" if label else ""
            name = f"{source} {metric}"
            lines.append(f" {name:<16} {format_raw(value, unit):>10} at {clock(ts)}{detail}")

    if summary.last:
        lines += ["", f"Last {len(summary.last)} samples (most recent last)"]
        for sample in summary.last:
            lines.append(f" {clock(sample.ts)} {format_headline(headline(sample))}")

    return "\n".join(lines)