Files
contexta_be/app/routers/chat.py
belviskhoremk 9dccc83293 updates Mar6
2026-03-06 22:37:40 +00:00

337 lines
12 KiB
Python

import time
from collections import defaultdict
from fastapi import APIRouter, HTTPException, Depends, Request
from app.models import ChatMessage, ChatResponse, ConversationResponse, MessageResponse, FeedbackCreate
from app.database import get_supabase
from app.dependencies import get_current_user, get_optional_user
from app.services.rag import rag_engine
from app.config import PLAN_LIMITS
from typing import List, Optional
from datetime import datetime
import uuid
import logging
logger = logging.getLogger(__name__)
router = APIRouter(tags=["Chat"])
# ── Simple in-memory rate limiter ────────────────────────────────────────────
_rate_store: dict = defaultdict(list)
_RATE_LIMIT = 30 # max requests
_RATE_WINDOW = 60 # per second window
def _check_rate_limit(client_ip: str):
now = time.time()
_rate_store[client_ip] = [t for t in _rate_store[client_ip] if now - t < _RATE_WINDOW]
if len(_rate_store[client_ip]) >= _RATE_LIMIT:
raise HTTPException(
status_code=429,
detail="Too many requests. Please wait before sending more messages.",
)
_rate_store[client_ip].append(now)
def _check_conversation_limit(chatbot: dict, supabase):
"""Check if the chatbot owner has remaining conversation quota this month."""
company_id = chatbot.get("company_id")
if not company_id:
return
company = supabase.table("companies").select("owner_id").eq("id", company_id).execute()
if not company.data:
return
owner_id = company.data[0]["owner_id"]
sub = supabase.table("subscriptions").select("plan").eq("user_id", owner_id).eq("status", "active").execute()
plan = sub.data[0]["plan"] if sub.data else "free"
limit = PLAN_LIMITS.get(plan, PLAN_LIMITS["free"]).get("conversations_limit", 100)
if limit >= 999999:
return # unlimited
month_start = datetime.utcnow().replace(day=1, hour=0, minute=0, second=0, microsecond=0).isoformat()
chatbots = supabase.table("chatbots").select("id").eq("company_id", company_id).execute()
chatbot_ids = [c["id"] for c in (chatbots.data or [])]
if not chatbot_ids:
return
count_result = supabase.table("conversations").select("id", count="exact") \
.in_("chatbot_id", chatbot_ids) \
.gte("created_at", month_start) \
.execute()
used = count_result.count or 0
if used >= limit:
raise HTTPException(
status_code=429,
detail=f"This chatbot's monthly conversation limit ({limit}) has been reached. Please try again next month.",
)
def _get_public_chatbot(chatbot_id: str, supabase) -> dict:
"""Get a published chatbot (or any chatbot for preview)"""
result = supabase.table("chatbots").select("*, companies(name, logo_url)").eq("id", chatbot_id).execute()
if not result.data:
raise HTTPException(status_code=404, detail="Chatbot not found")
return result.data[0]
@router.post("/chat/{chatbot_id}", response_model=ChatResponse)
async def chat(
chatbot_id: str,
message: ChatMessage,
request: Request,
user=Depends(get_optional_user),
):
# Rate limiting by IP
client_ip = request.client.host if request.client else "unknown"
_check_rate_limit(client_ip)
supabase = get_supabase()
chatbot = _get_public_chatbot(chatbot_id, supabase)
# Allow preview access for owner, require published for public
if not chatbot.get("is_published"):
if not user:
raise HTTPException(status_code=403, detail="This chatbot is in preview mode")
# Check ownership
company = supabase.table("companies").select("id").eq("owner_id", user.id).execute()
if not company.data or company.data[0]["id"] != chatbot.get("company_id"):
raise HTTPException(status_code=403, detail="This chatbot is in preview mode")
collection_name = chatbot.get("qdrant_collection_name")
if not collection_name:
raise HTTPException(status_code=400, detail="Chatbot has no knowledge base configured")
# Get or create conversation
session_id = message.session_id or str(uuid.uuid4())
# Only enforce conversation limit for new sessions (not ongoing ones)
is_existing = supabase.table("conversations").select("id") \
.eq("chatbot_id", chatbot_id).eq("session_id", session_id).execute()
if not is_existing.data:
_check_conversation_limit(chatbot, supabase)
conversation = _get_or_create_conversation(
chatbot_id=chatbot_id,
session_id=session_id,
user_id=user.id if user else None,
language=message.language,
supabase=supabase,
)
# Get conversation history
history = _get_conversation_history(conversation["id"], supabase)
# Get company info for context
company_data = chatbot.get("companies", {}) or {}
chatbot_config = {
**chatbot,
"company_name": company_data.get("name", ""),
}
# Run RAG
result = await rag_engine.process_query(
query=message.message,
collection_name=collection_name,
chatbot_config=chatbot_config,
conversation_history=history,
language=message.language,
)
# Compute confidence score
confidence_score = max((s.score for s in result.get("sources", [])), default=0.0)
# Check handoff
is_handoff = False
if chatbot.get("handoff_enabled"):
handoff_keywords = chatbot.get("handoff_keywords", [])
msg_lower = message.message.lower()
if any(kw.lower() in msg_lower for kw in handoff_keywords):
is_handoff = True
# Fire n8n notification (async, non-blocking)
try:
from app.services.n8n_service import send_handoff_notification
from app.config import settings as _settings
company_data_for_handoff = chatbot.get("companies", {}) or {}
await send_handoff_notification(
chatbot_name=chatbot.get("name", ""),
owner_email=chatbot.get("handoff_email") or "",
conversation_history=history,
trigger_message=message.message,
chatbot_id=chatbot_id,
conversation_id=conversation["id"],
webhook_url=_settings.n8n_handoff_webhook_url,
)
except Exception:
pass # never block chat on handoff failure
# Check lead capture
needs_lead_capture = False
if (
chatbot.get("lead_capture_enabled")
and chatbot.get("lead_capture_trigger") == "after_first_message"
and len(history) == 0
):
needs_lead_capture = True
# Save messages
_save_message(conversation["id"], "user", message.message, supabase)
_save_message(
conversation["id"],
"assistant",
result["response"],
supabase,
sources=[s.model_dump() for s in result.get("sources", [])],
model=result.get("model", ""),
confidence_score=confidence_score,
is_handoff=is_handoff,
)
# Update conversation message count
supabase.table("conversations").update({
"message_count": len(history) + 2
}).eq("id", conversation["id"]).execute()
return ChatResponse(
response=result["response"],
session_id=session_id,
sources=result.get("sources", []),
model_used=result.get("model", ""),
tokens_used=result.get("tokens_used", 0),
needs_lead_capture=needs_lead_capture,
handoff=is_handoff,
)
@router.get("/chat/{chatbot_id}/history/{session_id}", response_model=List[MessageResponse])
async def get_chat_history(
chatbot_id: str,
session_id: str,
user=Depends(get_optional_user),
):
supabase = get_supabase()
conversation = supabase.table("conversations").select("*") \
.eq("chatbot_id", chatbot_id) \
.eq("session_id", session_id) \
.execute()
if not conversation.data:
return []
conv_id = conversation.data[0]["id"]
messages = supabase.table("messages").select("*") \
.eq("conversation_id", conv_id) \
.order("created_at", desc=False) \
.execute()
return [
MessageResponse(
id=m["id"],
role=m["role"],
content=m["content"],
sources=m.get("sources"),
created_at=m.get("created_at"),
)
for m in (messages.data or [])
]
@router.post("/chat/{chatbot_id}/feedback")
async def submit_feedback(chatbot_id: str, data: FeedbackCreate):
"""Submit feedback (thumbs up/down) for a message. No auth required."""
import uuid as _uuid
if data.feedback not in ("positive", "negative"):
raise HTTPException(status_code=400, detail="Feedback must be 'positive' or 'negative'")
supabase = get_supabase()
# Verify message exists
msg = supabase.table("messages").select("id, conversation_id").eq("id", data.message_id).execute()
if not msg.data:
raise HTTPException(status_code=404, detail="Message not found")
supabase.table("message_feedback").insert({
"id": str(_uuid.uuid4()),
"message_id": data.message_id,
"chatbot_id": chatbot_id,
"feedback": data.feedback,
}).execute()
return {"success": True}
# ── OLD analytics endpoint REMOVED ───────────────────────────────────────────
# The /analytics/{chatbot_id} endpoint that was here has been replaced by
# the dedicated analytics router (app/routers/analytics.py) which provides:
# GET /api/v1/analytics/overview — All chatbots overview
# GET /api/v1/analytics/chatbot/{id} — Single chatbot detail
# The old endpoint conflicted with the new router because FastAPI matched
# "overview" as a chatbot_id UUID, causing a 500 error.
# ─────────────────────────────────────────────────────────────────────────────
# ── Helpers ───────────────────────────────────────────────────────────────────
def _get_or_create_conversation(
chatbot_id: str,
session_id: str,
user_id: Optional[str],
language: str,
supabase,
) -> dict:
existing = supabase.table("conversations").select("*") \
.eq("chatbot_id", chatbot_id) \
.eq("session_id", session_id) \
.execute()
if existing.data:
return existing.data[0]
new_conv = {
"id": str(uuid.uuid4()),
"chatbot_id": chatbot_id,
"user_id": user_id,
"session_id": session_id,
"language": language,
"message_count": 0,
}
result = supabase.table("conversations").insert(new_conv).execute()
return result.data[0]
def _get_conversation_history(conversation_id: str, supabase) -> List[dict]:
"""
Returns conversation history in chronological order (oldest first)
for the LLM to correctly understand the conversation flow.
"""
messages = supabase.table("messages").select("role, content") \
.eq("conversation_id", conversation_id) \
.order("created_at", desc=False) \
.limit(20) \
.execute()
return messages.data or []
def _save_message(
conversation_id: str,
role: str,
content: str,
supabase,
sources: Optional[list] = None,
model: str = "",
confidence_score: Optional[float] = None,
is_handoff: bool = False,
):
supabase.table("messages").insert({
"id": str(uuid.uuid4()),
"conversation_id": conversation_id,
"role": role,
"content": content,
"sources": sources,
"model": model,
"confidence_score": confidence_score,
"is_handoff": is_handoff,
}).execute()