import uuid import logging from fastapi import APIRouter, Depends, HTTPException, Query, Request from pydantic import BaseModel from typing import Optional from app.config import settings, PLAN_LIMITS from app.database import get_supabase from app.dependencies import get_current_user from app.services.rag import rag_engine from app.services.telegram_service import ( delete_webhook, get_bot_info, send_message as tg_send, set_webhook, ) from app.routers.chat import ( _get_conversation_history, _get_or_create_conversation, _save_message, ) logger = logging.getLogger(__name__) router = APIRouter(prefix="/channels", tags=["Channels"]) webhook_router = APIRouter(prefix="/webhooks", tags=["Webhooks"]) # ── Request models ──────────────────────────────────────────────────────────── class TelegramConnectRequest(BaseModel): chatbot_id: str bot_token: str # ── Helpers ─────────────────────────────────────────────────────────────────── def _verify_chatbot_ownership(chatbot_id: str, user_id: str, supabase): company = supabase.table("companies").select("id").eq("owner_id", user_id).execute() if not company.data: raise HTTPException(status_code=404, detail="Company not found") chatbot = ( supabase.table("chatbots").select("id") .eq("id", chatbot_id) .eq("company_id", company.data[0]["id"]) .execute() ) if not chatbot.data: raise HTTPException(status_code=404, detail="Chatbot not found") def _get_or_create_channel_session( chatbot_id: str, channel: str, external_id: str, supabase ) -> dict: existing = ( supabase.table("channel_sessions") .select("*").eq("channel", channel).eq("external_id", external_id).execute() ) if existing.data: return existing.data[0] row = { "id": str(uuid.uuid4()), "chatbot_id": chatbot_id, "channel": channel, "external_id": external_id, "session_id": str(uuid.uuid4()), } result = supabase.table("channel_sessions").insert(row).execute() return result.data[0] def _check_channel_plan(user_id: str, channel: str, supabase): """Raise 402 if the user's plan doesn't include the requested channel.""" sub = supabase.table("subscriptions").select("plan").eq("user_id", user_id).eq("status", "active").execute() plan = sub.data[0]["plan"] if sub.data else "free" allowed = PLAN_LIMITS.get(plan, PLAN_LIMITS["free"]).get("channels", []) if channel not in allowed: label = {"telegram": "Starter"}.get(channel, "a higher") raise HTTPException(status_code=402, detail=f"{channel.title()} channel requires {label} plan or higher") def _check_chatbot_channel_subscription(chatbot_id: str, channel: str, supabase) -> bool: """Return False (drop message) if the chatbot owner's plan no longer includes this channel.""" chatbot = supabase.table("chatbots").select("company_id").eq("id", chatbot_id).execute() if not chatbot.data: return False company = supabase.table("companies").select("owner_id").eq("id", chatbot.data[0]["company_id"]).execute() if not company.data: return False owner_id = company.data[0]["owner_id"] sub = supabase.table("subscriptions").select("plan, status").eq("user_id", owner_id).execute() if not sub.data: return False row = sub.data[0] if row.get("status") != "active": return False allowed = PLAN_LIMITS.get(row.get("plan", "free"), PLAN_LIMITS["free"]).get("channels", []) return channel in allowed def _detect_language(text: str) -> str: """Simple script-based language detection for channel messages.""" sample = text[:200] total = sum(1 for c in sample if not c.isspace()) if total == 0: return "en" scores = { "ar": sum(1 for c in sample if "\u0600" <= c <= "\u06FF") / total, "zh": sum(1 for c in sample if "\u4E00" <= c <= "\u9FFF") / total, "ja": sum(1 for c in sample if "\u3040" <= c <= "\u30FF") / total, "he": sum(1 for c in sample if "\u0590" <= c <= "\u05FF") / total, "ru": sum(1 for c in sample if "\u0400" <= c <= "\u04FF") / total, "fr": 0.0, "es": 0.0, # Latin-based — can't detect from scripts alone } best = max(scores, key=scores.get) return best if scores[best] > 0.25 else "en" # ── CRUD endpoints ──────────────────────────────────────────────────────────── @router.get("") async def list_channels(chatbot_id: str = Query(...), user=Depends(get_current_user)): supabase = get_supabase() _verify_chatbot_ownership(chatbot_id, user.id, supabase) result = ( supabase.table("channel_connections") .select("id,channel,bot_username,is_active,created_at") .eq("chatbot_id", chatbot_id) .execute() ) return result.data or [] @router.post("/telegram") async def connect_telegram(data: TelegramConnectRequest, user=Depends(get_current_user)): supabase = get_supabase() _verify_chatbot_ownership(data.chatbot_id, user.id, supabase) _check_channel_plan(user.id, "telegram", supabase) bot_info = await get_bot_info(data.bot_token) if not bot_info: raise HTTPException( status_code=400, detail="Invalid bot token. Check your token from @BotFather.", ) if not settings.api_url: raise HTTPException(status_code=500, detail="API_URL not configured on the server") webhook_url = f"{settings.api_url.rstrip('/')}/api/v1/webhooks/telegram/{data.bot_token}" ok = await set_webhook(data.bot_token, webhook_url) if not ok: raise HTTPException(status_code=500, detail="Failed to register webhook with Telegram") existing = ( supabase.table("channel_connections") .select("id").eq("chatbot_id", data.chatbot_id).eq("channel", "telegram").execute() ) conn_data = { "chatbot_id": data.chatbot_id, "channel": "telegram", "bot_token": data.bot_token, "bot_username": bot_info.get("username", ""), "is_active": True, } if existing.data: supabase.table("channel_connections").update(conn_data).eq("id", existing.data[0]["id"]).execute() else: conn_data["id"] = str(uuid.uuid4()) supabase.table("channel_connections").insert(conn_data).execute() username = bot_info.get("username", "") return { "success": True, "bot_username": username, "bot_link": f"https://t.me/{username}", } @router.delete("/{connection_id}") async def disconnect_channel(connection_id: str, user=Depends(get_current_user)): supabase = get_supabase() conn = supabase.table("channel_connections").select("*").eq("id", connection_id).execute() if not conn.data: raise HTTPException(status_code=404, detail="Connection not found") c = conn.data[0] _verify_chatbot_ownership(c["chatbot_id"], user.id, supabase) if c["channel"] == "telegram" and c.get("bot_token"): try: await delete_webhook(c["bot_token"]) except Exception: pass supabase.table("channel_connections").delete().eq("id", connection_id).execute() return {"success": True} # ── Telegram webhook ────────────────────────────────────────────────────────── @webhook_router.post("/telegram/{bot_token}") async def telegram_webhook(bot_token: str, request: Request): try: body = await request.json() except Exception: return {"ok": True} message = body.get("message") or body.get("edited_message") if not message or "text" not in message: return {"ok": True} chat_id = message["chat"]["id"] text = message["text"].strip() supabase = get_supabase() conn = ( supabase.table("channel_connections") .select("*").eq("channel", "telegram").eq("bot_token", bot_token).eq("is_active", True).execute() ) if not conn.data: return {"ok": True} chatbot_id = conn.data[0]["chatbot_id"] # Check subscription still allows Telegram if not _check_chatbot_channel_subscription(chatbot_id, "telegram", supabase): await tg_send(bot_token, chat_id, "This service is currently unavailable. Please contact the business directly.") return {"ok": True} chatbot_result = ( supabase.table("chatbots").select("*, companies(name, logo_url)").eq("id", chatbot_id).execute() ) if not chatbot_result.data: return {"ok": True} chatbot = chatbot_result.data[0] if not chatbot.get("is_published"): await tg_send(bot_token, chat_id, "This chatbot is not yet published.") return {"ok": True} collection_name = chatbot.get("qdrant_collection_name") if not collection_name: await tg_send(bot_token, chat_id, "This chatbot has no knowledge base configured yet.") return {"ok": True} if text.startswith("/start"): welcome = chatbot.get("welcome_message") or f"Hello! I'm {chatbot.get('name', 'your assistant')}. How can I help you?" await tg_send(bot_token, chat_id, welcome) return {"ok": True} if text == "/owner": supabase.table("channel_connections").update( {"owner_chat_id": str(chat_id)} ).eq("channel", "telegram").eq("bot_token", bot_token).execute() await tg_send(bot_token, chat_id, "✅ You're registered as the owner of this bot. You'll receive handoff alerts here when a visitor needs human support.") return {"ok": True} # Use first 8 chars of token as namespace to avoid collisions between bots external_id = f"tg:{bot_token[:8]}:{chat_id}" session = _get_or_create_channel_session(chatbot_id, "telegram", external_id, supabase) detected_lang = _detect_language(text) company_data = chatbot.get("companies", {}) or {} conversation = _get_or_create_conversation( chatbot_id=chatbot_id, session_id=session["session_id"], user_id=None, language=detected_lang, supabase=supabase, ) history = _get_conversation_history(conversation["id"], supabase) chatbot_config = {**chatbot, "company_name": company_data.get("name", "")} try: result = await rag_engine.process_query( query=text, collection_name=collection_name, chatbot_config=chatbot_config, conversation_history=history, language=detected_lang, ) except Exception as e: logger.error(f"Telegram RAG error for chatbot {chatbot_id}: {e}") await tg_send(bot_token, chat_id, "Sorry, I encountered an error. Please try again.") return {"ok": True} confidence_score = result.get("confidence_score", 0.0) _save_message(conversation["id"], "user", text, supabase) _save_message( conversation["id"], "assistant", result["response"], supabase, sources=[s.model_dump() for s in result.get("sources", [])], model=result.get("model", ""), confidence_score=confidence_score, ) supabase.table("conversations").update( {"message_count": len(history) + 2} ).eq("id", conversation["id"]).execute() supabase.table("channel_sessions").update({"last_active": "now()"}).eq("id", session["id"]).execute() await tg_send(bot_token, chat_id, result["response"]) return {"ok": True}