"""User CRUD service.""" from __future__ import annotations from datetime import datetime, timezone from typing import Optional from app.core.exceptions import ConflictException, ForbiddenException, NotFoundException from app.core.supabase import get_supabase_admin class UserService: def __init__(self): self.db = get_supabase_admin() def get_user(self, user_id: str) -> dict: result = self.db.table("users").select("*").eq("id", user_id).execute() if not result.data: raise NotFoundException("User not found") user = result.data[0] user.pop("password_hash", None) return user def update_user(self, user_id: str, data: dict) -> dict: # Remove None values update_data = {k: v for k, v in data.items() if v is not None} if not update_data: return self.get_user(user_id) # Check email uniqueness if changing email if "email" in update_data: existing = ( self.db.table("users") .select("id") .eq("email", update_data["email"].lower()) .neq("id", user_id) .execute() ) if existing.data: raise ConflictException("A user with this email already exists") update_data["email"] = update_data["email"].lower() update_data["updated_at"] = datetime.now(timezone.utc).isoformat() result = ( self.db.table("users") .update(update_data) .eq("id", user_id) .execute() ) if not result.data: raise NotFoundException("User not found") user = result.data[0] user.pop("password_hash", None) return user def list_users( self, page: int = 1, page_size: int = 20, role: Optional[str] = None, search: Optional[str] = None, ) -> dict: query = self.db.table("users").select("*", count="exact") if role: query = query.eq("role", role) if search: query = query.or_(f"name.ilike.%{search}%,email.ilike.%{search}%") offset = (page - 1) * page_size result = ( query.order("created_at", desc=True) .range(offset, offset + page_size - 1) .execute() ) users = [] for u in result.data: u.pop("password_hash", None) users.append(u) return { "users": users, "total": result.count or 0, "page": page, "page_size": page_size, } def verify_user(self, user_id: str) -> dict: result = ( self.db.table("users") .update({"verified": True, "updated_at": datetime.now(timezone.utc).isoformat()}) .eq("id", user_id) .execute() ) if not result.data: raise NotFoundException("User not found") user = result.data[0] user.pop("password_hash", None) return user def delete_user(self, user_id: str, requester_id: str, requester_role: str) -> dict: if requester_role != "admin" and requester_id != user_id: raise ForbiddenException("Cannot delete other users") if requester_role == "admin" and requester_id == user_id: raise ForbiddenException("Admins cannot delete their own account") result = self.db.table("users").delete().eq("id", user_id).execute() if not result.data: raise NotFoundException("User not found") return {"message": "User deleted"}