79 lines
3.0 KiB
Python
79 lines
3.0 KiB
Python
from fastapi import APIRouter, Depends, HTTPException, status, BackgroundTasks
|
|
from sqlalchemy.orm import Session
|
|
from typing import List
|
|
from .. import database, models, schemas, auth_utils
|
|
from ..services import email
|
|
|
|
router = APIRouter(
|
|
prefix="/users",
|
|
tags=["Users"],
|
|
dependencies=[Depends(auth_utils.get_current_admin_user)]
|
|
)
|
|
|
|
@router.post("/", response_model=schemas.UserOut)
|
|
async def create_user(user: schemas.UserCreate, background_tasks: BackgroundTasks, db: Session = Depends(database.get_db)):
|
|
db_user = db.query(models.User).filter(models.User.username == user.username).first()
|
|
if db_user:
|
|
raise HTTPException(status_code=400, detail="Username already registered")
|
|
|
|
hashed_password = auth_utils.get_password_hash(user.password)
|
|
db_user = models.User(
|
|
username=user.username,
|
|
email=user.email,
|
|
hashed_password=hashed_password,
|
|
is_active=user.is_active,
|
|
is_admin=user.is_admin
|
|
)
|
|
db.add(db_user)
|
|
db.commit()
|
|
db.refresh(db_user)
|
|
|
|
# Send email
|
|
background_tasks.add_task(email.send_welcome_email, user.email, user.username, user.password)
|
|
|
|
return db_user
|
|
|
|
@router.get("/", response_model=List[schemas.UserOut])
|
|
async def read_users(skip: int = 0, limit: int = 100, db: Session = Depends(database.get_db)):
|
|
users = db.query(models.User).offset(skip).limit(limit).all()
|
|
return users
|
|
|
|
@router.put("/{user_id}", response_model=schemas.UserOut)
|
|
async def update_user(user_id: int, user_update: schemas.UserUpdate, background_tasks: BackgroundTasks, db: Session = Depends(database.get_db)):
|
|
db_user = db.query(models.User).filter(models.User.id == user_id).first()
|
|
if not db_user:
|
|
raise HTTPException(status_code=404, detail="User not found")
|
|
|
|
if user_update.password:
|
|
db_user.hashed_password = auth_utils.get_password_hash(user_update.password)
|
|
# Send email on password change
|
|
background_tasks.add_task(email.send_welcome_email, db_user.email, db_user.username, user_update.password)
|
|
|
|
if user_update.email:
|
|
db_user.email = user_update.email
|
|
if user_update.username:
|
|
db_user.username = user_update.username
|
|
if user_update.is_active is not None:
|
|
db_user.is_active = user_update.is_active
|
|
if user_update.is_admin is not None:
|
|
db_user.is_admin = user_update.is_admin
|
|
|
|
db.commit()
|
|
db.refresh(db_user)
|
|
return db_user
|
|
|
|
@router.post("/{user_id}/assign/{app_id}")
|
|
async def assign_app_to_user(user_id: int, app_id: int, db: Session = Depends(database.get_db)):
|
|
assignment = db.query(models.UserApplication).filter(
|
|
models.UserApplication.user_id == user_id,
|
|
models.UserApplication.application_id == app_id
|
|
).first()
|
|
|
|
if assignment:
|
|
return {"message": "Already assigned"}
|
|
|
|
new_assignment = models.UserApplication(user_id=user_id, application_id=app_id)
|
|
db.add(new_assignment)
|
|
db.commit()
|
|
return {"message": "Assigned successfully"}
|