mirror of
http://88.130.71.182:3000/BlitTech/contexta_be.git
synced 2026-06-13 08:45:24 +00:00
- Add new routers: admin, appointments, campaigns - Add storage service and logging config - Add migrations directory and test suite with pytest config - Add supabase_migration_features.sql - Update models, dependencies, config, and existing routers - Remove whatsapp_service (deleted) - Update pyproject.toml and uv.lock dependencies Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
294 lines
11 KiB
Python
294 lines
11 KiB
Python
import uuid
|
|
import logging
|
|
|
|
from fastapi import APIRouter, Depends, HTTPException, Query, Request
|
|
from pydantic import BaseModel
|
|
from typing import Optional
|
|
|
|
from app.config import settings, PLAN_LIMITS
|
|
from app.database import get_supabase
|
|
from app.dependencies import get_current_user
|
|
from app.services.rag import rag_engine
|
|
from app.services.telegram_service import (
|
|
delete_webhook, get_bot_info, send_message as tg_send, set_webhook,
|
|
)
|
|
from app.routers.chat import (
|
|
_get_conversation_history, _get_or_create_conversation, _save_message,
|
|
)
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
router = APIRouter(prefix="/channels", tags=["Channels"])
|
|
webhook_router = APIRouter(prefix="/webhooks", tags=["Webhooks"])
|
|
|
|
|
|
# ── Request models ────────────────────────────────────────────────────────────
|
|
|
|
class TelegramConnectRequest(BaseModel):
|
|
chatbot_id: str
|
|
bot_token: str
|
|
|
|
|
|
# ── Helpers ───────────────────────────────────────────────────────────────────
|
|
|
|
def _verify_chatbot_ownership(chatbot_id: str, user_id: str, supabase):
|
|
company = supabase.table("companies").select("id").eq("owner_id", user_id).execute()
|
|
if not company.data:
|
|
raise HTTPException(status_code=404, detail="Company not found")
|
|
chatbot = (
|
|
supabase.table("chatbots").select("id")
|
|
.eq("id", chatbot_id)
|
|
.eq("company_id", company.data[0]["id"])
|
|
.execute()
|
|
)
|
|
if not chatbot.data:
|
|
raise HTTPException(status_code=404, detail="Chatbot not found")
|
|
|
|
|
|
def _get_or_create_channel_session(
|
|
chatbot_id: str, channel: str, external_id: str, supabase
|
|
) -> dict:
|
|
existing = (
|
|
supabase.table("channel_sessions")
|
|
.select("*").eq("channel", channel).eq("external_id", external_id).execute()
|
|
)
|
|
if existing.data:
|
|
return existing.data[0]
|
|
row = {
|
|
"id": str(uuid.uuid4()),
|
|
"chatbot_id": chatbot_id,
|
|
"channel": channel,
|
|
"external_id": external_id,
|
|
"session_id": str(uuid.uuid4()),
|
|
}
|
|
result = supabase.table("channel_sessions").insert(row).execute()
|
|
return result.data[0]
|
|
|
|
|
|
def _check_channel_plan(user_id: str, channel: str, supabase):
|
|
"""Raise 402 if the user's plan doesn't include the requested channel."""
|
|
sub = supabase.table("subscriptions").select("plan").eq("user_id", user_id).eq("status", "active").execute()
|
|
plan = sub.data[0]["plan"] if sub.data else "free"
|
|
allowed = PLAN_LIMITS.get(plan, PLAN_LIMITS["free"]).get("channels", [])
|
|
if channel not in allowed:
|
|
label = {"telegram": "Starter"}.get(channel, "a higher")
|
|
raise HTTPException(status_code=402, detail=f"{channel.title()} channel requires {label} plan or higher")
|
|
|
|
|
|
def _check_chatbot_channel_subscription(chatbot_id: str, channel: str, supabase) -> bool:
|
|
"""Return False (drop message) if the chatbot owner's plan no longer includes this channel."""
|
|
chatbot = supabase.table("chatbots").select("company_id").eq("id", chatbot_id).execute()
|
|
if not chatbot.data:
|
|
return False
|
|
company = supabase.table("companies").select("owner_id").eq("id", chatbot.data[0]["company_id"]).execute()
|
|
if not company.data:
|
|
return False
|
|
owner_id = company.data[0]["owner_id"]
|
|
sub = supabase.table("subscriptions").select("plan, status").eq("user_id", owner_id).execute()
|
|
if not sub.data:
|
|
return False
|
|
row = sub.data[0]
|
|
if row.get("status") != "active":
|
|
return False
|
|
allowed = PLAN_LIMITS.get(row.get("plan", "free"), PLAN_LIMITS["free"]).get("channels", [])
|
|
return channel in allowed
|
|
|
|
|
|
def _detect_language(text: str) -> str:
|
|
"""Simple script-based language detection for channel messages."""
|
|
sample = text[:200]
|
|
total = sum(1 for c in sample if not c.isspace())
|
|
if total == 0:
|
|
return "en"
|
|
scores = {
|
|
"ar": sum(1 for c in sample if "\u0600" <= c <= "\u06FF") / total,
|
|
"zh": sum(1 for c in sample if "\u4E00" <= c <= "\u9FFF") / total,
|
|
"ja": sum(1 for c in sample if "\u3040" <= c <= "\u30FF") / total,
|
|
"he": sum(1 for c in sample if "\u0590" <= c <= "\u05FF") / total,
|
|
"ru": sum(1 for c in sample if "\u0400" <= c <= "\u04FF") / total,
|
|
"fr": 0.0, "es": 0.0, # Latin-based — can't detect from scripts alone
|
|
}
|
|
best = max(scores, key=scores.get)
|
|
return best if scores[best] > 0.25 else "en"
|
|
|
|
|
|
# ── CRUD endpoints ────────────────────────────────────────────────────────────
|
|
|
|
@router.get("")
|
|
async def list_channels(chatbot_id: str = Query(...), user=Depends(get_current_user)):
|
|
supabase = get_supabase()
|
|
_verify_chatbot_ownership(chatbot_id, user.id, supabase)
|
|
result = (
|
|
supabase.table("channel_connections")
|
|
.select("id,channel,bot_username,is_active,created_at")
|
|
.eq("chatbot_id", chatbot_id)
|
|
.execute()
|
|
)
|
|
return result.data or []
|
|
|
|
|
|
@router.post("/telegram")
|
|
async def connect_telegram(data: TelegramConnectRequest, user=Depends(get_current_user)):
|
|
supabase = get_supabase()
|
|
_verify_chatbot_ownership(data.chatbot_id, user.id, supabase)
|
|
_check_channel_plan(user.id, "telegram", supabase)
|
|
|
|
bot_info = await get_bot_info(data.bot_token)
|
|
if not bot_info:
|
|
raise HTTPException(
|
|
status_code=400,
|
|
detail="Invalid bot token. Check your token from @BotFather.",
|
|
)
|
|
|
|
if not settings.api_url:
|
|
raise HTTPException(status_code=500, detail="API_URL not configured on the server")
|
|
|
|
webhook_url = f"{settings.api_url.rstrip('/')}/api/v1/webhooks/telegram/{data.bot_token}"
|
|
ok = await set_webhook(data.bot_token, webhook_url)
|
|
if not ok:
|
|
raise HTTPException(status_code=500, detail="Failed to register webhook with Telegram")
|
|
|
|
existing = (
|
|
supabase.table("channel_connections")
|
|
.select("id").eq("chatbot_id", data.chatbot_id).eq("channel", "telegram").execute()
|
|
)
|
|
conn_data = {
|
|
"chatbot_id": data.chatbot_id,
|
|
"channel": "telegram",
|
|
"bot_token": data.bot_token,
|
|
"bot_username": bot_info.get("username", ""),
|
|
"is_active": True,
|
|
}
|
|
if existing.data:
|
|
supabase.table("channel_connections").update(conn_data).eq("id", existing.data[0]["id"]).execute()
|
|
else:
|
|
conn_data["id"] = str(uuid.uuid4())
|
|
supabase.table("channel_connections").insert(conn_data).execute()
|
|
|
|
username = bot_info.get("username", "")
|
|
return {
|
|
"success": True,
|
|
"bot_username": username,
|
|
"bot_link": f"https://t.me/{username}",
|
|
}
|
|
|
|
|
|
@router.delete("/{connection_id}")
|
|
async def disconnect_channel(connection_id: str, user=Depends(get_current_user)):
|
|
supabase = get_supabase()
|
|
conn = supabase.table("channel_connections").select("*").eq("id", connection_id).execute()
|
|
if not conn.data:
|
|
raise HTTPException(status_code=404, detail="Connection not found")
|
|
|
|
c = conn.data[0]
|
|
_verify_chatbot_ownership(c["chatbot_id"], user.id, supabase)
|
|
|
|
if c["channel"] == "telegram" and c.get("bot_token"):
|
|
try:
|
|
await delete_webhook(c["bot_token"])
|
|
except Exception:
|
|
pass
|
|
|
|
supabase.table("channel_connections").delete().eq("id", connection_id).execute()
|
|
return {"success": True}
|
|
|
|
|
|
# ── Telegram webhook ──────────────────────────────────────────────────────────
|
|
|
|
@webhook_router.post("/telegram/{bot_token}")
|
|
async def telegram_webhook(bot_token: str, request: Request):
|
|
try:
|
|
body = await request.json()
|
|
except Exception:
|
|
return {"ok": True}
|
|
|
|
message = body.get("message") or body.get("edited_message")
|
|
if not message or "text" not in message:
|
|
return {"ok": True}
|
|
|
|
chat_id = message["chat"]["id"]
|
|
text = message["text"].strip()
|
|
|
|
supabase = get_supabase()
|
|
conn = (
|
|
supabase.table("channel_connections")
|
|
.select("*").eq("channel", "telegram").eq("bot_token", bot_token).eq("is_active", True).execute()
|
|
)
|
|
if not conn.data:
|
|
return {"ok": True}
|
|
|
|
chatbot_id = conn.data[0]["chatbot_id"]
|
|
|
|
# Check subscription still allows Telegram
|
|
if not _check_chatbot_channel_subscription(chatbot_id, "telegram", supabase):
|
|
await tg_send(bot_token, chat_id, "This service is currently unavailable. Please contact the business directly.")
|
|
return {"ok": True}
|
|
|
|
chatbot_result = (
|
|
supabase.table("chatbots").select("*, companies(name, logo_url)").eq("id", chatbot_id).execute()
|
|
)
|
|
if not chatbot_result.data:
|
|
return {"ok": True}
|
|
chatbot = chatbot_result.data[0]
|
|
|
|
if not chatbot.get("is_published"):
|
|
await tg_send(bot_token, chat_id, "This chatbot is not yet published.")
|
|
return {"ok": True}
|
|
|
|
collection_name = chatbot.get("qdrant_collection_name")
|
|
if not collection_name:
|
|
await tg_send(bot_token, chat_id, "This chatbot has no knowledge base configured yet.")
|
|
return {"ok": True}
|
|
|
|
if text.startswith("/start"):
|
|
welcome = chatbot.get("welcome_message") or f"Hello! I'm {chatbot.get('name', 'your assistant')}. How can I help you?"
|
|
await tg_send(bot_token, chat_id, welcome)
|
|
return {"ok": True}
|
|
|
|
# Use first 8 chars of token as namespace to avoid collisions between bots
|
|
external_id = f"tg:{bot_token[:8]}:{chat_id}"
|
|
session = _get_or_create_channel_session(chatbot_id, "telegram", external_id, supabase)
|
|
|
|
detected_lang = _detect_language(text)
|
|
company_data = chatbot.get("companies", {}) or {}
|
|
conversation = _get_or_create_conversation(
|
|
chatbot_id=chatbot_id,
|
|
session_id=session["session_id"],
|
|
user_id=None,
|
|
language=detected_lang,
|
|
supabase=supabase,
|
|
)
|
|
history = _get_conversation_history(conversation["id"], supabase)
|
|
chatbot_config = {**chatbot, "company_name": company_data.get("name", "")}
|
|
|
|
try:
|
|
result = await rag_engine.process_query(
|
|
query=text,
|
|
collection_name=collection_name,
|
|
chatbot_config=chatbot_config,
|
|
conversation_history=history,
|
|
language=detected_lang,
|
|
)
|
|
except Exception as e:
|
|
logger.error(f"Telegram RAG error for chatbot {chatbot_id}: {e}")
|
|
await tg_send(bot_token, chat_id, "Sorry, I encountered an error. Please try again.")
|
|
return {"ok": True}
|
|
|
|
confidence_score = max((s.score for s in result.get("sources", [])), default=0.0)
|
|
_save_message(conversation["id"], "user", text, supabase)
|
|
_save_message(
|
|
conversation["id"], "assistant", result["response"], supabase,
|
|
sources=[s.model_dump() for s in result.get("sources", [])],
|
|
model=result.get("model", ""),
|
|
confidence_score=confidence_score,
|
|
)
|
|
supabase.table("conversations").update(
|
|
{"message_count": len(history) + 2}
|
|
).eq("id", conversation["id"]).execute()
|
|
supabase.table("channel_sessions").update({"last_active": "now()"}).eq("id", session["id"]).execute()
|
|
|
|
await tg_send(bot_token, chat_id, result["response"])
|
|
return {"ok": True}
|
|
|
|
|