60 lines
2.3 KiB
Python
60 lines
2.3 KiB
Python
import serial
|
|
import struct
|
|
import time
|
|
|
|
class SCD30_HIL_Lib:
    """Host-side helper for hardware-in-the-loop tests of an SCD30 driver.

    Two serial links are used: one to an I2C emulator bridge, which sends
    ``CMD:<hex>`` lines and is answered with ``DATA:<hex>`` lines, and one to
    the target board, whose text output is read line by line.

    The object is also a context manager, so both ports are released even
    when the code using it raises.
    """

    # 16-bit SCD30 command words (upper-case hex) recognised on "CMD:" lines.
    _CMD_DATA_READY = "0202"
    _CMD_READ_MEASUREMENT = "0300"

    def __init__(
        self,
        emulator_port='/dev/i2c_emulator',
        target_port='/dev/esp_sensor_test',
        baud=921600,
    ):
        """Open both serial ports.

        Args:
            emulator_port: Device path of the I2C emulator bridge.
            target_port: Device path of the target's serial output.
            baud: Baud rate of the emulator link; the target link is always
                opened at 115200.

        Raises:
            Whatever ``serial.Serial`` raises when a port cannot be opened.
        """
        # High speed for the bridge to minimize I2C latency
        self.emulator = serial.Serial(emulator_port, baud, timeout=0.05)
        try:
            # Standard speed for the Target application logs
            self.target = serial.Serial(target_port, 115200, timeout=0.1)
        except Exception:
            # Do not leak the already-open emulator port when the second
            # open fails.
            self.emulator.close()
            raise

    def __enter__(self):
        """Return the instance itself for use in a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close both ports; exceptions from the ``with`` body propagate."""
        self.close()

    def _calculate_crc8(self, data) -> int:
        """Return the SCD30 CRC-8 of *data*.

        Polynomial 0x31, initial value 0xFF, processed MSB first, no final XOR.

        Args:
            data: Iterable of byte values (0-255).
        """
        crc = 0xFF
        for byte in data:
            crc ^= byte
            for _ in range(8):
                crc = ((crc << 1) ^ 0x31) if crc & 0x80 else (crc << 1)
                crc &= 0xFF  # keep the register 8 bits wide
        return crc

    def _pack_float(self, value) -> list:
        """Encode *value* as six byte values: two 16-bit words, each with CRC.

        Returns:
            ``[MSB, LSB, CRC, MSB2, LSB2, CRC]`` where the four data bytes are
            the big-endian IEEE-754 single-precision encoding of *value*.
        """
        raw = struct.pack('>f', value)  # Big-endian IEEE754
        packed = []
        for offset in (0, 2):
            word = list(raw[offset:offset + 2])
            packed.extend(word)
            packed.append(self._calculate_crc8(word))
        return packed

    @staticmethod
    def _extract_opcode(cmd) -> str:
        """Return the leading command word of *cmd* as four upper-case hex digits.

        ``0x`` prefixes and any non-hex separators are ignored, so ``"0202"``,
        ``"0x0202"`` and ``"02 02"`` all yield ``"0202"``.
        """
        cleaned = cmd.upper().replace("0X", "")
        digits = "".join(ch for ch in cleaned if ch in "0123456789ABCDEF")
        return digits[:4]

    def _send_data(self, payload) -> None:
        """Write *payload* (byte values) to the emulator as one ``DATA:<hex>`` line."""
        hex_data = "".join(f"{b:02X}" for b in payload)
        self.emulator.write(f"DATA:{hex_data}\n".encode())

    def process_bridge(self, co2_val, temp_val, hum_val) -> bool:
        """
        Listens for 'CMD:' from Emulator and replies with 'DATA:'

        Reads at most one line from the emulator. A data-ready request (0x0202)
        is answered with the word 0x0001 plus CRC; a read-measurement request
        (0x0300) is answered with *co2_val*, *temp_val* and *hum_val* encoded
        by ``_pack_float``. Anything else is ignored.

        Only the leading command word is compared, so arguments or CRC bytes
        of other commands that happen to contain ``0202``/``0300`` no longer
        trigger a reply.
        NOTE(review): this assumes the emulator puts the 16-bit command word
        first in the ``CMD:`` payload -- confirm against the emulator firmware.

        Returns True if a measurement was successfully served.
        """
        line = self.emulator.readline().decode('utf-8', errors='ignore').strip()
        if not line.startswith("CMD:"):
            return False

        opcode = self._extract_opcode(line[4:])

        if opcode == self._CMD_DATA_READY:  # Data Ready Status request
            status = [0x00, 0x01]
            self._send_data(status + [self._calculate_crc8(status)])
            return False

        if opcode == self._CMD_READ_MEASUREMENT:  # Read Measurement request
            self._send_data(
                self._pack_float(co2_val)
                + self._pack_float(temp_val)
                + self._pack_float(hum_val)
            )
            return True  # Measurement cycle complete

        return False

    def get_target_reading(self) -> str:
        """Captures the output from the Target ESP32 Serial Monitor

        Returns one line read from the target port, decoded as UTF-8 with
        undecodable bytes dropped and surrounding whitespace stripped.
        """
        return self.target.readline().decode('utf-8', errors='ignore').strip()

    def close(self) -> None:
        """Close both serial ports, attempting the second even if the first fails."""
        try:
            self.emulator.close()
        finally:
            self.target.close()