From 86bc6505c88dad7a0d242111ff7fac425f097212 Mon Sep 17 00:00:00 2001 From: mahmamdouh Date: Mon, 9 Feb 2026 16:41:40 +0100 Subject: [PATCH] Update ports --- hil_lib.py | 71 +++++++++++++++++++++++++------------------------- run_test.py | 74 +++++++++++++++++++++++++++++------------------------ 2 files changed, 76 insertions(+), 69 deletions(-) diff --git a/hil_lib.py b/hil_lib.py index 6ddbb9e..1aa342b 100644 --- a/hil_lib.py +++ b/hil_lib.py @@ -2,14 +2,15 @@ import serial import struct import time -class HILController: - def __init__(self, emulator_port, target_port, baud=921600): - # Port for the ESP32 acting as the Sensor - self.emulator = serial.Serial(emulator_port, baud, timeout=0.1) - # Port for the ESP32 running your application code +class SCD30_HIL_Lib: + def __init__(self, emulator_port='/dev/i2c_emulator', target_port='/dev/esp_sensor_test', baud=921600): + # High speed for the bridge to minimize I2C latency + self.emulator = serial.Serial(emulator_port, baud, timeout=0.05) + # Standard speed for the Target application logs self.target = serial.Serial(target_port, 115200, timeout=0.1) - def calculate_scd30_crc(self, data): + def _calculate_crc8(self, data): + """SCD30 specific CRC-8 (Polynomial 0x31)""" crc = 0xFF for byte in data: crc ^= byte @@ -21,40 +22,38 @@ class HILController: crc &= 0xFF return crc - def pack_float_scd30(self, value): - """Packs a float into the SCD30 [MSB, LSB, CRC, MSB, LSB, CRC] format""" - b = struct.pack('>f', value) - w1 = [b[0], b[1]] - w2 = [b[2], b[3]] - return w1 + [self.calculate_scd30_crc(w1)] + w2 + [self.calculate_scd30_crc(w2)] + def _pack_float(self, value): + """Converts float to [MSB, LSB, CRC, MSB2, LSB2, CRC]""" + b = struct.pack('>f', value) # Big-endian IEEE754 + w1, w2 = [b[0], b[1]], [b[2], b[3]] + return w1 + [self._calculate_crc8(w1)] + w2 + [self._calculate_crc8(w2)] - def set_emulator_data(self, co2, temp, hum): - """Prepares the DATA: hex string for the Emulator ESP32""" - # SCD30 Ready Status (0x0001 + CRC) - ready_hex = "000103" - - # Build the 18-byte measurement buffer - payload = self.pack_float_scd30(co2) + \ - self.pack_float_scd30(temp) + \ - self.pack_float_scd30(hum) - - data_hex = "".join(f"{b:02X}" for b in payload) - - # We check for CMD from emulator and respond + def process_bridge(self, co2_val, temp_val, hum_val): + """ + Listens for 'CMD:' from Emulator and replies with 'DATA:' + Returns True if a measurement was successfully served. + """ line = self.emulator.readline().decode('utf-8', errors='ignore').strip() - if "CMD:0202" in line: # Master asking if data is ready - self.emulator.write(f"DATA:{ready_hex}\n".encode()) - elif "CMD:0300" in line: # Master asking for the 18 bytes - self.emulator.write(f"DATA:{data_hex}\n".encode()) - return True + + if line.startswith("CMD:"): + cmd = line[4:] + + if "0202" in cmd: # Data Ready Status request + resp = [0x00, 0x01, self._calculate_crc8([0x00, 0x01])] + self.emulator.write(f"DATA:{''.join(f'{b:02X}' for b in resp)}\n".encode()) + + elif "0300" in cmd: # Read Measurement request + payload = self._pack_float(co2_val) + \ + self._pack_float(temp_val) + \ + self._pack_float(hum_val) + hex_data = "".join(f"{b:02X}" for b in payload) + self.emulator.write(f"DATA:{hex_data}\n".encode()) + return True # Measurement cycle complete return False - def read_target_output(self): - """Reads the latest print statement from the Test Code ESP32""" - line = self.target.readline().decode('utf-8', errors='ignore').strip() - if "CO2:" in line: - return line - return None + def get_target_reading(self): + """Captures the output from the Target ESP32 Serial Monitor""" + return self.target.readline().decode('utf-8', errors='ignore').strip() def close(self): self.emulator.close() diff --git a/run_test.py b/run_test.py index 3f95672..63518aa 100644 --- a/run_test.py +++ b/run_test.py @@ -1,48 +1,56 @@ import csv import time -from hil_lib import HILController +from hil_lib import SCD30_HIL_Lib -# --- SET YOUR PORTS HERE --- -EMULATOR_PORT = 'COM4' # The Bridge -TARGET_PORT = 'COM5' # The App -FILENAME = 'hil_test_results.csv' +# Device paths from your environment +EMULATOR = '/dev/i2c_emulator' +TARGET = '/dev/esp_sensor_test' +LOG_FILE = 'hil_validation_results.csv' -hil = HILController(EMULATOR_PORT, TARGET_PORT) +# Initialize HIL +hil = SCD30_HIL_Lib(EMULATOR, TARGET) -print(f"Starting HIL Test... Saving to {FILENAME}") +print(f"--- SCD30 HIL Test Started ---") +print(f"Logging to: {LOG_FILE}") -with open(FILENAME, mode='w', newline='') as file: - writer = csv.writer(file) - writer.writerow(["Simulated_CO2", "Target_Read_Output", "Timestamp"]) +with open(LOG_FILE, mode='w', newline='') as f: + writer = csv.writer(f) + writer.writerow(["Timestamp", "Simulated_CO2", "Target_Received_Msg", "Status"]) + + # Test cases: varying CO2 levels + test_points = [400.0, 850.5, 1200.0, 2500.0, 5000.0] try: - # Test values to cycle through - test_values = [400.0, 600.5, 850.0, 1200.0, 2500.0] - - for val in test_values: - print(f"\nTesting CO2: {val} ppm") + for co2 in test_points: + print(f"\n[PHASE] Setting Simulated CO2 to {co2} ppm") - # 1. Put data on the bus (Wait for emulator to request it) - start_time = time.time() - success = False - while time.time() - start_tick < 5: # 5 second timeout per sample - if hil.set_emulator_data(co2=val, temp=25.0, hum=50.0): - success = True + # 1. Feed the bridge until the Target performs a successful Read + # We loop briefly to wait for the Target's polling interval + timeout = time.time() + 10 + data_served = False + while time.time() < timeout: + if hil.process_bridge(co2, 24.0, 50.0): + data_served = True break - # 2. Read what the Target ESP32 saw on its Serial Monitor - time.sleep(1) # Give the Target a moment to process and print - target_data = hil.read_target_output() + if not data_served: + print("Error: Target didn't request data within 10s.") + continue + + # 2. Capture what the Target actually saw + # Wait for Target to print the result to Serial + time.sleep(0.5) + target_msg = hil.get_target_reading() - if target_data: - print(f"Target Received: {target_data}") - # 3. Store in CSV - writer.writerow([val, target_data, time.ctime()]) - file.flush() # Ensure data is written to disk - else: - print("Warning: Target did not report data in time.") + # 3. Simple Validation + status = "PASS" if str(int(co2)) in target_msg else "CHECK" + + print(f"[RESULT] Target Output: {target_msg}") + writer.writerow([time.strftime("%H:%M:%S"), co2, target_msg, status]) + f.flush() except KeyboardInterrupt: - print("Test stopped by user.") + print("\nTest Aborted.") finally: - hil.close() \ No newline at end of file + hil.close() + print("--- HIL Terminated ---") \ No newline at end of file