Files
rigdoctor/src/rigdoctor/core/crashlog.py
T
jessey ce5f830393
release / release (push) Successful in 2m13s
Release 0.0.2: M3 logger (CLI + GUI), GUI-first, CI release workflow
Crash-capture logger (M3):
- crash-safe JSONL (fsync per sample), size-based rotation, GPU-lost/recovered
  markers, atomic status file
- CLI: record run/start/stop/status/report (run = systemd-ready entrypoint)
- shared core.reccontrol so CLI + GUI drive the same recorder
- crashlog tests (writer, rotation, reader, summary, recorder)

GUI:
- Recording/Logs page: start/stop/interval controls, live status, post-crash report
- shared render helpers (format_raw/headline, render_summary)

Docs/decisions:
- GUI-first (D17); CLI keeps full parity
- D8 revised: user-local self-updating install primary, .deb optional
- planned: M12 session sharing (D16), M13 no-root auto-update from public repo (D18)
- versioning + CHANGELOG convention (D19)

Infra:
- .gitea/workflows/release.yml: build wheel+sdist and publish a Gitea release
  v<version> on push to main
- align version to the 0.0.x release line; bump to 0.0.2

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-21 17:16:41 +02:00

178 lines
5.8 KiB
Python

"""Crash-capture log (M3): rotating, fsync-per-sample JSONL writer + reader + summary.
On-disk format is JSON Lines, one record per line:
sample : {"ts": <float>, "readings": [[source, metric, value, unit, label], ...]}
event : {"ts": <float>, "event": <str>, "detail": <str>}
Every line is flushed and fsync'd, so the readings right before a hard lock survive.
A torn final line (interrupted mid-write by a crash) is tolerated on read.
"""
from __future__ import annotations
import json
import os
import time
from collections import deque
from dataclasses import dataclass
from pathlib import Path
from .sample import Reading, Sample
class CrashLogWriter:
    """Append samples/events as JSONL, fsync per line, rotate by size.

    Every record is written as one compact JSON line, flushed and fsync'd
    before the call returns, so the data written just before a hard lock is
    already on disk.

    Args:
        path: Log file location; missing parent directories are created.
        max_bytes: Size at which the live file is rotated. ``0`` (or any
            non-positive value) disables rotation.
        backups: Number of rotated segments to keep (``path.1`` is the
            newest, ``path.<backups>`` the oldest). With ``0`` no rotation
            takes place and the file simply keeps growing, the same way
            ``logging.handlers.RotatingFileHandler`` treats ``backupCount=0``.

    The writer can be used as a context manager; the file is closed on exit.
    """

    def __init__(self, path, max_bytes: int = 20_000_000, backups: int = 10) -> None:
        self.path = Path(path)
        self.max_bytes = int(max_bytes)
        self.backups = int(backups)
        self.path.parent.mkdir(parents=True, exist_ok=True)
        self._fh = open(self.path, "a", encoding="utf-8")

    def __enter__(self) -> "CrashLogWriter":
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        self.close()

    def _write(self, obj: dict) -> None:
        """Serialize *obj* as one JSON line, force it to disk, rotate if due."""
        line = json.dumps(obj, separators=(",", ":"), ensure_ascii=False)
        self._fh.write(line + "\n")
        self._fh.flush()
        os.fsync(self._fh.fileno())  # survive a hard lock
        # Without backups there is nothing to rotate into, so skip the check
        # entirely instead of closing and reopening the file on every write.
        if self.max_bytes > 0 and self.backups > 0 and self._fh.tell() >= self.max_bytes:
            self._rotate()

    def write_sample(self, sample: Sample) -> None:
        """Append one sample record: its timestamp (ms precision) and readings."""
        rows = [[r.source, r.metric, r.value, r.unit, r.label] for r in sample.readings]
        self._write({"ts": round(sample.ts, 3), "readings": rows})

    def write_event(self, kind: str, detail: str = "") -> None:
        """Append one event record stamped with the current wall-clock time."""
        self._write({"ts": round(time.time(), 3), "event": kind, "detail": detail})

    def _rotate(self) -> None:
        """Shift ``path.i`` to ``path.i+1`` and move the live file to ``path.1``.

        Mirrors ``logging.handlers.RotatingFileHandler``. A fresh live file is
        always reopened, even if a rename fails, so the writer never ends up
        holding a closed handle; the rename error itself still propagates.
        """
        if self.backups <= 0:
            return
        self._fh.close()
        base = str(self.path)
        try:
            for i in range(self.backups - 1, 0, -1):
                try:
                    # replace() overwrites an existing destination atomically,
                    # so there is no unlink-then-rename window.
                    Path(f"{base}.{i}").replace(f"{base}.{i + 1}")
                except FileNotFoundError:
                    continue  # this slot has not been filled yet
            self.path.replace(f"{base}.1")
        finally:
            self._fh = open(self.path, "a", encoding="utf-8")

    def close(self) -> None:
        """Close the log file; deliberately best-effort and never raises."""
        try:
            self._fh.close()
        except Exception:
            # Shutdown path of a crash logger: a failing close must not mask
            # whatever caused the shutdown in the first place.
            pass
