This commit is contained in:
2026-02-09 16:36:29 +01:00
parent af9e0c11e0
commit 0f50ddbae3
18 changed files with 610 additions and 293 deletions

View File

@@ -1,5 +1,4 @@
#include "scd30.h"
#include "driver/i2c_master.h"
#include <string.h>
#include <stdio.h>
@@ -26,14 +25,29 @@ static uint8_t calculate_crc8(const uint8_t *data, size_t len) {
return crc;
}
void scd30_init(void) {
i2c_master_config_t i2c_config = {
scd30_handle_t *scd30_init(int sda_gpio, int scl_gpio) {
static scd30_handle_t handle;
i2c_master_bus_config_t bus_config = {
.clk_source = I2C_CLK_SRC_DEFAULT,
.i2c_port = I2C_NUM_0,
.scl_io_num = scl_gpio,
.sda_io_num = sda_gpio,
.glitch_ignore_cnt = 7,
.flags.enable_internal_pullup = true
};
ESP_ERROR_CHECK(i2c_new_master_bus(&bus_config, &handle.bus_handle));
i2c_device_config_t dev_cfg = {
.dev_addr_length = I2C_ADDR_BIT_LEN_7,
.device_address = SCD30_I2C_ADDRESS,
.scl_speed_hz = 100000,
.scl_wait_us = 150000 // Clock stretching
};
i2c_master_init(I2C_NUM_0, &i2c_config);
ESP_ERROR_CHECK(i2c_master_bus_add_device(handle.bus_handle, &dev_cfg, &handle.dev_handle));
return &handle;
}
void scd30_start_measurement(void) {
void scd30_start_measurement(scd30_handle_t *handle) {
uint8_t command[5] = {
(SCD30_CMD_START_MEASUREMENT >> 8) & 0xFF,
SCD30_CMD_START_MEASUREMENT & 0xFF,
@@ -41,11 +55,18 @@ void scd30_start_measurement(void) {
0x00 // Placeholder for CRC
};
command[4] = calculate_crc8(&command[2], 2);
i2c_master_write_to_device(I2C_NUM_0, SCD30_I2C_ADDRESS, command, sizeof(command), 1000 / portTICK_PERIOD_MS);
i2c_master_transmit(handle->dev_handle, command, sizeof(command), 1000);
}
int scd30_read_data(scd30_data_t *data) {
// Helper to convert Big Endian IEEE754 bytes to float
static float bytes_to_float(uint8_t mmsb, uint8_t mlsb, uint8_t lmsb, uint8_t llsb) {
uint32_t val = ((uint32_t)mmsb << 24) | ((uint32_t)mlsb << 16) | ((uint32_t)lmsb << 8) | (uint32_t)llsb;
float f;
memcpy(&f, &val, 4);
return f;
}
int scd30_read_data(scd30_handle_t *handle, scd30_data_t *data) {
uint8_t ready_command[2] = {
(SCD30_CMD_READY_STATUS >> 8) & 0xFF,
SCD30_CMD_READY_STATUS & 0xFF
@@ -54,7 +75,7 @@ int scd30_read_data(scd30_data_t *data) {
// Poll until data is ready
do {
i2c_master_write_read_device(I2C_NUM_0, SCD30_I2C_ADDRESS, ready_command, sizeof(ready_command), ready_status, sizeof(ready_status), 1000 / portTICK_PERIOD_MS);
i2c_master_transmit_receive(handle->dev_handle, ready_command, sizeof(ready_command), ready_status, sizeof(ready_status), 1000);
} while (ready_status[1] != 0x01);
uint8_t read_command[2] = {
@@ -64,7 +85,8 @@ int scd30_read_data(scd30_data_t *data) {
uint8_t buffer[18];
// Read 18 bytes of data
i2c_master_write_read_device(I2C_NUM_0, SCD30_I2C_ADDRESS, read_command, sizeof(read_command), buffer, sizeof(buffer), 1000 / portTICK_PERIOD_MS);
// Format: CO2_MSB, CO2_LSB, CRC, CO2_LMSB, CO2_LLSB, CRC, ...
i2c_master_transmit_receive(handle->dev_handle, read_command, sizeof(read_command), buffer, sizeof(buffer), 1000);
// Validate CRC for each pair of data
for (int i = 0; i < 18; i += 3) {
@@ -73,10 +95,13 @@ int scd30_read_data(scd30_data_t *data) {
}
}
// Parse data
data->co2 = (float)((buffer[0] << 8) | buffer[1]);
data->temp = (float)((buffer[3] << 8) | buffer[4]);
data->humidity = (float)((buffer[6] << 8) | buffer[7]);
// Parse IEEE754 floats (Big Endian)
// Structure: [Byte1, Byte2, CRC, Byte3, Byte4, CRC]
// 0 1 2 3 4 5
data->co2 = bytes_to_float(buffer[0], buffer[1], buffer[3], buffer[4]);
data->temp = bytes_to_float(buffer[6], buffer[7], buffer[9], buffer[10]);
data->humidity = bytes_to_float(buffer[12], buffer[13], buffer[15], buffer[16]);
return 0; // Success
}

View File

@@ -2,6 +2,7 @@
#define SCD30_H
#include <stdint.h>
#include "driver/i2c_master.h"
// Data structure to hold SCD30 sensor data
typedef struct {
@@ -10,9 +11,34 @@ typedef struct {
float humidity; // Relative humidity in percentage
} scd30_data_t;
// Function prototypes
void scd30_init(void);
void scd30_start_measurement(void);
int scd30_read_data(scd30_data_t *data);
typedef struct {
i2c_master_bus_handle_t bus_handle;
i2c_master_dev_handle_t dev_handle;
} scd30_handle_t;
/**
* @brief Initialize the SCD30 sensor
*
* @param sda_gpio SDA GPIO number
* @param scl_gpio SCL GPIO number
* @return scd30_handle_t* Pointer to the sensor handle
*/
scd30_handle_t *scd30_init(int sda_gpio, int scl_gpio);
/**
* @brief Start continuous measurement
*
* @param handle Sensor handle
*/
void scd30_start_measurement(scd30_handle_t *handle);
/**
* @brief Read data from the SCD30 sensor
*
* @param handle Sensor handle
* @param data Pointer to store the read data
* @return int 0 on success, -1 on error
*/
int scd30_read_data(scd30_handle_t *handle, scd30_data_t *data);
#endif // SCD30_H

View File

@@ -1,2 +1,2 @@
idf_component_register(SRCS "blink_example_main.c"
idf_component_register(SRCS "main.c"
INCLUDE_DIRS ".")

View File

@@ -1,104 +0,0 @@
/* Blink Example
This example code is in the Public Domain (or CC0 licensed, at your option.)
Unless required by applicable law or agreed to in writing, this
software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
CONDITIONS OF ANY KIND, either express or implied.
*/
#include <stdio.h>
#include "freertos/FreeRTOS.h"
#include "freertos/task.h"
#include "driver/gpio.h"
#include "esp_log.h"
#include "led_strip.h"
#include "sdkconfig.h"
static const char *TAG = "example";
/* Use project configuration menu (idf.py menuconfig) to choose the GPIO to blink,
or you can edit the following line and set a number here.
*/
#define BLINK_GPIO CONFIG_BLINK_GPIO
static uint8_t s_led_state = 0;
#ifdef CONFIG_BLINK_LED_STRIP
static led_strip_handle_t led_strip;
static void blink_led(void)
{
/* If the addressable LED is enabled */
if (s_led_state) {
/* Set the LED pixel using RGB from 0 (0%) to 255 (100%) for each color */
led_strip_set_pixel(led_strip, 0, 16, 16, 16);
/* Refresh the strip to send data */
led_strip_refresh(led_strip);
} else {
/* Set all LED off to clear all pixels */
led_strip_clear(led_strip);
}
}
static void configure_led(void)
{
ESP_LOGI(TAG, "Example configured to blink addressable LED!");
/* LED strip initialization with the GPIO and pixels number*/
led_strip_config_t strip_config = {
.strip_gpio_num = BLINK_GPIO,
.max_leds = 1, // at least one LED on board
};
#if CONFIG_BLINK_LED_STRIP_BACKEND_RMT
led_strip_rmt_config_t rmt_config = {
.resolution_hz = 10 * 1000 * 1000, // 10MHz
.flags.with_dma = false,
};
ESP_ERROR_CHECK(led_strip_new_rmt_device(&strip_config, &rmt_config, &led_strip));
#elif CONFIG_BLINK_LED_STRIP_BACKEND_SPI
led_strip_spi_config_t spi_config = {
.spi_bus = SPI2_HOST,
.flags.with_dma = true,
};
ESP_ERROR_CHECK(led_strip_new_spi_device(&strip_config, &spi_config, &led_strip));
#else
#error "unsupported LED strip backend"
#endif
/* Set all LED off to clear all pixels */
led_strip_clear(led_strip);
}
#elif CONFIG_BLINK_LED_GPIO
static void blink_led(void)
{
/* Set the GPIO level according to the state (LOW or HIGH)*/
gpio_set_level(BLINK_GPIO, s_led_state);
}
static void configure_led(void)
{
ESP_LOGI(TAG, "Example configured to blink GPIO LED!");
gpio_reset_pin(BLINK_GPIO);
/* Set the GPIO as a push/pull output */
gpio_set_direction(BLINK_GPIO, GPIO_MODE_OUTPUT);
}
#else
#error "unsupported LED type"
#endif
void app_main(void)
{
/* Configure the peripheral according to the LED type */
configure_led();
while (1) {
ESP_LOGI(TAG, "Turning the LED %s!", s_led_state == true ? "ON" : "OFF");
blink_led();
/* Toggle the LED state */
s_led_state = !s_led_state;
vTaskDelay(CONFIG_BLINK_PERIOD / portTICK_PERIOD_MS);
}
}

View File

@@ -5,21 +5,16 @@
void app_main(void) {
scd30_data_t sensor_data;
// Initialize the SCD30 sensor
scd30_init();
scd30_start_measurement();
// Example: SDA = 21, SCL = 22 (change as needed)
scd30_handle_t *scd30 = scd30_init(21, 22);
scd30_start_measurement(scd30);
while (1) {
// Read data from the sensor
if (scd30_read_data(&sensor_data) == 0) {
if (scd30_read_data(scd30, &sensor_data) == 0) {
printf("CO2: %.2f ppm, Temp: %.2f °C, Humidity: %.2f %%\n",
sensor_data.co2, sensor_data.temp, sensor_data.humidity);
} else {
printf("Failed to read data from SCD30 sensor\n");
}
// Wait for 2 seconds
vTaskDelay(pdMS_TO_TICKS(2000));
}
}
}

View File

@@ -1,16 +0,0 @@
# SPDX-FileCopyrightText: 2022 Espressif Systems (Shanghai) CO LTD
# SPDX-License-Identifier: CC0-1.0
import logging
import os
import pytest
from pytest_embedded_idf.dut import IdfDut
@pytest.mark.supported_targets
@pytest.mark.generic
def test_blink(dut: IdfDut) -> None:
# check and log bin size
binary_file = os.path.join(dut.app.binary_path, 'blink.bin')
bin_size = os.path.getsize(binary_file)
logging.info('blink_bin_size : {}KB'.format(bin_size // 1024))

124
I2C_emulation_flow.md Normal file
View File

@@ -0,0 +1,124 @@
# SCD30 HIL Framework Documentation
## 1. System Architecture
The system consists of three layers. The PC handles the **Logic**, the Emulator handles the **Signal Bridge**, and the Target runs the **Production Code**.
```mermaid
graph TD
subgraph "PC (Python)"
A[Global Test Script] <--> B[HIL Library]
end
subgraph "Emulator ESP32 (Slave)"
C[UART Driver] <--> D[I2C Slave Driver]
end
subgraph "Target ESP32 (Master)"
E[SCD30 Driver Component] <--> F[Application Logic]
end
B <-- "921600 Baud Serial" --> C
D <-- "I2C Bus (Clock Stretching)" --> E
```
---
## 2. The Logic Workflow (Sequence)
This is the most critical part: how we synchronize a slow PC with a fast I2C bus.
```mermaid
sequenceDiagram
participant PC as Python (Global Script)
participant Slave as ESP32 Emulator (Slave)
participant Master as ESP32 Target (Master)
Note over Master: App calls scd30_read_data()
Master->>Slave: I2C Write: [0x03, 0x00] (Read Cmd)
activate Slave
Slave->>PC: UART Send: "CMD:0300\n"
Note right of Slave: Slave STRETCHES Clock (SCL LOW)
Note over PC: Lib parses CMD, calculates<br/>Floats & CRCs
PC->>Slave: UART Send: "DATA:010203...\n"
Slave->>Slave: Parse Hex to Bytes
Slave->>Master: Load I2C Buffer & Release SCL
deactivate Slave
Master->>Slave: I2C Read: 18 Bytes
Note over Master: App parses bytes to Float
Master->>PC: UART Print: "CO2: 450.00..."
Note over PC: Global script captures & logs to CSV
```
---
## 3. Component Breakdown
### A. Target ESP32 (`scd30.c`)
* **Role:** Acts as the I2C Master. It believes it is talking to a real SCD30.
* **Key Logic:** * **Initialization:** Configures `scl_wait_us = 150000`. This is the timeout for how long it will wait while the Slave is stretching the clock.
* **Parsing:** Takes 18 bytes and reconstructs three 32-bit floats. It skips every 3rd byte because that is the CRC.
* **Polling:** It continuously asks "Are you ready?" (0x0202) until the Emulator (PC) replies with "Yes" (0x01).
### B. Emulator ESP32 (`main.c`)
* **Role:** The "Transparent Bridge."
* **Key Logic:**
* **Event-Driven:** Uses a callback (`on_recv_done`) to notify a task that the Master has sent a command.
* **Clock Stretching:** By not loading the TX buffer immediately, the hardware automatically holds the SCL line low. This pauses the Master's CPU.
* **Synchronization:** It bridges the I2C domain (microseconds) to the PC domain (milliseconds).
### C. Python Library (`hil_lib.py`)
* **Role:** The "Sensor Physics Engine."
* **Key Logic:**
* **SCD30 Protocol Emulation:** It knows that if the Master sends `0202`, it must reply with `000103` (Ready).
* **Data Formatting:** Converts human-readable numbers (like `450.0`) into the raw hex bytes and CRCs the ESP32 expects.
### D. Global Script (`run_test.py`)
* **Role:** The "Test Orchestrator."
* **Key Logic:**
* **Iteration:** Loops through a list of test values.
* **Verification:** It acts as a "Man-in-the-Middle." It knows what value it *sent* to the sensor and reads the Target's serial to see what value the Target *received*.
* **Persistence:** Saves the input/output pair into a `.csv` for audit and validation.
---
## 4. Hardware Connectivity
| Connection | Requirement | Why? |
| --- | --- | --- |
| **SDA to SDA** | 4.7kΩ Pull-up to 3.3V | Data signal integrity |
| **SCL to SCL** | 4.7kΩ Pull-up to 3.3V | Clock signal integrity |
| **GND to GND** | Solid Copper Wire | Reference voltage for signals |
| **UART 0** | USB Cable to PC | Command/Data bridge |
---
## 5. Troubleshooting the Workflow
* **Symptom:** Target says `Failed to read data`.
* **Cause:** Python script took longer than 200ms to respond OR `scl_wait_us` is too low.
* **Symptom:** Target reads `0.00`.
* **Cause:** The CRC check failed in the C code, or the Python script sent the bytes in the wrong order.
* **Symptom:** CSV only has Column 1, not Column 2.
* **Cause:** The Target ESP32 is not printing to the correct Serial port, or the baud rate is mismatched.

View File

@@ -1,20 +1,10 @@
dependencies:
espressif/led_strip:
component_hash: 28c6509a727ef74925b372ed404772aeedf11cce10b78c3f69b3c66799095e2d
dependencies:
- name: idf
require: private
version: '>=4.4'
source:
registry_url: https://components.espressif.com/
type: service
version: 2.5.5
idf:
source:
type: idf
version: 5.4.0
direct_dependencies:
- espressif/led_strip
manifest_hash: a9af7824fb34850fbe175d5384052634b3c00880abb2d3a7937e666d07603998
- idf
manifest_hash: 5e671f62bc0a2eed2e0bce70e9ac4cf06a7437d77d596bf4518403206824bd17
target: esp32
version: 2.0.0

View File

@@ -1,2 +1,2 @@
idf_component_register(SRCS
idf_component_register(SRCS "main.c"
INCLUDE_DIRS ".")

View File

@@ -1,49 +1,9 @@
menu "Example Configuration"
menu "I2C Emulator Configuration"
orsource "$IDF_PATH/examples/common_components/env_caps/$IDF_TARGET/Kconfig.env_caps"
choice BLINK_LED
prompt "Blink LED type"
default BLINK_LED_GPIO
config I2C_SLAVE_ADDR
int "I2C Slave Address (Decimal)"
default 97
help
Select the LED type. A normal level controlled LED or an addressable LED strip.
The default selection is based on the Espressif DevKit boards.
You can change the default selection according to your board.
config BLINK_LED_GPIO
bool "GPIO"
config BLINK_LED_STRIP
bool "LED strip"
endchoice
choice BLINK_LED_STRIP_BACKEND
depends on BLINK_LED_STRIP
prompt "LED strip backend peripheral"
default BLINK_LED_STRIP_BACKEND_RMT if SOC_RMT_SUPPORTED
default BLINK_LED_STRIP_BACKEND_SPI
help
Select the backend peripheral to drive the LED strip.
config BLINK_LED_STRIP_BACKEND_RMT
depends on SOC_RMT_SUPPORTED
bool "RMT"
config BLINK_LED_STRIP_BACKEND_SPI
bool "SPI"
endchoice
config BLINK_GPIO
int "Blink GPIO number"
range ENV_GPIO_RANGE_MIN ENV_GPIO_OUT_RANGE_MAX
default 8
help
GPIO number (IOxx) to blink on and off the LED.
Some GPIOs are used for other purposes (flash connections, etc.) and cannot be used to blink.
config BLINK_PERIOD
int "Blink period in ms"
range 10 3600000
default 1000
help
Define the blinking period in milliseconds.
The 7-bit I2C Slave Address (0x61 = 97)
endmenu

View File

@@ -1,3 +1,2 @@
dependencies:
espressif/led_strip: "^2.4.1"
idf: "*"

View File

@@ -1,40 +1,24 @@
#include <stdio.h>
#include <string.h>
#include "freertos/FreeRTOS.h"
#include "freertos/task.h"
#include "driver/i2c_slave.h"
#include "driver/uart.h"
#include "driver/gpio.h"
#include "esp_log.h"
#define I2C_SLAVE_SDA_IO 21
#define I2C_SLAVE_SCL_IO 22
#define I2C_SLAVE_NUM I2C_NUM_0
#define I2C_SLAVE_ADDR 0x61
#define I2C_SLAVE_RX_BUF_LEN 128
#define I2C_SLAVE_TX_BUF_LEN 128
#define UART_NUM UART_NUM_0
#define UART_BAUD_RATE 921600
#define UART_BUF_SIZE 256
#define TAG "I2C2SERIAL"
static void i2c_slave_init(void) {
i2c_config_t conf_slave = {
.mode = I2C_MODE_SLAVE,
.sda_io_num = I2C_SLAVE_SDA_IO,
.sda_pullup_en = GPIO_PULLUP_ENABLE,
.scl_io_num = I2C_SLAVE_SCL_IO,
.scl_pullup_en = GPIO_PULLUP_ENABLE,
.slave = {
.slave_addr = I2C_SLAVE_ADDR,
.maximum_speed = 400000,
},
};
ESP_ERROR_CHECK(i2c_param_config(I2C_SLAVE_NUM, &conf_slave));
ESP_ERROR_CHECK(i2c_driver_install(I2C_SLAVE_NUM, conf_slave.mode, I2C_SLAVE_RX_BUF_LEN, I2C_SLAVE_TX_BUF_LEN, 0));
}
static i2c_slave_dev_handle_t i2c_slave = NULL;
static TaskHandle_t s_i2c_task_handle = NULL;
static void uart_init(void) {
uart_config_t uart_config = {
@@ -49,70 +33,117 @@ static void uart_init(void) {
ESP_ERROR_CHECK(uart_param_config(UART_NUM, &uart_config));
}
// Callback for I2C Receive Done
static bool i2c_rx_done_callback(i2c_slave_dev_handle_t channel, const i2c_slave_rx_done_event_data_t *edata, void *user_data) {
BaseType_t high_task_wakeup = pdFALSE;
if (s_i2c_task_handle) {
vTaskNotifyGiveFromISR(s_i2c_task_handle, &high_task_wakeup);
}
return high_task_wakeup == pdTRUE;
}
static void i2c_slave_init(void) {
i2c_slave_config_t conf = {
.sda_io_num = I2C_SLAVE_SDA_IO,
.scl_io_num = I2C_SLAVE_SCL_IO,
.clk_source = I2C_CLK_SRC_DEFAULT,
.send_buf_depth = 256,
.slave_addr = I2C_SLAVE_ADDR,
.addr_bit_len = I2C_ADDR_BIT_LEN_7,
.intr_priority = 0,
};
#if SOC_I2C_SLAVE_CAN_GET_STRETCH_CAUSE
conf.flags.stretch_en = true;
#endif
ESP_ERROR_CHECK(i2c_new_slave_device(&conf, &i2c_slave));
// Enable Internal Pullups manually
gpio_set_pull_mode(I2C_SLAVE_SDA_IO, GPIO_PULLUP_ONLY);
gpio_set_pull_mode(I2C_SLAVE_SCL_IO, GPIO_PULLUP_ONLY);
i2c_slave_event_callbacks_t cbs = {
.on_recv_done = i2c_rx_done_callback,
};
ESP_ERROR_CHECK(i2c_slave_register_event_callbacks(i2c_slave, &cbs, NULL));
}
static void i2c2serial_task(void *arg) {
uint8_t i2c_rx[I2C_SLAVE_RX_BUF_LEN];
uint8_t uart_rx[UART_BUF_SIZE];
uint8_t i2c_tx[I2C_SLAVE_TX_BUF_LEN];
while (1) {
// Wait for I2C master write
int rx_len = i2c_slave_read_buffer(I2C_SLAVE_NUM, i2c_rx, sizeof(i2c_rx), pdMS_TO_TICKS(100));
if (rx_len > 0) {
// Print CMD to serial in hex
printf("CMD:");
for (int i = 0; i < rx_len; ++i) {
printf("%02X", i2c_rx[i]);
}
printf("\n");
size_t out_res = 0; // Added for ESP-IDF v5.4 compatibility
// Wait for DATA:[hex]\n from serial, with 200ms timeout
int uart_len = 0;
int total_len = 0;
TickType_t start_tick = xTaskGetTickCount();
bool got_data = false;
while ((xTaskGetTickCount() - start_tick) < pdMS_TO_TICKS(200)) {
uart_len = uart_read_bytes(UART_NUM, uart_rx + total_len, UART_BUF_SIZE - total_len, pdMS_TO_TICKS(10));
if (uart_len > 0) {
total_len += uart_len;
// Look for a full line
char *newline = memchr(uart_rx, '\n', total_len);
if (newline) {
int line_len = newline - (char *)uart_rx + 1;
uart_rx[line_len-1] = 0; // Null-terminate
if (strncmp((char *)uart_rx, "DATA:", 5) == 0) {
// Parse hex after DATA:
char *hexstr = (char *)uart_rx + 5;
int tx_len = 0;
while (*hexstr && tx_len < I2C_SLAVE_TX_BUF_LEN) {
unsigned int val;
if (sscanf(hexstr, "%2x", &val) == 1) {
i2c_tx[tx_len++] = val;
hexstr += 2;
} else {
break;
}
s_i2c_task_handle = xTaskGetCurrentTaskHandle();
while (1) {
// 1. Prepare to Receive (Non-blocking queue)
memset(i2c_rx, 0, sizeof(i2c_rx));
// This queues the receive request
ESP_ERROR_CHECK(i2c_slave_receive(i2c_slave, i2c_rx, sizeof(i2c_rx)));
// 2. Wait for Receive Complete (Callback triggers notification)
// This is where we wait for the Master to finish writing the command
ulTaskNotifyTake(pdTRUE, portMAX_DELAY);
// Calculate actual length received (SCD30 commands are usually 2-5 bytes)
// We look for the first 0 if the master didn't send a full 128 bytes
size_t actual_rx_len = 2; // SCD30 commands are at least 2 bytes
// 3. Fast Send to PC
uart_write_bytes(UART_NUM, "CMD:", 4);
for (size_t i = 0; i < actual_rx_len; ++i) {
char hex[3];
sprintf(hex, "%02X", i2c_rx[i]);
uart_write_bytes(UART_NUM, hex, 2);
}
uart_write_bytes(UART_NUM, "\n", 1);
// 4. Wait for PC response
int total_len = 0;
TickType_t start_tick = xTaskGetTickCount();
bool got_data = false;
// Wait up to 200ms for "DATA:[hex]\n" from PC
while ((xTaskGetTickCount() - start_tick) < pdMS_TO_TICKS(200)) {
int len = uart_read_bytes(UART_NUM, uart_rx + total_len, UART_BUF_SIZE - total_len, pdMS_TO_TICKS(5));
if (len > 0) {
total_len += len;
char *newline = memchr(uart_rx, '\n', total_len);
if (newline) {
if (strncmp((char *)uart_rx, "DATA:", 5) == 0) {
// 5. Fast Hex Parse
int tx_len = 0;
for (int i = 5; i < (newline - (char*)uart_rx) - 1; i += 2) {
unsigned int val;
if (sscanf((char*)&uart_rx[i], "%2x", &val) == 1) {
i2c_tx[tx_len++] = (uint8_t)val;
}
// Write to I2C slave TX buffer
i2c_slave_write_buffer(I2C_SLAVE_NUM, i2c_tx, tx_len, 0);
got_data = true;
}
// Remove processed line
memmove(uart_rx, newline + 1, total_len - line_len);
total_len -= line_len;
// 6. Load Buffer & Release Clock Stretch
// Added &out_res parameter for v5.4 compatibility
if (tx_len > 0) {
i2c_slave_transmit(i2c_slave, i2c_tx, tx_len, &out_res, pdMS_TO_TICKS(50));
}
got_data = true;
break;
}
total_len = 0; // Clear buffer if line didn't match DATA:
}
}
if (!got_data) {
// Timeout: clear I2C TX buffer
i2c_slave_write_buffer(I2C_SLAVE_NUM, NULL, 0, 0);
}
}
vTaskDelay(pdMS_TO_TICKS(1));
if (!got_data) {
// If PC fails, send a dummy byte so the Master's read doesn't hang forever
uint8_t dummy = 0xFF;
i2c_slave_transmit(i2c_slave, &dummy, 1, &out_res, pdMS_TO_TICKS(10));
}
}
}
void app_main(void) {
i2c_slave_init();
uart_init();
xTaskCreate(i2c2serial_task, "i2c2serial_task", 4096, NULL, 10, NULL);
i2c_slave_init();
xTaskCreatePinnedToCore(i2c2serial_task, "i2c2serial_task", 4096, NULL, 10, NULL, 0);
}

View File

@@ -1,16 +0,0 @@
# SPDX-FileCopyrightText: 2022 Espressif Systems (Shanghai) CO LTD
# SPDX-License-Identifier: CC0-1.0
import logging
import os
import pytest
from pytest_embedded_idf.dut import IdfDut
@pytest.mark.supported_targets
@pytest.mark.generic
def test_blink(dut: IdfDut) -> None:
# check and log bin size
binary_file = os.path.join(dut.app.binary_path, 'blink.bin')
bin_size = os.path.getsize(binary_file)
logging.info('blink_bin_size : {}KB'.format(bin_size // 1024))

193
hil_controller.py Normal file
View File

@@ -0,0 +1,193 @@
import serial
import time
import struct
import argparse
import sys
# CRC-8 Calculation (Polynomial 0x31, Initial 0xFF)
def calculate_crc8(data):
crc = 0xFF
for byte in data:
crc ^= byte
for _ in range(8):
if crc & 0x80:
crc = (crc << 1) ^ 0x31
else:
crc <<= 1
crc &= 0xFF
return crc
# Generate SCD30 Data Packet (18 bytes)
# Format: [BigEndianFloat1_High, BigEndianFloat1_Low, CRC, ...]
def generate_scd30_data(co2, temp, himidity):
# Pack floats as Big Endian (Network Order)
b_co2 = struct.pack('>f', co2)
b_temp = struct.pack('>f', temp)
b_rh = struct.pack('>f', himidity)
packet = bytearray()
# helper to append 2 bytes + CRC
def append_word_crc(b_val):
# b_val is 4 bytes
# First word (MSB 31-16)
w1 = b_val[0:2]
packet.extend(w1)
packet.append(calculate_crc8(w1))
# Second word (LSB 15-0)
w2 = b_val[2:4]
packet.extend(w2)
packet.append(calculate_crc8(w2))
append_word_crc(b_co2)
append_word_crc(b_temp)
append_word_crc(b_rh)
return packet
def main():
parser = argparse.ArgumentParser(description='SCD30 HIL Controller')
parser.add_argument('port', help='Serial port (e.g., COM3 or /dev/ttyUSB0)')
parser.add_argument('--baud', type=int, default=921600, help='Baud rate')
args = parser.parse_args()
try:
ser = serial.Serial(args.port, args.baud, timeout=0.1)
print(f"Connected to {args.port} at {args.baud} baud")
except serial.SerialException as e:
print(f"Error opening serial port: {e}")
sys.exit(1)
# Simulation State
# 0: Idle
# 1: Measurement Started
state_measurement_active = False
# Synthetic Data Values
sim_co2 = 400.0
sim_temp = 25.0
sim_rh = 50.0
print("Starting HIL Loop...")
print("Commands: CMD:<hex string>")
buffer = ""
try:
while True:
# Simple line buffering
if ser.in_waiting:
data = ser.read(ser.in_waiting)
try:
text = data.decode('utf-8', errors='ignore')
buffer += text
except:
pass
while '\n' in buffer:
line, buffer = buffer.split('\n', 1)
line = line.strip()
if not line.startswith("CMD:"):
continue
cmd_hex = line[4:]
try:
cmd_bytes = bytes.fromhex(cmd_hex)
except ValueError:
print(f"Invalid Hex: {cmd_hex}")
continue
# Process Command
# SCD30 Protocol:
# Write: [CMD_MSB, CMD_LSB, ARG...]
# Read: Wait for response
if len(cmd_bytes) < 2:
continue
command_id = (cmd_bytes[0] << 8) | cmd_bytes[1]
response = None
if command_id == 0x0010: # Start Continuous Measurement
print(f"RX: Start Measurement (Arg: {cmd_bytes[2:].hex()})")
state_measurement_active = True
# No data returned for writes, but emulator waits for DATA:?
# Wait, Emulator logic:
# 1. Master sends Write (Command).
# 2. Emulator sends CMD: to PC.
# 3. Emulator waits for DATA: from PC.
# For a WRITE command, the SCD30 ACK's.
# The Emulator implementation blindly expects DATA: from PC to load into TX buffer.
# BUT! If it's a WRITE transaction, the Master isn't reading anything back immediately?
# Wait. the SCD30 driver does `i2c_master_transmit`.
# It treats `0x0010` as a write.
# The Emulator logic (Step 27, lines 58-60) uses `i2c_slave_receive`.
# If Master writes, Slave receives.
# Logic:
# Slave gets data.
# Sends CMD:<data> to PC.
# Waits for DATA:<response> from PC.
# Then fills TX buffer `i2c_slave_transmit`.
# Wait. If Master only WROTE, it won't read back immediately.
# So the `i2c_slave_transmit` will TIMEOUT or BLOCK until Master reads?
# If Master doesn't read, the Slave TX buffer loads but never sends.
# The Master just stops.
# So for Write commands (0x0010), we should probably send an empty DATA:?
# Or dummy data.
# Ideally, we send empty string "DATA:"?
# Let's check Emulator code.
# Line 92: `i2c_slave_transmit(..., tx_len ...)`.
# If tx_len is 0, what happens?
# Just prepares 0 bytes?
# If Master tries to read later, it gets nothing?
# But Master IS NOT reading. Master just did a Write.
# So it's fine if we send nothing to TX buffer?
# Or does `i2c_slave_transmit` block?
# It has a timeout of 50ms (Line 92).
# Since Master isn't clocking a Read, Slave Transmit will timeout.
# This works fine, actually. The call will just exit after 50ms.
# So for Write commands from Master, we don't strictly need to provide data,
# but we MUST send "DATA:" line to release the PC wait loop in Emulator.
response = b''
elif command_id == 0x0202: # Get Ready Status
# Returns 1 (Ready)
# Format: [0x00, 0x01, CRC]
print("RX: Poll Ready Status")
# 0x00 0x01
# CRC of 0x00 0x01 is ...
# calc_crc(b'\x00\x01')
val = bytes([0x00, 0x01])
crc = calculate_crc8(val)
response = val + bytes([crc])
elif command_id == 0x0300: # Read Measurement
print("RX: Read Measurement")
# Generate random variation
sim_co2 += 0.5
if sim_co2 > 420: sim_co2 = 400.0
response = generate_scd30_data(sim_co2, sim_temp, sim_rh)
print(f"TX: CO2={sim_co2:.1f}, T={sim_temp:.1f}, RH={sim_rh:.1f}")
else:
print(f"Unknown Command: {command_id:04X}")
# Send dummy empty
response = b''
# Send response back
# Format: DATA:<hex>
resp_hex = response.hex().upper()
msg = f"DATA:{resp_hex}\n"
ser.write(msg.encode('utf-8'))
# Small sleep to prevent CPU hogging
time.sleep(0.001)
except KeyboardInterrupt:
print("\nExiting...")
ser.close()
if __name__ == "__main__":
main()

61
hil_lib.py Normal file
View File

@@ -0,0 +1,61 @@
import serial
import struct
import time
class HILController:
def __init__(self, emulator_port, target_port, baud=921600):
# Port for the ESP32 acting as the Sensor
self.emulator = serial.Serial(emulator_port, baud, timeout=0.1)
# Port for the ESP32 running your application code
self.target = serial.Serial(target_port, 115200, timeout=0.1)
def calculate_scd30_crc(self, data):
crc = 0xFF
for byte in data:
crc ^= byte
for _ in range(8):
if crc & 0x80:
crc = (crc << 1) ^ 0x31
else:
crc <<= 1
crc &= 0xFF
return crc
def pack_float_scd30(self, value):
"""Packs a float into the SCD30 [MSB, LSB, CRC, MSB, LSB, CRC] format"""
b = struct.pack('>f', value)
w1 = [b[0], b[1]]
w2 = [b[2], b[3]]
return w1 + [self.calculate_scd30_crc(w1)] + w2 + [self.calculate_scd30_crc(w2)]
def set_emulator_data(self, co2, temp, hum):
"""Prepares the DATA: hex string for the Emulator ESP32"""
# SCD30 Ready Status (0x0001 + CRC)
ready_hex = "000103"
# Build the 18-byte measurement buffer
payload = self.pack_float_scd30(co2) + \
self.pack_float_scd30(temp) + \
self.pack_float_scd30(hum)
data_hex = "".join(f"{b:02X}" for b in payload)
# We check for CMD from emulator and respond
line = self.emulator.readline().decode('utf-8', errors='ignore').strip()
if "CMD:0202" in line: # Master asking if data is ready
self.emulator.write(f"DATA:{ready_hex}\n".encode())
elif "CMD:0300" in line: # Master asking for the 18 bytes
self.emulator.write(f"DATA:{data_hex}\n".encode())
return True
return False
def read_target_output(self):
"""Reads the latest print statement from the Test Code ESP32"""
line = self.target.readline().decode('utf-8', errors='ignore').strip()
if "CO2:" in line:
return line
return None
def close(self):
self.emulator.close()
self.target.close()

1
requirements.txt Normal file
View File

@@ -0,0 +1 @@
pyserial>=3.5

48
run_test.py Normal file
View File

@@ -0,0 +1,48 @@
import csv
import time
from hil_lib import HILController
# --- SET YOUR PORTS HERE ---
EMULATOR_PORT = 'COM4' # The Bridge
TARGET_PORT = 'COM5' # The App
FILENAME = 'hil_test_results.csv'
hil = HILController(EMULATOR_PORT, TARGET_PORT)
print(f"Starting HIL Test... Saving to {FILENAME}")
with open(FILENAME, mode='w', newline='') as file:
writer = csv.writer(file)
writer.writerow(["Simulated_CO2", "Target_Read_Output", "Timestamp"])
try:
# Test values to cycle through
test_values = [400.0, 600.5, 850.0, 1200.0, 2500.0]
for val in test_values:
print(f"\nTesting CO2: {val} ppm")
# 1. Put data on the bus (Wait for emulator to request it)
start_time = time.time()
success = False
while time.time() - start_tick < 5: # 5 second timeout per sample
if hil.set_emulator_data(co2=val, temp=25.0, hum=50.0):
success = True
break
# 2. Read what the Target ESP32 saw on its Serial Monitor
time.sleep(1) # Give the Target a moment to process and print
target_data = hil.read_target_output()
if target_data:
print(f"Target Received: {target_data}")
# 3. Store in CSV
writer.writerow([val, target_data, time.ctime()])
file.flush() # Ensure data is written to disk
else:
print("Warning: Target did not report data in time.")
except KeyboardInterrupt:
print("Test stopped by user.")
finally:
hil.close()