import time import subprocess import json import os from sqlalchemy.orm import Session from . import models, database # Base directory for data BASE_DATA_DIR = "/home/asf/testarena" if os.name == 'nt': BASE_DATA_DIR = "d:/ASF - course/ASF_01/ASF_tools/asf-pc-server/testarena_pc_backend/testarena_data" def update_json_status(queue_id, task_id, status, result=None): queue_dir = os.path.join(BASE_DATA_DIR, queue_id) status_file = os.path.join(queue_dir, "queue_status.json") if os.path.exists(status_file): with open(status_file, 'r') as f: data = json.load(f) if task_id: data["tasks"][task_id] = status else: data["status"] = status if result: data["results"] = data.get("results", {}) data["results"][task_id] = result with open(status_file, 'w') as f: json.dump(data, f, indent=4) def run_command_with_logging(cmd, log_file, cwd=None, env=None): """Runs a command, logs output to file and stdout.""" with open(log_file, "a") as f: process = subprocess.Popen( cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=True, text=True, cwd=cwd, env=env ) for line in process.stdout: print(line, end="") f.write(line) f.flush() process.wait() return process.returncode def run_worker(): print("Worker started...") while True: db = database.SessionLocal() try: # Get next waiting queue queue = db.query(models.Queue).filter(models.Queue.status == "Waiting").order_by(models.Queue.created_at).first() if queue: print(f"Processing queue: {queue.id} (Branch: {queue.source})") queue.status = "Running" update_json_status(queue.id, None, "Running") db.commit() queue_dir = os.path.join(BASE_DATA_DIR, queue.id) os.makedirs(queue_dir, exist_ok=True) queue_log = os.path.join(queue_dir, "queue_log.txt") # 0- Checkout branch print(f"Checking out branch: {queue.source}") checkout_cmd = f"./TPF/gitea_repo_controller.sh checkout {queue.source}" run_command_with_logging(checkout_cmd, queue_log) # 1-5 Build software print("Building software...") # We need to source the IDF export script and then build. # Using a single shell command to maintain environment. build_cmd = f"bash -c 'source $HOME/esp/esp-idf/export.sh && cd TPF/Sensor_hub_repo && idf.py build && idf.py qemu'" run_command_with_logging(build_cmd, queue_log) # 9- Loop for each task tasks = db.query(models.Task).filter(models.Task.queue_id == queue.id, models.Task.status == "Waiting").all() for task in tasks: # Check if queue was aborted mid-way db.refresh(queue) if queue.status == "Aborted": break print(f"Running task: {task.id}") task.status = "Running" update_json_status(queue.id, task.id, "Running") db.commit() try: # Run scenario_execution.py queue_id scenario_path task_id # The user said it's in TPF/scenario_execution.py script_path = "./TPF/scenario_execution.py" # Use the same python environment cmd = f"python3 {script_path} {queue.id} {task.scenario_path} {task.id}" # We want to capture this in the task log too task_dir = os.path.join(queue_dir, task.id) os.makedirs(task_dir, exist_ok=True) task_log = os.path.join(task_dir, f"{task.id}-logging.html") # For now, let's just log to stdout and the queue log # scenario_execution.py already generates its own reports. ret = run_command_with_logging(cmd, queue_log) if ret == 0: task.status = "Finished" else: task.status = "Error" # Try to find the summary if it exists summary_path = os.path.join(task_dir, "final_summary.json") if os.path.exists(summary_path): with open(summary_path, 'r') as f: task.result = json.load(f) update_json_status(queue.id, task.id, task.status, task.result) except Exception as e: print(f"Error running task {task.id}: {e}") task.status = "Error" update_json_status(queue.id, task.id, "Error") db.commit() if queue.status != "Aborted": queue.status = "Finished" update_json_status(queue.id, None, "Finished") db.commit() time.sleep(5) # Poll every 5 seconds except Exception as e: print(f"Worker error: {e}") time.sleep(10) finally: db.close() if __name__ == "__main__": run_worker()