194 lines
7.7 KiB
Python
194 lines
7.7 KiB
Python
import serial
|
|
import time
|
|
import struct
|
|
import argparse
|
|
import sys
|
|
|
|
# CRC-8 Calculation (Polynomial 0x31, Initial 0xFF)
|
|
def calculate_crc8(data):
|
|
crc = 0xFF
|
|
for byte in data:
|
|
crc ^= byte
|
|
for _ in range(8):
|
|
if crc & 0x80:
|
|
crc = (crc << 1) ^ 0x31
|
|
else:
|
|
crc <<= 1
|
|
crc &= 0xFF
|
|
return crc
|
|
|
|
# Generate SCD30 Data Packet (18 bytes)
|
|
# Format: [BigEndianFloat1_High, BigEndianFloat1_Low, CRC, ...]
|
|
def generate_scd30_data(co2, temp, himidity):
|
|
# Pack floats as Big Endian (Network Order)
|
|
b_co2 = struct.pack('>f', co2)
|
|
b_temp = struct.pack('>f', temp)
|
|
b_rh = struct.pack('>f', himidity)
|
|
|
|
packet = bytearray()
|
|
|
|
# helper to append 2 bytes + CRC
|
|
def append_word_crc(b_val):
|
|
# b_val is 4 bytes
|
|
# First word (MSB 31-16)
|
|
w1 = b_val[0:2]
|
|
packet.extend(w1)
|
|
packet.append(calculate_crc8(w1))
|
|
# Second word (LSB 15-0)
|
|
w2 = b_val[2:4]
|
|
packet.extend(w2)
|
|
packet.append(calculate_crc8(w2))
|
|
|
|
append_word_crc(b_co2)
|
|
append_word_crc(b_temp)
|
|
append_word_crc(b_rh)
|
|
|
|
return packet
|
|
|
|
def main():
|
|
parser = argparse.ArgumentParser(description='SCD30 HIL Controller')
|
|
parser.add_argument('port', help='Serial port (e.g., COM3 or /dev/ttyUSB0)')
|
|
parser.add_argument('--baud', type=int, default=921600, help='Baud rate')
|
|
args = parser.parse_args()
|
|
|
|
try:
|
|
ser = serial.Serial(args.port, args.baud, timeout=0.1)
|
|
print(f"Connected to {args.port} at {args.baud} baud")
|
|
except serial.SerialException as e:
|
|
print(f"Error opening serial port: {e}")
|
|
sys.exit(1)
|
|
|
|
# Simulation State
|
|
# 0: Idle
|
|
# 1: Measurement Started
|
|
state_measurement_active = False
|
|
|
|
# Synthetic Data Values
|
|
sim_co2 = 400.0
|
|
sim_temp = 25.0
|
|
sim_rh = 50.0
|
|
|
|
print("Starting HIL Loop...")
|
|
print("Commands: CMD:<hex string>")
|
|
|
|
buffer = ""
|
|
|
|
try:
|
|
while True:
|
|
# Simple line buffering
|
|
if ser.in_waiting:
|
|
data = ser.read(ser.in_waiting)
|
|
try:
|
|
text = data.decode('utf-8', errors='ignore')
|
|
buffer += text
|
|
except:
|
|
pass
|
|
|
|
while '\n' in buffer:
|
|
line, buffer = buffer.split('\n', 1)
|
|
line = line.strip()
|
|
if not line.startswith("CMD:"):
|
|
continue
|
|
|
|
cmd_hex = line[4:]
|
|
try:
|
|
cmd_bytes = bytes.fromhex(cmd_hex)
|
|
except ValueError:
|
|
print(f"Invalid Hex: {cmd_hex}")
|
|
continue
|
|
|
|
# Process Command
|
|
# SCD30 Protocol:
|
|
# Write: [CMD_MSB, CMD_LSB, ARG...]
|
|
# Read: Wait for response
|
|
|
|
if len(cmd_bytes) < 2:
|
|
continue
|
|
|
|
command_id = (cmd_bytes[0] << 8) | cmd_bytes[1]
|
|
|
|
response = None
|
|
|
|
if command_id == 0x0010: # Start Continuous Measurement
|
|
print(f"RX: Start Measurement (Arg: {cmd_bytes[2:].hex()})")
|
|
state_measurement_active = True
|
|
# No data returned for writes, but emulator waits for DATA:?
|
|
# Wait, Emulator logic:
|
|
# 1. Master sends Write (Command).
|
|
# 2. Emulator sends CMD: to PC.
|
|
# 3. Emulator waits for DATA: from PC.
|
|
# For a WRITE command, the SCD30 ACK's.
|
|
# The Emulator implementation blindly expects DATA: from PC to load into TX buffer.
|
|
# BUT! If it's a WRITE transaction, the Master isn't reading anything back immediately?
|
|
# Wait. the SCD30 driver does `i2c_master_transmit`.
|
|
# It treats `0x0010` as a write.
|
|
# The Emulator logic (Step 27, lines 58-60) uses `i2c_slave_receive`.
|
|
# If Master writes, Slave receives.
|
|
# Logic:
|
|
# Slave gets data.
|
|
# Sends CMD:<data> to PC.
|
|
# Waits for DATA:<response> from PC.
|
|
# Then fills TX buffer `i2c_slave_transmit`.
|
|
# Wait. If Master only WROTE, it won't read back immediately.
|
|
# So the `i2c_slave_transmit` will TIMEOUT or BLOCK until Master reads?
|
|
# If Master doesn't read, the Slave TX buffer loads but never sends.
|
|
# The Master just stops.
|
|
# So for Write commands (0x0010), we should probably send an empty DATA:?
|
|
# Or dummy data.
|
|
# Ideally, we send empty string "DATA:"?
|
|
# Let's check Emulator code.
|
|
# Line 92: `i2c_slave_transmit(..., tx_len ...)`.
|
|
# If tx_len is 0, what happens?
|
|
# Just prepares 0 bytes?
|
|
# If Master tries to read later, it gets nothing?
|
|
# But Master IS NOT reading. Master just did a Write.
|
|
# So it's fine if we send nothing to TX buffer?
|
|
# Or does `i2c_slave_transmit` block?
|
|
# It has a timeout of 50ms (Line 92).
|
|
# Since Master isn't clocking a Read, Slave Transmit will timeout.
|
|
# This works fine, actually. The call will just exit after 50ms.
|
|
# So for Write commands from Master, we don't strictly need to provide data,
|
|
# but we MUST send "DATA:" line to release the PC wait loop in Emulator.
|
|
response = b''
|
|
|
|
elif command_id == 0x0202: # Get Ready Status
|
|
# Returns 1 (Ready)
|
|
# Format: [0x00, 0x01, CRC]
|
|
print("RX: Poll Ready Status")
|
|
# 0x00 0x01
|
|
# CRC of 0x00 0x01 is ...
|
|
# calc_crc(b'\x00\x01')
|
|
val = bytes([0x00, 0x01])
|
|
crc = calculate_crc8(val)
|
|
response = val + bytes([crc])
|
|
|
|
elif command_id == 0x0300: # Read Measurement
|
|
print("RX: Read Measurement")
|
|
# Generate random variation
|
|
sim_co2 += 0.5
|
|
if sim_co2 > 420: sim_co2 = 400.0
|
|
|
|
response = generate_scd30_data(sim_co2, sim_temp, sim_rh)
|
|
print(f"TX: CO2={sim_co2:.1f}, T={sim_temp:.1f}, RH={sim_rh:.1f}")
|
|
|
|
else:
|
|
print(f"Unknown Command: {command_id:04X}")
|
|
# Send dummy empty
|
|
response = b''
|
|
|
|
# Send response back
|
|
# Format: DATA:<hex>
|
|
resp_hex = response.hex().upper()
|
|
msg = f"DATA:{resp_hex}\n"
|
|
ser.write(msg.encode('utf-8'))
|
|
|
|
# Small sleep to prevent CPU hogging
|
|
time.sleep(0.001)
|
|
|
|
except KeyboardInterrupt:
|
|
print("\nExiting...")
|
|
ser.close()
|
|
|
|
if __name__ == "__main__":
|
|
main()
|