mirror of
http://88.130.71.182:3000/BlitTech/badoHair_be.git
synced 2026-06-13 08:34:29 +00:00
57 lines
1.6 KiB
Python
57 lines
1.6 KiB
Python
from typing import AsyncGenerator
|
|
import asyncpg
|
|
from fastapi import Depends, Security
|
|
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
|
|
|
|
from app.core.security import decode_token
|
|
from app.database import get_pool
|
|
from app.exceptions import UnauthorizedError, ForbiddenError, UserBlockedError
|
|
|
|
# Bearer-token scheme used by dependencies that require authentication.
_bearer = HTTPBearer()

# Same scheme with ``auto_error=False``; the dependency that uses it accepts
# ``None`` credentials, so a missing Authorization header is tolerated there.
_bearer_optional = HTTPBearer(auto_error=False)
|
|
|
|
|
|
async def get_db() -> AsyncGenerator[asyncpg.Connection, None]:
    """Yield a database connection acquired from the shared pool.

    The pool is obtained from ``get_pool()``. The connection is held while the
    consumer of this generator runs and is handed back to the pool when the
    ``async with`` block exits.

    Yields:
        The connection acquired from the pool.
    """
    connection_pool = await get_pool()
    async with connection_pool.acquire() as connection:
        yield connection
|
|
|
|
|
|
async def get_current_user(
    credentials: HTTPAuthorizationCredentials = Security(_bearer),
    db: asyncpg.Connection = Depends(get_db),
) -> dict:
    """Resolve the request's bearer token to the caller's profile.

    Args:
        credentials: Bearer credentials extracted by the ``_bearer`` scheme.
        db: Database connection supplied by ``get_db``.

    Returns:
        The matching ``profiles`` row as a plain ``dict`` with the keys
        ``id``, ``email``, ``full_name``, ``phone``, ``role`` and
        ``is_blocked``.

    Raises:
        UnauthorizedError: The decoded payload has a missing or empty ``sub``
            claim, or no profile row exists for that id.
        UserBlockedError: The profile row has a truthy ``is_blocked``.

    Anything raised by ``decode_token`` itself propagates unchanged.
    """
    claims = decode_token(credentials.credentials)
    subject = claims.get("sub")
    if not subject:
        raise UnauthorizedError()

    query = "SELECT id, email, full_name, phone, role, is_blocked FROM profiles WHERE id = $1"
    row = await db.fetchrow(query, subject)
    if not row:
        raise UnauthorizedError("User profile not found")
    if row["is_blocked"]:
        raise UserBlockedError()

    # Hand back a plain dict rather than the driver's row object.
    return dict(row)
|
|
|
|
|
|
async def require_admin(user: dict = Depends(get_current_user)) -> dict:
    """Allow the request through only when the caller's role is ``"admin"``.

    Args:
        user: Profile dict produced by ``get_current_user``.

    Returns:
        The same ``user`` dict, unchanged.

    Raises:
        ForbiddenError: ``user["role"]`` is anything other than ``"admin"``.
    """
    if user["role"] == "admin":
        return user
    raise ForbiddenError()
|
|
|
|
|
|
async def get_current_user_optional(
    credentials: HTTPAuthorizationCredentials | None = Security(_bearer_optional),
    db: asyncpg.Connection = Depends(get_db),
) -> dict | None:
    """Return the caller's profile if they are authenticated, else ``None``.

    This is the best-effort counterpart of ``get_current_user``: it never
    raises for an ``Exception``; every failure is reported as an anonymous
    caller.

    Args:
        credentials: Bearer credentials from the ``_bearer_optional`` scheme,
            or ``None`` when the request carried none.
        db: Database connection supplied by ``get_db``.

    Returns:
        The profile dict from ``get_current_user``, or ``None`` when no
        credentials were supplied or the lookup failed for any reason.
    """
    # Function-scope import so this block does not depend on the module's
    # import section being edited.
    import logging

    if not credentials:
        return None

    try:
        return await get_current_user(credentials, db)
    except (UnauthorizedError, UserBlockedError):
        # Expected rejections raised by get_current_user: the caller is
        # simply treated as anonymous.
        return None
    except Exception:
        # Still best-effort (the caller is treated as anonymous), but do not
        # hide the failure: without this log a database outage or a bug would
        # be indistinguishable from a request without a token.
        # NOTE(review): if decode_token raises its own exception type for
        # malformed/expired tokens, add it to the tuple above so routine bad
        # tokens are not logged as warnings.
        logging.getLogger(__name__).warning(
            "Optional authentication failed unexpectedly; treating caller as anonymous",
            exc_info=True,
        )
        return None
|