mirror of
http://88.130.71.182:3000/BlitTech/contexta_be.git
synced 2026-06-13 10:14:58 +00:00
58 lines
1.8 KiB
Python
58 lines
1.8 KiB
Python
import httpx
|
|
import logging
|
|
from typing import Optional
|
|
|
|
# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)
|
|
|
|
# URL prefix template for Telegram Bot API calls. ``{token}`` is a
# ``str.format`` placeholder for the bot token; a method URL is this prefix
# followed by ``/<methodName>`` (e.g. ``/getMe``).
_BASE = "https://api.telegram.org/bot{token}"
|
|
|
|
|
|
async def get_bot_info(bot_token: str) -> Optional[dict]:
    """Fetch the bot's own profile through the Bot API ``getMe`` method.

    This is a best-effort call: it never raises. Every failure is logged
    and reported to the caller as ``None``.

    Args:
        bot_token: Bot API token; substituted into the request URL.

    Returns:
        The ``result`` member of the JSON response when the response has a
        truthy ``ok`` field, otherwise ``None`` (API-level failure, network
        error, timeout, or an undecodable/unexpected response body).
    """
    try:
        # Build the URL from the shared module template instead of repeating
        # the literal host in every helper.
        url = _BASE.format(token=bot_token) + "/getMe"
        async with httpx.AsyncClient(timeout=10) as client:
            r = await client.get(url)
            data = r.json()
        if data.get("ok"):
            return data["result"]
        # Telegram explains failures in ``description``; surface it rather
        # than returning None with no trace of the reason.
        logger.warning(
            "Telegram getMe failed: %s (error_code=%s)",
            data.get("description"),
            data.get("error_code"),
        )
    except Exception as e:
        # %r keeps the exception type visible even when its message is empty
        # (which can happen for timeout errors), and the lazy logging args
        # avoid formatting when the record is filtered out.
        logger.error("Telegram getMe error: %r", e)
    return None
|
|
|
|
|
|
async def set_webhook(bot_token: str, webhook_url: str) -> bool:
    """Register ``webhook_url`` for the bot through the ``setWebhook`` method.

    The request body sets ``allowed_updates`` to ``["message"]``. This is a
    best-effort call: it never raises, and every failure is logged and
    reported as ``False``.

    Args:
        bot_token: Bot API token; substituted into the request URL.
        webhook_url: URL sent as the ``url`` field of the request.

    Returns:
        ``True`` when the JSON response has a truthy ``ok`` field, ``False``
        on an API-level failure, network error, timeout, or an
        undecodable/unexpected response body.
    """
    try:
        # Build the URL from the shared module template instead of repeating
        # the literal host in every helper.
        url = _BASE.format(token=bot_token) + "/setWebhook"
        async with httpx.AsyncClient(timeout=10) as client:
            r = await client.post(
                url,
                json={"url": webhook_url, "allowed_updates": ["message"]},
            )
            payload = r.json()
        ok = bool(payload.get("ok", False))
        if not ok:
            # Telegram explains failures in ``description``; surface it
            # rather than returning False with no trace of the reason.
            logger.warning(
                "Telegram setWebhook failed: %s (error_code=%s)",
                payload.get("description"),
                payload.get("error_code"),
            )
        return ok
    except Exception as e:
        # %r keeps the exception type visible even when its message is empty
        # (which can happen for timeout errors).
        logger.error("Telegram setWebhook error: %r", e)
        return False
|
|
|
|
|
|
async def delete_webhook(bot_token: str) -> bool:
    """Remove the bot's webhook through the Bot API ``deleteWebhook`` method.

    This is a best-effort call: it never raises. Every failure is logged
    and reported as ``False``.

    Args:
        bot_token: Bot API token; substituted into the request URL.

    Returns:
        ``True`` when the JSON response has a truthy ``ok`` field, ``False``
        on an API-level failure, network error, timeout, or an
        undecodable/unexpected response body.
    """
    try:
        # Build the URL from the shared module template instead of repeating
        # the literal host in every helper.
        url = _BASE.format(token=bot_token) + "/deleteWebhook"
        async with httpx.AsyncClient(timeout=10) as client:
            r = await client.post(url)
            payload = r.json()
        ok = bool(payload.get("ok", False))
        if not ok:
            # Telegram explains failures in ``description``; surface it
            # rather than returning False with no trace of the reason.
            logger.warning(
                "Telegram deleteWebhook failed: %s (error_code=%s)",
                payload.get("description"),
                payload.get("error_code"),
            )
        return ok
    except Exception as e:
        # %r keeps the exception type visible even when its message is empty
        # (which can happen for timeout errors).
        logger.error("Telegram deleteWebhook error: %r", e)
        return False
|
|
|
|
|
|
async def send_message(bot_token: str, chat_id, text: str) -> bool:
    """Send a text message through the Bot API ``sendMessage`` method.

    This is a best-effort call: it never raises. Every failure is logged
    and reported as ``False``.

    Args:
        bot_token: Bot API token; substituted into the request URL.
        chat_id: Target chat identifier, forwarded unchanged as the
            ``chat_id`` field of the JSON body (must be JSON-serializable).
        text: Message text, forwarded unchanged as the ``text`` field.

    Returns:
        ``True`` when the JSON response has a truthy ``ok`` field, ``False``
        on an API-level failure, network error, timeout, or an
        undecodable/unexpected response body.
    """
    try:
        # Build the URL from the shared module template instead of repeating
        # the literal host in every helper.
        url = _BASE.format(token=bot_token) + "/sendMessage"
        async with httpx.AsyncClient(timeout=10) as client:
            r = await client.post(
                url,
                json={"chat_id": chat_id, "text": text},
            )
            payload = r.json()
        ok = bool(payload.get("ok", False))
        if not ok:
            # Telegram explains failures in ``description``; surface it
            # rather than returning False with no trace of the reason.
            logger.warning(
                "Telegram sendMessage failed: %s (error_code=%s)",
                payload.get("description"),
                payload.get("error_code"),
            )
        return ok
    except Exception as e:
        # %r keeps the exception type visible even when its message is empty
        # (which can happen for timeout errors).
        logger.error("Telegram sendMessage error: %r", e)
        return False
|