import httpx import logging from typing import Optional logger = logging.getLogger(__name__) MAX_TEXT_BYTES = 100 * 1024 # 100KB async def scrape_url(url: str) -> dict: """ Fetch a URL and extract clean text content. Returns: {title, text, url} or {error, url} """ try: from bs4 import BeautifulSoup headers = { "User-Agent": "Mozilla/5.0 (compatible; ContextaBot/1.0; +https://contexta.ai)", } async with httpx.AsyncClient(timeout=30, follow_redirects=True) as client: response = await client.get(url, headers=headers) response.raise_for_status() content_type = response.headers.get("content-type", "") if "text/html" not in content_type and "text/plain" not in content_type: return {"error": f"Unsupported content type: {content_type}", "url": url} html = response.text soup = BeautifulSoup(html, "html.parser") # Extract title title_tag = soup.find("title") title = title_tag.get_text(strip=True) if title_tag else "" # Remove unwanted tags for tag in soup.find_all(["nav", "header", "footer", "script", "style", "noscript", "aside", "advertisement"]): tag.decompose() # Extract main content (prefer article/main/body) main = soup.find("main") or soup.find("article") or soup.find("body") or soup text = main.get_text(separator="\n", strip=True) # Clean up whitespace lines = [line.strip() for line in text.splitlines() if line.strip()] text = "\n".join(lines) # Limit size if len(text.encode("utf-8")) > MAX_TEXT_BYTES: text = text[:MAX_TEXT_BYTES].rsplit("\n", 1)[0] if not text: return {"error": "No text content found on page", "url": url} logger.info(f"Scraped {url}: {len(text)} chars, title='{title}'") return {"title": title, "text": text, "url": url} except httpx.TimeoutException: return {"error": "Request timed out", "url": url} except httpx.HTTPStatusError as e: return {"error": f"HTTP {e.response.status_code}", "url": url} except Exception as e: logger.error(f"Scrape error for {url}: {e}") return {"error": str(e)[:200], "url": url}