Initial import: CephDeploy — PyQt6 GUI для развёртывания Ceph-кластера

Десктопное приложение на PyQt6 + SQLAlchemy для автоматизации установки и управления Ceph-кластерами через Ansible и cephadm. Страницы: - Кластеры — CRUD профилей, список серверов - Сканер сети — TCP+SSH поиск хостов по CIDR, добавление в кластер - Развёртывание — precheck, генерация inventory/playbook, запуск ansible-playbook через QProcess, кнопка очистки с автопредложением после неудачного развёртывания - Состояние — живой дашборд ceph -s / ceph df / ceph osd tree через cephadm shell по SSH - OSD — назначение дисков, диалог добавления с lsblk-опросом и фильтром по состоянию (чистый / с данными / смонтирован) - Журнал — история запусков, просмотр и скачивание лога - Отчёт — HTML-экспорт конфигурации через Jinja2 - Настройки — QFormLayout для AppConfig Стек: Python 3.13, PyQt6, SQLAlchemy 2.x, paramiko, Jinja2, ansible-core. Целевая платформа: ALT Linux (apt-rpm) и Debian/Ubuntu. Test-env: docker-compose стенд из 3 systemd-контейнеров с podman + cephadm + chrony для локального тестирования развёртывания.
parents
# Python
__pycache__/
*.py[cod]
*$py.class
*.egg-info/
.mypy_cache/
.pytest_cache/
.ruff_cache/
# Виртуальные окружения
.venv/
venv/
env/
# PyInstaller
build/
dist/
*.spec.bak
# Локальная БД CephDeploy — содержит пользовательские кластеры,
# и должна оставаться приватной
cephdeploy.db
cephdeploy.db-wal
cephdeploy.db-shm
cephdeploy.db-journal
# Локальные конфиги
.env
.env.local
# Временные каталоги развёртывания (если вдруг попали в проект)
cephdeploy_*/
# IDE
.vscode/
.idea/
*.swp
*.swo
*~
# OS
.DS_Store
Thumbs.db
#!/usr/bin/env bash
# Скрипт сборки CephDeploy
# Использование: ./build.sh [--rpm]
set -euo pipefail
SCRIPT_DIR="$(cd "$(dirname "$0")" && pwd)"
cd "$SCRIPT_DIR"
VERSION="0.1.0"
APP_NAME="cephdeploy"
DIST_DIR="$SCRIPT_DIR/dist"
BUILD_DIR="$SCRIPT_DIR/build"
echo "=== CephDeploy build v${VERSION} ==="
# ── 1. Сборка PyInstaller-бандла ──────────────────────────────────────────
echo "→ Запуск PyInstaller..."
pyinstaller --clean --noconfirm cephdeploy.spec
BUNDLE_DIR="$DIST_DIR/$APP_NAME"
echo "→ Бандл собран: $BUNDLE_DIR"
# ── 2. Создание портативного архива ───────────────────────────────────────
ARCHIVE="$DIST_DIR/${APP_NAME}-${VERSION}-linux-x86_64.tar.gz"
tar -czf "$ARCHIVE" -C "$DIST_DIR" "$APP_NAME"
echo "→ Архив: $ARCHIVE"
# ── 3. Обёртка-запускатор (launcher script) ───────────────────────────────
LAUNCHER="$DIST_DIR/${APP_NAME}-${VERSION}-linux-x86_64.run"
cat > "$LAUNCHER" <<'LAUNCHER_EOF'
#!/usr/bin/env bash
# Самораспаковывающийся запускатор CephDeploy
TMPDIR=$(mktemp -d /tmp/cephdeploy_XXXXXX)
trap 'rm -rf "$TMPDIR"' EXIT
SKIP=$(awk '/^__ARCHIVE__$/{print NR+1; exit}' "$0")
tail -n +${SKIP} "$0" | tar -xz -C "$TMPDIR"
DISPLAY=${DISPLAY:-:0} "$TMPDIR/cephdeploy/cephdeploy" "$@"
exit 0
__ARCHIVE__
LAUNCHER_EOF
cat "$ARCHIVE" >> "$LAUNCHER"
chmod +x "$LAUNCHER"
echo "→ Self-run: $LAUNCHER"
# ── 4. RPM-пакет (если передан флаг --rpm) ────────────────────────────────
if [[ "${1:-}" == "--rpm" ]]; then
echo "→ Сборка RPM..."
RPM_BUILD="$SCRIPT_DIR/rpmbuild"
mkdir -p "$RPM_BUILD"/{BUILD,RPMS,SOURCES,SPECS,SRPMS}
cp "$ARCHIVE" "$RPM_BUILD/SOURCES/"
cat > "$RPM_BUILD/SPECS/${APP_NAME}.spec" <<SPEC_EOF
Name: ${APP_NAME}
Version: ${VERSION}
Release: 1%{?dist}
Summary: Приложение для развёртывания Ceph-кластера
License: MIT
Source0: ${APP_NAME}-${VERSION}-linux-x86_64.tar.gz
Requires: ansible-core >= 2.12, openssh-clients
%description
CephDeploy — десктопное приложение для автоматизированной
установки и мониторинга кластера Ceph Reef.
Разработано в ООО Этерсофт.
%prep
%setup -q -n ${APP_NAME}
%install
install -d %{buildroot}/opt/${APP_NAME}
cp -a . %{buildroot}/opt/${APP_NAME}/
install -d %{buildroot}%{_bindir}
cat > %{buildroot}%{_bindir}/${APP_NAME} <<'EOF'
#!/bin/sh
exec /opt/${APP_NAME}/${APP_NAME} "\$@"
EOF
chmod 755 %{buildroot}%{_bindir}/${APP_NAME}
install -d %{buildroot}%{_datadir}/applications
cat > %{buildroot}%{_datadir}/applications/${APP_NAME}.desktop <<'EOF'
[Desktop Entry]
Name=CephDeploy
Comment=Управление кластером Ceph
Exec=/opt/${APP_NAME}/${APP_NAME}
Icon=${APP_NAME}
Terminal=false
Type=Application
Categories=System;Network;
EOF
%files
/opt/${APP_NAME}/
%{_bindir}/${APP_NAME}
%{_datadir}/applications/${APP_NAME}.desktop
%changelog
* $(date '+%a %b %d %Y') CephDeploy Build <build@etersoft.ru> - ${VERSION}-1
- Первая сборка
SPEC_EOF
rpmbuild --define "_topdir $RPM_BUILD" -bb "$RPM_BUILD/SPECS/${APP_NAME}.spec"
RPM_FILE=$(find "$RPM_BUILD/RPMS" -name "*.rpm" | head -1)
cp "$RPM_FILE" "$DIST_DIR/"
echo "→ RPM: $DIST_DIR/$(basename "$RPM_FILE")"
fi
echo ""
echo "=== Готово ==="
echo "Портативный архив: $ARCHIVE"
echo "Self-run launcher: $LAUNCHER"
[[ "${1:-}" == "--rpm" ]] && echo "RPM-пакет: $DIST_DIR/${APP_NAME}-${VERSION}-*.rpm"
echo ""
echo "Запуск портативной версии:"
echo " tar -xzf ${APP_NAME}-${VERSION}-linux-x86_64.tar.gz && ./${APP_NAME}/${APP_NAME}"
# -*- mode: python ; coding: utf-8 -*-
"""
PyInstaller spec для CephDeploy.
Сборка: pyinstaller cephdeploy.spec
"""
import sys
from pathlib import Path
ROOT = Path(SPECPATH)
a = Analysis(
[str(ROOT / 'main.py')],
pathex=[str(ROOT)],
binaries=[],
datas=[
(str(ROOT / 'templates'), 'templates'),
],
hiddenimports=[
# SQLAlchemy диалект SQLite
'sqlalchemy.dialects.sqlite',
'sqlalchemy.dialects.sqlite.pysqlite',
# PyQt6
'PyQt6.QtCore',
'PyQt6.QtGui',
'PyQt6.QtWidgets',
'PyQt6.sip',
# Jinja2
'jinja2.ext',
# Paramiko
'paramiko',
'paramiko.transport',
'paramiko.auth_handler',
'paramiko.ecdsakey',
'paramiko.ed25519key',
# Cryptography (paramiko dep)
'cryptography',
'cryptography.hazmat.primitives.asymmetric.ed25519',
# Стандартная библиотека
'ipaddress',
'concurrent.futures',
],
hookspath=[],
hooksconfig={},
runtime_hooks=[],
excludes=[
'tkinter', 'unittest', 'xmlrpc', 'pydoc',
],
noarchive=False,
optimize=2,
)
pyz = PYZ(a.pure)
exe = EXE(
pyz,
a.scripts,
[],
exclude_binaries=True,
name='cephdeploy',
debug=False,
bootloader_ignore_signals=False,
strip=True,
upx=False,
console=False, # без терминала
disable_windowed_traceback=False,
argv_emulation=False,
target_arch=None,
codesign_identity=None,
entitlements_file=None,
icon=None,
)
coll = COLLECT(
exe,
a.binaries,
a.datas,
strip=True,
upx=False,
upx_exclude=[],
name='cephdeploy',
)
"""
Настройки приложения — хранение в JSON, доступ через AppConfig.
"""
from __future__ import annotations
import json
from pathlib import Path
_CONFIG_PATH = Path.home() / ".config" / "cephdeploy" / "settings.json"
_DEFAULTS: dict = {
"ssh_user": "amegami",
"ssh_key_path": "~/.ssh/id_ed25519",
"scan_tcp_timeout": 2,
"scan_ssh_timeout": 8,
"ansible_bin": "ansible-playbook",
"status_refresh_interval": 30,
}
class AppConfig:
_data: dict = {}
_loaded: bool = False
@classmethod
def _ensure(cls) -> None:
if not cls._loaded:
cls.load()
@classmethod
def load(cls) -> None:
raw: dict = {}
if _CONFIG_PATH.exists():
try:
raw = json.loads(_CONFIG_PATH.read_text(encoding="utf-8"))
except Exception:
raw = {}
cls._data = {**_DEFAULTS, **raw}
cls._loaded = True
@classmethod
def get(cls, key: str):
cls._ensure()
return cls._data.get(key, _DEFAULTS.get(key))
@classmethod
def set_value(cls, key: str, value) -> None:
cls._ensure()
cls._data[key] = value
@classmethod
def save(cls) -> None:
cls._ensure()
_CONFIG_PATH.parent.mkdir(parents=True, exist_ok=True)
_CONFIG_PATH.write_text(
json.dumps(cls._data, indent=2, ensure_ascii=False),
encoding="utf-8",
)
@classmethod
def all(cls) -> dict:
cls._ensure()
return dict(cls._data)
"""
Сканер сети для поиска потенциальных серверов Ceph.
Алгоритм:
1. Для каждого IP из подсети: TCP-подключение к порту 22 (2 сек таймаут)
2. Если порт открыт: резолв hostname через reverse DNS
3. Если включена проверка авторизации: SSH-подключение paramiko,
сбор информации об ОС, CPU, RAM и дисках
"""
from __future__ import annotations
import ipaddress
import socket
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass, field
from pathlib import Path
import paramiko
from PyQt6.QtCore import QThread, pyqtSignal
_TCP_TIMEOUT = 2.0 # таймаут TCP-проверки порта 22
_SSH_TIMEOUT = 8.0 # таймаут SSH-сессии
_MAX_WORKERS = 64 # параллельных проверок
# ---------------------------------------------------------------------------
# Результат проверки хоста
# ---------------------------------------------------------------------------
@dataclass
class HostInfo:
ip: str
hostname: str = ""
ssh_open: bool = False
ssh_auth_ok: bool = False
os_name: str = ""
cpu_count: int = 0
ram_mb: int = 0
disks: list[dict] = field(default_factory=list) # [{name, size, rota, type}]
error: str = ""
def display_hostname(self) -> str:
return self.hostname if self.hostname and self.hostname != self.ip else self.ip
def to_dict(self) -> dict:
return {
"ip": self.ip,
"hostname": self.hostname,
"ssh_open": self.ssh_open,
"ssh_auth_ok": self.ssh_auth_ok,
"os_name": self.os_name,
"cpu_count": self.cpu_count,
"ram_mb": self.ram_mb,
"disks": self.disks,
"error": self.error,
}
# ---------------------------------------------------------------------------
# Низкоуровневые проверки (запускаются в потоке пула)
# ---------------------------------------------------------------------------
def _tcp_check(ip: str, port: int = 22, timeout: float = _TCP_TIMEOUT) -> bool:
try:
with socket.create_connection((ip, port), timeout=timeout):
return True
except OSError:
return False
def _resolve_hostname(ip: str) -> str:
try:
name, _, _ = socket.gethostbyaddr(ip)
return name
except socket.herror:
return ip
def _ssh_gather(
info: HostInfo,
ssh_user: str,
ssh_key_path: str,
) -> None:
"""Подключается по SSH и собирает информацию о хосте. Изменяет info на месте."""
key_path = Path(ssh_key_path).expanduser()
client = paramiko.SSHClient()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
try:
client.connect(
info.ip,
username=ssh_user,
key_filename=str(key_path),
timeout=_SSH_TIMEOUT,
banner_timeout=_SSH_TIMEOUT,
auth_timeout=_SSH_TIMEOUT,
look_for_keys=False,
allow_agent=False,
)
info.ssh_auth_ok = True
def run(cmd: str) -> str:
_, stdout, stderr = client.exec_command(cmd, timeout=10)
return stdout.read().decode(errors="replace").strip()
# Hostname — fallback, если обратный DNS не сработал
if not info.hostname or info.hostname == info.ip:
hn = run("hostname -f 2>/dev/null || hostname")
if hn:
info.hostname = hn
# ОС
os_raw = run(
"grep -oP '(?<=^PRETTY_NAME=\")[^\"]+' /etc/os-release 2>/dev/null"
" || cat /etc/issue 2>/dev/null | head -1"
)
info.os_name = os_raw or "Linux"
# CPU
cpu_raw = run("nproc --all 2>/dev/null || grep -c ^processor /proc/cpuinfo")
try:
info.cpu_count = int(cpu_raw)
except ValueError:
pass
# RAM (МБ)
mem_raw = run("awk '/MemTotal/{print $2}' /proc/meminfo")
try:
info.ram_mb = int(mem_raw) // 1024
except ValueError:
pass
# Диски (только физические, без разделов)
disk_raw = run(
"lsblk -d -n -o NAME,SIZE,ROTA,TYPE 2>/dev/null"
)
for line in disk_raw.splitlines():
parts = line.split()
if len(parts) >= 4 and parts[3] == "disk":
info.disks.append(
{
"name": f"/dev/{parts[0]}",
"size": parts[1],
"rota": parts[2] == "1", # True = HDD, False = SSD
"type": "hdd" if parts[2] == "1" else "ssd",
}
)
except paramiko.AuthenticationException:
info.error = "Ошибка авторизации SSH"
except paramiko.SSHException as exc:
info.error = f"SSH: {exc}"
except OSError as exc:
info.error = str(exc)
finally:
client.close()
def _scan_host(
ip: str,
ssh_user: str,
ssh_key_path: str,
check_auth: bool,
) -> HostInfo:
info = HostInfo(ip=ip)
if not _tcp_check(ip):
return info
info.ssh_open = True
info.hostname = _resolve_hostname(ip)
if check_auth:
_ssh_gather(info, ssh_user, ssh_key_path)
return info
# ---------------------------------------------------------------------------
# QThread-обёртка
# ---------------------------------------------------------------------------
class ScanWorker(QThread):
"""
Сканирует подсеть в фоновом потоке.
Сигналы:
host_found(HostInfo) — найден доступный хост (ssh_open=True)
progress(current, total) — прогресс сканирования
finished_scan(int) — сканирование завершено, кол-во найденных хостов
error(str) — критическая ошибка (неверный CIDR и т.п.)
"""
host_found = pyqtSignal(object) # HostInfo
progress = pyqtSignal(int, int) # (scanned, total)
finished_scan = pyqtSignal(int) # кол-во найденных
error = pyqtSignal(str)
def __init__(
self,
subnet: str,
ssh_user: str = "amegami",
ssh_key_path: str = "~/.ssh/id_ed25519",
check_auth: bool = True,
parent=None,
) -> None:
super().__init__(parent)
self.subnet = subnet
self.ssh_user = ssh_user
self.ssh_key_path = ssh_key_path
self.check_auth = check_auth
self._cancel = False
def cancel(self) -> None:
self._cancel = True
def run(self) -> None:
try:
network = ipaddress.ip_network(self.subnet, strict=False)
except ValueError as exc:
self.error.emit(f"Неверный CIDR: {exc}")
return
hosts = list(network.hosts())
total = len(hosts)
found = 0
scanned = 0
with ThreadPoolExecutor(max_workers=min(_MAX_WORKERS, total)) as pool:
futures = {
pool.submit(
_scan_host,
str(ip),
self.ssh_user,
self.ssh_key_path,
self.check_auth,
): str(ip)
for ip in hosts
}
for future in as_completed(futures):
if self._cancel:
pool.shutdown(wait=False, cancel_futures=True)
break
scanned += 1
self.progress.emit(scanned, total)
try:
info: HostInfo = future.result()
except Exception as exc:
continue
if info.ssh_open:
found += 1
self.host_found.emit(info)
self.finished_scan.emit(found)
"""
Разрешение путей к ресурсам — работает как из исходников, так и из PyInstaller-бандла.
"""
from __future__ import annotations
import sys
from pathlib import Path
def get_templates_dir() -> Path:
"""Возвращает путь к каталогу templates."""
if hasattr(sys, "_MEIPASS"):
return Path(sys._MEIPASS) / "templates"
return Path(__file__).resolve().parent.parent / "templates"
def get_db_path() -> Path:
"""
Путь к SQLite-базе данных.
В бандле и при установке — ~/.local/share/cephdeploy/cephdeploy.db
В режиме разработки — рядом с проектом.
"""
if hasattr(sys, "_MEIPASS"):
data_dir = Path.home() / ".local" / "share" / "cephdeploy"
else:
# Режим разработки: корень проекта
data_dir = Path(__file__).resolve().parent.parent
data_dir.mkdir(parents=True, exist_ok=True)
return data_dir / "cephdeploy.db"
"""
Инициализация базы данных: движок SQLAlchemy и фабрика сессий.
"""
from __future__ import annotations
from pathlib import Path
from sqlalchemy import create_engine, event
from sqlalchemy.orm import sessionmaker
from core.resources import get_db_path
from db.models import Base
_DB_PATH = get_db_path()
engine = create_engine(
f"sqlite:///{_DB_PATH}",
connect_args={"check_same_thread": False},
echo=False,
)
# Включаем WAL-режим и foreign keys для SQLite
@event.listens_for(engine, "connect")
def _sqlite_pragmas(dbapi_conn, _):
cur = dbapi_conn.cursor()
cur.execute("PRAGMA journal_mode=WAL")
cur.execute("PRAGMA foreign_keys=ON")
cur.close()
SessionLocal = sessionmaker(bind=engine, autoflush=False, autocommit=False)
def init_db() -> None:
"""Создаёт все таблицы, если их ещё нет."""
Base.metadata.create_all(bind=engine)
"""
ORM-модели SQLAlchemy для CephDeploy.
"""
from __future__ import annotations
import enum
from datetime import datetime
from sqlalchemy import (
BigInteger,
DateTime,
Enum,
ForeignKey,
Integer,
String,
Text,
)
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship
class Base(DeclarativeBase):
pass
class ServerRole(str, enum.Enum):
MON = "mon"
MGR = "mgr"
OSD = "osd"
MDS = "mds"
RGW = "rgw"
ALL = "all" # универсальный узел
class DeviceType(str, enum.Enum):
HDD = "hdd"
SSD = "ssd"
NVME = "nvme"
class OSDRole(str, enum.Enum):
DATA = "data"
WAL = "wal"
DB = "db"
class DeployStatus(str, enum.Enum):
RUNNING = "running"
SUCCESS = "success"
FAILED = "failed"
CANCELLED = "cancelled"
# ---------------------------------------------------------------------------
class Cluster(Base):
__tablename__ = "clusters"
id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
name: Mapped[str] = mapped_column(String(128), nullable=False, unique=True)
ceph_version: Mapped[str] = mapped_column(String(32), default="reef")
description: Mapped[str | None] = mapped_column(Text, nullable=True)
created_at: Mapped[datetime] = mapped_column(
DateTime, default=datetime.now, nullable=False
)
servers: Mapped[list[Server]] = relationship(
"Server", back_populates="cluster", cascade="all, delete-orphan"
)
deployment_runs: Mapped[list[DeploymentRun]] = relationship(
"DeploymentRun", back_populates="cluster", cascade="all, delete-orphan"
)
def __repr__(self) -> str:
return f"<Cluster id={self.id} name={self.name!r}>"
class Server(Base):
__tablename__ = "servers"
id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
cluster_id: Mapped[int] = mapped_column(
Integer, ForeignKey("clusters.id", ondelete="CASCADE"), nullable=False
)
hostname: Mapped[str] = mapped_column(String(253), nullable=False)
ip_address: Mapped[str] = mapped_column(String(45), nullable=False)
role: Mapped[ServerRole] = mapped_column(
Enum(ServerRole), default=ServerRole.OSD, nullable=False
)
ssh_user: Mapped[str] = mapped_column(String(64), default="amegami")
ssh_key_path: Mapped[str] = mapped_column(
String(512), default="~/.ssh/id_ed25519"
)
created_at: Mapped[datetime] = mapped_column(
DateTime, default=datetime.now, nullable=False
)
cluster: Mapped[Cluster] = relationship("Cluster", back_populates="servers")
osd_devices: Mapped[list[OSDDevice]] = relationship(
"OSDDevice", back_populates="server", cascade="all, delete-orphan"
)
network_interfaces: Mapped[list[NetworkInterface]] = relationship(
"NetworkInterface", back_populates="server", cascade="all, delete-orphan"
)
def __repr__(self) -> str:
return f"<Server id={self.id} hostname={self.hostname!r} role={self.role}>"
class OSDDevice(Base):
__tablename__ = "osd_devices"
id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
server_id: Mapped[int] = mapped_column(
Integer, ForeignKey("servers.id", ondelete="CASCADE"), nullable=False
)
device_path: Mapped[str] = mapped_column(String(256), nullable=False)
device_type: Mapped[DeviceType] = mapped_column(
Enum(DeviceType), default=DeviceType.HDD, nullable=False
)
osd_role: Mapped[OSDRole] = mapped_column(
Enum(OSDRole), default=OSDRole.DATA, nullable=False
)
server: Mapped[Server] = relationship("Server", back_populates="osd_devices")
def __repr__(self) -> str:
return (
f"<OSDDevice id={self.id} path={self.device_path!r} "
f"type={self.device_type} role={self.osd_role}>"
)
class NetworkInterface(Base):
__tablename__ = "network_interfaces"
id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
server_id: Mapped[int] = mapped_column(
Integer, ForeignKey("servers.id", ondelete="CASCADE"), nullable=False
)
iface_name: Mapped[str] = mapped_column(String(64), nullable=False)
purpose: Mapped[str] = mapped_column(
String(32), default="cluster"
) # "cluster" | "public"
speed_gbit: Mapped[int | None] = mapped_column(Integer, nullable=True)
server: Mapped[Server] = relationship(
"Server", back_populates="network_interfaces"
)
def __repr__(self) -> str:
return f"<NetworkInterface id={self.id} iface={self.iface_name!r}>"
class DeploymentRun(Base):
__tablename__ = "deployment_runs"
id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
cluster_id: Mapped[int] = mapped_column(
Integer, ForeignKey("clusters.id", ondelete="CASCADE"), nullable=False
)
started_at: Mapped[datetime] = mapped_column(
DateTime, default=datetime.now, nullable=False
)
finished_at: Mapped[datetime | None] = mapped_column(DateTime, nullable=True)
status: Mapped[DeployStatus] = mapped_column(
Enum(DeployStatus), default=DeployStatus.RUNNING, nullable=False
)
log_path: Mapped[str | None] = mapped_column(String(512), nullable=True)
cluster: Mapped[Cluster] = relationship(
"Cluster", back_populates="deployment_runs"
)
def __repr__(self) -> str:
return (
f"<DeploymentRun id={self.id} cluster_id={self.cluster_id} "
f"status={self.status}>"
)
"""
CRUD-операции для всех ORM-моделей CephDeploy.
Все методы принимают сессию явно — создание сессии на стороне вызывающего кода.
"""
from __future__ import annotations
from datetime import datetime
from typing import Optional
from sqlalchemy import select
from sqlalchemy.orm import Session, selectinload
from db.models import (
Cluster,
DeploymentRun,
DeployStatus,
NetworkInterface,
OSDDevice,
Server,
)
# ---------------------------------------------------------------------------
# Cluster
# ---------------------------------------------------------------------------
def create_cluster(
session: Session,
name: str,
ceph_version: str = "reef",
description: str | None = None,
) -> Cluster:
cluster = Cluster(name=name, ceph_version=ceph_version, description=description)
session.add(cluster)
session.flush()
return cluster
def get_cluster(session: Session, cluster_id: int) -> Optional[Cluster]:
return session.get(Cluster, cluster_id)
def get_cluster_by_name(session: Session, name: str) -> Optional[Cluster]:
return session.scalar(select(Cluster).where(Cluster.name == name))
def list_clusters(session: Session) -> list[Cluster]:
return list(session.scalars(select(Cluster).order_by(Cluster.created_at.desc())))
def update_cluster(
session: Session,
cluster_id: int,
*,
name: str | None = None,
ceph_version: str | None = None,
description: str | None = None,
) -> Optional[Cluster]:
cluster = get_cluster(session, cluster_id)
if cluster is None:
return None
if name is not None:
cluster.name = name
if ceph_version is not None:
cluster.ceph_version = ceph_version
if description is not None:
cluster.description = description
session.flush()
return cluster
def delete_cluster(session: Session, cluster_id: int) -> bool:
cluster = get_cluster(session, cluster_id)
if cluster is None:
return False
session.delete(cluster)
session.flush()
return True
# ---------------------------------------------------------------------------
# Server
# ---------------------------------------------------------------------------
def create_server(
session: Session,
cluster_id: int,
hostname: str,
ip_address: str,
role: str = "osd",
ssh_user: str = "amegami",
ssh_key_path: str = "~/.ssh/id_ed25519",
) -> Server:
server = Server(
cluster_id=cluster_id,
hostname=hostname,
ip_address=ip_address,
role=role,
ssh_user=ssh_user,
ssh_key_path=ssh_key_path,
)
session.add(server)
session.flush()
return server
def get_server(session: Session, server_id: int) -> Optional[Server]:
return session.get(Server, server_id)
def list_servers(session: Session, cluster_id: int) -> list[Server]:
return list(
session.scalars(
select(Server)
.where(Server.cluster_id == cluster_id)
.options(
selectinload(Server.osd_devices),
selectinload(Server.network_interfaces),
)
.order_by(Server.id)
)
)
def delete_server(session: Session, server_id: int) -> bool:
server = get_server(session, server_id)
if server is None:
return False
session.delete(server)
session.flush()
return True
# ---------------------------------------------------------------------------
# OSDDevice
# ---------------------------------------------------------------------------
def add_osd_device(
session: Session,
server_id: int,
device_path: str,
device_type: str = "hdd",
osd_role: str = "data",
) -> OSDDevice:
device = OSDDevice(
server_id=server_id,
device_path=device_path,
device_type=device_type,
osd_role=osd_role,
)
session.add(device)
session.flush()
return device
def list_osd_devices(session: Session, server_id: int) -> list[OSDDevice]:
return list(
session.scalars(
select(OSDDevice)
.where(OSDDevice.server_id == server_id)
.order_by(OSDDevice.id)
)
)
def delete_osd_device(session: Session, device_id: int) -> bool:
device = session.get(OSDDevice, device_id)
if device is None:
return False
session.delete(device)
session.flush()
return True
# ---------------------------------------------------------------------------
# NetworkInterface
# ---------------------------------------------------------------------------
def add_network_interface(
session: Session,
server_id: int,
iface_name: str,
purpose: str = "cluster",
speed_gbit: int | None = None,
) -> NetworkInterface:
iface = NetworkInterface(
server_id=server_id,
iface_name=iface_name,
purpose=purpose,
speed_gbit=speed_gbit,
)
session.add(iface)
session.flush()
return iface
def list_network_interfaces(
session: Session, server_id: int
) -> list[NetworkInterface]:
return list(
session.scalars(
select(NetworkInterface)
.where(NetworkInterface.server_id == server_id)
.order_by(NetworkInterface.id)
)
)
# ---------------------------------------------------------------------------
# DeploymentRun
# ---------------------------------------------------------------------------
def create_deployment_run(
session: Session,
cluster_id: int,
log_path: str | None = None,
) -> DeploymentRun:
run = DeploymentRun(
cluster_id=cluster_id,
status=DeployStatus.RUNNING,
log_path=log_path,
)
session.add(run)
session.flush()
return run
def finish_deployment_run(
session: Session,
run_id: int,
status: DeployStatus,
log_path: str | None = None,
) -> Optional[DeploymentRun]:
run = session.get(DeploymentRun, run_id)
if run is None:
return None
run.finished_at = datetime.now()
run.status = status
if log_path is not None:
run.log_path = log_path
session.flush()
return run
def list_deployment_runs(
session: Session, cluster_id: int, limit: int = 50
) -> list[DeploymentRun]:
return list(
session.scalars(
select(DeploymentRun)
.where(DeploymentRun.cluster_id == cluster_id)
.order_by(DeploymentRun.started_at.desc())
.limit(limit)
)
)
def get_last_deployment_run(
session: Session, cluster_id: int
) -> Optional[DeploymentRun]:
return session.scalar(
select(DeploymentRun)
.where(DeploymentRun.cluster_id == cluster_id)
.order_by(DeploymentRun.started_at.desc())
.limit(1)
)
"""
CephDeploy — точка входа приложения.
"""
from __future__ import annotations
import sys
from pathlib import Path
# Добавляем корень проекта в sys.path, чтобы импорты работали
# независимо от текущего рабочего каталога
sys.path.insert(0, str(Path(__file__).resolve().parent))
from PyQt6.QtGui import QPalette, QColor
from PyQt6.QtWidgets import QApplication
from db import init_db
from ui.main_window import MainWindow
def _setup_dark_palette(app: QApplication) -> None:
"""Базовая тёмная палитра Qt (дополняется QSS в MainWindow)."""
palette = QPalette()
palette.setColor(QPalette.ColorRole.Window, QColor(26, 31, 41))
palette.setColor(QPalette.ColorRole.WindowText, QColor(192, 200, 216))
palette.setColor(QPalette.ColorRole.Base, QColor(22, 27, 34))
palette.setColor(QPalette.ColorRole.AlternateBase, QColor(30, 33, 40))
palette.setColor(QPalette.ColorRole.Text, QColor(192, 200, 216))
palette.setColor(QPalette.ColorRole.Button, QColor(30, 33, 40))
palette.setColor(QPalette.ColorRole.ButtonText, QColor(192, 200, 216))
palette.setColor(QPalette.ColorRole.Highlight, QColor(46, 74, 122))
palette.setColor(QPalette.ColorRole.HighlightedText, QColor(255, 255, 255))
app.setPalette(palette)
def main() -> None:
app = QApplication(sys.argv)
app.setApplicationName("CephDeploy")
app.setOrganizationName("Etersoft")
_setup_dark_palette(app)
# Инициализация БД (создаём таблицы при первом запуске)
try:
init_db()
except Exception as exc:
from PyQt6.QtWidgets import QMessageBox
QMessageBox.critical(
None,
"Ошибка базы данных",
f"Не удалось инициализировать SQLite:\n{exc}",
)
sys.exit(1)
window = MainWindow()
window.show()
sys.exit(app.exec())
if __name__ == "__main__":
main()
---
# Ansible-плейбук для развёртывания Ceph {{ cluster.version }} (cephadm)
# Кластер: {{ cluster.name }}
# Сгенерировано CephDeploy
- name: Подготовка узлов
hosts: all
become: true
gather_facts: true
tasks:
# ── Установка cephadm и зависимостей ─────────────────────────────
- name: Установить cephadm (Debian/Ubuntu)
ansible.builtin.apt:
name:
- cephadm
- python3-pip
- lvm2
state: present
update_cache: true
when: ansible_os_family == "Debian"
- name: Установить cephadm (ALT Linux)
community.general.apt_rpm:
package:
- cephadm
- python3-pip
- lvm2
state: present
update_cache: true
when: ansible_os_family == "Altlinux"
- name: Установить cephadm (RHEL/CentOS/Rocky)
ansible.builtin.dnf:
name:
- cephadm
- python3-pip
- lvm2
state: present
when: ansible_os_family == "RedHat"
# ── Проверка наличия chronyc и firewalld без зависимости от pkg-facts
- name: Проверить наличие chronyc
ansible.builtin.command: which chronyc
register: chronyc_check
failed_when: false
changed_when: false
- name: Проверить наличие firewalld
ansible.builtin.command: which firewalld
register: firewalld_check
failed_when: false
changed_when: false
- name: Синхронизировать время (chronyc makestep)
ansible.builtin.command: chronyc makestep
when: chronyc_check.rc == 0
register: chrony_result
failed_when: false
changed_when: chrony_result.rc == 0
- name: Отключить firewalld
ansible.builtin.service:
name: firewalld
state: stopped
enabled: false
when: firewalld_check.rc == 0
failed_when: false
- name: Bootstrap первого MON-узла
hosts: {{ bootstrap_host.hostname }}
become: true
tasks:
- name: Запустить cephadm bootstrap
ansible.builtin.command: >
cephadm bootstrap
--mon-ip {{ bootstrap_host.ip_address }}
--initial-dashboard-user admin
--initial-dashboard-password admin
--skip-monitoring-stack
--allow-overwrite
args:
creates: /etc/ceph/ceph.conf
register: bootstrap_result
- name: Вывод результата bootstrap
ansible.builtin.debug:
var: bootstrap_result.stdout_lines
when: bootstrap_result.stdout_lines is defined
- name: Прочитать публичный ключ cephadm-оркестратора
ansible.builtin.slurp:
src: /etc/ceph/ceph.pub
register: ceph_pub
# Cephadm-оркестратор SSH-ится на хосты как root со своим ключом
# /etc/ceph/ceph.pub. Чтобы `ceph orch host add` и развёртывание
# демонов работали, этот ключ должен быть в ~root/.ssh/authorized_keys
# на каждом узле, включая bootstrap-узел.
- name: Распространение публичного ключа cephadm на все узлы
hosts: all
become: true
tasks:
- name: Убедиться, что /root/.ssh существует
ansible.builtin.file:
path: /root/.ssh
state: directory
owner: root
group: root
mode: '0700'
{% raw %}
- name: Установить ceph.pub в authorized_keys root
ansible.builtin.lineinfile:
path: /root/.ssh/authorized_keys
line: "{{ (hostvars[groups['mons'][0]]['ceph_pub']['content'] | b64decode).strip() }}"
create: true
owner: root
group: root
mode: '0600'
{% endraw %}
- name: Добавление остальных узлов в кластер
hosts: {{ bootstrap_host.hostname }}
become: true
tasks:
- name: Подождать готовности оркестратора
ansible.builtin.command: ceph orch status
register: orch_status
retries: 10
delay: 10
until: "'available' in orch_status.stdout"
failed_when: false
{% for s in servers if s.hostname != bootstrap_host.hostname %}
{#
В БД CephDeploy `hostname` может быть равен IP (если reverse DNS не отработал).
cephadm-оркестратор же ожидает реальное имя машины и откажется добавлять
хост с жалобой «hostname "foo" does not match expected hostname "ip"».
Поэтому подставляем `ansible_hostname`, собранный в предыдущем play.
ansible_host — это IP, который использует SSH (из inventory).
#}
{% set hv = "{{ hostvars['" ~ s.hostname ~ "'].ansible_hostname | default('" ~ s.hostname ~ "') }}" %}
- name: Добавить хост {{ s.hostname }}
ansible.builtin.command: >
ceph orch host add {{ hv }} {{ s.ip_address }}
register: add_host_result
failed_when: false
changed_when: add_host_result.rc == 0
- name: Результат добавления {{ s.hostname }}
ansible.builtin.debug:
var: add_host_result.stdout
{% endfor %}
- name: Развёртывание OSD-дисков
hosts: {{ bootstrap_host.hostname }}
become: true
tasks:
{% for s in servers %}
{% set osd_hv = "{{ hostvars['" ~ s.hostname ~ "'].ansible_hostname | default('" ~ s.hostname ~ "') }}" %}
{% for osd in s.osds %}
- name: Добавить OSD {{ osd.path }} на {{ s.hostname }} ({{ osd.type }}/{{ osd.role }})
ansible.builtin.command: >
ceph orch daemon add osd {{ osd_hv }}:{{ osd.path }}
register: osd_result
failed_when: false
changed_when: "'Created osd' in (osd_result.stdout | default(''))"
{% endfor %}
{% endfor %}
- name: Статус кластера
ansible.builtin.command: ceph -s
register: ceph_status
failed_when: false
- name: Вывод статуса
ansible.builtin.debug:
var: ceph_status.stdout_lines
when: ceph_status.stdout_lines is defined
---
# Очистка cephadm-кластеров и конфигов на всех узлах.
# Используется CephDeploy после неудачного развёртывания или по кнопке «Очистить».
# Кластер: {{ cluster.name }}
{% raw %}
- name: Очистка cephadm-кластера
hosts: all
become: true
gather_facts: false
tasks:
- name: Проверить наличие cephadm
ansible.builtin.command: which cephadm
register: cephadm_check
failed_when: false
changed_when: false
- name: Получить fsid всех локальных cephadm-кластеров
ansible.builtin.shell: |
set -o pipefail
cephadm ls 2>/dev/null | grep -oP '(?<="fsid": ")[a-f0-9-]+' | sort -u
args:
executable: /bin/bash
register: fsids
when: cephadm_check.rc == 0
failed_when: false
changed_when: false
- name: Удалить каждый найденный кластер (rm-cluster)
ansible.builtin.command: >
cephadm rm-cluster --force --fsid {{ item }} --zap-osds
loop: "{{ fsids.stdout_lines | default([]) }}"
when:
- cephadm_check.rc == 0
- fsids.stdout_lines is defined
- fsids.stdout_lines | length > 0
failed_when: false
changed_when: true
- name: Остановить любые оставшиеся ceph-*@ юниты
ansible.builtin.shell: |
set -o pipefail
for u in $(systemctl list-units --all --no-legend 'ceph-*@*' 2>/dev/null | awk '{print $1}'); do
systemctl stop "$u" 2>/dev/null || true
systemctl disable "$u" 2>/dev/null || true
done
args:
executable: /bin/bash
changed_when: true
failed_when: false
- name: Удалить ceph-конфиги и данные
ansible.builtin.file:
path: "{{ item }}"
state: absent
loop:
- /etc/ceph
- /var/lib/ceph
- /var/log/ceph
failed_when: false
- name: Убрать оставшиеся podman/docker-контейнеры ceph-*
ansible.builtin.shell: |
if command -v podman >/dev/null 2>&1; then
for c in $(podman ps -aq --filter name=ceph- 2>/dev/null); do
podman rm -f "$c" >/dev/null 2>&1 || true
done
fi
if command -v docker >/dev/null 2>&1; then
for c in $(docker ps -aq --filter name=ceph- 2>/dev/null); do
docker rm -f "$c" >/dev/null 2>&1 || true
done
fi
args:
executable: /bin/bash
changed_when: true
failed_when: false
{% endraw %}
[all:vars]
ansible_python_interpreter=/usr/bin/python3
[all]
{% for s in servers %}
{{ s.hostname }} ansible_host={{ s.ip_address }} ansible_user={{ s.ssh_user }} ansible_ssh_private_key_file={{ s.ssh_key_path }}
{% endfor %}
[mons]
{% for s in servers if s.role in ('mon', 'all') %}
{{ s.hostname }}
{% endfor %}
[mgrs]
{% for s in servers if s.role in ('mgr', 'all') %}
{{ s.hostname }}
{% endfor %}
[osds]
{% for s in servers if s.role in ('osd', 'all') %}
{{ s.hostname }}
{% endfor %}
<!DOCTYPE html>
<html lang="ru">
<head>
<meta charset="utf-8">
<title>Отчёт кластера {{ cluster.name }}</title>
<style>
* { box-sizing: border-box; margin: 0; padding: 0; }
body { font-family: 'Segoe UI', Arial, sans-serif; font-size: 14px;
background: #f4f6f9; color: #1a2030; }
.page { max-width: 960px; margin: 40px auto; padding: 0 24px 60px; }
h1 { font-size: 26px; color: #1565c0; border-bottom: 3px solid #1565c0;
padding-bottom: 10px; margin-bottom: 8px; }
.meta { color: #607d8b; font-size: 12px; margin-bottom: 32px; }
h2 { font-size: 16px; color: #1565c0; margin: 28px 0 10px;
padding-left: 10px; border-left: 4px solid #1565c0; }
table { width: 100%; border-collapse: collapse; background: #fff;
border-radius: 6px; overflow: hidden;
box-shadow: 0 1px 4px rgba(0,0,0,.12); margin-bottom: 8px; }
th { background: #1565c0; color: #fff; padding: 9px 12px;
text-align: left; font-size: 12px; font-weight: 600; }
td { padding: 8px 12px; border-bottom: 1px solid #e8ecf0; font-size: 13px; }
tr:last-child td { border-bottom: none; }
tr:nth-child(even) td { background: #f9fafc; }
.badge { display: inline-block; border-radius: 3px; padding: 2px 8px;
font-size: 11px; font-weight: bold; }
.ok { background: #e8f5e9; color: #2e7d32; }
.err { background: #ffebee; color: #b71c1c; }
.warn{ background: #fff8e1; color: #e65100; }
.run { background: #e3f2fd; color: #1565c0; }
.hdd { background: #eceff1; color: #455a64; }
.ssd { background: #e8f5e9; color: #2e7d32; }
.nvme{ background: #e8eaf6; color: #3949ab; }
.footer { margin-top: 40px; font-size: 11px; color: #9e9e9e; text-align: center; }
</style>
</head>
<body>
<div class="page">
<h1>🐙 CephDeploy — Отчёт кластера</h1>
<p class="meta">
Кластер: <strong>{{ cluster.name }}</strong> &nbsp;|&nbsp;
Версия Ceph: <strong>{{ cluster.version }}</strong> &nbsp;|&nbsp;
Создан: {{ cluster.created_at }} &nbsp;|&nbsp;
Сформирован: {{ generated_at }}
</p>
<!-- Серверы -->
<h2>Серверы ({{ servers | length }})</h2>
{% if servers %}
<table>
<thead>
<tr><th>Hostname</th><th>IP-адрес</th><th>Роль</th>
<th>SSH-пользователь</th><th>OSD-дисков</th></tr>
</thead>
<tbody>
{% for s in servers %}
<tr>
<td>{{ s.hostname }}</td>
<td>{{ s.ip_address }}</td>
<td><span class="badge run">{{ s.role }}</span></td>
<td>{{ s.ssh_user }}</td>
<td>{{ s.osd_count }}</td>
</tr>
{% endfor %}
</tbody>
</table>
{% else %}
<p style="color:#9e9e9e">Серверов нет.</p>
{% endif %}
<!-- OSD -->
<h2>OSD-устройства</h2>
{% set total_osds = namespace(n=0) %}
{% for s in servers %}{% set total_osds.n = total_osds.n + s.osd_count %}{% endfor %}
{% if total_osds.n > 0 %}
<table>
<thead>
<tr><th>Сервер</th><th>Устройство</th><th>Тип</th><th>Роль OSD</th></tr>
</thead>
<tbody>
{% for s in servers %}
{% for osd in s.osds %}
<tr>
<td>{{ s.hostname }}</td>
<td><code>{{ osd.path }}</code></td>
<td>
<span class="badge {{ osd.type }}">{{ osd.type | upper }}</span>
</td>
<td>{{ osd.role | upper }}</td>
</tr>
{% endfor %}
{% endfor %}
</tbody>
</table>
{% else %}
<p style="color:#9e9e9e">OSD-устройства не назначены.</p>
{% endif %}
<!-- История развёртывания -->
<h2>История развёртывания (последние {{ runs | length }})</h2>
{% if runs %}
<table>
<thead>
<tr><th>#</th><th>Начало</th><th>Завершение</th>
<th>Длительность</th><th>Статус</th></tr>
</thead>
<tbody>
{% for r in runs %}
<tr>
<td>{{ r.id }}</td>
<td>{{ r.started_at }}</td>
<td>{{ r.finished_at or '—' }}</td>
<td>{{ r.duration }}</td>
<td>
{% if r.status == 'success' %}<span class="badge ok">✔ Успех</span>
{% elif r.status == 'failed' %}<span class="badge err">✘ Ошибка</span>
{% elif r.status == 'cancelled' %}<span class="badge warn">⏹ Отменено</span>
{% else %}<span class="badge run">⟳ Выполняется</span>{% endif %}
</td>
</tr>
{% endfor %}
</tbody>
</table>
{% else %}
<p style="color:#9e9e9e">Запусков развёртывания ещё не было.</p>
{% endif %}
<div class="footer">Сформировано CephDeploy v0.1.0 &nbsp;·&nbsp; ООО Этерсофт</div>
</div>
</body>
</html>
# Тестовый «сервер» для CephDeploy на systemd-образе.
# systemd работает как PID 1, что позволяет cephadm нормально
# поднимать юниты (chrony, ceph-*, podman и т.д.).
# syntax=docker/dockerfile:1
FROM jrei/systemd-debian:12
ENV DEBIAN_FRONTEND=noninteractive
# Удаляем юниты, которые бесполезны в контейнере и любят падать
RUN find /etc/systemd/system /lib/systemd/system -path '*.wants/*' \
\( -name '*systemd-network*' \
-o -name '*systemd-resolved*' \
-o -name '*systemd-udevd*' \
-o -name '*getty*' \) -delete 2>/dev/null || true
RUN apt-get update && apt-get install -y --no-install-recommends \
openssh-server \
sudo \
python3 \
python3-apt \
util-linux \
procps \
iproute2 \
ca-certificates \
curl \
chrony \
lvm2 \
podman \
catatonit \
cephadm \
&& rm -rf /var/lib/apt/lists/*
# Пользователь amegami с sudo без пароля
RUN useradd -m -s /bin/bash amegami \
&& echo "amegami ALL=(ALL) NOPASSWD:ALL" > /etc/sudoers.d/amegami \
&& chmod 440 /etc/sudoers.d/amegami
# SSH-ключ
RUN mkdir -p /home/amegami/.ssh && chmod 700 /home/amegami/.ssh
COPY authorized_keys /home/amegami/.ssh/authorized_keys
RUN chmod 600 /home/amegami/.ssh/authorized_keys \
&& chown -R amegami:amegami /home/amegami/.ssh \
&& ssh-keygen -A \
&& mkdir -p /run/sshd \
&& systemctl enable ssh.service
# Podman при запуске ceph-демонов пытается делать statfs /run/udev
# (даже если udev не запущен). Создаём директорию через tmpfiles.d,
# чтобы systemd восстанавливал её при каждом старте.
RUN echo 'd /run/udev 0755 root root -' > /etc/tmpfiles.d/cephdeploy.conf
# Loop-диски для имитации OSD — создаются через systemd-юнит
COPY setup-loop-disks.sh /usr/local/bin/setup-loop-disks.sh
RUN chmod +x /usr/local/bin/setup-loop-disks.sh
COPY setup-loop-disks.service /etc/systemd/system/setup-loop-disks.service
RUN systemctl enable setup-loop-disks.service
EXPOSE 22
# Запускаем systemd как PID 1
CMD ["/sbin/init"]
# -*- mode: ruby -*-
# Тестовый кластер Ceph: 1 MON + 2 OSD
# Требует: vagrant + vagrant-libvirt
# Установка плагина: vagrant plugin install vagrant-libvirt
#
# Запуск: vagrant up
# Стоп: vagrant halt
# Удаление: vagrant destroy -f
# SSH: vagrant ssh mon-node
NODES = [
{ name: "mon-node", ip: "192.168.122.11", ram: 2048, cpus: 2, osd_disks: 0 },
{ name: "osd-node1", ip: "192.168.122.12", ram: 3072, cpus: 2, osd_disks: 3 },
{ name: "osd-node2", ip: "192.168.122.13", ram: 3072, cpus: 2, osd_disks: 3 },
]
Vagrant.configure("2") do |config|
# ALT Linux 10 (или замените на доступный box)
config.vm.box = "generic/debian12" # заменить на altlinux-box если есть
config.vm.synced_folder ".", "/vagrant", disabled: true
NODES.each do |node|
config.vm.define node[:name] do |vm_config|
vm_config.vm.hostname = node[:name]
vm_config.vm.network "private_network", ip: node[:ip]
vm_config.vm.provider :libvirt do |libvirt|
libvirt.memory = node[:ram]
libvirt.cpus = node[:cpus]
libvirt.driver = "kvm"
# Дополнительные диски для OSD (по 5 ГБ каждый)
node[:osd_disks].times do |i|
libvirt.storage :file,
size: "5G",
type: "raw",
bus: "virtio",
device: "vd#{('b'.ord + i).chr}"
end
end
# Настройка SSH-доступа для amegami
vm_config.vm.provision "shell", inline: <<-SHELL
set -e
id amegami 2>/dev/null || useradd -m -s /bin/bash amegami
echo "amegami ALL=(ALL) NOPASSWD:ALL" > /etc/sudoers.d/amegami
mkdir -p /home/amegami/.ssh
echo "#{File.read(File.expand_path("~/.ssh/id_ed25519.pub")).strip}" \
>> /home/amegami/.ssh/authorized_keys
sort -u /home/amegami/.ssh/authorized_keys -o /home/amegami/.ssh/authorized_keys
chmod 700 /home/amegami/.ssh
chmod 600 /home/amegami/.ssh/authorized_keys
chown -R amegami:amegami /home/amegami/.ssh
SHELL
end
end
end
ssh-ed25519 AAAAC3NzaC1lZDI1NTE5AAAAIN39+VqA2hajA6KuES4Jlka7vg6l67k+av+Og130ftYh amegami@lin-test.office.etersoft.ru
version: "3.9"
# Тестовый кластер из 3 узлов для CephDeploy (Debian 12 + systemd).
# Запуск: docker compose up -d --build
# Стоп: docker compose down
# Добавить в сканер CephDeploy: подсеть 172.20.0.0/24, пользователь amegami.
networks:
ceph-test:
driver: bridge
ipam:
config:
- subnet: 172.20.0.0/24
services:
mon-node:
build:
context: .
network: host
hostname: mon-node
container_name: ceph-mon
networks:
ceph-test:
ipv4_address: 172.20.0.11
privileged: true
cgroup: host
tmpfs:
- /run
- /run/lock
volumes:
- /sys/fs/cgroup:/sys/fs/cgroup:rw
- mon_data:/var/lib/ceph
- mon_disks:/var/lib/ceph-disks
stop_signal: SIGRTMIN+3
ports:
- "2201:22"
restart: unless-stopped
osd-node1:
build:
context: .
network: host
hostname: osd-node1
container_name: ceph-osd1
networks:
ceph-test:
ipv4_address: 172.20.0.12
privileged: true
cgroup: host
tmpfs:
- /run
- /run/lock
volumes:
- /sys/fs/cgroup:/sys/fs/cgroup:rw
- osd1_data:/var/lib/ceph
- osd1_disks:/var/lib/ceph-disks
stop_signal: SIGRTMIN+3
ports:
- "2202:22"
restart: unless-stopped
osd-node2:
build:
context: .
network: host
hostname: osd-node2
container_name: ceph-osd2
networks:
ceph-test:
ipv4_address: 172.20.0.13
privileged: true
cgroup: host
tmpfs:
- /run
- /run/lock
volumes:
- /sys/fs/cgroup:/sys/fs/cgroup:rw
- osd2_data:/var/lib/ceph
- osd2_disks:/var/lib/ceph-disks
stop_signal: SIGRTMIN+3
ports:
- "2203:22"
restart: unless-stopped
volumes:
mon_data:
osd1_data:
osd2_data:
mon_disks:
osd1_disks:
osd2_disks:
#!/usr/bin/env bash
# Управление тестовым кластером CephDeploy
#
# Использование:
# ./manage.sh start — запустить 3 тестовых узла
# ./manage.sh stop — остановить
# ./manage.sh status — проверить SSH-доступность
# ./manage.sh destroy — удалить контейнеры и тома
# ./manage.sh clean-keys — убрать старые ключи из known_hosts
set -euo pipefail
SCRIPT_DIR="$(cd "$(dirname "$0")" && pwd)"
cd "$SCRIPT_DIR"
NODES=(172.20.0.11 172.20.0.12 172.20.0.13)
SSH_KEY="${HOME}/.ssh/id_ed25519"
case "${1:-help}" in
start)
echo "→ Запуск тестового кластера..."
DOCKER_BUILDKIT=1 docker compose up -d
echo ""
echo "Ожидание SSH..."
sleep 3
for ip in "${NODES[@]}"; do
if ssh -o StrictHostKeyChecking=no -o ConnectTimeout=5 \
-i "$SSH_KEY" amegami@"$ip" "echo ok" &>/dev/null; then
echo " ✔ $ip доступен"
else
echo " ✘ $ip недоступен"
fi
done
echo ""
echo "В CephDeploy задайте подсеть: 172.20.0.0/24"
;;
stop)
docker compose stop
echo "Кластер остановлен."
;;
status)
echo "Статус контейнеров:"
docker compose ps
echo ""
echo "SSH-доступность:"
for ip in "${NODES[@]}"; do
name=$(ssh -o StrictHostKeyChecking=no -o ConnectTimeout=3 \
-i "$SSH_KEY" amegami@"$ip" "hostname" 2>/dev/null || echo "недоступен")
echo " $ip$name"
done
;;
destroy)
read -p "Удалить контейнеры и тома? [y/N] " ans
[[ "$ans" =~ ^[Yy]$ ]] || exit 0
docker compose down -v
echo "Тестовая среда удалена."
;;
clean-keys)
for ip in "${NODES[@]}"; do
ssh-keygen -R "$ip" 2>/dev/null && echo "Ключ $ip удалён из known_hosts"
done
;;
*)
echo "Использование: $0 {start|stop|status|destroy|clean-keys}"
;;
esac
[Unit]
Description=Create loop devices for Ceph OSD testing
After=local-fs.target
Before=ssh.service
[Service]
Type=oneshot
ExecStart=/usr/local/bin/setup-loop-disks.sh
# systemd в docker-контейнере иногда не запускает user-sessions
# сам, из-за чего /run/nologin остаётся и SSH-логин для amegami
# блокируется. Запускаем его принудительно.
ExecStartPost=-/bin/systemctl start systemd-user-sessions.service
RemainAfterExit=yes
[Install]
WantedBy=multi-user.target
#!/bin/bash
# Создаёт 3 loop-устройства из файлов в /var/lib/ceph-disks для имитации OSD.
# В docker-контейнере ядро не создаёт новые loop-nodes автоматически через
# LOOP_CTL_GET_FREE, поэтому нужные ноды заводятся mknod'ом вручную.
DISK_DIR="/var/lib/ceph-disks"
mkdir -p "$DISK_DIR"
# Заранее создаём loop20..loop22, если их ещё нет (major 7 — блочные loop-устройства)
for n in 20 21 22; do
[ -e "/dev/loop$n" ] || mknod "/dev/loop$n" b 7 "$n"
done
for i in 1 2 3; do
IMG="$DISK_DIR/disk${i}.img"
if [ ! -f "$IMG" ]; then
echo "Creating $IMG (512 MB)..."
dd if=/dev/zero of="$IMG" bs=1M count=512 status=none
fi
# Уже привязан?
if losetup -a | grep -q " ($IMG)"; then
echo "$IMG already attached: $(losetup -j "$IMG" | head -1)"
continue
fi
# Привязываем к нашему выделенному номеру loop$((19+i))
LOOP_DEV="/dev/loop$((19 + i))"
if losetup "$LOOP_DEV" "$IMG" 2>/dev/null; then
echo "Attached $IMG to $LOOP_DEV"
elif LOOP_DEV=$(losetup -f --show "$IMG" 2>/dev/null); then
echo "Attached $IMG to $LOOP_DEV (auto-assigned)"
else
echo "WARN: failed to attach $IMG" >&2
fi
done
exit 0
"""
Базовый класс страницы с заголовком и кнопкой обновления.
Все страницы приложения наследуются от BasePage:
- единый стиль шапки
- кнопка ⟳ вызывает метод refresh()
- подклассы кладут свой контент в self.content_layout
"""
from __future__ import annotations
from PyQt6.QtCore import Qt, pyqtSignal
from PyQt6.QtGui import QFont
from PyQt6.QtWidgets import (
QHBoxLayout,
QLabel,
QPushButton,
QSizePolicy,
QVBoxLayout,
QWidget,
)
class RefreshButton(QPushButton):
def __init__(self, parent=None):
super().__init__("⟳ Обновить", parent)
self.setFixedHeight(30)
self.setStyleSheet(
"""
QPushButton {
background: #2a3040;
color: #8fbcbb;
border: 1px solid #3a4050;
border-radius: 5px;
padding: 0 14px;
font-size: 13px;
}
QPushButton:hover {
background: #2e4a7a;
color: #ffffff;
}
QPushButton:pressed {
background: #1e3060;
}
QPushButton:disabled {
color: #444c5c;
border-color: #2a3040;
}
"""
)
class PageHeader(QWidget):
"""Полоса заголовка: иконка + название + необязательный subtitle + кнопка ⟳."""
refresh_clicked = pyqtSignal()
def __init__(
self,
title: str,
subtitle: str = "",
show_refresh: bool = True,
parent=None,
) -> None:
super().__init__(parent)
self.setFixedHeight(56)
self.setStyleSheet("background: #1e2330; border-bottom: 1px solid #2e3340;")
layout = QHBoxLayout(self)
layout.setContentsMargins(20, 0, 16, 0)
# Заголовок + подзаголовок
text_col = QVBoxLayout()
text_col.setSpacing(0)
lbl_title = QLabel(title)
font = QFont()
font.setPointSize(14)
font.setBold(True)
lbl_title.setFont(font)
lbl_title.setStyleSheet("color: #e0e8f8;")
text_col.addWidget(lbl_title)
if subtitle:
lbl_sub = QLabel(subtitle)
lbl_sub.setStyleSheet("color: #5a6478; font-size: 11px;")
text_col.addWidget(lbl_sub)
layout.addLayout(text_col)
layout.addStretch()
if show_refresh:
self._btn_refresh = RefreshButton()
self._btn_refresh.clicked.connect(self.refresh_clicked)
layout.addWidget(self._btn_refresh)
def set_refreshing(self, active: bool) -> None:
"""Блокирует / разблокирует кнопку во время загрузки."""
if hasattr(self, "_btn_refresh"):
self._btn_refresh.setEnabled(not active)
self._btn_refresh.setText("⟳ Загрузка…" if active else "⟳ Обновить")
class BasePage(QWidget):
"""
Базовая страница приложения.
Структура:
┌──────────────────────────────┐
│ PageHeader (title + ⟳) │
├──────────────────────────────┤
│ content_area (QWidget) │ ← подкласс заполняет self.content_layout
└──────────────────────────────┘
Использование в подклассе:
class MyPage(BasePage):
def __init__(self):
super().__init__("Моя страница", "подзаголовок")
# добавить виджеты:
self.content_layout.addWidget(...)
def refresh(self):
# логика перезагрузки данных
...
"""
def __init__(
self,
title: str,
subtitle: str = "",
show_refresh: bool = True,
parent=None,
) -> None:
super().__init__(parent)
root = QVBoxLayout(self)
root.setContentsMargins(0, 0, 0, 0)
root.setSpacing(0)
self._header = PageHeader(title, subtitle, show_refresh)
self._header.refresh_clicked.connect(self.refresh)
root.addWidget(self._header)
# Область контента
self._content_area = QWidget()
self._content_area.setStyleSheet("background: #1a1f29;")
self.content_layout = QVBoxLayout(self._content_area)
self.content_layout.setContentsMargins(20, 16, 20, 16)
self.content_layout.setSpacing(12)
root.addWidget(self._content_area, stretch=1)
# ------------------------------------------------------------------
def refresh(self) -> None:
"""Переопределить в подклассе для перезагрузки данных."""
pass
def set_loading(self, active: bool) -> None:
"""Показывает состояние загрузки в кнопке обновления."""
self._header.set_refreshing(active)
"""
Страница «Отчёт» — экспорт конфигурации кластера в HTML.
"""
from __future__ import annotations
from datetime import datetime
from pathlib import Path
from jinja2 import Environment, FileSystemLoader
from PyQt6.QtCore import Qt
from PyQt6.QtWidgets import (
QComboBox,
QFileDialog,
QGroupBox,
QHBoxLayout,
QLabel,
QMessageBox,
QPushButton,
QTextBrowser,
QVBoxLayout,
)
from core.resources import get_templates_dir
from db import SessionLocal
from db.repository import (
list_clusters,
list_deployment_runs,
list_osd_devices,
list_servers,
)
from ui.base_page import BasePage
_TEMPLATES_DIR = get_templates_dir()
_BOX_STYLE = (
"QGroupBox { color: #8fbcbb; font-weight: bold; "
"border: 1px solid #2e3340; border-radius: 6px; margin-top: 8px; }"
"QGroupBox::title { subcontrol-origin: margin; padding: 0 6px; }"
)
class ReportWidget(BasePage):
def __init__(self, parent=None) -> None:
super().__init__("📄 Отчёт", "Экспорт конфигурации кластера в HTML")
self._html: str = ""
self._cluster_name: str = ""
self._build_content()
def _build_content(self) -> None:
# ── Панель управления ─────────────────────────────────────────
ctrl_box = QGroupBox("Формирование отчёта")
ctrl_box.setStyleSheet(_BOX_STYLE)
ctrl_layout = QHBoxLayout(ctrl_box)
ctrl_layout.addWidget(QLabel("Кластер:"))
self._cluster_combo = QComboBox()
self._cluster_combo.setMinimumWidth(240)
ctrl_layout.addWidget(self._cluster_combo)
self._btn_gen = QPushButton("🔄 Сформировать")
self._btn_gen.setFixedHeight(30)
self._btn_gen.setEnabled(False)
self._btn_gen.setStyleSheet(
"QPushButton { background: #2a3040; color: #8fbcbb; "
"border: 1px solid #3a4050; border-radius: 5px; font-size: 13px; }"
"QPushButton:hover { background: #2e4a7a; color: #fff; }"
"QPushButton:disabled { color: #444; border-color: #2a3040; }"
)
self._btn_gen.clicked.connect(self._generate)
ctrl_layout.addWidget(self._btn_gen)
self._btn_save = QPushButton("💾 Сохранить HTML")
self._btn_save.setFixedHeight(30)
self._btn_save.setEnabled(False)
self._btn_save.setStyleSheet(
"QPushButton { background: #1565c0; color: #fff; "
"border-radius: 5px; font-size: 13px; font-weight: bold; }"
"QPushButton:hover { background: #1976d2; }"
"QPushButton:disabled { background: #333; color: #555; }"
)
self._btn_save.clicked.connect(self._save)
ctrl_layout.addWidget(self._btn_save)
ctrl_layout.addStretch()
self.content_layout.addWidget(ctrl_box)
# ── Предпросмотр ──────────────────────────────────────────────
preview_box = QGroupBox("Предпросмотр")
preview_box.setStyleSheet(_BOX_STYLE)
preview_layout = QVBoxLayout(preview_box)
self._browser = QTextBrowser()
self._browser.setOpenExternalLinks(False)
self._browser.setStyleSheet(
"QTextBrowser { background: #ffffff; border: 1px solid #2e3340; "
"border-radius: 4px; }"
)
preview_layout.addWidget(self._browser)
self.content_layout.addWidget(preview_box, stretch=1)
self._load_clusters()
# ------------------------------------------------------------------
def _load_clusters(self) -> None:
self._cluster_combo.blockSignals(True)
self._cluster_combo.clear()
with SessionLocal() as session:
clusters = list_clusters(session)
if clusters:
for c in clusters:
self._cluster_combo.addItem(
f"{c.name} [{c.ceph_version}]", userData=c.id
)
self._btn_gen.setEnabled(True)
else:
self._cluster_combo.addItem("— нет кластеров —", userData=None)
self._btn_gen.setEnabled(False)
self._cluster_combo.blockSignals(False)
def refresh(self) -> None:
self._load_clusters()
# ------------------------------------------------------------------
def _generate(self) -> None:
cluster_id = self._cluster_combo.currentData()
if cluster_id is None:
return
with SessionLocal() as session:
clusters = list_clusters(session)
cluster = next(c for c in clusters if c.id == cluster_id)
servers = list_servers(session, cluster_id)
servers_data = []
for srv in servers:
osds = list_osd_devices(session, srv.id)
servers_data.append({
"hostname": srv.hostname,
"ip_address": srv.ip_address,
"role": srv.role.value,
"ssh_user": srv.ssh_user,
"osd_count": len(osds),
"osds": [
{"path": d.device_path,
"type": d.device_type.value,
"role": d.osd_role.value}
for d in osds
],
})
runs_raw = list_deployment_runs(session, cluster_id, limit=20)
runs_data = []
for r in runs_raw:
if r.finished_at:
secs = int((r.finished_at - r.started_at).total_seconds())
dur = f"{secs // 60}м {secs % 60}с"
else:
dur = "—"
runs_data.append({
"id": r.id,
"started_at": r.started_at.strftime("%Y-%m-%d %H:%M:%S"),
"finished_at": (
r.finished_at.strftime("%Y-%m-%d %H:%M:%S")
if r.finished_at else None
),
"duration": dur,
"status": r.status.value,
})
env = Environment(
loader=FileSystemLoader(str(_TEMPLATES_DIR)),
trim_blocks=True,
lstrip_blocks=True,
)
self._html = env.get_template("report.html.j2").render(
cluster={
"name": cluster.name,
"version": cluster.ceph_version,
"created_at": cluster.created_at.strftime("%Y-%m-%d %H:%M"),
},
servers=servers_data,
runs=runs_data,
generated_at=datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
)
self._cluster_name = cluster.name
self._browser.setHtml(self._html)
self._btn_save.setEnabled(True)
def _save(self) -> None:
if not self._html:
return
default = f"report_{self._cluster_name}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.html"
path, _ = QFileDialog.getSaveFileName(
self, "Сохранить отчёт", default, "HTML файлы (*.html)"
)
if not path:
return
Path(path).write_text(self._html, encoding="utf-8")
QMessageBox.information(self, "Сохранено", f"Отчёт сохранён:\n{path}")
"""
Страница «Настройки» — глобальные параметры приложения.
"""
from __future__ import annotations
from PyQt6.QtCore import Qt
from PyQt6.QtWidgets import (
QDoubleSpinBox,
QFormLayout,
QGroupBox,
QHBoxLayout,
QLabel,
QLineEdit,
QMessageBox,
QPushButton,
QSpinBox,
QVBoxLayout,
)
from core.config import AppConfig
from ui.base_page import BasePage
_BOX_STYLE = (
"QGroupBox { color: #8fbcbb; font-weight: bold; "
"border: 1px solid #2e3340; border-radius: 6px; margin-top: 8px; }"
"QGroupBox::title { subcontrol-origin: margin; padding: 0 6px; }"
)
_FIELD_STYLE = (
"QLineEdit, QSpinBox { background: #1e2330; color: #c0c8d8; "
"border: 1px solid #3a4050; border-radius: 4px; padding: 4px 8px; }"
"QLineEdit:focus, QSpinBox:focus { border-color: #4a90d9; }"
)
class SettingsWidget(BasePage):
def __init__(self, parent=None) -> None:
super().__init__("⚙️ Настройки", "Параметры приложения", show_refresh=False)
self._build_content()
self._load()
def _build_content(self) -> None:
form_style = "QLabel { color: #c0c8d8; }"
# ── SSH ───────────────────────────────────────────────────────
ssh_box = QGroupBox("SSH по умолчанию")
ssh_box.setStyleSheet(_BOX_STYLE)
ssh_form = QFormLayout(ssh_box)
ssh_form.setSpacing(10)
ssh_form.setLabelAlignment(Qt.AlignmentFlag.AlignRight)
self._ssh_user = QLineEdit()
self._ssh_user.setStyleSheet(_FIELD_STYLE)
self._ssh_user.setMaximumWidth(280)
ssh_form.addRow("Пользователь:", self._ssh_user)
self._ssh_key = QLineEdit()
self._ssh_key.setStyleSheet(_FIELD_STYLE)
self._ssh_key.setMaximumWidth(400)
self._ssh_key.setPlaceholderText("~/.ssh/id_ed25519")
ssh_form.addRow("Путь к SSH-ключу:", self._ssh_key)
self.content_layout.addWidget(ssh_box)
# ── Сканирование ──────────────────────────────────────────────
scan_box = QGroupBox("Сканирование сети")
scan_box.setStyleSheet(_BOX_STYLE)
scan_form = QFormLayout(scan_box)
scan_form.setSpacing(10)
scan_form.setLabelAlignment(Qt.AlignmentFlag.AlignRight)
self._tcp_timeout = QSpinBox()
self._tcp_timeout.setRange(1, 30)
self._tcp_timeout.setSuffix(" с")
self._tcp_timeout.setFixedWidth(90)
self._tcp_timeout.setStyleSheet(_FIELD_STYLE)
scan_form.addRow("Таймаут TCP:", self._tcp_timeout)
self._ssh_timeout = QSpinBox()
self._ssh_timeout.setRange(1, 60)
self._ssh_timeout.setSuffix(" с")
self._ssh_timeout.setFixedWidth(90)
self._ssh_timeout.setStyleSheet(_FIELD_STYLE)
scan_form.addRow("Таймаут SSH:", self._ssh_timeout)
self.content_layout.addWidget(scan_box)
# ── Ansible ───────────────────────────────────────────────────
ans_box = QGroupBox("Ansible")
ans_box.setStyleSheet(_BOX_STYLE)
ans_form = QFormLayout(ans_box)
ans_form.setSpacing(10)
ans_form.setLabelAlignment(Qt.AlignmentFlag.AlignRight)
self._ansible_bin = QLineEdit()
self._ansible_bin.setStyleSheet(_FIELD_STYLE)
self._ansible_bin.setMaximumWidth(400)
self._ansible_bin.setPlaceholderText("ansible-playbook")
ans_form.addRow("Путь к ansible-playbook:", self._ansible_bin)
self.content_layout.addWidget(ans_box)
# ── Мониторинг ────────────────────────────────────────────────
mon_box = QGroupBox("Мониторинг")
mon_box.setStyleSheet(_BOX_STYLE)
mon_form = QFormLayout(mon_box)
mon_form.setSpacing(10)
mon_form.setLabelAlignment(Qt.AlignmentFlag.AlignRight)
self._refresh_interval = QSpinBox()
self._refresh_interval.setRange(0, 300)
self._refresh_interval.setSuffix(" с")
self._refresh_interval.setSpecialValueText("выкл")
self._refresh_interval.setFixedWidth(90)
self._refresh_interval.setStyleSheet(_FIELD_STYLE)
mon_form.addRow("Авто-обновление статуса:", self._refresh_interval)
self.content_layout.addWidget(mon_box)
# ── Кнопки ───────────────────────────────────────────────────
btn_row = QHBoxLayout()
self._btn_save = QPushButton("💾 Сохранить")
self._btn_save.setFixedHeight(34)
self._btn_save.setStyleSheet(
"QPushButton { background: #1565c0; color: #fff; border-radius: 6px; "
"font-size: 13px; font-weight: bold; }"
"QPushButton:hover { background: #1976d2; }"
)
self._btn_save.clicked.connect(self._save)
self._btn_reset = QPushButton("↺ Сбросить")
self._btn_reset.setFixedHeight(34)
self._btn_reset.setStyleSheet(
"QPushButton { background: #2a3040; color: #8fbcbb; "
"border: 1px solid #3a4050; border-radius: 6px; font-size: 13px; }"
"QPushButton:hover { background: #2e4a7a; color: #fff; }"
)
self._btn_reset.clicked.connect(self._reset)
btn_row.addWidget(self._btn_save)
btn_row.addWidget(self._btn_reset)
btn_row.addStretch()
self.content_layout.addLayout(btn_row)
self.content_layout.addStretch()
# ------------------------------------------------------------------
def _load(self) -> None:
self._ssh_user.setText(AppConfig.get("ssh_user"))
self._ssh_key.setText(AppConfig.get("ssh_key_path"))
self._tcp_timeout.setValue(int(AppConfig.get("scan_tcp_timeout")))
self._ssh_timeout.setValue(int(AppConfig.get("scan_ssh_timeout")))
self._ansible_bin.setText(AppConfig.get("ansible_bin"))
self._refresh_interval.setValue(int(AppConfig.get("status_refresh_interval")))
def _save(self) -> None:
AppConfig.set_value("ssh_user", self._ssh_user.text().strip() or "amegami")
AppConfig.set_value("ssh_key_path", self._ssh_key.text().strip() or "~/.ssh/id_ed25519")
AppConfig.set_value("scan_tcp_timeout", self._tcp_timeout.value())
AppConfig.set_value("scan_ssh_timeout", self._ssh_timeout.value())
AppConfig.set_value("ansible_bin", self._ansible_bin.text().strip() or "ansible-playbook")
AppConfig.set_value("status_refresh_interval", self._refresh_interval.value())
try:
AppConfig.save()
QMessageBox.information(self, "Сохранено", "Настройки сохранены.")
except Exception as exc:
QMessageBox.critical(self, "Ошибка", f"Не удалось сохранить:\n{exc}")
def _reset(self) -> None:
AppConfig.load()
self._load()
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment