This commit is contained in:
2026-02-01 12:56:05 +01:00
parent f51adeecca
commit 0bdbcb1657
857 changed files with 0 additions and 97661 deletions

View File

@@ -0,0 +1,109 @@
import subprocess
import threading
import time
import os
import sys
import signal
from queue import Queue, Empty
import datetime
class ESP32Runner:
def __init__(self, mode="SIM", port="COM9"):
self.mode = mode.upper()
self.port = port
self.process = None
self.output_queue = Queue()
self.stop_event = threading.Event()
def _get_command(self):
"""Constructs the command based on SIM or REAL mode."""
if self.mode == "REAL":
# For real hardware, we use idf.py monitor
# We assume idf.py is in the PATH
cmd_str = f"idf.py -p {self.port} monitor"
else:
# Existing QEMU command
cmd_str = (
"qemu-system-xtensa -M esp32 -m 4M "
"-drive file=build/qemu_flash.bin,if=mtd,format=raw "
"-nographic -serial mon:stdio"
)
if os.name == 'posix':
# Wrap with stdbuf to force line-buffering through the pipes
return f"stdbuf -oL -eL {cmd_str}"
return cmd_str
def _flash_hardware(self):
"""Runs the ESP-IDF flash command and waits for completion."""
print(f"--- Flashing hardware on {self.port} ---")
flash_cmd = f"idf.py -p {self.port} flash"
# We run this synchronously because we can't monitor until flashing is done
result = subprocess.run(flash_cmd, shell=True)
if result.returncode != 0:
raise Exception("Failed to flash ESP32 hardware.")
print("--- Flash Complete ---")
def start(self):
# 1. If REAL mode, flash the chip first
if self.mode == "REAL":
self._flash_hardware()
# 2. Start the process (either QEMU or idf.py monitor)
cmd = self._get_command()
print(f"Starting execution mode: {self.mode}")
print(f"Command: {cmd}", flush=True)
kwargs = {
"stdout": subprocess.PIPE,
"stderr": subprocess.STDOUT,
"text": True,
"bufsize": 1,
"shell": True,
"env": os.environ.copy()
}
if os.name == 'posix':
kwargs.update(preexec_fn=os.setsid)
self.process = subprocess.Popen(cmd, **kwargs)
# 3. Start the background reader thread
self.thread = threading.Thread(target=self._reader_thread, daemon=True)
self.thread.start()
def _reader_thread(self):
"""Reads output and puts it into a queue."""
# Note: idf.py monitor adds some ANSI escape codes for colors
# You might want to strip those if you need raw text
for line in iter(self.process.stdout.readline, ''):
if line:
self.output_queue.put(line.strip())
if self.stop_event.is_set():
break
self.process.stdout.close()
def stop(self):
"""Safely stops the process group."""
self.stop_event.set()
if self.process:
print(f"Stopping {self.mode} process...")
if os.name == 'posix':
try:
os.killpg(os.getpgid(self.process.pid), signal.SIGTERM)
except ProcessLookupError:
pass
else:
# Windows specific kill
subprocess.run(['taskkill', '/F', '/T', '/PID', str(self.process.pid)],
capture_output=True)
self.process.wait()
def get_line(self, timeout=0.1):
"""Returns a single line from the queue with an ISO timestamp."""
try:
line = self.output_queue.get(timeout=timeout)
timestamp = datetime.datetime.now().isoformat()
return f"[{timestamp}] {line}"
except Empty:
return None

View File

@@ -0,0 +1,45 @@
import sys
import os
# Get the absolute path to the folder you want to add
# Example: Adding a folder named 'my_components' located in the current directory
folder_path = os.path.abspath(os.path.join("components", "system_tests"))
if folder_path not in sys.path:
sys.path.append(folder_path)
from scan_serial import ESP32Runner
import time
def test_system_initialize():
runner = ESP32Runner(mode="REAL", port="/dev/ttyUSB0")
runner.start()
# Force print to terminal immediately
print("--- serial Started ---", flush=True)
try:
start_time = time.time()
# Loop for 30 seconds
while time.time() - start_time < 30:
# Wait up to 1 second for a line
line = runner.get_line(timeout=1.0)
if line:
print(line, flush=True) # flush=True is critical
if "System initialization complete - entering main loop" in line:
print("SUCCESS CRITERIA MET!", flush=True)
return 0
# Check if the process died unexpectedly
if runner.process.poll() is not None:
print(f"Process exited with code: {runner.process.returncode}", flush=True)
return 1
finally:
runner.stop()
print("Done.", flush=True)
return 1
# --- Main Logic ---
if __name__ == "__main__":
exit_code = test_system_initialize()
sys.exit(exit_code)

View File

@@ -0,0 +1,45 @@
import sys
import os
# Get the absolute path to the folder you want to add
# Example: Adding a folder named 'my_components' located in the current directory
folder_path = os.path.abspath(os.path.join("components", "system_tests"))
if folder_path not in sys.path:
sys.path.append(folder_path)
from scan_serial import ESP32Runner
import time
def test_system_initialize():
runner = ESP32Runner(mode="SIM", port="COM9")
runner.start()
# Force print to terminal immediately
print("--- QEMU Runner Started ---", flush=True)
try:
start_time = time.time()
# Loop for 30 seconds
while time.time() - start_time < 30:
# Wait up to 1 second for a line
line = runner.get_line(timeout=1.0)
if line:
print(line, flush=True) # flush=True is critical
if "System initialization complete - entering main loop" in line:
print("SUCCESS CRITERIA MET!", flush=True)
return 0
# Check if the process died unexpectedly
if runner.process.poll() is not None:
print(f"Process exited with code: {runner.process.returncode}", flush=True)
return 1
finally:
runner.stop()
print("Done.", flush=True)
return 1
# --- Main Logic ---
if __name__ == "__main__":
exit_code = test_system_initialize()
sys.exit(exit_code)