Files
rigdoctor/src/rigdoctor/render.py
T
jessey 2e6a981120
release / release (push) Successful in 13s
Release 0.0.5: health report (M4), installer (M9), update check (M13)
M4 — health report (the 0.0.4 CHANGELOG entry, folded into this release):
- core/health.py: scan journalctl (Xid/panic/OOM/MCE/AER/thermal), SMART,
  NVIDIA driver mismatch, journald persistence, live temps -> findings
- CLI `rigdoctor report` (text/JSON); GUI Health tab; scanner tests

M9 — installer (first cut):
- core/{catalog,sysenv,installer}.py; `rigdoctor install [--check] [-y]`
- GUI Setup tab: detect distro/GPU, show optional components, one-click
  install of missing apt packages via pkexec/sudo

M13 — update check (check half):
- core/updates.py; sidebar shows up-to-date / "Update to v…" / unavailable

Plus tests, version bump to 0.0.5, CHANGELOG, and doc status updates.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-21 17:36:11 +02:00

159 lines
5.7 KiB
Python

"""Human-readable rendering of a Sample for the terminal."""
from __future__ import annotations
import time
from .core.crashlog import Summary, headline
from .core.sample import Reading, Sample
_GROUP_ORDER = ["gpu", "cpu", "memory", "storage"]
_GROUP_TITLES = {"gpu": "GPU", "cpu": "CPU", "memory": "Memory", "storage": "Storage"}
def format_raw(value: float | None, unit: str) -> str:
"""Format a value + unit for display."""
if value is None:
return "N/A"
if unit == "°C":
return f"{value:.1f} °C"
if unit:
return f"{value:g} {unit}"
return f"{value:g}"
def format_value(r: Reading) -> str:
"""Format a reading's value + unit for display (shared by CLI and GUI)."""
return format_raw(r.value, r.unit)
def metric_label(r: Reading) -> str:
"""Human label for a reading's metric (e.g. 'temp memory')."""
return f"{r.metric} {r.label}".strip()
def _fmt(r: Reading) -> str:
if r.metric == "name": # GPU/device identity line
return f" {r.label}"
return f" {metric_label(r):<22} {format_value(r)}"
def render_snapshot(sample: Sample) -> str:
groups = sample.by_source()
ordered = [k for k in _GROUP_ORDER if k in groups]
ordered += [k for k in groups if k not in _GROUP_ORDER]
blocks: list[str] = []
for key in ordered:
title = _GROUP_TITLES.get(key, key.title())
lines = [title] + [_fmt(r) for r in groups[key]]
blocks.append("\n".join(lines))
return "\n\n".join(blocks)
def format_headline(h: dict) -> str:
"""One-line headline summary from a headline() dict."""
def g(value, unit):
return format_raw(value, unit) if value is not None else ""
return (
f"GPU {g(h.get('gpu_temp'), '°C')} {g(h.get('gpu_util'), '%')} {g(h.get('gpu_power'), 'W')}"
f" · CPU {g(h.get('cpu_temp'), '°C')} · MEM {g(h.get('mem_pct'), '%')}"
)
def _fmt_duration(seconds: float) -> str:
seconds = int(seconds)
h, rem = divmod(seconds, 3600)
m, s = divmod(rem, 60)
if h:
return f"{h}h {m}m {s}s"
if m:
return f"{m}m {s}s"
return f"{s}s"
# Metrics worth surfacing as session peaks (by metric name within reading.key).
_PEAK_METRICS = ("temp", "power", "util", "mem_util", "fan", "used_pct")
_SOURCE_ORDER = {"gpu": 0, "cpu": 1, "memory": 2, "storage": 3}
def _aggregate_peaks(maxima: dict) -> list[tuple[str, str, float, str, float, str]]:
"""Collapse per-label maxima to the single worst value per (source, metric).
Returns rows of (source, metric, value, unit, ts, label) in display order.
"""
agg: dict[tuple[str, str], tuple[float, str, float, str]] = {}
for key, (value, unit, ts) in maxima.items():
parts = key.split(".")
if len(parts) < 2 or parts[1] not in _PEAK_METRICS:
continue
source, metric = parts[0], parts[1]
label = ".".join(parts[2:])
current = agg.get((source, metric))
if current is None or value > current[0]:
agg[(source, metric)] = (value, unit, ts, label)
rows = [(s, m, v, u, ts, lbl) for (s, m), (v, u, ts, lbl) in agg.items()]
rows.sort(key=lambda r: (_SOURCE_ORDER.get(r[0], 9), r[1]))
return rows
_SEV_LABEL = {"critical": "CRITICAL", "warning": "WARNING", "info": "INFO", "ok": "OK"}
def render_health(findings: list) -> str:
if not findings:
return "Health report: no findings."
crit = sum(1 for f in findings if f.severity == "critical")
warn = sum(1 for f in findings if f.severity == "warning")
lines = ["Health report", "", f" {crit} critical · {warn} warning · {len(findings)} checks", ""]
for f in findings:
lines.append(f"[{_SEV_LABEL.get(f.severity, '?')}] {f.category}: {f.title}")
if f.detail:
lines.append(f" {f.detail}")
if f.suggestion:
lines.append(f"{f.suggestion}")
lines.append("")
return "\n".join(lines).rstrip()
def render_summary(summary: Summary, log_path=None) -> str:
if summary.samples == 0 and not summary.events:
where = f" ({log_path})" if log_path else ""
return f"No capture data found{where}. Start one with: rigdoctor record start"
lines: list[str] = ["Crash-capture report", ""]
if summary.start and summary.end:
start = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(summary.start))
end = time.strftime("%H:%M:%S", time.localtime(summary.end))
lines.append(f" Window : {start}{end} ({_fmt_duration(summary.end - summary.start)})")
lines.append(f" Samples : {summary.samples}")
if log_path:
lines.append(f" Log : {log_path}")
if summary.events:
lines += ["", "Events"]
for ts, kind, detail in summary.events:
stamp = time.strftime("%H:%M:%S", time.localtime(ts)) if ts else "--:--:--"
mark = "" if "lost" in kind else " "
suffix = f"{detail}" if detail else ""
lines.append(f" {mark} {stamp} {kind}{suffix}")
peaks = _aggregate_peaks(summary.maxima)
if peaks:
lines += ["", "Peaks (session maximum)"]
for source, metric, value, unit, ts, label in peaks:
stamp = time.strftime("%H:%M:%S", time.localtime(ts)) if ts else ""
detail = f" ({label})" if label else ""
name = f"{source} {metric}"
lines.append(f" {name:<16} {format_raw(value, unit):>10} at {stamp}{detail}")
if summary.last:
lines += ["", f"Last {len(summary.last)} samples (most recent last)"]
for sample in summary.last:
stamp = time.strftime("%H:%M:%S", time.localtime(sample.ts)) if sample.ts else "--:--:--"
lines.append(f" {stamp} {format_headline(headline(sample))}")
return "\n".join(lines)