Files
ASF_tools/asf-cloud-server/testarena/backend/app/main.py
2025-11-25 00:40:22 +01:00

170 lines
5.8 KiB
Python

from fastapi import FastAPI, Depends, HTTPException, status, WebSocket, WebSocketDisconnect, BackgroundTasks
from pydantic import BaseModel
from fastapi.middleware.cors import CORSMiddleware
from sqlalchemy.orm import Session
from typing import List
from . import models, schemas, crud, auth, database, tasks
from .socket_manager import manager
from .dependencies import get_db
# Create the tables declared on models.Base (runs once, at import time).
models.Base.metadata.create_all(bind=database.engine)

# The ASGI application object; every route below is registered on it.
app = FastAPI(title="ASF TestArena")

# NOTE(review): wildcard origins together with allow_credentials=True is very
# permissive -- tighten allow_origins before deploying to production.
app.add_middleware(
    CORSMiddleware,
    allow_methods=["*"],
    allow_headers=["*"],
    allow_credentials=True,
    allow_origins=["*"],  # In production, set to specific domain
)
@app.post("/auth/login", response_model=schemas.Token)
def login_for_access_token(form_data: schemas.UserCreate, db: Session = Depends(get_db)):
    """Exchange a username/password pair for a bearer access token.

    Args:
        form_data: Request body carrying ``username`` and ``password``.
        db: Database session supplied by the ``get_db`` dependency.

    Returns:
        A dict with ``access_token`` and ``token_type`` ("bearer").

    Raises:
        HTTPException: 401 when the user is unknown or the password does not
            verify against the stored hash.
    """
    account = crud.get_user_by_username(db, form_data.username)

    # The password is only checked when a matching account was found.
    authenticated = False
    if account:
        authenticated = auth.verify_password(form_data.password, account.hashed_password)

    if not authenticated:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )

    lifetime = auth.timedelta(minutes=auth.ACCESS_TOKEN_EXPIRE_MINUTES)
    token = auth.create_access_token(
        data={"sub": account.username},
        expires_delta=lifetime,
    )
    return {"access_token": token, "token_type": "bearer"}
class PasswordResetRequest(BaseModel):
    """Request body accepted by the ``/auth/reset-password`` endpoint."""

    # Username of the account whose password is being replaced.
    username: str
    # Plain-text replacement password; it is hashed before being stored.
    new_password: str
@app.post("/auth/reset-password")
def reset_password(
    request: PasswordResetRequest,
    current_user: models.User = Depends(auth.get_current_admin_user),
    db: Session = Depends(get_db)
):
    """Replace the stored password hash of the named user.

    Args:
        request: Target username and the new plain-text password.
        current_user: Resolved by ``auth.get_current_admin_user``; present
            only to gate the endpoint, it is not read in the body.
        db: Database session supplied by the ``get_db`` dependency.

    Returns:
        A dict with a confirmation ``message``.

    Raises:
        HTTPException: 404 when no user has the requested username.
    """
    target = crud.get_user_by_username(db, request.username)
    if not target:
        raise HTTPException(status_code=404, detail="User not found")

    new_hash = auth.get_password_hash(request.new_password)
    target.hashed_password = new_hash
    db.commit()

    return {"message": "Password reset successfully"}
@app.get("/admin/users", response_model=List[schemas.User])
def read_users(
    skip: int = 0,
    limit: int = 100,
    current_user: models.User = Depends(auth.get_current_admin_user),
    db: Session = Depends(get_db)
):
    """Return one page of users.

    Args:
        skip: Forwarded to ``crud.get_users`` as ``skip``.
        limit: Forwarded to ``crud.get_users`` as ``limit``.
        current_user: Resolved by ``auth.get_current_admin_user``; gates the
            endpoint and is otherwise unused.
        db: Database session supplied by the ``get_db`` dependency.

    Returns:
        Whatever ``crud.get_users`` yields, serialised as a list of
        ``schemas.User``.
    """
    return crud.get_users(db, skip=skip, limit=limit)
@app.post("/admin/users", response_model=schemas.User)
def create_user(
    user: schemas.UserCreate,
    current_user: models.User = Depends(auth.get_current_admin_user),
    db: Session = Depends(get_db)
):
    """Create a new user account.

    Args:
        user: Payload describing the account to create.
        current_user: Resolved by ``auth.get_current_admin_user``; gates the
            endpoint and is otherwise unused.
        db: Database session supplied by the ``get_db`` dependency.

    Returns:
        The result of ``crud.create_user``.

    Raises:
        HTTPException: 400 when the username is already taken.
    """
    # Reject duplicates before doing any hashing work.
    if crud.get_user_by_username(db, username=user.username):
        raise HTTPException(status_code=400, detail="Username already registered")

    return crud.create_user(
        db=db,
        user=user,
        hashed_password=auth.get_password_hash(user.password),
    )
@app.delete("/admin/users/{user_id}")
def delete_user(
    user_id: int,
    current_user: models.User = Depends(auth.get_current_admin_user),
    db: Session = Depends(get_db)
):
    """Delete the user with the given id.

    Args:
        user_id: Id taken from the URL path.
        current_user: Resolved by ``auth.get_current_admin_user``; gates the
            endpoint and is otherwise unused.
        db: Database session supplied by the ``get_db`` dependency.

    Returns:
        A dict with a confirmation ``message``.
    """
    # NOTE(review): no existence check happens here; what an unknown id does
    # depends on crud.delete_user -- confirm against that helper.
    crud.delete_user(db, user_id)
    confirmation = {"message": "User deleted"}
    return confirmation
@app.get("/jobs", response_model=List[schemas.Job])
def read_jobs(
    skip: int = 0,
    limit: int = 100,
    current_user: models.User = Depends(auth.get_current_active_user),
    db: Session = Depends(get_db)
):
    """Return one page of jobs visible to the caller.

    Admins query without a ``user_id`` filter; every other caller has the
    query restricted to their own id.

    Args:
        skip: Forwarded to ``crud.get_jobs`` as ``skip``.
        limit: Forwarded to ``crud.get_jobs`` as ``limit``.
        current_user: The authenticated, active user making the request.
        db: Database session supplied by the ``get_db`` dependency.

    Returns:
        Whatever ``crud.get_jobs`` yields, serialised as a list of
        ``schemas.Job``.
    """
    # Only non-admin callers get the extra user_id filter.
    if current_user.role == models.UserRole.admin:
        owner_filter = {}
    else:
        owner_filter = {"user_id": current_user.id}

    return crud.get_jobs(db, skip=skip, limit=limit, **owner_filter)
@app.get("/jobs/{job_id}", response_model=schemas.Job)
def read_job(
    job_id: int,
    current_user: models.User = Depends(auth.get_current_active_user),
    db: Session = Depends(get_db)
):
    """Return a single job if the caller may see it.

    Args:
        job_id: Id taken from the URL path.
        current_user: The authenticated, active user making the request.
        db: Database session supplied by the ``get_db`` dependency.

    Returns:
        The job returned by ``crud.get_job``.

    Raises:
        HTTPException: 404 when the job does not exist; 403 when the caller
            is neither an admin nor the job's owner.
    """
    found = crud.get_job(db, job_id=job_id)
    if found is None:
        raise HTTPException(status_code=404, detail="Job not found")

    # Admins may view any job; everyone else only their own.
    may_view = (
        current_user.role == models.UserRole.admin
        or found.user_id == current_user.id
    )
    if not may_view:
        raise HTTPException(status_code=403, detail="Not authorized to view this job")

    return found
@app.post("/jobs/submit", response_model=schemas.Job)
def submit_job(
    job: schemas.JobCreate,
    background_tasks: BackgroundTasks,
    current_user: models.User = Depends(auth.get_current_active_user),
    db: Session = Depends(get_db)
):
    """Persist a new job for the caller and queue it for execution.

    Args:
        job: Payload describing the job to create.
        background_tasks: FastAPI background-task queue for this request.
        current_user: The authenticated, active user; becomes the job owner.
        db: Database session supplied by the ``get_db`` dependency.

    Returns:
        The job record returned by ``crud.create_job``.
    """
    created = crud.create_job(db=db, job=job, user_id=current_user.id)

    # tasks.run_job_task is handed the new job's id and is queued as a
    # background task rather than run inline.
    background_tasks.add_task(tasks.run_job_task, created.id)

    return created
@app.post("/jobs/{job_id}/abort")
def abort_job(
    job_id: int,
    current_user: models.User = Depends(auth.get_current_active_user),
    db: Session = Depends(get_db)
):
    """Mark a job as aborted.

    Args:
        job_id: Id taken from the URL path.
        current_user: The authenticated, active user making the request.
        db: Database session supplied by the ``get_db`` dependency.

    Returns:
        A dict with a confirmation ``message``.

    Raises:
        HTTPException: 404 when the job does not exist; 403 when the caller
            is neither an admin nor the job's owner.
    """
    target = crud.get_job(db, job_id)
    if not target:
        raise HTTPException(status_code=404, detail="Job not found")

    # Admins may abort any job; everyone else only their own.
    may_abort = (
        current_user.role == models.UserRole.admin
        or target.user_id == current_user.id
    )
    if not may_abort:
        raise HTTPException(status_code=403, detail="Not authorized")

    # NOTE(review): the job's current status is not inspected before it is
    # overwritten -- confirm that is intended for already-finished jobs.
    crud.update_job_status(db, job_id, "aborted")
    return {"message": "Job aborted"}
@app.post("/jobs/scenarios")
def get_scenarios_endpoint(
    branch_name: str,
    current_user: models.User = Depends(auth.get_current_active_user)
):
    """Return the scenarios reported by ``tasks.get_scenarios`` for a branch.

    Args:
        branch_name: Branch identifier, passed through unchanged.
        current_user: Resolved by ``auth.get_current_active_user``; gates the
            endpoint and is otherwise unused.

    Returns:
        Whatever ``tasks.get_scenarios`` returns.
    """
    # NOTE(review): branch_name reaches tasks.get_scenarios unvalidated --
    # confirm that helper treats it as untrusted input.
    return tasks.get_scenarios(branch_name)
@app.websocket("/ws/jobs")
async def websocket_endpoint(websocket: WebSocket):
    """Keep a client WebSocket attached to the shared connection manager.

    The socket is handed to ``manager.connect`` and then read in a loop; the
    received text is discarded, so the loop only serves to hold the
    connection open and notice when the peer goes away.

    Args:
        websocket: The incoming WebSocket connection.
    """
    await manager.connect(websocket)
    try:
        while True:
            await websocket.receive_text()
    except WebSocketDisconnect:
        # A client hanging up is the normal way for this loop to end.
        pass
    finally:
        # Always hand the socket back to the manager: previously any error
        # other than WebSocketDisconnect (or a task cancellation) skipped
        # this call and left the connection registered.
        manager.disconnect(websocket)
import asyncio
@app.on_event("startup")
async def startup_event():
    """Start background cleanup and make sure an ``admin`` account exists.

    Schedules ``tasks.cleanup_old_results()`` on the running event loop, then
    opens a short-lived database session and creates the ``admin`` user with
    the admin role when no user of that name is found.
    """
    # Start cleanup task. The event loop only keeps weak references to tasks,
    # so the task is stored on app.state to stop it being garbage-collected
    # while it is still running.
    app.state.cleanup_task = asyncio.create_task(tasks.cleanup_old_results())

    # NOTE(review): hard-coded default credential -- it should be changed (or
    # sourced from configuration) before any real deployment.
    default_password = "admin123"

    db = database.SessionLocal()
    try:
        if not crud.get_user_by_username(db, "admin"):
            hashed_password = auth.get_password_hash(default_password)
            user_in = schemas.UserCreate(
                username="admin",
                password=default_password,
                role=models.UserRole.admin,
            )
            crud.create_user(db, user_in, hashed_password)
    finally:
        # The session is closed whether or not seeding succeeded.
        db.close()