from fastapi import FastAPI, Depends, HTTPException, status, WebSocket, WebSocketDisconnect, BackgroundTasks
from fastapi.middleware.cors import CORSMiddleware
from sqlalchemy.orm import Session
from typing import List

from . import models, schemas, crud, auth, database, tasks
from .socket_manager import manager
from .dependencies import get_db

# Issue CREATE TABLE for every model registered on models.Base, using the
# engine configured in the database module. Runs once, at import time.
models.Base.metadata.create_all(bind=database.engine)

app = FastAPI(title="ASF TestArena")

# NOTE(review): a wildcard origin combined with allow_credentials=True is very
# permissive; tighten before exposing this service publicly.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # In production, set to specific domain
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


@app.post("/auth/login", response_model=schemas.Token)
def login_for_access_token(form_data: schemas.UserCreate, db: Session = Depends(get_db)):
    """Exchange a username/password pair for a bearer access token.

    Args:
        form_data: Request body carrying ``username`` and ``password``.
        db: Database session injected by ``get_db``.

    Returns:
        A dict with ``access_token`` and ``token_type`` ("bearer").

    Raises:
        HTTPException: 401 when no such user exists or the password does
            not verify against the stored hash.
    """
    account = crud.get_user_by_username(db, form_data.username)
    # Short-circuits on a missing account, so verify_password is only
    # called when there is a stored hash to compare against.
    authenticated = account and auth.verify_password(
        form_data.password, account.hashed_password
    )
    if not authenticated:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )

    token_lifetime = auth.timedelta(minutes=auth.ACCESS_TOKEN_EXPIRE_MINUTES)
    token = auth.create_access_token(
        data={"sub": account.username},
        expires_delta=token_lifetime,
    )
    return {"access_token": token, "token_type": "bearer"}


@app.post("/auth/reset-password")
def reset_password(
    username: str,
    new_password: str,
    current_user: models.User = Depends(auth.get_current_admin_user),
    db: Session = Depends(get_db)
):
    """Overwrite a user's password hash (guarded by the admin dependency).

    NOTE(review): ``username`` and ``new_password`` are plain ``str``
    parameters, which FastAPI reads from the query string; consider moving
    the password into a request body in a future API revision.

    Args:
        username: Name of the account whose password is replaced.
        new_password: Plain-text password to hash and store.
        current_user: Present only to enforce the admin dependency.
        db: Database session injected by ``get_db``.

    Returns:
        A confirmation message dict.

    Raises:
        HTTPException: 404 when ``username`` does not match any user.
    """
    target = crud.get_user_by_username(db, username)
    if not target:
        raise HTTPException(status_code=404, detail="User not found")

    new_hash = auth.get_password_hash(new_password)
    target.hashed_password = new_hash
    db.commit()
    return {"message": "Password reset successfully"}


@app.get("/admin/users", response_model=List[schemas.User])
def read_users(
    skip: int = 0,
    limit: int = 100,
    current_user: models.User = Depends(auth.get_current_admin_user),
    db: Session = Depends(get_db)
):
    """Return a page of users (guarded by the admin dependency).

    Args:
        skip: Offset forwarded to ``crud.get_users``.
        limit: Page size forwarded to ``crud.get_users``.
        current_user: Present only to enforce the admin dependency.
        db: Database session injected by ``get_db``.
    """
    return crud.get_users(db, skip=skip, limit=limit)
@app.post("/admin/users", response_model=schemas.User)
def create_user(
    user: schemas.UserCreate,
    current_user: models.User = Depends(auth.get_current_admin_user),
    db: Session = Depends(get_db)
):
    """Create a new user account (guarded by the admin dependency).

    Args:
        user: Payload describing the account to create.
        current_user: Present only to enforce the admin dependency.
        db: Database session injected by ``get_db``.

    Returns:
        Whatever ``crud.create_user`` returns for the new account.

    Raises:
        HTTPException: 400 when the username is already taken.
    """
    db_user = crud.get_user_by_username(db, username=user.username)
    if db_user:
        raise HTTPException(status_code=400, detail="Username already registered")
    hashed_password = auth.get_password_hash(user.password)
    return crud.create_user(db=db, user=user, hashed_password=hashed_password)


@app.delete("/admin/users/{user_id}")
def delete_user(
    user_id: int,
    current_user: models.User = Depends(auth.get_current_admin_user),
    db: Session = Depends(get_db)
):
    """Delete the user with the given id (guarded by the admin dependency).

    NOTE(review): the result of ``crud.delete_user`` is not inspected, so
    the success message is returned even if no such user existed.
    """
    crud.delete_user(db, user_id)
    return {"message": "User deleted"}


@app.get("/jobs", response_model=List[schemas.Job])
def read_jobs(
    skip: int = 0,
    limit: int = 100,
    current_user: models.User = Depends(auth.get_current_active_user),
    db: Session = Depends(get_db)
):
    """Return a page of jobs.

    Admins query without a user filter; every other caller has their own
    ``user_id`` passed to ``crud.get_jobs``.
    """
    if current_user.role == models.UserRole.admin:
        jobs = crud.get_jobs(db, skip=skip, limit=limit)
    else:
        jobs = crud.get_jobs(db, skip=skip, limit=limit, user_id=current_user.id)
    return jobs


@app.get("/jobs/{job_id}", response_model=schemas.Job)
def read_job(
    job_id: int,
    current_user: models.User = Depends(auth.get_current_active_user),
    db: Session = Depends(get_db)
):
    """Return a single job.

    Raises:
        HTTPException: 404 when the job does not exist; 403 when the caller
            is neither an admin nor the job's owner.
    """
    job = crud.get_job(db, job_id=job_id)
    if job is None:
        raise HTTPException(status_code=404, detail="Job not found")
    if current_user.role != models.UserRole.admin and job.user_id != current_user.id:
        raise HTTPException(status_code=403, detail="Not authorized to view this job")
    return job


@app.post("/jobs/submit", response_model=schemas.Job)
def submit_job(
    job: schemas.JobCreate,
    background_tasks: BackgroundTasks,
    current_user: models.User = Depends(auth.get_current_active_user),
    db: Session = Depends(get_db)
):
    """Persist a new job for the caller and schedule it to run.

    The job row is created first; ``tasks.run_job_task`` is then queued as a
    FastAPI background task, so it runs after the response is sent.
    """
    db_job = crud.create_job(db=db, job=job, user_id=current_user.id)
    background_tasks.add_task(tasks.run_job_task, db_job.id)
    return db_job


@app.post("/jobs/{job_id}/abort")
def abort_job(
    job_id: int,
    current_user: models.User = Depends(auth.get_current_active_user),
    db: Session = Depends(get_db)
):
    """Mark a job as aborted.

    Only the stored status is changed here, via ``crud.update_job_status``.

    Raises:
        HTTPException: 404 when the job does not exist; 403 when the caller
            is neither an admin nor the job's owner.
    """
    job = crud.get_job(db, job_id)
    if not job:
        raise HTTPException(status_code=404, detail="Job not found")
    if current_user.role != models.UserRole.admin and job.user_id != current_user.id:
        raise HTTPException(status_code=403, detail="Not authorized")
    crud.update_job_status(db, job_id, "aborted")
    return {"message": "Job aborted"}


@app.post("/jobs/scenarios")
def get_scenarios_endpoint(
    branch_name: str,
    current_user: models.User = Depends(auth.get_current_active_user)
):
    """Return the result of ``tasks.get_scenarios`` for ``branch_name``.

    ``current_user`` is present only to require an authenticated caller.
    """
    scenarios = tasks.get_scenarios(branch_name)
    return scenarios


@app.websocket("/ws/jobs")
async def websocket_endpoint(websocket: WebSocket):
    """Register a WebSocket client with the shared connection manager.

    Incoming text frames are read and discarded; the loop exists to keep the
    connection open until the client disconnects, at which point the socket
    is removed from the manager.
    """
    await manager.connect(websocket)
    try:
        while True:
            await websocket.receive_text()
    except WebSocketDisconnect:
        manager.disconnect(websocket)


import asyncio

# Strong references to fire-and-forget tasks. The event loop itself only
# keeps weak references, so a task nobody else references may be
# garbage-collected before it finishes.
_background_tasks = set()


@app.on_event("startup")
async def startup_event():
    """Start the result-cleanup task and make sure an ``admin`` user exists."""
    # Start cleanup task, keeping a reference until it completes.
    cleanup_task = asyncio.create_task(tasks.cleanup_old_results())
    _background_tasks.add(cleanup_task)
    cleanup_task.add_done_callback(_background_tasks.discard)

    db = database.SessionLocal()
    try:
        user = crud.get_user_by_username(db, "admin")
        if not user:
            # NOTE(review): default admin credentials are hard-coded; they
            # should be changed right after first start-up.
            hashed_password = auth.get_password_hash("admin123")
            user_in = schemas.UserCreate(
                username="admin",
                password="admin123",
                role=models.UserRole.admin,
            )
            crud.create_user(db, user_in, hashed_password)
    finally:
        # Always release the session, even if seeding fails.
        db.close()