def _segment_files(path) -> list[Path]:
"""All log segments oldest→newest: base.N … base.1, base."""
base = Path(path)
numbered: list[tuple[int, Path]] = []
for p in base.parent.glob(base.name + ".*"):
suffix = p.name[len(base.name) + 1:]
if suffix.isdigit():
numbered.append((int(suffix), p))
numbered.sort(reverse=True) # highest number = oldest
files = [p for _, p in numbered]
if base.exists():
files.append(base)
return files
def iter_records(path, include_backups: bool = True):
    """Yield parsed records oldest→newest, tolerating torn or corrupt lines.

    Args:
        path: The live log file.
        include_backups: When true, rotated segments are read first
            (oldest to newest) followed by the live file; otherwise only
            *path* itself is read.

    Yields:
        One ``dict`` per well-formed JSON-object line. Blank lines, lines
        that are not valid JSON, and JSON values that are not objects are
        skipped. Files that cannot be opened or read are skipped as well.
    """
    files = _segment_files(path) if include_backups else [Path(path)]
    for f in files:
        try:
            # The writer emits raw UTF-8, so a write interrupted by a crash can
            # end in the middle of a multi-byte character. Strict decoding
            # would raise UnicodeDecodeError for the whole file; replacing the
            # bad bytes confines the damage to the torn line, which then fails
            # JSON parsing and is dropped below.
            with open(f, encoding="utf-8", errors="replace") as fh:
                for line in fh:
                    line = line.strip()
                    if not line:
                        continue
                    try:
                        rec = json.loads(line)
                    except ValueError:
                        continue
                    # Records are always JSON objects; anything else is
                    # corruption that would break consumers calling .get().
                    if isinstance(rec, dict):
                        yield rec
        except OSError:
            continue
def record_to_sample(rec: dict) -> Sample:
    """Rebuild a ``Sample`` from one parsed sample record.

    Each entry of ``rec["readings"]`` must unpack into exactly five fields
    (source, metric, value, unit, label), which are passed positionally to
    ``Reading``. A missing ``readings`` key gives an empty list and a missing
    ``ts`` defaults to ``0.0``.
    """
    parsed = []
    for row in rec.get("readings", []):
        source, metric, value, unit, label = row
        parsed.append(Reading(source, metric, value, unit, label))
    return Sample(ts=rec.get("ts", 0.0), readings=parsed)
def headline(sample: Sample) -> dict:
    """Pick out the handful of at-a-glance values shown by status/report views.

    Returns a dict with the keys ``gpu_temp``, ``gpu_util``, ``gpu_power``,
    ``cpu_temp`` and ``mem_pct``; a value is ``None`` when no matching
    reading is present.
    """

    def first_value(source: str, metric: str, label: str | None = None):
        # Value of the first reading matching source/metric (and label, if given).
        matching = (
            r.value
            for r in sample.readings
            if r.source == source
            and r.metric == metric
            and (label is None or r.label == label)
        )
        return next(matching, None)

    # CPU temperature: prefer the first package-level sensor (label starting
    # with "package" or containing "tctl"/"tdie"); otherwise use the hottest
    # CPU temperature reading.
    usable = [
        r
        for r in sample.readings
        if r.source == "cpu" and r.metric == "temp" and r.value is not None
    ]
    package_temp = None
    for r in usable:
        tag = r.label.lower()
        is_package = tag.startswith("package") or "tctl" in tag or "tdie" in tag
        if package_temp is None and is_package:
            package_temp = r.value
    if package_temp is None and usable:
        package_temp = max(r.value for r in usable)

    summary = {}
    summary["gpu_temp"] = first_value("gpu", "temp", "")
    summary["gpu_util"] = first_value("gpu", "util")
    summary["gpu_power"] = first_value("gpu", "power")
    summary["cpu_temp"] = package_temp
    summary["mem_pct"] = first_value("memory", "used_pct")
    return summary
@dataclass
class Summary:
    """Aggregate view of a crash log, as returned by ``summarize``."""

    # Timestamps of the first and last sample records (None if there are none).
    start: float | None
    end: float | None

    # Number of sample records seen.
    samples: int

    # reading.key -> (value, unit, ts) for the largest value seen per key.
    maxima: dict

    # [(ts, kind, detail), ...] for every event record, in log order.
    events: list

    # [Sample, ...] the most recent samples, oldest first.
    last: list
def summarize(path, last_n: int = 10) -> Summary:
    """Scan the whole log (rotated segments included) and build a ``Summary``.

    Args:
        path: The live log file; records come from ``iter_records(path)``.
        last_n: How many of the most recent samples to keep in ``Summary.last``.

    Returns:
        A ``Summary`` holding the first/last sample timestamps, the sample
        count, the per-key maxima as ``(value, unit, ts)``, every event as
        ``(ts, kind, detail)``, and the trailing *last_n* samples.
    """
    start = end = None
    count = 0
    maxima: dict = {}
    events: list = []
    recent: deque = deque(maxlen=last_n)

    for rec in iter_records(path):
        # A corrupt line can still parse as valid JSON that is not an object.
        if not isinstance(rec, dict):
            continue
        ts = rec.get("ts")
        if "event" in rec:
            events.append((ts, rec.get("event", ""), rec.get("detail", "")))
            continue
        if "readings" not in rec:
            continue

        count += 1
        if start is None:
            start = ts
        end = ts

        sample = record_to_sample(rec)
        recent.append(sample)
        for r in sample.readings:
            value = r.value
            # `value != value` is true only for NaN. A NaN stored as the first
            # value for a key would never be displaced, because every later
            # `x > nan` comparison is False, so NaN readings are ignored.
            if value is None or value != value:
                continue
            best = maxima.get(r.key)
            if best is None or value > best[0]:
                maxima[r.key] = (value, r.unit, ts)

    return Summary(start, end, count, maxima, events, list(recent))