repo structure

This commit is contained in:
2026-01-04 15:29:33 +01:00
parent 0e993afad5
commit b8e412f4b1
6 changed files with 5 additions and 3 deletions

103
TPF/scenario_exe_parser.py Normal file
View File

@@ -0,0 +1,103 @@
import xml.etree.ElementTree as ET
import os
import sys
import json
from collections import defaultdict
from pathlib import Path
# Get the directory of the current Python file
current_directory = os.path.dirname(os.path.abspath(__file__))
COMPONENT_DIR = os.path.join(current_directory, "Sensor_hub_repo", "components")
def finalize_output(data_obj):
# Convert defaultdict to standard dict recursively
# This removes the <lambda> and <class 'list'> metadata
standard_dict = json.loads(json.dumps(data_obj))
# Print ONLY the JSON string to stdout
#print(json.dumps(standard_dict, indent=4))
return standard_dict
def parse_test_scenario(xml_file_path):
"""
Parses a test scenario XML file and extracts the configuration and all
test case IDs mapped to their execution commands.
Args:
xml_file_path (str): The path to the XML file to parse.
Returns:
dict: A dictionary in the format:
{
'config': <config_value>,
'test_cases': {
<test_case_id>: <test_exec_command>,
...
}
}
Returns an empty dictionary on error.
"""
if not os.path.exists(xml_file_path):
print(f"Error: File not found at '{xml_file_path}'")
return {}
try:
# 1. Parse the XML file
tree = ET.parse(xml_file_path)
root = tree.getroot()
except ET.ParseError as e:
print(f"Error: Failed to parse XML file. Details: {e}")
return {}
except Exception as e:
print(f"An unexpected error occurred during file parsing: {e}")
return {}
# Initialize the final structured output
parsed_data = {
'config': '',
'test_cases': {}
}
# 2. Extract the mandatory <config> value
config_element = root.find('config')
if config_element is not None and config_element.text:
parsed_data['config'] = config_element.text.strip()
# 3. Iterate over all <test_case> elements and extract ID and Exec
for tc in root.findall('test_case'):
tc_id_element = tc.find('test_case_id')
tc_exec_element = tc.find('test_exec')
# Use strip() and check against None for safety, even if validation passed
tc_id = tc_id_element.text.strip() if tc_id_element is not None and tc_id_element.text else "UNKNOWN_ID"
tc_exec = tc_exec_element.text.strip() if tc_exec_element is not None and tc_exec_element.text else "UNKNOWN_EXEC"
# Add to the test_cases dictionary
parsed_data['test_cases'][tc_id] = tc_exec
return parsed_data
if __name__ == "__main__":
# Define a default path to test against
default_test_file = 'sample_scenario.xml'
# Allow passing the file path as a command-line argument for flexibility
file_to_check = sys.argv[1] if len(sys.argv) > 1 else print({})
file_path = os.path.join(COMPONENT_DIR, file_to_check)
print(f"--- XML Test Scenario Parser ---")
print(f"Parsing file: {file_to_check}\n")
# Run the parser
scenario_data = parse_test_scenario(file_path)
# Print results
# if scenario_data:
# print("✅ Parsing Successful. Extracted Data Structure:")
# print(f"CONFIG: {scenario_data['config']}")
# print("\nTEST CASES:")
# for test_id, command in scenario_data['test_cases'].items():
# print(f" - {test_id}:\n '{command}'")
print(finalize_output(scenario_data))
#return finalize_output(scenario_data['test_cases'])

182
TPF/scenario_execution.py Normal file
View File

@@ -0,0 +1,182 @@
import os
import sys
import json
import subprocess
from scenario_exe_parser import parse_test_scenario
# --- Global Paths ---
current_directory = os.path.dirname(os.path.abspath(__file__))
REPO_PATH = os.path.join(current_directory, "Sensor_hub_repo")
COMPONENT_DIR = os.path.join(REPO_PATH, "components")
RESULT_PATH = "/home/asf/testarena"
# The HTML Template
REPORT_TEMPLATE = """
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>ESP32 Test Execution Report</title>
<style>
body { font-family: 'Segoe UI', Tahoma, Geneva, Verdana, sans-serif; margin: 40px; background-color: #f4f7f6; }
h2 { color: #333; border-bottom: 2px solid #ccc; padding-bottom: 10px; }
table { width: 100%; border-collapse: collapse; margin: 20px 0; background-color: #fff; box-shadow: 0 2px 5px rgba(0,0,0,0.1); }
th, td { padding: 12px 15px; text-align: left; border-bottom: 1px solid #ddd; }
th { background-color: #2c3e50; color: white; text-transform: uppercase; letter-spacing: 0.1em; }
.status-pass { color: #ffffff; background-color: #27ae60; padding: 4px 12px; border-radius: 4px; font-weight: bold; }
.status-fail { color: #ffffff; background-color: #c0392b; padding: 4px 12px; border-radius: 4px; font-weight: bold; }
a { color: #2980b9; text-decoration: none; font-weight: bold; }
a:hover { text-decoration: underline; }
tr:hover { background-color: #f1f1f1; }
</style>
</head>
<body>
<h2>Overall Scenario Summary</h2>
<table>
<thead>
<tr>
<th>Scenario Name</th>
<th>Final Result</th>
</tr>
</thead>
<tbody>
<tr>
<td>{{scenario_name}}</td>
<td><span class="{{overall_class}}">{{overall_status}}</span></td>
</tr>
</tbody>
</table>
<h2>Detailed Test Cases</h2>
<table>
<thead>
<tr>
<th>Test Case ID</th>
<th>Result</th>
<th>Execution Log</th>
</tr>
</thead>
<tbody>
{{test_case_rows}}
</tbody>
</table>
</body>
</html>
"""
def run_test_suite(tasks):
aggregated_results = {}
# Use path relative to this script
script_dir = os.path.dirname(os.path.abspath(__file__))
shell_script = os.path.join(script_dir, "test_execution.sh")
shell_script = "/home/asf/testarena_backend/TPF/test_execution.sh"
if os.name != 'nt':
subprocess.run(["chmod", "+x", shell_script])
for task in tasks:
print(f"--- Starting Task: {task['id']} ---")
# Use Popen to stream output in real-time
env = os.environ.copy()
env["PYTHONUNBUFFERED"] = "1"
process = subprocess.Popen(
[shell_script, task['id'], task['cmd'], task['path'], REPO_PATH],
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
text=True,
bufsize=1,
universal_newlines=True,
env=env
)
full_output = ""
for line in process.stdout:
print(line, end="")
full_output += line
process.wait()
json_found = False
for line in full_output.splitlines():
if line.startswith("FINAL_JSON_OUTPUT:"):
json_string = line.replace("FINAL_JSON_OUTPUT:", "").strip()
try:
task_json = json.loads(json_string)
aggregated_results.update(task_json)
json_found = True
except json.JSONDecodeError as e:
print(f"!!! JSON Parsing Error: {e}")
if not json_found:
aggregated_results[task['id']] = ["ERROR", "N/A"]
return aggregated_results
def generate_html_report(scenario_name, results, output_path):
all_passed = all(info[0] == "PASS" for info in results.values())
overall_status = "PASS" if all_passed else "FAIL"
overall_class = "status-pass" if all_passed else "status-fail"
test_case_rows = ""
for tc_id, info in results.items():
status = info[0]
log_url = info[1]
log_url = log_url.replace("file:///home/asf/testarena/", "http://asf-server.duckdns.org:8080/results/")
status_class = "status-pass" if status == "PASS" else "status-fail"
test_case_rows += f"""
<tr>
<td>{tc_id}</td>
<td><span class="{status_class}">{status}</span></td>
<td><a href="{log_url}" target="_blank">View Log</a></td>
</tr>
"""
# Use the global REPORT_TEMPLATE
report_content = REPORT_TEMPLATE.replace("{{scenario_name}}", scenario_name) \
.replace("{{overall_status}}", overall_status) \
.replace("{{overall_class}}", overall_class) \
.replace("{{test_case_rows}}", test_case_rows)
report_file = os.path.join(output_path, "execution_report.html")
with open(report_file, "w") as f:
f.write(report_content)
print(f"HTML Report generated at: {report_file}")
def save_summary(results, task_id_path):
json_path = os.path.join(task_id_path, "final_summary.json")
with open(json_path, "w") as f:
json.dump(results, f, indent=4)
print(f"\nFinal results saved to {json_path}")
if __name__ == "__main__":
if len(sys.argv) > 3:
queue_id = sys.argv[1] #"1234"
scenario_path = sys.argv[2] #"application_layer/business_stack/actuator_manager/test/actuator_manager_init_test.test_scenario.xml"
task_id = sys.argv[3] #"56754"
else:
print("Usage: python scenario_execution.py <queue_id> <scenario_path> <task_id>")
sys.exit(1)
# Path logic
queue_path = os.path.join(RESULT_PATH, queue_id)
task_id_path = os.path.join(queue_path, task_id) # Corrected pathing
os.makedirs(task_id_path, exist_ok=True)
# Note: Ensure parse_test_scenario is defined or imported
scenario_data = parse_test_scenario(os.path.join(COMPONENT_DIR, scenario_path))
my_tasks = []
sub_tasks_data = scenario_data['test_cases']
for case_id, exec_cmd in sub_tasks_data.items():
my_tasks.append({
"id": case_id,
"cmd": exec_cmd,
"path": task_id_path
})
final_data = run_test_suite(my_tasks)
save_summary(final_data, task_id_path)
# Generate report INSIDE the task folder
generate_html_report(os.path.basename(scenario_path), final_data, task_id_path)

147
TPF/scenario_scan.py Normal file
View File

@@ -0,0 +1,147 @@
import os
import sys
from collections import defaultdict
from pathlib import Path
import json
# Get the directory of the current Python file
current_directory = os.path.dirname(os.path.abspath(__file__))
repo_root = Path(current_directory).parents[1]
COMPONENT_DIR = os.path.join(repo_root, "components")
DEBUG = False
def finalize_output(data_obj):
# Convert defaultdict to standard dict recursively
# This removes the <lambda> and <class 'list'> metadata
standard_dict = json.loads(json.dumps(data_obj))
# Print ONLY the JSON string to stdout
#print(json.dumps(standard_dict, indent=4))
return standard_dict
def find_test_scenarios(root_dir):
"""
Recursively searches the given root directory for files ending with
'.test_scenario.xml' and returns a dictionary mapping scenario names to their
paths relative to the root directory.
Args:
root_dir (str): The absolute path to the starting directory (e.g., 'COMPONENTS').
Returns:
dict[str, str]: A dictionary mapping scenario names (without suffix) to
their relative file paths.
"""
if not os.path.isdir(root_dir):
print(f"Error: Directory not found or not accessible: {root_dir}")
return {} # Return empty dictionary
if DEBUG:
print(f"Scanning directory: '{root_dir}'...")
scenario_suffix = ".test_scenario.xml"
# Dictionary comprehension: {scenario_name: relative_path}
scenarios_map = {
# Key: Scenario name (filename without suffix)
filename.replace(scenario_suffix, ""):
# Value: Relative path
os.path.relpath(os.path.join(dirpath, filename), root_dir)
for dirpath, _, filenames in os.walk(root_dir)
for filename in filenames if filename.endswith(scenario_suffix)
}
return scenarios_map
def organize_by_layer_component(scenarios_map):
"""
Organizes scenario paths into a nested dictionary structure based on the file path:
{Layer_Folder: {Component_Folder: [scenario_name, ...]}}
It assumes the Layer is the first folder and the Component is the folder
preceding the 'test' directory (i.e., the third-to-last segment).
Args:
scenarios_map (dict[str, str]): Dictionary mapping scenario names to their
relative file paths.
Returns:
defaultdict: Nested dictionary (Layer -> Component -> List of Scenario Names).
"""
organized_data = defaultdict(lambda: defaultdict(list))
# Iterate over the scenario name and path
for scenario_name, path in scenarios_map.items():
# Split path into segments using the OS separator
segments = path.split(os.sep)
# Layer is the first segment (e.g., 'application_layer', 'drivers')
layer = segments[0]
# Component is the third-to-last segment (e.g., 'actuator_manager', 'ammonia')
# We assume the file is inside a 'test' folder inside a component folder.
if len(segments) >= 3:
component = segments[-3]
else:
# Fallback for scenarios found too close to the root
component = "Root_Component"
# Populate the nested dictionary
organized_data[layer][component].append(scenario_name)
return organized_data
def scenario_scan(components_root_dir):
"""
Main function to scan for test scenarios, print the organized structure, and
return the resulting dictionaries.
Returns:
tuple[defaultdict, dict]: The organized layer/component structure and the
raw dictionary of scenario names to paths.
"""
# 1. Find all relative paths (now a dictionary: {name: path})
found_scenarios_map = find_test_scenarios(components_root_dir)
if not found_scenarios_map:
print(f"\nNo files ending with '.test_scenario.xml' were found in {components_root_dir}.")
# Return empty structures if nothing is found
return defaultdict(lambda: defaultdict(list)), {}
num_scenarios = len(found_scenarios_map)
if DEBUG:
# 2. Print the simple list of found paths
print(f"\n--- Found {num_scenarios} Test Scenarios ---")
for scenario_name, path in found_scenarios_map.items():
print(f"Scenario: '{scenario_name}' | Relative Path: {os.path.join("components",path)}")
# 3. Organize into the layer/component structure
organized_scenarios = organize_by_layer_component(found_scenarios_map)
if DEBUG:
# 4. Print the organized structure
print("\n--- Organized Layer/Component Structure ---")
for layer, components in organized_scenarios.items():
print(f"\n[LAYER] {layer.upper()}:")
for component, scenarios in components.items():
scenario_list = ", ".join(scenarios)
print(f" [Component] {component}: {scenario_list}")
return organized_scenarios, found_scenarios_map
if __name__ == "__main__":
# The return value from scenario_scan now includes the dictionary you requested
organized_data, scenario_map = scenario_scan(COMPONENT_DIR)
combined_result = {
"organized_data": finalize_output(organized_data),
"scenario_map": finalize_output(scenario_map)
}
# 3. Print the combined object as a single JSON string
# This is what will be captured by the SSH command
print(json.dumps(combined_result))

