Files
contexta_be/app/routers/campaigns.py
belviskhoremk 92d4c2fc5e feat: add appointments, campaigns, admin, storage, tests and various updates
- Add new routers: admin, appointments, campaigns
- Add storage service and logging config
- Add migrations directory and test suite with pytest config
- Add supabase_migration_features.sql
- Update models, dependencies, config, and existing routers
- Remove whatsapp_service (deleted)
- Update pyproject.toml and uv.lock dependencies

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-03 09:11:58 +00:00

168 lines
7.1 KiB
Python

from fastapi import APIRouter, HTTPException, Depends, Query
from app.database import get_supabase
from app.dependencies import get_current_user
from app.models import CampaignCreate, CampaignResponse
from typing import List, Optional
import uuid
import logging
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
# Router holding the campaign endpoints; every route below is served under
# the "/campaigns" prefix and tagged "Campaigns".
router = APIRouter(prefix="/campaigns", tags=["Campaigns"])
# ── Helpers ───────────────────────────────────────────────────────────────────
def _check_campaigns_access(user_id: str, supabase):
sub = supabase.table("subscriptions").select("plan").eq("user_id", user_id).eq("status", "active").execute()
plan = sub.data[0]["plan"] if sub.data else "free"
if plan not in ("starter", "business", "agency", "enterprise"):
raise HTTPException(status_code=402, detail="Campaigns require Starter plan or higher")
return plan
def _get_user_company_id(user_id: str, supabase) -> str:
result = supabase.table("companies").select("id").eq("owner_id", user_id).execute()
if not result.data:
raise HTTPException(status_code=404, detail="Company not found")
return result.data[0]["id"]
def _verify_chatbot_ownership(chatbot_id: str, company_id: str, supabase):
chatbot = supabase.table("chatbots").select("id").eq("id", chatbot_id).eq("company_id", company_id).execute()
if not chatbot.data:
raise HTTPException(status_code=404, detail="Chatbot not found")
# ── Endpoints ─────────────────────────────────────────────────────────────────
@router.get("", response_model=List[CampaignResponse])
async def list_campaigns(
    chatbot_id: Optional[str] = Query(None),
    page: int = Query(1, ge=1),
    limit: int = Query(20, ge=1, le=100),
    user=Depends(get_current_user),
):
    """List campaigns for the caller's company, newest first.

    Campaigns are selected through the chatbots that belong to the caller's
    company; when ``chatbot_id`` is given, only that chatbot is considered.

    Args:
        chatbot_id: Optional chatbot id used to narrow the listing.
        page: 1-based page number.
        limit: Page size (1-100).
        user: Authenticated user injected by ``get_current_user``.

    Returns:
        One page of ``CampaignResponse`` objects, or an empty list when the
        company has no matching chatbot.
    """
    client = get_supabase()
    _check_campaigns_access(user.id, client)
    owner_company = _get_user_company_id(user.id, client)

    # Resolve which chatbots the caller may see campaigns for.
    bot_query = client.table("chatbots").select("id").eq("company_id", owner_company)
    if chatbot_id:
        bot_query = bot_query.eq("id", chatbot_id)
    bot_rows = bot_query.execute().data or []
    owned_bot_ids = [row["id"] for row in bot_rows]
    if not owned_bot_ids:
        return []

    # range() takes inclusive bounds, hence the "- 1" on the upper one.
    first_row = (page - 1) * limit
    last_row = first_row + limit - 1
    campaigns = (
        client.table("campaigns")
        .select("*")
        .in_("chatbot_id", owned_bot_ids)
        .order("created_at", desc=True)
        .range(first_row, last_row)
        .execute()
    )
    return [CampaignResponse(**row) for row in (campaigns.data or [])]
@router.post("", response_model=CampaignResponse, status_code=201)
async def create_campaign(data: CampaignCreate, user=Depends(get_current_user)):
    """Create a draft campaign for one of the caller's chatbots.

    The new row starts with status ``"draft"``, ``sent_count`` 0 and a
    ``recipients_count`` equal to the number of Telegram channel sessions
    that exist for the chatbot at creation time.

    Args:
        data: Campaign payload (chatbot id, title, message).
        user: Authenticated user injected by ``get_current_user``.

    Returns:
        The inserted campaign as a ``CampaignResponse``.

    Raises:
        HTTPException: 500 when the insert returns no row; plus whatever the
            plan / company / chatbot-ownership helpers raise.
    """
    client = get_supabase()
    _check_campaigns_access(user.id, client)
    owner_company = _get_user_company_id(user.id, client)
    _verify_chatbot_ownership(data.chatbot_id, owner_company, client)

    # Count the Telegram sessions recorded for this chatbot.
    telegram_sessions = (
        client.table("channel_sessions")
        .select("id", count="exact")
        .eq("chatbot_id", data.chatbot_id)
        .eq("channel", "telegram")
        .execute()
    )

    new_campaign = {
        "id": str(uuid.uuid4()),
        "chatbot_id": data.chatbot_id,
        "title": data.title,
        "message": data.message,
        "status": "draft",
        "recipients_count": telegram_sessions.count or 0,
        "sent_count": 0,
    }
    inserted_rows = client.table("campaigns").insert(new_campaign).execute().data
    if inserted_rows:
        return CampaignResponse(**inserted_rows[0])
    raise HTTPException(status_code=500, detail="Failed to create campaign")
def _telegram_chat_id(external_id):
    """Return the Telegram chat id encoded in a session ``external_id``, or ``None``.

    The value is expected to look like ``"tg:{token_prefix}:{chat_id}"``; the
    third ``:``-separated field is parsed as an integer. ``None`` is returned
    when the value is not a string, has fewer than three fields, or the third
    field is not an integer.
    """
    if not isinstance(external_id, str):
        return None
    parts = external_id.split(":")
    if len(parts) < 3:
        return None
    try:
        return int(parts[2])
    except ValueError:
        return None


