import httpx
import logging

logger = logging.getLogger(__name__)

# Upper bound on the UTF-8 encoded size of the text returned by a scrape.
MAX_TEXT_BYTES = 100 * 1024  # 100 KiB

# Elements treated as page chrome / non-content and removed before text extraction.
_BOILERPLATE_TAGS = ("nav", "header", "footer", "script", "style", "noscript", "aside")

# Runs inside the page: drops boilerplate elements, then returns the visible body text.
_EXTRACT_TEXT_JS = """() => {
    document.querySelectorAll('nav, header, footer, script, style, noscript, aside').forEach(e => e.remove());
    return document.body ? document.body.innerText.trim() : '';
}"""


async def scrape_url(url: str) -> dict[str, str]:
    """Fetch a URL and extract clean text content.

    Tries Playwright (headless Chromium) first so JS-rendered pages work, and
    falls back to a direct httpx fetch when the Playwright attempt reports an
    error.

    NOTE(review): ``url`` is fetched as-is. If it can come from untrusted
    users, the caller should validate scheme/host first (SSRF) — confirm.

    Args:
        url: Address of the page to scrape.

    Returns:
        ``{"title", "text", "url"}`` on success, or ``{"error", "url"}`` when
        both strategies fail. This function reports failures in the returned
        dict rather than raising.
    """
    result = await _scrape_via_playwright(url)
    if "error" not in result:
        return result

    logger.warning(
        "Playwright scrape failed for %s: %s — falling back to direct fetch",
        url,
        result["error"],
    )
    return await _scrape_direct(url)


async def _scrape_via_playwright(url: str) -> dict[str, str]:
    """Headless Chromium scrape — handles JS-rendered SPAs.

    Args:
        url: Address of the page to load.

    Returns:
        ``{"title", "text", "url"}`` on success, otherwise ``{"error", "url"}``
        with the exception message truncated to 200 characters.
    """
    try:
        # Imported lazily (inside the try) so a missing Playwright install
        # becomes an error result that triggers the httpx fallback.
        from playwright.async_api import async_playwright

        async with async_playwright() as p:
            browser = await p.chromium.launch(headless=True)
            try:
                page = await browser.new_page()
                await page.goto(url, wait_until="networkidle", timeout=30000)
                title = await page.title()
                text = await page.evaluate(_EXTRACT_TEXT_JS)
            finally:
                # Close the browser even when navigation/evaluation fails.
                await browser.close()

        text = _clean_text(text)
        if not text:
            return {"error": "No text content found on page", "url": url}

        logger.info("Scraped %s via Playwright: %d chars, title='%s'", url, len(text), title)
        return {"title": title, "text": text, "url": url}
    except Exception as e:
        return {"error": str(e)[:200], "url": url}


async def _scrape_direct(url: str) -> dict[str, str]:
    """Direct httpx scrape — works for server-rendered pages.

    Args:
        url: Address of the page to fetch.

    Returns:
        ``{"title", "text", "url"}`` on success, otherwise ``{"error", "url"}``
        describing the timeout, HTTP status, unsupported content type, or
        other failure.
    """
    try:
        # Imported lazily (inside the try) so a missing bs4 install is
        # reported as an error result instead of breaking module import.
        from bs4 import BeautifulSoup

        headers = {"User-Agent": "Mozilla/5.0 (compatible; ContextaBot/1.0; +https://contexta.ai)"}
        async with httpx.AsyncClient(timeout=30, follow_redirects=True) as client:
            response = await client.get(url, headers=headers)
            response.raise_for_status()

        content_type = response.headers.get("content-type", "")
        # Media types are case-insensitive; compare on a lowered copy but
        # report the header exactly as received.
        media_type = content_type.lower()
        if "text/html" not in media_type and "text/plain" not in media_type:
            return {"error": f"Unsupported content type: {content_type}", "url": url}

        soup = BeautifulSoup(response.text, "html.parser")

        title_tag = soup.find("title")
        title = title_tag.get_text(strip=True) if title_tag else ""

        for tag in soup.find_all(list(_BOILERPLATE_TAGS)):
            tag.decompose()

        # Prefer the most specific content container the page offers.
        main = soup.find("main") or soup.find("article") or soup.find("body") or soup
        text = _clean_text(main.get_text(separator="\n", strip=True))
        if not text:
            return {"error": "No text content found on page", "url": url}

        logger.info("Scraped %s directly: %d chars, title='%s'", url, len(text), title)
        return {"title": title, "text": text, "url": url}
    except httpx.TimeoutException:
        return {"error": "Request timed out", "url": url}
    except httpx.HTTPStatusError as e:
        return {"error": f"HTTP {e.response.status_code}", "url": url}
    except Exception as e:
        logger.error("Scrape error for %s: %s", url, e)
        return {"error": str(e)[:200], "url": url}


def _clean_text(text: str) -> str:
    """Normalize scraped text: strip, de-duplicate, and cap its encoded size.

    Lines are stripped; lines shorter than 5 characters and lines already
    seen are dropped (first occurrence wins, order preserved). If the result
    exceeds ``MAX_TEXT_BYTES`` when UTF-8 encoded, it is cut at that byte
    limit and the trailing (possibly partial) line is removed.

    Args:
        text: Raw text extracted from a page.

    Returns:
        The cleaned text, at most ``MAX_TEXT_BYTES`` bytes in UTF-8.
    """
    seen: set[str] = set()
    lines: list[str] = []
    for raw_line in text.splitlines():
        line = raw_line.strip()
        if len(line) < 5 or line in seen:
            continue
        seen.add(line)
        lines.append(line)

    cleaned = "\n".join(lines)
    encoded = cleaned.encode("utf-8")
    if len(encoded) > MAX_TEXT_BYTES:
        # Truncate on bytes (the limit's unit), not characters; errors="ignore"
        # discards a multi-byte character split by the cut.
        truncated = encoded[:MAX_TEXT_BYTES].decode("utf-8", errors="ignore")
        # Drop the last line, which the cut has most likely left incomplete.
        cleaned = truncated.rsplit("\n", 1)[0]
    return cleaned