mirror of
http://88.130.71.182:3000/BlitTech/deals24togo_be.git
synced 2026-06-12 23:33:21 +00:00
184 lines
6.8 KiB
Python
184 lines
6.8 KiB
Python
"""Authentication service — register, login, refresh, password reset."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
|
|
from gotrue.errors import AuthApiError
|
|
|
|
from app.core.config import get_settings
|
|
from app.core.exceptions import (
|
|
BadRequestException,
|
|
UnauthorizedException,
|
|
)
|
|
from app.core.supabase import get_supabase_admin, get_supabase_client
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class AuthService:
|
|
# ── Register ─────────────────────────────────────────
|
|
|
|
def register(self, email: str, password: str, name: str, role: str = "visitor") -> dict:
|
|
client = get_supabase_client()
|
|
try:
|
|
response = client.auth.sign_up({"email": email, "password": password})
|
|
except AuthApiError as exc:
|
|
raise BadRequestException(str(exc))
|
|
|
|
auth_user = response.user
|
|
if not auth_user:
|
|
raise BadRequestException("Registration failed — no user returned")
|
|
|
|
db = get_supabase_admin()
|
|
user_data = {
|
|
"id": auth_user.id,
|
|
"email": email.lower(),
|
|
"name": name,
|
|
"role": role,
|
|
"verified": False,
|
|
}
|
|
result = db.table("users").insert(user_data).execute()
|
|
if not result.data:
|
|
raise BadRequestException("Failed to create user profile")
|
|
|
|
user = result.data[0]
|
|
|
|
if role == "agency":
|
|
agency_data = {
|
|
"user_id": auth_user.id,
|
|
"name": name,
|
|
"description": "",
|
|
"address": "",
|
|
"phone": "",
|
|
"email": email.lower(),
|
|
"verified": False,
|
|
}
|
|
db.table("agencies").insert(agency_data).execute()
|
|
|
|
return {
|
|
"message": "Registration successful. Please check your email to verify your account.",
|
|
"user": self._sanitize_user(user),
|
|
}
|
|
|
|
# ── Login ────────────────────────────────────────────
|
|
|
|
def login(self, email: str, password: str) -> dict:
|
|
client = get_supabase_client()
|
|
try:
|
|
response = client.auth.sign_in_with_password({"email": email, "password": password})
|
|
except AuthApiError:
|
|
raise UnauthorizedException("Invalid email or password")
|
|
|
|
session = response.session
|
|
auth_user = response.user
|
|
|
|
db = get_supabase_admin()
|
|
result = db.table("users").select("*").eq("id", auth_user.id).execute()
|
|
if not result.data:
|
|
raise UnauthorizedException("User profile not found")
|
|
|
|
user = result.data[0]
|
|
|
|
# Sync verified flag if Supabase has confirmed the email
|
|
if auth_user.email_confirmed_at and not user.get("verified"):
|
|
db.table("users").update({"verified": True}).eq("id", auth_user.id).execute()
|
|
user["verified"] = True
|
|
|
|
return {
|
|
"access_token": session.access_token,
|
|
"refresh_token": session.refresh_token,
|
|
"token_type": "bearer",
|
|
"user": self._sanitize_user(user),
|
|
}
|
|
|
|
# ── Refresh ──────────────────────────────────────────
|
|
|
|
def refresh(self, refresh_token: str) -> dict:
|
|
client = get_supabase_client()
|
|
try:
|
|
response = client.auth.refresh_session(refresh_token)
|
|
except AuthApiError:
|
|
raise UnauthorizedException("Invalid or expired refresh token")
|
|
|
|
session = response.session
|
|
auth_user = response.user
|
|
|
|
db = get_supabase_admin()
|
|
result = db.table("users").select("*").eq("id", auth_user.id).execute()
|
|
if not result.data:
|
|
raise UnauthorizedException("User profile not found")
|
|
|
|
user = result.data[0]
|
|
return {
|
|
"access_token": session.access_token,
|
|
"refresh_token": session.refresh_token,
|
|
"token_type": "bearer",
|
|
"user": self._sanitize_user(user),
|
|
}
|
|
|
|
# ── Password reset ───────────────────────────────────
|
|
|
|
def request_password_reset(self, email: str) -> str:
|
|
settings = get_settings()
|
|
redirect_to = f"{settings.FRONTEND_URL}/reset-password"
|
|
try:
|
|
get_supabase_client().auth.reset_password_for_email(
|
|
email, {"redirect_to": redirect_to}
|
|
)
|
|
except AuthApiError as exc:
|
|
logger.warning("Password reset request error for %s: %s", email, exc)
|
|
return "If an account with that email exists, a reset link has been sent."
|
|
|
|
def reset_password(self, user_id: str, new_password: str) -> dict:
|
|
try:
|
|
get_supabase_admin().auth.admin.update_user_by_id(
|
|
user_id, {"password": new_password}
|
|
)
|
|
except AuthApiError as exc:
|
|
raise BadRequestException(str(exc))
|
|
return {"message": "Password has been reset successfully"}
|
|
|
|
# ── Change password ──────────────────────────────────
|
|
|
|
def change_password(
|
|
self, user_id: str, email: str, current_password: str, new_password: str
|
|
) -> dict:
|
|
# Re-authenticate to verify the current password
|
|
client = get_supabase_client()
|
|
try:
|
|
client.auth.sign_in_with_password({"email": email, "password": current_password})
|
|
except AuthApiError:
|
|
raise BadRequestException("Current password is incorrect")
|
|
|
|
try:
|
|
get_supabase_admin().auth.admin.update_user_by_id(
|
|
user_id, {"password": new_password}
|
|
)
|
|
except AuthApiError as exc:
|
|
raise BadRequestException(str(exc))
|
|
|
|
return {"message": "Password changed successfully"}
|
|
|
|
# ── Resend verification ──────────────────────────────
|
|
|
|
def resend_verification(self, email: str) -> dict:
|
|
try:
|
|
get_supabase_client().auth.resend({"type": "signup", "email": email})
|
|
except AuthApiError as exc:
|
|
logger.warning("Failed to resend verification to %s: %s", email, exc)
|
|
return {"message": "Verification email sent"}
|
|
|
|
# ── Helpers ──────────────────────────────────────────
|
|
|
|
@staticmethod
|
|
def _sanitize_user(user: dict) -> dict:
|
|
return {
|
|
"id": user["id"],
|
|
"email": user["email"],
|
|
"name": user["name"],
|
|
"role": user["role"],
|
|
"verified": user["verified"],
|
|
"created_at": user.get("created_at"),
|
|
}
|