""" Admin router — all endpoints require is_admin = TRUE in user_profiles. Bootstrap: after running migration 001, set your admin user in Supabase: UPDATE user_profiles SET is_admin = TRUE WHERE user_id = ''; """ import logging import time from collections import defaultdict from datetime import datetime from typing import Optional, List, Dict from fastapi import APIRouter, Depends, HTTPException, Query from app.dependencies import get_admin_user from app.database import get_supabase from app.models import ( AdminStatsResponse, AdminUserListItem, AdminUserDetail, AdminChangePlanRequest, AdminSuspendRequest, AdminChatbotListItem, AdminSystemHealth, AdminConversationListItem, SuccessResponse, ) from app.services.vector_store import vector_store from app.services.storage import delete_from_storage from app.config import settings router = APIRouter(prefix="/admin", tags=["Admin"]) logger = logging.getLogger(__name__) _app_start_time = time.time() # ── Stats ────────────────────────────────────────────────────────────────────── @router.get("/stats", response_model=AdminStatsResponse) async def get_stats(admin=Depends(get_admin_user)): """Platform-wide statistics.""" supabase = get_supabase() # Total users try: users_resp = supabase.table("user_profiles").select("user_id", count="exact").execute() total_users = users_resp.count or 0 except Exception: total_users = 0 # Total chatbots try: cb_resp = supabase.table("chatbots").select("id", count="exact").execute() total_chatbots = cb_resp.count or 0 pub_resp = supabase.table("chatbots").select("id", count="exact").eq("is_published", True).execute() total_published = pub_resp.count or 0 except Exception: total_chatbots = 0 total_published = 0 # Total conversations try: conv_resp = supabase.table("conversations").select("id", count="exact").execute() total_convos = conv_resp.count or 0 except Exception: total_convos = 0 # Total messages try: msg_resp = supabase.table("messages").select("id", count="exact").execute() total_messages = msg_resp.count or 0 except Exception: total_messages = 0 # Active subscriptions by plan active_subs: Dict[str, int] = defaultdict(int) try: subs_resp = supabase.table("subscriptions").select("plan, status").eq("status", "active").execute() for s in (subs_resp.data or []): active_subs[s["plan"]] += 1 except Exception: pass return AdminStatsResponse( total_users=total_users, total_chatbots=total_chatbots, total_published_chatbots=total_published, total_conversations=total_convos, total_messages=total_messages, active_subscriptions=dict(active_subs), ) # ── Users ────────────────────────────────────────────────────────────────────── @router.get("/users") async def list_users( admin=Depends(get_admin_user), page: int = Query(1, ge=1), limit: int = Query(20, ge=1, le=100), search: Optional[str] = None, plan: Optional[str] = None, ): """Paginated list of all users with company and subscription info.""" supabase = get_supabase() offset = (page - 1) * limit try: # Fetch companies (contains owner_id and name) companies_resp = supabase.table("companies").select("id, owner_id, name, website, industry").execute() companies = {c["owner_id"]: c for c in (companies_resp.data or [])} # Fetch subscriptions subs_resp = supabase.table("subscriptions").select("user_id, plan, status").execute() subs = {s["user_id"]: s for s in (subs_resp.data or [])} # Fetch user profiles (suspension, admin flag) profiles_resp = supabase.table("user_profiles").select("user_id, is_admin, suspended_at").execute() profiles = {p["user_id"]: p for p in (profiles_resp.data or [])} # Fetch auth users via admin API try: auth_users_resp = supabase.auth.admin.list_users() auth_users = auth_users_resp if isinstance(auth_users_resp, list) else getattr(auth_users_resp, 'users', []) except Exception as e: logger.warning(f"Could not list auth users: {e}") auth_users = [] # Fetch chatbot counts per company cb_resp = supabase.table("chatbots").select("company_id").execute() cb_by_company: Dict[str, int] = defaultdict(int) for cb in (cb_resp.data or []): cb_by_company[cb["company_id"]] += 1 # Fetch conversation counts per chatbot (to get per-user conv count) chatbots_resp = supabase.table("chatbots").select("id, company_id").execute() chatbot_company_map = {cb["id"]: cb["company_id"] for cb in (chatbots_resp.data or [])} conv_resp = supabase.table("conversations").select("chatbot_id", count="exact").execute() conv_by_chatbot: Dict[str, int] = defaultdict(int) for conv in (conv_resp.data or []): conv_by_chatbot[conv["chatbot_id"]] += 1 conv_by_company: Dict[str, int] = defaultdict(int) for cb_id, count in conv_by_chatbot.items(): company_id = chatbot_company_map.get(cb_id) if company_id: conv_by_company[company_id] += count # Build user list users_list = [] for auth_user in auth_users: uid = getattr(auth_user, "id", None) or auth_user.get("id", "") email = getattr(auth_user, "email", None) or auth_user.get("email", "") created_at = getattr(auth_user, "created_at", None) or auth_user.get("created_at") # Apply filters if search and search.lower() not in email.lower(): company_info = companies.get(uid, {}) if search.lower() not in (company_info.get("name") or "").lower(): continue sub_info = subs.get(uid, {}) user_plan = sub_info.get("plan", "free") if plan and user_plan != plan: continue company_info = companies.get(uid, {}) profile_info = profiles.get(uid, {}) users_list.append(AdminUserListItem( id=uid, email=email, company_name=company_info.get("name"), plan=user_plan, subscription_status=sub_info.get("status", "active"), chatbot_count=cb_by_company.get(company_info.get("id", ""), 0), conversations_count=conv_by_company.get(company_info.get("id", ""), 0), is_suspended=bool(profile_info.get("suspended_at")), is_admin=bool(profile_info.get("is_admin", False)), created_at=created_at, )) total = len(users_list) paginated = users_list[offset:offset + limit] return { "users": [u.model_dump() for u in paginated], "total": total, "page": page, "pages": max(1, (total + limit - 1) // limit), } except Exception as e: logger.error(f"Admin list_users error: {e}") raise HTTPException(status_code=500, detail="Failed to fetch users") @router.get("/users/{user_id}", response_model=AdminUserDetail) async def get_user(user_id: str, admin=Depends(get_admin_user)): """Detailed info about a specific user.""" supabase = get_supabase() try: auth_user = supabase.auth.admin.get_user_by_id(user_id) auth_u = getattr(auth_user, "user", auth_user) email = getattr(auth_u, "email", "") or auth_u.get("email", "") created_at = getattr(auth_u, "created_at", None) or auth_u.get("created_at") except Exception: email = "" created_at = None company = supabase.table("companies").select("*").eq("owner_id", user_id).execute() company_info = company.data[0] if company.data else {} sub = supabase.table("subscriptions").select("plan, status").eq("user_id", user_id).execute() sub_info = sub.data[0] if sub.data else {} profile = supabase.table("user_profiles").select("is_admin, suspended_at").eq("user_id", user_id).execute() profile_info = profile.data[0] if profile.data else {} chatbots = [] if company_info.get("id"): cb_resp = supabase.table("chatbots").select("id, name, is_published, created_at") \ .eq("company_id", company_info["id"]).execute() chatbots = cb_resp.data or [] return AdminUserDetail( id=user_id, email=email, company_name=company_info.get("name"), website=company_info.get("website"), industry=company_info.get("industry"), plan=sub_info.get("plan", "free"), subscription_status=sub_info.get("status", "active"), chatbot_count=len(chatbots), conversations_count=0, is_suspended=bool(profile_info.get("suspended_at")), is_admin=bool(profile_info.get("is_admin", False)), created_at=created_at, chatbots=chatbots, ) @router.patch("/users/{user_id}/plan") async def change_user_plan(user_id: str, data: AdminChangePlanRequest, admin=Depends(get_admin_user)): """Manually grant or change a user's subscription plan.""" valid_plans = ["free", "starter", "business", "agency", "enterprise"] if data.plan not in valid_plans: raise HTTPException(status_code=400, detail=f"Invalid plan. Must be one of: {valid_plans}") supabase = get_supabase() try: supabase.table("subscriptions").upsert({ "user_id": user_id, "plan": data.plan, "status": "active", }, on_conflict="user_id").execute() except Exception as e: logger.error(f"Failed to change plan for {user_id}: {e}") raise HTTPException(status_code=500, detail="Failed to update plan") logger.info(f"Admin {admin.id} changed plan for user {user_id} to {data.plan}. Reason: {data.reason}") return {"message": f"Plan updated to {data.plan}", "user_id": user_id, "plan": data.plan} @router.post("/users/{user_id}/suspend") async def suspend_user(user_id: str, data: AdminSuspendRequest, admin=Depends(get_admin_user)): """Suspend or unsuspend a user account.""" supabase = get_supabase() update_data: dict = {"updated_at": datetime.utcnow().isoformat()} if data.suspend: update_data["suspended_at"] = datetime.utcnow().isoformat() if data.reason: update_data["suspended_reason"] = data.reason action = "suspended" else: update_data["suspended_at"] = None update_data["suspended_reason"] = None action = "unsuspended" try: supabase.table("user_profiles").upsert( {"user_id": user_id, **update_data}, on_conflict="user_id" ).execute() except Exception as e: logger.error(f"Failed to {action} user {user_id}: {e}") raise HTTPException(status_code=500, detail=f"Failed to {action} user") logger.info(f"Admin {admin.id} {action} user {user_id}") return {"message": f"User {action}", "user_id": user_id} @router.delete("/users/{user_id}", response_model=SuccessResponse) async def delete_user(user_id: str, admin=Depends(get_admin_user)): """Permanently delete a user and all their data.""" supabase = get_supabase() # 1. Get company company = supabase.table("companies").select("id").eq("owner_id", user_id).execute() company_id = company.data[0]["id"] if company.data else None if company_id: # 2. Get all chatbots and clean up Qdrant + storage chatbots = supabase.table("chatbots").select("id, qdrant_collection_name, logo_url") \ .eq("company_id", company_id).execute() for cb in (chatbots.data or []): if cb.get("qdrant_collection_name"): try: vector_store.delete_collection(cb["qdrant_collection_name"]) except Exception as e: logger.warning(f"Failed to delete Qdrant collection for chatbot {cb['id']}: {e}") if cb.get("logo_url"): delete_from_storage(supabase, "logos", cb["logo_url"]) # 3. Delete documents from storage docs = supabase.table("documents").select("file_url") \ .in_("chatbot_id", [cb["id"] for cb in (chatbots.data or [])]).execute() for doc in (docs.data or []): if doc.get("file_url"): delete_from_storage(supabase, "documents", doc["file_url"]) # 4. Delete company (cascades to chatbots, documents, conversations) supabase.table("companies").delete().eq("id", company_id).execute() # 5. Delete subscription supabase.table("subscriptions").delete().eq("user_id", user_id).execute() # 6. Delete user profile supabase.table("user_profiles").delete().eq("user_id", user_id).execute() # 7. Delete auth user try: supabase.auth.admin.delete_user(user_id) except Exception as e: logger.error(f"Failed to delete auth user {user_id}: {e}") raise HTTPException(status_code=500, detail="Failed to delete auth user") logger.info(f"Admin {admin.id} deleted user {user_id}") return SuccessResponse(success=True, message="User deleted successfully") # ── Chatbots ─────────────────────────────────────────────────────────────────── @router.get("/chatbots") async def list_all_chatbots( admin=Depends(get_admin_user), page: int = Query(1, ge=1), limit: int = Query(20, ge=1, le=100), search: Optional[str] = None, ): """Paginated list of ALL chatbots across all users.""" supabase = get_supabase() offset = (page - 1) * limit try: # Get chatbots with company info q = supabase.table("chatbots").select("*, companies(name, owner_id)") \ .order("created_at", desc=True) result = q.execute() all_chatbots = result.data or [] # Apply search filter if search: s = search.lower() all_chatbots = [ cb for cb in all_chatbots if s in (cb.get("name") or "").lower() or s in (cb.get("companies", {}) or {}).get("name", "").lower() ] total = len(all_chatbots) paginated = all_chatbots[offset:offset + limit] # Get owner emails for paginated set owner_ids = list({(cb.get("companies") or {}).get("owner_id") for cb in paginated if cb.get("companies")}) owner_emails: Dict[str, str] = {} if owner_ids: try: for oid in owner_ids: try: u = supabase.auth.admin.get_user_by_id(oid) u_obj = getattr(u, "user", u) owner_emails[oid] = getattr(u_obj, "email", "") or u_obj.get("email", "") except Exception: pass except Exception: pass # Get doc and conv counts for paginated chatbots cb_ids = [cb["id"] for cb in paginated] doc_counts: Dict[str, int] = defaultdict(int) conv_counts: Dict[str, int] = defaultdict(int) if cb_ids: docs_resp = supabase.table("documents").select("chatbot_id").in_("chatbot_id", cb_ids).execute() for d in (docs_resp.data or []): doc_counts[d["chatbot_id"]] += 1 convs_resp = supabase.table("conversations").select("chatbot_id").in_("chatbot_id", cb_ids).execute() for c in (convs_resp.data or []): conv_counts[c["chatbot_id"]] += 1 items = [] for cb in paginated: company_info = cb.get("companies") or {} owner_id = company_info.get("owner_id") items.append(AdminChatbotListItem( id=cb["id"], name=cb.get("name", ""), owner_email=owner_emails.get(owner_id), company_name=company_info.get("name"), is_published=cb.get("is_published", False), document_count=doc_counts[cb["id"]], conversation_count=conv_counts[cb["id"]], created_at=cb.get("created_at"), )) return { "chatbots": [i.model_dump() for i in items], "total": total, "page": page, "pages": max(1, (total + limit - 1) // limit), } except Exception as e: logger.error(f"Admin list_chatbots error: {e}") raise HTTPException(status_code=500, detail="Failed to fetch chatbots") @router.delete("/chatbots/{chatbot_id}", response_model=SuccessResponse) async def delete_chatbot_admin(chatbot_id: str, admin=Depends(get_admin_user)): """Force-delete any chatbot regardless of ownership.""" supabase = get_supabase() cb = supabase.table("chatbots").select("*").eq("id", chatbot_id).execute() if not cb.data: raise HTTPException(status_code=404, detail="Chatbot not found") chatbot = cb.data[0] # Delete Qdrant collection if chatbot.get("qdrant_collection_name"): try: vector_store.delete_collection(chatbot["qdrant_collection_name"]) except Exception as e: logger.warning(f"Failed to delete Qdrant collection: {e}") # Delete logo from storage if chatbot.get("logo_url"): delete_from_storage(supabase, "logos", chatbot["logo_url"]) supabase.table("chatbots").delete().eq("id", chatbot_id).execute() logger.info(f"Admin {admin.id} deleted chatbot {chatbot_id}") return SuccessResponse(success=True, message="Chatbot deleted") # ── Conversations ────────────────────────────────────────────────────────────── @router.get("/conversations") async def list_conversations( admin=Depends(get_admin_user), page: int = Query(1, ge=1), limit: int = Query(50, ge=1, le=200), ): """Recent conversations across all chatbots.""" supabase = get_supabase() offset = (page - 1) * limit try: result = supabase.table("conversations") \ .select("*, chatbots(name)") \ .order("created_at", desc=True) \ .range(offset, offset + limit - 1) \ .execute() convos = result.data or [] # Get first message for each conversation conv_ids = [c["id"] for c in convos] first_msgs: Dict[str, str] = {} if conv_ids: msgs_resp = supabase.table("messages") \ .select("conversation_id, content, role") \ .in_("conversation_id", conv_ids) \ .eq("role", "user") \ .order("created_at", desc=False) \ .execute() seen = set() for m in (msgs_resp.data or []): cid = m["conversation_id"] if cid not in seen: seen.add(cid) first_msgs[cid] = (m.get("content") or "")[:120] # Total count count_resp = supabase.table("conversations").select("id", count="exact").execute() total = count_resp.count or 0 items = [ AdminConversationListItem( id=c["id"], chatbot_name=(c.get("chatbots") or {}).get("name"), session_id=c.get("session_id"), language=c.get("language"), message_count=c.get("message_count", 0), created_at=c.get("created_at"), first_message=first_msgs.get(c["id"]), ) for c in convos ] return { "conversations": [i.model_dump() for i in items], "total": total, "page": page, "pages": max(1, (total + limit - 1) // limit), } except Exception as e: logger.error(f"Admin list_conversations error: {e}") raise HTTPException(status_code=500, detail="Failed to fetch conversations") # ── System Health ────────────────────────────────────────────────────────────── @router.get("/system/health", response_model=AdminSystemHealth) async def system_health(admin=Depends(get_admin_user)): """Check health of all system components.""" # Check database db_status = "unhealthy" try: supabase = get_supabase() supabase.table("subscriptions").select("id").limit(1).execute() db_status = "healthy" except Exception as e: logger.warning(f"DB health check failed: {e}") # Check Qdrant qdrant_status = "unhealthy" try: vector_store.client.get_collections() qdrant_status = "healthy" except Exception as e: logger.warning(f"Qdrant health check failed: {e}") # Check LLM provider API key availability llm_providers = { "openai": bool(getattr(settings, "openai_api_key", None)), "anthropic": bool(getattr(settings, "anthropic_api_key", None)), "google": bool(getattr(settings, "google_api_key", None)), "fireworks": bool(getattr(settings, "fireworks_api_key", None)), } return AdminSystemHealth( db=db_status, qdrant=qdrant_status, llm_providers=llm_providers, timestamp=datetime.utcnow(), )