ce5f830393
release / release (push) Successful in 2m13s
Crash-capture logger (M3): - crash-safe JSONL (fsync per sample), size-based rotation, GPU-lost/recovered markers, atomic status file - CLI: record run/start/stop/status/report (run = systemd-ready entrypoint) - shared core.reccontrol so CLI + GUI drive the same recorder - crashlog tests (writer, rotation, reader, summary, recorder) GUI: - Recording/Logs page: start/stop/interval controls, live status, post-crash report - shared render helpers (format_raw/headline, render_summary) Docs/decisions: - GUI-first (D17); CLI keeps full parity - D8 revised: user-local self-updating install primary, .deb optional - planned: M12 session sharing (D16), M13 no-root auto-update from public repo (D18) - versioning + CHANGELOG convention (D19) Infra: - .gitea/workflows/release.yml: build wheel+sdist and publish a Gitea release v<version> on push to main - align version to the 0.0.x release line; bump to 0.0.2 Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
104 lines
3.8 KiB
Python
104 lines
3.8 KiB
Python
"""Tests for the M3 crash-capture log: writer, rotation, reader, summary, recorder."""
|
|
|
|
import tempfile
|
|
import threading
|
|
import time
|
|
import unittest
|
|
from pathlib import Path
|
|
|
|
from rigdoctor.core.crashlog import CrashLogWriter, iter_records, summarize
|
|
from rigdoctor.core.recorder import Recorder
|
|
from rigdoctor.core.sample import Reading, Sample
|
|
from rigdoctor.core.sampler import Sampler
|
|
from rigdoctor.core.sources.base import Source
|
|
|
|
|
|
class _FakeSource(Source):
    """In-memory stand-in for a GPU source, used to drive the recorder test.

    It always probes successfully and returns the same three readings
    (name, temperature, power) on every ``read()``.
    """

    name = "gpu"

    def __init__(self, temp=50.0):
        # Temperature value reported by every subsequent read().
        self._temp = temp

    def probe(self):
        """Report this fake source as available."""
        return True

    def read(self):
        """Return the fixed name / temperature / power readings, in that order."""
        name_reading = Reading("gpu", "name", None, "", "Fake GPU")
        temp_reading = Reading("gpu", "temp", self._temp, "°C")
        power_reading = Reading("gpu", "power", 100.0, "W")
        return [name_reading, temp_reading, power_reading]
|
|
|
|
|
|
class CrashLogTests(unittest.TestCase):
    """Tests for the crash-capture log writer, reader, summary and recorder.

    Every test works inside its own temporary directory. Writers are closed
    in a ``finally`` clause so a failing write cannot leave an open handle
    behind while the temporary directory is being removed.
    """

    def test_write_and_read_roundtrip(self):
        """Events and samples written to the log are read back in write order."""
        with tempfile.TemporaryDirectory() as d:
            path = Path(d) / "capture.jsonl"
            w = CrashLogWriter(path)
            try:
                w.write_event("session-start")
                w.write_sample(Sample(ts=1.0, readings=[Reading("gpu", "temp", 60.0, "°C")]))
                w.write_event("gpu-lost", "timeout")
            finally:
                # Close even if a write raised, before the directory is cleaned up.
                w.close()

            records = list(iter_records(path))
            self.assertEqual(records[0]["event"], "session-start")
            self.assertEqual(records[1]["readings"][0], ["gpu", "temp", 60.0, "°C", ""])
            self.assertEqual(records[2]["event"], "gpu-lost")

    def test_rotation_bounds_segments(self):
        """Rotation keeps at most ``backups`` extra segments and stays readable."""
        with tempfile.TemporaryDirectory() as d:
            path = Path(d) / "capture.jsonl"
            backups = 2
            # A tiny max_bytes forces many rotations over the 200 samples below.
            w = CrashLogWriter(path, max_bytes=200, backups=backups)
            try:
                for i in range(200):
                    w.write_sample(Sample(ts=float(i), readings=[Reading("gpu", "temp", float(i), "°C")]))
            finally:
                w.close()

            # base + at most `backups` rotated segments
            segments = list(Path(d).glob("capture.jsonl*"))
            self.assertLessEqual(len(segments), backups + 1)
            self.assertTrue((Path(d) / "capture.jsonl").exists())

            # rotation must not lose readability across segments
            samples = [r for r in iter_records(path) if "readings" in r]
            self.assertGreater(len(samples), 0)

    def test_summary_tracks_peaks_and_events(self):
        """The summary reports sample count, per-metric maxima, events and last samples."""
        with tempfile.TemporaryDirectory() as d:
            path = Path(d) / "capture.jsonl"
            w = CrashLogWriter(path)
            try:
                w.write_sample(Sample(ts=1.0, readings=[Reading("gpu", "temp", 60.0, "°C")]))
                w.write_sample(Sample(ts=2.0, readings=[Reading("gpu", "temp", 81.0, "°C")]))
                w.write_event("gpu-lost", "timeout")
            finally:
                w.close()

            s = summarize(path)
            self.assertEqual(s.samples, 2)
            self.assertEqual(s.maxima["gpu.temp"][0], 81.0)
            self.assertEqual(s.events[0][1], "gpu-lost")
            self.assertEqual(len(s.last), 2)

    def test_recorder_writes_samples_and_stops(self):
        """A recorder run in a thread logs samples, writes status, and stops on request."""
        with tempfile.TemporaryDirectory() as d:
            path = Path(d) / "capture.jsonl"
            status = Path(d) / "status.json"
            rec = Recorder(
                interval=0.02,
                log_path=path,
                status_path=status,
                sampler=Sampler([_FakeSource()]),
            )
            # Daemon thread: if run() never returns, the is_alive() assertion
            # below fails the test instead of hanging the interpreter at exit.
            t = threading.Thread(target=rec.run, daemon=True)
            t.start()
            try:
                # Let the recorder take several samples at the 0.02 s interval.
                time.sleep(0.2)
            finally:
                # Always request a stop and wait, even if the sleep is interrupted.
                rec.stop()
                t.join(timeout=2)

            self.assertFalse(t.is_alive())
            self.assertGreater(rec.samples, 0)
            self.assertTrue(status.exists())
            kinds = [r.get("event") for r in iter_records(path) if "event" in r]
            self.assertIn("session-start", kinds)
            self.assertIn("session-stop", kinds)
|
|
|
|
|
|
if __name__ == "__main__":
    # Allow running this test module directly as a script.
    unittest.main()
|