cleanup sw req
This commit is contained in:
Binary file not shown.
109
1 software design/draft/components/system_tests/scan_serial.py
Normal file
109
1 software design/draft/components/system_tests/scan_serial.py
Normal file
@@ -0,0 +1,109 @@
|
||||
import subprocess
|
||||
import threading
|
||||
import time
|
||||
import os
|
||||
import sys
|
||||
import signal
|
||||
from queue import Queue, Empty
|
||||
import datetime
|
||||
|
||||
class ESP32Runner:
|
||||
def __init__(self, mode="SIM", port="COM9"):
|
||||
self.mode = mode.upper()
|
||||
self.port = port
|
||||
self.process = None
|
||||
self.output_queue = Queue()
|
||||
self.stop_event = threading.Event()
|
||||
|
||||
def _get_command(self):
|
||||
"""Constructs the command based on SIM or REAL mode."""
|
||||
if self.mode == "REAL":
|
||||
# For real hardware, we use idf.py monitor
|
||||
# We assume idf.py is in the PATH
|
||||
cmd_str = f"idf.py -p {self.port} monitor"
|
||||
else:
|
||||
# Existing QEMU command
|
||||
cmd_str = (
|
||||
"qemu-system-xtensa -M esp32 -m 4M "
|
||||
"-drive file=build/qemu_flash.bin,if=mtd,format=raw "
|
||||
"-nographic -serial mon:stdio"
|
||||
)
|
||||
|
||||
if os.name == 'posix':
|
||||
# Wrap with stdbuf to force line-buffering through the pipes
|
||||
return f"stdbuf -oL -eL {cmd_str}"
|
||||
return cmd_str
|
||||
|
||||
def _flash_hardware(self):
|
||||
"""Runs the ESP-IDF flash command and waits for completion."""
|
||||
print(f"--- Flashing hardware on {self.port} ---")
|
||||
flash_cmd = f"idf.py -p {self.port} flash"
|
||||
# We run this synchronously because we can't monitor until flashing is done
|
||||
result = subprocess.run(flash_cmd, shell=True)
|
||||
if result.returncode != 0:
|
||||
raise Exception("Failed to flash ESP32 hardware.")
|
||||
print("--- Flash Complete ---")
|
||||
|
||||
def start(self):
|
||||
# 1. If REAL mode, flash the chip first
|
||||
if self.mode == "REAL":
|
||||
self._flash_hardware()
|
||||
|
||||
# 2. Start the process (either QEMU or idf.py monitor)
|
||||
cmd = self._get_command()
|
||||
print(f"Starting execution mode: {self.mode}")
|
||||
print(f"Command: {cmd}", flush=True)
|
||||
|
||||
kwargs = {
|
||||
"stdout": subprocess.PIPE,
|
||||
"stderr": subprocess.STDOUT,
|
||||
"text": True,
|
||||
"bufsize": 1,
|
||||
"shell": True,
|
||||
"env": os.environ.copy()
|
||||
}
|
||||
|
||||
if os.name == 'posix':
|
||||
kwargs.update(preexec_fn=os.setsid)
|
||||
|
||||
self.process = subprocess.Popen(cmd, **kwargs)
|
||||
|
||||
# 3. Start the background reader thread
|
||||
self.thread = threading.Thread(target=self._reader_thread, daemon=True)
|
||||
self.thread.start()
|
||||
|
||||
def _reader_thread(self):
|
||||
"""Reads output and puts it into a queue."""
|
||||
# Note: idf.py monitor adds some ANSI escape codes for colors
|
||||
# You might want to strip those if you need raw text
|
||||
for line in iter(self.process.stdout.readline, ''):
|
||||
if line:
|
||||
self.output_queue.put(line.strip())
|
||||
if self.stop_event.is_set():
|
||||
break
|
||||
self.process.stdout.close()
|
||||
|
||||
def stop(self):
|
||||
"""Safely stops the process group."""
|
||||
self.stop_event.set()
|
||||
if self.process:
|
||||
print(f"Stopping {self.mode} process...")
|
||||
if os.name == 'posix':
|
||||
try:
|
||||
os.killpg(os.getpgid(self.process.pid), signal.SIGTERM)
|
||||
except ProcessLookupError:
|
||||
pass
|
||||
else:
|
||||
# Windows specific kill
|
||||
subprocess.run(['taskkill', '/F', '/T', '/PID', str(self.process.pid)],
|
||||
capture_output=True)
|
||||
self.process.wait()
|
||||
|
||||
def get_line(self, timeout=0.1):
|
||||
"""Returns a single line from the queue with an ISO timestamp."""
|
||||
try:
|
||||
line = self.output_queue.get(timeout=timeout)
|
||||
timestamp = datetime.datetime.now().isoformat()
|
||||
return f"[{timestamp}] {line}"
|
||||
except Empty:
|
||||
return None
|
||||
@@ -0,0 +1,45 @@
|
||||
import sys
|
||||
import os
|
||||
|
||||
# Get the absolute path to the folder you want to add
|
||||
# Example: Adding a folder named 'my_components' located in the current directory
|
||||
folder_path = os.path.abspath(os.path.join("components", "system_tests"))
|
||||
if folder_path not in sys.path:
|
||||
sys.path.append(folder_path)
|
||||
|
||||
from scan_serial import ESP32Runner
|
||||
import time
|
||||
|
||||
def test_system_initialize():
|
||||
runner = ESP32Runner(mode="REAL", port="/dev/ttyUSB0")
|
||||
runner.start()
|
||||
|
||||
# Force print to terminal immediately
|
||||
print("--- serial Started ---", flush=True)
|
||||
|
||||
try:
|
||||
start_time = time.time()
|
||||
# Loop for 30 seconds
|
||||
while time.time() - start_time < 30:
|
||||
# Wait up to 1 second for a line
|
||||
line = runner.get_line(timeout=1.0)
|
||||
if line:
|
||||
print(line, flush=True) # flush=True is critical
|
||||
|
||||
if "System initialization complete - entering main loop" in line:
|
||||
print("SUCCESS CRITERIA MET!", flush=True)
|
||||
return 0
|
||||
|
||||
# Check if the process died unexpectedly
|
||||
if runner.process.poll() is not None:
|
||||
print(f"Process exited with code: {runner.process.returncode}", flush=True)
|
||||
return 1
|
||||
finally:
|
||||
runner.stop()
|
||||
print("Done.", flush=True)
|
||||
return 1
|
||||
|
||||
# --- Main Logic ---
|
||||
if __name__ == "__main__":
|
||||
exit_code = test_system_initialize()
|
||||
sys.exit(exit_code)
|
||||
@@ -0,0 +1,45 @@
|
||||
import sys
|
||||
import os
|
||||
|
||||
# Get the absolute path to the folder you want to add
|
||||
# Example: Adding a folder named 'my_components' located in the current directory
|
||||
folder_path = os.path.abspath(os.path.join("components", "system_tests"))
|
||||
if folder_path not in sys.path:
|
||||
sys.path.append(folder_path)
|
||||
|
||||
from scan_serial import ESP32Runner
|
||||
import time
|
||||
|
||||
def test_system_initialize():
|
||||
runner = ESP32Runner(mode="SIM", port="COM9")
|
||||
runner.start()
|
||||
|
||||
# Force print to terminal immediately
|
||||
print("--- QEMU Runner Started ---", flush=True)
|
||||
|
||||
try:
|
||||
start_time = time.time()
|
||||
# Loop for 30 seconds
|
||||
while time.time() - start_time < 30:
|
||||
# Wait up to 1 second for a line
|
||||
line = runner.get_line(timeout=1.0)
|
||||
if line:
|
||||
print(line, flush=True) # flush=True is critical
|
||||
|
||||
if "System initialization complete - entering main loop" in line:
|
||||
print("SUCCESS CRITERIA MET!", flush=True)
|
||||
return 0
|
||||
|
||||
# Check if the process died unexpectedly
|
||||
if runner.process.poll() is not None:
|
||||
print(f"Process exited with code: {runner.process.returncode}", flush=True)
|
||||
return 1
|
||||
finally:
|
||||
runner.stop()
|
||||
print("Done.", flush=True)
|
||||
return 1
|
||||
|
||||
# --- Main Logic ---
|
||||
if __name__ == "__main__":
|
||||
exit_code = test_system_initialize()
|
||||
sys.exit(exit_code)
|
||||
Reference in New Issue
Block a user