Update ports
This commit is contained in:
71
hil_lib.py
71
hil_lib.py
@@ -2,14 +2,15 @@ import serial
|
|||||||
import struct
|
import struct
|
||||||
import time
|
import time
|
||||||
|
|
||||||
class HILController:
|
class SCD30_HIL_Lib:
|
||||||
def __init__(self, emulator_port, target_port, baud=921600):
|
def __init__(self, emulator_port='/dev/i2c_emulator', target_port='/dev/esp_sensor_test', baud=921600):
|
||||||
# Port for the ESP32 acting as the Sensor
|
# High speed for the bridge to minimize I2C latency
|
||||||
self.emulator = serial.Serial(emulator_port, baud, timeout=0.1)
|
self.emulator = serial.Serial(emulator_port, baud, timeout=0.05)
|
||||||
# Port for the ESP32 running your application code
|
# Standard speed for the Target application logs
|
||||||
self.target = serial.Serial(target_port, 115200, timeout=0.1)
|
self.target = serial.Serial(target_port, 115200, timeout=0.1)
|
||||||
|
|
||||||
def calculate_scd30_crc(self, data):
|
def _calculate_crc8(self, data):
|
||||||
|
"""SCD30 specific CRC-8 (Polynomial 0x31)"""
|
||||||
crc = 0xFF
|
crc = 0xFF
|
||||||
for byte in data:
|
for byte in data:
|
||||||
crc ^= byte
|
crc ^= byte
|
||||||
@@ -21,40 +22,38 @@ class HILController:
|
|||||||
crc &= 0xFF
|
crc &= 0xFF
|
||||||
return crc
|
return crc
|
||||||
|
|
||||||
def pack_float_scd30(self, value):
|
def _pack_float(self, value):
|
||||||
"""Packs a float into the SCD30 [MSB, LSB, CRC, MSB, LSB, CRC] format"""
|
"""Converts float to [MSB, LSB, CRC, MSB2, LSB2, CRC]"""
|
||||||
b = struct.pack('>f', value)
|
b = struct.pack('>f', value) # Big-endian IEEE754
|
||||||
w1 = [b[0], b[1]]
|
w1, w2 = [b[0], b[1]], [b[2], b[3]]
|
||||||
w2 = [b[2], b[3]]
|
return w1 + [self._calculate_crc8(w1)] + w2 + [self._calculate_crc8(w2)]
|
||||||
return w1 + [self.calculate_scd30_crc(w1)] + w2 + [self.calculate_scd30_crc(w2)]
|
|
||||||
|
|
||||||
def set_emulator_data(self, co2, temp, hum):
|
def process_bridge(self, co2_val, temp_val, hum_val):
|
||||||
"""Prepares the DATA: hex string for the Emulator ESP32"""
|
"""
|
||||||
# SCD30 Ready Status (0x0001 + CRC)
|
Listens for 'CMD:' from Emulator and replies with 'DATA:'
|
||||||
ready_hex = "000103"
|
Returns True if a measurement was successfully served.
|
||||||
|
"""
|
||||||
# Build the 18-byte measurement buffer
|
|
||||||
payload = self.pack_float_scd30(co2) + \
|
|
||||||
self.pack_float_scd30(temp) + \
|
|
||||||
self.pack_float_scd30(hum)
|
|
||||||
|
|
||||||
data_hex = "".join(f"{b:02X}" for b in payload)
|
|
||||||
|
|
||||||
# We check for CMD from emulator and respond
|
|
||||||
line = self.emulator.readline().decode('utf-8', errors='ignore').strip()
|
line = self.emulator.readline().decode('utf-8', errors='ignore').strip()
|
||||||
if "CMD:0202" in line: # Master asking if data is ready
|
|
||||||
self.emulator.write(f"DATA:{ready_hex}\n".encode())
|
if line.startswith("CMD:"):
|
||||||
elif "CMD:0300" in line: # Master asking for the 18 bytes
|
cmd = line[4:]
|
||||||
self.emulator.write(f"DATA:{data_hex}\n".encode())
|
|
||||||
return True
|
if "0202" in cmd: # Data Ready Status request
|
||||||
|
resp = [0x00, 0x01, self._calculate_crc8([0x00, 0x01])]
|
||||||
|
self.emulator.write(f"DATA:{''.join(f'{b:02X}' for b in resp)}\n".encode())
|
||||||
|
|
||||||
|
elif "0300" in cmd: # Read Measurement request
|
||||||
|
payload = self._pack_float(co2_val) + \
|
||||||
|
self._pack_float(temp_val) + \
|
||||||
|
self._pack_float(hum_val)
|
||||||
|
hex_data = "".join(f"{b:02X}" for b in payload)
|
||||||
|
self.emulator.write(f"DATA:{hex_data}\n".encode())
|
||||||
|
return True # Measurement cycle complete
|
||||||
return False
|
return False
|
||||||
|
|
||||||
def read_target_output(self):
|
def get_target_reading(self):
|
||||||
"""Reads the latest print statement from the Test Code ESP32"""
|
"""Captures the output from the Target ESP32 Serial Monitor"""
|
||||||
line = self.target.readline().decode('utf-8', errors='ignore').strip()
|
return self.target.readline().decode('utf-8', errors='ignore').strip()
|
||||||
if "CO2:" in line:
|
|
||||||
return line
|
|
||||||
return None
|
|
||||||
|
|
||||||
def close(self):
|
def close(self):
|
||||||
self.emulator.close()
|
self.emulator.close()
|
||||||
|
|||||||
72
run_test.py
72
run_test.py
@@ -1,48 +1,56 @@
|
|||||||
import csv
|
import csv
|
||||||
import time
|
import time
|
||||||
from hil_lib import HILController
|
from hil_lib import SCD30_HIL_Lib
|
||||||
|
|
||||||
# --- SET YOUR PORTS HERE ---
|
# Device paths from your environment
|
||||||
EMULATOR_PORT = 'COM4' # The Bridge
|
EMULATOR = '/dev/i2c_emulator'
|
||||||
TARGET_PORT = 'COM5' # The App
|
TARGET = '/dev/esp_sensor_test'
|
||||||
FILENAME = 'hil_test_results.csv'
|
LOG_FILE = 'hil_validation_results.csv'
|
||||||
|
|
||||||
hil = HILController(EMULATOR_PORT, TARGET_PORT)
|
# Initialize HIL
|
||||||
|
hil = SCD30_HIL_Lib(EMULATOR, TARGET)
|
||||||
|
|
||||||
print(f"Starting HIL Test... Saving to {FILENAME}")
|
print(f"--- SCD30 HIL Test Started ---")
|
||||||
|
print(f"Logging to: {LOG_FILE}")
|
||||||
|
|
||||||
with open(FILENAME, mode='w', newline='') as file:
|
with open(LOG_FILE, mode='w', newline='') as f:
|
||||||
writer = csv.writer(file)
|
writer = csv.writer(f)
|
||||||
writer.writerow(["Simulated_CO2", "Target_Read_Output", "Timestamp"])
|
writer.writerow(["Timestamp", "Simulated_CO2", "Target_Received_Msg", "Status"])
|
||||||
|
|
||||||
|
# Test cases: varying CO2 levels
|
||||||
|
test_points = [400.0, 850.5, 1200.0, 2500.0, 5000.0]
|
||||||
|
|
||||||
try:
|
try:
|
||||||
# Test values to cycle through
|
for co2 in test_points:
|
||||||
test_values = [400.0, 600.5, 850.0, 1200.0, 2500.0]
|
print(f"\n[PHASE] Setting Simulated CO2 to {co2} ppm")
|
||||||
|
|
||||||
for val in test_values:
|
# 1. Feed the bridge until the Target performs a successful Read
|
||||||
print(f"\nTesting CO2: {val} ppm")
|
# We loop briefly to wait for the Target's polling interval
|
||||||
|
timeout = time.time() + 10
|
||||||
# 1. Put data on the bus (Wait for emulator to request it)
|
data_served = False
|
||||||
start_time = time.time()
|
while time.time() < timeout:
|
||||||
success = False
|
if hil.process_bridge(co2, 24.0, 50.0):
|
||||||
while time.time() - start_tick < 5: # 5 second timeout per sample
|
data_served = True
|
||||||
if hil.set_emulator_data(co2=val, temp=25.0, hum=50.0):
|
|
||||||
success = True
|
|
||||||
break
|
break
|
||||||
|
|
||||||
# 2. Read what the Target ESP32 saw on its Serial Monitor
|
if not data_served:
|
||||||
time.sleep(1) # Give the Target a moment to process and print
|
print("Error: Target didn't request data within 10s.")
|
||||||
target_data = hil.read_target_output()
|
continue
|
||||||
|
|
||||||
if target_data:
|
# 2. Capture what the Target actually saw
|
||||||
print(f"Target Received: {target_data}")
|
# Wait for Target to print the result to Serial
|
||||||
# 3. Store in CSV
|
time.sleep(0.5)
|
||||||
writer.writerow([val, target_data, time.ctime()])
|
target_msg = hil.get_target_reading()
|
||||||
file.flush() # Ensure data is written to disk
|
|
||||||
else:
|
# 3. Simple Validation
|
||||||
print("Warning: Target did not report data in time.")
|
status = "PASS" if str(int(co2)) in target_msg else "CHECK"
|
||||||
|
|
||||||
|
print(f"[RESULT] Target Output: {target_msg}")
|
||||||
|
writer.writerow([time.strftime("%H:%M:%S"), co2, target_msg, status])
|
||||||
|
f.flush()
|
||||||
|
|
||||||
except KeyboardInterrupt:
|
except KeyboardInterrupt:
|
||||||
print("Test stopped by user.")
|
print("\nTest Aborted.")
|
||||||
finally:
|
finally:
|
||||||
hil.close()
|
hil.close()
|
||||||
|
print("--- HIL Terminated ---")
|
||||||
Reference in New Issue
Block a user