diff --git a/asf-pc-server/testarena_pc_backend/gitea_repo_controller.sh b/asf-pc-server/testarena_pc_backend/gitea_repo_controller.sh new file mode 100644 index 0000000..8e43ba3 --- /dev/null +++ b/asf-pc-server/testarena_pc_backend/gitea_repo_controller.sh @@ -0,0 +1,124 @@ +#!/bin/bash +set -e + +# ---------------------------------------- +# 1. Configuration (UPDATE IF NEEDED) +# ---------------------------------------- + +GIT_USERNAME="asfautomation" +GIT_PASSWORD="asfautomation" + +REPO_HOST="gitea.nabd-co.com" +REPO_PATH="ASF-Nabd/ASF-SH" +TARGET_DIR="TPF/Sensor_hub_repo" + +# ---------------------------------------- +# 2. URL Encoding Function +# ---------------------------------------- +urlencode() { + perl -pe 's/([^a-zA-Z0-9_.-])/sprintf("%%%02x", ord($1))/ge' +} + +ENCODED_USERNAME=$(printf '%s' "${GIT_USERNAME}" | urlencode) +ENCODED_PASSWORD=$(printf '%s' "${GIT_PASSWORD}" | urlencode) + + +AUTH_URL="https://${ENCODED_USERNAME}:${ENCODED_PASSWORD}@${REPO_HOST}/${REPO_PATH}.git" + +# ---------------------------------------- +# 3. Command & Arguments +# ---------------------------------------- +COMMAND="$1" +BRANCH_NAME="$2" + +# ---------------------------------------- +# 4. Functions +# ---------------------------------------- + +clone_repo() { + if [ -d "${TARGET_DIR}" ]; then + echo "â„šī¸ Repository already exists. Skipping clone." + return 0 + fi + + echo "đŸ“Ĩ Cloning repository..." + git clone "${AUTH_URL}" "${TARGET_DIR}" + echo "✅ Clone completed." +} + +checkout_branch() { + if [ -z "${BRANCH_NAME}" ]; then + echo "❌ Branch name is required for checkout." + exit 1 + fi + + if [ ! -d "${TARGET_DIR}" ]; then + echo "❌ Repository not found. Run clone first." + exit 1 + fi + + cd "${TARGET_DIR}" + + echo "đŸ“Ļ Stashing local changes (including untracked)..." + git stash push -u -m "automation-stash-before-checkout" || true + + echo "🔄 Fetching latest changes..." + git fetch origin + + echo "đŸŒŋ Checking out main branch..." + git checkout main + + echo "âŦ‡ī¸ Pulling latest main..." + git pull "${AUTH_URL}" main + + echo "đŸŒŋ Checking out target branch: ${BRANCH_NAME}" + if git show-ref --verify --quiet "refs/heads/${BRANCH_NAME}"; then + git checkout "${BRANCH_NAME}" + else + git checkout -b "${BRANCH_NAME}" "origin/${BRANCH_NAME}" + fi + + echo "âŦ†ī¸ Rebasing '${BRANCH_NAME}' onto latest main..." + git rebase main + + cd - >/dev/null + echo "✅ Checkout and rebase completed successfully." +} + +delete_repo() { + if [ -d "${TARGET_DIR}" ]; then + echo "đŸ—‘ī¸ Deleting repository directory..." + rm -rf "${TARGET_DIR}" + echo "✅ Repository deleted." + else + echo "â„šī¸ Repository directory does not exist." + fi +} + +# ---------------------------------------- +# 5. Main Execution +# ---------------------------------------- + +case "${COMMAND}" in + clone) + clone_repo + ;; + checkout) + checkout_branch + ;; + delete) + delete_repo + ;; + *) + echo "❌ Invalid command." + echo "Usage:" + echo " $0 clone" + echo " $0 checkout " + echo " $0 delete" + exit 1 + ;; +esac + +echo "----------------------------------------" +echo "✔ Automation script finished successfully" +echo "----------------------------------------" diff --git a/asf-pc-server/testarena_pc_backend/scenario_exe_parser.py b/asf-pc-server/testarena_pc_backend/scenario_exe_parser.py new file mode 100644 index 0000000..64e8d94 --- /dev/null +++ b/asf-pc-server/testarena_pc_backend/scenario_exe_parser.py @@ -0,0 +1,103 @@ +import xml.etree.ElementTree as ET +import os +import sys +import json +from collections import defaultdict +from pathlib import Path + +# Get the directory of the current Python file +current_directory = os.path.dirname(os.path.abspath(__file__)) +COMPONENT_DIR = os.path.join(current_directory, "Sensor_hub_repo", "components") + +def finalize_output(data_obj): + # Convert defaultdict to standard dict recursively + # This removes the and metadata + standard_dict = json.loads(json.dumps(data_obj)) + + # Print ONLY the JSON string to stdout + #print(json.dumps(standard_dict, indent=4)) + return standard_dict + +def parse_test_scenario(xml_file_path): + """ + Parses a test scenario XML file and extracts the configuration and all + test case IDs mapped to their execution commands. + + Args: + xml_file_path (str): The path to the XML file to parse. + + Returns: + dict: A dictionary in the format: + { + 'config': , + 'test_cases': { + : , + ... + } + } + Returns an empty dictionary on error. + """ + if not os.path.exists(xml_file_path): + print(f"Error: File not found at '{xml_file_path}'") + return {} + + try: + # 1. Parse the XML file + tree = ET.parse(xml_file_path) + root = tree.getroot() + except ET.ParseError as e: + print(f"Error: Failed to parse XML file. Details: {e}") + return {} + except Exception as e: + print(f"An unexpected error occurred during file parsing: {e}") + return {} + + # Initialize the final structured output + parsed_data = { + 'config': '', + 'test_cases': {} + } + + # 2. Extract the mandatory value + config_element = root.find('config') + if config_element is not None and config_element.text: + parsed_data['config'] = config_element.text.strip() + + # 3. Iterate over all elements and extract ID and Exec + for tc in root.findall('test_case'): + tc_id_element = tc.find('test_case_id') + tc_exec_element = tc.find('test_exec') + + # Use strip() and check against None for safety, even if validation passed + tc_id = tc_id_element.text.strip() if tc_id_element is not None and tc_id_element.text else "UNKNOWN_ID" + tc_exec = tc_exec_element.text.strip() if tc_exec_element is not None and tc_exec_element.text else "UNKNOWN_EXEC" + + # Add to the test_cases dictionary + parsed_data['test_cases'][tc_id] = tc_exec + + return parsed_data + +if __name__ == "__main__": + # Define a default path to test against + default_test_file = 'sample_scenario.xml' + + # Allow passing the file path as a command-line argument for flexibility + file_to_check = sys.argv[1] if len(sys.argv) > 1 else print({}) + file_path = os.path.join(COMPONENT_DIR, file_to_check) + + print(f"--- XML Test Scenario Parser ---") + print(f"Parsing file: {file_to_check}\n") + + # Run the parser + scenario_data = parse_test_scenario(file_path) + + # Print results + # if scenario_data: + # print("✅ Parsing Successful. Extracted Data Structure:") + # print(f"CONFIG: {scenario_data['config']}") + # print("\nTEST CASES:") + # for test_id, command in scenario_data['test_cases'].items(): + # print(f" - {test_id}:\n '{command}'") + + print(finalize_output(scenario_data)) + #return finalize_output(scenario_data['test_cases']) diff --git a/asf-pc-server/testarena_pc_backend/scenario_execution.py b/asf-pc-server/testarena_pc_backend/scenario_execution.py new file mode 100644 index 0000000..60947bf --- /dev/null +++ b/asf-pc-server/testarena_pc_backend/scenario_execution.py @@ -0,0 +1,165 @@ +import os +import sys +import json +from scenario_exe_parser import parse_test_scenario +import subprocess +import os +import sys +import json +import subprocess +# Assuming parse_test_scenario is imported correctly +# from scenario_exe_parser import parse_test_scenario + +# --- Global Paths --- +current_directory = os.path.dirname(os.path.abspath(__file__)) +REPO_PATH = os.path.join(current_directory, "Sensor_hub_repo") +COMPONENT_DIR = os.path.join(REPO_PATH, "components") +RESULT_PATH = "/home/asf/testarena" + +# The HTML Template +REPORT_TEMPLATE = """ + + + + + ESP32 Test Execution Report + + + +

Overall Scenario Summary

+ + + + + + + + + + + + + +
Scenario NameFinal Result
{{scenario_name}}{{overall_status}}
+ +

Detailed Test Cases

+ + + + + + + + + + {{test_case_rows}} + +
Test Case IDResultExecution Log
+ + +""" + +def run_test_suite(tasks): + aggregated_results = {} + shell_script = "./TPF/test_execution.sh" + if os.name != 'nt': + subprocess.run(["chmod", "+x", shell_script]) + + for task in tasks: + print(f"--- Starting Task: {task['id']} ---") + result = subprocess.run( + [shell_script, task['id'], task['cmd'], task['path'], REPO_PATH], + capture_output=True, text=True + ) + print(result.stdout) + + json_found = False + for line in result.stdout.splitlines(): + if line.startswith("FINAL_JSON_OUTPUT:"): + json_string = line.replace("FINAL_JSON_OUTPUT:", "").strip() + try: + task_json = json.loads(json_string) + aggregated_results.update(task_json) + json_found = True + except json.JSONDecodeError as e: + print(f"!!! JSON Parsing Error: {e}") + + if not json_found: + aggregated_results[task['id']] = ["ERROR", "N/A"] + return aggregated_results + +def generate_html_report(scenario_name, results, output_path): + all_passed = all(info[0] == "PASS" for info in results.values()) + overall_status = "PASS" if all_passed else "FAIL" + overall_class = "status-pass" if all_passed else "status-fail" + + test_case_rows = "" + for tc_id, info in results.items(): + status = info[0] + log_url = info[1] + status_class = "status-pass" if status == "PASS" else "status-fail" + + test_case_rows += f""" + + {tc_id} + {status} + View Log + + """ + + # Use the global REPORT_TEMPLATE + report_content = REPORT_TEMPLATE.replace("{{scenario_name}}", scenario_name) \ + .replace("{{overall_status}}", overall_status) \ + .replace("{{overall_class}}", overall_class) \ + .replace("{{test_case_rows}}", test_case_rows) + + report_file = os.path.join(output_path, "execution_report.html") + with open(report_file, "w") as f: + f.write(report_content) + print(f"HTML Report generated at: {report_file}") + +def save_summary(results, task_id_path): + json_path = os.path.join(task_id_path, "final_summary.json") + with open(json_path, "w") as f: + json.dump(results, f, indent=4) + print(f"\nFinal results saved to {json_path}") + +if __name__ == "__main__": + queue_id = "1234" + scenario_path = "application_layer/business_stack/actuator_manager/test/actuator_manager_init_test.test_scenario.xml" + task_id = "56754" + + # Path logic + queue_path = os.path.join(RESULT_PATH, queue_id) + task_id_path = os.path.join(queue_path, task_id) # Corrected pathing + + os.makedirs(task_id_path, exist_ok=True) + + # Note: Ensure parse_test_scenario is defined or imported + scenario_data = parse_test_scenario(os.path.join(COMPONENT_DIR, scenario_path)) + + my_tasks = [] + sub_tasks_data = scenario_data['test_cases'] + for case_id, exec_cmd in sub_tasks_data.items(): + my_tasks.append({ + "id": case_id, + "cmd": exec_cmd, + "path": task_id_path + }) + + final_data = run_test_suite(my_tasks) + save_summary(final_data, task_id_path) + + # Generate report INSIDE the task folder + generate_html_report(os.path.basename(scenario_path), final_data, task_id_path) \ No newline at end of file diff --git a/asf-pc-server/testarena_pc_backend/scenario_scan.py b/asf-pc-server/testarena_pc_backend/scenario_scan.py new file mode 100644 index 0000000..2b4a877 --- /dev/null +++ b/asf-pc-server/testarena_pc_backend/scenario_scan.py @@ -0,0 +1,147 @@ +import os +import sys +from collections import defaultdict +from pathlib import Path +import json + + +# Get the directory of the current Python file +current_directory = os.path.dirname(os.path.abspath(__file__)) +repo_root = Path(current_directory).parents[1] +COMPONENT_DIR = os.path.join(repo_root, "components") +DEBUG = False + + +def finalize_output(data_obj): + # Convert defaultdict to standard dict recursively + # This removes the and metadata + standard_dict = json.loads(json.dumps(data_obj)) + + # Print ONLY the JSON string to stdout + #print(json.dumps(standard_dict, indent=4)) + return standard_dict + +def find_test_scenarios(root_dir): + """ + Recursively searches the given root directory for files ending with + '.test_scenario.xml' and returns a dictionary mapping scenario names to their + paths relative to the root directory. + + Args: + root_dir (str): The absolute path to the starting directory (e.g., 'COMPONENTS'). + + Returns: + dict[str, str]: A dictionary mapping scenario names (without suffix) to + their relative file paths. + """ + if not os.path.isdir(root_dir): + print(f"Error: Directory not found or not accessible: {root_dir}") + return {} # Return empty dictionary + + if DEBUG: + print(f"Scanning directory: '{root_dir}'...") + + scenario_suffix = ".test_scenario.xml" + + # Dictionary comprehension: {scenario_name: relative_path} + scenarios_map = { + # Key: Scenario name (filename without suffix) + filename.replace(scenario_suffix, ""): + # Value: Relative path + os.path.relpath(os.path.join(dirpath, filename), root_dir) + + for dirpath, _, filenames in os.walk(root_dir) + for filename in filenames if filename.endswith(scenario_suffix) + } + + return scenarios_map + +def organize_by_layer_component(scenarios_map): + """ + Organizes scenario paths into a nested dictionary structure based on the file path: + {Layer_Folder: {Component_Folder: [scenario_name, ...]}} + + It assumes the Layer is the first folder and the Component is the folder + preceding the 'test' directory (i.e., the third-to-last segment). + + Args: + scenarios_map (dict[str, str]): Dictionary mapping scenario names to their + relative file paths. + + Returns: + defaultdict: Nested dictionary (Layer -> Component -> List of Scenario Names). + """ + organized_data = defaultdict(lambda: defaultdict(list)) + + # Iterate over the scenario name and path + for scenario_name, path in scenarios_map.items(): + # Split path into segments using the OS separator + segments = path.split(os.sep) + + # Layer is the first segment (e.g., 'application_layer', 'drivers') + layer = segments[0] + + # Component is the third-to-last segment (e.g., 'actuator_manager', 'ammonia') + # We assume the file is inside a 'test' folder inside a component folder. + if len(segments) >= 3: + component = segments[-3] + else: + # Fallback for scenarios found too close to the root + component = "Root_Component" + + # Populate the nested dictionary + organized_data[layer][component].append(scenario_name) + + return organized_data + +def scenario_scan(components_root_dir): + """ + Main function to scan for test scenarios, print the organized structure, and + return the resulting dictionaries. + + Returns: + tuple[defaultdict, dict]: The organized layer/component structure and the + raw dictionary of scenario names to paths. + """ + # 1. Find all relative paths (now a dictionary: {name: path}) + found_scenarios_map = find_test_scenarios(components_root_dir) + + if not found_scenarios_map: + print(f"\nNo files ending with '.test_scenario.xml' were found in {components_root_dir}.") + # Return empty structures if nothing is found + return defaultdict(lambda: defaultdict(list)), {} + + num_scenarios = len(found_scenarios_map) + + if DEBUG: + # 2. Print the simple list of found paths + print(f"\n--- Found {num_scenarios} Test Scenarios ---") + for scenario_name, path in found_scenarios_map.items(): + print(f"Scenario: '{scenario_name}' | Relative Path: {os.path.join("components",path)}") + + # 3. Organize into the layer/component structure + organized_scenarios = organize_by_layer_component(found_scenarios_map) + + if DEBUG: + # 4. Print the organized structure + print("\n--- Organized Layer/Component Structure ---") + for layer, components in organized_scenarios.items(): + print(f"\n[LAYER] {layer.upper()}:") + for component, scenarios in components.items(): + scenario_list = ", ".join(scenarios) + print(f" [Component] {component}: {scenario_list}") + + return organized_scenarios, found_scenarios_map + +if __name__ == "__main__": + # The return value from scenario_scan now includes the dictionary you requested + organized_data, scenario_map = scenario_scan(COMPONENT_DIR) + combined_result = { + "organized_data": finalize_output(organized_data), + "scenario_map": finalize_output(scenario_map) + } + + # 3. Print the combined object as a single JSON string + # This is what will be captured by the SSH command + print(json.dumps(combined_result)) + \ No newline at end of file diff --git a/asf-pc-server/testarena_pc_backend/test_execution.sh b/asf-pc-server/testarena_pc_backend/test_execution.sh new file mode 100644 index 0000000..9fc7a97 --- /dev/null +++ b/asf-pc-server/testarena_pc_backend/test_execution.sh @@ -0,0 +1,60 @@ +#!/bin/bash + +# Check if correct number of arguments are provided (now 4) +if [ "$#" -ne 4 ]; then + echo "Usage: $0 " + exit 1 +fi + +TASK_ID=$1 +CMD=$2 +RESULT_DIR=$3 +REPO_PATH=$4 +echo $TASK_ID +# Create result directory if it doesn't exist (absolute path) +mkdir -p "$RESULT_DIR" +# Use realpath on the (now-existing) result dir and a clearer filename +LOG_FILE="$(realpath "$RESULT_DIR")/${TASK_ID}-logging.html" + +# Initialize HTML file with basic styling +cat < "$LOG_FILE" + + + + + +

Execution Log for Task: $TASK_ID

+

Working Directory: $REPO_PATH

+

Executing: $CMD

+
+
+EOF + +# 1. CD into the repo path +# 2. Execute command and capture output +# 3. PIPESTATUS[1] captures the exit code of the CMD, not the 'cd' or 'tee' +cd "$REPO_PATH" && eval "$CMD" 2>&1 | tee -a >(sed 's/$/
/' >> "$LOG_FILE") +EXIT_CODE=${PIPESTATUS[0]} + +# Close HTML tags +echo "
" >> "$LOG_FILE" + +# Determine PASS/FAIL +if [ $EXIT_CODE -eq 0 ]; then + RESULT="PASS" +else + RESULT="FAIL" +fi + +EVIDENCE_URL="file://$LOG_FILE" + +# Return JSON output +# ... (rest of the script remains the same) + +# Return JSON output with a unique marker prefix +printf 'FINAL_JSON_OUTPUT:{"%s": ["%s", "%s"]}\n' "$TASK_ID" "$RESULT" "$EVIDENCE_URL" \ No newline at end of file