64
TPF/test_execution.sh Normal file
View File

@@ -0,0 +1,64 @@
#!/bin/bash
# Check if correct number of arguments are provided (now 4)
if [ "$#" -ne 4 ]; then
echo "Usage: $0 <task_id> <command> <result_dir> <repo_path>"
exit 1
fi
TASK_ID=$1
CMD=$2
RESULT_DIR=$3
REPO_PATH=$4
echo $TASK_ID
# Create result directory if it doesn't exist (absolute path)
mkdir -p "$RESULT_DIR"
# Use realpath on the (now-existing) result dir and a clearer filename
LOG_FILE="$(realpath "$RESULT_DIR")/${TASK_ID}-logging.html"
# Initialize HTML file with basic styling
cat <<EOF > "$LOG_FILE"
<html>
<head>
<style>
body { font-family: monospace; background-color: #1e1e1e; color: #d4d4d4; padding: 20px; }
.cmd { color: #569cd6; font-weight: bold; }
.repo { color: #ce9178; }
.output { white-space: pre-wrap; display: block; margin-top: 10px; border-left: 3px solid #666; padding-left: 10px; }
</style>
</head>
<body>
<h2>Execution Log for Task: $TASK_ID</h2>
<p class="repo">Working Directory: $REPO_PATH</p>
<p class="cmd">Executing: $CMD</p>
<hr>
<div class="output">
EOF
# 1. CD into the repo path
# 2. Execute command and capture output
# 3. PIPESTATUS[0] captures the exit code of the eval "$CMD"
export PYTHONUNBUFFERED=1
echo "--- Execution Start ---" | tee -a >(sed 's/$/<br>/' >> "$LOG_FILE")
cd "$REPO_PATH" && stdbuf -oL -eL /bin/bash -c "$CMD" 2>&1 | tee -a >(sed 's/$/<br>/' >> "$LOG_FILE")
EXIT_CODE=${PIPESTATUS[0]}
echo "--- Execution End (Exit Code: $EXIT_CODE) ---" | tee -a >(sed 's/$/<br>/' >> "$LOG_FILE")
# Close HTML tags
echo "</div></body></html>" >> "$LOG_FILE"
# Determine PASS/FAIL
# We consider it a FAIL if the exit code is non-zero
if [ $EXIT_CODE -eq 0 ]; then
RESULT="PASS"
else
RESULT="FAIL"
fi
EVIDENCE_URL="file://$LOG_FILE"
# Return JSON output
# ... (rest of the script remains the same)
# Return JSON output with a unique marker prefix
printf 'FINAL_JSON_OUTPUT:{"%s": ["%s", "%s"]}\n' "$TASK_ID" "$RESULT" "$EVIDENCE_URL"

32
TPF/tpf_execution.py Normal file
View File

@@ -0,0 +1,32 @@
import sys
import json
import time
import random
def main():
if len(sys.argv) < 4:
print("Usage: python tpf_execution.py <queue_id> <scenario_path> <task_id>")
sys.exit(1)
queue_id = sys.argv[1]
scenario_path = sys.argv[2]
task_id = sys.argv[3]
print(f"Starting execution for Task: {task_id} in Queue: {queue_id}")
print(f"Scenario: {scenario_path}")
# Simulate work
duration = random.randint(2, 5)
time.sleep(duration)
result = {
"task_id": task_id,
"status": "Success",
"duration": duration,
"details": f"Scenario {scenario_path} executed successfully."
}
print(json.dumps(result))
if __name__ == "__main__":
main()