This commit is contained in:
2025-12-26 22:58:18 +01:00
parent 83de0ca28e
commit 22d0ed24d7
5 changed files with 599 additions and 0 deletions

View File

@@ -0,0 +1,124 @@
#!/bin/bash
set -e
# ----------------------------------------
# 1. Configuration (UPDATE IF NEEDED)
# ----------------------------------------
GIT_USERNAME="asfautomation"
GIT_PASSWORD="asfautomation"
REPO_HOST="gitea.nabd-co.com"
REPO_PATH="ASF-Nabd/ASF-SH"
TARGET_DIR="TPF/Sensor_hub_repo"
# ----------------------------------------
# 2. URL Encoding Function
# ----------------------------------------
urlencode() {
perl -pe 's/([^a-zA-Z0-9_.-])/sprintf("%%%02x", ord($1))/ge'
}
ENCODED_USERNAME=$(printf '%s' "${GIT_USERNAME}" | urlencode)
ENCODED_PASSWORD=$(printf '%s' "${GIT_PASSWORD}" | urlencode)
AUTH_URL="https://${ENCODED_USERNAME}:${ENCODED_PASSWORD}@${REPO_HOST}/${REPO_PATH}.git"
# ----------------------------------------
# 3. Command & Arguments
# ----------------------------------------
COMMAND="$1"
BRANCH_NAME="$2"
# ----------------------------------------
# 4. Functions
# ----------------------------------------
clone_repo() {
if [ -d "${TARGET_DIR}" ]; then
echo " Repository already exists. Skipping clone."
return 0
fi
echo "📥 Cloning repository..."
git clone "${AUTH_URL}" "${TARGET_DIR}"
echo "✅ Clone completed."
}
checkout_branch() {
if [ -z "${BRANCH_NAME}" ]; then
echo "❌ Branch name is required for checkout."
exit 1
fi
if [ ! -d "${TARGET_DIR}" ]; then
echo "❌ Repository not found. Run clone first."
exit 1
fi
cd "${TARGET_DIR}"
echo "📦 Stashing local changes (including untracked)..."
git stash push -u -m "automation-stash-before-checkout" || true
echo "🔄 Fetching latest changes..."
git fetch origin
echo "🌿 Checking out main branch..."
git checkout main
echo "⬇️ Pulling latest main..."
git pull "${AUTH_URL}" main
echo "🌿 Checking out target branch: ${BRANCH_NAME}"
if git show-ref --verify --quiet "refs/heads/${BRANCH_NAME}"; then
git checkout "${BRANCH_NAME}"
else
git checkout -b "${BRANCH_NAME}" "origin/${BRANCH_NAME}"
fi
echo "⬆️ Rebasing '${BRANCH_NAME}' onto latest main..."
git rebase main
cd - >/dev/null
echo "✅ Checkout and rebase completed successfully."
}
delete_repo() {
if [ -d "${TARGET_DIR}" ]; then
echo "🗑️ Deleting repository directory..."
rm -rf "${TARGET_DIR}"
echo "✅ Repository deleted."
else
echo " Repository directory does not exist."
fi
}
# ----------------------------------------
# 5. Main Execution
# ----------------------------------------
case "${COMMAND}" in
clone)
clone_repo
;;
checkout)
checkout_branch
;;
delete)
delete_repo
;;
*)
echo "❌ Invalid command."
echo "Usage:"
echo " $0 clone"
echo " $0 checkout <branch>"
echo " $0 delete"
exit 1
;;
esac
echo "----------------------------------------"
echo "✔ Automation script finished successfully"
echo "----------------------------------------"

View File

@@ -0,0 +1,103 @@
import xml.etree.ElementTree as ET
import os
import sys
import json
from collections import defaultdict
from pathlib import Path
# Get the directory of the current Python file
current_directory = os.path.dirname(os.path.abspath(__file__))
COMPONENT_DIR = os.path.join(current_directory, "Sensor_hub_repo", "components")
def finalize_output(data_obj):
# Convert defaultdict to standard dict recursively
# This removes the <lambda> and <class 'list'> metadata
standard_dict = json.loads(json.dumps(data_obj))
# Print ONLY the JSON string to stdout
#print(json.dumps(standard_dict, indent=4))
return standard_dict
def parse_test_scenario(xml_file_path):
"""
Parses a test scenario XML file and extracts the configuration and all
test case IDs mapped to their execution commands.
Args:
xml_file_path (str): The path to the XML file to parse.
Returns:
dict: A dictionary in the format:
{
'config': <config_value>,
'test_cases': {
<test_case_id>: <test_exec_command>,
...
}
}
Returns an empty dictionary on error.
"""
if not os.path.exists(xml_file_path):
print(f"Error: File not found at '{xml_file_path}'")
return {}
try:
# 1. Parse the XML file
tree = ET.parse(xml_file_path)
root = tree.getroot()
except ET.ParseError as e:
print(f"Error: Failed to parse XML file. Details: {e}")
return {}
except Exception as e:
print(f"An unexpected error occurred during file parsing: {e}")
return {}
# Initialize the final structured output
parsed_data = {
'config': '',
'test_cases': {}
}
# 2. Extract the mandatory <config> value
config_element = root.find('config')
if config_element is not None and config_element.text:
parsed_data['config'] = config_element.text.strip()
# 3. Iterate over all <test_case> elements and extract ID and Exec
for tc in root.findall('test_case'):
tc_id_element = tc.find('test_case_id')
tc_exec_element = tc.find('test_exec')
# Use strip() and check against None for safety, even if validation passed
tc_id = tc_id_element.text.strip() if tc_id_element is not None and tc_id_element.text else "UNKNOWN_ID"
tc_exec = tc_exec_element.text.strip() if tc_exec_element is not None and tc_exec_element.text else "UNKNOWN_EXEC"
# Add to the test_cases dictionary
parsed_data['test_cases'][tc_id] = tc_exec
return parsed_data
if __name__ == "__main__":
# Define a default path to test against
default_test_file = 'sample_scenario.xml'
# Allow passing the file path as a command-line argument for flexibility
file_to_check = sys.argv[1] if len(sys.argv) > 1 else print({})
file_path = os.path.join(COMPONENT_DIR, file_to_check)
print(f"--- XML Test Scenario Parser ---")
print(f"Parsing file: {file_to_check}\n")
# Run the parser
scenario_data = parse_test_scenario(file_path)
# Print results
# if scenario_data:
# print("✅ Parsing Successful. Extracted Data Structure:")
# print(f"CONFIG: {scenario_data['config']}")
# print("\nTEST CASES:")
# for test_id, command in scenario_data['test_cases'].items():
# print(f" - {test_id}:\n '{command}'")
print(finalize_output(scenario_data))
#return finalize_output(scenario_data['test_cases'])

View File

@@ -0,0 +1,165 @@
import os
import sys
import json
from scenario_exe_parser import parse_test_scenario
import subprocess
import os
import sys
import json
import subprocess
# Assuming parse_test_scenario is imported correctly
# from scenario_exe_parser import parse_test_scenario
# --- Global Paths ---
current_directory = os.path.dirname(os.path.abspath(__file__))
REPO_PATH = os.path.join(current_directory, "Sensor_hub_repo")
COMPONENT_DIR = os.path.join(REPO_PATH, "components")
RESULT_PATH = "/home/asf/testarena"
# The HTML Template
REPORT_TEMPLATE = """
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>ESP32 Test Execution Report</title>
<style>
body { font-family: 'Segoe UI', Tahoma, Geneva, Verdana, sans-serif; margin: 40px; background-color: #f4f7f6; }
h2 { color: #333; border-bottom: 2px solid #ccc; padding-bottom: 10px; }
table { width: 100%; border-collapse: collapse; margin: 20px 0; background-color: #fff; box-shadow: 0 2px 5px rgba(0,0,0,0.1); }
th, td { padding: 12px 15px; text-align: left; border-bottom: 1px solid #ddd; }
th { background-color: #2c3e50; color: white; text-transform: uppercase; letter-spacing: 0.1em; }
.status-pass { color: #ffffff; background-color: #27ae60; padding: 4px 12px; border-radius: 4px; font-weight: bold; }
.status-fail { color: #ffffff; background-color: #c0392b; padding: 4px 12px; border-radius: 4px; font-weight: bold; }
a { color: #2980b9; text-decoration: none; font-weight: bold; }
a:hover { text-decoration: underline; }
tr:hover { background-color: #f1f1f1; }
</style>
</head>
<body>
<h2>Overall Scenario Summary</h2>
<table>
<thead>
<tr>
<th>Scenario Name</th>
<th>Final Result</th>
</tr>
</thead>
<tbody>
<tr>
<td>{{scenario_name}}</td>
<td><span class="{{overall_class}}">{{overall_status}}</span></td>
</tr>
</tbody>
</table>
<h2>Detailed Test Cases</h2>
<table>
<thead>
<tr>
<th>Test Case ID</th>
<th>Result</th>
<th>Execution Log</th>
</tr>
</thead>
<tbody>
{{test_case_rows}}
</tbody>
</table>
</body>
</html>
"""
def run_test_suite(tasks):
aggregated_results = {}
shell_script = "./TPF/test_execution.sh"
if os.name != 'nt':
subprocess.run(["chmod", "+x", shell_script])
for task in tasks:
print(f"--- Starting Task: {task['id']} ---")
result = subprocess.run(
[shell_script, task['id'], task['cmd'], task['path'], REPO_PATH],
capture_output=True, text=True
)
print(result.stdout)
json_found = False
for line in result.stdout.splitlines():
if line.startswith("FINAL_JSON_OUTPUT:"):
json_string = line.replace("FINAL_JSON_OUTPUT:", "").strip()
try:
task_json = json.loads(json_string)
aggregated_results.update(task_json)
json_found = True
except json.JSONDecodeError as e:
print(f"!!! JSON Parsing Error: {e}")
if not json_found:
aggregated_results[task['id']] = ["ERROR", "N/A"]
return aggregated_results
def generate_html_report(scenario_name, results, output_path):
all_passed = all(info[0] == "PASS" for info in results.values())
overall_status = "PASS" if all_passed else "FAIL"
overall_class = "status-pass" if all_passed else "status-fail"
test_case_rows = ""
for tc_id, info in results.items():
status = info[0]
log_url = info[1]
status_class = "status-pass" if status == "PASS" else "status-fail"
test_case_rows += f"""
<tr>
<td>{tc_id}</td>
<td><span class="{status_class}">{status}</span></td>
<td><a href="{log_url}" target="_blank">View Log</a></td>
</tr>
"""
# Use the global REPORT_TEMPLATE
report_content = REPORT_TEMPLATE.replace("{{scenario_name}}", scenario_name) \
.replace("{{overall_status}}", overall_status) \
.replace("{{overall_class}}", overall_class) \
.replace("{{test_case_rows}}", test_case_rows)
report_file = os.path.join(output_path, "execution_report.html")
with open(report_file, "w") as f:
f.write(report_content)
print(f"HTML Report generated at: {report_file}")
def save_summary(results, task_id_path):
json_path = os.path.join(task_id_path, "final_summary.json")
with open(json_path, "w") as f:
json.dump(results, f, indent=4)
print(f"\nFinal results saved to {json_path}")
if __name__ == "__main__":
queue_id = "1234"
scenario_path = "application_layer/business_stack/actuator_manager/test/actuator_manager_init_test.test_scenario.xml"
task_id = "56754"
# Path logic
queue_path = os.path.join(RESULT_PATH, queue_id)
task_id_path = os.path.join(queue_path, task_id) # Corrected pathing
os.makedirs(task_id_path, exist_ok=True)
# Note: Ensure parse_test_scenario is defined or imported
scenario_data = parse_test_scenario(os.path.join(COMPONENT_DIR, scenario_path))
my_tasks = []
sub_tasks_data = scenario_data['test_cases']
for case_id, exec_cmd in sub_tasks_data.items():
my_tasks.append({
"id": case_id,
"cmd": exec_cmd,
"path": task_id_path
})
final_data = run_test_suite(my_tasks)
save_summary(final_data, task_id_path)
# Generate report INSIDE the task folder
generate_html_report(os.path.basename(scenario_path), final_data, task_id_path)

View File

@@ -0,0 +1,147 @@
import os
import sys
from collections import defaultdict
from pathlib import Path
import json
# Get the directory of the current Python file
current_directory = os.path.dirname(os.path.abspath(__file__))
repo_root = Path(current_directory).parents[1]
COMPONENT_DIR = os.path.join(repo_root, "components")
DEBUG = False
def finalize_output(data_obj):
# Convert defaultdict to standard dict recursively
# This removes the <lambda> and <class 'list'> metadata
standard_dict = json.loads(json.dumps(data_obj))
# Print ONLY the JSON string to stdout
#print(json.dumps(standard_dict, indent=4))
return standard_dict
def find_test_scenarios(root_dir):
"""
Recursively searches the given root directory for files ending with
'.test_scenario.xml' and returns a dictionary mapping scenario names to their
paths relative to the root directory.
Args:
root_dir (str): The absolute path to the starting directory (e.g., 'COMPONENTS').
Returns:
dict[str, str]: A dictionary mapping scenario names (without suffix) to
their relative file paths.
"""
if not os.path.isdir(root_dir):
print(f"Error: Directory not found or not accessible: {root_dir}")
return {} # Return empty dictionary
if DEBUG:
print(f"Scanning directory: '{root_dir}'...")
scenario_suffix = ".test_scenario.xml"
# Dictionary comprehension: {scenario_name: relative_path}
scenarios_map = {
# Key: Scenario name (filename without suffix)
filename.replace(scenario_suffix, ""):
# Value: Relative path
os.path.relpath(os.path.join(dirpath, filename), root_dir)
for dirpath, _, filenames in os.walk(root_dir)
for filename in filenames if filename.endswith(scenario_suffix)
}
return scenarios_map
def organize_by_layer_component(scenarios_map):
"""
Organizes scenario paths into a nested dictionary structure based on the file path:
{Layer_Folder: {Component_Folder: [scenario_name, ...]}}
It assumes the Layer is the first folder and the Component is the folder
preceding the 'test' directory (i.e., the third-to-last segment).
Args:
scenarios_map (dict[str, str]): Dictionary mapping scenario names to their
relative file paths.
Returns:
defaultdict: Nested dictionary (Layer -> Component -> List of Scenario Names).
"""
organized_data = defaultdict(lambda: defaultdict(list))
# Iterate over the scenario name and path
for scenario_name, path in scenarios_map.items():
# Split path into segments using the OS separator
segments = path.split(os.sep)
# Layer is the first segment (e.g., 'application_layer', 'drivers')
layer = segments[0]
# Component is the third-to-last segment (e.g., 'actuator_manager', 'ammonia')
# We assume the file is inside a 'test' folder inside a component folder.
if len(segments) >= 3:
component = segments[-3]
else:
# Fallback for scenarios found too close to the root
component = "Root_Component"
# Populate the nested dictionary
organized_data[layer][component].append(scenario_name)
return organized_data
def scenario_scan(components_root_dir):
"""
Main function to scan for test scenarios, print the organized structure, and
return the resulting dictionaries.
Returns:
tuple[defaultdict, dict]: The organized layer/component structure and the
raw dictionary of scenario names to paths.
"""
# 1. Find all relative paths (now a dictionary: {name: path})
found_scenarios_map = find_test_scenarios(components_root_dir)
if not found_scenarios_map:
print(f"\nNo files ending with '.test_scenario.xml' were found in {components_root_dir}.")
# Return empty structures if nothing is found
return defaultdict(lambda: defaultdict(list)), {}
num_scenarios = len(found_scenarios_map)
if DEBUG:
# 2. Print the simple list of found paths
print(f"\n--- Found {num_scenarios} Test Scenarios ---")
for scenario_name, path in found_scenarios_map.items():
print(f"Scenario: '{scenario_name}' | Relative Path: {os.path.join("components",path)}")
# 3. Organize into the layer/component structure
organized_scenarios = organize_by_layer_component(found_scenarios_map)
if DEBUG:
# 4. Print the organized structure
print("\n--- Organized Layer/Component Structure ---")
for layer, components in organized_scenarios.items():
print(f"\n[LAYER] {layer.upper()}:")
for component, scenarios in components.items():
scenario_list = ", ".join(scenarios)
print(f" [Component] {component}: {scenario_list}")
return organized_scenarios, found_scenarios_map
if __name__ == "__main__":
# The return value from scenario_scan now includes the dictionary you requested
organized_data, scenario_map = scenario_scan(COMPONENT_DIR)
combined_result = {
"organized_data": finalize_output(organized_data),
"scenario_map": finalize_output(scenario_map)
}
# 3. Print the combined object as a single JSON string
# This is what will be captured by the SSH command
print(json.dumps(combined_result))

