"""Public chat endpoints: message handling, history retrieval and feedback."""

import logging
import time
import uuid
from collections import defaultdict
from datetime import datetime, timezone
from typing import List, Optional

from fastapi import APIRouter, Depends, HTTPException, Request

from app.config import PLAN_LIMITS, settings
from app.database import get_supabase
from app.dependencies import get_current_user, get_optional_user
from app.models import (
    ChatMessage,
    ChatResponse,
    ConversationResponse,
    FeedbackCreate,
    MessageResponse,
)
from app.services.rag import rag_engine

logger = logging.getLogger(__name__)

router = APIRouter(tags=["Chat"])


# ── Simple in-memory rate limiter ────────────────────────────────────────────
# NOTE: the store is a module-level dict, so limits are tracked per process.
_rate_store: dict = defaultdict(list)
_RATE_LIMIT = 30  # max requests allowed per client within one window
_RATE_WINDOW = 60  # window length, in seconds
_RATE_STORE_SWEEP_SIZE = 10_000  # sweep idle clients once this many are tracked
_HISTORY_LIMIT = 20  # number of most recent messages handed to the LLM


def _prune_idle_clients(now: float) -> None:
    """Drop rate-limit entries whose newest request is outside the window."""
    # Timestamps are appended in call order, so the last one is the newest.
    idle = [
        ip
        for ip, stamps in _rate_store.items()
        if not stamps or now - stamps[-1] >= _RATE_WINDOW
    ]
    for ip in idle:
        del _rate_store[ip]


def _check_rate_limit(client_ip: str) -> None:
    """Record a request for ``client_ip`` and enforce the sliding window.

    Raises:
        HTTPException: 429 when the client already made ``_RATE_LIMIT``
            requests during the last ``_RATE_WINDOW`` seconds.
    """
    now = time.time()

    # Without this sweep the store keeps one key per IP ever seen.
    if len(_rate_store) > _RATE_STORE_SWEEP_SIZE:
        _prune_idle_clients(now)

    recent = [t for t in _rate_store[client_ip] if now - t < _RATE_WINDOW]
    _rate_store[client_ip] = recent
    if len(recent) >= _RATE_LIMIT:
        raise HTTPException(
            status_code=429,
            detail="Too many requests. Please wait before sending more messages.",
        )
    recent.append(now)


def _check_conversation_limit(chatbot: dict, supabase) -> None:
    """Check if the chatbot owner has remaining conversation quota this month.

    The quota is counted across every chatbot of the owning company. The
    check is skipped when the chatbot has no company, the company row is
    missing, the plan limit is the "unlimited" sentinel, or the company has
    no chatbots.

    Raises:
        HTTPException: 429 when the monthly limit has been reached.
    """
    company_id = chatbot.get("company_id")
    if not company_id:
        return

    company = (
        supabase.table("companies").select("owner_id").eq("id", company_id).execute()
    )
    if not company.data:
        return
    owner_id = company.data[0]["owner_id"]

    sub = (
        supabase.table("subscriptions")
        .select("plan")
        .eq("user_id", owner_id)
        .eq("status", "active")
        .execute()
    )
    plan = sub.data[0]["plan"] if sub.data else "free"
    limit = PLAN_LIMITS.get(plan, PLAN_LIMITS["free"]).get("conversations_limit", 100)
    if limit >= 999999:
        return  # unlimited

    # Naive UTC timestamp, identical to what datetime.utcnow() produced, but
    # without relying on the deprecated API.
    month_start = (
        datetime.now(timezone.utc)
        .replace(tzinfo=None)
        .replace(day=1, hour=0, minute=0, second=0, microsecond=0)
        .isoformat()
    )

    chatbots = (
        supabase.table("chatbots").select("id").eq("company_id", company_id).execute()
    )
    chatbot_ids = [c["id"] for c in (chatbots.data or [])]
    if not chatbot_ids:
        return

    count_result = (
        supabase.table("conversations")
        .select("id", count="exact")
        .in_("chatbot_id", chatbot_ids)
        .gte("created_at", month_start)
        .execute()
    )
    used = count_result.count or 0
    if used >= limit:
        raise HTTPException(
            status_code=429,
            detail=f"This chatbot's monthly conversation limit ({limit}) has been reached. Please try again next month.",
        )


def _get_public_chatbot(chatbot_id: str, supabase) -> dict:
    """Get a chatbot row (published or not) with its company name and logo.

    Raises:
        HTTPException: 404 when no chatbot has this id.
    """
    result = (
        supabase.table("chatbots")
        .select("*, companies(name, logo_url)")
        .eq("id", chatbot_id)
        .execute()
    )
    if not result.data:
        raise HTTPException(status_code=404, detail="Chatbot not found")
    return result.data[0]


def _user_owns_chatbot(user, chatbot: dict, supabase) -> bool:
    """Return True when any company owned by ``user`` owns ``chatbot``."""
    companies = (
        supabase.table("companies").select("id").eq("owner_id", user.id).execute()
    )
    company_id = chatbot.get("company_id")
    # Check every owned company, not only the first row returned.
    return any(c["id"] == company_id for c in (companies.data or []))


def _build_chatbot_config(chatbot: dict, chatbot_id: str) -> dict:
    """Build the config passed to the RAG engine from the chatbot row."""
    company_data = chatbot.get("companies", {}) or {}
    chatbot_config = {
        **chatbot,
        "company_name": company_data.get("name", ""),
    }

    # If booking is enabled, inject a note into the system prompt so the bot
    # can guide users to the booking page.
    if chatbot.get("booking_enabled"):
        booking_url = f"{settings.app_url}/book/{chatbot_id}"
        booking_note = (
            f"\n\nAppointment booking: This business accepts appointments online. "
            f"If the user wants to book an appointment, meeting, or consultation, "
            f"provide them this booking link: {booking_url}"
        )
        existing_prompt = chatbot_config.get("system_prompt") or ""
        chatbot_config["system_prompt"] = existing_prompt + booking_note

    return chatbot_config


def _is_handoff_requested(chatbot: dict, text: str) -> bool:
    """Return True when handoff is enabled and ``text`` contains a keyword."""
    if not chatbot.get("handoff_enabled"):
        return False
    # The stored value may be None; empty keywords would match every message.
    keywords = chatbot.get("handoff_keywords") or []
    text_lower = text.lower()
    return any(kw and kw.lower() in text_lower for kw in keywords)


async def _notify_handoff(
    chatbot: dict,
    chatbot_id: str,
    conversation_id: str,
    history: List[dict],
    trigger_message: str,
) -> None:
    """Send the n8n handoff notification; failures are logged, never raised."""
    try:
        # Imported lazily inside the try so that an import failure is also
        # treated as a non-fatal notification failure.
        from app.services.n8n_service import send_handoff_notification

        await send_handoff_notification(
            chatbot_name=chatbot.get("name", ""),
            owner_email=chatbot.get("handoff_email") or "",
            conversation_history=history,
            trigger_message=trigger_message,
            chatbot_id=chatbot_id,
            conversation_id=conversation_id,
            webhook_url=settings.n8n_handoff_webhook_url,
        )
    except Exception:
        # Never block chat on handoff failure, but leave a trace of it.
        logger.exception(
            "Handoff notification failed (chatbot=%s, conversation=%s)",
            chatbot_id,
            conversation_id,
        )


@router.post("/chat/{chatbot_id}", response_model=ChatResponse)
async def chat(
    chatbot_id: str,
    message: ChatMessage,
    request: Request,
    user=Depends(get_optional_user),
):
    """Answer a user message with the chatbot's RAG pipeline.

    Applies the per-IP rate limit, restricts unpublished chatbots to their
    owner, enforces the monthly conversation quota for new sessions, runs
    the RAG query, stores both messages and returns the assistant reply.

    Raises:
        HTTPException: 429 (rate limit or monthly quota), 404 (unknown
            chatbot), 403 (unpublished chatbot and caller is not the owner),
            400 (chatbot has no knowledge base collection).
    """
    # Rate limiting by IP
    client_ip = request.client.host if request.client else "unknown"
    _check_rate_limit(client_ip)

    supabase = get_supabase()
    chatbot = _get_public_chatbot(chatbot_id, supabase)

    # Allow preview access for owner, require published for public
    if not chatbot.get("is_published"):
        if not user or not _user_owns_chatbot(user, chatbot, supabase):
            raise HTTPException(
                status_code=403, detail="This chatbot is in preview mode"
            )

    collection_name = chatbot.get("qdrant_collection_name")
    if not collection_name:
        raise HTTPException(
            status_code=400, detail="Chatbot has no knowledge base configured"
        )

    # Get or create conversation
    session_id = message.session_id or str(uuid.uuid4())
    conversation = _find_conversation(chatbot_id, session_id, supabase)
    if conversation is None:
        # Only enforce conversation limit for new sessions (not ongoing ones)
        _check_conversation_limit(chatbot, supabase)
        conversation = _get_or_create_conversation(
            chatbot_id=chatbot_id,
            session_id=session_id,
            user_id=user.id if user else None,
            language=message.language,
            supabase=supabase,
        )

    # If an agent has taken over this conversation, stop the bot from responding.
    # NOTE(review): the incoming user message is not persisted on this path —
    # confirm the agent channel stores it elsewhere.
    if conversation.get("status", "open") == "agent_handling":
        return ChatResponse(
            response="",
            session_id=session_id,
            sources=[],
            model_used="",
            tokens_used=0,
            needs_lead_capture=False,
            handoff=False,
        )

    # Get conversation history
    history = _get_conversation_history(conversation["id"], supabase)

    # Run RAG
    result = await rag_engine.process_query(
        query=message.message,
        collection_name=collection_name,
        chatbot_config=_build_chatbot_config(chatbot, chatbot_id),
        conversation_history=history,
        language=message.language,
    )
    sources = result.get("sources") or []

    # Compute confidence score: best retrieval score, 0.0 without sources
    confidence_score = max((s.score for s in sources), default=0.0)

    # Check handoff. The notification is awaited, so a slow webhook delays
    # the reply; errors are logged and ignored.
    is_handoff = _is_handoff_requested(chatbot, message.message)
    if is_handoff:
        await _notify_handoff(
            chatbot, chatbot_id, conversation["id"], history, message.message
        )

    # Check lead capture
    needs_lead_capture = bool(
        chatbot.get("lead_capture_enabled")
        and chatbot.get("lead_capture_trigger") == "after_first_message"
        and not history
    )

    # Save messages
    _save_message(conversation["id"], "user", message.message, supabase)
    _save_message(
        conversation["id"],
        "assistant",
        result["response"],
        supabase,
        sources=[s.model_dump() for s in sources],
        model=result.get("model", ""),
        confidence_score=confidence_score,
        is_handoff=is_handoff,
    )

    # Update conversation message count. `history` is capped at
    # _HISTORY_LIMIT rows, so it cannot be used to derive the total.
    message_count = _count_messages(conversation["id"], supabase)
    if message_count is None:
        message_count = len(history) + 2
    supabase.table("conversations").update({
        "message_count": message_count
    }).eq("id", conversation["id"]).execute()

    return ChatResponse(
        response=result["response"],
        session_id=session_id,
        sources=sources,
        model_used=result.get("model", ""),
        tokens_used=result.get("tokens_used", 0),
        needs_lead_capture=needs_lead_capture,
        handoff=is_handoff,
    )


@router.get("/chat/{chatbot_id}/history/{session_id}", response_model=List[MessageResponse])
async def get_chat_history(
    chatbot_id: str,
    session_id: str,
    user=Depends(get_optional_user),
):
    """Return every stored message of a session, oldest first.

    Returns an empty list when the chatbot has no conversation for this
    session id.
    """
    supabase = get_supabase()
    conversation = _find_conversation(chatbot_id, session_id, supabase)
    if conversation is None:
        return []

    messages = (
        supabase.table("messages")
        .select("*")
        .eq("conversation_id", conversation["id"])
        .order("created_at", desc=False)
        .execute()
    )

    return [
        MessageResponse(
            id=m["id"],
            role=m["role"],
            content=m["content"],
            sources=m.get("sources"),
            created_at=m.get("created_at"),
        )
        for m in (messages.data or [])
    ]


@router.post("/chat/{chatbot_id}/feedback")
async def submit_feedback(chatbot_id: str, data: FeedbackCreate):
    """Submit feedback (thumbs up/down) for a message. No auth required.

    Raises:
        HTTPException: 400 when the feedback value is not 'positive' or
            'negative'; 404 when the message does not exist or does not
            belong to a conversation of ``chatbot_id``.
    """
    if data.feedback not in ("positive", "negative"):
        raise HTTPException(
            status_code=400, detail="Feedback must be 'positive' or 'negative'"
        )

    supabase = get_supabase()

    # Verify message exists
    msg = (
        supabase.table("messages")
        .select("id, conversation_id")
        .eq("id", data.message_id)
        .execute()
    )
    if not msg.data:
        raise HTTPException(status_code=404, detail="Message not found")

    # The endpoint is unauthenticated: make sure the message really belongs
    # to the chatbot named in the URL before attributing feedback to it.
    conversation_id = msg.data[0].get("conversation_id")
    if not conversation_id:
        raise HTTPException(status_code=404, detail="Message not found")
    owner = (
        supabase.table("conversations")
        .select("chatbot_id")
        .eq("id", conversation_id)
        .execute()
    )
    owning_chatbot = owner.data[0].get("chatbot_id") if owner.data else None
    if owning_chatbot is None or str(owning_chatbot).lower() != chatbot_id.lower():
        raise HTTPException(status_code=404, detail="Message not found")

    supabase.table("message_feedback").insert({
        "id": str(uuid.uuid4()),
        "message_id": data.message_id,
        "chatbot_id": chatbot_id,
        "feedback": data.feedback,
    }).execute()

    return {"success": True}


# ── OLD analytics endpoint REMOVED ───────────────────────────────────────────
# The /analytics/{chatbot_id} endpoint that was here has been replaced by
# the dedicated analytics router (app/routers/analytics.py) which provides:
#   GET /api/v1/analytics/overview — All chatbots overview
#   GET /api/v1/analytics/chatbot/{id} — Single chatbot detail
# The old endpoint conflicted with the new router because FastAPI matched
# "overview" as a chatbot_id UUID, causing a 500 error.
# ─────────────────────────────────────────────────────────────────────────────


# ── Helpers ───────────────────────────────────────────────────────────────────
def _find_conversation(chatbot_id: str, session_id: str, supabase) -> Optional[dict]:
    """Return the conversation row for this chatbot/session, or None."""
    existing = (
        supabase.table("conversations")
        .select("*")
        .eq("chatbot_id", chatbot_id)
        .eq("session_id", session_id)
        .execute()
    )
    return existing.data[0] if existing.data else None


def _get_or_create_conversation(
    chatbot_id: str,
    session_id: str,
    user_id: Optional[str],
    language: str,
    supabase,
) -> dict:
    """Return the conversation for this session, inserting it when missing."""
    existing = _find_conversation(chatbot_id, session_id, supabase)
    if existing is not None:
        return existing

    new_conv = {
        "id": str(uuid.uuid4()),
        "chatbot_id": chatbot_id,
        "user_id": user_id,
        "session_id": session_id,
        "language": language,
        "message_count": 0,
    }
    result = supabase.table("conversations").insert(new_conv).execute()
    return result.data[0]


def _get_conversation_history(conversation_id: str, supabase) -> List[dict]:
    """
    Returns the most recent messages of the conversation in chronological
    order (oldest first) for the LLM to correctly understand the
    conversation flow.
    """
    # Newest-first + limit selects the latest window; ordering ascending
    # with a limit would return the oldest messages of a long conversation.
    messages = (
        supabase.table("messages")
        .select("role, content")
        .eq("conversation_id", conversation_id)
        .order("created_at", desc=True)
        .limit(_HISTORY_LIMIT)
        .execute()
    )
    return list(reversed(messages.data or []))


def _count_messages(conversation_id: str, supabase) -> Optional[int]:
    """Return the exact number of stored messages, or None if not reported."""
    result = (
        supabase.table("messages")
        .select("id", count="exact")
        .eq("conversation_id", conversation_id)
        .limit(1)  # only the count is needed, not the rows
        .execute()
    )
    return result.count


def _save_message(
    conversation_id: str,
    role: str,
    content: str,
    supabase,
    sources: Optional[list] = None,
    model: str = "",
    confidence_score: Optional[float] = None,
    is_handoff: bool = False,
) -> None:
    """Insert one message row for the given conversation."""
    supabase.table("messages").insert({
        "id": str(uuid.uuid4()),
        "conversation_id": conversation_id,
        "role": role,
        "content": content,
        "sources": sources,
        "model": model,
        "confidence_score": confidence_score,
        "is_handoff": is_handoff,
    }).execute()