"""Tests for channels and webhook endpoints.""" import pytest from unittest.mock import MagicMock, patch, AsyncMock AUTH = {"Authorization": "Bearer test-token"} def _make_channels_sb(plan="starter", company=True, chatbot=True, connection=None): """Build a supabase mock for channel tests.""" sb = MagicMock() def table_side(name): m = MagicMock() m.select.return_value = m m.insert.return_value = m m.update.return_value = m m.delete.return_value = m m.eq.return_value = m m.in_.return_value = m m.limit.return_value = m m.order.return_value = m m.range.return_value = m m.gte.return_value = m m.lt.return_value = m if name == "subscriptions": m.execute.return_value = MagicMock(data=[{"plan": plan, "status": "active"}]) elif name == "companies": m.execute.return_value = MagicMock( data=[{"id": "comp-1", "owner_id": "test-user-id"}] if company else [] ) elif name == "chatbots": m.execute.return_value = MagicMock( data=[{"id": "cb-1", "company_id": "comp-1", "qdrant_collection_name": "col-1", "is_published": True, "name": "Test Bot", "welcome_message": "Hi!", "companies": {"name": "Acme", "logo_url": None}}] if chatbot else [] ) elif name == "channel_connections": conn = connection if connection is not None else [] m.execute.return_value = MagicMock(data=conn) elif name == "channel_sessions": m.execute.return_value = MagicMock( data=[{"id": "sess-1", "session_id": "s-123", "chatbot_id": "cb-1", "channel": "telegram", "external_id": "tg:abc:12345"}] ) elif name == "conversations": m.execute.return_value = MagicMock(data=[{"id": "conv-1", "session_id": "s-123", "status": "open", "message_count": 0}]) elif name == "messages": m.execute.return_value = MagicMock(data=[], count=0) else: m.execute.return_value = MagicMock(data=[], count=0) return m sb.table.side_effect = table_side sb.auth = MagicMock() return sb class TestChannelsAuth: def test_list_channels_requires_auth(self, client): resp = client.get("/api/v1/channels?chatbot_id=cb-1") assert resp.status_code == 401 def test_connect_telegram_requires_auth(self, client): resp = client.post("/api/v1/channels/telegram", json={"chatbot_id": "cb-1", "bot_token": "fake:token"}) assert resp.status_code == 401 def test_disconnect_requires_auth(self, client): resp = client.delete("/api/v1/channels/conn-1") assert resp.status_code == 401 class TestListChannels: def test_returns_empty_list_when_no_channels(self, client): with patch("app.routers.channels.get_supabase") as mock_sb: mock_sb.return_value = _make_channels_sb() resp = client.get("/api/v1/channels?chatbot_id=cb-1", headers=AUTH) assert resp.status_code == 200 assert resp.json() == [] def test_returns_connection_list(self, client): conn = [{"id": "conn-1", "channel": "telegram", "bot_username": "mybot", "is_active": True, "created_at": "2024-01-01T00:00:00"}] with patch("app.routers.channels.get_supabase") as mock_sb: mock_sb.return_value = _make_channels_sb(connection=conn) resp = client.get("/api/v1/channels?chatbot_id=cb-1", headers=AUTH) assert resp.status_code == 200 assert len(resp.json()) == 1 assert resp.json()[0]["channel"] == "telegram" def test_chatbot_not_found_returns_404(self, client): with patch("app.routers.channels.get_supabase") as mock_sb: mock_sb.return_value = _make_channels_sb(chatbot=False) resp = client.get("/api/v1/channels?chatbot_id=bad-id", headers=AUTH) assert resp.status_code == 404 class TestConnectTelegram: def test_free_plan_blocked(self, client): with patch("app.routers.channels.get_supabase") as mock_sb, \ patch("app.routers.channels.get_bot_info", new_callable=AsyncMock) as mock_bot: mock_bot.return_value = {"username": "mybot"} mock_sb.return_value = _make_channels_sb(plan="free") resp = client.post("/api/v1/channels/telegram", json={"chatbot_id": "cb-1", "bot_token": "abc:token"}, headers=AUTH) assert resp.status_code == 402 assert "Starter" in resp.json()["detail"] def test_invalid_bot_token_returns_400(self, client): with patch("app.routers.channels.get_supabase") as mock_sb, \ patch("app.routers.channels.get_bot_info", new_callable=AsyncMock) as mock_bot: mock_bot.return_value = None # Invalid token mock_sb.return_value = _make_channels_sb(plan="starter") resp = client.post("/api/v1/channels/telegram", json={"chatbot_id": "cb-1", "bot_token": "invalid:token"}, headers=AUTH) assert resp.status_code == 400 assert "Invalid bot token" in resp.json()["detail"] def test_successful_connection(self, client): with patch("app.routers.channels.get_supabase") as mock_sb, \ patch("app.routers.channels.get_bot_info", new_callable=AsyncMock) as mock_bot, \ patch("app.routers.channels.set_webhook", new_callable=AsyncMock) as mock_webhook, \ patch("app.routers.channels.settings") as mock_settings: mock_bot.return_value = {"username": "mybot"} mock_webhook.return_value = True mock_settings.api_url = "https://api.example.com" mock_sb.return_value = _make_channels_sb(plan="starter") resp = client.post("/api/v1/channels/telegram", json={"chatbot_id": "cb-1", "bot_token": "valid:token"}, headers=AUTH) assert resp.status_code == 200 body = resp.json() assert body["success"] is True assert body["bot_username"] == "mybot" assert "t.me/mybot" in body["bot_link"] def test_missing_api_url_returns_500(self, client): with patch("app.routers.channels.get_supabase") as mock_sb, \ patch("app.routers.channels.get_bot_info", new_callable=AsyncMock) as mock_bot, \ patch("app.routers.channels.settings") as mock_settings: mock_bot.return_value = {"username": "mybot"} mock_settings.api_url = None mock_sb.return_value = _make_channels_sb(plan="starter") resp = client.post("/api/v1/channels/telegram", json={"chatbot_id": "cb-1", "bot_token": "valid:token"}, headers=AUTH) assert resp.status_code == 500 class TestDisconnectChannel: def test_connection_not_found_returns_404(self, client): with patch("app.routers.channels.get_supabase") as mock_sb: mock_sb.return_value = _make_channels_sb(connection=[]) resp = client.delete("/api/v1/channels/no-such-conn", headers=AUTH) assert resp.status_code == 404 def test_disconnect_success(self, client): conn = [{"id": "conn-1", "channel": "telegram", "chatbot_id": "cb-1", "bot_token": "tok:en", "is_active": True}] with patch("app.routers.channels.get_supabase") as mock_sb, \ patch("app.routers.channels.delete_webhook", new_callable=AsyncMock): mock_sb.return_value = _make_channels_sb(connection=conn) resp = client.delete("/api/v1/channels/conn-1", headers=AUTH) assert resp.status_code == 200 assert resp.json()["success"] is True class TestTelegramWebhook: def _post_webhook(self, client, body, bot_token="abc:token"): return client.post(f"/api/v1/webhooks/telegram/{bot_token}", json=body) def test_webhook_non_message_returns_ok(self, client): with patch("app.routers.channels.get_supabase") as mock_sb: mock_sb.return_value = _make_channels_sb() resp = self._post_webhook(client, {"update_id": 1}) assert resp.status_code == 200 assert resp.json()["ok"] is True def test_webhook_no_matching_connection_returns_ok(self, client): with patch("app.routers.channels.get_supabase") as mock_sb: mock_sb.return_value = _make_channels_sb(connection=[]) resp = self._post_webhook(client, { "message": {"chat": {"id": 12345}, "text": "Hello"} }) assert resp.status_code == 200 assert resp.json()["ok"] is True def test_webhook_start_command_sends_welcome(self, client): conn = [{"id": "conn-1", "chatbot_id": "cb-1", "channel": "telegram", "bot_token": "abc:token", "is_active": True}] with patch("app.routers.channels.get_supabase") as mock_sb, \ patch("app.routers.channels.tg_send", new_callable=AsyncMock) as mock_send, \ patch("app.routers.channels._check_chatbot_channel_subscription", return_value=True): mock_sb.return_value = _make_channels_sb(connection=conn) resp = self._post_webhook(client, { "message": {"chat": {"id": 12345}, "text": "/start"} }) assert resp.status_code == 200 mock_send.assert_called_once() def test_webhook_processes_message_via_rag(self, client): conn = [{"id": "conn-1", "chatbot_id": "cb-1", "channel": "telegram", "bot_token": "abc:token", "is_active": True}] rag_result = {"response": "I can help!", "sources": [], "model": "gpt-4", "tokens_used": 10} with patch("app.routers.channels.get_supabase") as mock_sb, \ patch("app.routers.channels.tg_send", new_callable=AsyncMock) as mock_send, \ patch("app.routers.channels.rag_engine") as mock_rag, \ patch("app.routers.channels._check_chatbot_channel_subscription", return_value=True): mock_rag.process_query = AsyncMock(return_value=rag_result) mock_sb.return_value = _make_channels_sb(connection=conn) resp = self._post_webhook(client, { "message": {"chat": {"id": 12345}, "text": "What are your hours?"} }) assert resp.status_code == 200 mock_send.assert_called_once() args = mock_send.call_args[0] assert args[2] == "I can help!" def test_webhook_subscription_expired_sends_unavailable(self, client): conn = [{"id": "conn-1", "chatbot_id": "cb-1", "channel": "telegram", "bot_token": "abc:token", "is_active": True}] with patch("app.routers.channels.get_supabase") as mock_sb, \ patch("app.routers.channels.tg_send", new_callable=AsyncMock) as mock_send, \ patch("app.routers.channels._check_chatbot_channel_subscription", return_value=False): mock_sb.return_value = _make_channels_sb(connection=conn) resp = self._post_webhook(client, { "message": {"chat": {"id": 12345}, "text": "Hello"} }) assert resp.status_code == 200 mock_send.assert_called_once() assert "unavailable" in mock_send.call_args[0][2].lower() class TestDetectLanguage: def test_arabic_text_detected(self): from app.routers.channels import _detect_language assert _detect_language("مرحبا كيف حالك") == "ar" def test_chinese_text_detected(self): from app.routers.channels import _detect_language assert _detect_language("你好世界这是中文") == "zh" def test_english_default(self): from app.routers.channels import _detect_language assert _detect_language("Hello how are you doing today") == "en" def test_empty_string_defaults_to_english(self): from app.routers.channels import _detect_language assert _detect_language("") == "en"