188 lines
6.8 KiB
Python
188 lines
6.8 KiB
Python
from fastapi import FastAPI, Depends, HTTPException, BackgroundTasks
|
|
from fastapi.staticfiles import StaticFiles
|
|
from fastapi.responses import FileResponse
|
|
from sqlalchemy.orm import Session
|
|
import os
|
|
import json
|
|
import uuid
|
|
from typing import Dict, List
|
|
from . import models, database
|
|
|
|
app = FastAPI(title="TestArena API")
|
|
|
|
# Mount static files
|
|
static_dir = os.path.join(os.path.dirname(__file__), "static")
|
|
os.makedirs(static_dir, exist_ok=True)
|
|
app.mount("/static", StaticFiles(directory=static_dir), name="static")
|
|
|
|
@app.get("/favicon.ico", include_in_schema=False)
|
|
async def favicon():
|
|
return FileResponse(os.path.join(static_dir, "favicon.png"))
|
|
|
|
# Base directory for data as requested
|
|
BASE_DATA_DIR = "/home/asf/testarena"
|
|
# For local development on Windows, we might need to adjust this,
|
|
# but I'll stick to the user's requirement for the final version.
|
|
if os.name == 'nt':
|
|
BASE_DATA_DIR = "d:/ASF - course/ASF_01/ASF_tools/asf-pc-server/testarena_pc_backend/testarena_data"
|
|
|
|
# Ensure base directory exists
|
|
os.makedirs(BASE_DATA_DIR, exist_ok=True)
|
|
|
|
# Initialize database
|
|
models.Base.metadata.create_all(bind=database.engine)
|
|
|
|
@app.post("/api/queue")
|
|
async def queue_task(payload: Dict, db: Session = Depends(database.get_db)):
|
|
"""
|
|
Input json contain {"source": "<branch_name>", <queue_ID> :[environment, {"<TASK_ID>" : "<path to scenario>"},]}
|
|
"""
|
|
try:
|
|
source = payload.get("source", "main")
|
|
# Find the queue_id key (it's the one that isn't "source")
|
|
queue_id = next(k for k in payload.keys() if k != "source")
|
|
|
|
data = payload[queue_id]
|
|
environment = data[0]
|
|
tasks_data = data[1] # This is a dict {"TASK_ID": "path"}
|
|
|
|
# 1. Create folder
|
|
queue_dir = os.path.join(BASE_DATA_DIR, queue_id)
|
|
os.makedirs(queue_dir, exist_ok=True)
|
|
|
|
# 2. Create queue_status.json
|
|
status_file = os.path.join(queue_dir, "queue_status.json")
|
|
queue_status = {
|
|
"queue_id": queue_id,
|
|
"source": source,
|
|
"status": "Waiting",
|
|
"tasks": {}
|
|
}
|
|
|
|
# 3. Save to database and prepare status file
|
|
new_queue = models.Queue(id=queue_id, environment=environment, source=source, status="Waiting")
|
|
db.add(new_queue)
|
|
|
|
for task_id, scenario_path in tasks_data.items():
|
|
new_task = models.Task(id=task_id, queue_id=queue_id, scenario_path=scenario_path, status="Waiting")
|
|
db.add(new_task)
|
|
queue_status["tasks"][task_id] = "Waiting"
|
|
|
|
with open(status_file, 'w') as f:
|
|
json.dump(queue_status, f, indent=4)
|
|
|
|
db.commit()
|
|
return {"status": "Queue OK", "queue_id": queue_id}
|
|
except Exception as e:
|
|
return {"status": "Error", "message": str(e)}
|
|
|
|
@app.get("/api/status/{id}")
|
|
async def get_status(id: str, db: Session = Depends(database.get_db)):
|
|
# Check if it's a queue ID
|
|
queue = db.query(models.Queue).filter(models.Queue.id == id).first()
|
|
if queue:
|
|
return {"id": id, "type": "queue", "status": queue.status}
|
|
|
|
# Check if it's a task ID
|
|
task = db.query(models.Task).filter(models.Task.id == id).first()
|
|
if task:
|
|
return {"id": id, "type": "task", "status": task.status}
|
|
|
|
raise HTTPException(status_code=404, detail="ID not found")
|
|
|
|
@app.post("/api/abort/{id}")
|
|
async def abort_task(id: str, db: Session = Depends(database.get_db)):
|
|
# Abort queue
|
|
queue = db.query(models.Queue).filter(models.Queue.id == id).first()
|
|
if queue:
|
|
queue.status = "Aborted"
|
|
# Abort all tasks in queue
|
|
tasks = db.query(models.Task).filter(models.Task.queue_id == id).all()
|
|
for t in tasks:
|
|
if t.status in ["Waiting", "Running"]:
|
|
t.status = "Aborted"
|
|
|
|
# Update queue_status.json
|
|
queue_dir = os.path.join(BASE_DATA_DIR, id)
|
|
status_file = os.path.join(queue_dir, "queue_status.json")
|
|
if os.path.exists(status_file):
|
|
with open(status_file, 'r') as f:
|
|
data = json.load(f)
|
|
data["status"] = "Aborted"
|
|
for tid in data["tasks"]:
|
|
if data["tasks"][tid] in ["Waiting", "Running"]:
|
|
data["tasks"][tid] = "Aborted"
|
|
with open(status_file, 'w') as f:
|
|
json.dump(data, f, indent=4)
|
|
|
|
db.commit()
|
|
return {"id": id, "status": "Aborted"}
|
|
|
|
# Abort single task
|
|
task = db.query(models.Task).filter(models.Task.id == id).first()
|
|
if task:
|
|
task.status = "Aborted"
|
|
# Update queue_status.json
|
|
queue_dir = os.path.join(BASE_DATA_DIR, task.queue_id)
|
|
status_file = os.path.join(queue_dir, "queue_status.json")
|
|
if os.path.exists(status_file):
|
|
with open(status_file, 'r') as f:
|
|
data = json.load(f)
|
|
data["tasks"][id] = "Aborted"
|
|
with open(status_file, 'w') as f:
|
|
json.dump(data, f, indent=4)
|
|
|
|
db.commit()
|
|
return {"id": id, "status": "Aborted"}
|
|
|
|
raise HTTPException(status_code=404, detail="ID not found")
|
|
|
|
@app.get("/api/queues")
|
|
async def list_queues(db: Session = Depends(database.get_db)):
|
|
queues = db.query(models.Queue).order_by(models.Queue.created_at.desc()).all()
|
|
return queues
|
|
|
|
@app.delete("/api/delete/{id}")
|
|
async def delete_queue(id: str, db: Session = Depends(database.get_db)):
|
|
# 1. Delete from database
|
|
queue = db.query(models.Queue).filter(models.Queue.id == id).first()
|
|
if queue:
|
|
# Delete associated tasks first
|
|
db.query(models.Task).filter(models.Task.queue_id == id).delete()
|
|
db.delete(queue)
|
|
db.commit()
|
|
|
|
# 2. Delete folder
|
|
queue_dir = os.path.join(BASE_DATA_DIR, id)
|
|
if os.path.exists(queue_dir):
|
|
import shutil
|
|
shutil.rmtree(queue_dir)
|
|
|
|
return {"id": id, "status": "Deleted"}
|
|
|
|
raise HTTPException(status_code=404, detail="ID not found")
|
|
|
|
@app.get("/api/system/status")
|
|
async def system_status():
|
|
"""Check the status of system services"""
|
|
services = ["testarena-app", "testarena-worker", "nginx"]
|
|
status = {}
|
|
for service in services:
|
|
try:
|
|
# Use systemctl is-active for a quick check
|
|
res = os.system(f"systemctl is-active --quiet {service}")
|
|
status[service] = "online" if res == 0 else "offline"
|
|
except:
|
|
status[service] = "unknown"
|
|
return status
|
|
|
|
@app.get("/api/queue/{id}/tasks")
|
|
async def get_queue_tasks(id: str, db: Session = Depends(database.get_db)):
|
|
"""Get all tasks for a specific queue"""
|
|
tasks = db.query(models.Task).filter(models.Task.queue_id == id).all()
|
|
return tasks
|
|
|
|
@app.get("/")
|
|
async def root():
|
|
return FileResponse(os.path.join(static_dir, "index.html"))
|