Skip to content
Projects
Groups
Snippets
Help
This project
Loading...
Sign in / Register
Toggle navigation
A
antispammer
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Registry
Registry
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
Иванова Мария Кирилловна
antispammer
Commits
143eae16
Commit
143eae16
authored
Jan 17, 2025
by
Иванова Мария Кирилловна
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
Upload New File
parent
abd4a00a
Hide whitespace changes
Inline
Side-by-side
Showing
1 changed file
with
285 additions
and
0 deletions
+285
-0
Bot.py
Bot.py
+285
-0
No files found.
Bot.py
0 → 100644
View file @
143eae16
import
asyncio
from
aiogram
import
Bot
,
Dispatcher
,
Router
from
aiogram.types
import
ChatMemberUpdated
,
Message
from
aiogram.exceptions
import
AiogramError
from
pymorphy2
import
MorphAnalyzer
import
random
import
time
import
unicodedata
# Токен вашего бота
BOT_TOKEN
=
"*****************"
# Инициализация бота и диспетчера
bot
=
Bot
(
token
=
BOT_TOKEN
)
dp
=
Dispatcher
()
router
=
Router
()
dp
.
include_router
(
router
)
# Для хранения информации о новых участниках
active_users
=
{}
# Список спамных слов и фраз
SPAM_KEYWORDS
=
[
# Коммерческий спам
"скидка"
,
"акция"
,
"бонус"
,
"бесплатно"
,
"купить"
,
"продать"
,
"продажа"
,
"дешево"
,
"доставка"
,
"магазин"
,
"реклама"
,
"распродажа"
,
"экономия"
,
"дешевле"
,
"недорого"
,
"спецпредложение"
,
"уникальное предложение"
,
"только сегодня"
,
# Работа
"удалённая работа"
,
"бакс"
,
"доллар"
,
"$"
,
"нужен человек"
,
"писать лс"
,
# Мошенничество
"лотерея"
,
"быстрый выигрыш"
,
"выигрыш"
,
"приз"
,
"деньги"
,
"заработать"
,
"заработок"
,
"легкий заработок"
,
"мгновенно"
,
"доход"
,
"доход онлайн"
,
"как заработать"
,
"схема заработка"
,
"пассивный доход"
,
"быстро заработать"
,
"богатство"
,
"инвестировать"
,
"инвестиция"
,
"биткоин"
,
"криптовалюта"
,
"forex"
,
"деньги на карта"
,
# Продажа услуг и мошеннические предложения
"гадалка"
,
"приворот"
,
"заговор"
,
"магия"
,
"ритуал"
,
"расклад таро"
,
"ясновидение"
,
"экстрасенс"
,
"предсказание"
,
"любовь"
,
"вернуть парня"
,
"снять порчу"
,
"порча"
,
# Фишинг и подозрительные ссылки
"кликнуть сюда"
,
"нажать сюда"
,
"перейти ссылка"
,
"узнать больше"
,
"узнать тут"
,
"суперпредложение"
,
"карта"
,
# Иностранный спам
"discount"
,
"free"
,
"sale"
,
"limited offer"
,
"bitcoin"
,
"crypto"
,
"click here"
,
"earn money"
,
"adult content"
,
"sex"
,
"xxx"
,
"viagra"
,
"pills"
,
"casino"
,
"porn"
,
"win"
,
"lottery"
,
"low price"
,
"buy now"
,
"sale today"
,
"shipping free"
,
# Продвижение подписок, лайков и сервисов
"подписка"
,
"раскрутка"
,
"продвижение"
,
"лайк"
,
"подписчик"
,
"просмотр"
,
"отзыв"
,
"рейтинг"
,
"рейтинг магазин"
,
"работа"
,
"вакансия"
,
"раскрутка аккаунт"
,
"реферальная ссылка"
,
"реферал"
,
# Политический спам
"митинг"
,
"партия"
,
"агитация"
,
"голосовать"
,
"выборы"
,
"кандидат"
,
"поддержка"
,
"голосование"
,
"поддерживать мы"
,
# Вредоносные URL
".tinyurl"
,
".bitly"
,
".ly"
,
".click"
,
".top"
,
".app"
,
".info"
,
".xyz"
,
".vip"
,
".tk"
,
".pw"
,
".cc"
,
".link"
,
# Общее (агрессивные призывы, массовая рассылка)
"поделиться"
,
"отправить"
,
"рассказать все"
,
"срочно"
,
"обязательно прочитать"
,
"пересылать"
,
"передать друг"
,
# Контент, связанный с азартными играми
"казино"
,
"ставка"
,
"азартная игра"
,
"рулетка"
,
"выигрыш в казино"
,
"слоты"
,
"казино онлайн"
,
"бесплатные фишки"
,
"ставки на спорт"
,
"играть"
,
"выигрывать"
,
"фриспин"
,
"получить выигрыш"
,
"casino"
,
# Эмодзи, часто используемые в спаме
"🎁"
,
"🔥"
,
"💸"
,
"💰"
,
"💎"
,
"🤑"
,
"🤩"
,
"⚡"
,
"⭐"
]
def
has_mixed_layout
(
text
):
"""
Проверяет, содержит ли текст слова, написанные на смешанной раскладке.
Например: "пр1в3т", "teст", "нeт".
"""
cyrillic_letters
=
"абвгдеёжзийклмнопрстуфхцчшщъыьэюя"
latin_letters
=
"abcdefghijklmnopqrstuvwxyz"
words
=
text
.
split
()
for
word
in
words
:
has_cyrillic
=
any
(
char
in
cyrillic_letters
for
char
in
word
.
lower
())
has_latin
=
any
(
char
in
latin_letters
for
char
in
word
.
lower
())
if
has_cyrillic
and
has_latin
:
return
True
# Найдено слово со смешанной раскладкой
return
False
morph
=
MorphAnalyzer
()
def
normalize_text_to_infinitive
(
text
):
"""
Приводит текст к нижнему регистру, нормализует слова до начальной формы,
удаляет лишние символы и обрабатывает смешанные раскладки.
"""
# Приведение текста к единой форме Unicode (NFKC)
text
=
unicodedata
.
normalize
(
"NFKC"
,
text
)
# Замена символов из разных раскладок (кириллица ↔ латиница)
#text = text.translate(str.maketrans(
# "AaBCcEeKHMOoPpTyXx",
# "АаВСсЕеКМНОоРрТуХх"
#))
# Удаление пунктуации и лишних символов
text
=
''
.
join
(
ch
for
ch
in
text
if
ch
.
isalnum
()
or
ch
.
isspace
())
# Приведение к нижнему регистру
words
=
text
.
lower
()
.
split
()
# Нормализация слов до начальной формы
normalized_words
=
[]
for
word
in
words
:
try
:
normalized_words
.
append
(
morph
.
parse
(
word
)[
0
]
.
normal_form
)
except
Exception
as
e
:
# Если слово не удалось обработать, добавляем его как есть
normalized_words
.
append
(
word
)
return
normalized_words
def
preprocess_spam_keywords
(
keywords
):
"""Приводит ключевые слова к нормализованной форме."""
return
set
(
' '
.
join
(
normalize_text_to_infinitive
(
keyword
))
for
keyword
in
keywords
)
# Нормализуем ключевые слова
SPAM_KEYWORDS
=
preprocess_spam_keywords
(
SPAM_KEYWORDS
)
def
is_spam
(
text
):
"""Проверяет, содержит ли текст спам."""
# Нормализуем текст
normalized_words
=
normalize_text_to_infinitive
(
text
)
# Объединяем нормализованные слова в выражения длиной 1, 2 и 3
phrases
=
set
()
for
n
in
range
(
1
,
4
):
phrases
.
update
(
" "
.
join
(
normalized_words
[
i
:
i
+
n
])
for
i
in
range
(
len
(
normalized_words
)
-
n
+
1
))
# Подсчитываем количество совпадений с ключевыми словами/выражениями
spam_count
=
sum
(
keyword
in
phrases
for
keyword
in
SPAM_KEYWORDS
)
# Возвращаем True, если найдено два или более совпадений
return
spam_count
>=
2
def
generate_math_problem
():
"""Генерация простого математического примера."""
a
=
random
.
randint
(
1
,
10
)
b
=
random
.
randint
(
1
,
10
)
return
f
"{a} + {b}"
,
a
+
b
@router.chat_member
()
async
def
welcome_new_user
(
event
:
ChatMemberUpdated
):
"""Обработчик новых участников группы."""
if
event
.
new_chat_member
.
status
==
"member"
and
event
.
old_chat_member
.
status
not
in
[
"member"
,
"administrator"
,
"creator"
]:
new_user
=
event
.
new_chat_member
.
user
math_problem
,
correct_answer
=
generate_math_problem
()
try
:
# Отправляем приветственное сообщение
welcome_message
=
await
bot
.
send_message
(
chat_id
=
event
.
chat
.
id
,
text
=
f
"Здравствуйте, {new_user.first_name}! Добро пожаловать в нашу группу!
\n\n
"
f
"Нам необходимо удостовериться, что вы человек. Вот математический пример: {math_problem}
\n
"
f
"Решите его, пожалуйста, в течение 1 минуты! В качестве ответа отправьте одно число."
)
# Сохраняем данные о пользователе
active_users
[
new_user
.
id
]
=
{
"message_ids"
:
[
welcome_message
.
message_id
],
# Список для хранения всех сообщений бота
"correct_answer"
:
correct_answer
,
"timestamp"
:
time
.
time
(),
"user_messages"
:
[],
# Сообщения пользователя
"user_answered"
:
False
,
"chat_id"
:
event
.
chat
.
id
,
}
print
(
f
"Приветственное сообщение отправлено пользователю {new_user.id} с примером {math_problem}"
)
except
Exception
as
e
:
print
(
f
"Не удалось отправить сообщение в группу {event.chat.id}. Ошибка: {e}"
)
# Ожидание ответа пользователя
await
check_answer
(
new_user
.
id
)
async
def
check_answer
(
user_id
):
"""Проверка ответа пользователя через 1 минуту."""
await
asyncio
.
sleep
(
60
)
user_data
=
active_users
.
get
(
user_id
)
if
user_data
and
not
user_data
[
"user_answered"
]:
try
:
# Удаляем все сообщения
await
cleanup_messages
(
user_data
)
# Удаляем пользователя из группы
await
bot
.
ban_chat_member
(
chat_id
=
user_data
[
"chat_id"
],
user_id
=
user_id
)
print
(
f
"Пользователь {user_id} не ответил вовремя. Удален из группы."
)
except
AiogramError
as
e
:
print
(
f
"Ошибка при удалении пользователя {user_id}. Ошибка: {e}"
)
except
Exception
as
e
:
print
(
f
"Ошибка при удалении пользователя {user_id}. Ошибка: {e}"
)
async
def
cleanup_messages
(
user_data
):
"""Удаление всех сообщений пользователя и бота."""
try
:
# Удаляем все сообщения бота
for
message_id
in
user_data
[
"message_ids"
]:
try
:
print
(
f
"Удаляю сообщение бота: chat_id={user_data['chat_id']}, message_id={message_id}"
)
await
bot
.
delete_message
(
chat_id
=
user_data
[
"chat_id"
],
message_id
=
message_id
)
print
(
f
"Сообщение бота для пользователя {user_data['chat_id']} удалено."
)
except
Exception
as
e
:
print
(
f
"Ошибка при удалении сообщения бота. Ошибка: {e}"
)
# Удаляем сообщения пользователя
for
user_message
in
user_data
[
"user_messages"
]:
try
:
if
user_message
:
await
bot
.
delete_message
(
chat_id
=
user_data
[
"chat_id"
],
message_id
=
user_message
.
message_id
)
print
(
f
"Сообщение пользователя {user_message.message_id} удалено."
)
except
Exception
as
e
:
print
(
f
"Ошибка при удалении сообщения пользователя. Ошибка: {e}"
)
except
Exception
as
e
:
print
(
f
"Ошибка при удалении сообщений. Ошибка: {e}"
)
@router.message
()
async
def
handle_message
(
message
:
Message
):
"""Обработчик сообщений от пользователей для проверки ответа."""
# Проверка на смешанную раскладку
if
has_mixed_layout
(
message
.
text
):
try
:
await
message
.
delete
()
print
(
f
"Удалено сообщение с смешанной раскладкой от пользователя {message.from_user.id}: {message.text}"
)
return
except
Exception
as
e
:
print
(
f
"Ошибка при удалении сообщения с смешанной раскладкой. Ошибка: {e}"
)
# Проверка на спам
if
is_spam
(
message
.
text
):
try
:
await
message
.
delete
()
print
(
f
"Удалено спамное сообщение от пользователя {message.from_user.id}: {message.text}"
)
return
except
Exception
as
e
:
print
(
f
"Ошибка при удалении спамного сообщения. Ошибка: {e}"
)
user_data
=
active_users
.
get
(
message
.
from_user
.
id
)
if
user_data
and
not
user_data
[
"user_answered"
]:
# Сохраняем сообщение пользователя
user_data
[
"user_messages"
]
.
append
(
message
)
if
message
.
text
.
isdigit
():
user_answer
=
int
(
message
.
text
)
if
user_answer
==
user_data
[
"correct_answer"
]:
user_data
[
"user_answered"
]
=
True
# Ответ правильный — сохраняем сообщение и удаляем все
correct_message
=
await
message
.
reply
(
"Благодарим!"
)
user_data
[
"message_ids"
]
.
append
(
correct_message
.
message_id
)
# Добавляем в список ID сообщений
print
(
f
"Пользователь {message.from_user.id} ответил правильно."
)
# Удаляем сообщения после правильного ответа
await
cleanup_messages
(
user_data
)
else
:
# Ответ неправильный, добавляем новое сообщение бота
incorrect_message
=
await
message
.
reply
(
"Неправильный ответ. Попробуйте еще раз."
)
user_data
[
"message_ids"
]
.
append
(
incorrect_message
.
message_id
)
print
(
f
"Отправлено сообщение о неправильном ответе для пользователя {message.from_user.id}"
)
else
:
# Если ответ не число
invalid_message
=
await
message
.
reply
(
"Пожалуйста, отправьте числовой ответ."
)
user_data
[
"message_ids"
]
.
append
(
invalid_message
.
message_id
)
print
(
f
"Отправлено сообщение с просьбой ввести числовой ответ для пользователя {message.from_user.id}"
)
async
def
main
():
"""Запуск бота."""
await
dp
.
start_polling
(
bot
)
if
__name__
==
"__main__"
:
asyncio
.
run
(
main
())
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment