mirror of
http://88.130.71.182:3000/BlitTech/deals24togo_be.git
synced 2026-06-12 23:33:21 +00:00
76 lines
2.2 KiB
Python
76 lines
2.2 KiB
Python
"""FastAPI dependencies for auth and RBAC."""
|
|
|
|
from __future__ import annotations
|
|
|
|
from typing import Optional
|
|
|
|
from fastapi import Depends, Header
|
|
from jose import JWTError, jwt
|
|
|
|
from app.core.config import get_settings
|
|
from app.core.exceptions import ForbiddenException, UnauthorizedException
|
|
from app.core.supabase import get_supabase_admin
|
|
|
|
|
|
async def get_current_user(authorization: Optional[str] = Header(None)) -> dict:
    """Extract and validate the user from the Supabase-issued JWT.

    Args:
        authorization: Raw ``Authorization`` header value, expected in the
            form ``Bearer <token>``. The scheme is matched
            case-insensitively.

    Returns:
        The first row of the ``users`` table whose ``id`` equals the
        token's ``sub`` claim, with the ``password_hash`` key removed.

    Raises:
        UnauthorizedException: If the header is missing or malformed, the
            token is rejected by ``jwt.decode`` (HS256, audience
            ``authenticated``), the payload has no ``sub`` claim, or no
            user row matches.
    """
    # HTTP authentication schemes are case-insensitive, so "bearer <token>"
    # must be accepted just like "Bearer <token>".
    scheme, _, credentials = (authorization or "").partition(" ")
    token = credentials.strip()
    if scheme.lower() != "bearer" or not token:
        raise UnauthorizedException("Missing or invalid authorization header")

    settings = get_settings()

    try:
        payload = jwt.decode(
            token,
            settings.SUPABASE_JWT_SECRET,
            algorithms=["HS256"],
            audience="authenticated",
        )
    except JWTError as exc:
        # Chain the cause so the underlying decode failure stays visible
        # in tracebacks and logs.
        raise UnauthorizedException("Invalid or expired token") from exc

    user_id = payload.get("sub")
    if not user_id:
        raise UnauthorizedException("Invalid token payload")

    db = get_supabase_admin()
    # NOTE(review): this query is not awaited, so it presumably runs
    # synchronously inside this coroutine -- confirm whether the client
    # blocks the event loop.
    result = db.table("users").select("*").eq("id", user_id).execute()
    if not result.data:
        raise UnauthorizedException("User not found")

    user = result.data[0]
    # Never hand the stored password hash to route handlers.
    user.pop("password_hash", None)
    return user
|
|
|
|
|
|
async def get_optional_user(
    authorization: Optional[str] = Header(None),
) -> Optional[dict]:
    """Resolve the caller's user record when usable credentials are sent.

    Requests without a ``Bearer `` authorization header, and requests whose
    credentials are rejected by ``get_current_user``, both yield ``None``
    instead of an error.
    """
    has_bearer_header = bool(authorization) and authorization.startswith("Bearer ")
    if not has_bearer_header:
        return None

    try:
        current = await get_current_user(authorization)
    except UnauthorizedException:
        # Rejected credentials are treated the same as no credentials.
        return None
    return current
|
|
|
|
|
|
def require_role(*roles: str):
    """Return a dependency that enforces one of the given roles.

    Args:
        *roles: Role names that are allowed through.

    Returns:
        An async dependency callable. It resolves the user through
        ``get_current_user`` and returns that user dict unchanged when its
        ``role`` value is one of ``roles``.

    Raises:
        ForbiddenException: Raised by the returned dependency when the
            user's role is missing or not among ``roles``.
    """

    async def _check(user: dict = Depends(get_current_user)) -> dict:
        # Use .get() so a user record without a "role" key is rejected as
        # forbidden instead of failing with a KeyError.
        if user.get("role") not in roles:
            raise ForbiddenException(
                f"Requires one of: {', '.join(roles)}"
            )
        return user

    return _check
|
|
|
|
|
|
# Ready-made role guards built with require_role.
require_admin = require_role("admin")  # "admin" only
require_agency = require_role("agency")  # "agency" only
require_agency_or_admin = require_role("agency", "admin")  # either role
|