From 9dccc832937a33a5c8defc88876313d255755d4c Mon Sep 17 00:00:00 2001 From: belviskhoremk Date: Fri, 6 Mar 2026 22:37:40 +0000 Subject: [PATCH] updates Mar6 --- app/config.py | 148 +++++---- app/dependencies.py | 2 +- app/main.py | 22 +- app/models.py | 113 ++++++- app/routers/analytics.py | 111 ++++++- app/routers/auth.py | 105 +++++++ app/routers/billing.py | 22 +- app/routers/channels.py | 518 +++++++++++++++++++++++++++++++ app/routers/chat.py | 142 ++++++++- app/routers/chatbots.py | 69 +++- app/routers/documents.py | 168 +++++++++- app/routers/inbox.py | 162 ++++++++++ app/routers/leads.py | 150 +++++++++ app/routers/models.py | 6 +- app/routers/upload.py | 56 ++++ app/services/n8n_service.py | 62 ++++ app/services/telegram_service.py | 57 ++++ app/services/web_scraper.py | 65 ++++ app/services/whatsapp_service.py | 36 +++ app/services/widget.py | 140 +++++++++ pyproject.toml | 10 + supabase_migration.sql | 119 +++++++ supabase_migration_channels.sql | 48 +++ 23 files changed, 2257 insertions(+), 74 deletions(-) create mode 100644 app/routers/channels.py create mode 100644 app/routers/inbox.py create mode 100644 app/routers/leads.py create mode 100644 app/routers/upload.py create mode 100644 app/services/n8n_service.py create mode 100644 app/services/telegram_service.py create mode 100644 app/services/web_scraper.py create mode 100644 app/services/whatsapp_service.py create mode 100644 app/services/widget.py create mode 100644 supabase_migration.sql create mode 100644 supabase_migration_channels.sql diff --git a/app/config.py b/app/config.py index a68d474..ebdf32b 100644 --- a/app/config.py +++ b/app/config.py @@ -31,7 +31,8 @@ class Settings(BaseSettings): stripe_secret_key: str = "" stripe_webhook_secret: str = "" stripe_starter_price_id: str = "" - stripe_pro_price_id: str = "" + stripe_business_price_id: str = "" + stripe_agency_price_id: str = "" # Redis redis_url: str = "redis://localhost:6379" @@ -42,6 +43,25 @@ class Settings(BaseSettings): # Files max_file_size_mb: int = 50 + # App URL (for widget embedding) + app_url: str = "http://localhost:5173" + + # Backend API URL (used for Telegram webhook registration) + api_url: str = "http://localhost:8000" + + # n8n Handoff webhook + n8n_handoff_webhook_url: Optional[str] = None + + # Supabase Storage + supabase_storage_url: str = "" + + # WhatsApp Cloud API + whatsapp_access_token: str = "" + whatsapp_phone_number_id: str = "" + whatsapp_verify_token: str = "contexta_whatsapp_verify" + whatsapp_app_secret: str = "" + whatsapp_display_number: str = "" # E.164 without '+', e.g. "15551234567" + @property def allowed_origins_list(self) -> List[str]: return [o.strip() for o in self.allowed_origins.split(",")] @@ -161,13 +181,14 @@ MODEL_PROVIDERS = { DEFAULT_MODELS = { "free": "accounts/fireworks/models/llama-v3p3-70b-instruct", "starter": "accounts/fireworks/models/qwen3-235b-a22b", - "pro": "gpt-4o", + "business": "gpt-4o", + "agency": "gpt-4o", "enterprise": "gpt-4o", } # ═══════════════════════════════════════════════════════════════════════════════ -# PLAN LIMITS — Pricing: Starter $3/mo, Pro $20/mo +# PLAN LIMITS — Pricing: Starter $12/mo, Business $29/mo, Agency $79/mo # ═══════════════════════════════════════════════════════════════════════════════ # # Cost analysis (per 1M tokens approx): @@ -185,87 +206,102 @@ DEFAULT_MODELS = { # Avg conversation: ~2K tokens input + 1K output = ~3K tokens # Fireworks models: ~$0.001-$0.004 per conversation # GPT-4o: ~$0.015 per conversation -# GPT-4o Mini: ~$0.001 per conversation -# Claude Haiku: ~$0.006 per conversation -# Gemini Flash: ~$0.001 per conversation -# Gemini Pro: ~$0.013 per conversation # -# Starter at $3/mo with 500 convos: max cost ~$2/mo (fireworks) → margin OK -# Pro at $20/mo with 2,000 convos: max cost ~$12/mo (if all GPT-4o) → margin OK -# Typical mix: ~$5-8/mo actual cost → healthy margin +# Starter $12/mo, 1500 convos: max cost ~$6/mo (fireworks mix) → margin OK +# Business $29/mo, 5000 convos: max cost ~$15/mo (mixed models) → margin OK +# Agency $79/mo, 20000 convos: max cost ~$30/mo (fireworks) → healthy margin # ═══════════════════════════════════════════════════════════════════════════════ +_ALL_FIREWORKS = [ + "accounts/fireworks/models/llama-v3p3-70b-instruct", + "accounts/fireworks/models/qwen3-235b-a22b", + "accounts/fireworks/models/deepseek-v3p1", + "accounts/fireworks/models/kimi-k2-instruct-0905", +] +_ALL_PREMIUM = [ + "gpt-4o", "gpt-4o-mini", + "claude-haiku-4-5-20251001", + "gemini-2.5-flash", "gemini-2.5-lite", "gemini-2.5-pro", +] + PLAN_LIMITS = { + # ── Free ───────────────────────────────────────────────────────────────── + # Build, test, and go live with one chatbot — no card needed. "free": { - "max_chatbots": 999999, # unlimited creation - "max_published": 0, # cannot publish + "max_chatbots": 999999, + "max_published": 1, # can publish 1 chatbot "max_documents_per_chatbot": 3, "max_document_size_mb": 5, - "models": [ - "accounts/fireworks/models/llama-v3p3-70b-instruct", - ], - "conversations_limit": 50, # 50 preview conversations/month + "models": ["accounts/fireworks/models/llama-v3p3-70b-instruct"], + "conversations_limit": 100, # 100 real conversations/month "code_export": False, "analytics": False, - "features": ["preview_mode", "testing"], + "channels": [], # no messaging channels + "url_sources": 0, + "leads_per_month": 0, + "show_branding": True, # cannot remove badge }, + # ── Starter $12/mo ─────────────────────────────────────────────────────── + # For individuals and solo businesses going live. "starter": { "max_chatbots": 999999, "max_published": 1, "max_documents_per_chatbot": 10, "max_document_size_mb": 10, - "models": [ - "accounts/fireworks/models/llama-v3p3-70b-instruct", - "accounts/fireworks/models/qwen3-235b-a22b", - "accounts/fireworks/models/deepseek-v3p1", - "accounts/fireworks/models/kimi-k2-instruct-0905", - ], - "conversations_limit": 500, # 500 conversations/month + "models": _ALL_FIREWORKS, + "conversations_limit": 1500, "code_export": False, "analytics": True, - "features": ["marketplace", "analytics", "branding"], + "channels": ["telegram"], + "url_sources": 5, + "leads_per_month": 500, + "show_branding": True, # badge stays }, - "pro": { - "max_chatbots": 5, - "max_published": 5, + # ── Business $29/mo ────────────────────────────────────────────────────── + # For growing businesses that need more chatbots and WhatsApp reach. + "business": { + "max_chatbots": 999999, + "max_published": 3, "max_documents_per_chatbot": 50, "max_document_size_mb": 50, - "models": [ - # Fireworks (included) - "accounts/fireworks/models/llama-v3p3-70b-instruct", - "accounts/fireworks/models/qwen3-235b-a22b", - "accounts/fireworks/models/deepseek-v3p1", - "accounts/fireworks/models/kimi-k2-instruct-0905", - # OpenAI - "gpt-4o", - "gpt-4o-mini", - # Anthropic - "claude-haiku-4-5-20251001", - # Google - "gemini-2.5-flash", - "gemini-2.5-lite", - "gemini-2.5-pro", - ], - "conversations_limit": 2000, # 2,000 conversations/month - "code_export": True, + "models": _ALL_FIREWORKS + _ALL_PREMIUM, + "conversations_limit": 5000, + "code_export": False, "analytics": True, - "features": [ - "marketplace", - "code_export", - "advanced_analytics", - "priority_support", - "custom_domain", - ], + "channels": ["telegram", "whatsapp"], + "url_sources": 999999, + "leads_per_month": 999999, + "show_branding": False, # can remove badge }, - "enterprise": { + # ── Agency $79/mo ──────────────────────────────────────────────────────── + # For agencies and large businesses managing many chatbots. + "agency": { "max_chatbots": 999999, "max_published": 999999, "max_documents_per_chatbot": 999999, "max_document_size_mb": 200, - "models": ["*"], # resolves to all MODEL_CATALOG keys + "models": _ALL_FIREWORKS + _ALL_PREMIUM, + "conversations_limit": 20000, + "code_export": True, + "analytics": True, + "channels": ["telegram", "whatsapp"], + "url_sources": 999999, + "leads_per_month": 999999, + "show_branding": False, + }, + # ── Enterprise (custom) ─────────────────────────────────────────────────── + "enterprise": { + "max_chatbots": 999999, + "max_published": 999999, + "max_documents_per_chatbot": 999999, + "max_document_size_mb": 999999, + "models": ["*"], "conversations_limit": 999999, "code_export": True, "analytics": True, - "features": ["*"], + "channels": ["telegram", "whatsapp"], + "url_sources": 999999, + "leads_per_month": 999999, + "show_branding": False, }, } \ No newline at end of file diff --git a/app/dependencies.py b/app/dependencies.py index ee5a058..a28ce3e 100644 --- a/app/dependencies.py +++ b/app/dependencies.py @@ -70,7 +70,7 @@ async def get_user_subscription(user=Depends(get_current_user)): async def require_plan(min_plan: str, user=Depends(get_current_user)): """Require a minimum plan level""" - plan_order = ["free", "starter", "pro", "enterprise"] + plan_order = ["free", "starter", "business", "agency", "enterprise"] subscription = await get_user_subscription(user) user_plan = subscription.get("plan", "free") diff --git a/app/main.py b/app/main.py index 6c8708f..7f1876e 100644 --- a/app/main.py +++ b/app/main.py @@ -1,11 +1,14 @@ from contextlib import asynccontextmanager from fastapi import FastAPI from fastapi.middleware.cors import CORSMiddleware -from fastapi.responses import JSONResponse +from fastapi.responses import JSONResponse, Response import logging from app.config import settings -from app.routers import auth, chatbots, documents, chat, marketplace, billing, models, analytics +from app.routers import auth, chatbots, documents, chat, marketplace, billing, models, analytics, inbox, leads, upload +from app.routers.documents import router_url_sources +from app.routers.leads import leads_public_router +from app.routers.channels import router as channels_router, webhook_router as channels_webhook_router # Configure logging logging.basicConfig( @@ -52,6 +55,21 @@ app.include_router(marketplace.router, prefix="/api/v1") app.include_router(billing.router, prefix="/api/v1") app.include_router(models.router, prefix="/api/v1") app.include_router(analytics.router, prefix="/api/v1") +app.include_router(inbox.router, prefix="/api/v1") +app.include_router(leads.router, prefix="/api/v1") +app.include_router(upload.router, prefix="/api/v1") +app.include_router(router_url_sources, prefix="/api/v1") +app.include_router(leads_public_router, prefix="/api/v1") +app.include_router(channels_router, prefix="/api/v1") +app.include_router(channels_webhook_router, prefix="/api/v1") + + +# ── Widget ───────────────────────────────────────────────────────────────────── +@app.get("/widget.js") +async def serve_widget(): + from app.services.widget import generate_widget_js + return Response(generate_widget_js(settings.app_url), media_type="application/javascript") + # ── Health & Info ────────────────────────────────────────────────────────────── @app.get("/") diff --git a/app/models.py b/app/models.py index ef7ee5b..03647da 100644 --- a/app/models.py +++ b/app/models.py @@ -10,7 +10,8 @@ import uuid class PlanType(str, Enum): free = "free" starter = "starter" - pro = "pro" + business = "business" + agency = "agency" enterprise = "enterprise" @@ -107,6 +108,14 @@ class ChatbotCreate(BaseModel): category: Optional[str] = None industry: Optional[str] = None languages: List[str] = ["en"] + show_branding: bool = True + lead_capture_enabled: bool = False + lead_capture_fields: List[str] = ["email"] + lead_capture_trigger: str = "after_first_message" + handoff_enabled: bool = False + handoff_message: str = "I'll connect you with our team. Please wait." + handoff_email: Optional[str] = None + handoff_keywords: List[str] = ["human", "agent", "speak to someone", "talk to a person", "real person"] class ChatbotUpdate(BaseModel): @@ -122,6 +131,14 @@ class ChatbotUpdate(BaseModel): category: Optional[str] = None industry: Optional[str] = None languages: Optional[List[str]] = None + show_branding: Optional[bool] = None + lead_capture_enabled: Optional[bool] = None + lead_capture_fields: Optional[List[str]] = None + lead_capture_trigger: Optional[str] = None + handoff_enabled: Optional[bool] = None + handoff_message: Optional[str] = None + handoff_email: Optional[str] = None + handoff_keywords: Optional[List[str]] = None class ChatbotResponse(BaseModel): @@ -147,6 +164,14 @@ class ChatbotResponse(BaseModel): average_rating: Optional[float] = None created_at: Optional[datetime] = None published_at: Optional[datetime] = None + show_branding: bool = True + lead_capture_enabled: bool = False + lead_capture_fields: List[str] = ["email"] + lead_capture_trigger: str = "after_first_message" + handoff_enabled: bool = False + handoff_message: str = "I'll connect you with our team. Please wait." + handoff_email: Optional[str] = None + handoff_keywords: List[str] = ["human", "agent", "speak to someone", "talk to a person", "real person"] class ChatbotPublicResponse(BaseModel): @@ -203,6 +228,8 @@ class ChatResponse(BaseModel): sources: List[SourceDocument] = [] model_used: str tokens_used: int = 0 + needs_lead_capture: bool = False + handoff: bool = False class MessageResponse(BaseModel): @@ -239,7 +266,7 @@ class SubscriptionResponse(BaseModel): class CheckoutSessionCreate(BaseModel): - plan: str # starter or pro + plan: str # starter, business, or agency success_url: str cancel_url: str @@ -307,4 +334,84 @@ class SuccessResponse(BaseModel): class ErrorResponse(BaseModel): error: str - detail: Optional[str] = None \ No newline at end of file + detail: Optional[str] = None + + +# ─── Lead Models ─────────────────────────────────────────────────────────────── + +class LeadCreate(BaseModel): + email: Optional[str] = None + name: Optional[str] = None + phone: Optional[str] = None + company: Optional[str] = None + conversation_id: Optional[str] = None + + +class LeadResponse(BaseModel): + id: str + chatbot_id: str + conversation_id: Optional[str] = None + email: Optional[str] = None + name: Optional[str] = None + phone: Optional[str] = None + company: Optional[str] = None + created_at: Optional[datetime] = None + + +# ─── URL Source Models ───────────────────────────────────────────────────────── + +class UrlSourceCreate(BaseModel): + url: str + + +class UrlSourceResponse(BaseModel): + id: str + chatbot_id: str + url: str + status: str + page_title: Optional[str] = None + chunk_count: int = 0 + error_message: Optional[str] = None + created_at: Optional[datetime] = None + + +# ─── Feedback Models ─────────────────────────────────────────────────────────── + +class FeedbackCreate(BaseModel): + message_id: str + feedback: str # 'positive' or 'negative' + + +# ─── Inbox Models ───────────────────────────────────────────────────────────── + +class InboxConversation(BaseModel): + id: str + chatbot_id: str + chatbot_name: str + session_id: Optional[str] = None + language: str + message_count: int + first_message: Optional[str] = None + created_at: Optional[datetime] = None + + +class InboxMessage(BaseModel): + id: str + role: str + content: str + sources: Optional[List[Dict]] = None + confidence_score: Optional[float] = None + is_handoff: bool = False + created_at: Optional[datetime] = None + + +# ─── Channel Models ──────────────────────────────────────────────────────────── + +class ChannelConnectionResponse(BaseModel): + id: str + channel: str + bot_username: Optional[str] = None + wa_keyword: Optional[str] = None + wa_link: Optional[str] = None + is_active: bool + created_at: Optional[datetime] = None \ No newline at end of file diff --git a/app/routers/analytics.py b/app/routers/analytics.py index 3551e85..78687d6 100644 --- a/app/routers/analytics.py +++ b/app/routers/analytics.py @@ -45,6 +45,10 @@ class ChatbotAnalyticsResponse(BaseModel): top_queries: List[TopQuery] languages_used: Dict[str, int] peak_hour: Optional[int] # 0-23 + unanswered_count: int = 0 + unanswered_queries: List[TopQuery] = [] + feedback_positive: int = 0 + feedback_negative: int = 0 class OverviewAnalyticsResponse(BaseModel): @@ -212,6 +216,13 @@ async def get_analytics_overview(user=Depends(get_current_user)): if rating: all_ratings.append(rating) + # Feedback counts + fb_result = supabase.table("message_feedback").select("feedback", count="exact") \ + .eq("chatbot_id", cid).execute() + total_fb = fb_result.count or 0 + fb_pos = sum(1 for f in (fb_result.data or []) if f.get("feedback") == "positive") + fb_neg = total_fb - fb_pos + # Average messages per conversation avg_msgs = round(msg_count / conv_count, 1) if conv_count > 0 else 0.0 @@ -223,7 +234,7 @@ async def get_analytics_overview(user=Depends(get_current_user)): total_messages=msg_count, average_messages_per_conversation=avg_msgs, average_rating=rating, - total_ratings=0, # would need a ratings table for precise count + total_ratings=total_fb, conversations_today=today_count, conversations_this_week=week_count, conversations_this_month=month_count, @@ -231,6 +242,8 @@ async def get_analytics_overview(user=Depends(get_current_user)): top_queries=top_queries, languages_used=lang_counts, peak_hour=peak, + feedback_positive=fb_pos, + feedback_negative=fb_neg, )) # Overall average rating @@ -345,6 +358,48 @@ async def get_chatbot_analytics(chatbot_id: str, user=Depends(get_current_user)) avg_msgs = round(msg_count / conv_count, 1) if conv_count > 0 else 0.0 + # Feedback counts + fb_pos = 0 + fb_neg = 0 + if conv_ids and conv_ids != [""]: + feedback = supabase.table("message_feedback").select("feedback") \ + .eq("chatbot_id", chatbot_id).execute() + for f in (feedback.data or []): + if f["feedback"] == "positive": + fb_pos += 1 + else: + fb_neg += 1 + + # Unanswered queries (low confidence) + unanswered_queries: List[TopQuery] = [] + unanswered_count = 0 + if conv_ids and conv_ids != [""]: + try: + low_conf_msgs = supabase.table("messages").select("id, conversation_id, confidence_score") \ + .in_("conversation_id", conv_ids[:100]) \ + .eq("role", "assistant") \ + .lt("confidence_score", 0.2) \ + .limit(200).execute() + unanswered_count = len(low_conf_msgs.data or []) + # For each low-confidence assistant message, find the preceding user message + if low_conf_msgs.data: + unanswered_q_counts: Dict[str, int] = {} + for lm in low_conf_msgs.data[:20]: # limit work + prev_user = supabase.table("messages").select("content") \ + .eq("conversation_id", lm["conversation_id"]) \ + .eq("role", "user") \ + .lt("created_at", lm.get("created_at", "9999")) \ + .order("created_at", desc=True) \ + .limit(1).execute() + if prev_user.data: + q = (prev_user.data[0].get("content") or "")[:100].strip() + if q: + unanswered_q_counts[q] = unanswered_q_counts.get(q, 0) + 1 + top_unanswered = sorted(unanswered_q_counts.items(), key=lambda x: -x[1])[:5] + unanswered_queries = [TopQuery(query=q, count=n) for q, n in top_unanswered] + except Exception: + pass # unanswered queries is optional + return ChatbotAnalyticsResponse( chatbot_id=chatbot_id, chatbot_name=cb.get("name", "Untitled"), @@ -353,7 +408,7 @@ async def get_chatbot_analytics(chatbot_id: str, user=Depends(get_current_user)) total_messages=msg_count, average_messages_per_conversation=avg_msgs, average_rating=cb.get("average_rating"), - total_ratings=0, + total_ratings=fb_pos + fb_neg, conversations_today=today_count, conversations_this_week=week_count, conversations_this_month=month_count, @@ -361,4 +416,56 @@ async def get_chatbot_analytics(chatbot_id: str, user=Depends(get_current_user)) top_queries=top_queries, languages_used=lang_counts, peak_hour=peak, + unanswered_count=unanswered_count, + unanswered_queries=unanswered_queries, + feedback_positive=fb_pos, + feedback_negative=fb_neg, ) + + +@router.get("/chatbot/{chatbot_id}/gaps", response_model=List[TopQuery]) +async def get_knowledge_gaps(chatbot_id: str, user=Depends(get_current_user)): + """Returns top queries where the bot had low confidence (knowledge gaps). Starter+ only.""" + plan = _get_user_plan(user.id) + _check_analytics_access(plan) + + supabase = get_supabase() + company = supabase.table("companies").select("id").eq("owner_id", user.id).execute() + if not company.data: + raise HTTPException(status_code=404, detail="Company not found") + + chatbot = supabase.table("chatbots").select("id") \ + .eq("id", chatbot_id).eq("company_id", company.data[0]["id"]).execute() + if not chatbot.data: + raise HTTPException(status_code=404, detail="Chatbot not found") + + # Find conversations + convs = supabase.table("conversations").select("id").eq("chatbot_id", chatbot_id).execute() + conv_ids = [c["id"] for c in (convs.data or [])] + if not conv_ids: + return [] + + # Low confidence assistant messages + low_conf = supabase.table("messages").select("id, conversation_id, created_at") \ + .in_("conversation_id", conv_ids[:100]) \ + .eq("role", "assistant") \ + .lt("confidence_score", 0.2) \ + .limit(100).execute() + + if not low_conf.data: + return [] + + q_counts: Dict[str, int] = {} + for msg in low_conf.data[:30]: + prev = supabase.table("messages").select("content") \ + .eq("conversation_id", msg["conversation_id"]) \ + .eq("role", "user") \ + .order("created_at", desc=True) \ + .limit(1).execute() + if prev.data: + content = (prev.data[0].get("content") or "")[:100].strip() + if content: + q_counts[content] = q_counts.get(content, 0) + 1 + + sorted_gaps = sorted(q_counts.items(), key=lambda x: -x[1])[:10] + return [TopQuery(query=q, count=n) for q, n in sorted_gaps] diff --git a/app/routers/auth.py b/app/routers/auth.py index 4d5cda4..17add8c 100644 --- a/app/routers/auth.py +++ b/app/routers/auth.py @@ -2,15 +2,34 @@ from fastapi import APIRouter, HTTPException, status, Depends from app.models import UserSignup, UserLogin, UserResponse, TokenResponse from app.database import get_supabase from app.dependencies import get_current_user +from app.config import settings +from pydantic import BaseModel, EmailStr, Field +from typing import Optional import logging logger = logging.getLogger(__name__) router = APIRouter(prefix="/auth", tags=["Authentication"]) +class ForgotPasswordRequest(BaseModel): + email: EmailStr + + +class ResetPasswordRequest(BaseModel): + access_token: str + new_password: str = Field(min_length=8) + + +class ProfileUpdate(BaseModel): + company_name: Optional[str] = None + current_password: Optional[str] = None + new_password: Optional[str] = Field(default=None, min_length=8) + + @router.post("/signup", response_model=TokenResponse) async def signup(data: UserSignup): supabase = get_supabase() + user_id = None try: # Create auth user auth_resp = supabase.auth.sign_up( @@ -20,6 +39,7 @@ async def signup(data: UserSignup): raise HTTPException(status_code=400, detail="Failed to create account") user = auth_resp.user + user_id = user.id # Create company record supabase.table("companies").insert( @@ -52,6 +72,12 @@ async def signup(data: UserSignup): raise except Exception as e: logger.error(f"Signup error: {e}") + # Rollback auth user if company/subscription creation failed + if user_id and "already registered" not in str(e).lower(): + try: + supabase.auth.admin.delete_user(user_id) + except Exception as rb_err: + logger.error(f"Signup rollback failed: {rb_err}") if "already registered" in str(e).lower() or "already exists" in str(e).lower(): raise HTTPException(status_code=400, detail="Email already registered") raise HTTPException(status_code=400, detail=str(e)) @@ -99,6 +125,85 @@ async def login(data: UserLogin): raise HTTPException(status_code=401, detail="Invalid credentials") +@router.post("/forgot-password") +async def forgot_password(data: ForgotPasswordRequest): + """Send password reset email via Supabase. Always returns success to prevent email enumeration.""" + supabase = get_supabase() + try: + supabase.auth.reset_password_for_email( + data.email, + options={"redirect_to": f"{settings.app_url}/reset-password"}, + ) + except Exception as e: + logger.warning(f"Forgot password request error (suppressed): {e}") + return {"message": "If that email is registered, a password reset link has been sent."} + + +@router.post("/reset-password") +async def reset_password(data: ResetPasswordRequest): + """Reset user password using the recovery token from the reset email.""" + supabase = get_supabase() + try: + user_response = supabase.auth.get_user(data.access_token) + if not user_response.user: + raise HTTPException(status_code=400, detail="Invalid or expired reset token") + supabase.auth.admin.update_user_by_id( + user_response.user.id, + {"password": data.new_password}, + ) + return {"message": "Password updated successfully"} + except HTTPException: + raise + except Exception as e: + logger.error(f"Password reset error: {e}") + raise HTTPException(status_code=400, detail="Invalid or expired reset token") + + +@router.patch("/profile", response_model=UserResponse) +async def update_profile(data: ProfileUpdate, user=Depends(get_current_user)): + """Update company name and/or password.""" + supabase = get_supabase() + + if data.company_name: + supabase.table("companies").update({"name": data.company_name}).eq("owner_id", user.id).execute() + + if data.new_password: + if not data.current_password: + raise HTTPException(status_code=400, detail="Current password required to change password") + try: + supabase.auth.sign_in_with_password({"email": user.email, "password": data.current_password}) + except Exception: + raise HTTPException(status_code=400, detail="Current password is incorrect") + supabase.auth.admin.update_user_by_id(user.id, {"password": data.new_password}) + + company = supabase.table("companies").select("name").eq("owner_id", user.id).execute() + company_name = company.data[0]["name"] if company.data else "" + sub = supabase.table("subscriptions").select("plan").eq("user_id", user.id).eq("status", "active").execute() + plan = sub.data[0]["plan"] if sub.data else "free" + + return UserResponse(id=user.id, email=user.email, company_name=company_name, plan=plan) + + +@router.delete("/account") +async def delete_account(user=Depends(get_current_user)): + """Permanently delete account, company, chatbots, and all data.""" + supabase = get_supabase() + + company = supabase.table("companies").select("id").eq("owner_id", user.id).execute() + if company.data: + supabase.table("companies").delete().eq("id", company.data[0]["id"]).execute() + + supabase.table("subscriptions").delete().eq("user_id", user.id).execute() + + try: + supabase.auth.admin.delete_user(user.id) + except Exception as e: + logger.error(f"Failed to delete auth user {user.id}: {e}") + raise HTTPException(status_code=500, detail="Failed to delete account") + + return {"message": "Account deleted successfully"} + + @router.post("/logout") async def logout(user=Depends(get_current_user)): supabase = get_supabase() diff --git a/app/routers/billing.py b/app/routers/billing.py index 3b7ef08..d71313a 100644 --- a/app/routers/billing.py +++ b/app/routers/billing.py @@ -11,7 +11,8 @@ router = APIRouter(prefix="/billing", tags=["Billing"]) PLAN_PRICE_IDS = { "starter": settings.stripe_starter_price_id, - "pro": settings.stripe_pro_price_id, + "business": settings.stripe_business_price_id, + "agency": settings.stripe_agency_price_id, } @@ -70,15 +71,20 @@ async def stripe_webhook( stripe.api_key = settings.stripe_secret_key payload = await request.body() - if settings.stripe_webhook_secret and stripe_signature: + if settings.stripe_webhook_secret: + if not stripe_signature: + raise HTTPException(status_code=400, detail="Missing Stripe signature") try: event = stripe.Webhook.construct_event( payload, stripe_signature, settings.stripe_webhook_secret ) except stripe.error.SignatureVerificationError: raise HTTPException(status_code=400, detail="Invalid signature") + elif settings.app_env == "production": + raise HTTPException(status_code=500, detail="Webhook secret not configured") else: import json + logger.warning("Stripe webhook received without signature verification (dev mode only)") event = json.loads(payload) supabase = get_supabase() @@ -122,6 +128,18 @@ async def stripe_webhook( "visibility": "preview", }).eq("company_id", company.data[0]["id"]).execute() + # Send cancellation notification via n8n + if settings.n8n_handoff_webhook_url: + try: + from app.services.n8n_service import send_notification + await send_notification( + event_type="subscription_canceled", + data={"user_id": user_id}, + webhook_url=settings.n8n_handoff_webhook_url, + ) + except Exception as e: + logger.warning(f"Failed to send cancellation notification: {e}") + return {"received": True} except HTTPException: diff --git a/app/routers/channels.py b/app/routers/channels.py new file mode 100644 index 0000000..2773e00 --- /dev/null +++ b/app/routers/channels.py @@ -0,0 +1,518 @@ +import json +import re +import uuid +import logging + +from fastapi import APIRouter, Depends, HTTPException, Query, Request, Response +from pydantic import BaseModel +from typing import List, Optional + +from app.config import settings, PLAN_LIMITS +from app.database import get_supabase +from app.dependencies import get_current_user +from app.services.rag import rag_engine +from app.services.telegram_service import ( + delete_webhook, get_bot_info, send_message as tg_send, set_webhook, +) +from app.services.whatsapp_service import send_message as wa_send, verify_signature +from app.routers.chat import ( + _get_conversation_history, _get_or_create_conversation, _save_message, +) + +logger = logging.getLogger(__name__) + +router = APIRouter(prefix="/channels", tags=["Channels"]) +webhook_router = APIRouter(prefix="/webhooks", tags=["Webhooks"]) + + +# ── Request models ──────────────────────────────────────────────────────────── + +class TelegramConnectRequest(BaseModel): + chatbot_id: str + bot_token: str + + +class WhatsAppConnectRequest(BaseModel): + chatbot_id: str + wa_keyword: Optional[str] = None + + +# ── Helpers ─────────────────────────────────────────────────────────────────── + +def _verify_chatbot_ownership(chatbot_id: str, user_id: str, supabase): + company = supabase.table("companies").select("id").eq("owner_id", user_id).execute() + if not company.data: + raise HTTPException(status_code=404, detail="Company not found") + chatbot = ( + supabase.table("chatbots").select("id") + .eq("id", chatbot_id) + .eq("company_id", company.data[0]["id"]) + .execute() + ) + if not chatbot.data: + raise HTTPException(status_code=404, detail="Chatbot not found") + + +def _generate_keyword(chatbot_name: str, chatbot_id: str, supabase) -> str: + base = re.sub(r"[^A-Z0-9]", "", chatbot_name.upper())[:10] or "BOT" + existing = ( + supabase.table("channel_connections") + .select("id").eq("channel", "whatsapp").eq("wa_keyword", base).execute() + ) + if not existing.data: + return base + suffix = chatbot_id.replace("-", "")[:4].upper() + return f"{base[:6]}{suffix}" + + +def _get_or_create_channel_session( + chatbot_id: str, channel: str, external_id: str, supabase +) -> dict: + existing = ( + supabase.table("channel_sessions") + .select("*").eq("channel", channel).eq("external_id", external_id).execute() + ) + if existing.data: + return existing.data[0] + row = { + "id": str(uuid.uuid4()), + "chatbot_id": chatbot_id, + "channel": channel, + "external_id": external_id, + "session_id": str(uuid.uuid4()), + } + result = supabase.table("channel_sessions").insert(row).execute() + return result.data[0] + + +def _upsert_whatsapp_session(chatbot_id: str, phone: str, session_id: str, supabase): + existing = ( + supabase.table("channel_sessions") + .select("id").eq("channel", "whatsapp").eq("external_id", phone).execute() + ) + if existing.data: + supabase.table("channel_sessions").update( + {"chatbot_id": chatbot_id, "session_id": session_id} + ).eq("id", existing.data[0]["id"]).execute() + else: + supabase.table("channel_sessions").insert({ + "id": str(uuid.uuid4()), + "chatbot_id": chatbot_id, + "channel": "whatsapp", + "external_id": phone, + "session_id": session_id, + }).execute() + + +def _check_channel_plan(user_id: str, channel: str, supabase): + """Raise 402 if the user's plan doesn't include the requested channel.""" + sub = supabase.table("subscriptions").select("plan").eq("user_id", user_id).eq("status", "active").execute() + plan = sub.data[0]["plan"] if sub.data else "free" + allowed = PLAN_LIMITS.get(plan, PLAN_LIMITS["free"]).get("channels", []) + if channel not in allowed: + label = {"telegram": "Starter", "whatsapp": "Business"}.get(channel, "a higher") + raise HTTPException(status_code=402, detail=f"{channel.title()} channel requires {label} plan or higher") + + +def _check_chatbot_channel_subscription(chatbot_id: str, channel: str, supabase) -> bool: + """Return False (drop message) if the chatbot owner's plan no longer includes this channel.""" + chatbot = supabase.table("chatbots").select("company_id").eq("id", chatbot_id).execute() + if not chatbot.data: + return False + company = supabase.table("companies").select("owner_id").eq("id", chatbot.data[0]["company_id"]).execute() + if not company.data: + return False + owner_id = company.data[0]["owner_id"] + sub = supabase.table("subscriptions").select("plan, status").eq("user_id", owner_id).execute() + if not sub.data: + return False + row = sub.data[0] + if row.get("status") != "active": + return False + allowed = PLAN_LIMITS.get(row.get("plan", "free"), PLAN_LIMITS["free"]).get("channels", []) + return channel in allowed + + +def _detect_language(text: str) -> str: + """Simple script-based language detection for channel messages.""" + sample = text[:200] + total = sum(1 for c in sample if not c.isspace()) + if total == 0: + return "en" + scores = { + "ar": sum(1 for c in sample if "\u0600" <= c <= "\u06FF") / total, + "zh": sum(1 for c in sample if "\u4E00" <= c <= "\u9FFF") / total, + "ja": sum(1 for c in sample if "\u3040" <= c <= "\u30FF") / total, + "he": sum(1 for c in sample if "\u0590" <= c <= "\u05FF") / total, + "ru": sum(1 for c in sample if "\u0400" <= c <= "\u04FF") / total, + "fr": 0.0, "es": 0.0, # Latin-based — can't detect from scripts alone + } + best = max(scores, key=scores.get) + return best if scores[best] > 0.25 else "en" + + +def _wa_link(keyword: str) -> str: + if settings.whatsapp_display_number: + return f"https://wa.me/{settings.whatsapp_display_number}?text=START+{keyword}" + return "" + + +# ── CRUD endpoints ──────────────────────────────────────────────────────────── + +@router.get("") +async def list_channels(chatbot_id: str = Query(...), user=Depends(get_current_user)): + supabase = get_supabase() + _verify_chatbot_ownership(chatbot_id, user.id, supabase) + result = ( + supabase.table("channel_connections") + .select("id,channel,bot_username,wa_keyword,is_active,created_at") + .eq("chatbot_id", chatbot_id) + .execute() + ) + rows = result.data or [] + for row in rows: + if row["channel"] == "whatsapp" and row.get("wa_keyword"): + row["wa_link"] = _wa_link(row["wa_keyword"]) + else: + row["wa_link"] = None + return rows + + +@router.post("/telegram") +async def connect_telegram(data: TelegramConnectRequest, user=Depends(get_current_user)): + supabase = get_supabase() + _verify_chatbot_ownership(data.chatbot_id, user.id, supabase) + _check_channel_plan(user.id, "telegram", supabase) + + bot_info = await get_bot_info(data.bot_token) + if not bot_info: + raise HTTPException( + status_code=400, + detail="Invalid bot token. Check your token from @BotFather.", + ) + + if not settings.api_url: + raise HTTPException(status_code=500, detail="API_URL not configured on the server") + + webhook_url = f"{settings.api_url.rstrip('/')}/api/v1/webhooks/telegram/{data.bot_token}" + ok = await set_webhook(data.bot_token, webhook_url) + if not ok: + raise HTTPException(status_code=500, detail="Failed to register webhook with Telegram") + + existing = ( + supabase.table("channel_connections") + .select("id").eq("chatbot_id", data.chatbot_id).eq("channel", "telegram").execute() + ) + conn_data = { + "chatbot_id": data.chatbot_id, + "channel": "telegram", + "bot_token": data.bot_token, + "bot_username": bot_info.get("username", ""), + "is_active": True, + } + if existing.data: + supabase.table("channel_connections").update(conn_data).eq("id", existing.data[0]["id"]).execute() + else: + conn_data["id"] = str(uuid.uuid4()) + supabase.table("channel_connections").insert(conn_data).execute() + + username = bot_info.get("username", "") + return { + "success": True, + "bot_username": username, + "bot_link": f"https://t.me/{username}", + } + + +@router.post("/whatsapp") +async def connect_whatsapp(data: WhatsAppConnectRequest, user=Depends(get_current_user)): + supabase = get_supabase() + _verify_chatbot_ownership(data.chatbot_id, user.id, supabase) + _check_channel_plan(user.id, "whatsapp", supabase) + + chatbot = supabase.table("chatbots").select("name").eq("id", data.chatbot_id).execute() + chatbot_name = chatbot.data[0]["name"] if chatbot.data else "BOT" + + keyword = (data.wa_keyword or _generate_keyword(chatbot_name, data.chatbot_id, supabase)).upper() + keyword = re.sub(r"[^A-Z0-9]", "", keyword)[:12] + if not keyword: + raise HTTPException(status_code=400, detail="Keyword must contain letters or numbers") + + taken = ( + supabase.table("channel_connections") + .select("id").eq("channel", "whatsapp").eq("wa_keyword", keyword) + .neq("chatbot_id", data.chatbot_id).execute() + ) + if taken.data: + raise HTTPException(status_code=400, detail=f"Keyword '{keyword}' is already taken. Choose a different one.") + + existing = ( + supabase.table("channel_connections") + .select("id").eq("chatbot_id", data.chatbot_id).eq("channel", "whatsapp").execute() + ) + conn_data = { + "chatbot_id": data.chatbot_id, + "channel": "whatsapp", + "wa_keyword": keyword, + "is_active": True, + } + if existing.data: + supabase.table("channel_connections").update(conn_data).eq("id", existing.data[0]["id"]).execute() + else: + conn_data["id"] = str(uuid.uuid4()) + supabase.table("channel_connections").insert(conn_data).execute() + + return {"success": True, "keyword": keyword, "wa_link": _wa_link(keyword)} + + +@router.delete("/{connection_id}") +async def disconnect_channel(connection_id: str, user=Depends(get_current_user)): + supabase = get_supabase() + conn = supabase.table("channel_connections").select("*").eq("id", connection_id).execute() + if not conn.data: + raise HTTPException(status_code=404, detail="Connection not found") + + c = conn.data[0] + _verify_chatbot_ownership(c["chatbot_id"], user.id, supabase) + + if c["channel"] == "telegram" and c.get("bot_token"): + try: + await delete_webhook(c["bot_token"]) + except Exception: + pass + + supabase.table("channel_connections").delete().eq("id", connection_id).execute() + return {"success": True} + + +# ── Telegram webhook ────────────────────────────────────────────────────────── + +@webhook_router.post("/telegram/{bot_token}") +async def telegram_webhook(bot_token: str, request: Request): + try: + body = await request.json() + except Exception: + return {"ok": True} + + message = body.get("message") or body.get("edited_message") + if not message or "text" not in message: + return {"ok": True} + + chat_id = message["chat"]["id"] + text = message["text"].strip() + + supabase = get_supabase() + conn = ( + supabase.table("channel_connections") + .select("*").eq("channel", "telegram").eq("bot_token", bot_token).eq("is_active", True).execute() + ) + if not conn.data: + return {"ok": True} + + chatbot_id = conn.data[0]["chatbot_id"] + + # Check subscription still allows Telegram + if not _check_chatbot_channel_subscription(chatbot_id, "telegram", supabase): + await tg_send(bot_token, chat_id, "This service is currently unavailable. Please contact the business directly.") + return {"ok": True} + + chatbot_result = ( + supabase.table("chatbots").select("*, companies(name, logo_url)").eq("id", chatbot_id).execute() + ) + if not chatbot_result.data: + return {"ok": True} + chatbot = chatbot_result.data[0] + + if not chatbot.get("is_published"): + await tg_send(bot_token, chat_id, "This chatbot is not yet published.") + return {"ok": True} + + collection_name = chatbot.get("qdrant_collection_name") + if not collection_name: + await tg_send(bot_token, chat_id, "This chatbot has no knowledge base configured yet.") + return {"ok": True} + + if text.startswith("/start"): + welcome = chatbot.get("welcome_message") or f"Hello! I'm {chatbot.get('name', 'your assistant')}. How can I help you?" + await tg_send(bot_token, chat_id, welcome) + return {"ok": True} + + # Use first 8 chars of token as namespace to avoid collisions between bots + external_id = f"tg:{bot_token[:8]}:{chat_id}" + session = _get_or_create_channel_session(chatbot_id, "telegram", external_id, supabase) + + detected_lang = _detect_language(text) + company_data = chatbot.get("companies", {}) or {} + conversation = _get_or_create_conversation( + chatbot_id=chatbot_id, + session_id=session["session_id"], + user_id=None, + language=detected_lang, + supabase=supabase, + ) + history = _get_conversation_history(conversation["id"], supabase) + chatbot_config = {**chatbot, "company_name": company_data.get("name", "")} + + try: + result = await rag_engine.process_query( + query=text, + collection_name=collection_name, + chatbot_config=chatbot_config, + conversation_history=history, + language=detected_lang, + ) + except Exception as e: + logger.error(f"Telegram RAG error for chatbot {chatbot_id}: {e}") + await tg_send(bot_token, chat_id, "Sorry, I encountered an error. Please try again.") + return {"ok": True} + + confidence_score = max((s.score for s in result.get("sources", [])), default=0.0) + _save_message(conversation["id"], "user", text, supabase) + _save_message( + conversation["id"], "assistant", result["response"], supabase, + sources=[s.model_dump() for s in result.get("sources", [])], + model=result.get("model", ""), + confidence_score=confidence_score, + ) + supabase.table("conversations").update( + {"message_count": len(history) + 2} + ).eq("id", conversation["id"]).execute() + supabase.table("channel_sessions").update({"last_active": "now()"}).eq("id", session["id"]).execute() + + await tg_send(bot_token, chat_id, result["response"]) + return {"ok": True} + + +# ── WhatsApp webhooks ───────────────────────────────────────────────────────── + +@webhook_router.get("/whatsapp") +async def whatsapp_verify(request: Request): + """Meta webhook verification challenge.""" + params = dict(request.query_params) + if ( + params.get("hub.mode") == "subscribe" + and settings.whatsapp_verify_token + and params.get("hub.verify_token") == settings.whatsapp_verify_token + ): + return Response(content=params["hub.challenge"], media_type="text/plain") + raise HTTPException(status_code=403, detail="Forbidden") + + +@webhook_router.post("/whatsapp") +async def whatsapp_webhook(request: Request): + """Receive messages from WhatsApp Cloud API.""" + raw_body = await request.body() + + if settings.whatsapp_app_secret: + sig = request.headers.get("X-Hub-Signature-256", "") + if not verify_signature(raw_body, sig, settings.whatsapp_app_secret): + raise HTTPException(status_code=403, detail="Invalid signature") + + try: + body = json.loads(raw_body) + value = body["entry"][0]["changes"][0]["value"] + if "messages" not in value: + return {"ok": True} + msg = value["messages"][0] + if msg.get("type") != "text": + return {"ok": True} + from_number = msg["from"] + text = msg["text"]["body"].strip() + except (KeyError, IndexError, json.JSONDecodeError): + return {"ok": True} + + supabase = get_supabase() + + async def _wa_reply(to: str, message: str): + if settings.whatsapp_access_token and settings.whatsapp_phone_number_id: + await wa_send(settings.whatsapp_phone_number_id, to, message, settings.whatsapp_access_token) + + # Handle START + if text.upper().startswith("START "): + keyword = re.sub(r"[^A-Z0-9]", "", text[6:].strip().upper()) + conn = ( + supabase.table("channel_connections") + .select("*").eq("channel", "whatsapp").eq("wa_keyword", keyword).eq("is_active", True).execute() + ) + if not conn.data: + await _wa_reply(from_number, f"Sorry, chatbot '{keyword}' not found. Use the link from the business you're contacting.") + return {"ok": True} + + chatbot_id = conn.data[0]["chatbot_id"] + chatbot_result = supabase.table("chatbots").select("name,welcome_message").eq("id", chatbot_id).execute() + chatbot = chatbot_result.data[0] if chatbot_result.data else {} + + _upsert_whatsapp_session(chatbot_id, from_number, str(uuid.uuid4()), supabase) + welcome = chatbot.get("welcome_message") or f"Hello! I'm {chatbot.get('name', 'your assistant')}. How can I help you?" + await _wa_reply(from_number, welcome) + return {"ok": True} + + # Regular message — find active session + session_result = ( + supabase.table("channel_sessions") + .select("*").eq("channel", "whatsapp").eq("external_id", from_number).execute() + ) + if not session_result.data: + await _wa_reply(from_number, "To start chatting, use the WhatsApp link from the business you're trying to contact.") + return {"ok": True} + + session = session_result.data[0] + chatbot_id = session["chatbot_id"] + + # Check subscription still allows WhatsApp + if not _check_chatbot_channel_subscription(chatbot_id, "whatsapp", supabase): + await _wa_reply(from_number, "This service is currently unavailable. Please contact the business directly.") + return {"ok": True} + + chatbot_result = ( + supabase.table("chatbots").select("*, companies(name, logo_url)").eq("id", chatbot_id).execute() + ) + if not chatbot_result.data: + return {"ok": True} + chatbot = chatbot_result.data[0] + + collection_name = chatbot.get("qdrant_collection_name") + if not collection_name: + await _wa_reply(from_number, "This chatbot isn't ready yet. Please try again later.") + return {"ok": True} + + detected_lang = _detect_language(text) + company_data = chatbot.get("companies", {}) or {} + conversation = _get_or_create_conversation( + chatbot_id=chatbot_id, + session_id=session["session_id"], + user_id=None, + language=detected_lang, + supabase=supabase, + ) + history = _get_conversation_history(conversation["id"], supabase) + chatbot_config = {**chatbot, "company_name": company_data.get("name", "")} + + try: + result = await rag_engine.process_query( + query=text, + collection_name=collection_name, + chatbot_config=chatbot_config, + conversation_history=history, + language=detected_lang, + ) + except Exception as e: + logger.error(f"WhatsApp RAG error for chatbot {chatbot_id}: {e}") + await _wa_reply(from_number, "Sorry, I encountered an error. Please try again.") + return {"ok": True} + + confidence_score = max((s.score for s in result.get("sources", [])), default=0.0) + _save_message(conversation["id"], "user", text, supabase) + _save_message( + conversation["id"], "assistant", result["response"], supabase, + sources=[s.model_dump() for s in result.get("sources", [])], + model=result.get("model", ""), + confidence_score=confidence_score, + ) + supabase.table("conversations").update( + {"message_count": len(history) + 2} + ).eq("id", conversation["id"]).execute() + supabase.table("channel_sessions").update({"last_active": "now()"}).eq("id", session["id"]).execute() + + await _wa_reply(from_number, result["response"]) + return {"ok": True} diff --git a/app/routers/chat.py b/app/routers/chat.py index 88eedfe..3a7540f 100644 --- a/app/routers/chat.py +++ b/app/routers/chat.py @@ -1,15 +1,73 @@ -from fastapi import APIRouter, HTTPException, Depends -from app.models import ChatMessage, ChatResponse, ConversationResponse, MessageResponse +import time +from collections import defaultdict + +from fastapi import APIRouter, HTTPException, Depends, Request +from app.models import ChatMessage, ChatResponse, ConversationResponse, MessageResponse, FeedbackCreate from app.database import get_supabase from app.dependencies import get_current_user, get_optional_user from app.services.rag import rag_engine +from app.config import PLAN_LIMITS from typing import List, Optional +from datetime import datetime import uuid import logging logger = logging.getLogger(__name__) router = APIRouter(tags=["Chat"]) +# ── Simple in-memory rate limiter ──────────────────────────────────────────── +_rate_store: dict = defaultdict(list) +_RATE_LIMIT = 30 # max requests +_RATE_WINDOW = 60 # per second window + + +def _check_rate_limit(client_ip: str): + now = time.time() + _rate_store[client_ip] = [t for t in _rate_store[client_ip] if now - t < _RATE_WINDOW] + if len(_rate_store[client_ip]) >= _RATE_LIMIT: + raise HTTPException( + status_code=429, + detail="Too many requests. Please wait before sending more messages.", + ) + _rate_store[client_ip].append(now) + + +def _check_conversation_limit(chatbot: dict, supabase): + """Check if the chatbot owner has remaining conversation quota this month.""" + company_id = chatbot.get("company_id") + if not company_id: + return + + company = supabase.table("companies").select("owner_id").eq("id", company_id).execute() + if not company.data: + return + + owner_id = company.data[0]["owner_id"] + sub = supabase.table("subscriptions").select("plan").eq("user_id", owner_id).eq("status", "active").execute() + plan = sub.data[0]["plan"] if sub.data else "free" + limit = PLAN_LIMITS.get(plan, PLAN_LIMITS["free"]).get("conversations_limit", 100) + + if limit >= 999999: + return # unlimited + + month_start = datetime.utcnow().replace(day=1, hour=0, minute=0, second=0, microsecond=0).isoformat() + chatbots = supabase.table("chatbots").select("id").eq("company_id", company_id).execute() + chatbot_ids = [c["id"] for c in (chatbots.data or [])] + if not chatbot_ids: + return + + count_result = supabase.table("conversations").select("id", count="exact") \ + .in_("chatbot_id", chatbot_ids) \ + .gte("created_at", month_start) \ + .execute() + + used = count_result.count or 0 + if used >= limit: + raise HTTPException( + status_code=429, + detail=f"This chatbot's monthly conversation limit ({limit}) has been reached. Please try again next month.", + ) + def _get_public_chatbot(chatbot_id: str, supabase) -> dict: """Get a published chatbot (or any chatbot for preview)""" @@ -23,8 +81,13 @@ def _get_public_chatbot(chatbot_id: str, supabase) -> dict: async def chat( chatbot_id: str, message: ChatMessage, + request: Request, user=Depends(get_optional_user), ): + # Rate limiting by IP + client_ip = request.client.host if request.client else "unknown" + _check_rate_limit(client_ip) + supabase = get_supabase() chatbot = _get_public_chatbot(chatbot_id, supabase) @@ -43,6 +106,13 @@ async def chat( # Get or create conversation session_id = message.session_id or str(uuid.uuid4()) + + # Only enforce conversation limit for new sessions (not ongoing ones) + is_existing = supabase.table("conversations").select("id") \ + .eq("chatbot_id", chatbot_id).eq("session_id", session_id).execute() + if not is_existing.data: + _check_conversation_limit(chatbot, supabase) + conversation = _get_or_create_conversation( chatbot_id=chatbot_id, session_id=session_id, @@ -70,6 +140,42 @@ async def chat( language=message.language, ) + # Compute confidence score + confidence_score = max((s.score for s in result.get("sources", [])), default=0.0) + + # Check handoff + is_handoff = False + if chatbot.get("handoff_enabled"): + handoff_keywords = chatbot.get("handoff_keywords", []) + msg_lower = message.message.lower() + if any(kw.lower() in msg_lower for kw in handoff_keywords): + is_handoff = True + # Fire n8n notification (async, non-blocking) + try: + from app.services.n8n_service import send_handoff_notification + from app.config import settings as _settings + company_data_for_handoff = chatbot.get("companies", {}) or {} + await send_handoff_notification( + chatbot_name=chatbot.get("name", ""), + owner_email=chatbot.get("handoff_email") or "", + conversation_history=history, + trigger_message=message.message, + chatbot_id=chatbot_id, + conversation_id=conversation["id"], + webhook_url=_settings.n8n_handoff_webhook_url, + ) + except Exception: + pass # never block chat on handoff failure + + # Check lead capture + needs_lead_capture = False + if ( + chatbot.get("lead_capture_enabled") + and chatbot.get("lead_capture_trigger") == "after_first_message" + and len(history) == 0 + ): + needs_lead_capture = True + # Save messages _save_message(conversation["id"], "user", message.message, supabase) _save_message( @@ -79,6 +185,8 @@ async def chat( supabase, sources=[s.model_dump() for s in result.get("sources", [])], model=result.get("model", ""), + confidence_score=confidence_score, + is_handoff=is_handoff, ) # Update conversation message count @@ -92,6 +200,8 @@ async def chat( sources=result.get("sources", []), model_used=result.get("model", ""), tokens_used=result.get("tokens_used", 0), + needs_lead_capture=needs_lead_capture, + handoff=is_handoff, ) @@ -129,6 +239,30 @@ async def get_chat_history( ] +@router.post("/chat/{chatbot_id}/feedback") +async def submit_feedback(chatbot_id: str, data: FeedbackCreate): + """Submit feedback (thumbs up/down) for a message. No auth required.""" + import uuid as _uuid + if data.feedback not in ("positive", "negative"): + raise HTTPException(status_code=400, detail="Feedback must be 'positive' or 'negative'") + + supabase = get_supabase() + + # Verify message exists + msg = supabase.table("messages").select("id, conversation_id").eq("id", data.message_id).execute() + if not msg.data: + raise HTTPException(status_code=404, detail="Message not found") + + supabase.table("message_feedback").insert({ + "id": str(_uuid.uuid4()), + "message_id": data.message_id, + "chatbot_id": chatbot_id, + "feedback": data.feedback, + }).execute() + + return {"success": True} + + # ── OLD analytics endpoint REMOVED ─────────────────────────────────────────── # The /analytics/{chatbot_id} endpoint that was here has been replaced by # the dedicated analytics router (app/routers/analytics.py) which provides: @@ -188,6 +322,8 @@ def _save_message( supabase, sources: Optional[list] = None, model: str = "", + confidence_score: Optional[float] = None, + is_handoff: bool = False, ): supabase.table("messages").insert({ "id": str(uuid.uuid4()), @@ -196,4 +332,6 @@ def _save_message( "content": content, "sources": sources, "model": model, + "confidence_score": confidence_score, + "is_handoff": is_handoff, }).execute() \ No newline at end of file diff --git a/app/routers/chatbots.py b/app/routers/chatbots.py index 40b02c4..77c2711 100644 --- a/app/routers/chatbots.py +++ b/app/routers/chatbots.py @@ -73,6 +73,14 @@ async def create_chatbot(data: ChatbotCreate, user=Depends(get_current_user)): "visibility": "preview", "is_published": False, "qdrant_collection_name": collection_name, + "show_branding": data.show_branding, + "lead_capture_enabled": data.lead_capture_enabled, + "lead_capture_fields": data.lead_capture_fields, + "lead_capture_trigger": data.lead_capture_trigger, + "handoff_enabled": data.handoff_enabled, + "handoff_message": data.handoff_message, + "handoff_email": data.handoff_email, + "handoff_keywords": data.handoff_keywords, } result = supabase.table("chatbots").insert(chatbot_data).execute() @@ -180,8 +188,8 @@ async def export_chatbot(chatbot_id: str, user=Depends(get_current_user)): # Check plan sub = supabase.table("subscriptions").select("plan").eq("user_id", user.id).eq("status", "active").execute() plan = sub.data[0]["plan"] if sub.data else "free" - if plan not in ("pro", "enterprise"): - raise HTTPException(status_code=402, detail="Code export requires Pro plan or higher") + if plan not in ("agency", "enterprise"): + raise HTTPException(status_code=402, detail="Code export requires Agency plan or higher") zip_bytes = generate_export_package( chatbot=chatbot, @@ -198,6 +206,55 @@ async def export_chatbot(chatbot_id: str, user=Depends(get_current_user)): ) +@router.get("/{chatbot_id}/public") +async def get_chatbot_public(chatbot_id: str): + """Public endpoint - returns basic info for a published chatbot (used by PublicChatPage).""" + supabase = get_supabase() + result = supabase.table("chatbots").select("id, name, welcome_message, primary_color, logo_url, show_branding, is_published, description").eq("id", chatbot_id).execute() + if not result.data: + raise HTTPException(status_code=404, detail="Chatbot not found") + chatbot = result.data[0] + if not chatbot.get("is_published"): + raise HTTPException(status_code=404, detail="Chatbot not found or not published") + return { + "id": chatbot["id"], + "name": chatbot["name"], + "welcome_message": chatbot.get("welcome_message", "Hello! How can I help?"), + "primary_color": chatbot.get("primary_color", "#6366f1"), + "logo_url": chatbot.get("logo_url"), + "show_branding": chatbot.get("show_branding", True), + "is_published": chatbot.get("is_published", False), + "description": chatbot.get("description"), + } + + +@router.get("/{chatbot_id}/embed") +async def get_chatbot_embed(chatbot_id: str, user=Depends(get_current_user)): + """Returns embed info including the widget script tag for a chatbot.""" + from app.config import settings + supabase = get_supabase() + company = _get_user_company(user.id, supabase) + chatbot = _get_owned_chatbot(chatbot_id, company["id"], supabase) + + api_url = "http://localhost:8000" # In production, read from settings + app_url = settings.app_url + + embed_script = f'' + chat_url = f"{app_url}/chat/{chatbot_id}" + + return { + "chatbot_id": chatbot_id, + "name": chatbot.get("name"), + "primary_color": chatbot.get("primary_color", "#6366f1"), + "welcome_message": chatbot.get("welcome_message"), + "logo_url": chatbot.get("logo_url"), + "show_branding": chatbot.get("show_branding", True), + "embed_script": embed_script, + "chat_url": chat_url, + "is_published": chatbot.get("is_published", False), + } + + # ── Helpers ─────────────────────────────────────────────────────────────────── def _get_owned_chatbot(chatbot_id: str, company_id: str, supabase) -> dict: @@ -238,4 +295,12 @@ def _format_chatbot(chatbot: dict, supabase) -> ChatbotResponse: conversation_count=conv_count.count or 0, created_at=chatbot.get("created_at"), published_at=chatbot.get("published_at"), + show_branding=chatbot.get("show_branding", True), + lead_capture_enabled=chatbot.get("lead_capture_enabled", False), + lead_capture_fields=chatbot.get("lead_capture_fields") or ["email"], + lead_capture_trigger=chatbot.get("lead_capture_trigger", "after_first_message"), + handoff_enabled=chatbot.get("handoff_enabled", False), + handoff_message=chatbot.get("handoff_message", "I'll connect you with our team. Please wait."), + handoff_email=chatbot.get("handoff_email"), + handoff_keywords=chatbot.get("handoff_keywords") or ["human", "agent", "speak to someone", "talk to a person", "real person"], ) \ No newline at end of file diff --git a/app/routers/documents.py b/app/routers/documents.py index 4eaaf63..8749f41 100644 --- a/app/routers/documents.py +++ b/app/routers/documents.py @@ -1,5 +1,5 @@ from fastapi import APIRouter, HTTPException, Depends, UploadFile, File, BackgroundTasks -from app.models import DocumentResponse, SuccessResponse +from app.models import DocumentResponse, SuccessResponse, UrlSourceCreate, UrlSourceResponse from app.database import get_supabase from app.dependencies import get_current_user from app.services.document_processor import process_document @@ -12,6 +12,7 @@ import logging logger = logging.getLogger(__name__) router = APIRouter(prefix="/chatbots/{chatbot_id}/documents", tags=["Documents"]) +url_router = APIRouter(prefix="/chatbots/{chatbot_id}/url-sources", tags=["URL Sources"]) ALLOWED_TYPES = { "application/pdf": ".pdf", @@ -206,3 +207,168 @@ async def delete_document(chatbot_id: str, document_id: str, user=Depends(get_cu supabase.table("documents").delete().eq("id", document_id).execute() return SuccessResponse(success=True, message="Document deleted") + + +# ── URL Sources ─────────────────────────────────────────────────────────────── + +@url_router.get("", response_model=List[UrlSourceResponse]) +async def list_url_sources(chatbot_id: str, user=Depends(get_current_user)): + supabase = get_supabase() + _get_user_chatbot(chatbot_id, user.id, supabase) + result = supabase.table("url_sources").select("*") \ + .eq("chatbot_id", chatbot_id) \ + .order("created_at", desc=True) \ + .execute() + return result.data or [] + + +@url_router.post("", status_code=201) +async def add_url_source( + chatbot_id: str, + data: UrlSourceCreate, + background_tasks: BackgroundTasks, + user=Depends(get_current_user), +): + from app.config import PLAN_LIMITS + supabase = get_supabase() + chatbot = _get_user_chatbot(chatbot_id, user.id, supabase) + + # Plan check + sub = supabase.table("subscriptions").select("plan").eq("user_id", user.id).eq("status", "active").execute() + plan = sub.data[0]["plan"] if sub.data else "free" + max_sources = PLAN_LIMITS.get(plan, PLAN_LIMITS["free"]).get("url_sources", 0) + if max_sources == 0: + raise HTTPException(status_code=402, detail="URL sources require Starter plan or higher") + + # Count existing + existing = supabase.table("url_sources").select("id", count="exact").eq("chatbot_id", chatbot_id).execute() + if (existing.count or 0) >= max_sources: + raise HTTPException(status_code=402, detail=f"URL source limit reached ({max_sources}). Upgrade to add more.") + + source_id = str(uuid.uuid4()) + source_data = { + "id": source_id, + "chatbot_id": chatbot_id, + "url": data.url, + "status": "pending", + } + result = supabase.table("url_sources").insert(source_data).execute() + if not result.data: + raise HTTPException(status_code=500, detail="Failed to create URL source") + + # Process in background + background_tasks.add_task( + _process_url_source, + source_id=source_id, + url=data.url, + chatbot=chatbot, + supabase=supabase, + ) + + return result.data[0] + + +@url_router.delete("/{source_id}", response_model=SuccessResponse) +async def delete_url_source(chatbot_id: str, source_id: str, user=Depends(get_current_user)): + supabase = get_supabase() + _get_user_chatbot(chatbot_id, user.id, supabase) + + source = supabase.table("url_sources").select("*").eq("id", source_id).eq("chatbot_id", chatbot_id).execute() + if not source.data: + raise HTTPException(status_code=404, detail="URL source not found") + + supabase.table("url_sources").delete().eq("id", source_id).execute() + return SuccessResponse(success=True, message="URL source deleted") + + +async def _process_url_source(source_id: str, url: str, chatbot: dict, supabase): + """Background task to scrape a URL and add its content to the vector store.""" + from app.services.web_scraper import scrape_url + from app.services.document_processor import chunk_text + from app.services.embeddings import embedding_service + from app.services.vector_store import vector_store + + try: + # Update status to processing + supabase.table("url_sources").update({"status": "processing"}).eq("id", source_id).execute() + + # Scrape URL + scraped = await scrape_url(url) + if "error" in scraped: + supabase.table("url_sources").update({ + "status": "failed", + "error_message": scraped["error"], + }).eq("id", source_id).execute() + return + + text = scraped["text"] + title = scraped.get("title", url) + collection_name = chatbot.get("qdrant_collection_name") + + if not collection_name: + supabase.table("url_sources").update({ + "status": "failed", + "error_message": "No vector store configured", + }).eq("id", source_id).execute() + return + + # Ensure collection exists + if not vector_store.collection_exists(collection_name): + vector_store.create_collection(collection_name) + + # Chunk text + chunks = chunk_text(text) + if not chunks: + supabase.table("url_sources").update({ + "status": "failed", + "error_message": "No content extracted", + }).eq("id", source_id).execute() + return + + # Embed and upsert + all_ids = [] + all_vectors = [] + all_payloads = [] + batch_size = 50 + + for i in range(0, len(chunks), batch_size): + batch = chunks[i:i + batch_size] + vectors = embedding_service.embed_batch(batch) + ids = [str(uuid.uuid4()) for _ in vectors] + payloads = [{ + "document_id": source_id, + "company_id": chatbot.get("company_id", ""), + "file_name": f"[URL] {title}", + "page_number": i // batch_size + 1, + "chunk_index": i + j, + "text": chunk, + "source_url": url, + } for j, chunk in enumerate(batch)] + all_ids.extend(ids) + all_vectors.extend(vectors) + all_payloads.extend(payloads) + + vector_store.upsert_vectors( + collection_name=collection_name, + vectors=all_vectors, + payloads=all_payloads, + ids=all_ids, + ) + + supabase.table("url_sources").update({ + "status": "completed", + "page_title": title, + "chunk_count": len(chunks), + }).eq("id", source_id).execute() + + logger.info(f"URL source {source_id} processed: {len(chunks)} chunks from {url}") + + except Exception as e: + logger.error(f"URL source processing error {source_id}: {e}") + supabase.table("url_sources").update({ + "status": "failed", + "error_message": str(e)[:500], + }).eq("id", source_id).execute() + + +router_url_sources = url_router diff --git a/app/routers/inbox.py b/app/routers/inbox.py new file mode 100644 index 0000000..e75c50e --- /dev/null +++ b/app/routers/inbox.py @@ -0,0 +1,162 @@ +from fastapi import APIRouter, HTTPException, Depends, Query +from app.database import get_supabase +from app.dependencies import get_current_user +from app.config import PLAN_LIMITS +from app.models import InboxConversation, InboxMessage +from typing import List, Optional +import logging + +logger = logging.getLogger(__name__) +router = APIRouter(prefix="/inbox", tags=["Inbox"]) + + +def _check_inbox_access(user_id: str, supabase): + sub = supabase.table("subscriptions").select("plan").eq("user_id", user_id).eq("status", "active").execute() + plan = sub.data[0]["plan"] if sub.data else "free" + if plan not in ("starter", "business", "agency", "enterprise"): + raise HTTPException(status_code=402, detail="Conversation inbox requires Starter plan or higher") + return plan + + +def _get_user_company_id(user_id: str, supabase) -> str: + result = supabase.table("companies").select("id").eq("owner_id", user_id).execute() + if not result.data: + raise HTTPException(status_code=404, detail="Company not found") + return result.data[0]["id"] + + +@router.get("/conversations", response_model=List[InboxConversation]) +async def list_inbox_conversations( + chatbot_id: Optional[str] = Query(None), + page: int = Query(1, ge=1), + limit: int = Query(20, ge=1, le=100), + user=Depends(get_current_user), +): + """List conversations for all (or one) of the user's chatbots.""" + supabase = get_supabase() + _check_inbox_access(user.id, supabase) + company_id = _get_user_company_id(user.id, supabase) + + # Get user's chatbots + chatbots_query = supabase.table("chatbots").select("id, name").eq("company_id", company_id) + if chatbot_id: + chatbots_query = chatbots_query.eq("id", chatbot_id) + chatbots_result = chatbots_query.execute() + chatbot_map = {c["id"]: c["name"] for c in (chatbots_result.data or [])} + + if not chatbot_map: + return [] + + chatbot_ids = list(chatbot_map.keys()) + offset = (page - 1) * limit + + # Query conversations + convs = supabase.table("conversations").select("*") \ + .in_("chatbot_id", chatbot_ids) \ + .order("created_at", desc=True) \ + .range(offset, offset + limit - 1) \ + .execute() + + results = [] + for conv in (convs.data or []): + cid = conv["id"] + + # Get first user message for preview + first_msg = supabase.table("messages").select("content") \ + .eq("conversation_id", cid) \ + .eq("role", "user") \ + .order("created_at", desc=False) \ + .limit(1) \ + .execute() + + first_message_text = first_msg.data[0]["content"] if first_msg.data else None + + results.append(InboxConversation( + id=cid, + chatbot_id=conv["chatbot_id"], + chatbot_name=chatbot_map.get(conv["chatbot_id"], "Unknown"), + session_id=conv.get("session_id"), + language=conv.get("language", "en"), + message_count=conv.get("message_count", 0), + first_message=first_message_text, + created_at=conv.get("created_at"), + )) + + return results + + +@router.get("/conversations/{conversation_id}") +async def get_inbox_conversation( + conversation_id: str, + user=Depends(get_current_user), +): + """Get a full conversation thread with all messages.""" + supabase = get_supabase() + _check_inbox_access(user.id, supabase) + company_id = _get_user_company_id(user.id, supabase) + + # Verify ownership + conv = supabase.table("conversations").select("*, chatbots(company_id, name)") \ + .eq("id", conversation_id) \ + .execute() + + if not conv.data: + raise HTTPException(status_code=404, detail="Conversation not found") + + c = conv.data[0] + chatbot_data = c.get("chatbots") or {} + if chatbot_data.get("company_id") != company_id: + raise HTTPException(status_code=403, detail="Not authorized") + + # Get messages + msgs = supabase.table("messages").select("*") \ + .eq("conversation_id", conversation_id) \ + .order("created_at", desc=False) \ + .execute() + + messages = [ + InboxMessage( + id=m["id"], + role=m["role"], + content=m["content"], + sources=m.get("sources"), + confidence_score=m.get("confidence_score"), + is_handoff=m.get("is_handoff", False), + created_at=m.get("created_at"), + ) + for m in (msgs.data or []) + ] + + return { + "conversation_id": conversation_id, + "chatbot_name": chatbot_data.get("name", "Unknown"), + "language": c.get("language", "en"), + "session_id": c.get("session_id"), + "created_at": c.get("created_at"), + "messages": [m.model_dump() for m in messages], + } + + +@router.delete("/conversations/{conversation_id}") +async def delete_inbox_conversation( + conversation_id: str, + user=Depends(get_current_user), +): + """Delete a conversation and all its messages.""" + supabase = get_supabase() + _check_inbox_access(user.id, supabase) + company_id = _get_user_company_id(user.id, supabase) + + # Verify ownership + conv = supabase.table("conversations").select("*, chatbots(company_id)") \ + .eq("id", conversation_id).execute() + + if not conv.data: + raise HTTPException(status_code=404, detail="Conversation not found") + + chatbot_data = conv.data[0].get("chatbots") or {} + if chatbot_data.get("company_id") != company_id: + raise HTTPException(status_code=403, detail="Not authorized") + + supabase.table("conversations").delete().eq("id", conversation_id).execute() + return {"success": True} diff --git a/app/routers/leads.py b/app/routers/leads.py new file mode 100644 index 0000000..625dd9e --- /dev/null +++ b/app/routers/leads.py @@ -0,0 +1,150 @@ +from fastapi import APIRouter, HTTPException, Depends, Query +from fastapi.responses import StreamingResponse +from app.database import get_supabase +from app.dependencies import get_current_user +from app.models import LeadCreate, LeadResponse +from typing import List, Optional +import uuid +import csv +import io +import logging + +logger = logging.getLogger(__name__) +router = APIRouter(prefix="/leads", tags=["Leads"]) + + +def _get_user_company_id(user_id: str, supabase) -> str: + result = supabase.table("companies").select("id").eq("owner_id", user_id).execute() + if not result.data: + raise HTTPException(status_code=404, detail="Company not found") + return result.data[0]["id"] + + +def _check_leads_access(user_id: str, supabase): + sub = supabase.table("subscriptions").select("plan").eq("user_id", user_id).eq("status", "active").execute() + plan = sub.data[0]["plan"] if sub.data else "free" + if plan not in ("starter", "business", "agency", "enterprise"): + raise HTTPException(status_code=402, detail="Lead capture requires Starter plan or higher") + return plan + + +@router.get("", response_model=List[LeadResponse]) +async def list_leads( + chatbot_id: Optional[str] = Query(None), + page: int = Query(1, ge=1), + limit: int = Query(50, ge=1, le=200), + user=Depends(get_current_user), +): + """List leads for the user's chatbots.""" + supabase = get_supabase() + _check_leads_access(user.id, supabase) + company_id = _get_user_company_id(user.id, supabase) + + # Get owned chatbot IDs + chatbots_q = supabase.table("chatbots").select("id").eq("company_id", company_id) + if chatbot_id: + chatbots_q = chatbots_q.eq("id", chatbot_id) + chatbots = chatbots_q.execute() + chatbot_ids = [c["id"] for c in (chatbots.data or [])] + + if not chatbot_ids: + return [] + + offset = (page - 1) * limit + result = supabase.table("leads").select("*") \ + .in_("chatbot_id", chatbot_ids) \ + .order("created_at", desc=True) \ + .range(offset, offset + limit - 1) \ + .execute() + + return [LeadResponse(**lead) for lead in (result.data or [])] + + +@router.get("/export") +async def export_leads_csv( + chatbot_id: Optional[str] = Query(None), + user=Depends(get_current_user), +): + """Export leads as CSV file.""" + supabase = get_supabase() + _check_leads_access(user.id, supabase) + company_id = _get_user_company_id(user.id, supabase) + + chatbots_q = supabase.table("chatbots").select("id").eq("company_id", company_id) + if chatbot_id: + chatbots_q = chatbots_q.eq("id", chatbot_id) + chatbots = chatbots_q.execute() + chatbot_ids = [c["id"] for c in (chatbots.data or [])] + + if not chatbot_ids: + leads_data = [] + else: + result = supabase.table("leads").select("*") \ + .in_("chatbot_id", chatbot_ids) \ + .order("created_at", desc=True) \ + .execute() + leads_data = result.data or [] + + # Build CSV + output = io.StringIO() + writer = csv.DictWriter(output, fieldnames=["id", "chatbot_id", "email", "name", "phone", "company", "created_at"]) + writer.writeheader() + for lead in leads_data: + writer.writerow({ + "id": lead.get("id", ""), + "chatbot_id": lead.get("chatbot_id", ""), + "email": lead.get("email", ""), + "name": lead.get("name", ""), + "phone": lead.get("phone", ""), + "company": lead.get("company", ""), + "created_at": lead.get("created_at", ""), + }) + + csv_bytes = output.getvalue().encode("utf-8") + return StreamingResponse( + iter([csv_bytes]), + media_type="text/csv", + headers={"Content-Disposition": "attachment; filename=leads.csv"}, + ) + + +# Public endpoint — no auth required +leads_public_router = APIRouter(tags=["Leads"]) + + +@leads_public_router.post("/chatbots/{chatbot_id}/leads", response_model=LeadResponse, status_code=201) +async def submit_lead(chatbot_id: str, data: LeadCreate): + """Submit a lead for a chatbot (public endpoint, no auth required).""" + supabase = get_supabase() + + # Verify chatbot exists + chatbot = supabase.table("chatbots").select("id, lead_capture_enabled").eq("id", chatbot_id).execute() + if not chatbot.data: + raise HTTPException(status_code=404, detail="Chatbot not found") + if not chatbot.data[0].get("lead_capture_enabled", False): + raise HTTPException(status_code=400, detail="Lead capture not enabled for this chatbot") + + # Deduplicate by email+chatbot_id + if data.email: + existing = supabase.table("leads").select("*") \ + .eq("chatbot_id", chatbot_id) \ + .eq("email", data.email) \ + .execute() + if existing.data: + return LeadResponse(**existing.data[0]) + + lead_data = { + "id": str(uuid.uuid4()), + "chatbot_id": chatbot_id, + "conversation_id": data.conversation_id, + "email": data.email, + "name": data.name, + "phone": data.phone, + "company": data.company, + } + + result = supabase.table("leads").insert(lead_data).execute() + if not result.data: + raise HTTPException(status_code=500, detail="Failed to save lead") + + return LeadResponse(**result.data[0]) diff --git a/app/routers/models.py b/app/routers/models.py index 680da57..9a26158 100644 --- a/app/routers/models.py +++ b/app/routers/models.py @@ -91,12 +91,12 @@ async def get_available_models(user=Depends(get_current_user)): )) # Determine upgrade messaging - has_premium = plan in ("pro", "enterprise") + has_premium = plan in ("business", "agency", "enterprise") upgrade_label = None if plan == "free": - upgrade_label = "Upgrade to Starter for more models and publishing" + upgrade_label = "Upgrade to Starter for more AI models and messaging channels" elif plan == "starter": - upgrade_label = "Upgrade to Pro for GPT-4o, Claude, Gemini" + upgrade_label = "Upgrade to Business for GPT-4o, Claude, Gemini and WhatsApp" return ModelsResponse( models=models, diff --git a/app/routers/upload.py b/app/routers/upload.py new file mode 100644 index 0000000..ba5d024 --- /dev/null +++ b/app/routers/upload.py @@ -0,0 +1,56 @@ +from fastapi import APIRouter, HTTPException, Depends, UploadFile, File +from app.database import get_supabase +from app.dependencies import get_current_user +from app.config import settings +import uuid +import logging + +logger = logging.getLogger(__name__) +router = APIRouter(prefix="/upload", tags=["Upload"]) + +ALLOWED_IMAGE_TYPES = {"image/png", "image/jpeg", "image/jpg", "image/gif", "image/svg+xml", "image/webp"} +MAX_LOGO_SIZE = 2 * 1024 * 1024 # 2MB + + +@router.post("/logo") +async def upload_logo( + file: UploadFile = File(...), + user=Depends(get_current_user), +): + """Upload a chatbot logo to Supabase Storage. Returns public URL.""" + supabase = get_supabase() + + # Validate content type + if file.content_type not in ALLOWED_IMAGE_TYPES: + raise HTTPException( + status_code=400, + detail=f"Invalid file type. Allowed: PNG, JPG, GIF, SVG, WebP" + ) + + # Read file + file_bytes = await file.read() + if len(file_bytes) > MAX_LOGO_SIZE: + raise HTTPException(status_code=413, detail="Image must be under 2MB") + + # Determine extension + content_type = file.content_type or "image/png" + ext_map = { + "image/png": "png", "image/jpeg": "jpg", "image/jpg": "jpg", + "image/gif": "gif", "image/svg+xml": "svg", "image/webp": "webp" + } + ext = ext_map.get(content_type, "png") + + # Upload to Supabase Storage + path = f"{user.id}/{uuid.uuid4().hex}.{ext}" + try: + result = supabase.storage.from_("logos").upload( + path=path, + file=file_bytes, + file_options={"content-type": content_type, "upsert": "true"}, + ) + # Get public URL + public_url = supabase.storage.from_("logos").get_public_url(path) + return {"url": public_url} + except Exception as e: + logger.error(f"Logo upload failed: {e}") + raise HTTPException(status_code=500, detail=f"Upload failed: {str(e)[:200]}") diff --git a/app/services/n8n_service.py b/app/services/n8n_service.py new file mode 100644 index 0000000..77123a2 --- /dev/null +++ b/app/services/n8n_service.py @@ -0,0 +1,62 @@ +import httpx +import logging +from datetime import datetime +from typing import Any, Dict, List, Optional + +logger = logging.getLogger(__name__) + + +async def send_notification( + event_type: str, + data: Dict[str, Any], + webhook_url: Optional[str] = None, +) -> bool: + """ + Generic n8n notification sender. + Returns True if sent, False if not configured or failed. + """ + if not webhook_url: + return False + + payload = { + "event": event_type, + "timestamp": datetime.utcnow().isoformat(), + **data, + } + + try: + async with httpx.AsyncClient(timeout=10) as client: + response = await client.post(webhook_url, json=payload) + response.raise_for_status() + logger.info(f"n8n notification sent: {event_type}") + return True + except Exception as e: + logger.error(f"Failed to send n8n notification ({event_type}): {e}") + return False + + +async def send_handoff_notification( + chatbot_name: str, + owner_email: str, + conversation_history: List[dict], + trigger_message: str, + chatbot_id: str, + conversation_id: str, + webhook_url: Optional[str] = None, +) -> bool: + """ + Send a human handoff notification to the configured n8n webhook. + Returns True if sent, False if not configured or failed. + """ + return await send_notification( + event_type="handoff", + data={ + "chatbot_name": chatbot_name, + "owner_email": owner_email, + "trigger_message": trigger_message, + "conversation_history": conversation_history[-10:], + "chatbot_id": chatbot_id, + "conversation_id": conversation_id, + }, + webhook_url=webhook_url, + ) diff --git a/app/services/telegram_service.py b/app/services/telegram_service.py new file mode 100644 index 0000000..de99195 --- /dev/null +++ b/app/services/telegram_service.py @@ -0,0 +1,57 @@ +import httpx +import logging +from typing import Optional + +logger = logging.getLogger(__name__) + +_BASE = "https://api.telegram.org/bot{token}" + + +async def get_bot_info(bot_token: str) -> Optional[dict]: + try: + async with httpx.AsyncClient(timeout=10) as client: + r = await client.get(f"https://api.telegram.org/bot{bot_token}/getMe") + data = r.json() + if data.get("ok"): + return data["result"] + except Exception as e: + logger.error(f"Telegram getMe error: {e}") + return None + + +async def set_webhook(bot_token: str, webhook_url: str) -> bool: + try: + async with httpx.AsyncClient(timeout=10) as client: + r = await client.post( + f"https://api.telegram.org/bot{bot_token}/setWebhook", + json={"url": webhook_url, "allowed_updates": ["message"]}, + ) + return r.json().get("ok", False) + except Exception as e: + logger.error(f"Telegram setWebhook error: {e}") + return False + + +async def delete_webhook(bot_token: str) -> bool: + try: + async with httpx.AsyncClient(timeout=10) as client: + r = await client.post( + f"https://api.telegram.org/bot{bot_token}/deleteWebhook" + ) + return r.json().get("ok", False) + except Exception as e: + logger.error(f"Telegram deleteWebhook error: {e}") + return False + + +async def send_message(bot_token: str, chat_id, text: str) -> bool: + try: + async with httpx.AsyncClient(timeout=10) as client: + r = await client.post( + f"https://api.telegram.org/bot{bot_token}/sendMessage", + json={"chat_id": chat_id, "text": text}, + ) + return r.json().get("ok", False) + except Exception as e: + logger.error(f"Telegram sendMessage error: {e}") + return False diff --git a/app/services/web_scraper.py b/app/services/web_scraper.py new file mode 100644 index 0000000..ee1cc6c --- /dev/null +++ b/app/services/web_scraper.py @@ -0,0 +1,65 @@ +import httpx +import logging +from typing import Optional + +logger = logging.getLogger(__name__) + +MAX_TEXT_BYTES = 100 * 1024 # 100KB + + +async def scrape_url(url: str) -> dict: + """ + Fetch a URL and extract clean text content. + Returns: {title, text, url} or {error, url} + """ + try: + from bs4 import BeautifulSoup + + headers = { + "User-Agent": "Mozilla/5.0 (compatible; ContextaBot/1.0; +https://contexta.ai)", + } + + async with httpx.AsyncClient(timeout=30, follow_redirects=True) as client: + response = await client.get(url, headers=headers) + response.raise_for_status() + + content_type = response.headers.get("content-type", "") + if "text/html" not in content_type and "text/plain" not in content_type: + return {"error": f"Unsupported content type: {content_type}", "url": url} + + html = response.text + soup = BeautifulSoup(html, "html.parser") + + # Extract title + title_tag = soup.find("title") + title = title_tag.get_text(strip=True) if title_tag else "" + + # Remove unwanted tags + for tag in soup.find_all(["nav", "header", "footer", "script", "style", "noscript", "aside", "advertisement"]): + tag.decompose() + + # Extract main content (prefer article/main/body) + main = soup.find("main") or soup.find("article") or soup.find("body") or soup + text = main.get_text(separator="\n", strip=True) + + # Clean up whitespace + lines = [line.strip() for line in text.splitlines() if line.strip()] + text = "\n".join(lines) + + # Limit size + if len(text.encode("utf-8")) > MAX_TEXT_BYTES: + text = text[:MAX_TEXT_BYTES].rsplit("\n", 1)[0] + + if not text: + return {"error": "No text content found on page", "url": url} + + logger.info(f"Scraped {url}: {len(text)} chars, title='{title}'") + return {"title": title, "text": text, "url": url} + + except httpx.TimeoutException: + return {"error": "Request timed out", "url": url} + except httpx.HTTPStatusError as e: + return {"error": f"HTTP {e.response.status_code}", "url": url} + except Exception as e: + logger.error(f"Scrape error for {url}: {e}") + return {"error": str(e)[:200], "url": url} diff --git a/app/services/whatsapp_service.py b/app/services/whatsapp_service.py new file mode 100644 index 0000000..708a850 --- /dev/null +++ b/app/services/whatsapp_service.py @@ -0,0 +1,36 @@ +import httpx +import hashlib +import hmac +import logging + +logger = logging.getLogger(__name__) + +_META_API = "https://graph.facebook.com/v19.0" + + +async def send_message(phone_number_id: str, to: str, text: str, access_token: str) -> bool: + try: + async with httpx.AsyncClient(timeout=10) as client: + r = await client.post( + f"{_META_API}/{phone_number_id}/messages", + headers={"Authorization": f"Bearer {access_token}"}, + json={ + "messaging_product": "whatsapp", + "to": to, + "type": "text", + "text": {"body": text}, + }, + ) + return r.status_code == 200 + except Exception as e: + logger.error(f"WhatsApp send error: {e}") + return False + + +def verify_signature(payload: bytes, signature: str, app_secret: str) -> bool: + expected = "sha256=" + hmac.new( + app_secret.encode("utf-8"), + payload, + hashlib.sha256, + ).hexdigest() + return hmac.compare_digest(expected, signature) diff --git a/app/services/widget.py b/app/services/widget.py new file mode 100644 index 0000000..9cb9906 --- /dev/null +++ b/app/services/widget.py @@ -0,0 +1,140 @@ +def generate_widget_js(app_url: str) -> str: + """Generate the embeddable widget JavaScript with app_url baked in.""" + return f""" +(function() {{ + var APP_URL = "{app_url}"; + + // Find script tag to get chatbot ID + var scripts = document.querySelectorAll('script[data-chatbot]'); + var chatbotId = null; + if (scripts.length > 0) {{ + chatbotId = scripts[scripts.length - 1].getAttribute('data-chatbot'); + }} + if (!chatbotId) {{ + console.warn('[Contexta] No data-chatbot attribute found on script tag'); + return; + }} + + // Styles + var style = document.createElement('style'); + style.textContent = ` + .contexta-btn {{ + position: fixed; + bottom: 24px; + right: 24px; + width: 56px; + height: 56px; + border-radius: 50%; + background: #6366f1; + border: none; + cursor: pointer; + box-shadow: 0 4px 12px rgba(0,0,0,0.2); + display: flex; + align-items: center; + justify-content: center; + z-index: 999998; + transition: transform 0.2s, box-shadow 0.2s; + }} + .contexta-btn:hover {{ + transform: scale(1.08); + box-shadow: 0 6px 20px rgba(0,0,0,0.25); + }} + .contexta-btn svg {{ + width: 26px; + height: 26px; + fill: white; + }} + .contexta-container {{ + position: fixed; + bottom: 92px; + right: 24px; + width: 380px; + height: 580px; + border-radius: 16px; + box-shadow: 0 8px 32px rgba(0,0,0,0.15); + z-index: 999999; + overflow: hidden; + display: none; + flex-direction: column; + border: 1px solid #e5e7eb; + }} + .contexta-container.open {{ + display: flex; + }} + .contexta-iframe {{ + width: 100%; + height: 100%; + border: none; + flex: 1; + }} + .contexta-close {{ + position: absolute; + top: 8px; + right: 8px; + background: rgba(0,0,0,0.3); + border: none; + color: white; + border-radius: 50%; + width: 24px; + height: 24px; + cursor: pointer; + font-size: 14px; + display: flex; + align-items: center; + justify-content: center; + z-index: 1; + }} + @media (max-width: 480px) {{ + .contexta-container {{ + bottom: 0; + right: 0; + width: 100vw; + height: 100vh; + border-radius: 0; + }} + }} + `; + document.head.appendChild(style); + + // Button + var btn = document.createElement('button'); + btn.className = 'contexta-btn'; + btn.setAttribute('aria-label', 'Open chat'); + btn.innerHTML = ''; + document.body.appendChild(btn); + + // Container with iframe + var container = document.createElement('div'); + container.className = 'contexta-container'; + + var closeBtn = document.createElement('button'); + closeBtn.className = 'contexta-close'; + closeBtn.innerHTML = '×'; + closeBtn.setAttribute('aria-label', 'Close chat'); + container.appendChild(closeBtn); + + var iframe = document.createElement('iframe'); + iframe.className = 'contexta-iframe'; + iframe.src = APP_URL + '/chat/' + chatbotId; + iframe.setAttribute('allow', 'microphone'); + container.appendChild(iframe); + + document.body.appendChild(container); + + // Toggle logic + var isOpen = false; + function toggle() {{ + isOpen = !isOpen; + if (isOpen) {{ + container.classList.add('open'); + btn.innerHTML = ''; + }} else {{ + container.classList.remove('open'); + btn.innerHTML = ''; + }} + }} + + btn.addEventListener('click', toggle); + closeBtn.addEventListener('click', toggle); +}})(); +""" diff --git a/pyproject.toml b/pyproject.toml index 10cf84b..47f5c81 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -13,4 +13,14 @@ dependencies = [ "stripe>=14.3.0", "supabase>=2.28.0", "uvicorn>=0.41.0", + "beautifulsoup4>=4.12.0", + "httpx>=0.27.0", + "anthropic>=0.40.0", + "google-generativeai>=0.8.0", + "python-docx>=1.1.0", + "pypdf>=4.0.0", + "openpyxl>=3.1.0", + "pandas>=2.2.0", + "python-multipart>=0.0.9", + "pydantic-settings>=2.0.0", ] diff --git a/supabase_migration.sql b/supabase_migration.sql new file mode 100644 index 0000000..549df80 --- /dev/null +++ b/supabase_migration.sql @@ -0,0 +1,119 @@ +-- Contexta DB Migration +-- Run this in your Supabase SQL Editor + +-- ── chatbots: new columns ───────────────────────────────────────────────────── +ALTER TABLE chatbots ADD COLUMN IF NOT EXISTS logo_url TEXT; +ALTER TABLE chatbots ADD COLUMN IF NOT EXISTS show_branding BOOLEAN DEFAULT TRUE; +ALTER TABLE chatbots ADD COLUMN IF NOT EXISTS lead_capture_enabled BOOLEAN DEFAULT FALSE; +ALTER TABLE chatbots ADD COLUMN IF NOT EXISTS lead_capture_fields JSONB DEFAULT '["email"]'; +ALTER TABLE chatbots ADD COLUMN IF NOT EXISTS lead_capture_trigger VARCHAR(50) DEFAULT 'after_first_message'; +ALTER TABLE chatbots ADD COLUMN IF NOT EXISTS handoff_enabled BOOLEAN DEFAULT FALSE; +ALTER TABLE chatbots ADD COLUMN IF NOT EXISTS handoff_message TEXT DEFAULT 'I''ll connect you with our team. Please wait.'; +ALTER TABLE chatbots ADD COLUMN IF NOT EXISTS handoff_email TEXT; +ALTER TABLE chatbots ADD COLUMN IF NOT EXISTS handoff_keywords JSONB DEFAULT '["human", "agent", "speak to someone", "talk to a person", "real person"]'; + +-- ── messages: new columns ───────────────────────────────────────────────────── +ALTER TABLE messages ADD COLUMN IF NOT EXISTS confidence_score DECIMAL(4,3) DEFAULT NULL; +ALTER TABLE messages ADD COLUMN IF NOT EXISTS is_handoff BOOLEAN DEFAULT FALSE; + +-- ── leads table ─────────────────────────────────────────────────────────────── +CREATE TABLE IF NOT EXISTS leads ( + id UUID PRIMARY KEY DEFAULT uuid_generate_v4(), + chatbot_id UUID REFERENCES chatbots(id) ON DELETE CASCADE, + conversation_id UUID REFERENCES conversations(id) ON DELETE SET NULL, + email VARCHAR(255), + name VARCHAR(255), + phone VARCHAR(50), + company VARCHAR(255), + created_at TIMESTAMPTZ DEFAULT NOW() +); +CREATE INDEX IF NOT EXISTS idx_leads_chatbot ON leads(chatbot_id); +CREATE INDEX IF NOT EXISTS idx_leads_created ON leads(created_at DESC); +ALTER TABLE leads ENABLE ROW LEVEL SECURITY; +CREATE POLICY "leads_owner" ON leads FOR ALL USING ( + chatbot_id IN ( + SELECT c.id FROM chatbots c + JOIN companies co ON c.company_id = co.id + WHERE co.owner_id = auth.uid() + ) +); + +-- ── url_sources table ───────────────────────────────────────────────────────── +CREATE TABLE IF NOT EXISTS url_sources ( + id UUID PRIMARY KEY DEFAULT uuid_generate_v4(), + chatbot_id UUID REFERENCES chatbots(id) ON DELETE CASCADE, + url TEXT NOT NULL, + status VARCHAR(50) DEFAULT 'pending', + page_title TEXT, + chunk_count INT DEFAULT 0, + error_message TEXT, + created_at TIMESTAMPTZ DEFAULT NOW() +); +CREATE INDEX IF NOT EXISTS idx_url_sources_chatbot ON url_sources(chatbot_id); +ALTER TABLE url_sources ENABLE ROW LEVEL SECURITY; +CREATE POLICY "url_sources_owner" ON url_sources FOR ALL USING ( + chatbot_id IN ( + SELECT c.id FROM chatbots c + JOIN companies co ON c.company_id = co.id + WHERE co.owner_id = auth.uid() + ) +); + +-- ── message_feedback table ──────────────────────────────────────────────────── +CREATE TABLE IF NOT EXISTS message_feedback ( + id UUID PRIMARY KEY DEFAULT uuid_generate_v4(), + message_id UUID REFERENCES messages(id) ON DELETE CASCADE, + chatbot_id UUID REFERENCES chatbots(id) ON DELETE CASCADE, + feedback VARCHAR(20) NOT NULL CHECK (feedback IN ('positive', 'negative')), + created_at TIMESTAMPTZ DEFAULT NOW() +); +CREATE INDEX IF NOT EXISTS idx_feedback_chatbot ON message_feedback(chatbot_id); +ALTER TABLE message_feedback ENABLE ROW LEVEL SECURITY; +CREATE POLICY "feedback_insert" ON message_feedback FOR INSERT WITH CHECK (true); +CREATE POLICY "feedback_select_owner" ON message_feedback FOR SELECT USING ( + chatbot_id IN ( + SELECT c.id FROM chatbots c + JOIN companies co ON c.company_id = co.id + WHERE co.owner_id = auth.uid() + ) +); + +-- ── Supabase Storage ────────────────────────────────────────────────────────── +-- Create the 'logos' bucket manually in the Supabase dashboard: +-- Storage → New bucket → Name: logos → Public: ON + +-- ── channel_connections table ───────────────────────────────────────────────── +CREATE TABLE IF NOT EXISTS channel_connections ( + id UUID PRIMARY KEY DEFAULT uuid_generate_v4(), + chatbot_id UUID NOT NULL REFERENCES chatbots(id) ON DELETE CASCADE, + channel VARCHAR(20) NOT NULL CHECK (channel IN ('telegram', 'whatsapp')), + bot_token TEXT, + bot_username TEXT, + wa_keyword VARCHAR(50), + is_active BOOLEAN DEFAULT TRUE, + created_at TIMESTAMPTZ DEFAULT NOW(), + UNIQUE(chatbot_id, channel) +); +CREATE INDEX IF NOT EXISTS idx_channel_connections_chatbot ON channel_connections(chatbot_id); +CREATE INDEX IF NOT EXISTS idx_channel_connections_wa_keyword ON channel_connections(wa_keyword) WHERE channel = 'whatsapp'; +ALTER TABLE channel_connections ENABLE ROW LEVEL SECURITY; +CREATE POLICY "channel_connections_owner" ON channel_connections FOR ALL USING ( + chatbot_id IN ( + SELECT c.id FROM chatbots c + JOIN companies co ON c.company_id = co.id + WHERE co.owner_id = auth.uid() + ) +); + +-- ── channel_sessions table ──────────────────────────────────────────────────── +CREATE TABLE IF NOT EXISTS channel_sessions ( + id UUID PRIMARY KEY DEFAULT uuid_generate_v4(), + chatbot_id UUID NOT NULL REFERENCES chatbots(id) ON DELETE CASCADE, + channel VARCHAR(20) NOT NULL, + external_id TEXT NOT NULL, + session_id TEXT NOT NULL, + last_active TIMESTAMPTZ DEFAULT NOW(), + created_at TIMESTAMPTZ DEFAULT NOW(), + UNIQUE(channel, external_id) +); +CREATE INDEX IF NOT EXISTS idx_channel_sessions_lookup ON channel_sessions(channel, external_id); diff --git a/supabase_migration_channels.sql b/supabase_migration_channels.sql new file mode 100644 index 0000000..36ea149 --- /dev/null +++ b/supabase_migration_channels.sql @@ -0,0 +1,48 @@ +-- Contexta — Channels Migration +-- Run this in your Supabase SQL Editor + +-- ── channel_connections table ───────────────────────────────────────────────── +CREATE TABLE IF NOT EXISTS channel_connections ( + id UUID PRIMARY KEY DEFAULT uuid_generate_v4(), + chatbot_id UUID NOT NULL REFERENCES chatbots(id) ON DELETE CASCADE, + channel VARCHAR(20) NOT NULL CHECK (channel IN ('telegram', 'whatsapp')), + bot_token TEXT, + bot_username TEXT, + wa_keyword VARCHAR(50), + is_active BOOLEAN DEFAULT TRUE, + created_at TIMESTAMPTZ DEFAULT NOW(), + UNIQUE(chatbot_id, channel) +); +CREATE INDEX IF NOT EXISTS idx_channel_connections_chatbot ON channel_connections(chatbot_id); +CREATE INDEX IF NOT EXISTS idx_channel_connections_wa_keyword ON channel_connections(wa_keyword) WHERE channel = 'whatsapp'; +ALTER TABLE channel_connections ENABLE ROW LEVEL SECURITY; +CREATE POLICY "channel_connections_owner" ON channel_connections FOR ALL USING ( + chatbot_id IN ( + SELECT c.id FROM chatbots c + JOIN companies co ON c.company_id = co.id + WHERE co.owner_id = auth.uid() + ) +); + +-- ── channel_sessions table ──────────────────────────────────────────────────── +CREATE TABLE IF NOT EXISTS channel_sessions ( + id UUID PRIMARY KEY DEFAULT uuid_generate_v4(), + chatbot_id UUID NOT NULL REFERENCES chatbots(id) ON DELETE CASCADE, + channel VARCHAR(20) NOT NULL, + external_id TEXT NOT NULL, + session_id TEXT NOT NULL, + last_active TIMESTAMPTZ DEFAULT NOW(), + created_at TIMESTAMPTZ DEFAULT NOW(), + UNIQUE(channel, external_id) +); +CREATE INDEX IF NOT EXISTS idx_channel_sessions_lookup ON channel_sessions(channel, external_id); +ALTER TABLE channel_sessions ENABLE ROW LEVEL SECURITY; +-- Webhook handlers use the service_role key so they bypass RLS. +-- This policy lets authenticated owners read their own sessions via the dashboard. +CREATE POLICY "channel_sessions_owner" ON channel_sessions FOR SELECT USING ( + chatbot_id IN ( + SELECT c.id FROM chatbots c + JOIN companies co ON c.company_id = co.id + WHERE co.owner_id = auth.uid() + ) +);