This commit is contained in:
2026-01-25 14:36:01 +01:00
commit fdf1e0e8ed
22 changed files with 1269 additions and 0 deletions

56
backend/auth_utils.py Normal file
View File

@@ -0,0 +1,56 @@
from passlib.context import CryptContext
from datetime import datetime, timedelta
from typing import Optional
from jose import JWTError, jwt
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from sqlalchemy.orm import Session
from . import models, schemas, database
# Configuration
SECRET_KEY = "ASF_SSO_SUPER_SECRET_KEY_CHANGE_THIS_IN_PROD"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 60 * 24 # 1 day
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
def verify_password(plain_password, hashed_password):
return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password):
return pwd_context.hash(password)
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
to_encode = data.copy()
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(minutes=15)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
async def get_current_user(token: str = Depends(oauth2_scheme), db: Session = Depends(database.get_db)):
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get("sub")
if username is None:
raise credentials_exception
token_data = schemas.TokenData(username=username)
except JWTError:
raise credentials_exception
user = db.query(models.User).filter(models.User.username == token_data.username).first()
if user is None:
raise credentials_exception
return user
async def get_current_admin_user(current_user: models.User = Depends(get_current_user)):
if not current_user.is_admin:
raise HTTPException(status_code=400, detail="Inactive user or not admin")
return current_user

19
backend/database.py Normal file
View File

@@ -0,0 +1,19 @@
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
SQLALCHEMY_DATABASE_URL = "sqlite:///./sso.db"
engine = create_engine(
SQLALCHEMY_DATABASE_URL, connect_args={"check_same_thread": False}
)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()

53
backend/main.py Normal file
View File

@@ -0,0 +1,53 @@
from fastapi import FastAPI, Depends
from fastapi.staticfiles import StaticFiles
from fastapi.middleware.cors import CORSMiddleware
from .database import engine, Base, SessionLocal
from .routers import auth, users, apps, sso
from . import models, auth_utils
from sqlalchemy.orm import Session
# Create tables
Base.metadata.create_all(bind=engine)
app = FastAPI(title="ASF SSO Service")
# CORS
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Routers
app.include_router(auth.router)
app.include_router(users.router)
app.include_router(apps.router)
app.include_router(sso.router)
# Mount static files (Frontend)
app.mount("/", StaticFiles(directory="../frontend", html=True), name="static")
# Create initial admin user if not exists
def create_initial_admin():
db = SessionLocal()
try:
admin = db.query(models.User).filter(models.User.username == "admin").first()
if not admin:
print("Creating initial admin user...")
hashed_pwd = auth_utils.get_password_hash("admin") # Default password, should be changed
admin_user = models.User(
username="admin",
email="admin@nabd-co.com",
hashed_password=hashed_pwd,
is_admin=True,
is_active=True
)
db.add(admin_user)
db.commit()
print("Admin user created.")
finally:
db.close()
create_initial_admin()

40
backend/models.py Normal file
View File

@@ -0,0 +1,40 @@
from sqlalchemy import Column, Integer, String, Boolean, ForeignKey, DateTime
from sqlalchemy.orm import relationship
from datetime import datetime
from .database import Base
class User(Base):
__tablename__ = "users"
id = Column(Integer, primary_key=True, index=True)
username = Column(String, unique=True, index=True)
email = Column(String, unique=True, index=True)
hashed_password = Column(String)
is_active = Column(Boolean, default=True)
is_admin = Column(Boolean, default=False)
created_at = Column(DateTime, default=datetime.utcnow)
updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
applications = relationship("UserApplication", back_populates="user")
class Application(Base):
__tablename__ = "applications"
id = Column(Integer, primary_key=True, index=True)
name = Column(String, unique=True, index=True)
api_key = Column(String, unique=True, index=True) # Secret key for the app to talk to SSO
url = Column(String)
created_at = Column(DateTime, default=datetime.utcnow)
users = relationship("UserApplication", back_populates="application")
class UserApplication(Base):
__tablename__ = "user_applications"
id = Column(Integer, primary_key=True, index=True)
user_id = Column(Integer, ForeignKey("users.id"))
application_id = Column(Integer, ForeignKey("applications.id"))
assigned_at = Column(DateTime, default=datetime.utcnow)
user = relationship("User", back_populates="applications")
application = relationship("Application", back_populates="users")

33
backend/routers/apps.py Normal file
View File

@@ -0,0 +1,33 @@
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.orm import Session
from typing import List
import secrets
from .. import database, models, schemas, auth_utils
router = APIRouter(
prefix="/apps",
tags=["Applications"],
dependencies=[Depends(auth_utils.get_current_admin_user)]
)
@router.post("/", response_model=schemas.ApplicationOut)
async def create_application(app: schemas.ApplicationCreate, db: Session = Depends(database.get_db)):
db_app = db.query(models.Application).filter(models.Application.name == app.name).first()
if db_app:
raise HTTPException(status_code=400, detail="Application already exists")
api_key = secrets.token_urlsafe(32)
db_app = models.Application(
name=app.name,
url=app.url,
api_key=api_key
)
db.add(db_app)
db.commit()
db.refresh(db_app)
return db_app
@router.get("/", response_model=List[schemas.ApplicationOut])
async def read_applications(skip: int = 0, limit: int = 100, db: Session = Depends(database.get_db)):
apps = db.query(models.Application).offset(skip).limit(limit).all()
return apps

22
backend/routers/auth.py Normal file
View File

@@ -0,0 +1,22 @@
from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordRequestForm
from sqlalchemy.orm import Session
from .. import database, models, auth_utils, schemas
from datetime import timedelta
router = APIRouter(tags=["Authentication"])
@router.post("/token", response_model=schemas.Token)
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends(), db: Session = Depends(database.get_db)):
user = db.query(models.User).filter(models.User.username == form_data.username).first()
if not user or not auth_utils.verify_password(form_data.password, user.hashed_password):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
access_token_expires = timedelta(minutes=auth_utils.ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = auth_utils.create_access_token(
data={"sub": user.username}, expires_delta=access_token_expires
)
return {"access_token": access_token, "token_type": "bearer"}

31
backend/routers/sso.py Normal file
View File

@@ -0,0 +1,31 @@
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.orm import Session
from .. import database, models, schemas, auth_utils
router = APIRouter(tags=["SSO"])
@router.post("/verify", response_model=schemas.SSOVerifyResponse)
async def verify_user(request: schemas.SSOVerifyRequest, db: Session = Depends(database.get_db)):
# 1. Validate API Key
app = db.query(models.Application).filter(models.Application.api_key == request.api_key).first()
if not app:
raise HTTPException(status_code=403, detail="Invalid API Key")
# 2. Validate User Credentials
user = db.query(models.User).filter(models.User.username == request.username).first()
if not user or not auth_utils.verify_password(request.password, user.hashed_password):
return {"authorized": False, "message": "Invalid username or password"}
if not user.is_active:
return {"authorized": False, "message": "User account is inactive"}
# 3. Check Assignment
assignment = db.query(models.UserApplication).filter(
models.UserApplication.user_id == user.id,
models.UserApplication.application_id == app.id
).first()
if not assignment:
return {"authorized": False, "message": "User not authorized for this application"}
return {"authorized": True, "message": "Authorized", "user": user}

78
backend/routers/users.py Normal file
View File

@@ -0,0 +1,78 @@
from fastapi import APIRouter, Depends, HTTPException, status, BackgroundTasks
from sqlalchemy.orm import Session
from typing import List
from .. import database, models, schemas, auth_utils
from ..services import email
router = APIRouter(
prefix="/users",
tags=["Users"],
dependencies=[Depends(auth_utils.get_current_admin_user)]
)
@router.post("/", response_model=schemas.UserOut)
async def create_user(user: schemas.UserCreate, background_tasks: BackgroundTasks, db: Session = Depends(database.get_db)):
db_user = db.query(models.User).filter(models.User.username == user.username).first()
if db_user:
raise HTTPException(status_code=400, detail="Username already registered")
hashed_password = auth_utils.get_password_hash(user.password)
db_user = models.User(
username=user.username,
email=user.email,
hashed_password=hashed_password,
is_active=user.is_active,
is_admin=user.is_admin
)
db.add(db_user)
db.commit()
db.refresh(db_user)
# Send email
background_tasks.add_task(email.send_welcome_email, user.email, user.username, user.password)
return db_user
@router.get("/", response_model=List[schemas.UserOut])
async def read_users(skip: int = 0, limit: int = 100, db: Session = Depends(database.get_db)):
users = db.query(models.User).offset(skip).limit(limit).all()
return users
@router.put("/{user_id}", response_model=schemas.UserOut)
async def update_user(user_id: int, user_update: schemas.UserUpdate, background_tasks: BackgroundTasks, db: Session = Depends(database.get_db)):
db_user = db.query(models.User).filter(models.User.id == user_id).first()
if not db_user:
raise HTTPException(status_code=404, detail="User not found")
if user_update.password:
db_user.hashed_password = auth_utils.get_password_hash(user_update.password)
# Send email on password change
background_tasks.add_task(email.send_welcome_email, db_user.email, db_user.username, user_update.password)
if user_update.email:
db_user.email = user_update.email
if user_update.username:
db_user.username = user_update.username
if user_update.is_active is not None:
db_user.is_active = user_update.is_active
if user_update.is_admin is not None:
db_user.is_admin = user_update.is_admin
db.commit()
db.refresh(db_user)
return db_user
@router.post("/{user_id}/assign/{app_id}")
async def assign_app_to_user(user_id: int, app_id: int, db: Session = Depends(database.get_db)):
assignment = db.query(models.UserApplication).filter(
models.UserApplication.user_id == user_id,
models.UserApplication.application_id == app_id
).first()
if assignment:
return {"message": "Already assigned"}
new_assignment = models.UserApplication(user_id=user_id, application_id=app_id)
db.add(new_assignment)
db.commit()
return {"message": "Assigned successfully"}

63
backend/schemas.py Normal file
View File

@@ -0,0 +1,63 @@
from pydantic import BaseModel, EmailStr
from typing import List, Optional
from datetime import datetime
# User Schemas
class UserBase(BaseModel):
username: str
email: EmailStr
is_active: Optional[bool] = True
is_admin: Optional[bool] = False
class UserCreate(UserBase):
password: str
class UserUpdate(BaseModel):
username: Optional[str] = None
email: Optional[EmailStr] = None
password: Optional[str] = None
is_active: Optional[bool] = None
is_admin: Optional[bool] = None
class UserOut(UserBase):
id: int
created_at: datetime
updated_at: datetime
class Config:
orm_mode = True
# Application Schemas
class ApplicationBase(BaseModel):
name: str
url: str
class ApplicationCreate(ApplicationBase):
pass
class ApplicationOut(ApplicationBase):
id: int
api_key: str
created_at: datetime
class Config:
orm_mode = True
# Token Schema
class Token(BaseModel):
access_token: str
token_type: str
class TokenData(BaseModel):
username: Optional[str] = None
# SSO Verification Schema
class SSOVerifyRequest(BaseModel):
username: str
password: str
api_key: str
class SSOVerifyResponse(BaseModel):
authorized: bool
message: str
user: Optional[UserOut] = None

45
backend/services/email.py Normal file
View File

@@ -0,0 +1,45 @@
import aiosmtplib
from email.message import EmailMessage
import os
SMTP_HOST = os.getenv("SMTP_ADDRESS", "smtp.gmail.com")
SMTP_PORT = int(os.getenv("SMTP_PORT", 587))
SMTP_USER = os.getenv("SMTP_USER_NAME", "support@nabd-co.com")
SMTP_PASSWORD = os.getenv("SMTP_PASSWORD", "zwziglbpxyfogafc")
async def send_email(to_email: str, subject: str, body: str):
message = EmailMessage()
message["From"] = SMTP_USER
message["To"] = to_email
message["Subject"] = subject
message.set_content(body)
try:
await aiosmtplib.send(
message,
hostname=SMTP_HOST,
port=SMTP_PORT,
username=SMTP_USER,
password=SMTP_PASSWORD,
start_tls=True,
)
print(f"Email sent to {to_email}")
except Exception as e:
print(f"Failed to send email to {to_email}: {e}")
async def send_welcome_email(to_email: str, username: str, password: str):
subject = "Welcome to ASF SSO"
body = f"""
Hello {username},
Your account has been created/updated on the ASF SSO platform.
Username: {username}
Password: {password}
Please login to change your password if needed.
Regards,
ASF Team
"""
await send_email(to_email, subject, body)