import subprocess import pty import os import re import threading import selectors from gi.repository import GLib class CommandRunner: def __init__(self, on_done=None): self.dialog = None self.textbuffer = None self.on_done = on_done # Callback function to be called after completion def run_command(self, command, dialog): # Define the set of "yes" and "no" characters yes_no_chars = 'yYдДnNнН' self.dialog = dialog self.textbuffer = dialog.logdialog_textview.get_buffer() def append_log(text): GLib.idle_add(self.textbuffer.insert_at_cursor, text) GLib.idle_add(dialog.logdialog_textview.scroll_to_mark, self.textbuffer.get_insert(), 0, False, 0.0, 1.0) def remove_ansi_escape_sequences(text): result = [] in_escape = False for char in text: if char == '\x1b': # Начало in_escape = True elif in_escape and char in 'mK': in_escape = False # Конец elif not in_escape: result.append(char) return ''.join(result) def read_output(fd): while True: try: output = os.read(fd, 1024).decode('utf-8', errors='replace') if output: cleaned_output = remove_ansi_escape_sequences(output) append_log(cleaned_output) # Check for the pattern in the output (letters in any order) pattern = rf'\[([{yes_no_chars}])/([{yes_no_chars}])\]' match = re.search(pattern, cleaned_output) if match: first_option, second_option = match.groups() print(f"First option: {first_option}, Second option: {second_option}") # Check which one is destructive (nNнН) destructive_option = None if first_option in 'nNнН': destructive_option = first_option elif second_option in 'nNнН': destructive_option = second_option if destructive_option: print(f"Destructive option '{destructive_option}' detected!") #os.write(fd, b'y\n') else: break except OSError as e: # Break the loop on I/O errors, assuming the process has terminated break def process_runner(): master_fd, slave_fd = pty.openpty() try: process = subprocess.Popen(command, shell=True, stdout=slave_fd, stderr=slave_fd, text=True) os.close(slave_fd) # Use selectors to manage I/O sel = selectors.DefaultSelector() sel.register(master_fd, selectors.EVENT_READ) while True: events = sel.select() for key, _ in events: if key.fileobj == master_fd: read_output(master_fd) # Check if process has finished if process.poll() is not None: break finally: sel.unregister(master_fd) os.close(master_fd) process.wait() GLib.idle_add(dialog.force_close) if self.on_done: GLib.idle_add(self.on_done) # Call the callback function after completion threading.Thread(target=process_runner, daemon=True).start()