asks: add ACL-based notifications and subtask details

parent 25aeddb6
import asyncio
from telegrinder.modules import logger
from telegrinder.tools.formatting import HTMLFormatter, link
from telegrinder.tools.formatting import HTMLFormatter, link, code_inline
from altrepo.taskoteka.types import TaskState
from config import tg_api, altrepo, TASK_URL
from database.func import DB
NOTIFY_STATES = {TaskState.TESTED, TaskState.FAILED, TaskState.DONE}
OWNER_NOTIFY_STATES = {TaskState.TESTED, TaskState.FAILED, TaskState.DONE}
ACL_NOTIFY_STATES = {
TaskState.BUILDING,
TaskState.TESTED,
TaskState.DONE,
TaskState.FAILED,
TaskState.EPERM,
}
def _format_subtasks(subtasks: dict) -> str:
if not subtasks:
return ""
max_num_len = max(len(num) for num in subtasks)
lines = []
for num, sub in subtasks.items():
padded = num + " " * (max_num_len - len(num))
name = sub.pkgname or "unknown"
version = f"{sub.version}-{sub.release}" if sub.version and sub.release else ""
entry = f"{name}/{version}" if version else name
lines.append(f"{HTMLFormatter(code_inline(padded))} | {entry}")
return "\n".join(lines)
def _format_owner_message(task_link, data, subtasks_text):
message = f"Таск {task_link}\nСтатус: {data.prev_state} -> {data.state}"
if subtasks_text:
message += f"\n\n{subtasks_text}"
return message
def _format_acl_message(task_link, owner, state, repo, subtasks_text):
message = (
f"В таске {task_link} ({owner}) присутствуют ваши пакеты!\n\nСтатус: {state}"
)
if state == str(TaskState.EPERM) and repo == "sisyphus":
message += "\nТребуется ваше подтверждение!"
if subtasks_text:
message += f"\n\n{subtasks_text}"
return message
async def _notify_users(maintainer: str, message: str):
users = DB.user.get_by_maintainer(maintainer)
for user in users:
if not DB.mailing.is_enabled(user, "task_events"):
continue
try:
await tg_api.send_message(chat_id=user.user_id, text=message)
except Exception:
pass
async def _listen():
......@@ -22,27 +74,61 @@ async def _listen():
if data.state == data.prev_state:
continue
if data.state not in NOTIFY_STATES:
is_owner_notify = data.state in OWNER_NOTIFY_STATES
is_acl_notify = data.state in ACL_NOTIFY_STATES
if not is_owner_notify and not is_acl_notify:
continue
try:
task = await altrepo.taskoteka.tasks(task_id=data.id)
repo = task.repo
subtasks = task.subtasks
except Exception:
repo = "unknown"
subtasks = {}
task_url = f"{TASK_URL}{data.id}"
task_link = HTMLFormatter(link(task_url, text=f"{repo}/{data.id}"))
message = f"Таск {task_link}\n\nСтатус: {data.prev_state} -> {data.state}"
users = DB.user.get_by_maintainer(data.owner)
for user in users:
if not DB.mailing.is_enabled(user, "task_events"):
continue
try:
await tg_api.send_message(chat_id=user.user_id, text=message)
except Exception:
pass
subtasks_text = _format_subtasks(subtasks)
# Уведомление owner
if is_owner_notify:
message = _format_owner_message(task_link, data, subtasks_text)
await _notify_users(data.owner, message)
# Уведомление мейнтейнерам
if is_acl_notify and subtasks:
pkg_names = [sub.pkgname for sub in subtasks.values() if sub.pkgname]
if pkg_names:
try:
acl_data = await altrepo.api.acl.by_packages(repo, pkg_names)
except Exception:
acl_data = None
if acl_data:
# leader -> список его пакетов в таске
leader_packages: dict[str, list[str]] = {}
for pkg in acl_data.packages:
if not pkg.members:
continue
leader = pkg.members[0]
if leader == data.owner:
continue
leader_packages.setdefault(leader, []).append(pkg.name)
for leader, pkgs in leader_packages.items():
pkg_set = set(pkgs)
filtered = {
num: sub
for num, sub in subtasks.items()
if sub.pkgname in pkg_set
}
message = _format_acl_message(
task_link, data.owner, data.state, repo,
_format_subtasks(filtered),
)
await _notify_users(leader, message)
def start_task_events_listener():
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment