from fastapi import FastAPI, Depends, HTTPException, BackgroundTasks from fastapi.staticfiles import StaticFiles from fastapi.responses import FileResponse from sqlalchemy.orm import Session import os import json import uuid from typing import Dict, List from . import models, database app = FastAPI(title="TestArena API") # Mount static files static_dir = os.path.join(os.path.dirname(__file__), "static") os.makedirs(static_dir, exist_ok=True) app.mount("/static", StaticFiles(directory=static_dir), name="static") # Base directory for data as requested BASE_DATA_DIR = "/home/asf/testarena" # For local development on Windows, we might need to adjust this, # but I'll stick to the user's requirement for the final version. if os.name == 'nt': BASE_DATA_DIR = "d:/ASF - course/ASF_01/ASF_tools/asf-pc-server/testarena_pc_backend/testarena_data" # Ensure base directory exists os.makedirs(BASE_DATA_DIR, exist_ok=True) # Initialize database models.Base.metadata.create_all(bind=database.engine) @app.post("/api/queue") async def queue_task(payload: Dict, db: Session = Depends(database.get_db)): """ Input json contain {"source": "", :[environment, {"" : ""},]} """ try: source = payload.get("source", "main") # Find the queue_id key (it's the one that isn't "source") queue_id = next(k for k in payload.keys() if k != "source") data = payload[queue_id] environment = data[0] tasks_data = data[1] # This is a dict {"TASK_ID": "path"} # 1. Create folder queue_dir = os.path.join(BASE_DATA_DIR, queue_id) os.makedirs(queue_dir, exist_ok=True) # 2. Create queue_status.json status_file = os.path.join(queue_dir, "queue_status.json") queue_status = { "queue_id": queue_id, "source": source, "status": "Waiting", "tasks": {} } # 3. Save to database and prepare status file new_queue = models.Queue(id=queue_id, environment=environment, source=source, status="Waiting") db.add(new_queue) for task_id, scenario_path in tasks_data.items(): new_task = models.Task(id=task_id, queue_id=queue_id, scenario_path=scenario_path, status="Waiting") db.add(new_task) queue_status["tasks"][task_id] = "Waiting" with open(status_file, 'w') as f: json.dump(queue_status, f, indent=4) db.commit() return {"status": "Queue OK", "queue_id": queue_id} except Exception as e: return {"status": "Error", "message": str(e)} @app.get("/api/status/{id}") async def get_status(id: str, db: Session = Depends(database.get_db)): # Check if it's a queue ID queue = db.query(models.Queue).filter(models.Queue.id == id).first() if queue: return {"id": id, "type": "queue", "status": queue.status} # Check if it's a task ID task = db.query(models.Task).filter(models.Task.id == id).first() if task: return {"id": id, "type": "task", "status": task.status} raise HTTPException(status_code=404, detail="ID not found") @app.post("/api/abort/{id}") async def abort_task(id: str, db: Session = Depends(database.get_db)): # Abort queue queue = db.query(models.Queue).filter(models.Queue.id == id).first() if queue: queue.status = "Aborted" # Abort all tasks in queue tasks = db.query(models.Task).filter(models.Task.queue_id == id).all() for t in tasks: if t.status in ["Waiting", "Running"]: t.status = "Aborted" # Update queue_status.json queue_dir = os.path.join(BASE_DATA_DIR, id) status_file = os.path.join(queue_dir, "queue_status.json") if os.path.exists(status_file): with open(status_file, 'r') as f: data = json.load(f) data["status"] = "Aborted" for tid in data["tasks"]: if data["tasks"][tid] in ["Waiting", "Running"]: data["tasks"][tid] = "Aborted" with open(status_file, 'w') as f: json.dump(data, f, indent=4) db.commit() return {"id": id, "status": "Aborted"} # Abort single task task = db.query(models.Task).filter(models.Task.id == id).first() if task: task.status = "Aborted" # Update queue_status.json queue_dir = os.path.join(BASE_DATA_DIR, task.queue_id) status_file = os.path.join(queue_dir, "queue_status.json") if os.path.exists(status_file): with open(status_file, 'r') as f: data = json.load(f) data["tasks"][id] = "Aborted" with open(status_file, 'w') as f: json.dump(data, f, indent=4) db.commit() return {"id": id, "status": "Aborted"} raise HTTPException(status_code=404, detail="ID not found") @app.get("/api/queues") async def list_queues(db: Session = Depends(database.get_db)): queues = db.query(models.Queue).order_by(models.Queue.created_at.desc()).all() return queues @app.delete("/api/delete/{id}") async def delete_queue(id: str, db: Session = Depends(database.get_db)): # 1. Delete from database queue = db.query(models.Queue).filter(models.Queue.id == id).first() if queue: # Delete associated tasks first db.query(models.Task).filter(models.Task.queue_id == id).delete() db.delete(queue) db.commit() # 2. Delete folder queue_dir = os.path.join(BASE_DATA_DIR, id) if os.path.exists(queue_dir): import shutil shutil.rmtree(queue_dir) return {"id": id, "status": "Deleted"} raise HTTPException(status_code=404, detail="ID not found") @app.get("/api/system/status") async def system_status(): """Check the status of system services""" services = ["testarena-app", "testarena-worker", "nginx"] status = {} for service in services: try: # Use systemctl is-active for a quick check res = os.system(f"systemctl is-active --quiet {service}") status[service] = "online" if res == 0 else "offline" except: status[service] = "unknown" return status @app.get("/api/queue/{id}/tasks") async def get_queue_tasks(id: str, db: Session = Depends(database.get_db)): """Get all tasks for a specific queue""" tasks = db.query(models.Task).filter(models.Task.queue_id == id).all() return tasks @app.get("/") async def root(): return FileResponse(os.path.join(static_dir, "index.html"))