Commit e82a7899 authored by Roman Alifanov's avatar Roman Alifanov

fixing squares due to ansi + preparing for some feature

parent b4f23755
import subprocess import subprocess
import pty
import os
import re
import threading import threading
import selectors
from gi.repository import GLib from gi.repository import GLib
...@@ -11,6 +15,9 @@ class CommandRunner: ...@@ -11,6 +15,9 @@ class CommandRunner:
self.on_done = on_done # Callback function to be called after completion self.on_done = on_done # Callback function to be called after completion
def run_command(self, command, dialog): def run_command(self, command, dialog):
# Define the set of "yes" and "no" characters
yes_no_chars = 'yYдДnNнН'
self.dialog = dialog self.dialog = dialog
self.textbuffer = dialog.get_child().get_child().get_child().get_buffer() self.textbuffer = dialog.get_child().get_child().get_child().get_buffer()
...@@ -19,30 +26,81 @@ class CommandRunner: ...@@ -19,30 +26,81 @@ class CommandRunner:
GLib.idle_add(dialog.get_child().get_child().get_child().scroll_to_mark, GLib.idle_add(dialog.get_child().get_child().get_child().scroll_to_mark,
self.textbuffer.get_insert(), 0, False, 0.0, 1.0) self.textbuffer.get_insert(), 0, False, 0.0, 1.0)
def read_output(source): def remove_ansi_escape_sequences(text):
result = []
in_escape = False
for char in text:
if char == '\x1b': # Начало
in_escape = True
elif in_escape and char in 'mK':
in_escape = False # Конец
elif not in_escape:
result.append(char)
return ''.join(result)
def read_output(fd):
while True: while True:
line = source.readline() try:
if line: output = os.read(fd, 1024).decode('utf-8', errors='replace')
GLib.idle_add(append_log, line) if output:
cleaned_output = remove_ansi_escape_sequences(output)
append_log(cleaned_output)
# Check for the pattern in the output (letters in any order)
pattern = rf'\[([{yes_no_chars}])/([{yes_no_chars}])\]'
match = re.search(pattern, cleaned_output)
if match:
first_option, second_option = match.groups()
print(f"First option: {first_option}, Second option: {second_option}")
# Check which one is destructive (nNнН)
destructive_option = None
if first_option in 'nNнН':
destructive_option = first_option
elif second_option in 'nNнН':
destructive_option = second_option
if destructive_option:
print(f"Destructive option '{destructive_option}' detected!")
#os.write(fd, b'y\n')
else: else:
break break
except OSError as e:
# Break the loop on I/O errors, assuming the process has terminated
break
def process_runner(): def process_runner():
master_fd, slave_fd = pty.openpty()
try: try:
process = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE, process = subprocess.Popen(command, shell=True,
stderr=subprocess.PIPE, text=True) stdout=slave_fd, stderr=slave_fd, text=True)
threading.Thread(target=read_output, daemon=True, args=(process.stdout,)).start() os.close(slave_fd)
threading.Thread(target=read_output, daemon=True, args=(process.stderr,)).start()
process.wait() # Use selectors to manage I/O
sel = selectors.DefaultSelector()
sel.register(master_fd, selectors.EVENT_READ)
GLib.idle_add(dialog.force_close) while True:
events = sel.select()
for key, _ in events:
if key.fileobj == master_fd:
read_output(master_fd)
if self.on_done: # Check if process has finished
GLib.idle_add(self.on_done) # Call the callback function after completion if process.poll() is not None:
break
except subprocess.CalledProcessError as e: finally:
GLib.idle_add(append_log, f"Ошибка: {str(e)}\n") sel.unregister(master_fd)
os.close(master_fd)
process.wait()
GLib.idle_add(dialog.force_close) GLib.idle_add(dialog.force_close)
if self.on_done: if self.on_done:
GLib.idle_add(self.on_done) # Call the callback function after completion GLib.idle_add(self.on_done) # Call the callback function after completion
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment