"""Authentication service — register, login, refresh, password reset.""" from __future__ import annotations import logging from gotrue.errors import AuthApiError from app.core.config import get_settings from app.core.exceptions import ( BadRequestException, UnauthorizedException, ) from app.core.supabase import get_supabase_admin, get_supabase_client logger = logging.getLogger(__name__) class AuthService: # ── Register ───────────────────────────────────────── def register(self, email: str, password: str, name: str, role: str = "visitor") -> dict: client = get_supabase_client() try: response = client.auth.sign_up({"email": email, "password": password}) except AuthApiError as exc: raise BadRequestException(str(exc)) auth_user = response.user if not auth_user: raise BadRequestException("Registration failed — no user returned") db = get_supabase_admin() user_data = { "id": auth_user.id, "email": email.lower(), "name": name, "role": role, "verified": False, } result = db.table("users").insert(user_data).execute() if not result.data: raise BadRequestException("Failed to create user profile") user = result.data[0] if role == "agency": agency_data = { "user_id": auth_user.id, "name": name, "description": "", "address": "", "phone": "", "email": email.lower(), "verified": False, } db.table("agencies").insert(agency_data).execute() return { "message": "Registration successful. Please check your email to verify your account.", "user": self._sanitize_user(user), } # ── Login ──────────────────────────────────────────── def login(self, email: str, password: str) -> dict: client = get_supabase_client() try: response = client.auth.sign_in_with_password({"email": email, "password": password}) except AuthApiError: raise UnauthorizedException("Invalid email or password") session = response.session auth_user = response.user db = get_supabase_admin() result = db.table("users").select("*").eq("id", auth_user.id).execute() if not result.data: raise UnauthorizedException("User profile not found") user = result.data[0] # Sync verified flag if Supabase has confirmed the email if auth_user.email_confirmed_at and not user.get("verified"): db.table("users").update({"verified": True}).eq("id", auth_user.id).execute() user["verified"] = True return { "access_token": session.access_token, "refresh_token": session.refresh_token, "token_type": "bearer", "user": self._sanitize_user(user), } # ── Refresh ────────────────────────────────────────── def refresh(self, refresh_token: str) -> dict: client = get_supabase_client() try: response = client.auth.refresh_session(refresh_token) except AuthApiError: raise UnauthorizedException("Invalid or expired refresh token") session = response.session auth_user = response.user db = get_supabase_admin() result = db.table("users").select("*").eq("id", auth_user.id).execute() if not result.data: raise UnauthorizedException("User profile not found") user = result.data[0] return { "access_token": session.access_token, "refresh_token": session.refresh_token, "token_type": "bearer", "user": self._sanitize_user(user), } # ── Password reset ─────────────────────────────────── def request_password_reset(self, email: str) -> str: settings = get_settings() redirect_to = f"{settings.FRONTEND_URL}/reset-password" try: get_supabase_client().auth.reset_password_for_email( email, {"redirect_to": redirect_to} ) except AuthApiError as exc: logger.warning("Password reset request error for %s: %s", email, exc) return "If an account with that email exists, a reset link has been sent." def reset_password(self, user_id: str, new_password: str) -> dict: try: get_supabase_admin().auth.admin.update_user_by_id( user_id, {"password": new_password} ) except AuthApiError as exc: raise BadRequestException(str(exc)) return {"message": "Password has been reset successfully"} # ── Change password ────────────────────────────────── def change_password( self, user_id: str, email: str, current_password: str, new_password: str ) -> dict: # Re-authenticate to verify the current password client = get_supabase_client() try: client.auth.sign_in_with_password({"email": email, "password": current_password}) except AuthApiError: raise BadRequestException("Current password is incorrect") try: get_supabase_admin().auth.admin.update_user_by_id( user_id, {"password": new_password} ) except AuthApiError as exc: raise BadRequestException(str(exc)) return {"message": "Password changed successfully"} # ── Resend verification ────────────────────────────── def resend_verification(self, email: str) -> dict: try: get_supabase_client().auth.resend({"type": "signup", "email": email}) except AuthApiError as exc: logger.warning("Failed to resend verification to %s: %s", email, exc) return {"message": "Verification email sent"} # ── Helpers ────────────────────────────────────────── @staticmethod def _sanitize_user(user: dict) -> dict: return { "id": user["id"], "email": user["email"], "name": user["name"], "role": user["role"], "verified": user["verified"], "created_at": user.get("created_at"), }