from fastapi import FastAPI, Depends, HTTPException, BackgroundTasks
from fastapi.staticfiles import StaticFiles
from fastapi.responses import FileResponse
from sqlalchemy.orm import Session
import os
import json
import shutil
import uuid
from typing import Dict, List
from . import models, database
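# NOTE (assumption, not shown in this file): `models` is expected to expose
# SQLAlchemy classes roughly like the sketch below, and `database` to provide
# `engine` plus a `get_db` session dependency. The column names are inferred
# from how they are used further down (Queue.id/environment/source/status/
# created_at, Task.id/queue_id/scenario_path/status); the real definitions may differ.
#
#   class Queue(Base):
#       __tablename__ = "queues"
#       id = Column(String, primary_key=True)
#       environment = Column(String)
#       source = Column(String)
#       status = Column(String, default="Waiting")
#       created_at = Column(DateTime, server_default=func.now())
#
#   class Task(Base):
#       __tablename__ = "tasks"
#       id = Column(String, primary_key=True)
#       queue_id = Column(String, ForeignKey("queues.id"))
#       scenario_path = Column(String)
#       status = Column(String, default="Waiting")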
app = FastAPI(title="TestArena API")
# Mount static files
static_dir = os.path.join(os.path.dirname(__file__), "static")
os.makedirs(static_dir, exist_ok=True)
app.mount("/static", StaticFiles(directory=static_dir), name="static")
@app.get("/favicon.ico", include_in_schema=False)
async def favicon():
    return FileResponse(os.path.join(static_dir, "favicon.png"))
# Base directory for queue data
BASE_DATA_DIR = "/home/asf/testarena"
# For local development on Windows, fall back to a local data directory.
if os.name == 'nt':
    BASE_DATA_DIR = "d:/ASF - course/ASF_01/ASF_tools/asf-pc-server/testarena_pc_backend/testarena_data"
# Ensure base directory exists
os.makedirs(BASE_DATA_DIR, exist_ok=True)
# Initialize database
models.Base.metadata.create_all(bind=database.engine)
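# create_all() only creates tables that do not already exist, so it is safe to
# run on every startup; it does not migrate or alter existing tables.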
@app.post("/api/queue")
async def queue_task(payload: Dict, db: Session = Depends(database.get_db)):
"""
Input json contain {"source": "<branch_name>", <queue_ID> :[environment, {"<TASK_ID>" : "<path to scenario>"},]}
"""
    try:
        source = payload.get("source", "main")
        # Find the queue_id key (it's the one that isn't "source")
        queue_id = next(k for k in payload.keys() if k != "source")
        data = payload[queue_id]
        environment = data[0]
        tasks_data = data[1]  # This is a dict {"TASK_ID": "path"}

        # 1. Create folder
        queue_dir = os.path.join(BASE_DATA_DIR, queue_id)
        os.makedirs(queue_dir, exist_ok=True)

        # 2. Create queue_status.json
        status_file = os.path.join(queue_dir, "queue_status.json")
        queue_status = {
            "queue_id": queue_id,
            "source": source,
            "status": "Waiting",
            "tasks": {}
        }

        # 3. Save to database and prepare status file
        new_queue = models.Queue(id=queue_id, environment=environment, source=source, status="Waiting")
        db.add(new_queue)
        for task_id, scenario_path in tasks_data.items():
            new_task = models.Task(id=task_id, queue_id=queue_id, scenario_path=scenario_path, status="Waiting")
            db.add(new_task)
            queue_status["tasks"][task_id] = "Waiting"

        with open(status_file, 'w') as f:
            json.dump(queue_status, f, indent=4)

        db.commit()
        return {"status": "Queue OK", "queue_id": queue_id}
    except Exception as e:
        return {"status": "Error", "message": str(e)}
@app.get("/api/status/{id}")
async def get_status(id: str, db: Session = Depends(database.get_db)):
    # Check if it's a queue ID
    queue = db.query(models.Queue).filter(models.Queue.id == id).first()
    if queue:
        return {"id": id, "type": "queue", "status": queue.status}
    # Check if it's a task ID
    task = db.query(models.Task).filter(models.Task.id == id).first()
    if task:
        return {"id": id, "type": "task", "status": task.status}
    raise HTTPException(status_code=404, detail="ID not found")
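# GET /api/status/<id> example (illustrative): /api/status/Q_2026_001 ->
#   {"id": "Q_2026_001", "type": "queue", "status": "Waiting"}; an unknown id
# returns HTTP 404. Queues are checked before tasks, so the code implicitly
# assumes ids are unique across both tables.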
@app.post("/api/abort/{id}")
async def abort_task(id: str, db: Session = Depends(database.get_db)):
    # Abort queue
    queue = db.query(models.Queue).filter(models.Queue.id == id).first()
    if queue:
        queue.status = "Aborted"
        # Abort all tasks in queue
        tasks = db.query(models.Task).filter(models.Task.queue_id == id).all()
        for t in tasks:
            if t.status in ["Waiting", "Running"]:
                t.status = "Aborted"
        # Update queue_status.json
        queue_dir = os.path.join(BASE_DATA_DIR, id)
        status_file = os.path.join(queue_dir, "queue_status.json")
        if os.path.exists(status_file):
            with open(status_file, 'r') as f:
                data = json.load(f)
            data["status"] = "Aborted"
            for tid in data["tasks"]:
                if data["tasks"][tid] in ["Waiting", "Running"]:
                    data["tasks"][tid] = "Aborted"
            with open(status_file, 'w') as f:
                json.dump(data, f, indent=4)
        db.commit()
        return {"id": id, "status": "Aborted"}
    # Abort single task
    task = db.query(models.Task).filter(models.Task.id == id).first()
    if task:
        task.status = "Aborted"
        # Update queue_status.json
        queue_dir = os.path.join(BASE_DATA_DIR, task.queue_id)
        status_file = os.path.join(queue_dir, "queue_status.json")
        if os.path.exists(status_file):
            with open(status_file, 'r') as f:
                data = json.load(f)
            data["tasks"][id] = "Aborted"
            with open(status_file, 'w') as f:
                json.dump(data, f, indent=4)
        db.commit()
        return {"id": id, "status": "Aborted"}
    raise HTTPException(status_code=404, detail="ID not found")
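# After POST /api/abort/<queue_id>, queue_status.json mirrors the database, e.g.
# (illustrative): {"queue_id": "Q_2026_001", "source": "main", "status": "Aborted",
#                  "tasks": {"T1": "Aborted", "T2": "Aborted"}}
# Tasks that already finished keep their final status in both places, since only
# "Waiting" and "Running" entries are overwritten.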
@app.get("/api/queues")
async def list_queues(db: Session = Depends(database.get_db)):
    queues = db.query(models.Queue).order_by(models.Queue.created_at.desc()).all()
    return queues
@app.delete("/api/delete/{id}")
async def delete_queue(id: str, db: Session = Depends(database.get_db)):
    # 1. Delete from database
    queue = db.query(models.Queue).filter(models.Queue.id == id).first()
    if queue:
        # Delete associated tasks first
        db.query(models.Task).filter(models.Task.queue_id == id).delete()
        db.delete(queue)
        db.commit()
        # 2. Delete folder
        queue_dir = os.path.join(BASE_DATA_DIR, id)
        if os.path.exists(queue_dir):
            shutil.rmtree(queue_dir)
        return {"id": id, "status": "Deleted"}
    raise HTTPException(status_code=404, detail="ID not found")
@app.get("/api/system/status")
async def system_status():
"""Check the status of system services"""
services = ["testarena-app", "testarena-worker", "nginx"]
status = {}
for service in services:
try:
# Use systemctl is-active for a quick check
res = os.system(f"systemctl is-active --quiet {service}")
status[service] = "online" if res == 0 else "offline"
except:
status[service] = "unknown"
return status
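# Note on /api/system/status: os.system() shells out and returns 0 only if the
# command succeeded, so on a host without systemd (or on Windows during local
# development) every service simply reports "offline". Calling
# subprocess.run(["systemctl", "is-active", "--quiet", service]) would avoid the
# shell, but for this simple check the behaviour is equivalent.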
@app.get("/api/queue/{id}/tasks")
async def get_queue_tasks(id: str, db: Session = Depends(database.get_db)):
"""Get all tasks for a specific queue"""
tasks = db.query(models.Task).filter(models.Task.queue_id == id).all()
return tasks
@app.get("/")
async def root():
    return FileResponse(os.path.join(static_dir, "index.html"))
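# Note: the relative import (`from . import models, database`) means this module
# must be run as part of its package, e.g. `uvicorn <package>.<module>:app`
# (replace with the real package/module names; they are not shown here).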