View File

@@ -0,0 +1,60 @@
#!/bin/bash
# Check if correct number of arguments are provided (now 4)
if [ "$#" -ne 4 ]; then
echo "Usage: $0 <task_id> <command> <result_dir> <repo_path>"
exit 1
fi
TASK_ID=$1
CMD=$2
RESULT_DIR=$3
REPO_PATH=$4
echo $TASK_ID
# Create result directory if it doesn't exist (absolute path)
mkdir -p "$RESULT_DIR"
# Use realpath on the (now-existing) result dir and a clearer filename
LOG_FILE="$(realpath "$RESULT_DIR")/${TASK_ID}-logging.html"
# Initialize HTML file with basic styling
cat <<EOF > "$LOG_FILE"
<html>
<head>
<style>
body { font-family: monospace; background-color: #1e1e1e; color: #d4d4d4; padding: 20px; }
.cmd { color: #569cd6; font-weight: bold; }
.repo { color: #ce9178; }
.output { white-space: pre-wrap; display: block; margin-top: 10px; border-left: 3px solid #666; padding-left: 10px; }
</style>
</head>
<body>
<h2>Execution Log for Task: $TASK_ID</h2>
<p class="repo">Working Directory: $REPO_PATH</p>
<p class="cmd">Executing: $CMD</p>
<hr>
<div class="output">
EOF
# 1. CD into the repo path
# 2. Execute command and capture output
# 3. PIPESTATUS[1] captures the exit code of the CMD, not the 'cd' or 'tee'
cd "$REPO_PATH" && eval "$CMD" 2>&1 | tee -a >(sed 's/$/<br>/' >> "$LOG_FILE")
EXIT_CODE=${PIPESTATUS[0]}
# Close HTML tags
echo "</div></body></html>" >> "$LOG_FILE"
# Determine PASS/FAIL
if [ $EXIT_CODE -eq 0 ]; then
RESULT="PASS"
else
RESULT="FAIL"
fi
EVIDENCE_URL="file://$LOG_FILE"
# Return JSON output
# ... (rest of the script remains the same)
# Return JSON output with a unique marker prefix
printf 'FINAL_JSON_OUTPUT:{"%s": ["%s", "%s"]}\n' "$TASK_ID" "$RESULT" "$EVIDENCE_URL"