Skip to content
Projects
Groups
Snippets
Help
This project
Loading...
Sign in / Register
Toggle navigation
T
telegram_bot_antispammer
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Registry
Registry
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
Иванова Мария Кирилловна
telegram_bot_antispammer
Commits
04ae6b6d
Commit
04ae6b6d
authored
Jan 20, 2025
by
Иванова Мария Кирилловна
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
first stage
parent
dada85ae
Hide whitespace changes
Inline
Side-by-side
Showing
1 changed file
with
321 additions
and
0 deletions
+321
-0
bot.py
bot.py
+321
-0
No files found.
bot.py
0 → 100644
View file @
04ae6b6d
import
asyncio
from
aiogram
import
Bot
,
Dispatcher
,
Router
from
aiogram.types
import
ChatMemberUpdated
,
Message
from
aiogram.exceptions
import
AiogramError
from
pymorphy2
import
MorphAnalyzer
import
random
import
time
import
emoji
import
re
import
unicodedata
print
(
emoji
.
__version__
)
# Токен вашего бота
BOT_TOKEN
=
"7908227958:AAHyZjEr2x7SClN1W7hxGBgf3ivfVIIjM4A"
# Инициализация бота и диспетчера
bot
=
Bot
(
token
=
BOT_TOKEN
)
dp
=
Dispatcher
()
router
=
Router
()
dp
.
include_router
(
router
)
# Для хранения информации о новых участниках
active_users
=
{}
# Список спамных слов и фраз
SPAM_KEYWORDS
=
[
# Коммерческий спам
"скидка"
,
"акция"
,
"бонус"
,
"бесплатно"
,
"купить"
,
"продать"
,
"продажа"
,
"дешево"
,
"доставка"
,
"магазин"
,
"реклама"
,
"распродажа"
,
"экономия"
,
"дешевле"
,
"недорого"
,
"спецпредложение"
,
"уникальное предложение"
,
"только сегодня"
,
# Работа
"удалённая работа"
,
"бакс"
,
"доллар"
,
"$"
,
"нужен человек"
,
"писать в лс"
,
# Мошенничество
"лотерея"
,
"быстрый выигрыш"
,
"выигрыш"
,
"приз"
,
"деньги"
,
"заработать"
,
"заработок"
,
"зп"
,
"подработка"
,
"легкий заработок"
,
"мгновенно"
,
"доход"
,
"доход онлайн"
,
"как заработать"
,
"схема заработка"
,
"пассивный доход"
,
"быстро заработать"
,
"богатство"
,
"инвестировать"
,
"инвестиция"
,
"биткоин"
,
"криптовалюта"
,
"forex"
,
"деньги на карта"
,
"дивиденд"
,
"процент"
,
"депозит"
,
"выплата"
,
"невероятный доход"
,
# Продажа услуг и мошеннические предложения
"гадалка"
,
"приворот"
,
"заговор"
,
"магия"
,
"ритуал"
,
"расклад таро"
,
"ясновидение"
,
"экстрасенс"
,
"предсказание"
,
"любовь"
,
"вернуть парня"
,
"снять порчу"
,
"порча"
,
# Фишинг и подозрительные ссылки
"кликнуть сюда"
,
"нажать сюда"
,
"писать сюда"
,
"переходить здесь"
,
"переходить по ссылка"
,
"перейти по ссылка"
,
"узнать больше"
,
"узнать тут"
,
"суперпредложение"
,
"уникальный"
,
"карта"
,
# Иностранный спам
"discount"
,
"free"
,
"sale"
,
"limited offer"
,
"bitcoin"
,
"crypto"
,
"click here"
,
"earn money"
,
"adult content"
,
"sex"
,
"xxx"
,
"viagra"
,
"pills"
,
"casino"
,
"porn"
,
"win"
,
"lottery"
,
"low price"
,
"buy now"
,
"sale today"
,
"shipping free"
,
# Продвижение подписок, лайков и сервисов
"подписка"
,
"раскрутка"
,
"продвижение"
,
"лайк"
,
"подписчик"
,
"просмотр"
,
"отзыв"
,
"рейтинг"
,
"рейтинг магазин"
,
"работа"
,
"вакансия"
,
"раскрутка аккаунт"
,
"реферальная ссылка"
,
"реферал"
,
# Политический спам
"митинг"
,
"партия"
,
"агитация"
,
"голосовать"
,
"выборы"
,
"кандидат"
,
"поддержка"
,
"голосование"
,
"поддерживать мы"
,
# Вредоносные URL
".tinyurl"
,
".bitly"
,
".ly"
,
".click"
,
".top"
,
".app"
,
".info"
,
".xyz"
,
".vip"
,
".tk"
,
".pw"
,
".cc"
,
".link"
,
# Общее (агрессивные призывы, массовая рассылка)
"добрый время сутки"
,
"поделиться"
,
"отправить"
,
"рассказать все"
,
"срочно"
,
"обязательно прочитать"
,
"пересылать"
,
"передать друг"
,
# Контент, связанный с азартными играми
"казино"
,
"ставка"
,
"азартная игра"
,
"рулетка"
,
"выигрыш в казино"
,
"выигрывать"
,
"слоты"
,
"подарок"
,
"забрать подарок"
,
"получить подарок"
,
"казино онлайн"
,
"бесплатные фишки"
,
"ставки на спорт"
,
"играть"
,
"выигрывать"
,
"фриспин"
,
"получить выигрыш"
,
"casino"
,
# Эмодзи, часто используемые в спаме
"🎁"
,
"🔥"
,
"💸"
,
"💰"
,
"💎"
,
"🤑"
,
"🤩"
,
"⚡"
,
"⭐"
,
"💥"
]
def
has_mixed_layout
(
text
):
"""
Проверяет, содержит ли текст слова, написанные на смешанной раскладке.
Например: "пр1в3т", "teст", "нeт".
"""
cyrillic_letters
=
"абвгдеёжзийклмнопрстуфхцчшщъыьэюя"
latin_letters
=
"abcdefghijklmnopqrstuvwxyz"
words
=
text
.
split
()
for
word
in
words
:
has_cyrillic
=
any
(
char
in
cyrillic_letters
for
char
in
word
.
lower
())
has_latin
=
any
(
char
in
latin_letters
for
char
in
word
.
lower
())
if
has_cyrillic
and
has_latin
:
return
True
# Найдено слово со смешанной раскладкой
return
False
morph
=
MorphAnalyzer
()
def
normalize_text_to_infinitive
(
text
):
"""
Приводит текст к нижнему регистру, нормализует слова до начальной формы,
удаляет лишние символы и обрабатывает смешанные раскладки.
"""
if
not
text
:
# Проверяем, что текст не None или пустой
return
[]
# Приведение текста к единой форме Unicode (NFKC)
text
=
unicodedata
.
normalize
(
"NFKC"
,
text
)
# Удаление пунктуации и лишних символов
#text = ''.join(ch for ch in text if ch.isalnum() or ch.isspace())##############
text
=
''
.
join
(
ch
for
ch
in
text
if
ch
.
isalnum
()
or
ch
.
isspace
()
or
emoji
.
is_emoji
(
ch
))
text
=
re
.
sub
(
r'\s+'
,
' '
,
text
)
# Приведение к нижнему регистру
words
=
text
.
lower
()
.
split
()
# Нормализация слов до начальной формы
normalized_words
=
[]
for
word
in
words
:
try
:
normalized_words
.
append
(
morph
.
parse
(
word
)[
0
]
.
normal_form
)
except
Exception
as
e
:
# Если слово не удалось обработать, добавляем его как есть
normalized_words
.
append
(
word
)
return
normalized_words
def
preprocess_spam_keywords
(
keywords
):
"""Приводит ключевые слова к нормализованной форме."""
return
set
(
' '
.
join
(
normalize_text_to_infinitive
(
keyword
))
for
keyword
in
keywords
)
# Нормализуем ключевые слова
SPAM_KEYWORDS
=
preprocess_spam_keywords
(
SPAM_KEYWORDS
)
def
extract_emojis
(
text
):
return
text
.
join
(
c
for
c
in
text
if
unicodedata
.
category
(
c
)
.
startswith
(
'So'
))
def
is_spam
(
text
):
"""Проверяет, содержит ли текст спам."""
# Нормализуем текст
normalized_words
=
normalize_text_to_infinitive
(
text
)
emojis
=
extract_emojis
(
text
)
emoji_count
=
len
(
emojis
)
normalized_words
+=
emojis
# Объединяем нормализованные слова в выражения длиной 1, 2 и 3
phrases
=
set
()
for
n
in
range
(
1
,
4
):
phrases
.
update
(
" "
.
join
(
normalized_words
[
i
:
i
+
n
])
for
i
in
range
(
len
(
normalized_words
)
-
n
+
1
))
# Подсчитываем количество совпадений с ключевыми словами/выражениями
spam_count
=
sum
(
keyword
in
phrases
for
keyword
in
SPAM_KEYWORDS
)
spam_count
=
spam_count
+
emoji_count
# Возвращаем True, если найдено два или более совпадений
return
spam_count
>=
2
def
generate_math_problem
():
"""Генерация простого математического примера."""
a
=
random
.
randint
(
1
,
10
)
b
=
random
.
randint
(
1
,
10
)
return
f
"{a} + {b}"
,
a
+
b
@router.chat_member
()
async
def
welcome_new_user
(
event
:
ChatMemberUpdated
):
"""Обработчик новых участников группы."""
if
event
.
new_chat_member
.
status
==
"member"
and
event
.
old_chat_member
.
status
not
in
[
"member"
,
"administrator"
,
"creator"
]:
new_user
=
event
.
new_chat_member
.
user
math_problem
,
correct_answer
=
generate_math_problem
()
try
:
# Отправляем приветственное сообщение
welcome_message
=
await
bot
.
send_message
(
chat_id
=
event
.
chat
.
id
,
text
=
f
"Здравствуйте, {new_user.first_name}! Добро пожаловать в нашу группу!
\n\n
"
f
"Нам необходимо удостовериться, что вы человек. Вот математический пример: {math_problem}
\n
"
f
"Решите его, пожалуйста, в течение 1 минуты! В качестве ответа отправьте одно число."
)
# Сохраняем данные о пользователе
active_users
[
new_user
.
id
]
=
{
"message_ids"
:
[
welcome_message
.
message_id
],
# Список для хранения всех сообщений бота
"correct_answer"
:
correct_answer
,
"timestamp"
:
time
.
time
(),
"user_messages"
:
[],
# Сообщения пользователя
"user_answered"
:
False
,
"chat_id"
:
event
.
chat
.
id
,
}
print
(
f
"Приветственное сообщение отправлено пользователю {new_user.id} с примером {math_problem}"
)
except
Exception
as
e
:
print
(
f
"Не удалось отправить сообщение в группу {event.chat.id}. Ошибка: {e}"
)
# Ожидание ответа пользователя
await
check_answer
(
new_user
.
id
)
async
def
check_answer
(
user_id
):
"""Проверка ответа пользователя через 1 минуту."""
await
asyncio
.
sleep
(
60
)
user_data
=
active_users
.
get
(
user_id
)
if
user_data
and
not
user_data
[
"user_answered"
]:
try
:
# Удаляем все сообщения
await
cleanup_messages
(
user_data
)
# Удаляем пользователя из группы
await
bot
.
ban_chat_member
(
chat_id
=
user_data
[
"chat_id"
],
user_id
=
user_id
)
print
(
f
"Пользователь {user_id} не ответил вовремя. Удален из группы."
)
except
AiogramError
as
e
:
print
(
f
"Ошибка при удалении пользователя {user_id}. Ошибка: {e}"
)
except
Exception
as
e
:
print
(
f
"Ошибка при удалении пользователя {user_id}. Ошибка: {e}"
)
async
def
cleanup_messages
(
user_data
):
"""Удаление всех сообщений пользователя и бота."""
try
:
# Удаляем все сообщения бота
for
message_id
in
user_data
[
"message_ids"
]:
try
:
print
(
f
"Удаляю сообщение бота: chat_id={user_data['chat_id']}, message_id={message_id}"
)
await
bot
.
delete_message
(
chat_id
=
user_data
[
"chat_id"
],
message_id
=
message_id
)
print
(
f
"Сообщение бота для пользователя {user_data['chat_id']} удалено."
)
except
Exception
as
e
:
print
(
f
"Ошибка при удалении сообщения бота. Ошибка: {e}"
)
# Удаляем сообщения пользователя
for
user_message
in
user_data
[
"user_messages"
]:
try
:
if
user_message
:
await
bot
.
delete_message
(
chat_id
=
user_data
[
"chat_id"
],
message_id
=
user_message
.
message_id
)
print
(
f
"Сообщение пользователя {user_message.message_id} удалено."
)
except
Exception
as
e
:
print
(
f
"Ошибка при удалении сообщения пользователя. Ошибка: {e}"
)
except
Exception
as
e
:
print
(
f
"Ошибка при удалении сообщений. Ошибка: {e}"
)
@router.message
()
async
def
handle_message
(
message
:
Message
):
"""Обработчик сообщений от пользователей для проверки ответа."""
text
=
message
.
text
or
message
.
caption
or
""
# Проверка на смешанную раскладку
if
text
and
has_mixed_layout
(
text
):
try
:
await
message
.
delete
()
print
(
f
"Удалено сообщение с смешанной раскладкой от пользователя {message.from_user.id}: {message.text}"
)
return
except
Exception
as
e
:
print
(
f
"Ошибка при удалении сообщения с смешанной раскладкой. Ошибка: {e}"
)
# Проверка на мультимедиа (фото/видео)
if
message
.
photo
or
message
.
video
or
message
.
document
:
try
:
if
is_spam
(
text
):
# Проверяем подпись к мультимедиа
await
message
.
delete
()
print
(
f
"Удалено спамное мультимедиа сообщение от пользователя {message.from_user.id}."
)
return
except
Exception
as
e
:
print
(
f
"Ошибка при удалении мультимедиа сообщения. Ошибка: {e}"
)
# Проверка на кнопки/ссылки
if
message
.
reply_markup
and
hasattr
(
message
.
reply_markup
,
'inline_keyboard'
):
for
row
in
message
.
reply_markup
.
inline_keyboard
:
for
button
in
row
:
if
button
.
url
and
any
(
spam_word
in
button
.
url
.
lower
()
for
spam_word
in
SPAM_KEYWORDS
):
try
:
await
message
.
delete
()
print
(
f
"Удалено сообщение с подозрительной кнопкой от пользователя {message.from_user.id}."
)
return
except
Exception
as
e
:
print
(
f
"Ошибка при удалении сообщения с кнопкой. Ошибка: {e}"
)
# Проверка на спам
if
is_spam
(
message
.
text
):
try
:
await
message
.
delete
()
print
(
f
"Удалено спамное сообщение от пользователя {message.from_user.id}: {message.text}"
)
return
except
Exception
as
e
:
print
(
f
"Ошибка при удалении спамного сообщения. Ошибка: {e}"
)
user_data
=
active_users
.
get
(
message
.
from_user
.
id
)
if
user_data
and
not
user_data
[
"user_answered"
]:
# Сохраняем сообщение пользователя
user_data
[
"user_messages"
]
.
append
(
message
)
if
message
.
text
.
isdigit
():
user_answer
=
int
(
message
.
text
)
if
user_answer
==
user_data
[
"correct_answer"
]:
user_data
[
"user_answered"
]
=
True
# Ответ правильный — сохраняем сообщение и удаляем все
correct_message
=
await
message
.
reply
(
"Благодарим!"
)
user_data
[
"message_ids"
]
.
append
(
correct_message
.
message_id
)
# Добавляем в список ID сообщений
print
(
f
"Пользователь {message.from_user.id} ответил правильно."
)
# Удаляем сообщения после правильного ответа
await
cleanup_messages
(
user_data
)
else
:
# Ответ неправильный, добавляем новое сообщение бота
incorrect_message
=
await
message
.
reply
(
"Неправильный ответ. Попробуйте еще раз."
)
user_data
[
"message_ids"
]
.
append
(
incorrect_message
.
message_id
)
print
(
f
"Отправлено сообщение о неправильном ответе для пользователя {message.from_user.id}"
)
else
:
# Если ответ не число
invalid_message
=
await
message
.
reply
(
"Пожалуйста, отправьте числовой ответ."
)
user_data
[
"message_ids"
]
.
append
(
invalid_message
.
message_id
)
print
(
f
"Отправлено сообщение с просьбой ввести числовой ответ для пользователя {message.from_user.id}"
)
async
def
main
():
"""Запуск бота."""
await
dp
.
start_polling
(
bot
)
if
__name__
==
"__main__"
:
asyncio
.
run
(
main
())
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment