mirror of
http://88.130.71.182:3000/BlitTech/contexta_be.git
synced 2026-06-13 10:06:14 +00:00
- Add new routers: admin, appointments, campaigns - Add storage service and logging config - Add migrations directory and test suite with pytest config - Add supabase_migration_features.sql - Update models, dependencies, config, and existing routers - Remove whatsapp_service (deleted) - Update pyproject.toml and uv.lock dependencies Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
363 lines
13 KiB
Python
363 lines
13 KiB
Python
import time
|
|
from collections import defaultdict
|
|
|
|
from fastapi import APIRouter, HTTPException, Depends, Request
|
|
from app.models import ChatMessage, ChatResponse, ConversationResponse, MessageResponse, FeedbackCreate
|
|
from app.database import get_supabase
|
|
from app.dependencies import get_current_user, get_optional_user
|
|
from app.services.rag import rag_engine
|
|
from app.config import PLAN_LIMITS
|
|
from typing import List, Optional
|
|
from datetime import datetime
|
|
import uuid
|
|
import logging
|
|
|
|
# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)
# Router the endpoints in this file are registered on; tagged "Chat".
router = APIRouter(tags=["Chat"])
|
|
|
|
# ── Simple in-memory rate limiter ────────────────────────────────────────────
# Maps a client key (the caller passes the client IP) to the ``time.time()``
# timestamps of its recent requests. The state lives in this process only.
_rate_store: dict = defaultdict(list)
_RATE_LIMIT = 30  # max requests allowed per client within one window
_RATE_WINDOW = 60  # length of the sliding window, in seconds
# Timestamp of the last sweep that removed idle clients from ``_rate_store``.
_last_rate_sweep = 0.0


def _evict_idle_clients(now: float) -> None:
    """Remove every client that has no request inside the current window.

    Without this, each distinct client key ever seen would keep an entry in
    ``_rate_store`` forever and the store would grow without bound.
    """
    idle = [
        key
        for key, stamps in _rate_store.items()
        if not any(now - t < _RATE_WINDOW for t in stamps)
    ]
    for key in idle:
        del _rate_store[key]


def _check_rate_limit(client_ip: str):
    """Record one request for ``client_ip`` and enforce the per-client limit.

    Args:
        client_ip: Key identifying the caller.

    Raises:
        HTTPException: 429 when the client already made ``_RATE_LIMIT``
            requests within the last ``_RATE_WINDOW`` seconds. A rejected
            request is not recorded.
    """
    global _last_rate_sweep

    now = time.time()

    # Sweep idle clients at most once per window so the cost stays amortised.
    # The lower bound also triggers a sweep if the wall clock jumped backwards.
    if not 0 <= now - _last_rate_sweep < _RATE_WINDOW:
        _evict_idle_clients(now)
        _last_rate_sweep = now

    # Keep only this client's timestamps that are still inside the window.
    recent = [t for t in _rate_store[client_ip] if now - t < _RATE_WINDOW]
    _rate_store[client_ip] = recent
    if len(recent) >= _RATE_LIMIT:
        raise HTTPException(
            status_code=429,
            detail="Too many requests. Please wait before sending more messages.",
        )
    recent.append(now)
|
|
|
|
|
|
def _check_conversation_limit(chatbot: dict, supabase):
    """Enforce the monthly conversation quota of the chatbot's owning company.

    Looks up the company owner's active subscription plan (falling back to
    "free"), reads that plan's ``conversations_limit`` from ``PLAN_LIMITS`` and
    compares it with the number of conversations created since the start of
    the current (UTC) month across all chatbots of the company.

    Returns silently when the chatbot has no company, the company row is
    missing, the plan is unlimited, or the company has no chatbots.

    Raises:
        HTTPException: 429 when the quota has been used up.
    """
    company_id = chatbot.get("company_id")
    if not company_id:
        return

    company_lookup = (
        supabase.table("companies")
        .select("owner_id")
        .eq("id", company_id)
        .execute()
    )
    if not company_lookup.data:
        return
    owner_id = company_lookup.data[0]["owner_id"]

    subscription = (
        supabase.table("subscriptions")
        .select("plan")
        .eq("user_id", owner_id)
        .eq("status", "active")
        .execute()
    )
    if subscription.data:
        plan = subscription.data[0]["plan"]
    else:
        plan = "free"

    # Unknown plan names are treated like the free plan.
    free_plan = PLAN_LIMITS["free"]
    plan_settings = PLAN_LIMITS.get(plan, free_plan)
    limit = plan_settings.get("conversations_limit", 100)

    # Limits of 999999 and above are treated as "unlimited".
    if limit >= 999999:
        return

    # First instant of the current month as a naive UTC ISO-8601 string.
    first_of_month = datetime.utcnow().replace(
        day=1, hour=0, minute=0, second=0, microsecond=0
    )
    month_start = first_of_month.isoformat()

    company_bots = (
        supabase.table("chatbots")
        .select("id")
        .eq("company_id", company_id)
        .execute()
    )
    chatbot_ids = []
    for bot in company_bots.data or []:
        chatbot_ids.append(bot["id"])
    if not chatbot_ids:
        return

    usage = (
        supabase.table("conversations")
        .select("id", count="exact")
        .in_("chatbot_id", chatbot_ids)
        .gte("created_at", month_start)
        .execute()
    )
    used = usage.count or 0
    if used < limit:
        return

    raise HTTPException(
        status_code=429,
        detail=f"This chatbot's monthly conversation limit ({limit}) has been reached. Please try again next month.",
    )
|
|
|
|
|
|
def _get_public_chatbot(chatbot_id: str, supabase) -> dict:
    """Fetch a chatbot row by id, joined with its company's name and logo URL.

    No publication check is done here; the caller decides whether an
    unpublished chatbot may be used.

    Raises:
        HTTPException: 404 when no chatbot has the given id.
    """
    rows = (
        supabase.table("chatbots")
        .select("*, companies(name, logo_url)")
        .eq("id", chatbot_id)
        .execute()
        .data
    )
    if rows:
        return rows[0]
    raise HTTPException(status_code=404, detail="Chatbot not found")
|
|
|
|
|
|
def _ensure_preview_access(chatbot: dict, user, supabase) -> None:
    """Allow an unpublished chatbot only for a user who owns its company."""
    if chatbot.get("is_published"):
        return
    if not user:
        raise HTTPException(status_code=403, detail="This chatbot is in preview mode")

    owned = supabase.table("companies").select("id").eq("owner_id", user.id).execute()
    # A user may own several companies, so check all of them, not only the first.
    owned_ids = {company["id"] for company in (owned.data or [])}
    if chatbot.get("company_id") not in owned_ids:
        raise HTTPException(status_code=403, detail="This chatbot is in preview mode")


def _build_chatbot_config(chatbot: dict, chatbot_id: str) -> dict:
    """Return the chatbot row plus ``company_name`` and an optional booking note."""
    company_data = chatbot.get("companies", {}) or {}
    config = {**chatbot, "company_name": company_data.get("name", "")}

    # If booking is enabled, append a note to the system prompt so the bot can
    # guide users to the booking page.
    if chatbot.get("booking_enabled"):
        from app.config import settings as _cfg

        booking_url = f"{_cfg.app_url}/book/{chatbot_id}"
        booking_note = (
            "\n\nAppointment booking: This business accepts appointments online. "
            "If the user wants to book an appointment, meeting, or consultation, "
            f"provide them this booking link: {booking_url}"
        )
        config["system_prompt"] = (config.get("system_prompt") or "") + booking_note
    return config


def _is_handoff_requested(chatbot: dict, text: str) -> bool:
    """Tell whether ``text`` contains one of the chatbot's handoff keywords."""
    if not chatbot.get("handoff_enabled"):
        return False
    # The stored value may be null, so fall back to an empty list.
    keywords = chatbot.get("handoff_keywords") or []
    text_lower = text.lower()
    return any(
        kw.lower() in text_lower
        for kw in keywords
        # Skip blank keywords: an empty string is contained in every message.
        if isinstance(kw, str) and kw.strip()
    )


async def _notify_handoff(
    chatbot: dict,
    chatbot_id: str,
    conversation_id: str,
    history: list,
    trigger_message: str,
) -> None:
    """Send the handoff notification; failures are logged and never raised."""
    try:
        from app.services.n8n_service import send_handoff_notification
        from app.config import settings as _settings

        await send_handoff_notification(
            chatbot_name=chatbot.get("name", ""),
            owner_email=chatbot.get("handoff_email") or "",
            conversation_history=history,
            trigger_message=trigger_message,
            chatbot_id=chatbot_id,
            conversation_id=conversation_id,
            webhook_url=_settings.n8n_handoff_webhook_url,
        )
    except Exception:
        # Never block chat on handoff failure, but leave a trace for operators.
        logger.exception(
            "Handoff notification failed (chatbot=%s, conversation=%s)",
            chatbot_id,
            conversation_id,
        )


@router.post("/chat/{chatbot_id}", response_model=ChatResponse)
async def chat(
    chatbot_id: str,
    message: ChatMessage,
    request: Request,
    user=Depends(get_optional_user),
):
    """Answer one user message for a chatbot and persist the exchange.

    Steps: rate-limit by client IP, load the chatbot, check preview access,
    resolve the session's conversation (checking the monthly conversation
    quota for new sessions), run the RAG engine, detect handoff and lead
    capture, then store the user and assistant messages.

    Returns:
        ChatResponse with the answer. When the conversation status is
        "agent_handling", an empty response is returned and the bot stays
        silent.

    Raises:
        HTTPException: 429 (rate limit or monthly quota), 404 (unknown
            chatbot), 403 (unpublished chatbot, caller is not the owner),
            400 (chatbot has no knowledge base collection).
    """
    # Rate limiting by IP
    client_ip = request.client.host if request.client else "unknown"
    _check_rate_limit(client_ip)

    supabase = get_supabase()
    chatbot = _get_public_chatbot(chatbot_id, supabase)

    # Allow preview access for owner, require published for public
    _ensure_preview_access(chatbot, user, supabase)

    collection_name = chatbot.get("qdrant_collection_name")
    if not collection_name:
        raise HTTPException(status_code=400, detail="Chatbot has no knowledge base configured")

    # Get or create conversation
    session_id = message.session_id or str(uuid.uuid4())

    # Only enforce conversation limit for new sessions (not ongoing ones)
    is_existing = (
        supabase.table("conversations")
        .select("id")
        .eq("chatbot_id", chatbot_id)
        .eq("session_id", session_id)
        .execute()
    )
    if not is_existing.data:
        _check_conversation_limit(chatbot, supabase)

    conversation = _get_or_create_conversation(
        chatbot_id=chatbot_id,
        session_id=session_id,
        user_id=user.id if user else None,
        language=message.language,
        supabase=supabase,
    )

    # If an agent has taken over this conversation, stop the bot from responding.
    # NOTE(review): the user's message is not saved on this path - confirm that
    # the agent channel records it elsewhere.
    if conversation.get("status", "open") == "agent_handling":
        return ChatResponse(
            response="",
            session_id=session_id,
            sources=[],
            model_used="",
            tokens_used=0,
            needs_lead_capture=False,
            handoff=False,
        )

    # Get conversation history
    history = _get_conversation_history(conversation["id"], supabase)

    chatbot_config = _build_chatbot_config(chatbot, chatbot_id)

    # Run RAG
    result = await rag_engine.process_query(
        query=message.message,
        collection_name=collection_name,
        chatbot_config=chatbot_config,
        conversation_history=history,
        language=message.language,
    )
    sources = result.get("sources", [])

    # Confidence is the best source score, or 0.0 when there are no sources.
    confidence_score = max((s.score for s in sources), default=0.0)

    # Check handoff. The notification is awaited, so it delays the response.
    is_handoff = _is_handoff_requested(chatbot, message.message)
    if is_handoff:
        await _notify_handoff(
            chatbot=chatbot,
            chatbot_id=chatbot_id,
            conversation_id=conversation["id"],
            history=history,
            trigger_message=message.message,
        )

    # Check lead capture: only on the first message of a conversation.
    needs_lead_capture = bool(
        chatbot.get("lead_capture_enabled")
        and chatbot.get("lead_capture_trigger") == "after_first_message"
        and not history
    )

    # Save messages
    _save_message(conversation["id"], "user", message.message, supabase)
    _save_message(
        conversation["id"],
        "assistant",
        result["response"],
        supabase,
        sources=[s.model_dump() for s in sources],
        model=result.get("model", ""),
        confidence_score=confidence_score,
        is_handoff=is_handoff,
    )

    # Update conversation message count. The fetched history is capped, so
    # len(history) alone would stop growing on long conversations; take the
    # larger of the stored count and the fetched length as the previous total.
    previous_count = max(conversation.get("message_count") or 0, len(history))
    supabase.table("conversations").update({
        "message_count": previous_count + 2
    }).eq("id", conversation["id"]).execute()

    return ChatResponse(
        response=result["response"],
        session_id=session_id,
        sources=sources,
        model_used=result.get("model", ""),
        tokens_used=result.get("tokens_used", 0),
        needs_lead_capture=needs_lead_capture,
        handoff=is_handoff,
    )
|
|
|
|
|
|
@router.get("/chat/{chatbot_id}/history/{session_id}", response_model=List[MessageResponse])
async def get_chat_history(
    chatbot_id: str,
    session_id: str,
    user=Depends(get_optional_user),
):
    """Return the stored messages of one chat session, oldest first.

    The conversation is located by chatbot id and session id. An empty list is
    returned when no such conversation exists.
    """
    db = get_supabase()

    found = (
        db.table("conversations")
        .select("*")
        .eq("chatbot_id", chatbot_id)
        .eq("session_id", session_id)
        .execute()
    )
    if not found.data:
        return []

    conversation_id = found.data[0]["id"]
    stored = (
        db.table("messages")
        .select("*")
        .eq("conversation_id", conversation_id)
        .order("created_at", desc=False)
        .execute()
    )

    # Convert each database row into the response model.
    history = []
    for row in stored.data or []:
        history.append(
            MessageResponse(
                id=row["id"],
                role=row["role"],
                content=row["content"],
                sources=row.get("sources"),
                created_at=row.get("created_at"),
            )
        )
    return history
|
|
|
|
|
|
@router.post("/chat/{chatbot_id}/feedback")
async def submit_feedback(chatbot_id: str, data: FeedbackCreate):
    """Submit feedback (thumbs up/down) for a message. No auth required.

    Returns:
        ``{"success": True}`` once the feedback row has been inserted.

    Raises:
        HTTPException: 400 when the feedback value is neither "positive" nor
            "negative"; 404 when the message does not exist or does not
            belong to a conversation of ``chatbot_id``.
    """
    if data.feedback not in ("positive", "negative"):
        raise HTTPException(status_code=400, detail="Feedback must be 'positive' or 'negative'")

    supabase = get_supabase()

    # Verify message exists
    msg = supabase.table("messages").select("id, conversation_id").eq("id", data.message_id).execute()
    if not msg.data:
        raise HTTPException(status_code=404, detail="Message not found")

    # Verify the message belongs to this chatbot. The endpoint is
    # unauthenticated, so without this check feedback could be recorded under
    # an arbitrary chatbot id.
    conversation_id = msg.data[0].get("conversation_id")
    owning_conversation = (
        supabase.table("conversations")
        .select("id")
        .eq("id", conversation_id)
        .eq("chatbot_id", chatbot_id)
        .execute()
    )
    if not owning_conversation.data:
        raise HTTPException(status_code=404, detail="Message not found")

    supabase.table("message_feedback").insert({
        "id": str(uuid.uuid4()),
        "message_id": data.message_id,
        "chatbot_id": chatbot_id,
        "feedback": data.feedback,
    }).execute()

    return {"success": True}
|
|
|
|
|
|
# ── OLD analytics endpoint REMOVED ───────────────────────────────────────────
|
|
# The /analytics/{chatbot_id} endpoint that was here has been replaced by
|
|
# the dedicated analytics router (app/routers/analytics.py) which provides:
|
|
# GET /api/v1/analytics/overview — All chatbots overview
|
|
# GET /api/v1/analytics/chatbot/{id} — Single chatbot detail
|
|
# The old endpoint conflicted with the new router because FastAPI matched
|
|
# "overview" as a chatbot_id UUID, causing a 500 error.
|
|
# ─────────────────────────────────────────────────────────────────────────────
|
|
|
|
|
|
# ── Helpers ───────────────────────────────────────────────────────────────────
|
|
|
|
def _get_or_create_conversation(
    chatbot_id: str,
    session_id: str,
    user_id: Optional[str],
    language: str,
    supabase,
) -> dict:
    """Return the conversation row for a chatbot/session pair, creating it if absent.

    A new conversation gets a fresh UUID and starts with ``message_count`` 0.
    """
    lookup = (
        supabase.table("conversations")
        .select("*")
        .eq("chatbot_id", chatbot_id)
        .eq("session_id", session_id)
        .execute()
    )

    if not lookup.data:
        # No conversation yet for this session: insert one and return the
        # row as reported back by the insert.
        record = dict(
            id=str(uuid.uuid4()),
            chatbot_id=chatbot_id,
            user_id=user_id,
            session_id=session_id,
            language=language,
            message_count=0,
        )
        inserted = supabase.table("conversations").insert(record).execute()
        return inserted.data[0]

    return lookup.data[0]
|
|
|
|
|
|
def _get_conversation_history(conversation_id: str, supabase, limit: int = 20) -> List[dict]:
    """
    Returns the most recent messages of a conversation in chronological
    order (oldest first) for the LLM to correctly understand the
    conversation flow.

    Args:
        conversation_id: Id of the conversation whose messages are read.
        supabase: Supabase client used for the query.
        limit: Maximum number of messages to return (default 20).

    Returns:
        A list of ``{"role": ..., "content": ...}`` rows; empty when the
        conversation has no messages.
    """
    # Fetch newest-first so the limit keeps the latest messages. An ascending
    # query with a limit would return only the oldest ones once the
    # conversation grows past the limit.
    messages = (
        supabase.table("messages")
        .select("role, content")
        .eq("conversation_id", conversation_id)
        .order("created_at", desc=True)
        .limit(limit)
        .execute()
    )
    rows = list(messages.data or [])
    # Restore chronological order for the caller.
    rows.reverse()
    return rows
|
|
|
|
|
|
def _save_message(
    conversation_id: str,
    role: str,
    content: str,
    supabase,
    sources: Optional[list] = None,
    model: str = "",
    confidence_score: Optional[float] = None,
    is_handoff: bool = False,
):
    """Insert one message row, with a fresh UUID, into the ``messages`` table.

    Args:
        conversation_id: Conversation the message belongs to.
        role: Message author role; callers in this file pass "user" or "assistant".
        content: Message text.
        supabase: Supabase client used for the insert.
        sources: Optional source entries stored with the message.
        model: Name of the model that produced the message, if any.
        confidence_score: Optional confidence value stored with the message.
        is_handoff: Whether the message triggered a handoff.
    """
    row = {
        "id": str(uuid.uuid4()),
        "conversation_id": conversation_id,
        "role": role,
        "content": content,
        "sources": sources,
        "model": model,
        "confidence_score": confidence_score,
        "is_handoff": is_handoff,
    }
    messages_table = supabase.table("messages")
    messages_table.insert(row).execute()