mirror of
http://88.130.71.182:3000/BlitTech/contexta_be.git
synced 2026-06-12 23:23:21 +00:00
- Add new routers: admin, appointments, campaigns - Add storage service and logging config - Add migrations directory and test suite with pytest config - Add supabase_migration_features.sql - Update models, dependencies, config, and existing routers - Remove whatsapp_service (deleted) - Update pyproject.toml and uv.lock dependencies Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
503 lines
20 KiB
Python
503 lines
20 KiB
Python
"""
|
|
Analytics router - provides chatbot performance data for Starter+ users.
|
|
|
|
Available to: Starter, Pro, Enterprise plans only.
|
|
No LLM cost data is exposed to users.
|
|
"""
|
|
from fastapi import APIRouter, HTTPException, Depends
|
|
from app.database import get_supabase
|
|
from app.dependencies import get_current_user
|
|
from app.config import PLAN_LIMITS
|
|
from typing import List, Optional, Dict
|
|
from collections import defaultdict
|
|
from pydantic import BaseModel
|
|
from datetime import datetime, timedelta
|
|
import logging
|
|
|
|
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Router collecting the endpoints defined in this file under the "/analytics" prefix.
router = APIRouter(prefix="/analytics", tags=["Analytics"])
|
|
|
|
|
|
# ─── Response Models ───────────────────────────────────────────────────────────
|
|
|
|
# One point of the per-day conversation series returned by the analytics endpoints.
class DailyConversations(BaseModel):
    # Day bucket: the first 10 characters of a conversation's created_at value.
    date: str
    # Number of conversations that fall into that day bucket.
    count: int
|
|
|
|
|
|
# A user query together with how many times it was seen.
class TopQuery(BaseModel):
    # Query text; the endpoints in this file truncate it to 100 characters and strip it.
    query: str
    # Number of occurrences of that exact (truncated) text.
    count: int
|
|
|
|
|
|
# Analytics payload for a single chatbot. Used both as the response of the
# per-chatbot endpoint and as an element of the overview's `chatbots` list.
class ChatbotAnalyticsResponse(BaseModel):
    # ── Identity ──
    chatbot_id: str
    chatbot_name: str

    # ── Volume ──
    total_conversations: int
    # Count of distinct, non-empty session_id values among the conversations.
    unique_sessions: int
    total_messages: int
    # total_messages / total_conversations rounded to one decimal (0.0 without conversations).
    average_messages_per_conversation: float

    # ── Ratings ──
    # Copied from the chatbot row's "average_rating" column; may be absent.
    average_rating: Optional[float]
    # Number of feedback entries recorded for the chatbot.
    total_ratings: int

    # ── Time windows ──
    conversations_today: int
    conversations_this_week: int
    conversations_this_month: int
    # Conversations per day over the last 30 days, ordered by date.
    daily_conversations: List[DailyConversations]

    # ── Content ──
    # Most frequent user messages.
    top_queries: List[TopQuery]
    # Conversation count per language code.
    languages_used: Dict[str, int]
    peak_hour: Optional[int]  # 0-23

    # ── Knowledge gaps (only filled in by the per-chatbot endpoint) ──
    unanswered_count: int = 0
    unanswered_queries: List[TopQuery] = []

    # ── Feedback split ──
    feedback_positive: int = 0
    feedback_negative: int = 0
|
|
|
|
|
|
# Account-wide analytics payload returned by the "/overview" endpoint.
class OverviewAnalyticsResponse(BaseModel):
    # ── Chatbot inventory ──
    total_chatbots: int
    # Chatbots whose "is_published" flag is truthy.
    published_chatbots: int

    # ── Totals across all chatbots ──
    total_conversations: int
    total_messages: int
    # Sum of the per-chatbot unique session counts.
    unique_sessions: int
    conversations_this_month: int
    # Mean of the per-chatbot ratings, rounded to one decimal; None when no chatbot is rated.
    average_rating: Optional[float]

    # ── Per-chatbot breakdown ──
    chatbots: List[ChatbotAnalyticsResponse]

    # ── Plan usage ──
    plan: str
    # "conversations_limit" entry of the plan's PLAN_LIMITS configuration (0 if missing).
    conversations_limit: int
    # The overview endpoint reports this month's conversation count here.
    conversations_used: int
|
|
|
|
|
|
# ─── Helpers ───────────────────────────────────────────────────────────────────
|
|
|
|
def _get_user_plan(user_id: str) -> str:
    """Return the plan of the user's active subscription, or "free" if there is none."""
    active_subscriptions = (
        get_supabase()
        .table("subscriptions")
        .select("plan")
        .eq("user_id", user_id)
        .eq("status", "active")
        .execute()
    )
    rows = active_subscriptions.data
    if not rows:
        # No active subscription row: treat the user as being on the free plan.
        return "free"
    # If several active rows exist, the first one returned wins.
    return rows[0]["plan"]
|
|
|
|
|
|
def _check_analytics_access(plan: str):
    """
    Reject plans that do not include analytics.

    Unknown plan names fall back to the "free" plan configuration.

    Raises:
        HTTPException: 402 when the plan's configuration has no truthy
            "analytics" entry.
    """
    limits = PLAN_LIMITS.get(plan, PLAN_LIMITS["free"])
    if limits.get("analytics", False):
        return
    raise HTTPException(
        status_code=402,
        detail="Analytics is available on Starter and Pro plans. Upgrade to access your chatbot analytics.",
    )
|
|
|
|
|
|
# ─── Endpoints ─────────────────────────────────────────────────────────────────
|
|
|
|
@router.get("/overview", response_model=OverviewAnalyticsResponse)
async def get_analytics_overview(user=Depends(get_current_user)):
    """
    Get analytics overview across all chatbots for the current user.
    Requires Starter+ plan.
    """
    # Raises HTTPException 402 when the plan has no analytics access and 404
    # when the user owns no company. Returns an all-zero overview when the
    # company has no chatbots.
    plan = _get_user_plan(user.id)
    _check_analytics_access(plan)
    plan_config = PLAN_LIMITS.get(plan, PLAN_LIMITS["free"])
    conversations_limit = plan_config.get("conversations_limit", 0)

    supabase = get_supabase()

    # Get user's company
    company = supabase.table("companies").select("id").eq("owner_id", user.id).execute()
    if not company.data:
        raise HTTPException(status_code=404, detail="Company not found")
    company_id = company.data[0]["id"]

    # Get all chatbots
    chatbots = supabase.table("chatbots").select("*").eq("company_id", company_id).execute()
    chatbot_list = chatbots.data or []
    chatbot_ids = [c["id"] for c in chatbot_list]

    if not chatbot_ids:
        return OverviewAnalyticsResponse(
            total_chatbots=0,
            published_chatbots=0,
            total_conversations=0,
            total_messages=0,
            unique_sessions=0,
            conversations_this_month=0,
            average_rating=None,
            chatbots=[],
            plan=plan,
            conversations_limit=conversations_limit,
            conversations_used=0,
        )

    # ── Time boundaries ──────────────────────────────────────────────────────
    # Computed once as ISO strings: created_at values are compared as text.
    # NOTE(review): this assumes created_at is an ISO-8601 UTC string - confirm.
    now = datetime.utcnow()
    midnight = now.replace(hour=0, minute=0, second=0, microsecond=0)
    today_str = midnight.strftime("%Y-%m-%d")
    week_start_iso = (midnight - timedelta(days=now.weekday())).isoformat()
    month_start_iso = midnight.replace(day=1).isoformat()
    thirty_days_ago_iso = (now - timedelta(days=30)).isoformat()

    # ── Batch queries (avoid one query per chatbot) ──────────────────────────
    # NOTE(review): if the backend caps rows returned per request, these
    # "all rows" queries are silently truncated - confirm and paginate if so.

    # Batch query 1: conversations of every chatbot, indexed by chatbot_id.
    all_convos = (
        supabase.table("conversations")
        .select("id, chatbot_id, session_id, language, created_at")
        .in_("chatbot_id", chatbot_ids)
        .execute()
    ).data or []
    convos_by_chatbot: Dict[str, List[Dict]] = defaultdict(list)
    for conv in all_convos:
        convos_by_chatbot[conv["chatbot_id"]].append(conv)

    # Batch query 2: messages of every conversation, indexed by conversation_id.
    # Ids are sent in chunks of 500 to avoid URL length limits.
    chunk_size = 500
    all_conv_ids = [conv["id"] for conv in all_convos]
    msgs_by_conv: Dict[str, List[Dict]] = defaultdict(list)
    for start in range(0, len(all_conv_ids), chunk_size):
        msgs_resp = (
            supabase.table("messages")
            .select("id, conversation_id, role, content, created_at")
            .in_("conversation_id", all_conv_ids[start:start + chunk_size])
            .execute()
        )
        for msg in msgs_resp.data or []:
            msgs_by_conv[msg["conversation_id"]].append(msg)

    # Batch query 3: feedback of every chatbot, indexed by chatbot_id.
    fb_resp = (
        supabase.table("message_feedback")
        .select("chatbot_id, feedback")
        .in_("chatbot_id", chatbot_ids)
        .execute()
    )
    fb_by_chatbot: Dict[str, List[Dict]] = defaultdict(list)
    for fb in fb_resp.data or []:
        fb_by_chatbot[fb["chatbot_id"]].append(fb)

    # ── Aggregate per chatbot ────────────────────────────────────────────────
    chatbot_analytics = []
    total_convos = 0
    total_msgs = 0
    total_sessions = 0
    month_convos = 0
    all_ratings = []

    for chatbot in chatbot_list:
        cid = chatbot["id"]
        conv_data = convos_by_chatbot.get(cid, [])
        conv_count = len(conv_data)
        total_convos += conv_count

        # Unique sessions
        unique_sess = len({c["session_id"] for c in conv_data if c.get("session_id")})
        total_sessions += unique_sess

        # Single pass over the conversations for every time/language metric.
        today_count = 0
        week_count = 0
        month_count = 0
        daily: Dict[str, int] = {}
        lang_counts: Dict[str, int] = {}
        hour_counts: Dict[int, int] = {}
        for conv in conv_data:
            # `or "en"` also covers an explicit NULL language, which would
            # otherwise become a None key and fail response validation.
            lang = conv.get("language") or "en"
            lang_counts[lang] = lang_counts.get(lang, 0) + 1

            created = conv.get("created_at")
            if not created:
                continue
            day = created[:10]
            if day == today_str:
                today_count += 1
            if created >= week_start_iso:
                week_count += 1
            if created >= month_start_iso:
                month_count += 1
            if created >= thirty_days_ago_iso:
                daily[day] = daily.get(day, 0) + 1
            # Characters 11-12 hold the hour in "YYYY-MM-DDTHH:..." strings.
            if len(created) > 13:
                try:
                    hour = int(created[11:13])
                except ValueError:
                    pass
                else:
                    hour_counts[hour] = hour_counts.get(hour, 0) + 1
        month_convos += month_count

        daily_list = [DailyConversations(date=d, count=n) for d, n in sorted(daily.items())]
        peak = max(hour_counts, key=hour_counts.get) if hour_counts else None

        # Messages for this chatbot and top queries from user messages
        msg_count = 0
        query_counts: Dict[str, int] = {}
        for conv in conv_data:
            conv_msgs = msgs_by_conv.get(conv["id"], [])
            msg_count += len(conv_msgs)
            for msg in conv_msgs:
                if msg.get("role") != "user":
                    continue
                content = (msg.get("content") or "")[:100].strip()
                if content:
                    query_counts[content] = query_counts.get(content, 0) + 1
        total_msgs += msg_count
        top_five = sorted(query_counts.items(), key=lambda item: -item[1])[:5]
        top_queries = [TopQuery(query=q, count=n) for q, n in top_five]

        # Rating (falsy ratings are excluded from the overall average)
        rating = chatbot.get("average_rating")
        if rating:
            all_ratings.append(rating)

        # Feedback: everything that is not "positive" counts as negative.
        chatbot_fb = fb_by_chatbot.get(cid, [])
        fb_pos = sum(1 for f in chatbot_fb if f.get("feedback") == "positive")
        fb_neg = len(chatbot_fb) - fb_pos

        avg_msgs = round(msg_count / conv_count, 1) if conv_count > 0 else 0.0

        chatbot_analytics.append(ChatbotAnalyticsResponse(
            chatbot_id=cid,
            chatbot_name=chatbot.get("name", "Untitled"),
            total_conversations=conv_count,
            unique_sessions=unique_sess,
            total_messages=msg_count,
            average_messages_per_conversation=avg_msgs,
            average_rating=rating,
            total_ratings=len(chatbot_fb),
            conversations_today=today_count,
            conversations_this_week=week_count,
            conversations_this_month=month_count,
            daily_conversations=daily_list,
            top_queries=top_queries,
            languages_used=lang_counts,
            peak_hour=peak,
            feedback_positive=fb_pos,
            feedback_negative=fb_neg,
        ))

    # Overall average rating
    avg_rating = round(sum(all_ratings) / len(all_ratings), 1) if all_ratings else None

    return OverviewAnalyticsResponse(
        total_chatbots=len(chatbot_list),
        published_chatbots=sum(1 for c in chatbot_list if c.get("is_published")),
        total_conversations=total_convos,
        total_messages=total_msgs,
        unique_sessions=total_sessions,
        conversations_this_month=month_convos,
        average_rating=avg_rating,
        chatbots=chatbot_analytics,
        plan=plan,
        conversations_limit=conversations_limit,
        conversations_used=month_convos,
    )
|
|
|
|
|
|
@router.get("/chatbot/{chatbot_id}", response_model=ChatbotAnalyticsResponse)
async def get_chatbot_analytics(chatbot_id: str, user=Depends(get_current_user)):
    """
    Get detailed analytics for a specific chatbot.
    Requires Starter+ plan and ownership of the chatbot.
    """
    # Raises HTTPException 402 when the plan has no analytics access and 404
    # when the user owns no company or the chatbot is not in that company.
    plan = _get_user_plan(user.id)
    _check_analytics_access(plan)

    supabase = get_supabase()

    # Verify ownership
    company = supabase.table("companies").select("id").eq("owner_id", user.id).execute()
    if not company.data:
        raise HTTPException(status_code=404, detail="Company not found")

    chatbot = (
        supabase.table("chatbots")
        .select("*")
        .eq("id", chatbot_id)
        .eq("company_id", company.data[0]["id"])
        .execute()
    )
    if not chatbot.data:
        raise HTTPException(status_code=404, detail="Chatbot not found")
    cb = chatbot.data[0]

    # Time boundaries, computed once as ISO strings: created_at values are
    # compared as text.
    # NOTE(review): this assumes created_at is an ISO-8601 UTC string - confirm.
    now = datetime.utcnow()
    midnight = now.replace(hour=0, minute=0, second=0, microsecond=0)
    today_str = midnight.strftime("%Y-%m-%d")
    week_start_iso = (midnight - timedelta(days=now.weekday())).isoformat()
    month_start_iso = midnight.replace(day=1).isoformat()
    thirty_days_ago_iso = (now - timedelta(days=30)).isoformat()

    # Conversations
    convos = (
        supabase.table("conversations")
        .select("id, session_id, language, created_at", count="exact")
        .eq("chatbot_id", chatbot_id)
        .execute()
    )
    conv_count = convos.count or 0
    conv_data = convos.data or []
    conv_ids = [c["id"] for c in conv_data]

    sessions = {c["session_id"] for c in conv_data if c.get("session_id")}

    # Messages: counted in chunks of 500 conversation ids to keep the request
    # URL short. With no conversations nothing is queried, so no placeholder
    # id is ever sent to the database.
    chunk_size = 500
    msg_count = 0
    for start in range(0, len(conv_ids), chunk_size):
        msgs = (
            supabase.table("messages")
            .select("id", count="exact")
            .in_("conversation_id", conv_ids[start:start + chunk_size])
            .execute()
        )
        msg_count += msgs.count or 0

    # Single pass over the conversations for every time/language metric.
    today_count = 0
    week_count = 0
    month_count = 0
    daily: Dict[str, int] = {}
    lang_counts: Dict[str, int] = {}
    hour_counts: Dict[int, int] = {}
    for conv in conv_data:
        # `or "en"` also covers an explicit NULL language, which would
        # otherwise become a None key and fail response validation.
        lang = conv.get("language") or "en"
        lang_counts[lang] = lang_counts.get(lang, 0) + 1

        created = conv.get("created_at")
        if not created:
            continue
        day = created[:10]
        if day == today_str:
            today_count += 1
        if created >= week_start_iso:
            week_count += 1
        if created >= month_start_iso:
            month_count += 1
        if created >= thirty_days_ago_iso:
            daily[day] = daily.get(day, 0) + 1
        # Characters 11-12 hold the hour in "YYYY-MM-DDTHH:..." strings.
        if len(created) > 13:
            try:
                hour = int(created[11:13])
            except ValueError:
                pass
            else:
                hour_counts[hour] = hour_counts.get(hour, 0) + 1

    daily_list = [DailyConversations(date=d, count=n) for d, n in sorted(daily.items())]
    peak = max(hour_counts, key=hour_counts.get) if hour_counts else None

    # Top queries, sampled from at most 100 conversations / 200 user messages.
    # NOTE(review): the conversation query has no ordering, so these are the
    # first 100 rows returned, not necessarily the most recent ones.
    top_queries: List[TopQuery] = []
    if conv_ids:
        user_msgs = (
            supabase.table("messages")
            .select("content")
            .in_("conversation_id", conv_ids[:100])
            .eq("role", "user")
            .limit(200)
            .execute()
        )
        query_counts: Dict[str, int] = {}
        for m in user_msgs.data or []:
            content = (m.get("content") or "")[:100].strip()
            if content:
                query_counts[content] = query_counts.get(content, 0) + 1
        top_sorted = sorted(query_counts.items(), key=lambda item: -item[1])[:10]
        top_queries = [TopQuery(query=q, count=n) for q, n in top_sorted]

    avg_msgs = round(msg_count / conv_count, 1) if conv_count > 0 else 0.0

    # Feedback counts: everything that is not "positive" counts as negative.
    fb_pos = 0
    fb_neg = 0
    if conv_ids:
        feedback = (
            supabase.table("message_feedback")
            .select("feedback")
            .eq("chatbot_id", chatbot_id)
            .execute()
        )
        for f in feedback.data or []:
            if f["feedback"] == "positive":
                fb_pos += 1
            else:
                fb_neg += 1

    # Unanswered queries (assistant messages with confidence_score < 0.2).
    # Best effort: a failure here is logged and the fields keep their defaults.
    unanswered_queries: List[TopQuery] = []
    unanswered_count = 0
    if conv_ids:
        try:
            # created_at must be selected: it bounds the lookup of the user
            # message that preceded each low-confidence answer.
            low_conf_msgs = (
                supabase.table("messages")
                .select("id, conversation_id, confidence_score, created_at")
                .in_("conversation_id", conv_ids[:100])
                .eq("role", "assistant")
                .lt("confidence_score", 0.2)
                .limit(200)
                .execute()
            )
            low_conf_rows = low_conf_msgs.data or []
            unanswered_count = len(low_conf_rows)

            unanswered_q_counts: Dict[str, int] = {}
            for lm in low_conf_rows[:20]:  # limit work: one query per message
                prev_query = (
                    supabase.table("messages")
                    .select("content")
                    .eq("conversation_id", lm["conversation_id"])
                    .eq("role", "user")
                )
                answered_at = lm.get("created_at")
                if answered_at:
                    prev_query = prev_query.lt("created_at", answered_at)
                prev_user = prev_query.order("created_at", desc=True).limit(1).execute()
                if prev_user.data:
                    q = (prev_user.data[0].get("content") or "")[:100].strip()
                    if q:
                        unanswered_q_counts[q] = unanswered_q_counts.get(q, 0) + 1
            top_unanswered = sorted(unanswered_q_counts.items(), key=lambda item: -item[1])[:5]
            unanswered_queries = [TopQuery(query=q, count=n) for q, n in top_unanswered]
        except Exception:
            # Unanswered queries are optional; keep the response but leave a trace.
            logger.warning(
                "Could not compute unanswered queries for chatbot %s",
                chatbot_id,
                exc_info=True,
            )

    return ChatbotAnalyticsResponse(
        chatbot_id=chatbot_id,
        chatbot_name=cb.get("name", "Untitled"),
        total_conversations=conv_count,
        unique_sessions=len(sessions),
        total_messages=msg_count,
        average_messages_per_conversation=avg_msgs,
        average_rating=cb.get("average_rating"),
        total_ratings=fb_pos + fb_neg,
        conversations_today=today_count,
        conversations_this_week=week_count,
        conversations_this_month=month_count,
        daily_conversations=daily_list,
        top_queries=top_queries,
        languages_used=lang_counts,
        peak_hour=peak,
        unanswered_count=unanswered_count,
        unanswered_queries=unanswered_queries,
        feedback_positive=fb_pos,
        feedback_negative=fb_neg,
    )
|
|
|
|
|
|
@router.get("/chatbot/{chatbot_id}/gaps", response_model=List[TopQuery])
async def get_knowledge_gaps(chatbot_id: str, user=Depends(get_current_user)):
    """Returns top queries where the bot had low confidence (knowledge gaps). Starter+ only."""
    # Raises HTTPException 402 when the plan has no analytics access and 404
    # when the user owns no company or the chatbot is not in that company.
    # Returns an empty list when there are no conversations or no
    # low-confidence assistant messages.
    plan = _get_user_plan(user.id)
    _check_analytics_access(plan)

    supabase = get_supabase()
    company = supabase.table("companies").select("id").eq("owner_id", user.id).execute()
    if not company.data:
        raise HTTPException(status_code=404, detail="Company not found")

    chatbot = (
        supabase.table("chatbots")
        .select("id")
        .eq("id", chatbot_id)
        .eq("company_id", company.data[0]["id"])
        .execute()
    )
    if not chatbot.data:
        raise HTTPException(status_code=404, detail="Chatbot not found")

    # Find conversations
    convs = supabase.table("conversations").select("id").eq("chatbot_id", chatbot_id).execute()
    conv_ids = [c["id"] for c in (convs.data or [])]
    if not conv_ids:
        return []

    # Low confidence assistant messages (confidence_score < 0.2), looked up in
    # at most 100 conversations and capped at 100 rows.
    low_conf = (
        supabase.table("messages")
        .select("id, conversation_id, created_at")
        .in_("conversation_id", conv_ids[:100])
        .eq("role", "assistant")
        .lt("confidence_score", 0.2)
        .limit(100)
        .execute()
    )
    if not low_conf.data:
        return []

    q_counts: Dict[str, int] = {}
    for msg in low_conf.data[:30]:  # limit work: one query per message
        prev_query = (
            supabase.table("messages")
            .select("content")
            .eq("conversation_id", msg["conversation_id"])
            .eq("role", "user")
        )
        # Only user messages sent before the low-confidence answer qualify;
        # without this bound the latest user message of the conversation
        # would be returned instead of the question that was actually asked.
        answered_at = msg.get("created_at")
        if answered_at:
            prev_query = prev_query.lt("created_at", answered_at)
        prev = prev_query.order("created_at", desc=True).limit(1).execute()
        if prev.data:
            content = (prev.data[0].get("content") or "")[:100].strip()
            if content:
                q_counts[content] = q_counts.get(content, 0) + 1

    sorted_gaps = sorted(q_counts.items(), key=lambda item: -item[1])[:10]
    return [TopQuery(query=q, count=n) for q, n in sorted_gaps]
|