"""Background worker that polls the database for waiting queues and runs their tasks.

Each task is executed by spawning tpf_execution.py as a subprocess; task and queue
state is mirrored both in the database and in a per-queue queue_status.json file.
"""

import json
import os
import subprocess
import sys
import time

from . import models, database

# Base directory for data
BASE_DATA_DIR = "/home/asf/testarena"
if os.name == 'nt':
    BASE_DATA_DIR = "d:/ASF - course/ASF_01/ASF_tools/asf-pc-server/testarena_pc_backend/testarena_data"


def update_json_status(queue_id, task_id, status, result=None):
    """Mirror a status change into the queue's queue_status.json file.

    With a task_id, the status is recorded for that task; without one, it is
    recorded for the queue as a whole. An optional result dict is stored under
    "results", keyed by task_id. If the status file does not exist yet, the
    update is silently skipped.
    """
    queue_dir = os.path.join(BASE_DATA_DIR, queue_id)
    status_file = os.path.join(queue_dir, "queue_status.json")
    if not os.path.exists(status_file):
        return

    with open(status_file, 'r') as f:
        data = json.load(f)

    if task_id:
        data["tasks"][task_id] = status
    else:
        data["status"] = status

    if result is not None:
        data.setdefault("results", {})[task_id] = result

    with open(status_file, 'w') as f:
        json.dump(data, f, indent=4)


def run_worker():
    print("Worker started...")
    while True:
        db = database.SessionLocal()
        try:
            # Get the oldest waiting queue
            queue = (
                db.query(models.Queue)
                .filter(models.Queue.status == "Waiting")
                .order_by(models.Queue.created_at)
                .first()
            )
            if queue:
                print(f"Processing queue: {queue.id}")
                queue.status = "Running"
                update_json_status(queue.id, None, "Running")
                db.commit()

                tasks = (
                    db.query(models.Task)
                    .filter(models.Task.queue_id == queue.id, models.Task.status == "Waiting")
                    .all()
                )

                for task in tasks:
                    # Check if the queue was aborted mid-way
                    db.refresh(queue)
                    if queue.status == "Aborted":
                        break

                    print(f"Running task: {task.id}")
                    task.status = "Running"
                    update_json_status(queue.id, task.id, "Running")
                    db.commit()

                    try:
                        # Run: tpf_execution.py <queue_id> <scenario_path> <task_id>
                        # Assumes tpf_execution.py is reachable from the worker's working directory.
                        script_path = "tpf_execution.py"

                        # Use the same interpreter as the worker instead of relying on "python"
                        # being on PATH; cast ids to str since subprocess args must be strings.
                        cmd = [sys.executable, script_path, str(queue.id), task.scenario_path, str(task.id)]
                        result = subprocess.run(cmd, capture_output=True, text=True)

                        # The script is expected to print a JSON result on stdout; fall back to
                        # the raw output if it does not.
                        try:
                            execution_result = json.loads(result.stdout)
                        except json.JSONDecodeError:
                            execution_result = {"output": result.stdout, "error": result.stderr}

                        task.status = "Finished"
                        task.result = execution_result
                        update_json_status(queue.id, task.id, "Finished", execution_result)

                    except Exception as e:
                        print(f"Error running task {task.id}: {e}")
                        task.status = "Error"
                        update_json_status(queue.id, task.id, "Error")

                    db.commit()

                if queue.status != "Aborted":
                    queue.status = "Finished"
                    update_json_status(queue.id, None, "Finished")
                db.commit()

            time.sleep(5)  # Poll every 5 seconds
        except Exception as e:
            print(f"Worker error: {e}")
            time.sleep(10)
        finally:
            db.close()


if __name__ == "__main__":
    run_worker()
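
# A sketch of the queue_status.json layout that update_json_status() maintains
# (the file itself is created elsewhere, presumably when the queue is submitted).
# Only the fields this worker reads and writes are shown; the status values
# mirror the ones this module uses for the corresponding database rows:
#
#   {
#       "status": "Waiting" | "Running" | "Finished",
#       "tasks": { "<task_id>": "Waiting" | "Running" | "Finished" | "Error" },
#       "results": { "<task_id>": { ...result parsed from tpf_execution.py stdout... } }
#   }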