Commit d0cb7e45 authored by Roman Alifanov's avatar Roman Alifanov

command_runner.py: refactoring

parent 86bcf940
......@@ -8,116 +8,116 @@ import selectors
from gi.repository import GLib
class CommandRunner:
def __init__(self, on_done=None):
def __init__(self):
self.dialog = None
self.textbuffer = None
self.yes_no_chars = 'yYдДnNнН' # Define the set of "yes" and "no" characters
self.on_done = on_done # Callback function to be called after completion
def append_log(self, text):
GLib.idle_add(self.textbuffer.insert_at_cursor, text)
GLib.idle_add(self.dialog.logdialog_textview.scroll_to_mark,
self.textbuffer.get_insert(), 0, False, 0.0, 1.0)
def run_command(self, command, dialog):
# Define the set of "yes" and "no" characters
yes_no_chars = 'yYдДnNнН'
def remove_ansi_escape_sequences(self, text):
result = []
in_escape = False
self.dialog = dialog
self.textbuffer = dialog.logdialog_textview.get_buffer()
for char in text:
if char == '\x1b': # Начало
in_escape = True
elif in_escape and char in 'mK':
in_escape = False # Конец
elif not in_escape:
result.append(char)
return ''.join(result)
def handle_question(self, cleaned_output, fd):
# Check for the pattern in the output (letters in any order)
pattern = rf'\[([{self.yes_no_chars}])/([{self.yes_no_chars}])\]'
def append_log(text):
GLib.idle_add(self.textbuffer.insert_at_cursor, text)
GLib.idle_add(dialog.logdialog_textview.scroll_to_mark,
self.textbuffer.get_insert(), 0, False, 0.0, 1.0)
match = re.search(pattern, cleaned_output)
if match:
first_option, second_option = match.groups()
def remove_ansi_escape_sequences(text):
result = []
in_escape = False
print(f"First option: {first_option}, Second option: {second_option}")
self.dialog.question_label.set_label(cleaned_output)
for char in text:
if char == '\x1b': # Начало
in_escape = True
elif in_escape and char in 'mK':
in_escape = False # Конец
elif not in_escape:
result.append(char)
for css in ["suggested-action", "destructive-action"]:
self.dialog.question_button_1.remove_css_class(css)
self.dialog.question_button_2.remove_css_class(css)
# Check which one is destructive (nNнН)
self.dialog.question_button_1.connect("activated", lambda _: os.write(fd, f"{first_option}\n".encode('utf-8')))
self.dialog.question_button_2.connect("activated", lambda _: os.write(fd, f"{second_option}\n".encode('utf-8')))
return ''.join(result)
self.dialog.question_button_1.set_title(first_option)
self.dialog.question_button_2.set_title(second_option)
if first_option in 'nNнН':
self.dialog.question_button_1.add_css_class("destructive-action")
self.dialog.question_button_2.add_css_class("suggested-action")
elif second_option in 'nNнН':
self.dialog.question_button_2.add_css_class("destructive-action")
self.dialog.question_button_1.add_css_class("suggested-action")
self.dialog.question_revealer.set_reveal_child(True)
def read_output(self, fd):
while True:
try:
output = os.read(fd, 1024).decode('utf-8', errors='replace')
if output:
cleaned_output = self.remove_ansi_escape_sequences(output)
self.append_log(cleaned_output)
self.dialog.question_revealer.set_reveal_child(False)
self.handle_question(cleaned_output, fd)
else:
break
except OSError:
# Break the loop on I/O errors, assuming the process has terminated
break
def process_runner(self, command):
process = None
sel = None
master_fd, slave_fd = pty.openpty()
try:
env = os.environ.copy()
env['TERM'] = 'xterm'
process = subprocess.Popen(command, shell=True,
stdout=slave_fd, stderr=slave_fd, stdin=slave_fd,
text=True, env=env)
os.close(slave_fd)
# Use selectors to manage I/O
sel = selectors.DefaultSelector()
sel.register(master_fd, selectors.EVENT_READ)
def read_output(fd):
while True:
try:
output = os.read(fd, 1024).decode('utf-8', errors='replace')
if output:
cleaned_output = remove_ansi_escape_sequences(output)
append_log(cleaned_output)
self.dialog.question_revealer.set_reveal_child(False)
# Check for the pattern in the output (letters in any order)
pattern = rf'\[([{yes_no_chars}])/([{yes_no_chars}])\]'
match = re.search(pattern, cleaned_output)
if match:
first_option, second_option = match.groups()
print(f"First option: {first_option}, Second option: {second_option}")
self.dialog.question_label.set_label(cleaned_output)
for css in ["suggested-action", "destructive-action"]:
self.dialog.question_button_1.remove_css_class(css)
self.dialog.question_button_2.remove_css_class(css)
# Check which one is destructive (nNнН)
self.dialog.question_button_1.connect("activated", lambda _: os.write(fd, f"{first_option}\n".encode('utf-8')))
self.dialog.question_button_2.connect("activated", lambda _: os.write(fd, f"{second_option}\n".encode('utf-8')))
self.dialog.question_button_1.set_title(first_option)
self.dialog.question_button_2.set_title(second_option)
if first_option in 'nNнН':
self.dialog.question_button_1.add_css_class("destructive-action")
self.dialog.question_button_2.add_css_class("suggested-action")
elif second_option in 'nNнН':
self.dialog.question_button_2.add_css_class("destructive-action")
self.dialog.question_button_1.add_css_class("suggested-action")
self.dialog.question_revealer.set_reveal_child(True)
else:
break
except OSError as e:
# Break the loop on I/O errors, assuming the process has terminated
events = sel.select()
for key, _ in events:
if key.fileobj == master_fd:
self.read_output(master_fd)
# Check if process has finished
if process.poll() is not None:
break
def process_runner():
master_fd, slave_fd = pty.openpty()
try:
env = os.environ.copy()
env['TERM'] = 'xterm'
process = subprocess.Popen(command, shell=True,
stdout=slave_fd, stderr=slave_fd, stdin=slave_fd,
text=True, env=env)
os.close(slave_fd)
# Use selectors to manage I/O
sel = selectors.DefaultSelector()
sel.register(master_fd, selectors.EVENT_READ)
while True:
events = sel.select()
for key, _ in events:
if key.fileobj == master_fd:
read_output(master_fd)
# Check if process has finished
if process.poll() is not None:
break
finally:
sel.unregister(master_fd)
os.close(master_fd)
process.wait()
GLib.idle_add(dialog.force_close)
if self.on_done:
GLib.idle_add(self.on_done) # Call the callback function after completion
threading.Thread(target=process_runner, daemon=True).start()
\ No newline at end of file
finally:
sel.unregister(master_fd)
os.close(master_fd)
process.wait()
GLib.idle_add(self.dialog.force_close)
if self.on_done:
GLib.idle_add(self.on_done) # Call the callback function after completion
def run_command(self, command, on_done, dialog):
self.dialog = dialog
self.on_done = on_done
self.textbuffer = dialog.logdialog_textview.get_buffer()
threading.Thread(target=self.process_runner, args=(command,), daemon=True).start()
......@@ -19,5 +19,5 @@ class LogDialog(Adw.Dialog):
def run(self, command, on_done):
self.present(self.win)
# Создание и передача функции обратного вызова для обновления UI
runner = CommandRunner(on_done=on_done)
runner.run_command(command, self)
runner = CommandRunner()
runner.run_command(command, on_done, dialog=self)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment