From 92d4c2fc5e36e9a816eea5db6ada8277646f0843 Mon Sep 17 00:00:00 2001 From: belviskhoremk Date: Fri, 3 Apr 2026 09:11:58 +0000 Subject: [PATCH] feat: add appointments, campaigns, admin, storage, tests and various updates - Add new routers: admin, appointments, campaigns - Add storage service and logging config - Add migrations directory and test suite with pytest config - Add supabase_migration_features.sql - Update models, dependencies, config, and existing routers - Remove whatsapp_service (deleted) - Update pyproject.toml and uv.lock dependencies Co-Authored-By: Claude Sonnet 4.6 --- app/config.py | 89 ++-- app/dependencies.py | 42 +- app/logging_config.py | 34 ++ app/main.py | 39 +- app/models.py | 223 +++++++++- app/routers/admin.py | 555 +++++++++++++++++++++++ app/routers/analytics.py | 117 +++-- app/routers/appointments.py | 287 ++++++++++++ app/routers/auth.py | 22 + app/routers/billing.py | 21 + app/routers/campaigns.py | 167 +++++++ app/routers/channels.py | 235 +--------- app/routers/chat.py | 26 ++ app/routers/chatbots.py | 24 +- app/routers/documents.py | 63 +++ app/routers/inbox.py | 66 ++- app/routers/leads.py | 36 +- app/routers/models.py | 2 +- app/services/rag.py | 18 +- app/services/storage.py | 46 ++ app/services/whatsapp_service.py | 36 -- app/services/widget.py | 320 ++++++++------ migrations/001_user_profiles.sql | 42 ++ migrations/002_messages_confidence.sql | 8 + migrations/003_stripe_idempotency.sql | 13 + pyproject.toml | 4 + pytest.ini | 3 + supabase_migration.sql | 4 +- supabase_migration_channels.sql | 4 +- supabase_migration_features.sql | 97 +++++ tests/__init__.py | 0 tests/conftest.py | 77 ++++ tests/test_admin.py | 94 ++++ tests/test_analytics.py | 255 +++++++++++ tests/test_appointments.py | 554 +++++++++++++++++++++++ tests/test_auth.py | 119 +++++ tests/test_billing.py | 81 ++++ tests/test_campaigns.py | 453 +++++++++++++++++++ tests/test_channels.py | 258 +++++++++++ tests/test_chat.py | 289 ++++++++++++ tests/test_chatbots.py | 90 ++++ tests/test_config_plans.py | 285 ++++++++++++ tests/test_documents.py | 267 ++++++++++++ tests/test_inbox.py | 428 ++++++++++++++++++ tests/test_leads.py | 405 +++++++++++++++++ tests/test_marketplace.py | 212 +++++++++ tests/test_models.py | 107 +++++ tests/test_rag.py | 118 +++++ tests/test_upload.py | 125 ++++++ tests/test_widget.py | 151 +++++++ uv.lock | 580 ++++++++++++++++++++++++- 51 files changed, 7076 insertions(+), 515 deletions(-) create mode 100644 app/logging_config.py create mode 100644 app/routers/admin.py create mode 100644 app/routers/appointments.py create mode 100644 app/routers/campaigns.py create mode 100644 app/services/storage.py delete mode 100644 app/services/whatsapp_service.py create mode 100644 migrations/001_user_profiles.sql create mode 100644 migrations/002_messages_confidence.sql create mode 100644 migrations/003_stripe_idempotency.sql create mode 100644 pytest.ini create mode 100644 supabase_migration_features.sql create mode 100644 tests/__init__.py create mode 100644 tests/conftest.py create mode 100644 tests/test_admin.py create mode 100644 tests/test_analytics.py create mode 100644 tests/test_appointments.py create mode 100644 tests/test_auth.py create mode 100644 tests/test_billing.py create mode 100644 tests/test_campaigns.py create mode 100644 tests/test_channels.py create mode 100644 tests/test_chat.py create mode 100644 tests/test_chatbots.py create mode 100644 tests/test_config_plans.py create mode 100644 tests/test_documents.py create mode 100644 tests/test_inbox.py create mode 100644 tests/test_leads.py create mode 100644 tests/test_marketplace.py create mode 100644 tests/test_models.py create mode 100644 tests/test_rag.py create mode 100644 tests/test_upload.py create mode 100644 tests/test_widget.py diff --git a/app/config.py b/app/config.py index ebdf32b..1867743 100644 --- a/app/config.py +++ b/app/config.py @@ -55,13 +55,6 @@ class Settings(BaseSettings): # Supabase Storage supabase_storage_url: str = "" - # WhatsApp Cloud API - whatsapp_access_token: str = "" - whatsapp_phone_number_id: str = "" - whatsapp_verify_token: str = "contexta_whatsapp_verify" - whatsapp_app_secret: str = "" - whatsapp_display_number: str = "" # E.164 without '+', e.g. "15551234567" - @property def allowed_origins_list(self) -> List[str]: return [o.strip() for o in self.allowed_origins.split(",")] @@ -188,7 +181,7 @@ DEFAULT_MODELS = { # ═══════════════════════════════════════════════════════════════════════════════ -# PLAN LIMITS — Pricing: Starter $12/mo, Business $29/mo, Agency $79/mo +# PLAN LIMITS — Pricing: Starter $19/mo, Business $49/mo, Agency $99/mo # ═══════════════════════════════════════════════════════════════════════════════ # # Cost analysis (per 1M tokens approx): @@ -207,9 +200,9 @@ DEFAULT_MODELS = { # Fireworks models: ~$0.001-$0.004 per conversation # GPT-4o: ~$0.015 per conversation # -# Starter $12/mo, 1500 convos: max cost ~$6/mo (fireworks mix) → margin OK -# Business $29/mo, 5000 convos: max cost ~$15/mo (mixed models) → margin OK -# Agency $79/mo, 20000 convos: max cost ~$30/mo (fireworks) → healthy margin +# Starter $19/mo, 1500 convos: max cost ~$6/mo (fireworks mix) → margin OK +# Business $49/mo, 5000 convos: max cost ~$15/mo (mixed models) → margin OK +# Agency $99/mo, 20000 convos: max cost ~$30/mo (fireworks) → healthy margin # ═══════════════════════════════════════════════════════════════════════════════ _ALL_FIREWORKS = [ @@ -229,52 +222,76 @@ PLAN_LIMITS = { # Build, test, and go live with one chatbot — no card needed. "free": { "max_chatbots": 999999, - "max_published": 1, # can publish 1 chatbot + "max_published": 1, # can publish 1 chatbot "max_documents_per_chatbot": 3, "max_document_size_mb": 5, "models": ["accounts/fireworks/models/llama-v3p3-70b-instruct"], - "conversations_limit": 100, # 100 real conversations/month + "conversations_limit": 100, # 100 real conversations/month "code_export": False, "analytics": False, - "channels": [], # no messaging channels + "gap_suggestions": False, + "channels": [], # no messaging channels "url_sources": 0, "leads_per_month": 0, - "show_branding": True, # cannot remove badge + "inbox_replies": False, # read-only inbox + "leads_editing": False, # view-only leads + "show_branding": True, # cannot remove badge + "appointments": False, + "appointments_chatbots": 0, + "campaigns": False, + "campaigns_per_month": 0, + "max_campaign_recipients": 0, }, - # ── Starter $12/mo ─────────────────────────────────────────────────────── - # For individuals and solo businesses going live. + # ── Starter $19/mo ─────────────────────────────────────────────────────── + # For solo operators: live chat, leads, booking, and campaigns. "starter": { "max_chatbots": 999999, - "max_published": 1, + "max_published": 3, "max_documents_per_chatbot": 10, "max_document_size_mb": 10, "models": _ALL_FIREWORKS, "conversations_limit": 1500, "code_export": False, "analytics": True, + "gap_suggestions": False, "channels": ["telegram"], "url_sources": 5, "leads_per_month": 500, - "show_branding": True, # badge stays + "inbox_replies": True, + "leads_editing": True, + "show_branding": True, # badge stays on Starter + "appointments": True, + "appointments_chatbots": 1, # booking on 1 chatbot + "campaigns": True, + "campaigns_per_month": 3, + "max_campaign_recipients": 500, }, - # ── Business $29/mo ────────────────────────────────────────────────────── - # For growing businesses that need more chatbots and WhatsApp reach. + # ── Business $49/mo ────────────────────────────────────────────────────── + # For growing businesses: premium AI, unlimited booking, full analytics. "business": { "max_chatbots": 999999, - "max_published": 3, + "max_published": 10, "max_documents_per_chatbot": 50, "max_document_size_mb": 50, "models": _ALL_FIREWORKS + _ALL_PREMIUM, "conversations_limit": 5000, "code_export": False, "analytics": True, - "channels": ["telegram", "whatsapp"], + "gap_suggestions": True, + "channels": ["telegram"], "url_sources": 999999, "leads_per_month": 999999, - "show_branding": False, # can remove badge + "inbox_replies": True, + "leads_editing": True, + "show_branding": False, # can remove badge + "appointments": True, + "appointments_chatbots": 999999, + "campaigns": True, + "campaigns_per_month": 999999, + "max_campaign_recipients": 5000, }, - # ── Agency $79/mo ──────────────────────────────────────────────────────── - # For agencies and large businesses managing many chatbots. + # ── Agency $99/mo ──────────────────────────────────────────────────────── + # For agencies: unlimited everything, unlimited campaign recipients. "agency": { "max_chatbots": 999999, "max_published": 999999, @@ -284,10 +301,18 @@ PLAN_LIMITS = { "conversations_limit": 20000, "code_export": True, "analytics": True, - "channels": ["telegram", "whatsapp"], + "gap_suggestions": True, + "channels": ["telegram"], "url_sources": 999999, "leads_per_month": 999999, + "inbox_replies": True, + "leads_editing": True, "show_branding": False, + "appointments": True, + "appointments_chatbots": 999999, + "campaigns": True, + "campaigns_per_month": 999999, + "max_campaign_recipients": 999999, }, # ── Enterprise (custom) ─────────────────────────────────────────────────── "enterprise": { @@ -299,9 +324,17 @@ PLAN_LIMITS = { "conversations_limit": 999999, "code_export": True, "analytics": True, - "channels": ["telegram", "whatsapp"], + "gap_suggestions": True, + "channels": ["telegram"], "url_sources": 999999, "leads_per_month": 999999, + "inbox_replies": True, + "leads_editing": True, "show_branding": False, + "appointments": True, + "appointments_chatbots": 999999, + "campaigns": True, + "campaigns_per_month": 999999, + "max_campaign_recipients": 999999, }, } \ No newline at end of file diff --git a/app/dependencies.py b/app/dependencies.py index a28ce3e..b9ba036 100644 --- a/app/dependencies.py +++ b/app/dependencies.py @@ -29,7 +29,24 @@ async def get_current_user( status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid or expired token", ) - return response.user + user = response.user + + # Check for suspension + try: + profile = supabase.table("user_profiles").select("suspended_at").eq("user_id", user.id).execute() + if profile.data and profile.data[0].get("suspended_at"): + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, + detail="Account suspended. Please contact support.", + ) + except HTTPException: + raise + except Exception: + pass # Don't block login if profile lookup fails + + return user + except HTTPException: + raise except Exception as e: logger.error(f"Auth error: {e}") raise HTTPException( @@ -38,6 +55,29 @@ async def get_current_user( ) +async def get_admin_user( + current_user=Depends(get_current_user), +): + """Require the current user to be an admin.""" + supabase = get_supabase() + try: + profile = supabase.table("user_profiles").select("is_admin").eq("user_id", current_user.id).execute() + if not profile.data or not profile.data[0].get("is_admin"): + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, + detail="Admin access required", + ) + except HTTPException: + raise + except Exception as e: + logger.error(f"Admin check failed: {e}") + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, + detail="Admin access required", + ) + return current_user + + async def get_optional_user( credentials: Optional[HTTPAuthorizationCredentials] = Depends(security), ): diff --git a/app/logging_config.py b/app/logging_config.py new file mode 100644 index 0000000..d97ef27 --- /dev/null +++ b/app/logging_config.py @@ -0,0 +1,34 @@ +import logging +import os + + +def configure_logging(): + """Configure structured JSON logging for the application.""" + log_level = logging.DEBUG if os.getenv("APP_ENV", "development") == "development" else logging.INFO + + try: + from pythonjsonlogger import jsonlogger + + handler = logging.StreamHandler() + formatter = jsonlogger.JsonFormatter( + fmt="%(asctime)s %(levelname)s %(name)s %(message)s", + datefmt="%Y-%m-%dT%H:%M:%S", + rename_fields={"asctime": "timestamp", "levelname": "level", "name": "logger"}, + ) + handler.setFormatter(formatter) + except ImportError: + # Fallback to plain text if pythonjsonlogger not installed yet + handler = logging.StreamHandler() + handler.setFormatter( + logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s") + ) + + root_logger = logging.getLogger() + root_logger.handlers.clear() + root_logger.addHandler(handler) + root_logger.setLevel(log_level) + + # Silence noisy third-party loggers + logging.getLogger("uvicorn.access").setLevel(logging.WARNING) + logging.getLogger("httpx").setLevel(logging.WARNING) + logging.getLogger("httpcore").setLevel(logging.WARNING) diff --git a/app/main.py b/app/main.py index 7f1876e..314b1e5 100644 --- a/app/main.py +++ b/app/main.py @@ -4,17 +4,18 @@ from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import JSONResponse, Response import logging +from app.logging_config import configure_logging +configure_logging() # Must be called before any logger is created + from app.config import settings from app.routers import auth, chatbots, documents, chat, marketplace, billing, models, analytics, inbox, leads, upload from app.routers.documents import router_url_sources from app.routers.leads import leads_public_router from app.routers.channels import router as channels_router, webhook_router as channels_webhook_router +from app.routers import admin as admin_router +from app.routers.appointments import router as appointments_router, public_router as appointments_public_router +from app.routers.campaigns import router as campaigns_router -# Configure logging -logging.basicConfig( - level=logging.INFO, - format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", -) logger = logging.getLogger(__name__) @@ -62,13 +63,28 @@ app.include_router(router_url_sources, prefix="/api/v1") app.include_router(leads_public_router, prefix="/api/v1") app.include_router(channels_router, prefix="/api/v1") app.include_router(channels_webhook_router, prefix="/api/v1") +app.include_router(appointments_router, prefix="/api/v1") +app.include_router(appointments_public_router, prefix="/api/v1") +app.include_router(campaigns_router, prefix="/api/v1") +app.include_router(admin_router.router, prefix="/api/v1") # ── Widget ───────────────────────────────────────────────────────────────────── -@app.get("/widget.js") +@app.get("/widget.js", include_in_schema=False) async def serve_widget(): from app.services.widget import generate_widget_js - return Response(generate_widget_js(settings.app_url), media_type="application/javascript") + return Response( + content=generate_widget_js(settings.app_url), + media_type="application/javascript", + headers={ + # Allow any site to load this script tag cross-origin + "Access-Control-Allow-Origin": "*", + # Cache for 1 hour in browsers / CDN; revalidate when stale + "Cache-Control": "public, max-age=3600, stale-while-revalidate=86400", + # Prevent MIME sniffing + "X-Content-Type-Options": "nosniff", + }, + ) # ── Health & Info ────────────────────────────────────────────────────────────── @@ -87,6 +103,15 @@ async def health(): return {"status": "healthy", "environment": settings.app_env} +# ── Prometheus Metrics ────────────────────────────────────────────────────────── +try: + from prometheus_fastapi_instrumentator import Instrumentator + Instrumentator().instrument(app).expose(app, endpoint="/metrics", include_in_schema=False) + logger.info("Prometheus metrics enabled at /metrics") +except ImportError: + logger.info("prometheus-fastapi-instrumentator not installed, metrics endpoint disabled") + + # ── Sentry ───────────────────────────────────────────────────────────────────── if settings.sentry_dsn: import sentry_sdk diff --git a/app/models.py b/app/models.py index 03647da..99f5230 100644 --- a/app/models.py +++ b/app/models.py @@ -1,8 +1,9 @@ -from pydantic import BaseModel, EmailStr, Field +from pydantic import BaseModel, EmailStr, Field, field_validator from typing import Optional, List, Dict, Any from datetime import datetime from enum import Enum import uuid +import re # ─── Enums ──────────────────────────────────────────────────────────────────── @@ -59,6 +60,7 @@ class UserResponse(BaseModel): email: str company_name: Optional[str] = None plan: str = "free" + is_admin: bool = False created_at: Optional[datetime] = None @@ -100,6 +102,39 @@ class ChatbotCreate(BaseModel): description: Optional[str] = None system_prompt: Optional[str] = None model: str = "accounts/fireworks/models/kimi-k2-instruct-0905" + + @field_validator("name", mode="before") + @classmethod + def sanitize_name(cls, v: Any) -> Any: + if v: + v = str(v).strip() + if len(v) > 100: + raise ValueError("Name must be 100 characters or less") + return v + + @field_validator("system_prompt", mode="before") + @classmethod + def sanitize_system_prompt(cls, v: Any) -> Any: + if v: + v = re.sub(r"]*>.*?", "", str(v), flags=re.DOTALL | re.IGNORECASE) + v = re.sub(r"javascript:", "", v, flags=re.IGNORECASE) + if len(v) > 10000: + raise ValueError("System prompt must be 10000 characters or less") + return v + + @field_validator("description", mode="before") + @classmethod + def sanitize_description(cls, v: Any) -> Any: + if v and len(str(v)) > 2000: + raise ValueError("Description must be 2000 characters or less") + return v + + @field_validator("welcome_message", mode="before") + @classmethod + def sanitize_welcome_message(cls, v: Any) -> Any: + if v and len(str(v)) > 500: + raise ValueError("Welcome message must be 500 characters or less") + return v temperature: float = Field(default=0.7, ge=0.0, le=2.0) max_tokens: int = Field(default=1000, ge=100, le=8000) primary_color: str = "#6366f1" @@ -116,12 +151,46 @@ class ChatbotCreate(BaseModel): handoff_message: str = "I'll connect you with our team. Please wait." handoff_email: Optional[str] = None handoff_keywords: List[str] = ["human", "agent", "speak to someone", "talk to a person", "real person"] + booking_enabled: bool = False class ChatbotUpdate(BaseModel): name: Optional[str] = None description: Optional[str] = None system_prompt: Optional[str] = None + + @field_validator("name", mode="before") + @classmethod + def sanitize_name(cls, v: Any) -> Any: + if v: + v = str(v).strip() + if len(v) > 100: + raise ValueError("Name must be 100 characters or less") + return v + + @field_validator("system_prompt", mode="before") + @classmethod + def sanitize_system_prompt(cls, v: Any) -> Any: + if v: + v = re.sub(r"]*>.*?", "", str(v), flags=re.DOTALL | re.IGNORECASE) + v = re.sub(r"javascript:", "", v, flags=re.IGNORECASE) + if len(v) > 10000: + raise ValueError("System prompt must be 10000 characters or less") + return v + + @field_validator("description", mode="before") + @classmethod + def sanitize_description(cls, v: Any) -> Any: + if v and len(str(v)) > 2000: + raise ValueError("Description must be 2000 characters or less") + return v + + @field_validator("welcome_message", mode="before") + @classmethod + def sanitize_welcome_message(cls, v: Any) -> Any: + if v and len(str(v)) > 500: + raise ValueError("Welcome message must be 500 characters or less") + return v model: Optional[str] = None temperature: Optional[float] = None max_tokens: Optional[int] = None @@ -139,6 +208,7 @@ class ChatbotUpdate(BaseModel): handoff_message: Optional[str] = None handoff_email: Optional[str] = None handoff_keywords: Optional[List[str]] = None + booking_enabled: Optional[bool] = None class ChatbotResponse(BaseModel): @@ -172,6 +242,7 @@ class ChatbotResponse(BaseModel): handoff_message: str = "I'll connect you with our team. Please wait." handoff_email: Optional[str] = None handoff_keywords: List[str] = ["human", "agent", "speak to someone", "talk to a person", "real person"] + booking_enabled: bool = False class ChatbotPublicResponse(BaseModel): @@ -355,9 +426,16 @@ class LeadResponse(BaseModel): name: Optional[str] = None phone: Optional[str] = None company: Optional[str] = None + status: str = "new" + notes: Optional[str] = None created_at: Optional[datetime] = None +class LeadUpdate(BaseModel): + status: Optional[str] = None # new, contacted, qualified, closed, lost + notes: Optional[str] = None + + # ─── URL Source Models ───────────────────────────────────────────────────────── class UrlSourceCreate(BaseModel): @@ -392,6 +470,8 @@ class InboxConversation(BaseModel): language: str message_count: int first_message: Optional[str] = None + status: str = "open" + last_agent_reply_at: Optional[datetime] = None created_at: Optional[datetime] = None @@ -405,13 +485,148 @@ class InboxMessage(BaseModel): created_at: Optional[datetime] = None +class ConversationStatusUpdate(BaseModel): + status: str # open, agent_handling, resolved + + +class AgentReplyCreate(BaseModel): + message: str = Field(min_length=1, max_length=4000) + + # ─── Channel Models ──────────────────────────────────────────────────────────── class ChannelConnectionResponse(BaseModel): id: str channel: str bot_username: Optional[str] = None - wa_keyword: Optional[str] = None - wa_link: Optional[str] = None is_active: bool - created_at: Optional[datetime] = None \ No newline at end of file + created_at: Optional[datetime] = None + + +# ─── Admin Models ────────────────────────────────────────────────────────────── + +class AdminUserListItem(BaseModel): + id: str + email: str + company_name: Optional[str] = None + plan: str = "free" + subscription_status: str = "active" + chatbot_count: int = 0 + conversations_count: int = 0 + is_suspended: bool = False + is_admin: bool = False + created_at: Optional[datetime] = None + + +class AdminUserDetail(AdminUserListItem): + website: Optional[str] = None + industry: Optional[str] = None + chatbots: List[Dict[str, Any]] = [] + + +class AdminChangePlanRequest(BaseModel): + plan: str + reason: Optional[str] = None + + +class AdminSuspendRequest(BaseModel): + suspend: bool + reason: Optional[str] = None + + +class AdminStatsResponse(BaseModel): + total_users: int + total_chatbots: int + total_published_chatbots: int + total_conversations: int + total_messages: int + active_subscriptions: Dict[str, int] + + +class AdminChatbotListItem(BaseModel): + id: str + name: str + owner_email: Optional[str] = None + company_name: Optional[str] = None + is_published: bool = False + document_count: int = 0 + conversation_count: int = 0 + created_at: Optional[datetime] = None + + +class AdminSystemHealth(BaseModel): + db: str + qdrant: str + llm_providers: Dict[str, bool] + timestamp: datetime + + +class AdminConversationListItem(BaseModel): + id: str + chatbot_name: Optional[str] = None + session_id: Optional[str] = None + language: Optional[str] = None + message_count: int = 0 + created_at: Optional[datetime] = None + first_message: Optional[str] = None + + +# ─── Appointment Models ──────────────────────────────────────────────────────── + +class BusinessHoursEntry(BaseModel): + day_of_week: int = Field(ge=0, le=6) # 0=Mon, 6=Sun + is_open: bool = True + open_time: str = "09:00" # HH:MM + close_time: str = "17:00" + slot_duration_minutes: int = Field(default=60, ge=15, le=480) + + +class BusinessHoursSave(BaseModel): + hours: List[BusinessHoursEntry] + + +class AppointmentCreate(BaseModel): + customer_name: str = Field(min_length=1, max_length=200) + customer_contact: str = Field(min_length=1, max_length=200) + service: Optional[str] = None + slot_start: datetime + notes: Optional[str] = None + conversation_id: Optional[str] = None + + +class AppointmentResponse(BaseModel): + id: str + chatbot_id: str + conversation_id: Optional[str] = None + customer_name: str + customer_contact: str + service: Optional[str] = None + slot_start: datetime + slot_end: datetime + status: str + notes: Optional[str] = None + created_at: Optional[datetime] = None + + +class AppointmentStatusUpdate(BaseModel): + status: str # pending, confirmed, cancelled, completed + + +# ─── Campaign Models ─────────────────────────────────────────────────────────── + +class CampaignCreate(BaseModel): + chatbot_id: str + title: str = Field(min_length=1, max_length=200) + message: str = Field(min_length=1, max_length=4000) + + +class CampaignResponse(BaseModel): + id: str + chatbot_id: str + title: str + message: str + status: str + recipients_count: int + sent_count: int + created_at: Optional[datetime] = None + sent_at: Optional[datetime] = None \ No newline at end of file diff --git a/app/routers/admin.py b/app/routers/admin.py new file mode 100644 index 0000000..07f0f60 --- /dev/null +++ b/app/routers/admin.py @@ -0,0 +1,555 @@ +""" +Admin router — all endpoints require is_admin = TRUE in user_profiles. + +Bootstrap: after running migration 001, set your admin user in Supabase: + UPDATE user_profiles SET is_admin = TRUE WHERE user_id = ''; +""" +import logging +import time +from collections import defaultdict +from datetime import datetime +from typing import Optional, List, Dict + +from fastapi import APIRouter, Depends, HTTPException, Query +from app.dependencies import get_admin_user +from app.database import get_supabase +from app.models import ( + AdminStatsResponse, AdminUserListItem, AdminUserDetail, + AdminChangePlanRequest, AdminSuspendRequest, AdminChatbotListItem, + AdminSystemHealth, AdminConversationListItem, SuccessResponse, +) +from app.services.vector_store import vector_store +from app.services.storage import delete_from_storage +from app.config import settings + +router = APIRouter(prefix="/admin", tags=["Admin"]) +logger = logging.getLogger(__name__) +_app_start_time = time.time() + + +# ── Stats ────────────────────────────────────────────────────────────────────── + +@router.get("/stats", response_model=AdminStatsResponse) +async def get_stats(admin=Depends(get_admin_user)): + """Platform-wide statistics.""" + supabase = get_supabase() + + # Total users + try: + users_resp = supabase.table("user_profiles").select("user_id", count="exact").execute() + total_users = users_resp.count or 0 + except Exception: + total_users = 0 + + # Total chatbots + try: + cb_resp = supabase.table("chatbots").select("id", count="exact").execute() + total_chatbots = cb_resp.count or 0 + pub_resp = supabase.table("chatbots").select("id", count="exact").eq("is_published", True).execute() + total_published = pub_resp.count or 0 + except Exception: + total_chatbots = 0 + total_published = 0 + + # Total conversations + try: + conv_resp = supabase.table("conversations").select("id", count="exact").execute() + total_convos = conv_resp.count or 0 + except Exception: + total_convos = 0 + + # Total messages + try: + msg_resp = supabase.table("messages").select("id", count="exact").execute() + total_messages = msg_resp.count or 0 + except Exception: + total_messages = 0 + + # Active subscriptions by plan + active_subs: Dict[str, int] = defaultdict(int) + try: + subs_resp = supabase.table("subscriptions").select("plan, status").eq("status", "active").execute() + for s in (subs_resp.data or []): + active_subs[s["plan"]] += 1 + except Exception: + pass + + return AdminStatsResponse( + total_users=total_users, + total_chatbots=total_chatbots, + total_published_chatbots=total_published, + total_conversations=total_convos, + total_messages=total_messages, + active_subscriptions=dict(active_subs), + ) + + +# ── Users ────────────────────────────────────────────────────────────────────── + +@router.get("/users") +async def list_users( + admin=Depends(get_admin_user), + page: int = Query(1, ge=1), + limit: int = Query(20, ge=1, le=100), + search: Optional[str] = None, + plan: Optional[str] = None, +): + """Paginated list of all users with company and subscription info.""" + supabase = get_supabase() + offset = (page - 1) * limit + + try: + # Fetch companies (contains owner_id and name) + companies_resp = supabase.table("companies").select("id, owner_id, name, website, industry").execute() + companies = {c["owner_id"]: c for c in (companies_resp.data or [])} + + # Fetch subscriptions + subs_resp = supabase.table("subscriptions").select("user_id, plan, status").execute() + subs = {s["user_id"]: s for s in (subs_resp.data or [])} + + # Fetch user profiles (suspension, admin flag) + profiles_resp = supabase.table("user_profiles").select("user_id, is_admin, suspended_at").execute() + profiles = {p["user_id"]: p for p in (profiles_resp.data or [])} + + # Fetch auth users via admin API + try: + auth_users_resp = supabase.auth.admin.list_users() + auth_users = auth_users_resp if isinstance(auth_users_resp, list) else getattr(auth_users_resp, 'users', []) + except Exception as e: + logger.warning(f"Could not list auth users: {e}") + auth_users = [] + + # Fetch chatbot counts per company + cb_resp = supabase.table("chatbots").select("company_id").execute() + cb_by_company: Dict[str, int] = defaultdict(int) + for cb in (cb_resp.data or []): + cb_by_company[cb["company_id"]] += 1 + + # Fetch conversation counts per chatbot (to get per-user conv count) + chatbots_resp = supabase.table("chatbots").select("id, company_id").execute() + chatbot_company_map = {cb["id"]: cb["company_id"] for cb in (chatbots_resp.data or [])} + + conv_resp = supabase.table("conversations").select("chatbot_id", count="exact").execute() + conv_by_chatbot: Dict[str, int] = defaultdict(int) + for conv in (conv_resp.data or []): + conv_by_chatbot[conv["chatbot_id"]] += 1 + + conv_by_company: Dict[str, int] = defaultdict(int) + for cb_id, count in conv_by_chatbot.items(): + company_id = chatbot_company_map.get(cb_id) + if company_id: + conv_by_company[company_id] += count + + # Build user list + users_list = [] + for auth_user in auth_users: + uid = getattr(auth_user, "id", None) or auth_user.get("id", "") + email = getattr(auth_user, "email", None) or auth_user.get("email", "") + created_at = getattr(auth_user, "created_at", None) or auth_user.get("created_at") + + # Apply filters + if search and search.lower() not in email.lower(): + company_info = companies.get(uid, {}) + if search.lower() not in (company_info.get("name") or "").lower(): + continue + + sub_info = subs.get(uid, {}) + user_plan = sub_info.get("plan", "free") + if plan and user_plan != plan: + continue + + company_info = companies.get(uid, {}) + profile_info = profiles.get(uid, {}) + + users_list.append(AdminUserListItem( + id=uid, + email=email, + company_name=company_info.get("name"), + plan=user_plan, + subscription_status=sub_info.get("status", "active"), + chatbot_count=cb_by_company.get(company_info.get("id", ""), 0), + conversations_count=conv_by_company.get(company_info.get("id", ""), 0), + is_suspended=bool(profile_info.get("suspended_at")), + is_admin=bool(profile_info.get("is_admin", False)), + created_at=created_at, + )) + + total = len(users_list) + paginated = users_list[offset:offset + limit] + + return { + "users": [u.model_dump() for u in paginated], + "total": total, + "page": page, + "pages": max(1, (total + limit - 1) // limit), + } + + except Exception as e: + logger.error(f"Admin list_users error: {e}") + raise HTTPException(status_code=500, detail="Failed to fetch users") + + +@router.get("/users/{user_id}", response_model=AdminUserDetail) +async def get_user(user_id: str, admin=Depends(get_admin_user)): + """Detailed info about a specific user.""" + supabase = get_supabase() + + try: + auth_user = supabase.auth.admin.get_user_by_id(user_id) + auth_u = getattr(auth_user, "user", auth_user) + email = getattr(auth_u, "email", "") or auth_u.get("email", "") + created_at = getattr(auth_u, "created_at", None) or auth_u.get("created_at") + except Exception: + email = "" + created_at = None + + company = supabase.table("companies").select("*").eq("owner_id", user_id).execute() + company_info = company.data[0] if company.data else {} + + sub = supabase.table("subscriptions").select("plan, status").eq("user_id", user_id).execute() + sub_info = sub.data[0] if sub.data else {} + + profile = supabase.table("user_profiles").select("is_admin, suspended_at").eq("user_id", user_id).execute() + profile_info = profile.data[0] if profile.data else {} + + chatbots = [] + if company_info.get("id"): + cb_resp = supabase.table("chatbots").select("id, name, is_published, created_at") \ + .eq("company_id", company_info["id"]).execute() + chatbots = cb_resp.data or [] + + return AdminUserDetail( + id=user_id, + email=email, + company_name=company_info.get("name"), + website=company_info.get("website"), + industry=company_info.get("industry"), + plan=sub_info.get("plan", "free"), + subscription_status=sub_info.get("status", "active"), + chatbot_count=len(chatbots), + conversations_count=0, + is_suspended=bool(profile_info.get("suspended_at")), + is_admin=bool(profile_info.get("is_admin", False)), + created_at=created_at, + chatbots=chatbots, + ) + + +@router.patch("/users/{user_id}/plan") +async def change_user_plan(user_id: str, data: AdminChangePlanRequest, admin=Depends(get_admin_user)): + """Manually grant or change a user's subscription plan.""" + valid_plans = ["free", "starter", "business", "agency", "enterprise"] + if data.plan not in valid_plans: + raise HTTPException(status_code=400, detail=f"Invalid plan. Must be one of: {valid_plans}") + + supabase = get_supabase() + + try: + supabase.table("subscriptions").upsert({ + "user_id": user_id, + "plan": data.plan, + "status": "active", + }, on_conflict="user_id").execute() + except Exception as e: + logger.error(f"Failed to change plan for {user_id}: {e}") + raise HTTPException(status_code=500, detail="Failed to update plan") + + logger.info(f"Admin {admin.id} changed plan for user {user_id} to {data.plan}. Reason: {data.reason}") + return {"message": f"Plan updated to {data.plan}", "user_id": user_id, "plan": data.plan} + + +@router.post("/users/{user_id}/suspend") +async def suspend_user(user_id: str, data: AdminSuspendRequest, admin=Depends(get_admin_user)): + """Suspend or unsuspend a user account.""" + supabase = get_supabase() + + update_data: dict = {"updated_at": datetime.utcnow().isoformat()} + if data.suspend: + update_data["suspended_at"] = datetime.utcnow().isoformat() + if data.reason: + update_data["suspended_reason"] = data.reason + action = "suspended" + else: + update_data["suspended_at"] = None + update_data["suspended_reason"] = None + action = "unsuspended" + + try: + supabase.table("user_profiles").upsert( + {"user_id": user_id, **update_data}, + on_conflict="user_id" + ).execute() + except Exception as e: + logger.error(f"Failed to {action} user {user_id}: {e}") + raise HTTPException(status_code=500, detail=f"Failed to {action} user") + + logger.info(f"Admin {admin.id} {action} user {user_id}") + return {"message": f"User {action}", "user_id": user_id} + + +@router.delete("/users/{user_id}", response_model=SuccessResponse) +async def delete_user(user_id: str, admin=Depends(get_admin_user)): + """Permanently delete a user and all their data.""" + supabase = get_supabase() + + # 1. Get company + company = supabase.table("companies").select("id").eq("owner_id", user_id).execute() + company_id = company.data[0]["id"] if company.data else None + + if company_id: + # 2. Get all chatbots and clean up Qdrant + storage + chatbots = supabase.table("chatbots").select("id, qdrant_collection_name, logo_url") \ + .eq("company_id", company_id).execute() + for cb in (chatbots.data or []): + if cb.get("qdrant_collection_name"): + try: + vector_store.delete_collection(cb["qdrant_collection_name"]) + except Exception as e: + logger.warning(f"Failed to delete Qdrant collection for chatbot {cb['id']}: {e}") + if cb.get("logo_url"): + delete_from_storage(supabase, "logos", cb["logo_url"]) + + # 3. Delete documents from storage + docs = supabase.table("documents").select("file_url") \ + .in_("chatbot_id", [cb["id"] for cb in (chatbots.data or [])]).execute() + for doc in (docs.data or []): + if doc.get("file_url"): + delete_from_storage(supabase, "documents", doc["file_url"]) + + # 4. Delete company (cascades to chatbots, documents, conversations) + supabase.table("companies").delete().eq("id", company_id).execute() + + # 5. Delete subscription + supabase.table("subscriptions").delete().eq("user_id", user_id).execute() + + # 6. Delete user profile + supabase.table("user_profiles").delete().eq("user_id", user_id).execute() + + # 7. Delete auth user + try: + supabase.auth.admin.delete_user(user_id) + except Exception as e: + logger.error(f"Failed to delete auth user {user_id}: {e}") + raise HTTPException(status_code=500, detail="Failed to delete auth user") + + logger.info(f"Admin {admin.id} deleted user {user_id}") + return SuccessResponse(success=True, message="User deleted successfully") + + +# ── Chatbots ─────────────────────────────────────────────────────────────────── + +@router.get("/chatbots") +async def list_all_chatbots( + admin=Depends(get_admin_user), + page: int = Query(1, ge=1), + limit: int = Query(20, ge=1, le=100), + search: Optional[str] = None, +): + """Paginated list of ALL chatbots across all users.""" + supabase = get_supabase() + offset = (page - 1) * limit + + try: + # Get chatbots with company info + q = supabase.table("chatbots").select("*, companies(name, owner_id)") \ + .order("created_at", desc=True) + result = q.execute() + all_chatbots = result.data or [] + + # Apply search filter + if search: + s = search.lower() + all_chatbots = [ + cb for cb in all_chatbots + if s in (cb.get("name") or "").lower() + or s in (cb.get("companies", {}) or {}).get("name", "").lower() + ] + + total = len(all_chatbots) + paginated = all_chatbots[offset:offset + limit] + + # Get owner emails for paginated set + owner_ids = list({(cb.get("companies") or {}).get("owner_id") for cb in paginated if cb.get("companies")}) + owner_emails: Dict[str, str] = {} + if owner_ids: + try: + for oid in owner_ids: + try: + u = supabase.auth.admin.get_user_by_id(oid) + u_obj = getattr(u, "user", u) + owner_emails[oid] = getattr(u_obj, "email", "") or u_obj.get("email", "") + except Exception: + pass + except Exception: + pass + + # Get doc and conv counts for paginated chatbots + cb_ids = [cb["id"] for cb in paginated] + doc_counts: Dict[str, int] = defaultdict(int) + conv_counts: Dict[str, int] = defaultdict(int) + + if cb_ids: + docs_resp = supabase.table("documents").select("chatbot_id").in_("chatbot_id", cb_ids).execute() + for d in (docs_resp.data or []): + doc_counts[d["chatbot_id"]] += 1 + + convs_resp = supabase.table("conversations").select("chatbot_id").in_("chatbot_id", cb_ids).execute() + for c in (convs_resp.data or []): + conv_counts[c["chatbot_id"]] += 1 + + items = [] + for cb in paginated: + company_info = cb.get("companies") or {} + owner_id = company_info.get("owner_id") + items.append(AdminChatbotListItem( + id=cb["id"], + name=cb.get("name", ""), + owner_email=owner_emails.get(owner_id), + company_name=company_info.get("name"), + is_published=cb.get("is_published", False), + document_count=doc_counts[cb["id"]], + conversation_count=conv_counts[cb["id"]], + created_at=cb.get("created_at"), + )) + + return { + "chatbots": [i.model_dump() for i in items], + "total": total, + "page": page, + "pages": max(1, (total + limit - 1) // limit), + } + + except Exception as e: + logger.error(f"Admin list_chatbots error: {e}") + raise HTTPException(status_code=500, detail="Failed to fetch chatbots") + + +@router.delete("/chatbots/{chatbot_id}", response_model=SuccessResponse) +async def delete_chatbot_admin(chatbot_id: str, admin=Depends(get_admin_user)): + """Force-delete any chatbot regardless of ownership.""" + supabase = get_supabase() + + cb = supabase.table("chatbots").select("*").eq("id", chatbot_id).execute() + if not cb.data: + raise HTTPException(status_code=404, detail="Chatbot not found") + chatbot = cb.data[0] + + # Delete Qdrant collection + if chatbot.get("qdrant_collection_name"): + try: + vector_store.delete_collection(chatbot["qdrant_collection_name"]) + except Exception as e: + logger.warning(f"Failed to delete Qdrant collection: {e}") + + # Delete logo from storage + if chatbot.get("logo_url"): + delete_from_storage(supabase, "logos", chatbot["logo_url"]) + + supabase.table("chatbots").delete().eq("id", chatbot_id).execute() + logger.info(f"Admin {admin.id} deleted chatbot {chatbot_id}") + return SuccessResponse(success=True, message="Chatbot deleted") + + +# ── Conversations ────────────────────────────────────────────────────────────── + +@router.get("/conversations") +async def list_conversations( + admin=Depends(get_admin_user), + page: int = Query(1, ge=1), + limit: int = Query(50, ge=1, le=200), +): + """Recent conversations across all chatbots.""" + supabase = get_supabase() + offset = (page - 1) * limit + + try: + result = supabase.table("conversations") \ + .select("*, chatbots(name)") \ + .order("created_at", desc=True) \ + .range(offset, offset + limit - 1) \ + .execute() + convos = result.data or [] + + # Get first message for each conversation + conv_ids = [c["id"] for c in convos] + first_msgs: Dict[str, str] = {} + if conv_ids: + msgs_resp = supabase.table("messages") \ + .select("conversation_id, content, role") \ + .in_("conversation_id", conv_ids) \ + .eq("role", "user") \ + .order("created_at", desc=False) \ + .execute() + seen = set() + for m in (msgs_resp.data or []): + cid = m["conversation_id"] + if cid not in seen: + seen.add(cid) + first_msgs[cid] = (m.get("content") or "")[:120] + + # Total count + count_resp = supabase.table("conversations").select("id", count="exact").execute() + total = count_resp.count or 0 + + items = [ + AdminConversationListItem( + id=c["id"], + chatbot_name=(c.get("chatbots") or {}).get("name"), + session_id=c.get("session_id"), + language=c.get("language"), + message_count=c.get("message_count", 0), + created_at=c.get("created_at"), + first_message=first_msgs.get(c["id"]), + ) + for c in convos + ] + + return { + "conversations": [i.model_dump() for i in items], + "total": total, + "page": page, + "pages": max(1, (total + limit - 1) // limit), + } + + except Exception as e: + logger.error(f"Admin list_conversations error: {e}") + raise HTTPException(status_code=500, detail="Failed to fetch conversations") + + +# ── System Health ────────────────────────────────────────────────────────────── + +@router.get("/system/health", response_model=AdminSystemHealth) +async def system_health(admin=Depends(get_admin_user)): + """Check health of all system components.""" + + # Check database + db_status = "unhealthy" + try: + supabase = get_supabase() + supabase.table("subscriptions").select("id").limit(1).execute() + db_status = "healthy" + except Exception as e: + logger.warning(f"DB health check failed: {e}") + + # Check Qdrant + qdrant_status = "unhealthy" + try: + vector_store.client.get_collections() + qdrant_status = "healthy" + except Exception as e: + logger.warning(f"Qdrant health check failed: {e}") + + # Check LLM provider API key availability + llm_providers = { + "openai": bool(getattr(settings, "openai_api_key", None)), + "anthropic": bool(getattr(settings, "anthropic_api_key", None)), + "google": bool(getattr(settings, "google_api_key", None)), + "fireworks": bool(getattr(settings, "fireworks_api_key", None)), + } + + return AdminSystemHealth( + db=db_status, + qdrant=qdrant_status, + llm_providers=llm_providers, + timestamp=datetime.utcnow(), + ) diff --git a/app/routers/analytics.py b/app/routers/analytics.py index 78687d6..20843ec 100644 --- a/app/routers/analytics.py +++ b/app/routers/analytics.py @@ -9,6 +9,7 @@ from app.database import get_supabase from app.dependencies import get_current_user from app.config import PLAN_LIMITS from typing import List, Optional, Dict +from collections import defaultdict from pydantic import BaseModel from datetime import datetime, timedelta import logging @@ -127,14 +128,7 @@ async def get_analytics_overview(user=Depends(get_current_user)): conversations_used=0, ) - # Gather per-chatbot analytics - chatbot_analytics = [] - total_convos = 0 - total_msgs = 0 - total_sessions = 0 - month_convos = 0 - all_ratings = [] - + # ── Batch queries (fixes N+1) ──────────────────────────────────────────────── now = datetime.utcnow() month_start = now.replace(day=1, hour=0, minute=0, second=0, microsecond=0) week_start = now - timedelta(days=now.weekday()) @@ -142,14 +136,60 @@ async def get_analytics_overview(user=Depends(get_current_user)): today_start = now.replace(hour=0, minute=0, second=0, microsecond=0) thirty_days_ago = now - timedelta(days=30) + # Batch query 1: ALL conversations for all chatbots (single query) + all_convos_resp = supabase.table("conversations") \ + .select("id, chatbot_id, session_id, language, created_at") \ + .in_("chatbot_id", chatbot_ids) \ + .execute() + all_convos = all_convos_resp.data or [] + all_conv_ids = [c["id"] for c in all_convos] + + # Batch query 2: ALL messages for all conversations (single query) + all_msgs: List[Dict] = [] + if all_conv_ids: + # Split into chunks of 500 to avoid URL length limits + for i in range(0, len(all_conv_ids), 500): + chunk = all_conv_ids[i:i + 500] + msgs_resp = supabase.table("messages") \ + .select("id, conversation_id, role, content, created_at") \ + .in_("conversation_id", chunk) \ + .execute() + all_msgs.extend(msgs_resp.data or []) + + # Batch query 3: ALL feedback for all chatbots (single query) + all_feedback: List[Dict] = [] + if chatbot_ids: + fb_resp = supabase.table("message_feedback") \ + .select("chatbot_id, feedback") \ + .in_("chatbot_id", chatbot_ids) \ + .execute() + all_feedback = fb_resp.data or [] + + # Index data by chatbot_id for O(1) lookups + convos_by_chatbot: Dict[str, List[Dict]] = defaultdict(list) + for c in all_convos: + convos_by_chatbot[c["chatbot_id"]].append(c) + + msgs_by_conv: Dict[str, List[Dict]] = defaultdict(list) + for m in all_msgs: + msgs_by_conv[m["conversation_id"]].append(m) + + fb_by_chatbot: Dict[str, List[Dict]] = defaultdict(list) + for f in all_feedback: + fb_by_chatbot[f["chatbot_id"]].append(f) + + # ── Aggregate per chatbot ──────────────────────────────────────────────────── + chatbot_analytics = [] + total_convos = 0 + total_msgs = 0 + total_sessions = 0 + month_convos = 0 + all_ratings = [] + for chatbot in chatbot_list: cid = chatbot["id"] - - # Total conversations - convos = supabase.table("conversations").select("id, session_id, language, created_at", count="exact") \ - .eq("chatbot_id", cid).execute() - conv_count = convos.count or 0 - conv_data = convos.data or [] + conv_data = convos_by_chatbot[cid] + conv_count = len(conv_data) total_convos += conv_count # Unique sessions @@ -157,34 +197,35 @@ async def get_analytics_overview(user=Depends(get_current_user)): unique_sess = len(sessions) total_sessions += unique_sess - # Total messages - msgs = supabase.table("messages").select("id", count="exact") \ - .in_("conversation_id", [c["id"] for c in conv_data] if conv_data else [""]).execute() - msg_count = msgs.count or 0 + # Messages for this chatbot + chatbot_msgs = [] + for c in conv_data: + chatbot_msgs.extend(msgs_by_conv[c["id"]]) + msg_count = len(chatbot_msgs) total_msgs += msg_count # Time-based conversation counts - today_count = sum(1 for c in conv_data if c.get("created_at") and c["created_at"][:10] == today_start.strftime("%Y-%m-%d")) + today_str = today_start.strftime("%Y-%m-%d") + today_count = sum(1 for c in conv_data if c.get("created_at") and c["created_at"][:10] == today_str) week_count = sum(1 for c in conv_data if c.get("created_at") and c["created_at"] >= week_start.isoformat()) month_count = sum(1 for c in conv_data if c.get("created_at") and c["created_at"] >= month_start.isoformat()) month_convos += month_count # Daily conversations (last 30 days) - daily = {} + daily: Dict[str, int] = {} for c in conv_data: if c.get("created_at") and c["created_at"] >= thirty_days_ago.isoformat(): day = c["created_at"][:10] daily[day] = daily.get(day, 0) + 1 - daily_list = [DailyConversations(date=d, count=n) for d, n in sorted(daily.items())] - # Languages used + # Languages lang_counts: Dict[str, int] = {} for c in conv_data: lang = c.get("language", "en") lang_counts[lang] = lang_counts.get(lang, 0) + 1 - # Peak hour (approximate from created_at) + # Peak hour hour_counts: Dict[int, int] = {} for c in conv_data: if c.get("created_at") and len(c["created_at"]) > 13: @@ -195,35 +236,25 @@ async def get_analytics_overview(user=Depends(get_current_user)): pass peak = max(hour_counts, key=hour_counts.get) if hour_counts else None - # Top queries (from user messages, get first message per conversation) - top_queries: List[TopQuery] = [] - if conv_data: - conv_ids = [c["id"] for c in conv_data[:100]] # limit to recent 100 - user_msgs = supabase.table("messages").select("content") \ - .in_("conversation_id", conv_ids) \ - .eq("role", "user") \ - .limit(200).execute() - query_counts: Dict[str, int] = {} - for m in (user_msgs.data or []): + # Top queries from user messages + query_counts: Dict[str, int] = {} + for m in chatbot_msgs: + if m.get("role") == "user": content = (m.get("content") or "")[:100].strip() if content: query_counts[content] = query_counts.get(content, 0) + 1 - top_sorted = sorted(query_counts.items(), key=lambda x: -x[1])[:5] - top_queries = [TopQuery(query=q, count=n) for q, n in top_sorted] + top_queries = [TopQuery(query=q, count=n) for q, n in sorted(query_counts.items(), key=lambda x: -x[1])[:5]] # Rating rating = chatbot.get("average_rating") if rating: all_ratings.append(rating) - # Feedback counts - fb_result = supabase.table("message_feedback").select("feedback", count="exact") \ - .eq("chatbot_id", cid).execute() - total_fb = fb_result.count or 0 - fb_pos = sum(1 for f in (fb_result.data or []) if f.get("feedback") == "positive") - fb_neg = total_fb - fb_pos + # Feedback + chatbot_fb = fb_by_chatbot[cid] + fb_pos = sum(1 for f in chatbot_fb if f.get("feedback") == "positive") + fb_neg = len(chatbot_fb) - fb_pos - # Average messages per conversation avg_msgs = round(msg_count / conv_count, 1) if conv_count > 0 else 0.0 chatbot_analytics.append(ChatbotAnalyticsResponse( @@ -234,7 +265,7 @@ async def get_analytics_overview(user=Depends(get_current_user)): total_messages=msg_count, average_messages_per_conversation=avg_msgs, average_rating=rating, - total_ratings=total_fb, + total_ratings=len(chatbot_fb), conversations_today=today_count, conversations_this_week=week_count, conversations_this_month=month_count, diff --git a/app/routers/appointments.py b/app/routers/appointments.py new file mode 100644 index 0000000..9a1a958 --- /dev/null +++ b/app/routers/appointments.py @@ -0,0 +1,287 @@ +from fastapi import APIRouter, HTTPException, Depends, Query +from app.database import get_supabase +from app.dependencies import get_current_user +from app.config import PLAN_LIMITS +from app.models import ( + AppointmentCreate, AppointmentResponse, AppointmentStatusUpdate, + BusinessHoursEntry, BusinessHoursSave, +) +from typing import List, Optional +from datetime import datetime, timedelta, date +import uuid +import logging + +logger = logging.getLogger(__name__) + +router = APIRouter(prefix="/appointments", tags=["Appointments"]) +public_router = APIRouter(tags=["Appointments"]) + + +# ── Helpers ─────────────────────────────────────────────────────────────────── + +def _check_booking_access(user_id: str, supabase): + sub = supabase.table("subscriptions").select("plan").eq("user_id", user_id).eq("status", "active").execute() + plan = sub.data[0]["plan"] if sub.data else "free" + if plan not in ("starter", "business", "agency", "enterprise"): + raise HTTPException(status_code=402, detail="Appointment booking requires Starter plan or higher") + return plan + + +def _get_user_company_id(user_id: str, supabase) -> str: + result = supabase.table("companies").select("id").eq("owner_id", user_id).execute() + if not result.data: + raise HTTPException(status_code=404, detail="Company not found") + return result.data[0]["id"] + + +def _verify_chatbot_ownership(chatbot_id: str, company_id: str, supabase): + chatbot = supabase.table("chatbots").select("id").eq("id", chatbot_id).eq("company_id", company_id).execute() + if not chatbot.data: + raise HTTPException(status_code=404, detail="Chatbot not found") + + +def _parse_time(t: str) -> tuple[int, int]: + """Parse HH:MM into (hour, minute).""" + h, m = t.split(":") + return int(h), int(m) + + +def _get_available_slots(chatbot_id: str, target_date: date, supabase) -> List[dict]: + """Return list of available {slot_start, slot_end} dicts for the given date.""" + day_of_week = target_date.weekday() # 0=Mon + + hours = supabase.table("business_hours") \ + .select("*").eq("chatbot_id", chatbot_id).eq("day_of_week", day_of_week).execute() + + if not hours.data or not hours.data[0].get("is_open"): + return [] + + h = hours.data[0] + open_h, open_m = _parse_time(h["open_time"]) + close_h, close_m = _parse_time(h["close_time"]) + duration = h.get("slot_duration_minutes", 60) + + slot_start = datetime(target_date.year, target_date.month, target_date.day, open_h, open_m) + slot_end_limit = datetime(target_date.year, target_date.month, target_date.day, close_h, close_m) + + # Fetch already-booked slots for that day + day_start = datetime(target_date.year, target_date.month, target_date.day, 0, 0) + day_end = day_start + timedelta(days=1) + + booked = supabase.table("appointments") \ + .select("slot_start, slot_end") \ + .eq("chatbot_id", chatbot_id) \ + .neq("status", "cancelled") \ + .gte("slot_start", day_start.isoformat()) \ + .lt("slot_start", day_end.isoformat()) \ + .execute() + booked_starts = {b["slot_start"] for b in (booked.data or [])} + + slots = [] + now = datetime.utcnow() + while slot_start + timedelta(minutes=duration) <= slot_end_limit: + slot_end = slot_start + timedelta(minutes=duration) + # Skip past slots + if slot_start > now: + iso_start = slot_start.isoformat() + if iso_start not in booked_starts: + slots.append({"slot_start": iso_start, "slot_end": slot_end.isoformat()}) + slot_start = slot_end + + return slots + + +# ── Protected endpoints (business owner) ───────────────────────────────────── + +@router.get("", response_model=List[AppointmentResponse]) +async def list_appointments( + chatbot_id: Optional[str] = Query(None), + status: Optional[str] = Query(None), + page: int = Query(1, ge=1), + limit: int = Query(50, ge=1, le=200), + user=Depends(get_current_user), +): + supabase = get_supabase() + _check_booking_access(user.id, supabase) + company_id = _get_user_company_id(user.id, supabase) + + chatbots_q = supabase.table("chatbots").select("id").eq("company_id", company_id) + if chatbot_id: + chatbots_q = chatbots_q.eq("id", chatbot_id) + chatbots = chatbots_q.execute() + chatbot_ids = [c["id"] for c in (chatbots.data or [])] + if not chatbot_ids: + return [] + + offset = (page - 1) * limit + q = supabase.table("appointments").select("*").in_("chatbot_id", chatbot_ids) + if status: + q = q.eq("status", status) + result = q.order("slot_start", desc=False).range(offset, offset + limit - 1).execute() + return [AppointmentResponse(**a) for a in (result.data or [])] + + +@router.patch("/{appointment_id}", response_model=AppointmentResponse) +async def update_appointment_status( + appointment_id: str, + data: AppointmentStatusUpdate, + user=Depends(get_current_user), +): + valid = ("pending", "confirmed", "cancelled", "completed") + if data.status not in valid: + raise HTTPException(status_code=400, detail=f"Status must be one of {valid}") + + supabase = get_supabase() + _check_booking_access(user.id, supabase) + company_id = _get_user_company_id(user.id, supabase) + + appt = supabase.table("appointments").select("*, chatbots(company_id)") \ + .eq("id", appointment_id).execute() + if not appt.data: + raise HTTPException(status_code=404, detail="Appointment not found") + if appt.data[0].get("chatbots", {}).get("company_id") != company_id: + raise HTTPException(status_code=403, detail="Not authorized") + + result = supabase.table("appointments").update({"status": data.status}) \ + .eq("id", appointment_id).execute() + return AppointmentResponse(**result.data[0]) + + +@router.get("/chatbot/{chatbot_id}/hours") +async def get_business_hours(chatbot_id: str, user=Depends(get_current_user)): + supabase = get_supabase() + _check_booking_access(user.id, supabase) + company_id = _get_user_company_id(user.id, supabase) + _verify_chatbot_ownership(chatbot_id, company_id, supabase) + + result = supabase.table("business_hours").select("*") \ + .eq("chatbot_id", chatbot_id).order("day_of_week").execute() + return result.data or [] + + +@router.put("/chatbot/{chatbot_id}/hours") +async def save_business_hours( + chatbot_id: str, + data: BusinessHoursSave, + user=Depends(get_current_user), +): + supabase = get_supabase() + _check_booking_access(user.id, supabase) + company_id = _get_user_company_id(user.id, supabase) + _verify_chatbot_ownership(chatbot_id, company_id, supabase) + + # Upsert each day + for entry in data.hours: + existing = supabase.table("business_hours").select("id") \ + .eq("chatbot_id", chatbot_id).eq("day_of_week", entry.day_of_week).execute() + row = { + "chatbot_id": chatbot_id, + "day_of_week": entry.day_of_week, + "is_open": entry.is_open, + "open_time": entry.open_time, + "close_time": entry.close_time, + "slot_duration_minutes": entry.slot_duration_minutes, + } + if existing.data: + supabase.table("business_hours").update(row).eq("id", existing.data[0]["id"]).execute() + else: + row["id"] = str(uuid.uuid4()) + supabase.table("business_hours").insert(row).execute() + + return {"success": True} + + +# ── Public endpoints (customers booking) ───────────────────────────────────── + +@public_router.get("/chatbots/{chatbot_id}/booking-info") +async def get_booking_info(chatbot_id: str): + """Return public booking info for the booking page (no auth required).""" + supabase = get_supabase() + result = supabase.table("chatbots") \ + .select("id, name, booking_enabled, companies(name)") \ + .eq("id", chatbot_id).execute() + if not result.data: + raise HTTPException(status_code=404, detail="Chatbot not found") + chatbot = result.data[0] + if not chatbot.get("booking_enabled"): + raise HTTPException(status_code=400, detail="Booking is not enabled for this chatbot") + return { + "chatbot_id": chatbot["id"], + "chatbot_name": chatbot.get("name", ""), + "company_name": (chatbot.get("companies") or {}).get("name", ""), + } + + +@public_router.get("/chatbots/{chatbot_id}/available-slots") +async def get_available_slots( + chatbot_id: str, + date: str = Query(..., description="YYYY-MM-DD"), +): + """Return available time slots for a given date (public).""" + supabase = get_supabase() + + chatbot = supabase.table("chatbots").select("id, booking_enabled, is_published") \ + .eq("id", chatbot_id).execute() + if not chatbot.data: + raise HTTPException(status_code=404, detail="Chatbot not found") + if not chatbot.data[0].get("booking_enabled"): + raise HTTPException(status_code=400, detail="Booking not enabled for this chatbot") + + try: + target = datetime.strptime(date, "%Y-%m-%d").date() + except ValueError: + raise HTTPException(status_code=400, detail="Invalid date format, use YYYY-MM-DD") + + slots = _get_available_slots(chatbot_id, target, supabase) + return {"date": date, "slots": slots} + + +@public_router.post("/chatbots/{chatbot_id}/appointments", response_model=AppointmentResponse, status_code=201) +async def create_appointment(chatbot_id: str, data: AppointmentCreate): + """Create an appointment (public endpoint, no auth required).""" + supabase = get_supabase() + + chatbot = supabase.table("chatbots").select("id, booking_enabled") \ + .eq("id", chatbot_id).execute() + if not chatbot.data: + raise HTTPException(status_code=404, detail="Chatbot not found") + if not chatbot.data[0].get("booking_enabled"): + raise HTTPException(status_code=400, detail="Booking not enabled for this chatbot") + + # Verify the slot is still available + slot_start_dt = data.slot_start + target_date = slot_start_dt.date() + available = _get_available_slots(chatbot_id, target_date, supabase) + available_starts = {s["slot_start"] for s in available} + + slot_iso = slot_start_dt.isoformat() + # Try a few normalizations (with/without timezone suffix) + if slot_iso not in available_starts and slot_iso + "Z" not in available_starts: + # Check without microseconds + slot_iso_no_ms = slot_start_dt.replace(microsecond=0).isoformat() + if slot_iso_no_ms not in available_starts: + raise HTTPException(status_code=409, detail="This slot is no longer available") + + # Calculate slot_end based on business hours duration + hours = supabase.table("business_hours").select("slot_duration_minutes") \ + .eq("chatbot_id", chatbot_id).eq("day_of_week", target_date.weekday()).execute() + duration = hours.data[0]["slot_duration_minutes"] if hours.data else 60 + slot_end_dt = slot_start_dt + timedelta(minutes=duration) + + appt_data = { + "id": str(uuid.uuid4()), + "chatbot_id": chatbot_id, + "conversation_id": data.conversation_id, + "customer_name": data.customer_name, + "customer_contact": data.customer_contact, + "service": data.service, + "slot_start": data.slot_start.isoformat(), + "slot_end": slot_end_dt.isoformat(), + "status": "pending", + "notes": data.notes, + } + + result = supabase.table("appointments").insert(appt_data).execute() + if not result.data: + raise HTTPException(status_code=500, detail="Failed to create appointment") + return AppointmentResponse(**result.data[0]) diff --git a/app/routers/auth.py b/app/routers/auth.py index 17add8c..2f1f54e 100644 --- a/app/routers/auth.py +++ b/app/routers/auth.py @@ -58,6 +58,12 @@ async def signup(data: UserSignup): } ).execute() + # Safety-net: ensure user_profiles row exists (trigger should handle it, but just in case) + try: + supabase.table("user_profiles").insert({"user_id": user.id}).execute() + except Exception: + pass # Row may already exist from trigger + token = auth_resp.session.access_token if auth_resp.session else "" return TokenResponse( access_token=token, @@ -66,6 +72,7 @@ async def signup(data: UserSignup): email=user.email, company_name=data.company_name, plan="free", + is_admin=False, ), ) except HTTPException: @@ -109,6 +116,13 @@ async def login(data: UserLogin): ) plan = sub.data[0]["plan"] if sub.data else "free" + # Get is_admin flag + try: + profile = supabase.table("user_profiles").select("is_admin").eq("user_id", user.id).execute() + is_admin = profile.data[0].get("is_admin", False) if profile.data else False + except Exception: + is_admin = False + return TokenResponse( access_token=auth_resp.session.access_token, user=UserResponse( @@ -116,6 +130,7 @@ async def login(data: UserLogin): email=user.email, company_name=company_name, plan=plan, + is_admin=is_admin, ), ) except HTTPException: @@ -230,9 +245,16 @@ async def get_me(user=Depends(get_current_user)): ) plan = sub.data[0]["plan"] if sub.data else "free" + try: + profile = supabase.table("user_profiles").select("is_admin").eq("user_id", user.id).execute() + is_admin = profile.data[0].get("is_admin", False) if profile.data else False + except Exception: + is_admin = False + return UserResponse( id=user.id, email=user.email, company_name=company_name, plan=plan, + is_admin=is_admin, ) diff --git a/app/routers/billing.py b/app/routers/billing.py index d71313a..2c1ef48 100644 --- a/app/routers/billing.py +++ b/app/routers/billing.py @@ -89,6 +89,17 @@ async def stripe_webhook( supabase = get_supabase() event_type = event.get("type", "") + event_id = event.get("id", "") + + # Idempotency check: skip already-processed events + if event_id: + existing = supabase.table("stripe_webhook_events") \ + .select("stripe_event_id") \ + .eq("stripe_event_id", event_id) \ + .execute() + if existing.data: + logger.info(f"Stripe event {event_id} already processed, skipping") + return {"received": True} if event_type == "checkout.session.completed": session = event["data"]["object"] @@ -140,6 +151,16 @@ async def stripe_webhook( except Exception as e: logger.warning(f"Failed to send cancellation notification: {e}") + # Record event as processed + if event_id: + try: + supabase.table("stripe_webhook_events").insert({ + "stripe_event_id": event_id, + "event_type": event_type, + }).execute() + except Exception as e: + logger.warning(f"Failed to record stripe event {event_id}: {e}") + return {"received": True} except HTTPException: diff --git a/app/routers/campaigns.py b/app/routers/campaigns.py new file mode 100644 index 0000000..32a8c81 --- /dev/null +++ b/app/routers/campaigns.py @@ -0,0 +1,167 @@ +from fastapi import APIRouter, HTTPException, Depends, Query +from app.database import get_supabase +from app.dependencies import get_current_user +from app.models import CampaignCreate, CampaignResponse +from typing import List, Optional +import uuid +import logging + +logger = logging.getLogger(__name__) +router = APIRouter(prefix="/campaigns", tags=["Campaigns"]) + + +# ── Helpers ─────────────────────────────────────────────────────────────────── + +def _check_campaigns_access(user_id: str, supabase): + sub = supabase.table("subscriptions").select("plan").eq("user_id", user_id).eq("status", "active").execute() + plan = sub.data[0]["plan"] if sub.data else "free" + if plan not in ("starter", "business", "agency", "enterprise"): + raise HTTPException(status_code=402, detail="Campaigns require Starter plan or higher") + return plan + + +def _get_user_company_id(user_id: str, supabase) -> str: + result = supabase.table("companies").select("id").eq("owner_id", user_id).execute() + if not result.data: + raise HTTPException(status_code=404, detail="Company not found") + return result.data[0]["id"] + + +def _verify_chatbot_ownership(chatbot_id: str, company_id: str, supabase): + chatbot = supabase.table("chatbots").select("id").eq("id", chatbot_id).eq("company_id", company_id).execute() + if not chatbot.data: + raise HTTPException(status_code=404, detail="Chatbot not found") + + +# ── Endpoints ───────────────────────────────────────────────────────────────── + +@router.get("", response_model=List[CampaignResponse]) +async def list_campaigns( + chatbot_id: Optional[str] = Query(None), + page: int = Query(1, ge=1), + limit: int = Query(20, ge=1, le=100), + user=Depends(get_current_user), +): + supabase = get_supabase() + _check_campaigns_access(user.id, supabase) + company_id = _get_user_company_id(user.id, supabase) + + chatbots_q = supabase.table("chatbots").select("id").eq("company_id", company_id) + if chatbot_id: + chatbots_q = chatbots_q.eq("id", chatbot_id) + chatbots = chatbots_q.execute() + chatbot_ids = [c["id"] for c in (chatbots.data or [])] + if not chatbot_ids: + return [] + + offset = (page - 1) * limit + result = supabase.table("campaigns").select("*").in_("chatbot_id", chatbot_ids) \ + .order("created_at", desc=True).range(offset, offset + limit - 1).execute() + return [CampaignResponse(**c) for c in (result.data or [])] + + +@router.post("", response_model=CampaignResponse, status_code=201) +async def create_campaign(data: CampaignCreate, user=Depends(get_current_user)): + supabase = get_supabase() + _check_campaigns_access(user.id, supabase) + company_id = _get_user_company_id(user.id, supabase) + _verify_chatbot_ownership(data.chatbot_id, company_id, supabase) + + # Count Telegram subscribers for this chatbot + subscribers = supabase.table("channel_sessions").select("id", count="exact") \ + .eq("chatbot_id", data.chatbot_id).eq("channel", "telegram").execute() + recipients_count = subscribers.count or 0 + + campaign_data = { + "id": str(uuid.uuid4()), + "chatbot_id": data.chatbot_id, + "title": data.title, + "message": data.message, + "status": "draft", + "recipients_count": recipients_count, + "sent_count": 0, + } + result = supabase.table("campaigns").insert(campaign_data).execute() + if not result.data: + raise HTTPException(status_code=500, detail="Failed to create campaign") + return CampaignResponse(**result.data[0]) + + +@router.post("/{campaign_id}/send", response_model=CampaignResponse) +async def send_campaign(campaign_id: str, user=Depends(get_current_user)): + """Broadcast the campaign message to all Telegram subscribers of the chatbot.""" + supabase = get_supabase() + _check_campaigns_access(user.id, supabase) + company_id = _get_user_company_id(user.id, supabase) + + campaign = supabase.table("campaigns").select("*, chatbots(company_id)") \ + .eq("id", campaign_id).execute() + if not campaign.data: + raise HTTPException(status_code=404, detail="Campaign not found") + c = campaign.data[0] + if c.get("chatbots", {}).get("company_id") != company_id: + raise HTTPException(status_code=403, detail="Not authorized") + if c["status"] == "sent": + raise HTTPException(status_code=400, detail="Campaign already sent") + + chatbot_id = c["chatbot_id"] + + # Get the Telegram bot token for this chatbot + conn = supabase.table("channel_connections").select("bot_token") \ + .eq("chatbot_id", chatbot_id).eq("channel", "telegram").eq("is_active", True).execute() + if not conn.data or not conn.data[0].get("bot_token"): + raise HTTPException(status_code=400, detail="No active Telegram connection for this chatbot") + bot_token = conn.data[0]["bot_token"] + + # Get all Telegram subscribers (channel_sessions) + sessions = supabase.table("channel_sessions").select("external_id") \ + .eq("chatbot_id", chatbot_id).eq("channel", "telegram").execute() + subscribers = sessions.data or [] + + # Mark as sending + supabase.table("campaigns").update({"status": "sending"}).eq("id", campaign_id).execute() + + # Broadcast + from app.services.telegram_service import send_message as tg_send + sent = 0 + for sub in subscribers: + try: + # external_id format is "tg:{token_prefix}:{chat_id}" + parts = sub["external_id"].split(":") + if len(parts) >= 3: + chat_id = int(parts[2]) + await tg_send(bot_token, chat_id, c["message"]) + sent += 1 + except Exception as e: + logger.warning(f"Failed to send campaign to {sub['external_id']}: {e}") + + # Mark as sent + from datetime import datetime, timezone + supabase.table("campaigns").update({ + "status": "sent", + "sent_count": sent, + "recipients_count": len(subscribers), + "sent_at": datetime.now(timezone.utc).isoformat(), + }).eq("id", campaign_id).execute() + + updated = supabase.table("campaigns").select("*").eq("id", campaign_id).execute() + return CampaignResponse(**updated.data[0]) + + +@router.delete("/{campaign_id}") +async def delete_campaign(campaign_id: str, user=Depends(get_current_user)): + supabase = get_supabase() + _check_campaigns_access(user.id, supabase) + company_id = _get_user_company_id(user.id, supabase) + + campaign = supabase.table("campaigns").select("*, chatbots(company_id)") \ + .eq("id", campaign_id).execute() + if not campaign.data: + raise HTTPException(status_code=404, detail="Campaign not found") + if campaign.data[0].get("chatbots", {}).get("company_id") != company_id: + raise HTTPException(status_code=403, detail="Not authorized") + if campaign.data[0]["status"] == "sending": + raise HTTPException(status_code=400, detail="Cannot delete a campaign that is currently sending") + + supabase.table("campaigns").delete().eq("id", campaign_id).execute() + return {"success": True} diff --git a/app/routers/channels.py b/app/routers/channels.py index 2773e00..807ee3c 100644 --- a/app/routers/channels.py +++ b/app/routers/channels.py @@ -1,11 +1,9 @@ -import json -import re import uuid import logging -from fastapi import APIRouter, Depends, HTTPException, Query, Request, Response +from fastapi import APIRouter, Depends, HTTPException, Query, Request from pydantic import BaseModel -from typing import List, Optional +from typing import Optional from app.config import settings, PLAN_LIMITS from app.database import get_supabase @@ -14,7 +12,6 @@ from app.services.rag import rag_engine from app.services.telegram_service import ( delete_webhook, get_bot_info, send_message as tg_send, set_webhook, ) -from app.services.whatsapp_service import send_message as wa_send, verify_signature from app.routers.chat import ( _get_conversation_history, _get_or_create_conversation, _save_message, ) @@ -32,11 +29,6 @@ class TelegramConnectRequest(BaseModel): bot_token: str -class WhatsAppConnectRequest(BaseModel): - chatbot_id: str - wa_keyword: Optional[str] = None - - # ── Helpers ─────────────────────────────────────────────────────────────────── def _verify_chatbot_ownership(chatbot_id: str, user_id: str, supabase): @@ -53,18 +45,6 @@ def _verify_chatbot_ownership(chatbot_id: str, user_id: str, supabase): raise HTTPException(status_code=404, detail="Chatbot not found") -def _generate_keyword(chatbot_name: str, chatbot_id: str, supabase) -> str: - base = re.sub(r"[^A-Z0-9]", "", chatbot_name.upper())[:10] or "BOT" - existing = ( - supabase.table("channel_connections") - .select("id").eq("channel", "whatsapp").eq("wa_keyword", base).execute() - ) - if not existing.data: - return base - suffix = chatbot_id.replace("-", "")[:4].upper() - return f"{base[:6]}{suffix}" - - def _get_or_create_channel_session( chatbot_id: str, channel: str, external_id: str, supabase ) -> dict: @@ -85,32 +65,13 @@ def _get_or_create_channel_session( return result.data[0] -def _upsert_whatsapp_session(chatbot_id: str, phone: str, session_id: str, supabase): - existing = ( - supabase.table("channel_sessions") - .select("id").eq("channel", "whatsapp").eq("external_id", phone).execute() - ) - if existing.data: - supabase.table("channel_sessions").update( - {"chatbot_id": chatbot_id, "session_id": session_id} - ).eq("id", existing.data[0]["id"]).execute() - else: - supabase.table("channel_sessions").insert({ - "id": str(uuid.uuid4()), - "chatbot_id": chatbot_id, - "channel": "whatsapp", - "external_id": phone, - "session_id": session_id, - }).execute() - - def _check_channel_plan(user_id: str, channel: str, supabase): """Raise 402 if the user's plan doesn't include the requested channel.""" sub = supabase.table("subscriptions").select("plan").eq("user_id", user_id).eq("status", "active").execute() plan = sub.data[0]["plan"] if sub.data else "free" allowed = PLAN_LIMITS.get(plan, PLAN_LIMITS["free"]).get("channels", []) if channel not in allowed: - label = {"telegram": "Starter", "whatsapp": "Business"}.get(channel, "a higher") + label = {"telegram": "Starter"}.get(channel, "a higher") raise HTTPException(status_code=402, detail=f"{channel.title()} channel requires {label} plan or higher") @@ -151,12 +112,6 @@ def _detect_language(text: str) -> str: return best if scores[best] > 0.25 else "en" -def _wa_link(keyword: str) -> str: - if settings.whatsapp_display_number: - return f"https://wa.me/{settings.whatsapp_display_number}?text=START+{keyword}" - return "" - - # ── CRUD endpoints ──────────────────────────────────────────────────────────── @router.get("") @@ -165,17 +120,11 @@ async def list_channels(chatbot_id: str = Query(...), user=Depends(get_current_u _verify_chatbot_ownership(chatbot_id, user.id, supabase) result = ( supabase.table("channel_connections") - .select("id,channel,bot_username,wa_keyword,is_active,created_at") + .select("id,channel,bot_username,is_active,created_at") .eq("chatbot_id", chatbot_id) .execute() ) - rows = result.data or [] - for row in rows: - if row["channel"] == "whatsapp" and row.get("wa_keyword"): - row["wa_link"] = _wa_link(row["wa_keyword"]) - else: - row["wa_link"] = None - return rows + return result.data or [] @router.post("/telegram") @@ -224,47 +173,6 @@ async def connect_telegram(data: TelegramConnectRequest, user=Depends(get_curren } -@router.post("/whatsapp") -async def connect_whatsapp(data: WhatsAppConnectRequest, user=Depends(get_current_user)): - supabase = get_supabase() - _verify_chatbot_ownership(data.chatbot_id, user.id, supabase) - _check_channel_plan(user.id, "whatsapp", supabase) - - chatbot = supabase.table("chatbots").select("name").eq("id", data.chatbot_id).execute() - chatbot_name = chatbot.data[0]["name"] if chatbot.data else "BOT" - - keyword = (data.wa_keyword or _generate_keyword(chatbot_name, data.chatbot_id, supabase)).upper() - keyword = re.sub(r"[^A-Z0-9]", "", keyword)[:12] - if not keyword: - raise HTTPException(status_code=400, detail="Keyword must contain letters or numbers") - - taken = ( - supabase.table("channel_connections") - .select("id").eq("channel", "whatsapp").eq("wa_keyword", keyword) - .neq("chatbot_id", data.chatbot_id).execute() - ) - if taken.data: - raise HTTPException(status_code=400, detail=f"Keyword '{keyword}' is already taken. Choose a different one.") - - existing = ( - supabase.table("channel_connections") - .select("id").eq("chatbot_id", data.chatbot_id).eq("channel", "whatsapp").execute() - ) - conn_data = { - "chatbot_id": data.chatbot_id, - "channel": "whatsapp", - "wa_keyword": keyword, - "is_active": True, - } - if existing.data: - supabase.table("channel_connections").update(conn_data).eq("id", existing.data[0]["id"]).execute() - else: - conn_data["id"] = str(uuid.uuid4()) - supabase.table("channel_connections").insert(conn_data).execute() - - return {"success": True, "keyword": keyword, "wa_link": _wa_link(keyword)} - - @router.delete("/{connection_id}") async def disconnect_channel(connection_id: str, user=Depends(get_current_user)): supabase = get_supabase() @@ -383,136 +291,3 @@ async def telegram_webhook(bot_token: str, request: Request): return {"ok": True} -# ── WhatsApp webhooks ───────────────────────────────────────────────────────── - -@webhook_router.get("/whatsapp") -async def whatsapp_verify(request: Request): - """Meta webhook verification challenge.""" - params = dict(request.query_params) - if ( - params.get("hub.mode") == "subscribe" - and settings.whatsapp_verify_token - and params.get("hub.verify_token") == settings.whatsapp_verify_token - ): - return Response(content=params["hub.challenge"], media_type="text/plain") - raise HTTPException(status_code=403, detail="Forbidden") - - -@webhook_router.post("/whatsapp") -async def whatsapp_webhook(request: Request): - """Receive messages from WhatsApp Cloud API.""" - raw_body = await request.body() - - if settings.whatsapp_app_secret: - sig = request.headers.get("X-Hub-Signature-256", "") - if not verify_signature(raw_body, sig, settings.whatsapp_app_secret): - raise HTTPException(status_code=403, detail="Invalid signature") - - try: - body = json.loads(raw_body) - value = body["entry"][0]["changes"][0]["value"] - if "messages" not in value: - return {"ok": True} - msg = value["messages"][0] - if msg.get("type") != "text": - return {"ok": True} - from_number = msg["from"] - text = msg["text"]["body"].strip() - except (KeyError, IndexError, json.JSONDecodeError): - return {"ok": True} - - supabase = get_supabase() - - async def _wa_reply(to: str, message: str): - if settings.whatsapp_access_token and settings.whatsapp_phone_number_id: - await wa_send(settings.whatsapp_phone_number_id, to, message, settings.whatsapp_access_token) - - # Handle START - if text.upper().startswith("START "): - keyword = re.sub(r"[^A-Z0-9]", "", text[6:].strip().upper()) - conn = ( - supabase.table("channel_connections") - .select("*").eq("channel", "whatsapp").eq("wa_keyword", keyword).eq("is_active", True).execute() - ) - if not conn.data: - await _wa_reply(from_number, f"Sorry, chatbot '{keyword}' not found. Use the link from the business you're contacting.") - return {"ok": True} - - chatbot_id = conn.data[0]["chatbot_id"] - chatbot_result = supabase.table("chatbots").select("name,welcome_message").eq("id", chatbot_id).execute() - chatbot = chatbot_result.data[0] if chatbot_result.data else {} - - _upsert_whatsapp_session(chatbot_id, from_number, str(uuid.uuid4()), supabase) - welcome = chatbot.get("welcome_message") or f"Hello! I'm {chatbot.get('name', 'your assistant')}. How can I help you?" - await _wa_reply(from_number, welcome) - return {"ok": True} - - # Regular message — find active session - session_result = ( - supabase.table("channel_sessions") - .select("*").eq("channel", "whatsapp").eq("external_id", from_number).execute() - ) - if not session_result.data: - await _wa_reply(from_number, "To start chatting, use the WhatsApp link from the business you're trying to contact.") - return {"ok": True} - - session = session_result.data[0] - chatbot_id = session["chatbot_id"] - - # Check subscription still allows WhatsApp - if not _check_chatbot_channel_subscription(chatbot_id, "whatsapp", supabase): - await _wa_reply(from_number, "This service is currently unavailable. Please contact the business directly.") - return {"ok": True} - - chatbot_result = ( - supabase.table("chatbots").select("*, companies(name, logo_url)").eq("id", chatbot_id).execute() - ) - if not chatbot_result.data: - return {"ok": True} - chatbot = chatbot_result.data[0] - - collection_name = chatbot.get("qdrant_collection_name") - if not collection_name: - await _wa_reply(from_number, "This chatbot isn't ready yet. Please try again later.") - return {"ok": True} - - detected_lang = _detect_language(text) - company_data = chatbot.get("companies", {}) or {} - conversation = _get_or_create_conversation( - chatbot_id=chatbot_id, - session_id=session["session_id"], - user_id=None, - language=detected_lang, - supabase=supabase, - ) - history = _get_conversation_history(conversation["id"], supabase) - chatbot_config = {**chatbot, "company_name": company_data.get("name", "")} - - try: - result = await rag_engine.process_query( - query=text, - collection_name=collection_name, - chatbot_config=chatbot_config, - conversation_history=history, - language=detected_lang, - ) - except Exception as e: - logger.error(f"WhatsApp RAG error for chatbot {chatbot_id}: {e}") - await _wa_reply(from_number, "Sorry, I encountered an error. Please try again.") - return {"ok": True} - - confidence_score = max((s.score for s in result.get("sources", [])), default=0.0) - _save_message(conversation["id"], "user", text, supabase) - _save_message( - conversation["id"], "assistant", result["response"], supabase, - sources=[s.model_dump() for s in result.get("sources", [])], - model=result.get("model", ""), - confidence_score=confidence_score, - ) - supabase.table("conversations").update( - {"message_count": len(history) + 2} - ).eq("id", conversation["id"]).execute() - supabase.table("channel_sessions").update({"last_active": "now()"}).eq("id", session["id"]).execute() - - await _wa_reply(from_number, result["response"]) - return {"ok": True} diff --git a/app/routers/chat.py b/app/routers/chat.py index 3a7540f..1b448b0 100644 --- a/app/routers/chat.py +++ b/app/routers/chat.py @@ -124,6 +124,19 @@ async def chat( # Get conversation history history = _get_conversation_history(conversation["id"], supabase) + # If an agent has taken over this conversation, stop the bot from responding + conv_status = conversation.get("status", "open") + if conv_status == "agent_handling": + return ChatResponse( + response="", + session_id=session_id, + sources=[], + model_used="", + tokens_used=0, + needs_lead_capture=False, + handoff=False, + ) + # Get company info for context company_data = chatbot.get("companies", {}) or {} chatbot_config = { @@ -131,6 +144,19 @@ async def chat( "company_name": company_data.get("name", ""), } + # If booking is enabled, inject a note into the system prompt so the bot + # can guide users to the booking page + if chatbot.get("booking_enabled"): + from app.config import settings as _cfg + booking_url = f"{_cfg.app_url}/book/{chatbot_id}" + booking_note = ( + f"\n\nAppointment booking: This business accepts appointments online. " + f"If the user wants to book an appointment, meeting, or consultation, " + f"provide them this booking link: {booking_url}" + ) + existing_prompt = chatbot_config.get("system_prompt") or "" + chatbot_config["system_prompt"] = existing_prompt + booking_note + # Run RAG result = await rag_engine.process_query( query=message.message, diff --git a/app/routers/chatbots.py b/app/routers/chatbots.py index 77c2711..cb0d52b 100644 --- a/app/routers/chatbots.py +++ b/app/routers/chatbots.py @@ -5,6 +5,7 @@ from app.models import ( from app.database import get_supabase from app.dependencies import get_current_user, get_user_subscription from app.services.vector_store import vector_store +from app.services.storage import delete_from_storage from app.config import PLAN_LIMITS from typing import List import uuid @@ -83,8 +84,20 @@ async def create_chatbot(data: ChatbotCreate, user=Depends(get_current_user)): "handoff_keywords": data.handoff_keywords, } - result = supabase.table("chatbots").insert(chatbot_data).execute() - if not result.data: + try: + result = supabase.table("chatbots").insert(chatbot_data).execute() + if not result.data: + raise HTTPException(status_code=500, detail="Failed to create chatbot") + except HTTPException: + raise + except Exception as e: + # Cleanup orphaned Qdrant collection if DB insert failed + if collection_name: + try: + vector_store.delete_collection(collection_name) + logger.warning(f"Cleaned up orphaned Qdrant collection {collection_name} after DB failure") + except Exception: + pass raise HTTPException(status_code=500, detail="Failed to create chatbot") return _format_chatbot(result.data[0], supabase) @@ -139,7 +152,11 @@ async def delete_chatbot(chatbot_id: str, user=Depends(get_current_user)): try: vector_store.delete_collection(chatbot["qdrant_collection_name"]) except Exception as e: - logger.warning(f"Failed to delete collection: {e}") + logger.warning(f"Failed to delete Qdrant collection: {e}") + + # Delete logo from Supabase Storage + if chatbot.get("logo_url"): + delete_from_storage(supabase, "logos", chatbot["logo_url"]) supabase.table("chatbots").delete().eq("id", chatbot_id).execute() return SuccessResponse(success=True, message="Chatbot deleted") @@ -303,4 +320,5 @@ def _format_chatbot(chatbot: dict, supabase) -> ChatbotResponse: handoff_message=chatbot.get("handoff_message", "I'll connect you with our team. Please wait."), handoff_email=chatbot.get("handoff_email"), handoff_keywords=chatbot.get("handoff_keywords") or ["human", "agent", "speak to someone", "talk to a person", "real person"], + booking_enabled=bool(chatbot.get("booking_enabled")), ) \ No newline at end of file diff --git a/app/routers/documents.py b/app/routers/documents.py index 8749f41..1cad3e8 100644 --- a/app/routers/documents.py +++ b/app/routers/documents.py @@ -5,6 +5,7 @@ from app.dependencies import get_current_user from app.services.document_processor import process_document from app.services.embeddings import embedding_service from app.services.vector_store import vector_store +from app.services.storage import delete_from_storage, extract_storage_path from app.config import settings from typing import List import uuid @@ -205,10 +206,72 @@ async def delete_document(chatbot_id: str, document_id: str, user=Depends(get_cu except Exception as e: logger.warning(f"Failed to delete vectors: {e}") + # Delete file from Supabase Storage + if doc.data[0].get("file_url"): + delete_from_storage(supabase, "documents", doc.data[0]["file_url"]) + supabase.table("documents").delete().eq("id", document_id).execute() return SuccessResponse(success=True, message="Document deleted") +@router.post("/{document_id}/retry", response_model=DocumentResponse) +async def retry_document_processing( + chatbot_id: str, + document_id: str, + background_tasks: BackgroundTasks, + user=Depends(get_current_user), +): + """Retry processing a failed document.""" + supabase = get_supabase() + chatbot = _get_user_chatbot(chatbot_id, user.id, supabase) + + doc = supabase.table("documents").select("*").eq("id", document_id).eq("chatbot_id", chatbot_id).execute() + if not doc.data: + raise HTTPException(status_code=404, detail="Document not found") + + document = doc.data[0] + if document.get("status") != "failed": + raise HTTPException(status_code=400, detail="Only failed documents can be retried") + + file_url = document.get("file_url") + if not file_url: + raise HTTPException( + status_code=400, + detail="No file URL stored. Please re-upload this document." + ) + + # Download file from storage + try: + path = extract_storage_path(file_url, "documents") + if not path: + raise HTTPException(status_code=400, detail="Cannot locate file in storage. Please re-upload.") + file_bytes = supabase.storage.from_("documents").download(path) + except HTTPException: + raise + except Exception as e: + logger.error(f"Failed to download document {document_id} for retry: {e}") + raise HTTPException(status_code=400, detail="Cannot retrieve file from storage. Please re-upload.") + + # Reset status to processing + result = supabase.table("documents").update({ + "status": "processing", + "error_message": None, + "chunk_count": 0, + }).eq("id", document_id).execute() + + # Re-enqueue background processing + background_tasks.add_task( + _process_document_bg, + file_bytes=file_bytes, + file_name=document["file_name"], + doc_id=document_id, + chatbot=chatbot, + supabase=supabase, + ) + + return DocumentResponse(**result.data[0]) + + # ── URL Sources ─────────────────────────────────────────────────────────────── @url_router.get("", response_model=List[UrlSourceResponse]) diff --git a/app/routers/inbox.py b/app/routers/inbox.py index e75c50e..1b57ef9 100644 --- a/app/routers/inbox.py +++ b/app/routers/inbox.py @@ -2,8 +2,9 @@ from fastapi import APIRouter, HTTPException, Depends, Query from app.database import get_supabase from app.dependencies import get_current_user from app.config import PLAN_LIMITS -from app.models import InboxConversation, InboxMessage +from app.models import InboxConversation, InboxMessage, ConversationStatusUpdate, AgentReplyCreate from typing import List, Optional +import uuid import logging logger = logging.getLogger(__name__) @@ -79,6 +80,8 @@ async def list_inbox_conversations( language=conv.get("language", "en"), message_count=conv.get("message_count", 0), first_message=first_message_text, + status=conv.get("status", "open"), + last_agent_reply_at=conv.get("last_agent_reply_at"), created_at=conv.get("created_at"), )) @@ -137,6 +140,67 @@ async def get_inbox_conversation( } +@router.patch("/conversations/{conversation_id}/status") +async def update_conversation_status( + conversation_id: str, + data: ConversationStatusUpdate, + user=Depends(get_current_user), +): + """Update conversation status (open, agent_handling, resolved).""" + if data.status not in ("open", "agent_handling", "resolved"): + raise HTTPException(status_code=400, detail="Invalid status") + + supabase = get_supabase() + _check_inbox_access(user.id, supabase) + company_id = _get_user_company_id(user.id, supabase) + + conv = supabase.table("conversations").select("*, chatbots(company_id)") \ + .eq("id", conversation_id).execute() + if not conv.data: + raise HTTPException(status_code=404, detail="Conversation not found") + if conv.data[0].get("chatbots", {}).get("company_id") != company_id: + raise HTTPException(status_code=403, detail="Not authorized") + + supabase.table("conversations").update({"status": data.status}).eq("id", conversation_id).execute() + return {"success": True, "status": data.status} + + +@router.post("/conversations/{conversation_id}/reply") +async def agent_reply( + conversation_id: str, + data: AgentReplyCreate, + user=Depends(get_current_user), +): + """Send an agent reply to a conversation.""" + supabase = get_supabase() + _check_inbox_access(user.id, supabase) + company_id = _get_user_company_id(user.id, supabase) + + conv = supabase.table("conversations").select("*, chatbots(company_id)") \ + .eq("id", conversation_id).execute() + if not conv.data: + raise HTTPException(status_code=404, detail="Conversation not found") + if conv.data[0].get("chatbots", {}).get("company_id") != company_id: + raise HTTPException(status_code=403, detail="Not authorized") + + msg_id = str(uuid.uuid4()) + supabase.table("messages").insert({ + "id": msg_id, + "conversation_id": conversation_id, + "role": "agent", + "content": data.message, + }).execute() + + # Mark as agent_handling if not already, and record reply time + current_status = conv.data[0].get("status", "open") + update_data: dict = {"last_agent_reply_at": "now()"} + if current_status == "open": + update_data["status"] = "agent_handling" + supabase.table("conversations").update(update_data).eq("id", conversation_id).execute() + + return {"success": True, "message_id": msg_id} + + @router.delete("/conversations/{conversation_id}") async def delete_inbox_conversation( conversation_id: str, diff --git a/app/routers/leads.py b/app/routers/leads.py index 625dd9e..e1bd613 100644 --- a/app/routers/leads.py +++ b/app/routers/leads.py @@ -2,7 +2,7 @@ from fastapi import APIRouter, HTTPException, Depends, Query from fastapi.responses import StreamingResponse from app.database import get_supabase from app.dependencies import get_current_user -from app.models import LeadCreate, LeadResponse +from app.models import LeadCreate, LeadResponse, LeadUpdate from typing import List, Optional import uuid import csv @@ -60,6 +60,40 @@ async def list_leads( return [LeadResponse(**lead) for lead in (result.data or [])] +@router.patch("/{lead_id}", response_model=LeadResponse) +async def update_lead( + lead_id: str, + data: LeadUpdate, + user=Depends(get_current_user), +): + """Update lead status or notes.""" + supabase = get_supabase() + _check_leads_access(user.id, supabase) + company_id = _get_user_company_id(user.id, supabase) + + lead = supabase.table("leads").select("*, chatbots(company_id)") \ + .eq("id", lead_id).execute() + if not lead.data: + raise HTTPException(status_code=404, detail="Lead not found") + if lead.data[0].get("chatbots", {}).get("company_id") != company_id: + raise HTTPException(status_code=403, detail="Not authorized") + + update_fields: dict = {} + if data.status is not None: + valid_statuses = ("new", "contacted", "qualified", "closed", "lost") + if data.status not in valid_statuses: + raise HTTPException(status_code=400, detail=f"Status must be one of {valid_statuses}") + update_fields["status"] = data.status + if data.notes is not None: + update_fields["notes"] = data.notes + + if not update_fields: + return LeadResponse(**lead.data[0]) + + result = supabase.table("leads").update(update_fields).eq("id", lead_id).execute() + return LeadResponse(**result.data[0]) + + @router.get("/export") async def export_leads_csv( chatbot_id: Optional[str] = Query(None), diff --git a/app/routers/models.py b/app/routers/models.py index 9a26158..bf38544 100644 --- a/app/routers/models.py +++ b/app/routers/models.py @@ -96,7 +96,7 @@ async def get_available_models(user=Depends(get_current_user)): if plan == "free": upgrade_label = "Upgrade to Starter for more AI models and messaging channels" elif plan == "starter": - upgrade_label = "Upgrade to Business for GPT-4o, Claude, Gemini and WhatsApp" + upgrade_label = "Upgrade to Business for GPT-4o, Claude, and Gemini" return ModelsResponse( models=models, diff --git a/app/services/rag.py b/app/services/rag.py index 8fe5b2a..3c05817 100644 --- a/app/services/rag.py +++ b/app/services/rag.py @@ -7,7 +7,7 @@ import logging logger = logging.getLogger(__name__) -RAG_SYSTEM_PROMPT = """You are a helpful AI assistant for {company_name}. +RAG_SYSTEM_PROMPT = """You are a helpful AI assistant for {company_name}. Your role is to answer questions based on the provided context from company documents. IMPORTANT RULES: @@ -16,13 +16,20 @@ IMPORTANT RULES: 3. Be concise and helpful 4. Always maintain a professional, friendly tone 5. If asked about topics completely outside the context, politely redirect to relevant topics - +{language_instruction} {custom_instructions} Context from knowledge base: {context} """ +LANGUAGE_NAMES = { + "en": "English", "fr": "French", "es": "Spanish", "de": "German", + "it": "Italian", "pt": "Portuguese", "ar": "Arabic", "zh": "Chinese", + "ja": "Japanese", "ko": "Korean", "ru": "Russian", "nl": "Dutch", + "tr": "Turkish", "pl": "Polish", "vi": "Vietnamese", "th": "Thai", +} + class RAGEngine: def __init__(self): @@ -102,8 +109,15 @@ class RAGEngine: logger.warning(f"[RAG] No context found for query: '{query}' in collection '{collection_name}'") # Step 4: Build messages + lang_name = LANGUAGE_NAMES.get(language, "English") if language and language != "en" else "" + language_instruction = ( + f"\n6. Respond in {lang_name}. Match the language of the user's message." + if lang_name else "" + ) + system_prompt = RAG_SYSTEM_PROMPT.format( company_name=chatbot_config.get("company_name", ""), + language_instruction=language_instruction, custom_instructions=chatbot_config.get("system_prompt") or "", context=context, ) diff --git a/app/services/storage.py b/app/services/storage.py new file mode 100644 index 0000000..899c137 --- /dev/null +++ b/app/services/storage.py @@ -0,0 +1,46 @@ +""" +Supabase Storage helper utilities. +Used to delete files from storage buckets when deleting documents or chatbots. +""" +import logging +from typing import Optional +from urllib.parse import urlparse + +logger = logging.getLogger(__name__) + + +def extract_storage_path(url: str, bucket: str) -> Optional[str]: + """ + Extract the file path from a Supabase Storage public URL. + + URL format: {supabase_url}/storage/v1/object/public/{bucket}/{path} + Returns the path portion after the bucket name, or None if not parseable. + """ + if not url: + return None + try: + parsed = urlparse(url) + prefix = f"/storage/v1/object/public/{bucket}/" + if prefix in parsed.path: + idx = parsed.path.index(prefix) + len(prefix) + return parsed.path[idx:] + except Exception as e: + logger.warning(f"Failed to extract storage path from URL '{url}': {e}") + return None + + +def delete_from_storage(supabase, bucket: str, url: str) -> bool: + """ + Delete a file from a Supabase Storage bucket given its public URL. + Returns True on success, False if the URL couldn't be parsed or deletion failed. + """ + path = extract_storage_path(url, bucket) + if not path: + return False + try: + supabase.storage.from_(bucket).remove([path]) + logger.info(f"Deleted storage file: {bucket}/{path}") + return True + except Exception as e: + logger.warning(f"Failed to delete storage file {bucket}/{path}: {e}") + return False diff --git a/app/services/whatsapp_service.py b/app/services/whatsapp_service.py deleted file mode 100644 index 708a850..0000000 --- a/app/services/whatsapp_service.py +++ /dev/null @@ -1,36 +0,0 @@ -import httpx -import hashlib -import hmac -import logging - -logger = logging.getLogger(__name__) - -_META_API = "https://graph.facebook.com/v19.0" - - -async def send_message(phone_number_id: str, to: str, text: str, access_token: str) -> bool: - try: - async with httpx.AsyncClient(timeout=10) as client: - r = await client.post( - f"{_META_API}/{phone_number_id}/messages", - headers={"Authorization": f"Bearer {access_token}"}, - json={ - "messaging_product": "whatsapp", - "to": to, - "type": "text", - "text": {"body": text}, - }, - ) - return r.status_code == 200 - except Exception as e: - logger.error(f"WhatsApp send error: {e}") - return False - - -def verify_signature(payload: bytes, signature: str, app_secret: str) -> bool: - expected = "sha256=" + hmac.new( - app_secret.encode("utf-8"), - payload, - hashlib.sha256, - ).hexdigest() - return hmac.compare_digest(expected, signature) diff --git a/app/services/widget.py b/app/services/widget.py index 9cb9906..9942611 100644 --- a/app/services/widget.py +++ b/app/services/widget.py @@ -1,140 +1,182 @@ -def generate_widget_js(app_url: str) -> str: - """Generate the embeddable widget JavaScript with app_url baked in.""" - return f""" -(function() {{ - var APP_URL = "{app_url}"; - - // Find script tag to get chatbot ID - var scripts = document.querySelectorAll('script[data-chatbot]'); - var chatbotId = null; - if (scripts.length > 0) {{ - chatbotId = scripts[scripts.length - 1].getAttribute('data-chatbot'); - }} - if (!chatbotId) {{ - console.warn('[Contexta] No data-chatbot attribute found on script tag'); - return; - }} - - // Styles - var style = document.createElement('style'); - style.textContent = ` - .contexta-btn {{ - position: fixed; - bottom: 24px; - right: 24px; - width: 56px; - height: 56px; - border-radius: 50%; - background: #6366f1; - border: none; - cursor: pointer; - box-shadow: 0 4px 12px rgba(0,0,0,0.2); - display: flex; - align-items: center; - justify-content: center; - z-index: 999998; - transition: transform 0.2s, box-shadow 0.2s; - }} - .contexta-btn:hover {{ - transform: scale(1.08); - box-shadow: 0 6px 20px rgba(0,0,0,0.25); - }} - .contexta-btn svg {{ - width: 26px; - height: 26px; - fill: white; - }} - .contexta-container {{ - position: fixed; - bottom: 92px; - right: 24px; - width: 380px; - height: 580px; - border-radius: 16px; - box-shadow: 0 8px 32px rgba(0,0,0,0.15); - z-index: 999999; - overflow: hidden; - display: none; - flex-direction: column; - border: 1px solid #e5e7eb; - }} - .contexta-container.open {{ - display: flex; - }} - .contexta-iframe {{ - width: 100%; - height: 100%; - border: none; - flex: 1; - }} - .contexta-close {{ - position: absolute; - top: 8px; - right: 8px; - background: rgba(0,0,0,0.3); - border: none; - color: white; - border-radius: 50%; - width: 24px; - height: 24px; - cursor: pointer; - font-size: 14px; - display: flex; - align-items: center; - justify-content: center; - z-index: 1; - }} - @media (max-width: 480px) {{ - .contexta-container {{ - bottom: 0; - right: 0; - width: 100vw; - height: 100vh; - border-radius: 0; - }} - }} - `; - document.head.appendChild(style); - - // Button - var btn = document.createElement('button'); - btn.className = 'contexta-btn'; - btn.setAttribute('aria-label', 'Open chat'); - btn.innerHTML = ''; - document.body.appendChild(btn); - - // Container with iframe - var container = document.createElement('div'); - container.className = 'contexta-container'; - - var closeBtn = document.createElement('button'); - closeBtn.className = 'contexta-close'; - closeBtn.innerHTML = '×'; - closeBtn.setAttribute('aria-label', 'Close chat'); - container.appendChild(closeBtn); - - var iframe = document.createElement('iframe'); - iframe.className = 'contexta-iframe'; - iframe.src = APP_URL + '/chat/' + chatbotId; - iframe.setAttribute('allow', 'microphone'); - container.appendChild(iframe); - - document.body.appendChild(container); - - // Toggle logic - var isOpen = false; - function toggle() {{ - isOpen = !isOpen; - if (isOpen) {{ - container.classList.add('open'); - btn.innerHTML = ''; - }} else {{ - container.classList.remove('open'); - btn.innerHTML = ''; - }} - }} - - btn.addEventListener('click', toggle); - closeBtn.addEventListener('click', toggle); -}})(); """ +Widget JS generator. + +Produces a self-contained, framework-agnostic JavaScript bundle served at +GET /widget.js. Embed on any page with: + + + +Works on vanilla HTML, WordPress, Webflow, Shopify, Next.js (_document), +and any framework where you control the HTML shell. + +For React/Vue projects that want a native component, host-side devs can call + window.Contexta.open() / .close() / .toggle() +from their own button, or await the dedicated npm package (@contexta/widget). + +Design decisions +---------------- +- All CSS is ID-scoped (#ctxa-*) to avoid colliding with host-page styles. +- The iframe src is set lazily on first open — zero network cost until use. +- document.currentScript is captured synchronously (before any async code) + so it works even when the host page has many script tags. +- z-index 2147483647 is the highest a browser will honour. +- sandbox attribute restricts the iframe while still allowing forms/popups. +""" + +_TEMPLATE = r"""(function () { + 'use strict'; + + /* ── Double-init guard ─────────────────────────────────────────────── */ + if (window.__ctxa) return; + + /* ── Read chatbot ID from world", + ) + assert "