@router.post("/{campaign_id}/send", response_model=CampaignResponse)
async def send_campaign(campaign_id: str, user=Depends(get_current_user)):
    """Broadcast the campaign message to all Telegram subscribers of the chatbot.

    The campaign is moved to ``"sending"`` before the broadcast and to
    ``"sent"`` afterwards, together with the number of messages handed to
    Telegram, the number of subscribers considered and a UTC timestamp.
    Delivery is best effort: a failure for one subscriber is logged and the
    broadcast continues with the next one.

    Args:
        campaign_id: Id of the campaign to send.
        user: Authenticated user injected by ``get_current_user``.

    Returns:
        The campaign row after the broadcast, as a ``CampaignResponse``.

    Raises:
        HTTPException: 404 if the campaign does not exist; 403 if it belongs
            to another company; 400 if it was already sent or the chatbot has
            no active Telegram connection; 409 if another send of the same
            campaign is in progress; 500 if the campaign row cannot be read
            back; plus whatever the plan / company helpers raise.
    """
    # Kept as function-local imports, as in the original code.
    from datetime import datetime, timezone

    from app.services.telegram_service import send_message as tg_send

    supabase = get_supabase()
    _check_campaigns_access(user.id, supabase)
    company_id = _get_user_company_id(user.id, supabase)

    campaign = (
        supabase.table("campaigns")
        .select("*, chatbots(company_id)")
        .eq("id", campaign_id)
        .execute()
    )
    if not campaign.data:
        raise HTTPException(status_code=404, detail="Campaign not found")
    c = campaign.data[0]

    # The embedded "chatbots" value can be null; dict.get's default only covers
    # a *missing* key, so normalise a null relation to an empty dict.
    owner_company_id = (c.get("chatbots") or {}).get("company_id")
    if owner_company_id != company_id:
        raise HTTPException(status_code=403, detail="Not authorized")

    previous_status = c["status"]
    if previous_status == "sent":
        raise HTTPException(status_code=400, detail="Campaign already sent")
    if previous_status == "sending":
        # A broadcast is already running; starting another one would deliver
        # the message twice to every subscriber.
        raise HTTPException(status_code=409, detail="Campaign is already being sent")

    chatbot_id = c["chatbot_id"]

    # Get the Telegram bot token for this chatbot.
    conn = (
        supabase.table("channel_connections")
        .select("bot_token")
        .eq("chatbot_id", chatbot_id)
        .eq("channel", "telegram")
        .eq("is_active", True)
        .execute()
    )
    if not conn.data or not conn.data[0].get("bot_token"):
        raise HTTPException(status_code=400, detail="No active Telegram connection for this chatbot")
    bot_token = conn.data[0]["bot_token"]

    # Get all Telegram subscribers (channel_sessions).
    # NOTE(review): a single select may be capped by the API's max-rows
    # setting - confirm this is sufficient for large audiences.
    sessions = (
        supabase.table("channel_sessions")
        .select("external_id")
        .eq("chatbot_id", chatbot_id)
        .eq("channel", "telegram")
        .execute()
    )
    subscribers = sessions.data or []

    # Claim the campaign: only flip it to "sending" if its status is still the
    # one read above. If a concurrent request claimed it first, no row matches
    # and the update returns no data.
    claimed = (
        supabase.table("campaigns")
        .update({"status": "sending"})
        .eq("id", campaign_id)
        .eq("status", previous_status)
        .execute()
    )
    if not claimed.data:
        raise HTTPException(status_code=409, detail="Campaign is already being sent")

    # Broadcast.
    sent = 0
    try:
        for sub in subscribers:
            external_id = sub.get("external_id")
            chat_id = _telegram_chat_id(external_id)
            if chat_id is None:
                logger.warning(
                    "Skipping campaign recipient with malformed external_id: %r",
                    external_id,
                )
                continue
            try:
                await tg_send(bot_token, chat_id, c["message"])
            except Exception as e:
                # Best effort: one failing recipient must not abort the rest.
                logger.warning("Failed to send campaign to %s: %s", external_id, e)
            else:
                sent += 1
    except BaseException:
        # Per-recipient errors are handled above, so only interruptions such as
        # task cancellation reach this point. Restore the previous status so
        # the campaign is not left stuck in "sending".
        supabase.table("campaigns").update({"status": previous_status}).eq(
            "id", campaign_id
        ).execute()
        raise

    # Mark as sent.
    supabase.table("campaigns").update({
        "status": "sent",
        "sent_count": sent,
        "recipients_count": len(subscribers),
        "sent_at": datetime.now(timezone.utc).isoformat(),
    }).eq("id", campaign_id).execute()

    updated = supabase.table("campaigns").select("*").eq("id", campaign_id).execute()
    if not updated.data:
        raise HTTPException(status_code=500, detail="Failed to load campaign after sending")
    return CampaignResponse(**updated.data[0])
@router.delete("/{campaign_id}")
async def delete_campaign(campaign_id: str, user=Depends(get_current_user)):
    """Delete a campaign that belongs to the caller's company.

    Args:
        campaign_id: Id of the campaign to delete.
        user: Authenticated user injected by ``get_current_user``.

    Returns:
        ``{"success": True}`` once the delete request has been issued.

    Raises:
        HTTPException: 404 if the campaign does not exist; 403 if it belongs
            to another company; 400 if it is currently being sent; plus
            whatever the plan / company helpers raise.
    """
    supabase = get_supabase()
    _check_campaigns_access(user.id, supabase)
    company_id = _get_user_company_id(user.id, supabase)

    campaign = (
        supabase.table("campaigns")
        .select("*, chatbots(company_id)")
        .eq("id", campaign_id)
        .execute()
    )
    if not campaign.data:
        raise HTTPException(status_code=404, detail="Campaign not found")
    row = campaign.data[0]

    # The embedded "chatbots" value can be null; dict.get's default only covers
    # a *missing* key, so normalise a null relation to an empty dict instead of
    # failing with AttributeError.
    owner_company_id = (row.get("chatbots") or {}).get("company_id")
    if owner_company_id != company_id:
        raise HTTPException(status_code=403, detail="Not authorized")

    if row["status"] == "sending":
        raise HTTPException(status_code=400, detail="Cannot delete a campaign that is currently sending")

    supabase.table("campaigns").delete().eq("id", campaign_id).execute()
    return {"success": True}