diff --git a/app/config.py b/app/config.py
index ebdf32b..1867743 100644
--- a/app/config.py
+++ b/app/config.py
@@ -55,13 +55,6 @@ class Settings(BaseSettings):
# Supabase Storage
supabase_storage_url: str = ""
- # WhatsApp Cloud API
- whatsapp_access_token: str = ""
- whatsapp_phone_number_id: str = ""
- whatsapp_verify_token: str = "contexta_whatsapp_verify"
- whatsapp_app_secret: str = ""
- whatsapp_display_number: str = "" # E.164 without '+', e.g. "15551234567"
-
@property
def allowed_origins_list(self) -> List[str]:
return [o.strip() for o in self.allowed_origins.split(",")]
@@ -188,7 +181,7 @@ DEFAULT_MODELS = {
# ═══════════════════════════════════════════════════════════════════════════════
-# PLAN LIMITS — Pricing: Starter $12/mo, Business $29/mo, Agency $79/mo
+# PLAN LIMITS — Pricing: Starter $19/mo, Business $49/mo, Agency $99/mo
# ═══════════════════════════════════════════════════════════════════════════════
#
# Cost analysis (per 1M tokens approx):
@@ -207,9 +200,9 @@ DEFAULT_MODELS = {
# Fireworks models: ~$0.001-$0.004 per conversation
# GPT-4o: ~$0.015 per conversation
#
-# Starter $12/mo, 1500 convos: max cost ~$6/mo (fireworks mix) → margin OK
-# Business $29/mo, 5000 convos: max cost ~$15/mo (mixed models) → margin OK
-# Agency $79/mo, 20000 convos: max cost ~$30/mo (fireworks) → healthy margin
+# Starter $19/mo, 1500 convos: max cost ~$6/mo (fireworks mix) → margin OK
+# Business $49/mo, 5000 convos: max cost ~$15/mo (mixed models) → margin OK
+# Agency $99/mo, 20000 convos: max cost ~$30/mo (fireworks) → healthy margin
# ═══════════════════════════════════════════════════════════════════════════════
_ALL_FIREWORKS = [
@@ -229,52 +222,76 @@ PLAN_LIMITS = {
# Build, test, and go live with one chatbot — no card needed.
"free": {
"max_chatbots": 999999,
- "max_published": 1, # can publish 1 chatbot
+ "max_published": 1, # can publish 1 chatbot
"max_documents_per_chatbot": 3,
"max_document_size_mb": 5,
"models": ["accounts/fireworks/models/llama-v3p3-70b-instruct"],
- "conversations_limit": 100, # 100 real conversations/month
+ "conversations_limit": 100, # 100 real conversations/month
"code_export": False,
"analytics": False,
- "channels": [], # no messaging channels
+ "gap_suggestions": False,
+ "channels": [], # no messaging channels
"url_sources": 0,
"leads_per_month": 0,
- "show_branding": True, # cannot remove badge
+ "inbox_replies": False, # read-only inbox
+ "leads_editing": False, # view-only leads
+ "show_branding": True, # cannot remove badge
+ "appointments": False,
+ "appointments_chatbots": 0,
+ "campaigns": False,
+ "campaigns_per_month": 0,
+ "max_campaign_recipients": 0,
},
- # ── Starter $12/mo ───────────────────────────────────────────────────────
- # For individuals and solo businesses going live.
+ # ── Starter $19/mo ───────────────────────────────────────────────────────
+ # For solo operators: live chat, leads, booking, and campaigns.
"starter": {
"max_chatbots": 999999,
- "max_published": 1,
+ "max_published": 3,
"max_documents_per_chatbot": 10,
"max_document_size_mb": 10,
"models": _ALL_FIREWORKS,
"conversations_limit": 1500,
"code_export": False,
"analytics": True,
+ "gap_suggestions": False,
"channels": ["telegram"],
"url_sources": 5,
"leads_per_month": 500,
- "show_branding": True, # badge stays
+ "inbox_replies": True,
+ "leads_editing": True,
+ "show_branding": True, # badge stays on Starter
+ "appointments": True,
+ "appointments_chatbots": 1, # booking on 1 chatbot
+ "campaigns": True,
+ "campaigns_per_month": 3,
+ "max_campaign_recipients": 500,
},
- # ── Business $29/mo ──────────────────────────────────────────────────────
- # For growing businesses that need more chatbots and WhatsApp reach.
+ # ── Business $49/mo ──────────────────────────────────────────────────────
+ # For growing businesses: premium AI, unlimited booking, full analytics.
"business": {
"max_chatbots": 999999,
- "max_published": 3,
+ "max_published": 10,
"max_documents_per_chatbot": 50,
"max_document_size_mb": 50,
"models": _ALL_FIREWORKS + _ALL_PREMIUM,
"conversations_limit": 5000,
"code_export": False,
"analytics": True,
- "channels": ["telegram", "whatsapp"],
+ "gap_suggestions": True,
+ "channels": ["telegram"],
"url_sources": 999999,
"leads_per_month": 999999,
- "show_branding": False, # can remove badge
+ "inbox_replies": True,
+ "leads_editing": True,
+ "show_branding": False, # can remove badge
+ "appointments": True,
+ "appointments_chatbots": 999999,
+ "campaigns": True,
+ "campaigns_per_month": 999999,
+ "max_campaign_recipients": 5000,
},
- # ── Agency $79/mo ────────────────────────────────────────────────────────
- # For agencies and large businesses managing many chatbots.
+ # ── Agency $99/mo ────────────────────────────────────────────────────────
+ # For agencies: unlimited everything, unlimited campaign recipients.
"agency": {
"max_chatbots": 999999,
"max_published": 999999,
@@ -284,10 +301,18 @@ PLAN_LIMITS = {
"conversations_limit": 20000,
"code_export": True,
"analytics": True,
- "channels": ["telegram", "whatsapp"],
+ "gap_suggestions": True,
+ "channels": ["telegram"],
"url_sources": 999999,
"leads_per_month": 999999,
+ "inbox_replies": True,
+ "leads_editing": True,
"show_branding": False,
+ "appointments": True,
+ "appointments_chatbots": 999999,
+ "campaigns": True,
+ "campaigns_per_month": 999999,
+ "max_campaign_recipients": 999999,
},
# ── Enterprise (custom) ───────────────────────────────────────────────────
"enterprise": {
@@ -299,9 +324,17 @@ PLAN_LIMITS = {
"conversations_limit": 999999,
"code_export": True,
"analytics": True,
- "channels": ["telegram", "whatsapp"],
+ "gap_suggestions": True,
+ "channels": ["telegram"],
"url_sources": 999999,
"leads_per_month": 999999,
+ "inbox_replies": True,
+ "leads_editing": True,
"show_branding": False,
+ "appointments": True,
+ "appointments_chatbots": 999999,
+ "campaigns": True,
+ "campaigns_per_month": 999999,
+ "max_campaign_recipients": 999999,
},
}
\ No newline at end of file
diff --git a/app/dependencies.py b/app/dependencies.py
index a28ce3e..b9ba036 100644
--- a/app/dependencies.py
+++ b/app/dependencies.py
@@ -29,7 +29,24 @@ async def get_current_user(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid or expired token",
)
- return response.user
+ user = response.user
+
+ # Check for suspension
+ try:
+ profile = supabase.table("user_profiles").select("suspended_at").eq("user_id", user.id).execute()
+ if profile.data and profile.data[0].get("suspended_at"):
+ raise HTTPException(
+ status_code=status.HTTP_403_FORBIDDEN,
+ detail="Account suspended. Please contact support.",
+ )
+ except HTTPException:
+ raise
+ except Exception:
+ pass # Don't block login if profile lookup fails
+
+ return user
+ except HTTPException:
+ raise
except Exception as e:
logger.error(f"Auth error: {e}")
raise HTTPException(
@@ -38,6 +55,29 @@ async def get_current_user(
)
+async def get_admin_user(
+ current_user=Depends(get_current_user),
+):
+ """Require the current user to be an admin."""
+ supabase = get_supabase()
+ try:
+ profile = supabase.table("user_profiles").select("is_admin").eq("user_id", current_user.id).execute()
+ if not profile.data or not profile.data[0].get("is_admin"):
+ raise HTTPException(
+ status_code=status.HTTP_403_FORBIDDEN,
+ detail="Admin access required",
+ )
+ except HTTPException:
+ raise
+ except Exception as e:
+ logger.error(f"Admin check failed: {e}")
+ raise HTTPException(
+ status_code=status.HTTP_403_FORBIDDEN,
+ detail="Admin access required",
+ )
+ return current_user
+
+
async def get_optional_user(
credentials: Optional[HTTPAuthorizationCredentials] = Depends(security),
):
diff --git a/app/logging_config.py b/app/logging_config.py
new file mode 100644
index 0000000..d97ef27
--- /dev/null
+++ b/app/logging_config.py
@@ -0,0 +1,34 @@
+import logging
+import os
+
+
+def configure_logging():
+ """Configure structured JSON logging for the application."""
+ log_level = logging.DEBUG if os.getenv("APP_ENV", "development") == "development" else logging.INFO
+
+ try:
+ from pythonjsonlogger import jsonlogger
+
+ handler = logging.StreamHandler()
+ formatter = jsonlogger.JsonFormatter(
+ fmt="%(asctime)s %(levelname)s %(name)s %(message)s",
+ datefmt="%Y-%m-%dT%H:%M:%S",
+ rename_fields={"asctime": "timestamp", "levelname": "level", "name": "logger"},
+ )
+ handler.setFormatter(formatter)
+ except ImportError:
+ # Fallback to plain text if pythonjsonlogger not installed yet
+ handler = logging.StreamHandler()
+ handler.setFormatter(
+ logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
+ )
+
+ root_logger = logging.getLogger()
+ root_logger.handlers.clear()
+ root_logger.addHandler(handler)
+ root_logger.setLevel(log_level)
+
+ # Silence noisy third-party loggers
+ logging.getLogger("uvicorn.access").setLevel(logging.WARNING)
+ logging.getLogger("httpx").setLevel(logging.WARNING)
+ logging.getLogger("httpcore").setLevel(logging.WARNING)
diff --git a/app/main.py b/app/main.py
index 7f1876e..314b1e5 100644
--- a/app/main.py
+++ b/app/main.py
@@ -4,17 +4,18 @@ from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse, Response
import logging
+from app.logging_config import configure_logging
+configure_logging() # Must be called before any logger is created
+
from app.config import settings
from app.routers import auth, chatbots, documents, chat, marketplace, billing, models, analytics, inbox, leads, upload
from app.routers.documents import router_url_sources
from app.routers.leads import leads_public_router
from app.routers.channels import router as channels_router, webhook_router as channels_webhook_router
+from app.routers import admin as admin_router
+from app.routers.appointments import router as appointments_router, public_router as appointments_public_router
+from app.routers.campaigns import router as campaigns_router
-# Configure logging
-logging.basicConfig(
- level=logging.INFO,
- format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
-)
logger = logging.getLogger(__name__)
@@ -62,13 +63,28 @@ app.include_router(router_url_sources, prefix="/api/v1")
app.include_router(leads_public_router, prefix="/api/v1")
app.include_router(channels_router, prefix="/api/v1")
app.include_router(channels_webhook_router, prefix="/api/v1")
+app.include_router(appointments_router, prefix="/api/v1")
+app.include_router(appointments_public_router, prefix="/api/v1")
+app.include_router(campaigns_router, prefix="/api/v1")
+app.include_router(admin_router.router, prefix="/api/v1")
# ── Widget ─────────────────────────────────────────────────────────────────────
-@app.get("/widget.js")
+@app.get("/widget.js", include_in_schema=False)
async def serve_widget():
from app.services.widget import generate_widget_js
- return Response(generate_widget_js(settings.app_url), media_type="application/javascript")
+ return Response(
+ content=generate_widget_js(settings.app_url),
+ media_type="application/javascript",
+ headers={
+ # Allow any site to load this script tag cross-origin
+ "Access-Control-Allow-Origin": "*",
+ # Cache for 1 hour in browsers / CDN; revalidate when stale
+ "Cache-Control": "public, max-age=3600, stale-while-revalidate=86400",
+ # Prevent MIME sniffing
+ "X-Content-Type-Options": "nosniff",
+ },
+ )
# ── Health & Info ──────────────────────────────────────────────────────────────
@@ -87,6 +103,15 @@ async def health():
return {"status": "healthy", "environment": settings.app_env}
+# ── Prometheus Metrics ──────────────────────────────────────────────────────────
+try:
+ from prometheus_fastapi_instrumentator import Instrumentator
+ Instrumentator().instrument(app).expose(app, endpoint="/metrics", include_in_schema=False)
+ logger.info("Prometheus metrics enabled at /metrics")
+except ImportError:
+ logger.info("prometheus-fastapi-instrumentator not installed, metrics endpoint disabled")
+
+
# ── Sentry ─────────────────────────────────────────────────────────────────────
if settings.sentry_dsn:
import sentry_sdk
diff --git a/app/models.py b/app/models.py
index 03647da..99f5230 100644
--- a/app/models.py
+++ b/app/models.py
@@ -1,8 +1,9 @@
-from pydantic import BaseModel, EmailStr, Field
+from pydantic import BaseModel, EmailStr, Field, field_validator
from typing import Optional, List, Dict, Any
from datetime import datetime
from enum import Enum
import uuid
+import re
# ─── Enums ────────────────────────────────────────────────────────────────────
@@ -59,6 +60,7 @@ class UserResponse(BaseModel):
email: str
company_name: Optional[str] = None
plan: str = "free"
+ is_admin: bool = False
created_at: Optional[datetime] = None
@@ -100,6 +102,39 @@ class ChatbotCreate(BaseModel):
description: Optional[str] = None
system_prompt: Optional[str] = None
model: str = "accounts/fireworks/models/kimi-k2-instruct-0905"
+
+ @field_validator("name", mode="before")
+ @classmethod
+ def sanitize_name(cls, v: Any) -> Any:
+ if v:
+ v = str(v).strip()
+ if len(v) > 100:
+ raise ValueError("Name must be 100 characters or less")
+ return v
+
+ @field_validator("system_prompt", mode="before")
+ @classmethod
+ def sanitize_system_prompt(cls, v: Any) -> Any:
+ if v:
+ v = re.sub(r"", "", str(v), flags=re.DOTALL | re.IGNORECASE)
+ v = re.sub(r"javascript:", "", v, flags=re.IGNORECASE)
+ if len(v) > 10000:
+ raise ValueError("System prompt must be 10000 characters or less")
+ return v
+
+ @field_validator("description", mode="before")
+ @classmethod
+ def sanitize_description(cls, v: Any) -> Any:
+ if v and len(str(v)) > 2000:
+ raise ValueError("Description must be 2000 characters or less")
+ return v
+
+ @field_validator("welcome_message", mode="before")
+ @classmethod
+ def sanitize_welcome_message(cls, v: Any) -> Any:
+ if v and len(str(v)) > 500:
+ raise ValueError("Welcome message must be 500 characters or less")
+ return v
temperature: float = Field(default=0.7, ge=0.0, le=2.0)
max_tokens: int = Field(default=1000, ge=100, le=8000)
primary_color: str = "#6366f1"
@@ -116,12 +151,46 @@ class ChatbotCreate(BaseModel):
handoff_message: str = "I'll connect you with our team. Please wait."
handoff_email: Optional[str] = None
handoff_keywords: List[str] = ["human", "agent", "speak to someone", "talk to a person", "real person"]
+ booking_enabled: bool = False
class ChatbotUpdate(BaseModel):
name: Optional[str] = None
description: Optional[str] = None
system_prompt: Optional[str] = None
+
+ @field_validator("name", mode="before")
+ @classmethod
+ def sanitize_name(cls, v: Any) -> Any:
+ if v:
+ v = str(v).strip()
+ if len(v) > 100:
+ raise ValueError("Name must be 100 characters or less")
+ return v
+
+ @field_validator("system_prompt", mode="before")
+ @classmethod
+ def sanitize_system_prompt(cls, v: Any) -> Any:
+ if v:
+ v = re.sub(r"", "", str(v), flags=re.DOTALL | re.IGNORECASE)
+ v = re.sub(r"javascript:", "", v, flags=re.IGNORECASE)
+ if len(v) > 10000:
+ raise ValueError("System prompt must be 10000 characters or less")
+ return v
+
+ @field_validator("description", mode="before")
+ @classmethod
+ def sanitize_description(cls, v: Any) -> Any:
+ if v and len(str(v)) > 2000:
+ raise ValueError("Description must be 2000 characters or less")
+ return v
+
+ @field_validator("welcome_message", mode="before")
+ @classmethod
+ def sanitize_welcome_message(cls, v: Any) -> Any:
+ if v and len(str(v)) > 500:
+ raise ValueError("Welcome message must be 500 characters or less")
+ return v
model: Optional[str] = None
temperature: Optional[float] = None
max_tokens: Optional[int] = None
@@ -139,6 +208,7 @@ class ChatbotUpdate(BaseModel):
handoff_message: Optional[str] = None
handoff_email: Optional[str] = None
handoff_keywords: Optional[List[str]] = None
+ booking_enabled: Optional[bool] = None
class ChatbotResponse(BaseModel):
@@ -172,6 +242,7 @@ class ChatbotResponse(BaseModel):
handoff_message: str = "I'll connect you with our team. Please wait."
handoff_email: Optional[str] = None
handoff_keywords: List[str] = ["human", "agent", "speak to someone", "talk to a person", "real person"]
+ booking_enabled: bool = False
class ChatbotPublicResponse(BaseModel):
@@ -355,9 +426,16 @@ class LeadResponse(BaseModel):
name: Optional[str] = None
phone: Optional[str] = None
company: Optional[str] = None
+ status: str = "new"
+ notes: Optional[str] = None
created_at: Optional[datetime] = None
+class LeadUpdate(BaseModel):
+ status: Optional[str] = None # new, contacted, qualified, closed, lost
+ notes: Optional[str] = None
+
+
# ─── URL Source Models ─────────────────────────────────────────────────────────
class UrlSourceCreate(BaseModel):
@@ -392,6 +470,8 @@ class InboxConversation(BaseModel):
language: str
message_count: int
first_message: Optional[str] = None
+ status: str = "open"
+ last_agent_reply_at: Optional[datetime] = None
created_at: Optional[datetime] = None
@@ -405,13 +485,148 @@ class InboxMessage(BaseModel):
created_at: Optional[datetime] = None
+class ConversationStatusUpdate(BaseModel):
+ status: str # open, agent_handling, resolved
+
+
+class AgentReplyCreate(BaseModel):
+ message: str = Field(min_length=1, max_length=4000)
+
+
# ─── Channel Models ────────────────────────────────────────────────────────────
class ChannelConnectionResponse(BaseModel):
id: str
channel: str
bot_username: Optional[str] = None
- wa_keyword: Optional[str] = None
- wa_link: Optional[str] = None
is_active: bool
- created_at: Optional[datetime] = None
\ No newline at end of file
+ created_at: Optional[datetime] = None
+
+
+# ─── Admin Models ──────────────────────────────────────────────────────────────
+
+class AdminUserListItem(BaseModel):
+ id: str
+ email: str
+ company_name: Optional[str] = None
+ plan: str = "free"
+ subscription_status: str = "active"
+ chatbot_count: int = 0
+ conversations_count: int = 0
+ is_suspended: bool = False
+ is_admin: bool = False
+ created_at: Optional[datetime] = None
+
+
+class AdminUserDetail(AdminUserListItem):
+ website: Optional[str] = None
+ industry: Optional[str] = None
+ chatbots: List[Dict[str, Any]] = []
+
+
+class AdminChangePlanRequest(BaseModel):
+ plan: str
+ reason: Optional[str] = None
+
+
+class AdminSuspendRequest(BaseModel):
+ suspend: bool
+ reason: Optional[str] = None
+
+
+class AdminStatsResponse(BaseModel):
+ total_users: int
+ total_chatbots: int
+ total_published_chatbots: int
+ total_conversations: int
+ total_messages: int
+ active_subscriptions: Dict[str, int]
+
+
+class AdminChatbotListItem(BaseModel):
+ id: str
+ name: str
+ owner_email: Optional[str] = None
+ company_name: Optional[str] = None
+ is_published: bool = False
+ document_count: int = 0
+ conversation_count: int = 0
+ created_at: Optional[datetime] = None
+
+
+class AdminSystemHealth(BaseModel):
+ db: str
+ qdrant: str
+ llm_providers: Dict[str, bool]
+ timestamp: datetime
+
+
+class AdminConversationListItem(BaseModel):
+ id: str
+ chatbot_name: Optional[str] = None
+ session_id: Optional[str] = None
+ language: Optional[str] = None
+ message_count: int = 0
+ created_at: Optional[datetime] = None
+ first_message: Optional[str] = None
+
+
+# ─── Appointment Models ────────────────────────────────────────────────────────
+
+class BusinessHoursEntry(BaseModel):
+ day_of_week: int = Field(ge=0, le=6) # 0=Mon, 6=Sun
+ is_open: bool = True
+ open_time: str = "09:00" # HH:MM
+ close_time: str = "17:00"
+ slot_duration_minutes: int = Field(default=60, ge=15, le=480)
+
+
+class BusinessHoursSave(BaseModel):
+ hours: List[BusinessHoursEntry]
+
+
+class AppointmentCreate(BaseModel):
+ customer_name: str = Field(min_length=1, max_length=200)
+ customer_contact: str = Field(min_length=1, max_length=200)
+ service: Optional[str] = None
+ slot_start: datetime
+ notes: Optional[str] = None
+ conversation_id: Optional[str] = None
+
+
+class AppointmentResponse(BaseModel):
+ id: str
+ chatbot_id: str
+ conversation_id: Optional[str] = None
+ customer_name: str
+ customer_contact: str
+ service: Optional[str] = None
+ slot_start: datetime
+ slot_end: datetime
+ status: str
+ notes: Optional[str] = None
+ created_at: Optional[datetime] = None
+
+
+class AppointmentStatusUpdate(BaseModel):
+ status: str # pending, confirmed, cancelled, completed
+
+
+# ─── Campaign Models ───────────────────────────────────────────────────────────
+
+class CampaignCreate(BaseModel):
+ chatbot_id: str
+ title: str = Field(min_length=1, max_length=200)
+ message: str = Field(min_length=1, max_length=4000)
+
+
+class CampaignResponse(BaseModel):
+ id: str
+ chatbot_id: str
+ title: str
+ message: str
+ status: str
+ recipients_count: int
+ sent_count: int
+ created_at: Optional[datetime] = None
+ sent_at: Optional[datetime] = None
\ No newline at end of file
diff --git a/app/routers/admin.py b/app/routers/admin.py
new file mode 100644
index 0000000..07f0f60
--- /dev/null
+++ b/app/routers/admin.py
@@ -0,0 +1,555 @@
+"""
+Admin router — all endpoints require is_admin = TRUE in user_profiles.
+
+Bootstrap: after running migration 001, set your admin user in Supabase:
+ UPDATE user_profiles SET is_admin = TRUE WHERE user_id = '';
+"""
+import logging
+import time
+from collections import defaultdict
+from datetime import datetime
+from typing import Optional, List, Dict
+
+from fastapi import APIRouter, Depends, HTTPException, Query
+from app.dependencies import get_admin_user
+from app.database import get_supabase
+from app.models import (
+ AdminStatsResponse, AdminUserListItem, AdminUserDetail,
+ AdminChangePlanRequest, AdminSuspendRequest, AdminChatbotListItem,
+ AdminSystemHealth, AdminConversationListItem, SuccessResponse,
+)
+from app.services.vector_store import vector_store
+from app.services.storage import delete_from_storage
+from app.config import settings
+
+router = APIRouter(prefix="/admin", tags=["Admin"])
+logger = logging.getLogger(__name__)
+_app_start_time = time.time()
+
+
+# ── Stats ──────────────────────────────────────────────────────────────────────
+
+@router.get("/stats", response_model=AdminStatsResponse)
+async def get_stats(admin=Depends(get_admin_user)):
+ """Platform-wide statistics."""
+ supabase = get_supabase()
+
+ # Total users
+ try:
+ users_resp = supabase.table("user_profiles").select("user_id", count="exact").execute()
+ total_users = users_resp.count or 0
+ except Exception:
+ total_users = 0
+
+ # Total chatbots
+ try:
+ cb_resp = supabase.table("chatbots").select("id", count="exact").execute()
+ total_chatbots = cb_resp.count or 0
+ pub_resp = supabase.table("chatbots").select("id", count="exact").eq("is_published", True).execute()
+ total_published = pub_resp.count or 0
+ except Exception:
+ total_chatbots = 0
+ total_published = 0
+
+ # Total conversations
+ try:
+ conv_resp = supabase.table("conversations").select("id", count="exact").execute()
+ total_convos = conv_resp.count or 0
+ except Exception:
+ total_convos = 0
+
+ # Total messages
+ try:
+ msg_resp = supabase.table("messages").select("id", count="exact").execute()
+ total_messages = msg_resp.count or 0
+ except Exception:
+ total_messages = 0
+
+ # Active subscriptions by plan
+ active_subs: Dict[str, int] = defaultdict(int)
+ try:
+ subs_resp = supabase.table("subscriptions").select("plan, status").eq("status", "active").execute()
+ for s in (subs_resp.data or []):
+ active_subs[s["plan"]] += 1
+ except Exception:
+ pass
+
+ return AdminStatsResponse(
+ total_users=total_users,
+ total_chatbots=total_chatbots,
+ total_published_chatbots=total_published,
+ total_conversations=total_convos,
+ total_messages=total_messages,
+ active_subscriptions=dict(active_subs),
+ )
+
+
+# ── Users ──────────────────────────────────────────────────────────────────────
+
+@router.get("/users")
+async def list_users(
+ admin=Depends(get_admin_user),
+ page: int = Query(1, ge=1),
+ limit: int = Query(20, ge=1, le=100),
+ search: Optional[str] = None,
+ plan: Optional[str] = None,
+):
+ """Paginated list of all users with company and subscription info."""
+ supabase = get_supabase()
+ offset = (page - 1) * limit
+
+ try:
+ # Fetch companies (contains owner_id and name)
+ companies_resp = supabase.table("companies").select("id, owner_id, name, website, industry").execute()
+ companies = {c["owner_id"]: c for c in (companies_resp.data or [])}
+
+ # Fetch subscriptions
+ subs_resp = supabase.table("subscriptions").select("user_id, plan, status").execute()
+ subs = {s["user_id"]: s for s in (subs_resp.data or [])}
+
+ # Fetch user profiles (suspension, admin flag)
+ profiles_resp = supabase.table("user_profiles").select("user_id, is_admin, suspended_at").execute()
+ profiles = {p["user_id"]: p for p in (profiles_resp.data or [])}
+
+ # Fetch auth users via admin API
+ try:
+ auth_users_resp = supabase.auth.admin.list_users()
+ auth_users = auth_users_resp if isinstance(auth_users_resp, list) else getattr(auth_users_resp, 'users', [])
+ except Exception as e:
+ logger.warning(f"Could not list auth users: {e}")
+ auth_users = []
+
+ # Fetch chatbot counts per company
+ cb_resp = supabase.table("chatbots").select("company_id").execute()
+ cb_by_company: Dict[str, int] = defaultdict(int)
+ for cb in (cb_resp.data or []):
+ cb_by_company[cb["company_id"]] += 1
+
+ # Fetch conversation counts per chatbot (to get per-user conv count)
+ chatbots_resp = supabase.table("chatbots").select("id, company_id").execute()
+ chatbot_company_map = {cb["id"]: cb["company_id"] for cb in (chatbots_resp.data or [])}
+
+ conv_resp = supabase.table("conversations").select("chatbot_id", count="exact").execute()
+ conv_by_chatbot: Dict[str, int] = defaultdict(int)
+ for conv in (conv_resp.data or []):
+ conv_by_chatbot[conv["chatbot_id"]] += 1
+
+ conv_by_company: Dict[str, int] = defaultdict(int)
+ for cb_id, count in conv_by_chatbot.items():
+ company_id = chatbot_company_map.get(cb_id)
+ if company_id:
+ conv_by_company[company_id] += count
+
+ # Build user list
+ users_list = []
+ for auth_user in auth_users:
+ uid = getattr(auth_user, "id", None) or auth_user.get("id", "")
+ email = getattr(auth_user, "email", None) or auth_user.get("email", "")
+ created_at = getattr(auth_user, "created_at", None) or auth_user.get("created_at")
+
+ # Apply filters
+ if search and search.lower() not in email.lower():
+ company_info = companies.get(uid, {})
+ if search.lower() not in (company_info.get("name") or "").lower():
+ continue
+
+ sub_info = subs.get(uid, {})
+ user_plan = sub_info.get("plan", "free")
+ if plan and user_plan != plan:
+ continue
+
+ company_info = companies.get(uid, {})
+ profile_info = profiles.get(uid, {})
+
+ users_list.append(AdminUserListItem(
+ id=uid,
+ email=email,
+ company_name=company_info.get("name"),
+ plan=user_plan,
+ subscription_status=sub_info.get("status", "active"),
+ chatbot_count=cb_by_company.get(company_info.get("id", ""), 0),
+ conversations_count=conv_by_company.get(company_info.get("id", ""), 0),
+ is_suspended=bool(profile_info.get("suspended_at")),
+ is_admin=bool(profile_info.get("is_admin", False)),
+ created_at=created_at,
+ ))
+
+ total = len(users_list)
+ paginated = users_list[offset:offset + limit]
+
+ return {
+ "users": [u.model_dump() for u in paginated],
+ "total": total,
+ "page": page,
+ "pages": max(1, (total + limit - 1) // limit),
+ }
+
+ except Exception as e:
+ logger.error(f"Admin list_users error: {e}")
+ raise HTTPException(status_code=500, detail="Failed to fetch users")
+
+
+@router.get("/users/{user_id}", response_model=AdminUserDetail)
+async def get_user(user_id: str, admin=Depends(get_admin_user)):
+ """Detailed info about a specific user."""
+ supabase = get_supabase()
+
+ try:
+ auth_user = supabase.auth.admin.get_user_by_id(user_id)
+ auth_u = getattr(auth_user, "user", auth_user)
+ email = getattr(auth_u, "email", "") or auth_u.get("email", "")
+ created_at = getattr(auth_u, "created_at", None) or auth_u.get("created_at")
+ except Exception:
+ email = ""
+ created_at = None
+
+ company = supabase.table("companies").select("*").eq("owner_id", user_id).execute()
+ company_info = company.data[0] if company.data else {}
+
+ sub = supabase.table("subscriptions").select("plan, status").eq("user_id", user_id).execute()
+ sub_info = sub.data[0] if sub.data else {}
+
+ profile = supabase.table("user_profiles").select("is_admin, suspended_at").eq("user_id", user_id).execute()
+ profile_info = profile.data[0] if profile.data else {}
+
+ chatbots = []
+ if company_info.get("id"):
+ cb_resp = supabase.table("chatbots").select("id, name, is_published, created_at") \
+ .eq("company_id", company_info["id"]).execute()
+ chatbots = cb_resp.data or []
+
+ return AdminUserDetail(
+ id=user_id,
+ email=email,
+ company_name=company_info.get("name"),
+ website=company_info.get("website"),
+ industry=company_info.get("industry"),
+ plan=sub_info.get("plan", "free"),
+ subscription_status=sub_info.get("status", "active"),
+ chatbot_count=len(chatbots),
+ conversations_count=0,
+ is_suspended=bool(profile_info.get("suspended_at")),
+ is_admin=bool(profile_info.get("is_admin", False)),
+ created_at=created_at,
+ chatbots=chatbots,
+ )
+
+
+@router.patch("/users/{user_id}/plan")
+async def change_user_plan(user_id: str, data: AdminChangePlanRequest, admin=Depends(get_admin_user)):
+ """Manually grant or change a user's subscription plan."""
+ valid_plans = ["free", "starter", "business", "agency", "enterprise"]
+ if data.plan not in valid_plans:
+ raise HTTPException(status_code=400, detail=f"Invalid plan. Must be one of: {valid_plans}")
+
+ supabase = get_supabase()
+
+ try:
+ supabase.table("subscriptions").upsert({
+ "user_id": user_id,
+ "plan": data.plan,
+ "status": "active",
+ }, on_conflict="user_id").execute()
+ except Exception as e:
+ logger.error(f"Failed to change plan for {user_id}: {e}")
+ raise HTTPException(status_code=500, detail="Failed to update plan")
+
+ logger.info(f"Admin {admin.id} changed plan for user {user_id} to {data.plan}. Reason: {data.reason}")
+ return {"message": f"Plan updated to {data.plan}", "user_id": user_id, "plan": data.plan}
+
+
+@router.post("/users/{user_id}/suspend")
+async def suspend_user(user_id: str, data: AdminSuspendRequest, admin=Depends(get_admin_user)):
+ """Suspend or unsuspend a user account."""
+ supabase = get_supabase()
+
+ update_data: dict = {"updated_at": datetime.utcnow().isoformat()}
+ if data.suspend:
+ update_data["suspended_at"] = datetime.utcnow().isoformat()
+ if data.reason:
+ update_data["suspended_reason"] = data.reason
+ action = "suspended"
+ else:
+ update_data["suspended_at"] = None
+ update_data["suspended_reason"] = None
+ action = "unsuspended"
+
+ try:
+ supabase.table("user_profiles").upsert(
+ {"user_id": user_id, **update_data},
+ on_conflict="user_id"
+ ).execute()
+ except Exception as e:
+ logger.error(f"Failed to {action} user {user_id}: {e}")
+ raise HTTPException(status_code=500, detail=f"Failed to {action} user")
+
+ logger.info(f"Admin {admin.id} {action} user {user_id}")
+ return {"message": f"User {action}", "user_id": user_id}
+
+
+@router.delete("/users/{user_id}", response_model=SuccessResponse)
+async def delete_user(user_id: str, admin=Depends(get_admin_user)):
+ """Permanently delete a user and all their data."""
+ supabase = get_supabase()
+
+ # 1. Get company
+ company = supabase.table("companies").select("id").eq("owner_id", user_id).execute()
+ company_id = company.data[0]["id"] if company.data else None
+
+ if company_id:
+ # 2. Get all chatbots and clean up Qdrant + storage
+ chatbots = supabase.table("chatbots").select("id, qdrant_collection_name, logo_url") \
+ .eq("company_id", company_id).execute()
+ for cb in (chatbots.data or []):
+ if cb.get("qdrant_collection_name"):
+ try:
+ vector_store.delete_collection(cb["qdrant_collection_name"])
+ except Exception as e:
+ logger.warning(f"Failed to delete Qdrant collection for chatbot {cb['id']}: {e}")
+ if cb.get("logo_url"):
+ delete_from_storage(supabase, "logos", cb["logo_url"])
+
+ # 3. Delete documents from storage
+ docs = supabase.table("documents").select("file_url") \
+ .in_("chatbot_id", [cb["id"] for cb in (chatbots.data or [])]).execute()
+ for doc in (docs.data or []):
+ if doc.get("file_url"):
+ delete_from_storage(supabase, "documents", doc["file_url"])
+
+ # 4. Delete company (cascades to chatbots, documents, conversations)
+ supabase.table("companies").delete().eq("id", company_id).execute()
+
+ # 5. Delete subscription
+ supabase.table("subscriptions").delete().eq("user_id", user_id).execute()
+
+ # 6. Delete user profile
+ supabase.table("user_profiles").delete().eq("user_id", user_id).execute()
+
+ # 7. Delete auth user
+ try:
+ supabase.auth.admin.delete_user(user_id)
+ except Exception as e:
+ logger.error(f"Failed to delete auth user {user_id}: {e}")
+ raise HTTPException(status_code=500, detail="Failed to delete auth user")
+
+ logger.info(f"Admin {admin.id} deleted user {user_id}")
+ return SuccessResponse(success=True, message="User deleted successfully")
+
+
+# ── Chatbots ───────────────────────────────────────────────────────────────────
+
+@router.get("/chatbots")
+async def list_all_chatbots(
+ admin=Depends(get_admin_user),
+ page: int = Query(1, ge=1),
+ limit: int = Query(20, ge=1, le=100),
+ search: Optional[str] = None,
+):
+ """Paginated list of ALL chatbots across all users."""
+ supabase = get_supabase()
+ offset = (page - 1) * limit
+
+ try:
+ # Get chatbots with company info
+ q = supabase.table("chatbots").select("*, companies(name, owner_id)") \
+ .order("created_at", desc=True)
+ result = q.execute()
+ all_chatbots = result.data or []
+
+ # Apply search filter
+ if search:
+ s = search.lower()
+ all_chatbots = [
+ cb for cb in all_chatbots
+ if s in (cb.get("name") or "").lower()
+ or s in (cb.get("companies", {}) or {}).get("name", "").lower()
+ ]
+
+ total = len(all_chatbots)
+ paginated = all_chatbots[offset:offset + limit]
+
+ # Get owner emails for paginated set
+ owner_ids = list({(cb.get("companies") or {}).get("owner_id") for cb in paginated if cb.get("companies")})
+ owner_emails: Dict[str, str] = {}
+ if owner_ids:
+ try:
+ for oid in owner_ids:
+ try:
+ u = supabase.auth.admin.get_user_by_id(oid)
+ u_obj = getattr(u, "user", u)
+ owner_emails[oid] = getattr(u_obj, "email", "") or u_obj.get("email", "")
+ except Exception:
+ pass
+ except Exception:
+ pass
+
+ # Get doc and conv counts for paginated chatbots
+ cb_ids = [cb["id"] for cb in paginated]
+ doc_counts: Dict[str, int] = defaultdict(int)
+ conv_counts: Dict[str, int] = defaultdict(int)
+
+ if cb_ids:
+ docs_resp = supabase.table("documents").select("chatbot_id").in_("chatbot_id", cb_ids).execute()
+ for d in (docs_resp.data or []):
+ doc_counts[d["chatbot_id"]] += 1
+
+ convs_resp = supabase.table("conversations").select("chatbot_id").in_("chatbot_id", cb_ids).execute()
+ for c in (convs_resp.data or []):
+ conv_counts[c["chatbot_id"]] += 1
+
+ items = []
+ for cb in paginated:
+ company_info = cb.get("companies") or {}
+ owner_id = company_info.get("owner_id")
+ items.append(AdminChatbotListItem(
+ id=cb["id"],
+ name=cb.get("name", ""),
+ owner_email=owner_emails.get(owner_id),
+ company_name=company_info.get("name"),
+ is_published=cb.get("is_published", False),
+ document_count=doc_counts[cb["id"]],
+ conversation_count=conv_counts[cb["id"]],
+ created_at=cb.get("created_at"),
+ ))
+
+ return {
+ "chatbots": [i.model_dump() for i in items],
+ "total": total,
+ "page": page,
+ "pages": max(1, (total + limit - 1) // limit),
+ }
+
+ except Exception as e:
+ logger.error(f"Admin list_chatbots error: {e}")
+ raise HTTPException(status_code=500, detail="Failed to fetch chatbots")
+
+
+@router.delete("/chatbots/{chatbot_id}", response_model=SuccessResponse)
+async def delete_chatbot_admin(chatbot_id: str, admin=Depends(get_admin_user)):
+ """Force-delete any chatbot regardless of ownership."""
+ supabase = get_supabase()
+
+ cb = supabase.table("chatbots").select("*").eq("id", chatbot_id).execute()
+ if not cb.data:
+ raise HTTPException(status_code=404, detail="Chatbot not found")
+ chatbot = cb.data[0]
+
+ # Delete Qdrant collection
+ if chatbot.get("qdrant_collection_name"):
+ try:
+ vector_store.delete_collection(chatbot["qdrant_collection_name"])
+ except Exception as e:
+ logger.warning(f"Failed to delete Qdrant collection: {e}")
+
+ # Delete logo from storage
+ if chatbot.get("logo_url"):
+ delete_from_storage(supabase, "logos", chatbot["logo_url"])
+
+ supabase.table("chatbots").delete().eq("id", chatbot_id).execute()
+ logger.info(f"Admin {admin.id} deleted chatbot {chatbot_id}")
+ return SuccessResponse(success=True, message="Chatbot deleted")
+
+
+# ── Conversations ──────────────────────────────────────────────────────────────
+
+@router.get("/conversations")
+async def list_conversations(
+ admin=Depends(get_admin_user),
+ page: int = Query(1, ge=1),
+ limit: int = Query(50, ge=1, le=200),
+):
+ """Recent conversations across all chatbots."""
+ supabase = get_supabase()
+ offset = (page - 1) * limit
+
+ try:
+ result = supabase.table("conversations") \
+ .select("*, chatbots(name)") \
+ .order("created_at", desc=True) \
+ .range(offset, offset + limit - 1) \
+ .execute()
+ convos = result.data or []
+
+ # Get first message for each conversation
+ conv_ids = [c["id"] for c in convos]
+ first_msgs: Dict[str, str] = {}
+ if conv_ids:
+ msgs_resp = supabase.table("messages") \
+ .select("conversation_id, content, role") \
+ .in_("conversation_id", conv_ids) \
+ .eq("role", "user") \
+ .order("created_at", desc=False) \
+ .execute()
+ seen = set()
+ for m in (msgs_resp.data or []):
+ cid = m["conversation_id"]
+ if cid not in seen:
+ seen.add(cid)
+ first_msgs[cid] = (m.get("content") or "")[:120]
+
+ # Total count
+ count_resp = supabase.table("conversations").select("id", count="exact").execute()
+ total = count_resp.count or 0
+
+ items = [
+ AdminConversationListItem(
+ id=c["id"],
+ chatbot_name=(c.get("chatbots") or {}).get("name"),
+ session_id=c.get("session_id"),
+ language=c.get("language"),
+ message_count=c.get("message_count", 0),
+ created_at=c.get("created_at"),
+ first_message=first_msgs.get(c["id"]),
+ )
+ for c in convos
+ ]
+
+ return {
+ "conversations": [i.model_dump() for i in items],
+ "total": total,
+ "page": page,
+ "pages": max(1, (total + limit - 1) // limit),
+ }
+
+ except Exception as e:
+ logger.error(f"Admin list_conversations error: {e}")
+ raise HTTPException(status_code=500, detail="Failed to fetch conversations")
+
+
+# ── System Health ──────────────────────────────────────────────────────────────
+
+@router.get("/system/health", response_model=AdminSystemHealth)
+async def system_health(admin=Depends(get_admin_user)):
+ """Check health of all system components."""
+
+ # Check database
+ db_status = "unhealthy"
+ try:
+ supabase = get_supabase()
+ supabase.table("subscriptions").select("id").limit(1).execute()
+ db_status = "healthy"
+ except Exception as e:
+ logger.warning(f"DB health check failed: {e}")
+
+ # Check Qdrant
+ qdrant_status = "unhealthy"
+ try:
+ vector_store.client.get_collections()
+ qdrant_status = "healthy"
+ except Exception as e:
+ logger.warning(f"Qdrant health check failed: {e}")
+
+ # Check LLM provider API key availability
+ llm_providers = {
+ "openai": bool(getattr(settings, "openai_api_key", None)),
+ "anthropic": bool(getattr(settings, "anthropic_api_key", None)),
+ "google": bool(getattr(settings, "google_api_key", None)),
+ "fireworks": bool(getattr(settings, "fireworks_api_key", None)),
+ }
+
+ return AdminSystemHealth(
+ db=db_status,
+ qdrant=qdrant_status,
+ llm_providers=llm_providers,
+ timestamp=datetime.utcnow(),
+ )
diff --git a/app/routers/analytics.py b/app/routers/analytics.py
index 78687d6..20843ec 100644
--- a/app/routers/analytics.py
+++ b/app/routers/analytics.py
@@ -9,6 +9,7 @@ from app.database import get_supabase
from app.dependencies import get_current_user
from app.config import PLAN_LIMITS
from typing import List, Optional, Dict
+from collections import defaultdict
from pydantic import BaseModel
from datetime import datetime, timedelta
import logging
@@ -127,14 +128,7 @@ async def get_analytics_overview(user=Depends(get_current_user)):
conversations_used=0,
)
- # Gather per-chatbot analytics
- chatbot_analytics = []
- total_convos = 0
- total_msgs = 0
- total_sessions = 0
- month_convos = 0
- all_ratings = []
-
+ # ── Batch queries (fixes N+1) ────────────────────────────────────────────────
now = datetime.utcnow()
month_start = now.replace(day=1, hour=0, minute=0, second=0, microsecond=0)
week_start = now - timedelta(days=now.weekday())
@@ -142,14 +136,60 @@ async def get_analytics_overview(user=Depends(get_current_user)):
today_start = now.replace(hour=0, minute=0, second=0, microsecond=0)
thirty_days_ago = now - timedelta(days=30)
+ # Batch query 1: ALL conversations for all chatbots (single query)
+ all_convos_resp = supabase.table("conversations") \
+ .select("id, chatbot_id, session_id, language, created_at") \
+ .in_("chatbot_id", chatbot_ids) \
+ .execute()
+ all_convos = all_convos_resp.data or []
+ all_conv_ids = [c["id"] for c in all_convos]
+
+ # Batch query 2: ALL messages for all conversations (single query)
+ all_msgs: List[Dict] = []
+ if all_conv_ids:
+ # Split into chunks of 500 to avoid URL length limits
+ for i in range(0, len(all_conv_ids), 500):
+ chunk = all_conv_ids[i:i + 500]
+ msgs_resp = supabase.table("messages") \
+ .select("id, conversation_id, role, content, created_at") \
+ .in_("conversation_id", chunk) \
+ .execute()
+ all_msgs.extend(msgs_resp.data or [])
+
+ # Batch query 3: ALL feedback for all chatbots (single query)
+ all_feedback: List[Dict] = []
+ if chatbot_ids:
+ fb_resp = supabase.table("message_feedback") \
+ .select("chatbot_id, feedback") \
+ .in_("chatbot_id", chatbot_ids) \
+ .execute()
+ all_feedback = fb_resp.data or []
+
+ # Index data by chatbot_id for O(1) lookups
+ convos_by_chatbot: Dict[str, List[Dict]] = defaultdict(list)
+ for c in all_convos:
+ convos_by_chatbot[c["chatbot_id"]].append(c)
+
+ msgs_by_conv: Dict[str, List[Dict]] = defaultdict(list)
+ for m in all_msgs:
+ msgs_by_conv[m["conversation_id"]].append(m)
+
+ fb_by_chatbot: Dict[str, List[Dict]] = defaultdict(list)
+ for f in all_feedback:
+ fb_by_chatbot[f["chatbot_id"]].append(f)
+
+ # ── Aggregate per chatbot ────────────────────────────────────────────────────
+ chatbot_analytics = []
+ total_convos = 0
+ total_msgs = 0
+ total_sessions = 0
+ month_convos = 0
+ all_ratings = []
+
for chatbot in chatbot_list:
cid = chatbot["id"]
-
- # Total conversations
- convos = supabase.table("conversations").select("id, session_id, language, created_at", count="exact") \
- .eq("chatbot_id", cid).execute()
- conv_count = convos.count or 0
- conv_data = convos.data or []
+ conv_data = convos_by_chatbot[cid]
+ conv_count = len(conv_data)
total_convos += conv_count
# Unique sessions
@@ -157,34 +197,35 @@ async def get_analytics_overview(user=Depends(get_current_user)):
unique_sess = len(sessions)
total_sessions += unique_sess
- # Total messages
- msgs = supabase.table("messages").select("id", count="exact") \
- .in_("conversation_id", [c["id"] for c in conv_data] if conv_data else [""]).execute()
- msg_count = msgs.count or 0
+ # Messages for this chatbot
+ chatbot_msgs = []
+ for c in conv_data:
+ chatbot_msgs.extend(msgs_by_conv[c["id"]])
+ msg_count = len(chatbot_msgs)
total_msgs += msg_count
# Time-based conversation counts
- today_count = sum(1 for c in conv_data if c.get("created_at") and c["created_at"][:10] == today_start.strftime("%Y-%m-%d"))
+ today_str = today_start.strftime("%Y-%m-%d")
+ today_count = sum(1 for c in conv_data if c.get("created_at") and c["created_at"][:10] == today_str)
week_count = sum(1 for c in conv_data if c.get("created_at") and c["created_at"] >= week_start.isoformat())
month_count = sum(1 for c in conv_data if c.get("created_at") and c["created_at"] >= month_start.isoformat())
month_convos += month_count
# Daily conversations (last 30 days)
- daily = {}
+ daily: Dict[str, int] = {}
for c in conv_data:
if c.get("created_at") and c["created_at"] >= thirty_days_ago.isoformat():
day = c["created_at"][:10]
daily[day] = daily.get(day, 0) + 1
-
daily_list = [DailyConversations(date=d, count=n) for d, n in sorted(daily.items())]
- # Languages used
+ # Languages
lang_counts: Dict[str, int] = {}
for c in conv_data:
lang = c.get("language", "en")
lang_counts[lang] = lang_counts.get(lang, 0) + 1
- # Peak hour (approximate from created_at)
+ # Peak hour
hour_counts: Dict[int, int] = {}
for c in conv_data:
if c.get("created_at") and len(c["created_at"]) > 13:
@@ -195,35 +236,25 @@ async def get_analytics_overview(user=Depends(get_current_user)):
pass
peak = max(hour_counts, key=hour_counts.get) if hour_counts else None
- # Top queries (from user messages, get first message per conversation)
- top_queries: List[TopQuery] = []
- if conv_data:
- conv_ids = [c["id"] for c in conv_data[:100]] # limit to recent 100
- user_msgs = supabase.table("messages").select("content") \
- .in_("conversation_id", conv_ids) \
- .eq("role", "user") \
- .limit(200).execute()
- query_counts: Dict[str, int] = {}
- for m in (user_msgs.data or []):
+ # Top queries from user messages
+ query_counts: Dict[str, int] = {}
+ for m in chatbot_msgs:
+ if m.get("role") == "user":
content = (m.get("content") or "")[:100].strip()
if content:
query_counts[content] = query_counts.get(content, 0) + 1
- top_sorted = sorted(query_counts.items(), key=lambda x: -x[1])[:5]
- top_queries = [TopQuery(query=q, count=n) for q, n in top_sorted]
+ top_queries = [TopQuery(query=q, count=n) for q, n in sorted(query_counts.items(), key=lambda x: -x[1])[:5]]
# Rating
rating = chatbot.get("average_rating")
if rating:
all_ratings.append(rating)
- # Feedback counts
- fb_result = supabase.table("message_feedback").select("feedback", count="exact") \
- .eq("chatbot_id", cid).execute()
- total_fb = fb_result.count or 0
- fb_pos = sum(1 for f in (fb_result.data or []) if f.get("feedback") == "positive")
- fb_neg = total_fb - fb_pos
+ # Feedback
+ chatbot_fb = fb_by_chatbot[cid]
+ fb_pos = sum(1 for f in chatbot_fb if f.get("feedback") == "positive")
+ fb_neg = len(chatbot_fb) - fb_pos
- # Average messages per conversation
avg_msgs = round(msg_count / conv_count, 1) if conv_count > 0 else 0.0
chatbot_analytics.append(ChatbotAnalyticsResponse(
@@ -234,7 +265,7 @@ async def get_analytics_overview(user=Depends(get_current_user)):
total_messages=msg_count,
average_messages_per_conversation=avg_msgs,
average_rating=rating,
- total_ratings=total_fb,
+ total_ratings=len(chatbot_fb),
conversations_today=today_count,
conversations_this_week=week_count,
conversations_this_month=month_count,
diff --git a/app/routers/appointments.py b/app/routers/appointments.py
new file mode 100644
index 0000000..9a1a958
--- /dev/null
+++ b/app/routers/appointments.py
@@ -0,0 +1,287 @@
+from fastapi import APIRouter, HTTPException, Depends, Query
+from app.database import get_supabase
+from app.dependencies import get_current_user
+from app.config import PLAN_LIMITS
+from app.models import (
+ AppointmentCreate, AppointmentResponse, AppointmentStatusUpdate,
+ BusinessHoursEntry, BusinessHoursSave,
+)
+from typing import List, Optional
+from datetime import datetime, timedelta, date
+import uuid
+import logging
+
+logger = logging.getLogger(__name__)
+
+router = APIRouter(prefix="/appointments", tags=["Appointments"])
+public_router = APIRouter(tags=["Appointments"])
+
+
+# ── Helpers ───────────────────────────────────────────────────────────────────
+
+def _check_booking_access(user_id: str, supabase):
+ sub = supabase.table("subscriptions").select("plan").eq("user_id", user_id).eq("status", "active").execute()
+ plan = sub.data[0]["plan"] if sub.data else "free"
+ if plan not in ("starter", "business", "agency", "enterprise"):
+ raise HTTPException(status_code=402, detail="Appointment booking requires Starter plan or higher")
+ return plan
+
+
+def _get_user_company_id(user_id: str, supabase) -> str:
+ result = supabase.table("companies").select("id").eq("owner_id", user_id).execute()
+ if not result.data:
+ raise HTTPException(status_code=404, detail="Company not found")
+ return result.data[0]["id"]
+
+
+def _verify_chatbot_ownership(chatbot_id: str, company_id: str, supabase):
+ chatbot = supabase.table("chatbots").select("id").eq("id", chatbot_id).eq("company_id", company_id).execute()
+ if not chatbot.data:
+ raise HTTPException(status_code=404, detail="Chatbot not found")
+
+
+def _parse_time(t: str) -> tuple[int, int]:
+ """Parse HH:MM into (hour, minute)."""
+ h, m = t.split(":")
+ return int(h), int(m)
+
+
+def _get_available_slots(chatbot_id: str, target_date: date, supabase) -> List[dict]:
+ """Return list of available {slot_start, slot_end} dicts for the given date."""
+ day_of_week = target_date.weekday() # 0=Mon
+
+ hours = supabase.table("business_hours") \
+ .select("*").eq("chatbot_id", chatbot_id).eq("day_of_week", day_of_week).execute()
+
+ if not hours.data or not hours.data[0].get("is_open"):
+ return []
+
+ h = hours.data[0]
+ open_h, open_m = _parse_time(h["open_time"])
+ close_h, close_m = _parse_time(h["close_time"])
+ duration = h.get("slot_duration_minutes", 60)
+
+ slot_start = datetime(target_date.year, target_date.month, target_date.day, open_h, open_m)
+ slot_end_limit = datetime(target_date.year, target_date.month, target_date.day, close_h, close_m)
+
+ # Fetch already-booked slots for that day
+ day_start = datetime(target_date.year, target_date.month, target_date.day, 0, 0)
+ day_end = day_start + timedelta(days=1)
+
+ booked = supabase.table("appointments") \
+ .select("slot_start, slot_end") \
+ .eq("chatbot_id", chatbot_id) \
+ .neq("status", "cancelled") \
+ .gte("slot_start", day_start.isoformat()) \
+ .lt("slot_start", day_end.isoformat()) \
+ .execute()
+ booked_starts = {b["slot_start"] for b in (booked.data or [])}
+
+ slots = []
+ now = datetime.utcnow()
+ while slot_start + timedelta(minutes=duration) <= slot_end_limit:
+ slot_end = slot_start + timedelta(minutes=duration)
+ # Skip past slots
+ if slot_start > now:
+ iso_start = slot_start.isoformat()
+ if iso_start not in booked_starts:
+ slots.append({"slot_start": iso_start, "slot_end": slot_end.isoformat()})
+ slot_start = slot_end
+
+ return slots
+
+
+# ── Protected endpoints (business owner) ─────────────────────────────────────
+
+@router.get("", response_model=List[AppointmentResponse])
+async def list_appointments(
+ chatbot_id: Optional[str] = Query(None),
+ status: Optional[str] = Query(None),
+ page: int = Query(1, ge=1),
+ limit: int = Query(50, ge=1, le=200),
+ user=Depends(get_current_user),
+):
+ supabase = get_supabase()
+ _check_booking_access(user.id, supabase)
+ company_id = _get_user_company_id(user.id, supabase)
+
+ chatbots_q = supabase.table("chatbots").select("id").eq("company_id", company_id)
+ if chatbot_id:
+ chatbots_q = chatbots_q.eq("id", chatbot_id)
+ chatbots = chatbots_q.execute()
+ chatbot_ids = [c["id"] for c in (chatbots.data or [])]
+ if not chatbot_ids:
+ return []
+
+ offset = (page - 1) * limit
+ q = supabase.table("appointments").select("*").in_("chatbot_id", chatbot_ids)
+ if status:
+ q = q.eq("status", status)
+ result = q.order("slot_start", desc=False).range(offset, offset + limit - 1).execute()
+ return [AppointmentResponse(**a) for a in (result.data or [])]
+
+
+@router.patch("/{appointment_id}", response_model=AppointmentResponse)
+async def update_appointment_status(
+ appointment_id: str,
+ data: AppointmentStatusUpdate,
+ user=Depends(get_current_user),
+):
+ valid = ("pending", "confirmed", "cancelled", "completed")
+ if data.status not in valid:
+ raise HTTPException(status_code=400, detail=f"Status must be one of {valid}")
+
+ supabase = get_supabase()
+ _check_booking_access(user.id, supabase)
+ company_id = _get_user_company_id(user.id, supabase)
+
+ appt = supabase.table("appointments").select("*, chatbots(company_id)") \
+ .eq("id", appointment_id).execute()
+ if not appt.data:
+ raise HTTPException(status_code=404, detail="Appointment not found")
+ if appt.data[0].get("chatbots", {}).get("company_id") != company_id:
+ raise HTTPException(status_code=403, detail="Not authorized")
+
+ result = supabase.table("appointments").update({"status": data.status}) \
+ .eq("id", appointment_id).execute()
+ return AppointmentResponse(**result.data[0])
+
+
+@router.get("/chatbot/{chatbot_id}/hours")
+async def get_business_hours(chatbot_id: str, user=Depends(get_current_user)):
+ supabase = get_supabase()
+ _check_booking_access(user.id, supabase)
+ company_id = _get_user_company_id(user.id, supabase)
+ _verify_chatbot_ownership(chatbot_id, company_id, supabase)
+
+ result = supabase.table("business_hours").select("*") \
+ .eq("chatbot_id", chatbot_id).order("day_of_week").execute()
+ return result.data or []
+
+
+@router.put("/chatbot/{chatbot_id}/hours")
+async def save_business_hours(
+ chatbot_id: str,
+ data: BusinessHoursSave,
+ user=Depends(get_current_user),
+):
+ supabase = get_supabase()
+ _check_booking_access(user.id, supabase)
+ company_id = _get_user_company_id(user.id, supabase)
+ _verify_chatbot_ownership(chatbot_id, company_id, supabase)
+
+ # Upsert each day
+ for entry in data.hours:
+ existing = supabase.table("business_hours").select("id") \
+ .eq("chatbot_id", chatbot_id).eq("day_of_week", entry.day_of_week).execute()
+ row = {
+ "chatbot_id": chatbot_id,
+ "day_of_week": entry.day_of_week,
+ "is_open": entry.is_open,
+ "open_time": entry.open_time,
+ "close_time": entry.close_time,
+ "slot_duration_minutes": entry.slot_duration_minutes,
+ }
+ if existing.data:
+ supabase.table("business_hours").update(row).eq("id", existing.data[0]["id"]).execute()
+ else:
+ row["id"] = str(uuid.uuid4())
+ supabase.table("business_hours").insert(row).execute()
+
+ return {"success": True}
+
+
+# ── Public endpoints (customers booking) ─────────────────────────────────────
+
+@public_router.get("/chatbots/{chatbot_id}/booking-info")
+async def get_booking_info(chatbot_id: str):
+ """Return public booking info for the booking page (no auth required)."""
+ supabase = get_supabase()
+ result = supabase.table("chatbots") \
+ .select("id, name, booking_enabled, companies(name)") \
+ .eq("id", chatbot_id).execute()
+ if not result.data:
+ raise HTTPException(status_code=404, detail="Chatbot not found")
+ chatbot = result.data[0]
+ if not chatbot.get("booking_enabled"):
+ raise HTTPException(status_code=400, detail="Booking is not enabled for this chatbot")
+ return {
+ "chatbot_id": chatbot["id"],
+ "chatbot_name": chatbot.get("name", ""),
+ "company_name": (chatbot.get("companies") or {}).get("name", ""),
+ }
+
+
+@public_router.get("/chatbots/{chatbot_id}/available-slots")
+async def get_available_slots(
+ chatbot_id: str,
+ date: str = Query(..., description="YYYY-MM-DD"),
+):
+ """Return available time slots for a given date (public)."""
+ supabase = get_supabase()
+
+ chatbot = supabase.table("chatbots").select("id, booking_enabled, is_published") \
+ .eq("id", chatbot_id).execute()
+ if not chatbot.data:
+ raise HTTPException(status_code=404, detail="Chatbot not found")
+ if not chatbot.data[0].get("booking_enabled"):
+ raise HTTPException(status_code=400, detail="Booking not enabled for this chatbot")
+
+ try:
+ target = datetime.strptime(date, "%Y-%m-%d").date()
+ except ValueError:
+ raise HTTPException(status_code=400, detail="Invalid date format, use YYYY-MM-DD")
+
+ slots = _get_available_slots(chatbot_id, target, supabase)
+ return {"date": date, "slots": slots}
+
+
+@public_router.post("/chatbots/{chatbot_id}/appointments", response_model=AppointmentResponse, status_code=201)
+async def create_appointment(chatbot_id: str, data: AppointmentCreate):
+ """Create an appointment (public endpoint, no auth required)."""
+ supabase = get_supabase()
+
+ chatbot = supabase.table("chatbots").select("id, booking_enabled") \
+ .eq("id", chatbot_id).execute()
+ if not chatbot.data:
+ raise HTTPException(status_code=404, detail="Chatbot not found")
+ if not chatbot.data[0].get("booking_enabled"):
+ raise HTTPException(status_code=400, detail="Booking not enabled for this chatbot")
+
+ # Verify the slot is still available
+ slot_start_dt = data.slot_start
+ target_date = slot_start_dt.date()
+ available = _get_available_slots(chatbot_id, target_date, supabase)
+ available_starts = {s["slot_start"] for s in available}
+
+ slot_iso = slot_start_dt.isoformat()
+ # Try a few normalizations (with/without timezone suffix)
+ if slot_iso not in available_starts and slot_iso + "Z" not in available_starts:
+ # Check without microseconds
+ slot_iso_no_ms = slot_start_dt.replace(microsecond=0).isoformat()
+ if slot_iso_no_ms not in available_starts:
+ raise HTTPException(status_code=409, detail="This slot is no longer available")
+
+ # Calculate slot_end based on business hours duration
+ hours = supabase.table("business_hours").select("slot_duration_minutes") \
+ .eq("chatbot_id", chatbot_id).eq("day_of_week", target_date.weekday()).execute()
+ duration = hours.data[0]["slot_duration_minutes"] if hours.data else 60
+ slot_end_dt = slot_start_dt + timedelta(minutes=duration)
+
+ appt_data = {
+ "id": str(uuid.uuid4()),
+ "chatbot_id": chatbot_id,
+ "conversation_id": data.conversation_id,
+ "customer_name": data.customer_name,
+ "customer_contact": data.customer_contact,
+ "service": data.service,
+ "slot_start": data.slot_start.isoformat(),
+ "slot_end": slot_end_dt.isoformat(),
+ "status": "pending",
+ "notes": data.notes,
+ }
+
+ result = supabase.table("appointments").insert(appt_data).execute()
+ if not result.data:
+ raise HTTPException(status_code=500, detail="Failed to create appointment")
+ return AppointmentResponse(**result.data[0])
diff --git a/app/routers/auth.py b/app/routers/auth.py
index 17add8c..2f1f54e 100644
--- a/app/routers/auth.py
+++ b/app/routers/auth.py
@@ -58,6 +58,12 @@ async def signup(data: UserSignup):
}
).execute()
+ # Safety-net: ensure user_profiles row exists (trigger should handle it, but just in case)
+ try:
+ supabase.table("user_profiles").insert({"user_id": user.id}).execute()
+ except Exception:
+ pass # Row may already exist from trigger
+
token = auth_resp.session.access_token if auth_resp.session else ""
return TokenResponse(
access_token=token,
@@ -66,6 +72,7 @@ async def signup(data: UserSignup):
email=user.email,
company_name=data.company_name,
plan="free",
+ is_admin=False,
),
)
except HTTPException:
@@ -109,6 +116,13 @@ async def login(data: UserLogin):
)
plan = sub.data[0]["plan"] if sub.data else "free"
+ # Get is_admin flag
+ try:
+ profile = supabase.table("user_profiles").select("is_admin").eq("user_id", user.id).execute()
+ is_admin = profile.data[0].get("is_admin", False) if profile.data else False
+ except Exception:
+ is_admin = False
+
return TokenResponse(
access_token=auth_resp.session.access_token,
user=UserResponse(
@@ -116,6 +130,7 @@ async def login(data: UserLogin):
email=user.email,
company_name=company_name,
plan=plan,
+ is_admin=is_admin,
),
)
except HTTPException:
@@ -230,9 +245,16 @@ async def get_me(user=Depends(get_current_user)):
)
plan = sub.data[0]["plan"] if sub.data else "free"
+ try:
+ profile = supabase.table("user_profiles").select("is_admin").eq("user_id", user.id).execute()
+ is_admin = profile.data[0].get("is_admin", False) if profile.data else False
+ except Exception:
+ is_admin = False
+
return UserResponse(
id=user.id,
email=user.email,
company_name=company_name,
plan=plan,
+ is_admin=is_admin,
)
diff --git a/app/routers/billing.py b/app/routers/billing.py
index d71313a..2c1ef48 100644
--- a/app/routers/billing.py
+++ b/app/routers/billing.py
@@ -89,6 +89,17 @@ async def stripe_webhook(
supabase = get_supabase()
event_type = event.get("type", "")
+ event_id = event.get("id", "")
+
+ # Idempotency check: skip already-processed events
+ if event_id:
+ existing = supabase.table("stripe_webhook_events") \
+ .select("stripe_event_id") \
+ .eq("stripe_event_id", event_id) \
+ .execute()
+ if existing.data:
+ logger.info(f"Stripe event {event_id} already processed, skipping")
+ return {"received": True}
if event_type == "checkout.session.completed":
session = event["data"]["object"]
@@ -140,6 +151,16 @@ async def stripe_webhook(
except Exception as e:
logger.warning(f"Failed to send cancellation notification: {e}")
+ # Record event as processed
+ if event_id:
+ try:
+ supabase.table("stripe_webhook_events").insert({
+ "stripe_event_id": event_id,
+ "event_type": event_type,
+ }).execute()
+ except Exception as e:
+ logger.warning(f"Failed to record stripe event {event_id}: {e}")
+
return {"received": True}
except HTTPException:
diff --git a/app/routers/campaigns.py b/app/routers/campaigns.py
new file mode 100644
index 0000000..32a8c81
--- /dev/null
+++ b/app/routers/campaigns.py
@@ -0,0 +1,167 @@
+from fastapi import APIRouter, HTTPException, Depends, Query
+from app.database import get_supabase
+from app.dependencies import get_current_user
+from app.models import CampaignCreate, CampaignResponse
+from typing import List, Optional
+import uuid
+import logging
+
+logger = logging.getLogger(__name__)
+router = APIRouter(prefix="/campaigns", tags=["Campaigns"])
+
+
+# ── Helpers ───────────────────────────────────────────────────────────────────
+
+def _check_campaigns_access(user_id: str, supabase):
+ sub = supabase.table("subscriptions").select("plan").eq("user_id", user_id).eq("status", "active").execute()
+ plan = sub.data[0]["plan"] if sub.data else "free"
+ if plan not in ("starter", "business", "agency", "enterprise"):
+ raise HTTPException(status_code=402, detail="Campaigns require Starter plan or higher")
+ return plan
+
+
+def _get_user_company_id(user_id: str, supabase) -> str:
+ result = supabase.table("companies").select("id").eq("owner_id", user_id).execute()
+ if not result.data:
+ raise HTTPException(status_code=404, detail="Company not found")
+ return result.data[0]["id"]
+
+
+def _verify_chatbot_ownership(chatbot_id: str, company_id: str, supabase):
+ chatbot = supabase.table("chatbots").select("id").eq("id", chatbot_id).eq("company_id", company_id).execute()
+ if not chatbot.data:
+ raise HTTPException(status_code=404, detail="Chatbot not found")
+
+
+# ── Endpoints ─────────────────────────────────────────────────────────────────
+
+@router.get("", response_model=List[CampaignResponse])
+async def list_campaigns(
+ chatbot_id: Optional[str] = Query(None),
+ page: int = Query(1, ge=1),
+ limit: int = Query(20, ge=1, le=100),
+ user=Depends(get_current_user),
+):
+ supabase = get_supabase()
+ _check_campaigns_access(user.id, supabase)
+ company_id = _get_user_company_id(user.id, supabase)
+
+ chatbots_q = supabase.table("chatbots").select("id").eq("company_id", company_id)
+ if chatbot_id:
+ chatbots_q = chatbots_q.eq("id", chatbot_id)
+ chatbots = chatbots_q.execute()
+ chatbot_ids = [c["id"] for c in (chatbots.data or [])]
+ if not chatbot_ids:
+ return []
+
+ offset = (page - 1) * limit
+ result = supabase.table("campaigns").select("*").in_("chatbot_id", chatbot_ids) \
+ .order("created_at", desc=True).range(offset, offset + limit - 1).execute()
+ return [CampaignResponse(**c) for c in (result.data or [])]
+
+
+@router.post("", response_model=CampaignResponse, status_code=201)
+async def create_campaign(data: CampaignCreate, user=Depends(get_current_user)):
+ supabase = get_supabase()
+ _check_campaigns_access(user.id, supabase)
+ company_id = _get_user_company_id(user.id, supabase)
+ _verify_chatbot_ownership(data.chatbot_id, company_id, supabase)
+
+ # Count Telegram subscribers for this chatbot
+ subscribers = supabase.table("channel_sessions").select("id", count="exact") \
+ .eq("chatbot_id", data.chatbot_id).eq("channel", "telegram").execute()
+ recipients_count = subscribers.count or 0
+
+ campaign_data = {
+ "id": str(uuid.uuid4()),
+ "chatbot_id": data.chatbot_id,
+ "title": data.title,
+ "message": data.message,
+ "status": "draft",
+ "recipients_count": recipients_count,
+ "sent_count": 0,
+ }
+ result = supabase.table("campaigns").insert(campaign_data).execute()
+ if not result.data:
+ raise HTTPException(status_code=500, detail="Failed to create campaign")
+ return CampaignResponse(**result.data[0])
+
+
+@router.post("/{campaign_id}/send", response_model=CampaignResponse)
+async def send_campaign(campaign_id: str, user=Depends(get_current_user)):
+ """Broadcast the campaign message to all Telegram subscribers of the chatbot."""
+ supabase = get_supabase()
+ _check_campaigns_access(user.id, supabase)
+ company_id = _get_user_company_id(user.id, supabase)
+
+ campaign = supabase.table("campaigns").select("*, chatbots(company_id)") \
+ .eq("id", campaign_id).execute()
+ if not campaign.data:
+ raise HTTPException(status_code=404, detail="Campaign not found")
+ c = campaign.data[0]
+ if c.get("chatbots", {}).get("company_id") != company_id:
+ raise HTTPException(status_code=403, detail="Not authorized")
+ if c["status"] == "sent":
+ raise HTTPException(status_code=400, detail="Campaign already sent")
+
+ chatbot_id = c["chatbot_id"]
+
+ # Get the Telegram bot token for this chatbot
+ conn = supabase.table("channel_connections").select("bot_token") \
+ .eq("chatbot_id", chatbot_id).eq("channel", "telegram").eq("is_active", True).execute()
+ if not conn.data or not conn.data[0].get("bot_token"):
+ raise HTTPException(status_code=400, detail="No active Telegram connection for this chatbot")
+ bot_token = conn.data[0]["bot_token"]
+
+ # Get all Telegram subscribers (channel_sessions)
+ sessions = supabase.table("channel_sessions").select("external_id") \
+ .eq("chatbot_id", chatbot_id).eq("channel", "telegram").execute()
+ subscribers = sessions.data or []
+
+ # Mark as sending
+ supabase.table("campaigns").update({"status": "sending"}).eq("id", campaign_id).execute()
+
+ # Broadcast
+ from app.services.telegram_service import send_message as tg_send
+ sent = 0
+ for sub in subscribers:
+ try:
+ # external_id format is "tg:{token_prefix}:{chat_id}"
+ parts = sub["external_id"].split(":")
+ if len(parts) >= 3:
+ chat_id = int(parts[2])
+ await tg_send(bot_token, chat_id, c["message"])
+ sent += 1
+ except Exception as e:
+ logger.warning(f"Failed to send campaign to {sub['external_id']}: {e}")
+
+ # Mark as sent
+ from datetime import datetime, timezone
+ supabase.table("campaigns").update({
+ "status": "sent",
+ "sent_count": sent,
+ "recipients_count": len(subscribers),
+ "sent_at": datetime.now(timezone.utc).isoformat(),
+ }).eq("id", campaign_id).execute()
+
+ updated = supabase.table("campaigns").select("*").eq("id", campaign_id).execute()
+ return CampaignResponse(**updated.data[0])
+
+
+@router.delete("/{campaign_id}")
+async def delete_campaign(campaign_id: str, user=Depends(get_current_user)):
+ supabase = get_supabase()
+ _check_campaigns_access(user.id, supabase)
+ company_id = _get_user_company_id(user.id, supabase)
+
+ campaign = supabase.table("campaigns").select("*, chatbots(company_id)") \
+ .eq("id", campaign_id).execute()
+ if not campaign.data:
+ raise HTTPException(status_code=404, detail="Campaign not found")
+ if campaign.data[0].get("chatbots", {}).get("company_id") != company_id:
+ raise HTTPException(status_code=403, detail="Not authorized")
+ if campaign.data[0]["status"] == "sending":
+ raise HTTPException(status_code=400, detail="Cannot delete a campaign that is currently sending")
+
+ supabase.table("campaigns").delete().eq("id", campaign_id).execute()
+ return {"success": True}
diff --git a/app/routers/channels.py b/app/routers/channels.py
index 2773e00..807ee3c 100644
--- a/app/routers/channels.py
+++ b/app/routers/channels.py
@@ -1,11 +1,9 @@
-import json
-import re
import uuid
import logging
-from fastapi import APIRouter, Depends, HTTPException, Query, Request, Response
+from fastapi import APIRouter, Depends, HTTPException, Query, Request
from pydantic import BaseModel
-from typing import List, Optional
+from typing import Optional
from app.config import settings, PLAN_LIMITS
from app.database import get_supabase
@@ -14,7 +12,6 @@ from app.services.rag import rag_engine
from app.services.telegram_service import (
delete_webhook, get_bot_info, send_message as tg_send, set_webhook,
)
-from app.services.whatsapp_service import send_message as wa_send, verify_signature
from app.routers.chat import (
_get_conversation_history, _get_or_create_conversation, _save_message,
)
@@ -32,11 +29,6 @@ class TelegramConnectRequest(BaseModel):
bot_token: str
-class WhatsAppConnectRequest(BaseModel):
- chatbot_id: str
- wa_keyword: Optional[str] = None
-
-
# ── Helpers ───────────────────────────────────────────────────────────────────
def _verify_chatbot_ownership(chatbot_id: str, user_id: str, supabase):
@@ -53,18 +45,6 @@ def _verify_chatbot_ownership(chatbot_id: str, user_id: str, supabase):
raise HTTPException(status_code=404, detail="Chatbot not found")
-def _generate_keyword(chatbot_name: str, chatbot_id: str, supabase) -> str:
- base = re.sub(r"[^A-Z0-9]", "", chatbot_name.upper())[:10] or "BOT"
- existing = (
- supabase.table("channel_connections")
- .select("id").eq("channel", "whatsapp").eq("wa_keyword", base).execute()
- )
- if not existing.data:
- return base
- suffix = chatbot_id.replace("-", "")[:4].upper()
- return f"{base[:6]}{suffix}"
-
-
def _get_or_create_channel_session(
chatbot_id: str, channel: str, external_id: str, supabase
) -> dict:
@@ -85,32 +65,13 @@ def _get_or_create_channel_session(
return result.data[0]
-def _upsert_whatsapp_session(chatbot_id: str, phone: str, session_id: str, supabase):
- existing = (
- supabase.table("channel_sessions")
- .select("id").eq("channel", "whatsapp").eq("external_id", phone).execute()
- )
- if existing.data:
- supabase.table("channel_sessions").update(
- {"chatbot_id": chatbot_id, "session_id": session_id}
- ).eq("id", existing.data[0]["id"]).execute()
- else:
- supabase.table("channel_sessions").insert({
- "id": str(uuid.uuid4()),
- "chatbot_id": chatbot_id,
- "channel": "whatsapp",
- "external_id": phone,
- "session_id": session_id,
- }).execute()
-
-
def _check_channel_plan(user_id: str, channel: str, supabase):
"""Raise 402 if the user's plan doesn't include the requested channel."""
sub = supabase.table("subscriptions").select("plan").eq("user_id", user_id).eq("status", "active").execute()
plan = sub.data[0]["plan"] if sub.data else "free"
allowed = PLAN_LIMITS.get(plan, PLAN_LIMITS["free"]).get("channels", [])
if channel not in allowed:
- label = {"telegram": "Starter", "whatsapp": "Business"}.get(channel, "a higher")
+ label = {"telegram": "Starter"}.get(channel, "a higher")
raise HTTPException(status_code=402, detail=f"{channel.title()} channel requires {label} plan or higher")
@@ -151,12 +112,6 @@ def _detect_language(text: str) -> str:
return best if scores[best] > 0.25 else "en"
-def _wa_link(keyword: str) -> str:
- if settings.whatsapp_display_number:
- return f"https://wa.me/{settings.whatsapp_display_number}?text=START+{keyword}"
- return ""
-
-
# ── CRUD endpoints ────────────────────────────────────────────────────────────
@router.get("")
@@ -165,17 +120,11 @@ async def list_channels(chatbot_id: str = Query(...), user=Depends(get_current_u
_verify_chatbot_ownership(chatbot_id, user.id, supabase)
result = (
supabase.table("channel_connections")
- .select("id,channel,bot_username,wa_keyword,is_active,created_at")
+ .select("id,channel,bot_username,is_active,created_at")
.eq("chatbot_id", chatbot_id)
.execute()
)
- rows = result.data or []
- for row in rows:
- if row["channel"] == "whatsapp" and row.get("wa_keyword"):
- row["wa_link"] = _wa_link(row["wa_keyword"])
- else:
- row["wa_link"] = None
- return rows
+ return result.data or []
@router.post("/telegram")
@@ -224,47 +173,6 @@ async def connect_telegram(data: TelegramConnectRequest, user=Depends(get_curren
}
-@router.post("/whatsapp")
-async def connect_whatsapp(data: WhatsAppConnectRequest, user=Depends(get_current_user)):
- supabase = get_supabase()
- _verify_chatbot_ownership(data.chatbot_id, user.id, supabase)
- _check_channel_plan(user.id, "whatsapp", supabase)
-
- chatbot = supabase.table("chatbots").select("name").eq("id", data.chatbot_id).execute()
- chatbot_name = chatbot.data[0]["name"] if chatbot.data else "BOT"
-
- keyword = (data.wa_keyword or _generate_keyword(chatbot_name, data.chatbot_id, supabase)).upper()
- keyword = re.sub(r"[^A-Z0-9]", "", keyword)[:12]
- if not keyword:
- raise HTTPException(status_code=400, detail="Keyword must contain letters or numbers")
-
- taken = (
- supabase.table("channel_connections")
- .select("id").eq("channel", "whatsapp").eq("wa_keyword", keyword)
- .neq("chatbot_id", data.chatbot_id).execute()
- )
- if taken.data:
- raise HTTPException(status_code=400, detail=f"Keyword '{keyword}' is already taken. Choose a different one.")
-
- existing = (
- supabase.table("channel_connections")
- .select("id").eq("chatbot_id", data.chatbot_id).eq("channel", "whatsapp").execute()
- )
- conn_data = {
- "chatbot_id": data.chatbot_id,
- "channel": "whatsapp",
- "wa_keyword": keyword,
- "is_active": True,
- }
- if existing.data:
- supabase.table("channel_connections").update(conn_data).eq("id", existing.data[0]["id"]).execute()
- else:
- conn_data["id"] = str(uuid.uuid4())
- supabase.table("channel_connections").insert(conn_data).execute()
-
- return {"success": True, "keyword": keyword, "wa_link": _wa_link(keyword)}
-
-
@router.delete("/{connection_id}")
async def disconnect_channel(connection_id: str, user=Depends(get_current_user)):
supabase = get_supabase()
@@ -383,136 +291,3 @@ async def telegram_webhook(bot_token: str, request: Request):
return {"ok": True}
-# ── WhatsApp webhooks ─────────────────────────────────────────────────────────
-
-@webhook_router.get("/whatsapp")
-async def whatsapp_verify(request: Request):
- """Meta webhook verification challenge."""
- params = dict(request.query_params)
- if (
- params.get("hub.mode") == "subscribe"
- and settings.whatsapp_verify_token
- and params.get("hub.verify_token") == settings.whatsapp_verify_token
- ):
- return Response(content=params["hub.challenge"], media_type="text/plain")
- raise HTTPException(status_code=403, detail="Forbidden")
-
-
-@webhook_router.post("/whatsapp")
-async def whatsapp_webhook(request: Request):
- """Receive messages from WhatsApp Cloud API."""
- raw_body = await request.body()
-
- if settings.whatsapp_app_secret:
- sig = request.headers.get("X-Hub-Signature-256", "")
- if not verify_signature(raw_body, sig, settings.whatsapp_app_secret):
- raise HTTPException(status_code=403, detail="Invalid signature")
-
- try:
- body = json.loads(raw_body)
- value = body["entry"][0]["changes"][0]["value"]
- if "messages" not in value:
- return {"ok": True}
- msg = value["messages"][0]
- if msg.get("type") != "text":
- return {"ok": True}
- from_number = msg["from"]
- text = msg["text"]["body"].strip()
- except (KeyError, IndexError, json.JSONDecodeError):
- return {"ok": True}
-
- supabase = get_supabase()
-
- async def _wa_reply(to: str, message: str):
- if settings.whatsapp_access_token and settings.whatsapp_phone_number_id:
- await wa_send(settings.whatsapp_phone_number_id, to, message, settings.whatsapp_access_token)
-
- # Handle START
- if text.upper().startswith("START "):
- keyword = re.sub(r"[^A-Z0-9]", "", text[6:].strip().upper())
- conn = (
- supabase.table("channel_connections")
- .select("*").eq("channel", "whatsapp").eq("wa_keyword", keyword).eq("is_active", True).execute()
- )
- if not conn.data:
- await _wa_reply(from_number, f"Sorry, chatbot '{keyword}' not found. Use the link from the business you're contacting.")
- return {"ok": True}
-
- chatbot_id = conn.data[0]["chatbot_id"]
- chatbot_result = supabase.table("chatbots").select("name,welcome_message").eq("id", chatbot_id).execute()
- chatbot = chatbot_result.data[0] if chatbot_result.data else {}
-
- _upsert_whatsapp_session(chatbot_id, from_number, str(uuid.uuid4()), supabase)
- welcome = chatbot.get("welcome_message") or f"Hello! I'm {chatbot.get('name', 'your assistant')}. How can I help you?"
- await _wa_reply(from_number, welcome)
- return {"ok": True}
-
- # Regular message — find active session
- session_result = (
- supabase.table("channel_sessions")
- .select("*").eq("channel", "whatsapp").eq("external_id", from_number).execute()
- )
- if not session_result.data:
- await _wa_reply(from_number, "To start chatting, use the WhatsApp link from the business you're trying to contact.")
- return {"ok": True}
-
- session = session_result.data[0]
- chatbot_id = session["chatbot_id"]
-
- # Check subscription still allows WhatsApp
- if not _check_chatbot_channel_subscription(chatbot_id, "whatsapp", supabase):
- await _wa_reply(from_number, "This service is currently unavailable. Please contact the business directly.")
- return {"ok": True}
-
- chatbot_result = (
- supabase.table("chatbots").select("*, companies(name, logo_url)").eq("id", chatbot_id).execute()
- )
- if not chatbot_result.data:
- return {"ok": True}
- chatbot = chatbot_result.data[0]
-
- collection_name = chatbot.get("qdrant_collection_name")
- if not collection_name:
- await _wa_reply(from_number, "This chatbot isn't ready yet. Please try again later.")
- return {"ok": True}
-
- detected_lang = _detect_language(text)
- company_data = chatbot.get("companies", {}) or {}
- conversation = _get_or_create_conversation(
- chatbot_id=chatbot_id,
- session_id=session["session_id"],
- user_id=None,
- language=detected_lang,
- supabase=supabase,
- )
- history = _get_conversation_history(conversation["id"], supabase)
- chatbot_config = {**chatbot, "company_name": company_data.get("name", "")}
-
- try:
- result = await rag_engine.process_query(
- query=text,
- collection_name=collection_name,
- chatbot_config=chatbot_config,
- conversation_history=history,
- language=detected_lang,
- )
- except Exception as e:
- logger.error(f"WhatsApp RAG error for chatbot {chatbot_id}: {e}")
- await _wa_reply(from_number, "Sorry, I encountered an error. Please try again.")
- return {"ok": True}
-
- confidence_score = max((s.score for s in result.get("sources", [])), default=0.0)
- _save_message(conversation["id"], "user", text, supabase)
- _save_message(
- conversation["id"], "assistant", result["response"], supabase,
- sources=[s.model_dump() for s in result.get("sources", [])],
- model=result.get("model", ""),
- confidence_score=confidence_score,
- )
- supabase.table("conversations").update(
- {"message_count": len(history) + 2}
- ).eq("id", conversation["id"]).execute()
- supabase.table("channel_sessions").update({"last_active": "now()"}).eq("id", session["id"]).execute()
-
- await _wa_reply(from_number, result["response"])
- return {"ok": True}
diff --git a/app/routers/chat.py b/app/routers/chat.py
index 3a7540f..1b448b0 100644
--- a/app/routers/chat.py
+++ b/app/routers/chat.py
@@ -124,6 +124,19 @@ async def chat(
# Get conversation history
history = _get_conversation_history(conversation["id"], supabase)
+ # If an agent has taken over this conversation, stop the bot from responding
+ conv_status = conversation.get("status", "open")
+ if conv_status == "agent_handling":
+ return ChatResponse(
+ response="",
+ session_id=session_id,
+ sources=[],
+ model_used="",
+ tokens_used=0,
+ needs_lead_capture=False,
+ handoff=False,
+ )
+
# Get company info for context
company_data = chatbot.get("companies", {}) or {}
chatbot_config = {
@@ -131,6 +144,19 @@ async def chat(
"company_name": company_data.get("name", ""),
}
+ # If booking is enabled, inject a note into the system prompt so the bot
+ # can guide users to the booking page
+ if chatbot.get("booking_enabled"):
+ from app.config import settings as _cfg
+ booking_url = f"{_cfg.app_url}/book/{chatbot_id}"
+ booking_note = (
+ f"\n\nAppointment booking: This business accepts appointments online. "
+ f"If the user wants to book an appointment, meeting, or consultation, "
+ f"provide them this booking link: {booking_url}"
+ )
+ existing_prompt = chatbot_config.get("system_prompt") or ""
+ chatbot_config["system_prompt"] = existing_prompt + booking_note
+
# Run RAG
result = await rag_engine.process_query(
query=message.message,
diff --git a/app/routers/chatbots.py b/app/routers/chatbots.py
index 77c2711..cb0d52b 100644
--- a/app/routers/chatbots.py
+++ b/app/routers/chatbots.py
@@ -5,6 +5,7 @@ from app.models import (
from app.database import get_supabase
from app.dependencies import get_current_user, get_user_subscription
from app.services.vector_store import vector_store
+from app.services.storage import delete_from_storage
from app.config import PLAN_LIMITS
from typing import List
import uuid
@@ -83,8 +84,20 @@ async def create_chatbot(data: ChatbotCreate, user=Depends(get_current_user)):
"handoff_keywords": data.handoff_keywords,
}
- result = supabase.table("chatbots").insert(chatbot_data).execute()
- if not result.data:
+ try:
+ result = supabase.table("chatbots").insert(chatbot_data).execute()
+ if not result.data:
+ raise HTTPException(status_code=500, detail="Failed to create chatbot")
+ except HTTPException:
+ raise
+ except Exception as e:
+ # Cleanup orphaned Qdrant collection if DB insert failed
+ if collection_name:
+ try:
+ vector_store.delete_collection(collection_name)
+ logger.warning(f"Cleaned up orphaned Qdrant collection {collection_name} after DB failure")
+ except Exception:
+ pass
raise HTTPException(status_code=500, detail="Failed to create chatbot")
return _format_chatbot(result.data[0], supabase)
@@ -139,7 +152,11 @@ async def delete_chatbot(chatbot_id: str, user=Depends(get_current_user)):
try:
vector_store.delete_collection(chatbot["qdrant_collection_name"])
except Exception as e:
- logger.warning(f"Failed to delete collection: {e}")
+ logger.warning(f"Failed to delete Qdrant collection: {e}")
+
+ # Delete logo from Supabase Storage
+ if chatbot.get("logo_url"):
+ delete_from_storage(supabase, "logos", chatbot["logo_url"])
supabase.table("chatbots").delete().eq("id", chatbot_id).execute()
return SuccessResponse(success=True, message="Chatbot deleted")
@@ -303,4 +320,5 @@ def _format_chatbot(chatbot: dict, supabase) -> ChatbotResponse:
handoff_message=chatbot.get("handoff_message", "I'll connect you with our team. Please wait."),
handoff_email=chatbot.get("handoff_email"),
handoff_keywords=chatbot.get("handoff_keywords") or ["human", "agent", "speak to someone", "talk to a person", "real person"],
+ booking_enabled=bool(chatbot.get("booking_enabled")),
)
\ No newline at end of file
diff --git a/app/routers/documents.py b/app/routers/documents.py
index 8749f41..1cad3e8 100644
--- a/app/routers/documents.py
+++ b/app/routers/documents.py
@@ -5,6 +5,7 @@ from app.dependencies import get_current_user
from app.services.document_processor import process_document
from app.services.embeddings import embedding_service
from app.services.vector_store import vector_store
+from app.services.storage import delete_from_storage, extract_storage_path
from app.config import settings
from typing import List
import uuid
@@ -205,10 +206,72 @@ async def delete_document(chatbot_id: str, document_id: str, user=Depends(get_cu
except Exception as e:
logger.warning(f"Failed to delete vectors: {e}")
+ # Delete file from Supabase Storage
+ if doc.data[0].get("file_url"):
+ delete_from_storage(supabase, "documents", doc.data[0]["file_url"])
+
supabase.table("documents").delete().eq("id", document_id).execute()
return SuccessResponse(success=True, message="Document deleted")
+@router.post("/{document_id}/retry", response_model=DocumentResponse)
+async def retry_document_processing(
+ chatbot_id: str,
+ document_id: str,
+ background_tasks: BackgroundTasks,
+ user=Depends(get_current_user),
+):
+ """Retry processing a failed document."""
+ supabase = get_supabase()
+ chatbot = _get_user_chatbot(chatbot_id, user.id, supabase)
+
+ doc = supabase.table("documents").select("*").eq("id", document_id).eq("chatbot_id", chatbot_id).execute()
+ if not doc.data:
+ raise HTTPException(status_code=404, detail="Document not found")
+
+ document = doc.data[0]
+ if document.get("status") != "failed":
+ raise HTTPException(status_code=400, detail="Only failed documents can be retried")
+
+ file_url = document.get("file_url")
+ if not file_url:
+ raise HTTPException(
+ status_code=400,
+ detail="No file URL stored. Please re-upload this document."
+ )
+
+ # Download file from storage
+ try:
+ path = extract_storage_path(file_url, "documents")
+ if not path:
+ raise HTTPException(status_code=400, detail="Cannot locate file in storage. Please re-upload.")
+ file_bytes = supabase.storage.from_("documents").download(path)
+ except HTTPException:
+ raise
+ except Exception as e:
+ logger.error(f"Failed to download document {document_id} for retry: {e}")
+ raise HTTPException(status_code=400, detail="Cannot retrieve file from storage. Please re-upload.")
+
+ # Reset status to processing
+ result = supabase.table("documents").update({
+ "status": "processing",
+ "error_message": None,
+ "chunk_count": 0,
+ }).eq("id", document_id).execute()
+
+ # Re-enqueue background processing
+ background_tasks.add_task(
+ _process_document_bg,
+ file_bytes=file_bytes,
+ file_name=document["file_name"],
+ doc_id=document_id,
+ chatbot=chatbot,
+ supabase=supabase,
+ )
+
+ return DocumentResponse(**result.data[0])
+
+
# ── URL Sources ───────────────────────────────────────────────────────────────
@url_router.get("", response_model=List[UrlSourceResponse])
diff --git a/app/routers/inbox.py b/app/routers/inbox.py
index e75c50e..1b57ef9 100644
--- a/app/routers/inbox.py
+++ b/app/routers/inbox.py
@@ -2,8 +2,9 @@ from fastapi import APIRouter, HTTPException, Depends, Query
from app.database import get_supabase
from app.dependencies import get_current_user
from app.config import PLAN_LIMITS
-from app.models import InboxConversation, InboxMessage
+from app.models import InboxConversation, InboxMessage, ConversationStatusUpdate, AgentReplyCreate
from typing import List, Optional
+import uuid
import logging
logger = logging.getLogger(__name__)
@@ -79,6 +80,8 @@ async def list_inbox_conversations(
language=conv.get("language", "en"),
message_count=conv.get("message_count", 0),
first_message=first_message_text,
+ status=conv.get("status", "open"),
+ last_agent_reply_at=conv.get("last_agent_reply_at"),
created_at=conv.get("created_at"),
))
@@ -137,6 +140,67 @@ async def get_inbox_conversation(
}
+@router.patch("/conversations/{conversation_id}/status")
+async def update_conversation_status(
+ conversation_id: str,
+ data: ConversationStatusUpdate,
+ user=Depends(get_current_user),
+):
+ """Update conversation status (open, agent_handling, resolved)."""
+ if data.status not in ("open", "agent_handling", "resolved"):
+ raise HTTPException(status_code=400, detail="Invalid status")
+
+ supabase = get_supabase()
+ _check_inbox_access(user.id, supabase)
+ company_id = _get_user_company_id(user.id, supabase)
+
+ conv = supabase.table("conversations").select("*, chatbots(company_id)") \
+ .eq("id", conversation_id).execute()
+ if not conv.data:
+ raise HTTPException(status_code=404, detail="Conversation not found")
+ if conv.data[0].get("chatbots", {}).get("company_id") != company_id:
+ raise HTTPException(status_code=403, detail="Not authorized")
+
+ supabase.table("conversations").update({"status": data.status}).eq("id", conversation_id).execute()
+ return {"success": True, "status": data.status}
+
+
+@router.post("/conversations/{conversation_id}/reply")
+async def agent_reply(
+ conversation_id: str,
+ data: AgentReplyCreate,
+ user=Depends(get_current_user),
+):
+ """Send an agent reply to a conversation."""
+ supabase = get_supabase()
+ _check_inbox_access(user.id, supabase)
+ company_id = _get_user_company_id(user.id, supabase)
+
+ conv = supabase.table("conversations").select("*, chatbots(company_id)") \
+ .eq("id", conversation_id).execute()
+ if not conv.data:
+ raise HTTPException(status_code=404, detail="Conversation not found")
+ if conv.data[0].get("chatbots", {}).get("company_id") != company_id:
+ raise HTTPException(status_code=403, detail="Not authorized")
+
+ msg_id = str(uuid.uuid4())
+ supabase.table("messages").insert({
+ "id": msg_id,
+ "conversation_id": conversation_id,
+ "role": "agent",
+ "content": data.message,
+ }).execute()
+
+ # Mark as agent_handling if not already, and record reply time
+ current_status = conv.data[0].get("status", "open")
+ update_data: dict = {"last_agent_reply_at": "now()"}
+ if current_status == "open":
+ update_data["status"] = "agent_handling"
+ supabase.table("conversations").update(update_data).eq("id", conversation_id).execute()
+
+ return {"success": True, "message_id": msg_id}
+
+
@router.delete("/conversations/{conversation_id}")
async def delete_inbox_conversation(
conversation_id: str,
diff --git a/app/routers/leads.py b/app/routers/leads.py
index 625dd9e..e1bd613 100644
--- a/app/routers/leads.py
+++ b/app/routers/leads.py
@@ -2,7 +2,7 @@ from fastapi import APIRouter, HTTPException, Depends, Query
from fastapi.responses import StreamingResponse
from app.database import get_supabase
from app.dependencies import get_current_user
-from app.models import LeadCreate, LeadResponse
+from app.models import LeadCreate, LeadResponse, LeadUpdate
from typing import List, Optional
import uuid
import csv
@@ -60,6 +60,40 @@ async def list_leads(
return [LeadResponse(**lead) for lead in (result.data or [])]
+@router.patch("/{lead_id}", response_model=LeadResponse)
+async def update_lead(
+ lead_id: str,
+ data: LeadUpdate,
+ user=Depends(get_current_user),
+):
+ """Update lead status or notes."""
+ supabase = get_supabase()
+ _check_leads_access(user.id, supabase)
+ company_id = _get_user_company_id(user.id, supabase)
+
+ lead = supabase.table("leads").select("*, chatbots(company_id)") \
+ .eq("id", lead_id).execute()
+ if not lead.data:
+ raise HTTPException(status_code=404, detail="Lead not found")
+ if lead.data[0].get("chatbots", {}).get("company_id") != company_id:
+ raise HTTPException(status_code=403, detail="Not authorized")
+
+ update_fields: dict = {}
+ if data.status is not None:
+ valid_statuses = ("new", "contacted", "qualified", "closed", "lost")
+ if data.status not in valid_statuses:
+ raise HTTPException(status_code=400, detail=f"Status must be one of {valid_statuses}")
+ update_fields["status"] = data.status
+ if data.notes is not None:
+ update_fields["notes"] = data.notes
+
+ if not update_fields:
+ return LeadResponse(**lead.data[0])
+
+ result = supabase.table("leads").update(update_fields).eq("id", lead_id).execute()
+ return LeadResponse(**result.data[0])
+
+
@router.get("/export")
async def export_leads_csv(
chatbot_id: Optional[str] = Query(None),
diff --git a/app/routers/models.py b/app/routers/models.py
index 9a26158..bf38544 100644
--- a/app/routers/models.py
+++ b/app/routers/models.py
@@ -96,7 +96,7 @@ async def get_available_models(user=Depends(get_current_user)):
if plan == "free":
upgrade_label = "Upgrade to Starter for more AI models and messaging channels"
elif plan == "starter":
- upgrade_label = "Upgrade to Business for GPT-4o, Claude, Gemini and WhatsApp"
+ upgrade_label = "Upgrade to Business for GPT-4o, Claude, and Gemini"
return ModelsResponse(
models=models,
diff --git a/app/services/rag.py b/app/services/rag.py
index 8fe5b2a..3c05817 100644
--- a/app/services/rag.py
+++ b/app/services/rag.py
@@ -7,7 +7,7 @@ import logging
logger = logging.getLogger(__name__)
-RAG_SYSTEM_PROMPT = """You are a helpful AI assistant for {company_name}.
+RAG_SYSTEM_PROMPT = """You are a helpful AI assistant for {company_name}.
Your role is to answer questions based on the provided context from company documents.
IMPORTANT RULES:
@@ -16,13 +16,20 @@ IMPORTANT RULES:
3. Be concise and helpful
4. Always maintain a professional, friendly tone
5. If asked about topics completely outside the context, politely redirect to relevant topics
-
+{language_instruction}
{custom_instructions}
Context from knowledge base:
{context}
"""
+LANGUAGE_NAMES = {
+ "en": "English", "fr": "French", "es": "Spanish", "de": "German",
+ "it": "Italian", "pt": "Portuguese", "ar": "Arabic", "zh": "Chinese",
+ "ja": "Japanese", "ko": "Korean", "ru": "Russian", "nl": "Dutch",
+ "tr": "Turkish", "pl": "Polish", "vi": "Vietnamese", "th": "Thai",
+}
+
class RAGEngine:
def __init__(self):
@@ -102,8 +109,15 @@ class RAGEngine:
logger.warning(f"[RAG] No context found for query: '{query}' in collection '{collection_name}'")
# Step 4: Build messages
+ lang_name = LANGUAGE_NAMES.get(language, "English") if language and language != "en" else ""
+ language_instruction = (
+ f"\n6. Respond in {lang_name}. Match the language of the user's message."
+ if lang_name else ""
+ )
+
system_prompt = RAG_SYSTEM_PROMPT.format(
company_name=chatbot_config.get("company_name", ""),
+ language_instruction=language_instruction,
custom_instructions=chatbot_config.get("system_prompt") or "",
context=context,
)
diff --git a/app/services/storage.py b/app/services/storage.py
new file mode 100644
index 0000000..899c137
--- /dev/null
+++ b/app/services/storage.py
@@ -0,0 +1,46 @@
+"""
+Supabase Storage helper utilities.
+Used to delete files from storage buckets when deleting documents or chatbots.
+"""
+import logging
+from typing import Optional
+from urllib.parse import urlparse
+
+logger = logging.getLogger(__name__)
+
+
+def extract_storage_path(url: str, bucket: str) -> Optional[str]:
+ """
+ Extract the file path from a Supabase Storage public URL.
+
+ URL format: {supabase_url}/storage/v1/object/public/{bucket}/{path}
+ Returns the path portion after the bucket name, or None if not parseable.
+ """
+ if not url:
+ return None
+ try:
+ parsed = urlparse(url)
+ prefix = f"/storage/v1/object/public/{bucket}/"
+ if prefix in parsed.path:
+ idx = parsed.path.index(prefix) + len(prefix)
+ return parsed.path[idx:]
+ except Exception as e:
+ logger.warning(f"Failed to extract storage path from URL '{url}': {e}")
+ return None
+
+
+def delete_from_storage(supabase, bucket: str, url: str) -> bool:
+ """
+ Delete a file from a Supabase Storage bucket given its public URL.
+ Returns True on success, False if the URL couldn't be parsed or deletion failed.
+ """
+ path = extract_storage_path(url, bucket)
+ if not path:
+ return False
+ try:
+ supabase.storage.from_(bucket).remove([path])
+ logger.info(f"Deleted storage file: {bucket}/{path}")
+ return True
+ except Exception as e:
+ logger.warning(f"Failed to delete storage file {bucket}/{path}: {e}")
+ return False
diff --git a/app/services/whatsapp_service.py b/app/services/whatsapp_service.py
deleted file mode 100644
index 708a850..0000000
--- a/app/services/whatsapp_service.py
+++ /dev/null
@@ -1,36 +0,0 @@
-import httpx
-import hashlib
-import hmac
-import logging
-
-logger = logging.getLogger(__name__)
-
-_META_API = "https://graph.facebook.com/v19.0"
-
-
-async def send_message(phone_number_id: str, to: str, text: str, access_token: str) -> bool:
- try:
- async with httpx.AsyncClient(timeout=10) as client:
- r = await client.post(
- f"{_META_API}/{phone_number_id}/messages",
- headers={"Authorization": f"Bearer {access_token}"},
- json={
- "messaging_product": "whatsapp",
- "to": to,
- "type": "text",
- "text": {"body": text},
- },
- )
- return r.status_code == 200
- except Exception as e:
- logger.error(f"WhatsApp send error: {e}")
- return False
-
-
-def verify_signature(payload: bytes, signature: str, app_secret: str) -> bool:
- expected = "sha256=" + hmac.new(
- app_secret.encode("utf-8"),
- payload,
- hashlib.sha256,
- ).hexdigest()
- return hmac.compare_digest(expected, signature)
diff --git a/app/services/widget.py b/app/services/widget.py
index 9cb9906..9942611 100644
--- a/app/services/widget.py
+++ b/app/services/widget.py
@@ -1,140 +1,182 @@
-def generate_widget_js(app_url: str) -> str:
- """Generate the embeddable widget JavaScript with app_url baked in."""
- return f"""
-(function() {{
- var APP_URL = "{app_url}";
-
- // Find script tag to get chatbot ID
- var scripts = document.querySelectorAll('script[data-chatbot]');
- var chatbotId = null;
- if (scripts.length > 0) {{
- chatbotId = scripts[scripts.length - 1].getAttribute('data-chatbot');
- }}
- if (!chatbotId) {{
- console.warn('[Contexta] No data-chatbot attribute found on script tag');
- return;
- }}
-
- // Styles
- var style = document.createElement('style');
- style.textContent = `
- .contexta-btn {{
- position: fixed;
- bottom: 24px;
- right: 24px;
- width: 56px;
- height: 56px;
- border-radius: 50%;
- background: #6366f1;
- border: none;
- cursor: pointer;
- box-shadow: 0 4px 12px rgba(0,0,0,0.2);
- display: flex;
- align-items: center;
- justify-content: center;
- z-index: 999998;
- transition: transform 0.2s, box-shadow 0.2s;
- }}
- .contexta-btn:hover {{
- transform: scale(1.08);
- box-shadow: 0 6px 20px rgba(0,0,0,0.25);
- }}
- .contexta-btn svg {{
- width: 26px;
- height: 26px;
- fill: white;
- }}
- .contexta-container {{
- position: fixed;
- bottom: 92px;
- right: 24px;
- width: 380px;
- height: 580px;
- border-radius: 16px;
- box-shadow: 0 8px 32px rgba(0,0,0,0.15);
- z-index: 999999;
- overflow: hidden;
- display: none;
- flex-direction: column;
- border: 1px solid #e5e7eb;
- }}
- .contexta-container.open {{
- display: flex;
- }}
- .contexta-iframe {{
- width: 100%;
- height: 100%;
- border: none;
- flex: 1;
- }}
- .contexta-close {{
- position: absolute;
- top: 8px;
- right: 8px;
- background: rgba(0,0,0,0.3);
- border: none;
- color: white;
- border-radius: 50%;
- width: 24px;
- height: 24px;
- cursor: pointer;
- font-size: 14px;
- display: flex;
- align-items: center;
- justify-content: center;
- z-index: 1;
- }}
- @media (max-width: 480px) {{
- .contexta-container {{
- bottom: 0;
- right: 0;
- width: 100vw;
- height: 100vh;
- border-radius: 0;
- }}
- }}
- `;
- document.head.appendChild(style);
-
- // Button
- var btn = document.createElement('button');
- btn.className = 'contexta-btn';
- btn.setAttribute('aria-label', 'Open chat');
- btn.innerHTML = '';
- document.body.appendChild(btn);
-
- // Container with iframe
- var container = document.createElement('div');
- container.className = 'contexta-container';
-
- var closeBtn = document.createElement('button');
- closeBtn.className = 'contexta-close';
- closeBtn.innerHTML = '×';
- closeBtn.setAttribute('aria-label', 'Close chat');
- container.appendChild(closeBtn);
-
- var iframe = document.createElement('iframe');
- iframe.className = 'contexta-iframe';
- iframe.src = APP_URL + '/chat/' + chatbotId;
- iframe.setAttribute('allow', 'microphone');
- container.appendChild(iframe);
-
- document.body.appendChild(container);
-
- // Toggle logic
- var isOpen = false;
- function toggle() {{
- isOpen = !isOpen;
- if (isOpen) {{
- container.classList.add('open');
- btn.innerHTML = '';
- }} else {{
- container.classList.remove('open');
- btn.innerHTML = '';
- }}
- }}
-
- btn.addEventListener('click', toggle);
- closeBtn.addEventListener('click', toggle);
-}})();
"""
+Widget JS generator.
+
+Produces a self-contained, framework-agnostic JavaScript bundle served at
+GET /widget.js. Embed on any page with:
+
+
+
+Works on vanilla HTML, WordPress, Webflow, Shopify, Next.js (_document),
+and any framework where you control the HTML shell.
+
+For React/Vue projects that want a native component, host-side devs can call
+ window.Contexta.open() / .close() / .toggle()
+from their own button, or await the dedicated npm package (@contexta/widget).
+
+Design decisions
+----------------
+- All CSS is ID-scoped (#ctxa-*) to avoid colliding with host-page styles.
+- The iframe src is set lazily on first open — zero network cost until use.
+- document.currentScript is captured synchronously (before any async code)
+ so it works even when the host page has many script tags.
+- z-index 2147483647 is the highest a browser will honour.
+- sandbox attribute restricts the iframe while still allowing forms/popups.
+"""
+
+_TEMPLATE = r"""(function () {
+ 'use strict';
+
+ /* ── Double-init guard ─────────────────────────────────────────────── */
+ if (window.__ctxa) return;
+
+ /* ── Read chatbot ID from world",
+ )
+ assert "