Compare commits
9 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 856a3305ad | |||
| 3b1a2e7393 | |||
| 2989e8e23e | |||
| 670df23e06 | |||
| 2ee7763d00 | |||
| bd6cad5a42 | |||
| 7fa9b63661 | |||
| c443a8b9f8 | |||
| bbc22fa288 |
@@ -11,7 +11,20 @@ on:
|
||||
branches: [main]
|
||||
|
||||
jobs:
|
||||
test:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- uses: actions/checkout@v4
|
||||
- uses: actions/setup-python@v5
|
||||
with:
|
||||
python-version: "3.12"
|
||||
- name: Install (core only)
|
||||
run: python -m pip install -e .
|
||||
- name: Run tests
|
||||
run: python -m unittest discover -s tests -v
|
||||
|
||||
release:
|
||||
needs: test # don't publish a release if the tests fail
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- name: Checkout
|
||||
|
||||
@@ -0,0 +1,44 @@
|
||||
name: tests
|
||||
run-name: Run test suite
|
||||
|
||||
# Runs the unittest suite on pull requests (once per PR). Pushes to main are covered by the
|
||||
# `test` job in release.yml, so we don't trigger on push here — that would double every run.
|
||||
# Two jobs:
|
||||
# core — stdlib-only install; the GUI tests skip (@skipUnless HAVE_QT). Bulletproof.
|
||||
# gui-smoke — installs the GUI extra + offscreen Qt libs and runs the same suite headless,
|
||||
# exercising the MainWindow/SetupWizard/DiagnosticDialog construction tests.
|
||||
# Make `tests / core (pull_request)` a required status check on `main` so a PR can't merge red.
|
||||
|
||||
on:
|
||||
pull_request:
|
||||
|
||||
jobs:
|
||||
core:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- uses: actions/checkout@v4
|
||||
- uses: actions/setup-python@v5
|
||||
with:
|
||||
python-version: "3.12"
|
||||
- name: Install (core only — no PySide6)
|
||||
run: python -m pip install -e .
|
||||
- name: Run tests (GUI tests skip without PySide6)
|
||||
run: python -m unittest discover -s tests -v
|
||||
|
||||
gui-smoke:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- uses: actions/checkout@v4
|
||||
- uses: actions/setup-python@v5
|
||||
with:
|
||||
python-version: "3.12"
|
||||
- name: System libraries for offscreen Qt
|
||||
run: |
|
||||
sudo apt-get update
|
||||
sudo apt-get install -y libegl1 libgl1 libxkbcommon0 libdbus-1-3 libglib2.0-0
|
||||
- name: Install (with GUI extra)
|
||||
run: python -m pip install -e ".[gui]"
|
||||
- name: Run tests (headless)
|
||||
env:
|
||||
QT_QPA_PLATFORM: offscreen
|
||||
run: python -m unittest discover -s tests -v
|
||||
@@ -5,6 +5,20 @@ All notable changes to RigDoctor are recorded here. Format follows
|
||||
(`MAJOR.MINOR.PATCH`, pre-1.0). `__version__` and `pyproject.toml` must match the git
|
||||
release tag (so the auto-updater, D18, can compare versions).
|
||||
|
||||
## [0.34.0] - 2026-05-22
|
||||
### Added
|
||||
- **Event-based alerts (M8).** Beyond temperature + GPU-lost, RigDoctor now notifies on
|
||||
**critical kernel events** — Xid (GPU error), out-of-memory kills, CPU machine-checks, PCIe
|
||||
AER errors, and disk I/O errors — scanned from the kernel log every ~30s while monitoring and
|
||||
fired one-shot (cooldown-gated, so no spam). A proactive warning the moment something goes
|
||||
wrong, not just on a temperature threshold. Included whenever desktop notifications are on.
|
||||
|
||||
## [0.33.0] - 2026-05-22
|
||||
### Added
|
||||
- **AI explanations stream live.** "Explain with AI" now fills token-by-token as the model
|
||||
generates (Ollama NDJSON + Claude SSE, both via stdlib `urllib`) instead of a multi-second
|
||||
freeze, then re-renders the finished answer as Markdown. `core/ai.explain_stream()`.
|
||||
|
||||
## [0.32.0] - 2026-05-22
|
||||
### Added
|
||||
- **More for diagnostics & reports:**
|
||||
|
||||
+1
-1
@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
|
||||
|
||||
[project]
|
||||
name = "rigdoctor"
|
||||
version = "0.32.0"
|
||||
version = "0.34.0"
|
||||
description = "Modular hardware monitoring & crash diagnostics for Linux gamers."
|
||||
readme = "README.md"
|
||||
requires-python = ">=3.11"
|
||||
|
||||
@@ -1,3 +1,3 @@
|
||||
"""RigDoctor — modular hardware monitoring & crash diagnostics for Linux gamers."""
|
||||
|
||||
__version__ = "0.32.0"
|
||||
__version__ = "0.34.0"
|
||||
|
||||
@@ -150,6 +150,24 @@ def explain(findings_text: str, timeout: float = 120.0) -> tuple[bool, str]:
|
||||
return False, f"Unexpected response from the AI provider: {exc}"
|
||||
|
||||
|
||||
def explain_stream(findings_text: str, on_chunk, timeout: float = 180.0) -> tuple[bool, str]:
|
||||
"""Like :func:`explain`, but calls ``on_chunk(text_delta)`` as tokens arrive and returns
|
||||
``(ok, full_text)`` at the end. Caller MUST be a direct user action (D24)."""
|
||||
content = build_prompt(findings_text)
|
||||
try:
|
||||
if provider() == "claude":
|
||||
return _claude_stream(content, on_chunk, timeout)
|
||||
if provider() == "ollama":
|
||||
return _ollama_stream(content, on_chunk, timeout)
|
||||
return False, "No AI provider is configured (Settings → AI assistant)."
|
||||
except urllib.error.HTTPError as exc:
|
||||
return False, _http_error(exc)
|
||||
except (urllib.error.URLError, OSError, TimeoutError) as exc:
|
||||
return False, f"Couldn't reach the AI provider: {exc}"
|
||||
except (ValueError, KeyError, IndexError) as exc:
|
||||
return False, f"Unexpected response from the AI provider: {exc}"
|
||||
|
||||
|
||||
def _post(url: str, payload: dict, headers: dict, timeout: float) -> dict:
|
||||
req = urllib.request.Request(
|
||||
url, data=json.dumps(payload).encode("utf-8"),
|
||||
@@ -185,6 +203,65 @@ def _claude(content: str, timeout: float) -> tuple[bool, str]:
|
||||
return True, text.strip() or "(the model returned no text)"
|
||||
|
||||
|
||||
def _stream_request(url: str, payload: dict, headers: dict, timeout: float):
|
||||
req = urllib.request.Request(
|
||||
url, data=json.dumps(payload).encode("utf-8"),
|
||||
headers={"Content-Type": "application/json", **headers})
|
||||
return urllib.request.urlopen(req, timeout=timeout)
|
||||
|
||||
|
||||
def _ollama_stream(content: str, on_chunk, timeout: float) -> tuple[bool, str]:
|
||||
if not model():
|
||||
return False, "No Ollama model is set (Settings → AI assistant)."
|
||||
payload = {"model": model(), "system": SYSTEM_PROMPT, "prompt": content, "stream": True}
|
||||
parts: list[str] = []
|
||||
with _stream_request(endpoint().rstrip("/") + "/api/generate", payload, {}, timeout) as resp:
|
||||
for raw in resp: # newline-delimited JSON objects
|
||||
line = raw.decode("utf-8", "replace").strip()
|
||||
if not line:
|
||||
continue
|
||||
obj = json.loads(line)
|
||||
chunk = obj.get("response", "")
|
||||
if chunk:
|
||||
parts.append(chunk)
|
||||
on_chunk(chunk)
|
||||
if obj.get("done"):
|
||||
break
|
||||
return True, "".join(parts).strip() or "(the model returned an empty response)"
|
||||
|
||||
|
||||
def _claude_stream(content: str, on_chunk, timeout: float) -> tuple[bool, str]:
|
||||
key = config.load_ai_key()
|
||||
if not key:
|
||||
return False, "No Claude API key is set (Settings → AI assistant)."
|
||||
payload = {
|
||||
"model": model(), "max_tokens": CLAUDE_MAX_TOKENS, "system": SYSTEM_PROMPT,
|
||||
"messages": [{"role": "user", "content": content}], "stream": True,
|
||||
}
|
||||
headers = {"x-api-key": key, "anthropic-version": ANTHROPIC_VERSION}
|
||||
parts: list[str] = []
|
||||
with _stream_request(CLAUDE_ENDPOINT, payload, headers, timeout) as resp:
|
||||
for raw in resp: # SSE: parse `data:` lines, accumulate text deltas
|
||||
line = raw.decode("utf-8", "replace").strip()
|
||||
if not line.startswith("data:"):
|
||||
continue
|
||||
try:
|
||||
event = json.loads(line[5:].strip())
|
||||
except ValueError:
|
||||
continue
|
||||
etype = event.get("type")
|
||||
if etype == "content_block_delta" and event.get("delta", {}).get("type") == "text_delta":
|
||||
chunk = event["delta"].get("text", "")
|
||||
if chunk:
|
||||
parts.append(chunk)
|
||||
on_chunk(chunk)
|
||||
elif etype == "error":
|
||||
return False, event.get("error", {}).get("message", "stream error")
|
||||
elif etype == "message_stop":
|
||||
break
|
||||
return True, "".join(parts).strip() or "(the model returned no text)"
|
||||
|
||||
|
||||
def _http_error(exc: urllib.error.HTTPError) -> str:
|
||||
detail = ""
|
||||
try:
|
||||
|
||||
@@ -1,8 +1,9 @@
|
||||
"""Desktop alerts (M8): notify on overheat / GPU-lost / new version via notify-send.
|
||||
"""Desktop alerts (M8): notify on overheat / GPU-lost / critical kernel events / new version.
|
||||
|
||||
Edge-triggered: an alert fires when a condition becomes true (not every sample), and
|
||||
can fire again only after it has cleared and a cooldown has passed — so a hot GPU or a
|
||||
1-Hz sample loop doesn't spam notifications. Degrades to a no-op if notify-send is absent.
|
||||
Edge-triggered: a sustained condition (hot GPU, GPU-lost) fires once when it becomes true and
|
||||
can re-fire only after it clears + a cooldown; momentary **kernel events** (Xid, OOM-kill, MCE,
|
||||
PCIe AER, disk I/O errors) are scanned from the kernel log every `event_interval` seconds and
|
||||
fire one-shot (cooldown-gated). So a 1-Hz sample loop never spams. No-op if notify-send absent.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
@@ -57,13 +58,16 @@ def notify(title: str, message: str, urgency: str = "normal") -> bool:
|
||||
class AlertMonitor:
|
||||
"""Evaluate samples and raise edge-triggered desktop alerts."""
|
||||
|
||||
def __init__(self, gpu_temp: float = 90.0, cpu_temp: float = 95.0, cooldown: float = 300.0):
|
||||
def __init__(self, gpu_temp: float = 90.0, cpu_temp: float = 95.0, cooldown: float = 300.0,
|
||||
event_interval: float = 30.0):
|
||||
self.gpu_temp = gpu_temp
|
||||
self.cpu_temp = cpu_temp
|
||||
self.cooldown = cooldown
|
||||
self.event_interval = event_interval # how often to scan the kernel log
|
||||
self.enabled = True
|
||||
self._active: dict[str, bool] = {}
|
||||
self._last: dict[str, float] = {}
|
||||
self._last_kernel_scan = time.time() # only alert on events after the monitor starts
|
||||
|
||||
def _fire(self, key: str, title: str, message: str, urgency: str = "critical") -> None:
|
||||
if self._active.get(key):
|
||||
@@ -75,9 +79,39 @@ class AlertMonitor:
|
||||
self._last[key] = now
|
||||
notify(title, message, urgency)
|
||||
|
||||
def _notify_once(self, key: str, title: str, message: str, urgency: str = "critical") -> None:
|
||||
"""One-shot alert for a momentary event (cooldown-gated, no active latch)."""
|
||||
now = time.time()
|
||||
if now - self._last.get(key, 0.0) < self.cooldown:
|
||||
return
|
||||
self._last[key] = now
|
||||
notify(title, message, urgency)
|
||||
|
||||
def _clear(self, key: str) -> None:
|
||||
self._active[key] = False
|
||||
|
||||
def _scan_kernel_events(self) -> None:
|
||||
"""Periodically scan the kernel log for new critical events (Xid/OOM/MCE/PCIe/disk)."""
|
||||
now = time.time()
|
||||
if now - self._last_kernel_scan < self.event_interval:
|
||||
return
|
||||
since = self._last_kernel_scan
|
||||
self._last_kernel_scan = now
|
||||
try:
|
||||
from . import syslogs
|
||||
|
||||
text = syslogs.kernel_log(since=since)
|
||||
except Exception: # alerting must never crash the sample loop
|
||||
return
|
||||
if not text:
|
||||
return
|
||||
seen: set[str] = set()
|
||||
for label, line in syslogs.scan_critical(text):
|
||||
if label in seen: # one alert per category per scan
|
||||
continue
|
||||
seen.add(label)
|
||||
self._notify_once(f"kernel:{label}", label, line[:180])
|
||||
|
||||
def check(self, sample: Sample) -> None:
|
||||
if not self.enabled:
|
||||
return
|
||||
@@ -107,3 +141,5 @@ class AlertMonitor:
|
||||
self._fire("gpu_lost", "GPU not responding", "nvidia-smi query timed out — the GPU may have dropped")
|
||||
else:
|
||||
self._clear("gpu_lost")
|
||||
|
||||
self._scan_kernel_events() # Xid / OOM / MCE / PCIe / disk I/O from the kernel log
|
||||
|
||||
@@ -13,6 +13,7 @@ Best-effort and size-bounded: degrades silently if a tool is missing or access i
|
||||
from __future__ import annotations
|
||||
|
||||
import os
|
||||
import re
|
||||
import shutil
|
||||
import subprocess
|
||||
import time
|
||||
@@ -118,6 +119,29 @@ def display_log(since: float | None = None, max_bytes: int = _MAX) -> str:
|
||||
return _tail_file(log, max_bytes) if log else ""
|
||||
|
||||
|
||||
# Kernel-log patterns worth alerting on in real time (M8 event alerts). (label, regex).
|
||||
_CRITICAL = [
|
||||
("GPU error (Xid)", re.compile(r"NVRM:\s*Xid", re.I)),
|
||||
("Out of memory", re.compile(r"out of memory|oom-kill|killed process \d+", re.I)),
|
||||
("CPU machine-check", re.compile(r"\bmce:|machine check", re.I)),
|
||||
("PCIe error", re.compile(r"\bAER:|pcie bus error", re.I)),
|
||||
("Disk I/O error", re.compile(
|
||||
r"buffer i/o error|\bi/o error\b|critical medium error|ext4-fs error|"
|
||||
r"blk_update_request:.*error|ata\d+.*(?:failed|error)", re.I)),
|
||||
]
|
||||
|
||||
|
||||
def scan_critical(text: str) -> list[tuple[str, str]]:
|
||||
"""(label, line) for kernel lines matching a critical pattern (first match per line)."""
|
||||
events: list[tuple[str, str]] = []
|
||||
for line in text.splitlines():
|
||||
for label, pat in _CRITICAL:
|
||||
if pat.search(line):
|
||||
events.append((label, line.strip()))
|
||||
break
|
||||
return events
|
||||
|
||||
|
||||
def available() -> bool:
|
||||
return bool(shutil.which("journalctl") or shutil.which("coredumpctl")
|
||||
or shutil.which("nvidia-smi") or _xorg_log())
|
||||
|
||||
@@ -5,7 +5,7 @@ from __future__ import annotations
|
||||
import threading
|
||||
|
||||
from PySide6.QtCore import Qt, Signal
|
||||
from PySide6.QtGui import QFont
|
||||
from PySide6.QtGui import QFont, QTextCursor
|
||||
from PySide6.QtWidgets import (
|
||||
QDialog,
|
||||
QFrame,
|
||||
@@ -24,11 +24,15 @@ from .widgets import finding_card
|
||||
|
||||
|
||||
class DiagnosticDialog(QDialog):
|
||||
_explained = Signal(object) # (ok, text) from a user-triggered AI explanation
|
||||
_chunk = Signal(str) # streamed token delta (worker thread -> GUI)
|
||||
_explained = Signal(object) # (ok, full_text) when the AI stream finishes
|
||||
|
||||
def __init__(self, result, parent=None) -> None:
|
||||
super().__init__(parent)
|
||||
self._result = result
|
||||
self._stream_view = None
|
||||
self._stream_status = None
|
||||
self._chunk.connect(self._on_chunk)
|
||||
self._explained.connect(self._on_explained)
|
||||
self.setWindowTitle(f"Diagnostic — {result.game}" if result.game else "Diagnostic")
|
||||
self.resize(660, 680)
|
||||
@@ -97,7 +101,7 @@ class DiagnosticDialog(QDialog):
|
||||
buttons.addWidget(close)
|
||||
root.addLayout(buttons)
|
||||
|
||||
# --- AI explanation (M14, D24) — runs only on this button press ----------------
|
||||
# --- AI explanation (M14, D24) — streamed; runs only on this button press ----------
|
||||
def _explain_with_ai(self) -> None:
|
||||
from ..core import ai
|
||||
|
||||
@@ -111,8 +115,11 @@ class DiagnosticDialog(QDialog):
|
||||
if confirm != QMessageBox.StandardButton.Yes:
|
||||
return
|
||||
self._explain_btn.setEnabled(False)
|
||||
self._explain_btn.setText("Asking the AI…")
|
||||
dialog = self._open_stream_dialog()
|
||||
threading.Thread(target=self._work_explain, daemon=True).start()
|
||||
dialog.exec() # streaming fills the view live via signals during this nested loop
|
||||
self._stream_view = self._stream_status = None
|
||||
self._explain_btn.setEnabled(True)
|
||||
|
||||
def _work_explain(self) -> None:
|
||||
from ..core import ai, gamelogs, syslogs
|
||||
@@ -143,7 +150,8 @@ class DiagnosticDialog(QDialog):
|
||||
if sys_logs:
|
||||
lines.append("\nSystem logs for this session (kernel + crashed processes):\n" + sys_logs)
|
||||
text = "\n".join(lines)
|
||||
ok, reply = ai.explain(text)
|
||||
|
||||
ok, reply = ai.explain_stream(text, on_chunk=lambda d: self._chunk.emit(d))
|
||||
if result.dir: # record exactly what was sent, the model, and the reply (M15)
|
||||
from ..core import diagstore
|
||||
diagstore.record_ai(
|
||||
@@ -152,11 +160,24 @@ class DiagnosticDialog(QDialog):
|
||||
response=reply if ok else f"[error] {reply}")
|
||||
self._explained.emit((ok, reply))
|
||||
|
||||
def _on_chunk(self, delta: str) -> None:
|
||||
if self._stream_view is None:
|
||||
return
|
||||
self._stream_view.moveCursor(QTextCursor.MoveOperation.End)
|
||||
self._stream_view.insertPlainText(delta) # live plain text as tokens arrive
|
||||
self._stream_view.ensureCursorVisible()
|
||||
|
||||
def _on_explained(self, result) -> None:
|
||||
ok, text = result
|
||||
self._explain_btn.setEnabled(True)
|
||||
self._explain_btn.setText("Explain with AI")
|
||||
self._show_explanation(text if ok else f"AI explanation failed:\n\n{text}")
|
||||
if self._stream_view is not None:
|
||||
if ok:
|
||||
self._stream_view.setMarkdown(text) # re-render the finished answer as Markdown
|
||||
else:
|
||||
self._stream_view.setPlainText(f"AI explanation failed:\n\n{text}")
|
||||
if self._stream_status is not None:
|
||||
self._stream_status.setText(
|
||||
"AI-generated suggestions — verify before acting, especially anything that changes "
|
||||
"settings or data." if ok else "The request failed.")
|
||||
|
||||
# --- Report bundle (M15) ------------------------------------------------------
|
||||
def _make_report(self) -> None:
|
||||
@@ -183,7 +204,8 @@ class DiagnosticDialog(QDialog):
|
||||
if box.clickedButton() is open_btn:
|
||||
QDesktopServices.openUrl(QUrl.fromLocalFile(str(out.parent)))
|
||||
|
||||
def _show_explanation(self, text: str) -> None:
|
||||
def _open_stream_dialog(self) -> QDialog:
|
||||
"""A live dialog the AI streams into; finalized to rendered Markdown when done."""
|
||||
from ..core import ai
|
||||
|
||||
dlg = QDialog(self)
|
||||
@@ -193,14 +215,15 @@ class DiagnosticDialog(QDialog):
|
||||
view = QTextEdit()
|
||||
view.setObjectName("Report")
|
||||
view.setReadOnly(True)
|
||||
view.setMarkdown(text) # the model replies in Markdown — render it
|
||||
lay.addWidget(view)
|
||||
note = QLabel("AI-generated suggestions — verify before acting, especially anything that changes settings or data.")
|
||||
note.setObjectName("Muted")
|
||||
note.setWordWrap(True)
|
||||
lay.addWidget(note)
|
||||
status = QLabel("Streaming from the model…")
|
||||
status.setObjectName("Muted")
|
||||
status.setWordWrap(True)
|
||||
lay.addWidget(status)
|
||||
close = QPushButton("Close")
|
||||
close.setObjectName("PrimaryButton")
|
||||
close.clicked.connect(dlg.accept)
|
||||
lay.addWidget(close, alignment=Qt.AlignmentFlag.AlignRight)
|
||||
dlg.exec()
|
||||
self._stream_view = view
|
||||
self._stream_status = status
|
||||
return dlg
|
||||
|
||||
@@ -114,7 +114,8 @@ class SetupPage(QWidget):
|
||||
grid.addWidget(QLabel("CPU temperature alert"), 1, 0)
|
||||
grid.addWidget(self._cpu_alert, 1, 1)
|
||||
alerts_layout.addLayout(grid)
|
||||
alerts_note = QLabel("GPU-lost and new-version alerts are included whenever notifications are enabled.")
|
||||
alerts_note = QLabel("GPU-lost, critical kernel events (Xid, out-of-memory, disk I/O, PCIe), "
|
||||
"and new-version alerts are included whenever notifications are enabled.")
|
||||
alerts_note.setObjectName("Muted")
|
||||
alerts_note.setWordWrap(True)
|
||||
alerts_layout.addWidget(alerts_note)
|
||||
|
||||
@@ -114,5 +114,51 @@ class ExplainTests(unittest.TestCase):
|
||||
self.assertEqual(headers["x-api-key"], "sk-ant-x")
|
||||
|
||||
|
||||
class _FakeResp:
|
||||
"""A context-managed iterable of byte lines, like urlopen() returns."""
|
||||
def __init__(self, lines):
|
||||
self._lines = [l.encode("utf-8") for l in lines]
|
||||
def __enter__(self):
|
||||
return iter(self._lines)
|
||||
def __exit__(self, *a):
|
||||
return False
|
||||
|
||||
|
||||
class StreamTests(unittest.TestCase):
|
||||
def _cfg(self, **over):
|
||||
base = {"ai_provider": "", "ai_model": "", "ai_endpoint": "http://localhost:11434"}
|
||||
base.update(over)
|
||||
return base
|
||||
|
||||
def test_ollama_stream_accumulates_and_callbacks(self):
|
||||
lines = ['{"response": "It is ", "done": false}',
|
||||
'{"response": "the PSU.", "done": false}',
|
||||
'{"response": "", "done": true}']
|
||||
chunks = []
|
||||
with mock.patch.object(ai.config, "load_config",
|
||||
return_value=self._cfg(ai_provider="ollama", ai_model="qwen2.5:7b")), \
|
||||
mock.patch.object(ai, "_stream_request", return_value=_FakeResp(lines)):
|
||||
ok, full = ai.explain_stream("Xid 79", on_chunk=chunks.append)
|
||||
self.assertTrue(ok)
|
||||
self.assertEqual(full, "It is the PSU.")
|
||||
self.assertEqual(chunks, ["It is ", "the PSU."])
|
||||
|
||||
def test_claude_stream_parses_sse(self):
|
||||
lines = [
|
||||
'event: content_block_delta',
|
||||
'data: {"type":"content_block_delta","delta":{"type":"text_delta","text":"Failing "}}',
|
||||
'data: {"type":"content_block_delta","delta":{"type":"text_delta","text":"disk."}}',
|
||||
'data: {"type":"message_stop"}',
|
||||
]
|
||||
chunks = []
|
||||
with mock.patch.object(ai.config, "load_config", return_value=self._cfg(ai_provider="claude")), \
|
||||
mock.patch.object(ai.config, "load_ai_key", return_value="sk-ant-x"), \
|
||||
mock.patch.object(ai, "_stream_request", return_value=_FakeResp(lines)):
|
||||
ok, full = ai.explain_stream("SMART 197", on_chunk=chunks.append)
|
||||
self.assertTrue(ok)
|
||||
self.assertEqual(full, "Failing disk.")
|
||||
self.assertEqual(chunks, ["Failing ", "disk."])
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
|
||||
@@ -34,5 +34,35 @@ class AlertTests(unittest.TestCase):
|
||||
m.assert_called_once()
|
||||
|
||||
|
||||
class KernelEventAlertTests(unittest.TestCase):
|
||||
@mock.patch.object(alerts, "notify")
|
||||
def test_kernel_event_fires_once_within_cooldown(self, m):
|
||||
mon = alerts.AlertMonitor(cooldown=300.0, event_interval=0.0)
|
||||
mon._last_kernel_scan = 0.0 # force a scan
|
||||
with mock.patch("rigdoctor.core.syslogs.kernel_log",
|
||||
return_value="NVRM: Xid (PCI:0000:01:00): 79, GPU has fallen off the bus"):
|
||||
mon._scan_kernel_events()
|
||||
mon._last_kernel_scan = 0.0 # force another scan — cooldown must suppress it
|
||||
mon._scan_kernel_events()
|
||||
self.assertEqual(m.call_count, 1)
|
||||
self.assertIn("Xid", m.call_args[0][0])
|
||||
|
||||
@mock.patch.object(alerts, "notify")
|
||||
def test_no_alert_when_kernel_log_empty(self, m):
|
||||
mon = alerts.AlertMonitor(event_interval=0.0)
|
||||
mon._last_kernel_scan = 0.0
|
||||
with mock.patch("rigdoctor.core.syslogs.kernel_log", return_value=""):
|
||||
mon._scan_kernel_events()
|
||||
m.assert_not_called()
|
||||
|
||||
@mock.patch.object(alerts, "notify")
|
||||
def test_scan_gated_by_interval(self, m):
|
||||
mon = alerts.AlertMonitor(event_interval=9999.0) # just constructed → not due yet
|
||||
with mock.patch("rigdoctor.core.syslogs.kernel_log", return_value="NVRM: Xid 79") as kl:
|
||||
mon._scan_kernel_events()
|
||||
kl.assert_not_called()
|
||||
m.assert_not_called()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
|
||||
@@ -72,6 +72,25 @@ class DisplayTests(unittest.TestCase):
|
||||
self.assertTrue(any(a.startswith("_COMM=") for a in cmd))
|
||||
|
||||
|
||||
class ScanCriticalTests(unittest.TestCase):
|
||||
def test_matches_each_category(self):
|
||||
text = "\n".join([
|
||||
"NVRM: Xid (PCI:0000:01:00): 79, GPU has fallen off the bus",
|
||||
"Out of memory: Killed process 1234 (PathOfExile)",
|
||||
"mce: [Hardware Error]: CPU 0",
|
||||
"pcieport 0000:00:01.0: AER: Corrected error received",
|
||||
"blk_update_request: I/O error, dev sda, sector 99",
|
||||
"this is a perfectly normal line",
|
||||
])
|
||||
labels = {label for label, _ in syslogs.scan_critical(text)}
|
||||
self.assertEqual(labels, {
|
||||
"GPU error (Xid)", "Out of memory", "CPU machine-check",
|
||||
"PCIe error", "Disk I/O error"})
|
||||
|
||||
def test_clean_log_no_events(self):
|
||||
self.assertEqual(syslogs.scan_critical("usb 1-2: new high-speed device\nsystemd: started"), [])
|
||||
|
||||
|
||||
class CollectTests(unittest.TestCase):
|
||||
def test_collect_combines_sections(self):
|
||||
with mock.patch.object(syslogs, "kernel_log", return_value="NVRM: Xid 79"), \
|
||||
|
||||
Reference in New Issue
Block a user