Release 0.0.6: token-gated updates (M13) with encrypted storage
release / release (push) Successful in 13s
release / release (push) Successful in 13s
- updates gated to Gitea account holders via a Personal Access Token (D18 revised: anonymous HTTP -> authenticated HTTP, since the instance requires sign-in for all anonymous access) - token stored encrypted in the OS keyring (secret-tool) when available, with a 0600-file fallback; $RIGDOCTOR_TOKEN override; auto-migrate file->keyring once libsecret-tools is installed - core/updates: token-aware fetch_latest + update_state (no-token/auth/network/ up-to-date/available) - CLI: rigdoctor login / logout / update [--check] - GUI: Setup "Update access" panel (token field, get-a-token, backend status); sidebar update states; libsecret-tools added to the installer catalog - token storage tests (file fallback + env override, keyring mocked) - version 0.0.6, CHANGELOG, docs Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -1,3 +1,3 @@
|
||||
"""RigDoctor — modular hardware monitoring & crash diagnostics for Linux gamers."""
|
||||
|
||||
__version__ = "0.0.5"
|
||||
__version__ = "0.0.6"
|
||||
|
||||
@@ -212,6 +212,64 @@ def cmd_install(args) -> int:
|
||||
return rc
|
||||
|
||||
|
||||
def cmd_login(args) -> int:
|
||||
from getpass import getpass
|
||||
|
||||
from .core import updates
|
||||
|
||||
token = args.token
|
||||
if not token:
|
||||
print(f"Create a token (scope read:repository) at: {updates.TOKEN_PAGE}")
|
||||
try:
|
||||
token = getpass("Paste token: ").strip()
|
||||
except (EOFError, KeyboardInterrupt):
|
||||
token = ""
|
||||
if not token:
|
||||
print("No token provided.")
|
||||
return 1
|
||||
config.save_token(token)
|
||||
state, tag = updates.update_state()
|
||||
if state == updates.AUTH:
|
||||
print("Token saved, but the server rejected it (check scope/permissions).")
|
||||
return 1
|
||||
if state in (updates.UP_TO_DATE, updates.AVAILABLE):
|
||||
print(f"Token saved and verified. Latest release: {tag}.")
|
||||
return 0
|
||||
print("Token saved (couldn't reach the server to verify right now).")
|
||||
return 0
|
||||
|
||||
|
||||
def cmd_logout(args) -> int:
|
||||
config.clear_token()
|
||||
print("Update token removed.")
|
||||
return 0
|
||||
|
||||
|
||||
def cmd_update(args) -> int:
|
||||
from .core import updates
|
||||
|
||||
state, tag = updates.update_state()
|
||||
if state == updates.NO_TOKEN:
|
||||
print("No update token. Run `rigdoctor login` after creating one at:")
|
||||
print(f" {updates.TOKEN_PAGE}")
|
||||
return 1
|
||||
if state == updates.AUTH:
|
||||
print("The update server rejected your token (check scope/permissions).")
|
||||
return 1
|
||||
if state == updates.NETWORK:
|
||||
print("Couldn't reach the update server.")
|
||||
return 1
|
||||
if state == updates.UP_TO_DATE:
|
||||
print(f"Up to date (v{__version__}).")
|
||||
return 0
|
||||
# AVAILABLE
|
||||
print(f"Update available: {tag} (current v{__version__}).")
|
||||
if args.check:
|
||||
return 0
|
||||
print("Self-update (apply) isn't wired yet — coming with the install script.")
|
||||
return 0
|
||||
|
||||
|
||||
def cmd_report(args) -> int:
|
||||
from dataclasses import asdict
|
||||
|
||||
@@ -250,6 +308,15 @@ def build_parser() -> argparse.ArgumentParser:
|
||||
inst.add_argument("-y", "--yes", action="store_true", help="install without confirmation")
|
||||
inst.set_defaults(func=cmd_install)
|
||||
|
||||
login = sub.add_parser("login", help="save a Gitea token for updates (M13)")
|
||||
login.add_argument("--token", default=None, help="token (prompted if omitted)")
|
||||
login.set_defaults(func=cmd_login)
|
||||
sub.add_parser("logout", help="remove the saved update token").set_defaults(func=cmd_logout)
|
||||
|
||||
upd = sub.add_parser("update", help="check for / apply a newer version (M13)")
|
||||
upd.add_argument("--check", action="store_true", help="only report, don't apply")
|
||||
upd.set_defaults(func=cmd_update)
|
||||
|
||||
rec = sub.add_parser("record", help="crash-capture logger (M3)")
|
||||
rec_sub = rec.add_subparsers(dest="record_cmd", required=True)
|
||||
|
||||
|
||||
@@ -3,6 +3,8 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import os
|
||||
import shutil
|
||||
import subprocess
|
||||
from pathlib import Path
|
||||
|
||||
APP = "rigdoctor"
|
||||
@@ -25,6 +27,112 @@ STATUS_FILE = STATE_DIR / "recorder.json"
|
||||
PID_FILE = STATE_DIR / "recorder.pid"
|
||||
SPAWN_LOG = STATE_DIR / "recorder.out"
|
||||
|
||||
# Update access token (M13) — gates updates to Gitea account holders (D18).
|
||||
# Stored in the OS keyring (Secret Service / GNOME Keyring) via `secret-tool` when
|
||||
# available — encrypted at rest, unlocked with the login session — else a 0600 file.
|
||||
TOKEN_FILE = CONFIG_DIR / "token"
|
||||
_SECRET_ATTRS = ["application", "rigdoctor", "type", "update-token"]
|
||||
|
||||
|
||||
def _secret_tool() -> str | None:
|
||||
return shutil.which("secret-tool")
|
||||
|
||||
|
||||
def keyring_available() -> bool:
|
||||
"""True if an encrypted OS keyring (secret-tool) is usable."""
|
||||
return _secret_tool() is not None
|
||||
|
||||
|
||||
def _keyring_store(token: str) -> bool:
|
||||
tool = _secret_tool()
|
||||
if not tool:
|
||||
return False
|
||||
try:
|
||||
proc = subprocess.run(
|
||||
[tool, "store", "--label", "RigDoctor update token", *_SECRET_ATTRS],
|
||||
input=token, text=True, capture_output=True, timeout=20,
|
||||
)
|
||||
return proc.returncode == 0
|
||||
except (subprocess.SubprocessError, OSError):
|
||||
return False
|
||||
|
||||
|
||||
def _keyring_lookup() -> str | None:
|
||||
tool = _secret_tool()
|
||||
if not tool:
|
||||
return None
|
||||
try:
|
||||
proc = subprocess.run(
|
||||
[tool, "lookup", *_SECRET_ATTRS], text=True, capture_output=True, timeout=20
|
||||
)
|
||||
if proc.returncode == 0 and proc.stdout.strip():
|
||||
return proc.stdout.strip()
|
||||
except (subprocess.SubprocessError, OSError):
|
||||
pass
|
||||
return None
|
||||
|
||||
|
||||
def _keyring_clear() -> None:
|
||||
tool = _secret_tool()
|
||||
if not tool:
|
||||
return
|
||||
try:
|
||||
subprocess.run([tool, "clear", *_SECRET_ATTRS], capture_output=True, timeout=20)
|
||||
except (subprocess.SubprocessError, OSError):
|
||||
pass
|
||||
|
||||
|
||||
def load_token() -> str | None:
|
||||
"""Token from $RIGDOCTOR_TOKEN, then the OS keyring, then a 0600 file."""
|
||||
env = os.environ.get("RIGDOCTOR_TOKEN")
|
||||
if env and env.strip():
|
||||
return env.strip()
|
||||
from_keyring = _keyring_lookup()
|
||||
if from_keyring:
|
||||
return from_keyring
|
||||
try:
|
||||
token = TOKEN_FILE.read_text().strip()
|
||||
return token or None
|
||||
except OSError:
|
||||
return None
|
||||
|
||||
|
||||
def save_token(token: str) -> None:
|
||||
"""Save to the OS keyring if possible (encrypted); else a 0600 file."""
|
||||
token = token.strip()
|
||||
if _keyring_store(token):
|
||||
try: # don't leave a plaintext copy once it's in the keyring
|
||||
TOKEN_FILE.unlink()
|
||||
except OSError:
|
||||
pass
|
||||
return
|
||||
CONFIG_DIR.mkdir(parents=True, exist_ok=True)
|
||||
TOKEN_FILE.write_text(token + "\n")
|
||||
try:
|
||||
TOKEN_FILE.chmod(0o600)
|
||||
except OSError:
|
||||
pass
|
||||
|
||||
|
||||
def clear_token() -> None:
|
||||
_keyring_clear()
|
||||
try:
|
||||
TOKEN_FILE.unlink()
|
||||
except OSError:
|
||||
pass
|
||||
|
||||
|
||||
def token_backend() -> str:
|
||||
"""Where the active token lives: 'env' | 'keyring' | 'file' | 'none'."""
|
||||
env = os.environ.get("RIGDOCTOR_TOKEN")
|
||||
if env and env.strip():
|
||||
return "env"
|
||||
if _keyring_lookup() is not None:
|
||||
return "keyring"
|
||||
if TOKEN_FILE.exists():
|
||||
return "file"
|
||||
return "none"
|
||||
|
||||
DEFAULTS: dict = {
|
||||
"interval": 1.0, # sampling interval in seconds (default ≤1 Hz — NFR)
|
||||
"log_max_bytes": 20_000_000, # rotate a log segment past this size
|
||||
|
||||
@@ -41,4 +41,8 @@ COMPONENTS: tuple[Component, ...] = (
|
||||
"libnotify", "Desktop notifications", "Monitoring",
|
||||
"Desktop alert notifications (M8)", ("libnotify-bin",), "notify-send",
|
||||
),
|
||||
Component(
|
||||
"libsecret", "Encrypted token storage", "Updates",
|
||||
"Store the update token in the OS keyring, encrypted (M13)", ("libsecret-tools",), "secret-tool",
|
||||
),
|
||||
)
|
||||
|
||||
@@ -1,21 +1,32 @@
|
||||
"""Update check (M13, check half): ask the Gitea releases API for the latest version.
|
||||
"""Update check (M13): ask the Gitea releases API for the latest version.
|
||||
|
||||
Stdlib-only (urllib). Self-update isn't built yet; this only *detects* a newer
|
||||
release. Any failure (network, or the instance requiring sign-in for the API)
|
||||
returns None so callers can degrade gracefully.
|
||||
Stdlib-only (urllib). The Gitea instance requires sign-in, so updates are gated to
|
||||
account holders via a Personal Access Token (D18): set $RIGDOCTOR_TOKEN or save one
|
||||
with `rigdoctor login`. Self-update (apply) is built on top of this; this module
|
||||
handles detection and exposes a clear state for the UI.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import urllib.error
|
||||
import urllib.request
|
||||
|
||||
from .. import __version__
|
||||
from ..config import load_token
|
||||
|
||||
GITEA_BASE = "https://git.jesseyvanofferen.com"
|
||||
REPO = "jessey/rigdoctor"
|
||||
LATEST_API = f"{GITEA_BASE}/api/v1/repos/{REPO}/releases/latest"
|
||||
RELEASES_PAGE = f"{GITEA_BASE}/{REPO}/releases"
|
||||
TOKEN_PAGE = f"{GITEA_BASE}/user/settings/applications"
|
||||
|
||||
# Update states
|
||||
NO_TOKEN = "no-token"
|
||||
AUTH = "auth"
|
||||
NETWORK = "network"
|
||||
UP_TO_DATE = "up-to-date"
|
||||
AVAILABLE = "available"
|
||||
|
||||
|
||||
def _parse(version: str) -> tuple[int, ...]:
|
||||
@@ -29,13 +40,36 @@ def is_newer(latest: str, current: str = __version__) -> bool:
|
||||
return False
|
||||
|
||||
|
||||
def check_latest(timeout: float = 4.0) -> str | None:
|
||||
"""Return the latest release tag (e.g. 'v0.0.5'), or None if it can't be determined."""
|
||||
def fetch_latest(timeout: float = 5.0) -> tuple[str | None, str | None]:
|
||||
"""Return (tag, error). error is one of NO_TOKEN / AUTH / NETWORK, or None on success."""
|
||||
token = load_token()
|
||||
if not token:
|
||||
return (None, NO_TOKEN)
|
||||
req = urllib.request.Request(
|
||||
LATEST_API,
|
||||
headers={"Accept": "application/json", "Authorization": f"token {token}"},
|
||||
)
|
||||
try:
|
||||
req = urllib.request.Request(LATEST_API, headers={"Accept": "application/json"})
|
||||
with urllib.request.urlopen(req, timeout=timeout) as resp: # noqa: S310 (https only)
|
||||
with urllib.request.urlopen(req, timeout=timeout) as resp: # noqa: S310 (https)
|
||||
data = json.load(resp)
|
||||
tag = data.get("tag_name")
|
||||
return tag or None
|
||||
return (data.get("tag_name") or None, None)
|
||||
except urllib.error.HTTPError as exc:
|
||||
return (None, AUTH if exc.code in (401, 403) else NETWORK)
|
||||
except Exception:
|
||||
return None
|
||||
return (None, NETWORK)
|
||||
|
||||
|
||||
def check_latest(timeout: float = 5.0) -> str | None:
|
||||
"""Convenience: latest tag or None (ignores error reason)."""
|
||||
tag, _ = fetch_latest(timeout)
|
||||
return tag
|
||||
|
||||
|
||||
def update_state(timeout: float = 5.0) -> tuple[str, str | None]:
|
||||
"""Return (state, tag). state in NO_TOKEN/AUTH/NETWORK/UP_TO_DATE/AVAILABLE."""
|
||||
tag, error = fetch_latest(timeout)
|
||||
if error:
|
||||
return (error, None)
|
||||
if tag and is_newer(tag):
|
||||
return (AVAILABLE, tag)
|
||||
return (UP_TO_DATE, tag)
|
||||
|
||||
@@ -123,17 +123,22 @@ class MainWindow(QMainWindow):
|
||||
return bar
|
||||
|
||||
def _check_updates(self) -> None:
|
||||
self._update_checked.emit(updates.check_latest())
|
||||
self._update_checked.emit(updates.update_state())
|
||||
|
||||
def _show_update_state(self, latest) -> None:
|
||||
if not latest:
|
||||
def _show_update_state(self, result) -> None:
|
||||
state, tag = result
|
||||
self._update_btn.setVisible(False)
|
||||
if state == updates.NO_TOKEN:
|
||||
self._update_label.setText("connect to update server")
|
||||
elif state == updates.AUTH:
|
||||
self._update_label.setText("update access denied")
|
||||
elif state == updates.NETWORK:
|
||||
self._update_label.setText("update check unavailable")
|
||||
return
|
||||
if updates.is_newer(latest, __version__):
|
||||
self._update_label.setText(f'<span style="color:{GOOD};">{latest} available</span>')
|
||||
self._update_btn.setText(f"Update to {latest}")
|
||||
elif state == updates.AVAILABLE:
|
||||
self._update_label.setText(f'<span style="color:{GOOD};">{tag} available</span>')
|
||||
self._update_btn.setText(f"Update to {tag}")
|
||||
self._update_btn.setVisible(True)
|
||||
else:
|
||||
else: # UP_TO_DATE
|
||||
self._update_label.setText("up-to-date")
|
||||
|
||||
def _placeholder_page(self, title: str, description: str) -> QWidget:
|
||||
|
||||
@@ -4,11 +4,13 @@ from __future__ import annotations
|
||||
|
||||
import threading
|
||||
|
||||
from PySide6.QtCore import Qt, Signal
|
||||
from PySide6.QtCore import Qt, QUrl, Signal
|
||||
from PySide6.QtGui import QDesktopServices
|
||||
from PySide6.QtWidgets import (
|
||||
QFrame,
|
||||
QHBoxLayout,
|
||||
QLabel,
|
||||
QLineEdit,
|
||||
QPushButton,
|
||||
QSizePolicy,
|
||||
QTextEdit,
|
||||
@@ -16,8 +18,9 @@ from PySide6.QtWidgets import (
|
||||
QWidget,
|
||||
)
|
||||
|
||||
from ..core import installer, sysenv
|
||||
from .theme import GOOD, MUTED
|
||||
from .. import config
|
||||
from ..core import installer, sysenv, updates
|
||||
from .theme import GOOD, MUTED, WARN
|
||||
|
||||
|
||||
def _panel(title: str) -> tuple[QFrame, QVBoxLayout]:
|
||||
@@ -33,13 +36,23 @@ def _panel(title: str) -> tuple[QFrame, QVBoxLayout]:
|
||||
return frame, layout
|
||||
|
||||
|
||||
_BACKEND_DESC = {
|
||||
"env": "token from $RIGDOCTOR_TOKEN",
|
||||
"keyring": "token stored in the OS keyring (encrypted)",
|
||||
"file": "token stored in a 0600 file — install libsecret-tools to encrypt it",
|
||||
"none": "no token saved",
|
||||
}
|
||||
|
||||
|
||||
class SetupPage(QWidget):
|
||||
_installed = Signal(int, str)
|
||||
_upd_state = Signal(object)
|
||||
|
||||
def __init__(self) -> None:
|
||||
super().__init__()
|
||||
self.setObjectName("Page")
|
||||
self._installed.connect(self._on_installed)
|
||||
self._upd_state.connect(self._on_upd_state)
|
||||
|
||||
root = QVBoxLayout(self)
|
||||
root.setContentsMargins(20, 18, 20, 18)
|
||||
@@ -71,6 +84,27 @@ class SetupPage(QWidget):
|
||||
comp_layout.addLayout(controls)
|
||||
root.addWidget(comp_card)
|
||||
|
||||
# Update access (M13): token gating updates to Gitea account holders.
|
||||
upd_card, upd_layout = _panel("Update access")
|
||||
self._upd_status = QLabel("")
|
||||
self._upd_status.setObjectName("Muted")
|
||||
self._upd_status.setWordWrap(True)
|
||||
upd_layout.addWidget(self._upd_status)
|
||||
token_row = QHBoxLayout()
|
||||
self._token_input = QLineEdit()
|
||||
self._token_input.setEchoMode(QLineEdit.EchoMode.Password)
|
||||
self._token_input.setPlaceholderText("Paste a Gitea token (scope: read:repository)")
|
||||
save_btn = QPushButton("Save token")
|
||||
save_btn.setObjectName("PrimaryButton")
|
||||
save_btn.clicked.connect(self._save_token)
|
||||
get_btn = QPushButton("Get a token")
|
||||
get_btn.clicked.connect(lambda: QDesktopServices.openUrl(QUrl(updates.TOKEN_PAGE)))
|
||||
token_row.addWidget(self._token_input, 1)
|
||||
token_row.addWidget(save_btn)
|
||||
token_row.addWidget(get_btn)
|
||||
upd_layout.addLayout(token_row)
|
||||
root.addWidget(upd_card)
|
||||
|
||||
self._output = QTextEdit()
|
||||
self._output.setObjectName("Report")
|
||||
self._output.setReadOnly(True)
|
||||
@@ -80,6 +114,7 @@ class SetupPage(QWidget):
|
||||
root.addStretch(1)
|
||||
|
||||
self._refresh()
|
||||
self._refresh_update_status()
|
||||
|
||||
def _refresh(self) -> None:
|
||||
self._env.setText(
|
||||
@@ -126,3 +161,40 @@ class SetupPage(QWidget):
|
||||
self._output.setPlainText(out[-4000:])
|
||||
self._install_btn.setText("Install missing")
|
||||
self._refresh()
|
||||
# If libsecret-tools was just installed, move a file token into the keyring.
|
||||
if config.token_backend() == "file" and config.keyring_available():
|
||||
token = config.load_token()
|
||||
if token:
|
||||
config.save_token(token)
|
||||
self._refresh_update_status()
|
||||
|
||||
# --- update access (token) ------------------------------------------------
|
||||
def _save_token(self) -> None:
|
||||
token = self._token_input.text().strip()
|
||||
if not token:
|
||||
return
|
||||
config.save_token(token)
|
||||
self._token_input.clear()
|
||||
self._refresh_update_status()
|
||||
|
||||
def _refresh_update_status(self) -> None:
|
||||
self._upd_status.setText(f"{_BACKEND_DESC[config.token_backend()]} · checking…")
|
||||
threading.Thread(target=self._check_update, daemon=True).start()
|
||||
|
||||
def _check_update(self) -> None:
|
||||
self._upd_state.emit((config.token_backend(), updates.update_state()))
|
||||
|
||||
def _on_upd_state(self, result) -> None:
|
||||
backend, (state, tag) = result
|
||||
msg = {
|
||||
updates.NO_TOKEN: "paste a token below to enable updates",
|
||||
updates.AUTH: "token rejected — check its scope/permissions",
|
||||
updates.NETWORK: "couldn't reach the update server",
|
||||
updates.UP_TO_DATE: f"up to date ({tag})" if tag else "up to date",
|
||||
updates.AVAILABLE: f"update available: {tag}",
|
||||
}[state]
|
||||
color = GOOD if state == updates.AVAILABLE else (WARN if state == updates.AUTH else MUTED)
|
||||
self._upd_status.setText(
|
||||
f"<span style='color:{MUTED}'>{_BACKEND_DESC[backend]}</span> · "
|
||||
f"<span style='color:{color}'>{msg}</span>"
|
||||
)
|
||||
|
||||
Reference in New Issue
Block a user