import serial import struct import time class SCD30_HIL_Lib: def __init__(self, emulator_port='/dev/i2c_emulator', target_port='/dev/esp_sensor_test', baud=921600): # High speed for the bridge to minimize I2C latency self.emulator = serial.Serial(emulator_port, baud, timeout=0.05) # Standard speed for the Target application logs self.target = serial.Serial(target_port, 115200, timeout=0.1) def _calculate_crc8(self, data): """SCD30 specific CRC-8 (Polynomial 0x31)""" crc = 0xFF for byte in data: crc ^= byte for _ in range(8): if crc & 0x80: crc = (crc << 1) ^ 0x31 else: crc <<= 1 crc &= 0xFF return crc def _pack_float(self, value): """Converts float to [MSB, LSB, CRC, MSB2, LSB2, CRC]""" b = struct.pack('>f', value) # Big-endian IEEE754 w1, w2 = [b[0], b[1]], [b[2], b[3]] return w1 + [self._calculate_crc8(w1)] + w2 + [self._calculate_crc8(w2)] def process_bridge(self, co2_val, temp_val, hum_val): """ Listens for 'CMD:' from Emulator and replies with 'DATA:' Returns True if a measurement was successfully served. """ line = self.emulator.readline().decode('utf-8', errors='ignore').strip() if line.startswith("CMD:"): cmd = line[4:] if "0202" in cmd: # Data Ready Status request resp = [0x00, 0x01, self._calculate_crc8([0x00, 0x01])] self.emulator.write(f"DATA:{''.join(f'{b:02X}' for b in resp)}\n".encode()) elif "0300" in cmd: # Read Measurement request payload = self._pack_float(co2_val) + \ self._pack_float(temp_val) + \ self._pack_float(hum_val) hex_data = "".join(f"{b:02X}" for b in payload) self.emulator.write(f"DATA:{hex_data}\n".encode()) return True # Measurement cycle complete return False def get_target_reading(self): """Captures the output from the Target ESP32 Serial Monitor""" return self.target.readline().decode('utf-8', errors='ignore').strip() def close(self): self.emulator.close() self.target.close()