"""FastAPI dependencies for auth and RBAC.""" from __future__ import annotations from typing import Optional from fastapi import Depends, Header from jose import JWTError, jwt from app.core.config import get_settings from app.core.exceptions import ForbiddenException, UnauthorizedException from app.core.supabase import get_supabase_admin async def get_current_user(authorization: Optional[str] = Header(None)) -> dict: """Extract and validate the user from the Supabase-issued JWT.""" if not authorization or not authorization.startswith("Bearer "): raise UnauthorizedException("Missing or invalid authorization header") token = authorization.split(" ", 1)[1] settings = get_settings() try: payload = jwt.decode( token, settings.SUPABASE_JWT_SECRET, algorithms=["HS256"], audience="authenticated", ) except JWTError: raise UnauthorizedException("Invalid or expired token") user_id = payload.get("sub") if not user_id: raise UnauthorizedException("Invalid token payload") db = get_supabase_admin() result = db.table("users").select("*").eq("id", user_id).execute() if not result.data: raise UnauthorizedException("User not found") user = result.data[0] user.pop("password_hash", None) return user async def get_optional_user( authorization: Optional[str] = Header(None), ) -> Optional[dict]: """Return user if authenticated, None otherwise.""" if not authorization or not authorization.startswith("Bearer "): return None try: return await get_current_user(authorization) except UnauthorizedException: return None def require_role(*roles: str): """Return a dependency that enforces one of the given roles.""" async def _check(user: dict = Depends(get_current_user)) -> dict: if user["role"] not in roles: raise ForbiddenException( f"Requires one of: {', '.join(roles)}" ) return user return _check # Convenience aliases require_admin = require_role("admin") require_agency = require_role("agency") require_agency_or_admin = require_role("agency", "admin")