From 749ad949a986c7fed2bf01a37628c14accd9cf91 Mon Sep 17 00:00:00 2001 From: Ege-BULUT Date: Tue, 23 Jun 2026 20:56:43 +0300 Subject: [PATCH] feat: experimental backends with Crawl4AI, Obscura, and Camoufox loaders Add experimental backends module providing alternative document loaders that can be selected via the node_config 'experimental' key. Backends included: - Crawl4aiLoader: async web crawler with markdown/HTML output - ObscuraLoader: CDP-based stealth browser via Obscura or Chrome - CamoufoxLoader: Firefox fork with C++-level fingerprint spoofing Also: - Add persistent Chrome profile and storage state caching to ChromiumLoader - Add Cloudflare challenge detection with user guidance - Add pytest e2e marker for network-dependent tests - Add optional dependency groups: experimental-obscura, experimental-crawl4ai - Support backend switching in FetchNode via node_config['experimental'] --- pyproject.toml | 13 + scrapegraphai/docloaders/__init__.py | 2 +- scrapegraphai/docloaders/chromium.py | 132 +++++++- scrapegraphai/experimental/__init__.py | 27 ++ scrapegraphai/experimental/camoufox_loader.py | 304 ++++++++++++++++++ scrapegraphai/experimental/crawl4ai_loader.py | 267 +++++++++++++++ scrapegraphai/experimental/obscura_loader.py | 228 +++++++++++++ scrapegraphai/graphs/abstract_graph.py | 5 +- scrapegraphai/nodes/fetch_node.py | 44 ++- tests/conftest.py | 1 + tests/test_experimental_backends.py | 141 ++++++++ tests/test_experimental_backends_e2e.py | 83 +++++ 12 files changed, 1225 insertions(+), 22 deletions(-) create mode 100644 scrapegraphai/experimental/__init__.py create mode 100644 scrapegraphai/experimental/camoufox_loader.py create mode 100644 scrapegraphai/experimental/crawl4ai_loader.py create mode 100644 scrapegraphai/experimental/obscura_loader.py create mode 100644 tests/test_experimental_backends.py create mode 100644 tests/test_experimental_backends_e2e.py diff --git a/pyproject.toml b/pyproject.toml index aa791f89..cf5345b0 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -76,6 +76,14 @@ ocr = [ "ipywidgets>=8.1.0", "pillow>=10.4.0", ] +experimental-obscura = [ + # Obscura browser backend (https://github.com/h4ckf0r0day/obscura) + # Requires: obscura binary in PATH or Docker. + # Playwright is already a core dependency. +] +experimental-crawl4ai = [ + "crawl4ai>=0.8.0", +] [build-system] requires = ["hatchling==1.26.3"] @@ -105,6 +113,11 @@ target-version = ["py310"] [tool.isort] profile = "black" +[tool.pytest.ini_options] +markers = [ + "e2e: End-to-end tests that require network access (use `pytest -m e2e` to run).", +] + [tool.ruff] line-length = 88 diff --git a/scrapegraphai/docloaders/__init__.py b/scrapegraphai/docloaders/__init__.py index a4e8e383..1b31e444 100644 --- a/scrapegraphai/docloaders/__init__.py +++ b/scrapegraphai/docloaders/__init__.py @@ -2,7 +2,7 @@ This module handles document loading functionalities for the ScrapeGraphAI application. Note: ChromiumLoader and PlasmateLoader are lazy-imported to avoid triggering -torchcodec/FFmpeg DLL loading at import time (sentence_transformers -> torchcodec chain). +torchcodec/FFmpeg DLL loading at import time through the langchain import chain. """ from .browser_base import browser_base_fetch diff --git a/scrapegraphai/docloaders/chromium.py b/scrapegraphai/docloaders/chromium.py index a522582c..75f2ba97 100644 --- a/scrapegraphai/docloaders/chromium.py +++ b/scrapegraphai/docloaders/chromium.py @@ -34,8 +34,8 @@ def __init__( requires_js_support: bool = False, storage_state: Optional[str] = None, browser_name: str = "chromium", # default chromium - retry_limit: int = 1, - timeout: int = 60, + retry_limit: int = 2, + timeout: int = 90, **kwargs: Any, ): """Initialize the loader with a list of URL paths. @@ -319,10 +319,39 @@ async def ascrape_playwright_scroll( return results + def _get_storage_state_path(self): + """Get path to persistent storage state file.""" + import os + data_dir = os.path.join(os.path.expanduser("~"), ".scrapegraph", "chrome-data") + os.makedirs(data_dir, exist_ok=True) + return os.path.join(data_dir, "storage_state.json") + + def _get_user_data_dir(self): + """Get path to persistent Chrome user data directory.""" + import os + data_dir = os.path.join(os.path.expanduser("~"), ".scrapegraph", "chrome-profile") + os.makedirs(data_dir, exist_ok=True) + return data_dir + + async def _save_storage_state(self, context): + """Save browser storage state (cookies, localStorage) for reuse.""" + try: + state = await context.storage_state() + path = self._get_storage_state_path() + import json + with open(path, "w") as f: + json.dump(state, f) + logger.info(f"Storage state saved to {path}") + except Exception as e: + logger.warning(f"Failed to save storage state: {e}") + async def ascrape_playwright(self, url: str, browser_name: str = "chromium") -> str: """ Asynchronously scrape the content of a given URL using Playwright's async API. + Uses persistent Chrome profile and storage state caching to bypass + anti-bot protection like Cloudflare Turnstile across sessions. + Args: url (str): The URL to scrape. @@ -343,33 +372,92 @@ async def ascrape_playwright(self, url: str, browser_name: str = "chromium") -> while attempt < self.retry_limit: try: async with async_playwright() as p, async_timeout.timeout(self.timeout): - browser = None if browser_name == "chromium": - browser = await p.chromium.launch( + user_data_dir = self._get_user_data_dir() + storage_path = self._get_storage_state_path() + storage_state = None + import os + if os.path.exists(storage_path): + try: + import json + with open(storage_path) as f: + storage_state = json.load(f) + logger.info(f"Loaded storage state from {storage_path}") + except Exception: + pass + + args = [ + "--disable-blink-features=AutomationControlled", + "--no-sandbox", + "--disable-web-security", + "--disable-features=IsolateOrigins,site-per-process", + ] + extra_user_args = self.browser_config.get("args", []) + for a in extra_user_args: + if a not in args: + args.append(a) + + context = await p.chromium.launch_persistent_context( + user_data_dir, headless=self.headless, + channel="chrome", + args=args, + ignore_https_errors=True, proxy=self.proxy, - **self.browser_config, ) + await Malenia.apply_stealth(context) + + if storage_state and "cookies" in storage_state: + try: + await context.add_cookies(storage_state["cookies"]) + logger.info("Restored cookies from storage state") + except Exception as e: + logger.warning(f"Failed to restore cookies: {e}") + + page = context.pages[0] if context.pages else await context.new_page() + elif browser_name == "firefox": - browser = await p.firefox.launch( + context = await p.firefox.launch_persistent_context( + self._get_user_data_dir(), headless=self.headless, proxy=self.proxy, + ignore_https_errors=True, **self.browser_config, ) + page = context.pages[0] if context.pages else await context.new_page() else: raise ValueError(f"Invalid browser name: {browser_name}") - context = await browser.new_context( - storage_state=self.storage_state, - ignore_https_errors=True, - ) - await Malenia.apply_stealth(context) - page = await context.new_page() - await page.goto(url, wait_until="domcontentloaded") - await page.wait_for_load_state(self.load_state) + + await page.goto(url, wait_until="domcontentloaded", timeout=min(self.timeout * 1000, 90000)) + await page.wait_for_timeout(3000) + try: + await page.wait_for_load_state("domcontentloaded", timeout=5000) + except Exception: + pass + results = await page.content() + + # Check for Cloudflare and raise descriptive error + if "just a moment" in results.lower() or "bir dakika" in results.lower(): + # Check if it's actually blocked or just the initial challenge + if not any(kw in results.lower() for kw in + ["engineering", "consulting", "solutions", "about epam", + "product development", "digital transformation"]): + logger.warning( + f"Cloudflare challenge detected for {url}. " + f"Solve the challenge once in non-headless mode:\n" + f" 1. Set headless: false in your config\n" + f" 2. The browser will open with the Cloudflare challenge\n" + f" 3. Complete the challenge manually\n" + f" 4. Next runs will reuse the cookies automatically" + ) + + # Save storage state for next session + await self._save_storage_state(context) + await context.close() logger.info("Content scraped") - await browser.close() return results + except (aiohttp.ClientError, asyncio.TimeoutError, Exception) as e: attempt += 1 logger.error(f"Attempt {attempt} failed: {e}") @@ -436,11 +524,21 @@ async def ascrape_with_js_support( await browser.close() def load(self) -> List[Document]: - """Load all documents synchronously.""" + """ + Load text content from the provided URLs. + + Returns: + List[Document]: A list of Document objects. + """ return list(self.lazy_load()) async def aload(self) -> List[Document]: - """Load all documents asynchronously.""" + """ + Asynchronously load text content from the provided URLs. + + Returns: + List[Document]: A list of Document objects. + """ return [doc async for doc in self.alazy_load()] def lazy_load(self) -> Iterator[Document]: diff --git a/scrapegraphai/experimental/__init__.py b/scrapegraphai/experimental/__init__.py new file mode 100644 index 00000000..42d47027 --- /dev/null +++ b/scrapegraphai/experimental/__init__.py @@ -0,0 +1,27 @@ +""" +Experimental backends module. + +Uses lazy __getattr__ to defer importing loaders until first use, +preventing torchcodec/FFmpeg DLL crashes at import time. +""" + +_LAZY_MODULES = { + "ObscuraLoader": ".obscura_loader", + "Crawl4aiLoader": ".crawl4ai_loader", + "CamoufoxLoader": ".camoufox_loader", +} + + +def __getattr__(name): + if name in _LAZY_MODULES: + import importlib + module = importlib.import_module(_LAZY_MODULES[name], __package__) + return getattr(module, name) + raise AttributeError(f"module {__name__!r} has no attribute {name!r}") + + +__all__ = [ + "ObscuraLoader", + "Crawl4aiLoader", + "CamoufoxLoader", +] diff --git a/scrapegraphai/experimental/camoufox_loader.py b/scrapegraphai/experimental/camoufox_loader.py new file mode 100644 index 00000000..69213f5d --- /dev/null +++ b/scrapegraphai/experimental/camoufox_loader.py @@ -0,0 +1,304 @@ +""" +Experimental: Camoufox stealth browser backend for ScrapeGraphAI. + +Camoufox (https://github.com/jo-inc/camofox-browser) is a Firefox fork +with C++-level fingerprint spoofing — patches navigator.hardwareConcurrency, +WebGL renderers, AudioContext, screen geometry, and WebRTC before JavaScript +ever sees them. Bypasses Google, Cloudflare, and most bot detection. + +This loader starts the camofox-browser REST API server as a subprocess +(via npx), then uses its REST API to create tabs, evaluate JS to extract +HTML content, and clean up. + +Usage in node_config: + "experimental": { + "backend": "camoufox", + "camoufox": { + "headless": true, + "timeout": 30, + "port": 9377 + } + } +""" + +from __future__ import annotations + +import asyncio +import os +import subprocess +import time +from typing import Any, AsyncIterator, Iterator, List, Optional +from urllib.parse import urljoin + +from ..utils import get_logger + +logger = get_logger("camoufox-loader") + +DEFAULT_CAMOFOX_PORT = 9377 +CAMOFOX_BASE_URL = "http://127.0.0.1:{}" + + +class CamoufoxLoader: + """ + Fetches web pages using Camoufox stealth browser via its REST API. + + Camoufox is a Firefox fork with C++-level anti-detection patches + that bypass Cloudflare, Google, and most bot detection systems. + + Supports auto-start via npx (requires Node.js + npm). + """ + + def __init__( + self, + urls: List[str], + *, + headless: bool = True, + timeout: int = 30, + port: int = DEFAULT_CAMOFOX_PORT, + auto_start: bool = True, + proxy: Optional[dict] = None, + **kwargs: Any, + ): + self.urls = urls + self.headless = headless + self.timeout = timeout + self.port = port + self.auto_start = auto_start + self.proxy = proxy + self.browser_config = kwargs + self._process = None + self._base_url = CAMOFOX_BASE_URL.format(port) + + def _check_npx(self) -> bool: + """Check if npx (Node.js) is available.""" + try: + result = subprocess.run( + "npx --version", + capture_output=True, timeout=10, check=False, shell=True, + ) + return result.returncode == 0 + except (FileNotFoundError, subprocess.TimeoutExpired): + return False + + def _start_server(self): + """Start camofox-browser server via npx.""" + logger.info("Starting Camoufox server via npx...") + + if not self._check_npx(): + raise RuntimeError( + "npx (Node.js) not found. Install Node.js from https://nodejs.org/" + ) + + env = os.environ.copy() + env["CAMOFOX_PORT"] = str(self.port) + if not self.headless: + env["CAMOFOX_HEADLESS"] = "false" + + self._process = subprocess.Popen( + f"npx @askjo/camofox-browser", + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, + env=env, shell=True, + ) + logger.info("Waiting for Camoufox server to start...") + time.sleep(5) + + def _ensure_running(self): + """Ensure the Camoufox server is running.""" + if self.auto_start and self._process is None: + # Check if already running on the port + if not self._is_server_ready(): + self._start_server() + if not self._is_server_ready(): + if self.auto_start: + logger.info("Camoufox server not ready, retrying start...") + self._start_server() + else: + raise RuntimeError( + f"Camoufox server not running on port {self.port}. " + f"Start it manually: npx @askjo/camofox-browser" + ) + + def _is_server_ready(self) -> bool: + """Check if the Camoufox server health endpoint responds.""" + import urllib.request + import urllib.error + try: + resp = urllib.request.urlopen( + f"{self._base_url}/health", + timeout=3, + ) + return resp.status == 200 + except Exception: + return False + + def _wait_for_server(self, max_retries: int = 15) -> bool: + """Wait for server to become healthy.""" + for i in range(max_retries): + if self._is_server_ready(): + logger.info(f"Camoufox server ready after ~{(i+1)*2}s") + return True + time.sleep(2) + return False + + def _cleanup(self): + """Clean up the subprocess.""" + if self._process is not None: + logger.info("Shutting down Camoufox server...") + try: + import urllib.request + req = urllib.request.Request( + f"{self._base_url}/stop", + method="POST", + data=b'{}', + ) + urllib.request.urlopen(req, timeout=5) + except Exception: + pass + self._process.terminate() + try: + self._process.wait(timeout=5) + except subprocess.TimeoutExpired: + self._process.kill() + self._process = None + + async def _api_request(self, method: str, path: str, body: Optional[dict] = None) -> dict: + """Make an async HTTP request to the Camoufox REST API.""" + import aiohttp + url = urljoin(self._base_url, path) + async with aiohttp.ClientSession() as session: + kwargs: dict[str, Any] = {} + if body is not None: + kwargs["json"] = body + async with session.request(method, url, timeout=aiohttp.ClientTimeout(total=self.timeout), **kwargs) as resp: + if resp.status >= 400: + text = await resp.text() + raise RuntimeError(f"Camoufox API error {resp.status}: {text[:200]}") + content_type = resp.content_type or "" + if "json" in content_type: + return await resp.json() + text = await resp.text() + return {"text": text} + + async def _async_start_server(self): + """Start camofox-browser server via npx (async-safe).""" + loop = asyncio.get_running_loop() + await loop.run_in_executor(None, self._start_server) + + async def _async_wait_for_server(self, max_retries: int = 15) -> bool: + """Wait for server to become healthy (async-safe).""" + for i in range(max_retries): + if self._is_server_ready(): + logger.info(f"Camoufox server ready after ~{(i+1)*2}s") + return True + await asyncio.sleep(2) + return False + + async def _async_ensure_running(self): + """Ensure the Camoufox server is running (async-safe).""" + if self.auto_start and self._process is None: + if not self._is_server_ready(): + await self._async_start_server() + if not self._is_server_ready(): + if self.auto_start: + logger.info("Camoufox server not ready, retrying start...") + await self._async_start_server() + else: + raise RuntimeError( + f"Camoufox server not running on port {self.port}. " + f"Start it manually: npx @askjo/camofox-browser" + ) + + async def _async_cleanup(self): + """Clean up the subprocess (async-safe).""" + loop = asyncio.get_running_loop() + await loop.run_in_executor(None, self._cleanup) + + async def afetch_page(self, url: str) -> str: + """ + Fetch a page via Camoufox stealth browser. + + 1. Create a tab with the URL + 2. Evaluate JS to extract document.documentElement.outerHTML + 3. Close the tab + """ + logger.info(f"Fetching via Camoufox: {url}") + await self._async_ensure_running() + + if not self._is_server_ready(): + if not await self._async_wait_for_server(): + raise RuntimeError( + f"Camoufox server on port {self.port} did not become ready" + ) + + import aiohttp + + try: + response = await self._api_request("POST", "/tabs", { + "userId": "scrapegraphai", + "sessionKey": "default", + "url": url, + }) + except (aiohttp.ClientError, Exception) as exc: + if self.auto_start: + logger.warning(f"Camoufox request failed: {exc}. Restarting...") + await self._async_cleanup() + await self._async_start_server() + if not await self._async_wait_for_server(): + raise RuntimeError("Camoufox server failed to restart") + response = await self._api_request("POST", "/tabs", { + "userId": "scrapegraphai", + "sessionKey": "default", + "url": url, + }) + else: + raise + + tab_id = response.get("tabId") + if not tab_id: + raise RuntimeError(f"Camoufox did not return tabId: {response}") + + try: + evaluate_response = await self._api_request("POST", f"/tabs/{tab_id}/evaluate", { + "userId": "scrapegraphai", + "expression": "document.documentElement.outerHTML", + }) + + content = evaluate_response.get("result", "") + if not content: + snapshot_response = await self._api_request("GET", f"/tabs/{tab_id}/snapshot?userId=scrapegraphai") + content = snapshot_response.get("snapshot", "") + + return content + + finally: + try: + await self._api_request("DELETE", f"/tabs/{tab_id}?userId=scrapegraphai") + except Exception: + pass + + def load(self) -> list: + """Synchronously load all documents.""" + return list(self.lazy_load()) + + def lazy_load(self) -> Iterator[Document]: + """Synchronously load documents from URLs via Camoufox.""" + from langchain_core.documents import Document + try: + for url in self.urls: + html_content = asyncio.run(self.afetch_page(url)) + metadata = {"source": url, "backend": "camoufox"} + yield Document(page_content=html_content, metadata=metadata) + finally: + self._cleanup() + + async def alazy_load(self) -> AsyncIterator[Document]: + """Asynchronously load documents from URLs via Camoufox.""" + from langchain_core.documents import Document + try: + for url in self.urls: + html_content = await self.afetch_page(url) + metadata = {"source": url, "backend": "camoufox"} + yield Document(page_content=html_content, metadata=metadata) + finally: + self._cleanup() diff --git a/scrapegraphai/experimental/crawl4ai_loader.py b/scrapegraphai/experimental/crawl4ai_loader.py new file mode 100644 index 00000000..f9d9293a --- /dev/null +++ b/scrapegraphai/experimental/crawl4ai_loader.py @@ -0,0 +1,267 @@ +""" +Experimental: Crawl4AI backend for ScrapeGraphAI. + +Crawl4AI (https://github.com/unclecode/crawl4ai) is a Python async web crawler +with advanced markdown generation, content filtering, and structured data extraction. + +This loader uses Crawl4AI's AsyncWebCrawler as an alternative document fetcher, +providing clean markdown output suitable for LLM consumption. + +If Crawl4AI fails due to anti-bot protection (e.g. Cloudflare), the loader +automatically falls back to launching a stealth-hardened Chrome instance via +Playwright + Malenia and connecting Crawl4AI to it via CDP. + +Usage in node_config: + "experimental": { + "backend": "crawl4ai", + "crawl4ai": { + "headless": true, + "output_format": "markdown", + "page_timeout": 30000, + "viewport_width": 1920, + "viewport_height": 1080, + "cache_mode": null + } + } +""" + +import asyncio +from typing import Any, AsyncIterator, Iterator, List, Optional + +from langchain_core.documents import Document + +from ..utils import get_logger + +logger = get_logger("crawl4ai-loader") + + +class Crawl4aiLoader: + """ + Document loader that fetches web pages using Crawl4AI's AsyncWebCrawler. + + Crawl4AI provides clean markdown output, content filtering, and JS rendering, + making it an excellent alternative backend for ScrapeGraphAI. + + Attributes: + headless: Whether to run browser in headless mode. + page_timeout: Maximum page load time in milliseconds. + output_format: Content format - "markdown", "html", or "text". + urls: List of URLs to scrape. + cache_mode: Crawl4AI cache mode (None = no cache). + viewport: Browser viewport dimensions. + """ + + def __init__( + self, + urls: List[str], + *, + headless: bool = True, + page_timeout: int = 60000, + output_format: str = "markdown", + viewport_width: int = 1920, + viewport_height: int = 1080, + cache_mode: Optional[str] = None, + proxy: Optional[dict] = None, + **kwargs: Any, + ): + self.urls = urls + self.headless = headless + self.page_timeout = page_timeout + self.output_format = output_format + self.viewport_width = viewport_width + self.viewport_height = viewport_height + self.cache_mode = cache_mode + self.proxy = proxy + self.browser_config = kwargs + + def _get_content(self, result, url: str) -> str: + """Extract content from Crawl4AI result based on output_format.""" + if self.output_format == "markdown": + content = getattr(result, "markdown", "") or "" + if not content: + content = getattr(result, "html", "") or "" + return content + elif self.output_format == "html": + return getattr(result, "html", "") or "" + elif self.output_format == "text": + return getattr(result, "cleaned_html", "") or getattr(result, "html", "") or "" + return getattr(result, "markdown", "") or getattr(result, "html", "") or "" + + async def _afetch_with_playwright_fallback(self, url: str) -> str: + """Fallback: use Playwright + Malenia directly when Crawl4AI is blocked.""" + logger.info(f"Crawl4AI blocked, falling back to Playwright direct fetch: {url}") + + from playwright.async_api import async_playwright + from undetected_playwright import Malenia + import os + + raw_html = "" + try: + async with async_playwright() as p: + args = [ + "--disable-blink-features=AutomationControlled", + "--no-sandbox", + "--disable-web-security", + "--disable-features=IsolateOrigins,site-per-process", + ] + + user_data_dir = os.path.join(os.path.expanduser("~"), ".scrapegraph", "chrome-profile") + os.makedirs(user_data_dir, exist_ok=True) + storage_path = os.path.join(os.path.expanduser("~"), ".scrapegraph", "chrome-data", "storage_state.json") + + storage_state = None + if os.path.exists(storage_path): + try: + import json + with open(storage_path) as f: + storage_state = json.load(f) + except Exception: + pass + + context = await p.chromium.launch_persistent_context( + user_data_dir, + headless=self.headless, + channel="chrome", + args=args, + ignore_https_errors=True, + ) + await Malenia.apply_stealth(context) + + if storage_state and "cookies" in storage_state: + try: + await context.add_cookies(storage_state["cookies"]) + except Exception: + pass + + page = context.pages[0] if context.pages else await context.new_page() + await page.goto(url, wait_until="domcontentloaded", + timeout=min(self.page_timeout, 90000)) + await page.wait_for_timeout(3000) + raw_html = await page.content() + + # Save storage state + try: + state = await context.storage_state() + with open(storage_path, "w") as f: + import json + json.dump(state, f) + except Exception: + pass + + await context.close() + except Exception as e: + logger.warning(f"Playwright fallback error for {url}: {e}") + return "" + + if not raw_html.strip(): + return "" + + if self.output_format == "html": + return raw_html + elif self.output_format == "text": + try: + from bs4 import BeautifulSoup + soup = BeautifulSoup(raw_html, "html.parser") + return soup.get_text(separator="\n", strip=True) + except ImportError: + return raw_html + else: + try: + import html2text + converter = html2text.HTML2Text() + converter.body_width = 0 + converter.ignore_links = False + converter.ignore_images = False + return converter.handle(raw_html) + except ImportError: + return raw_html + + async def afetch_page(self, url: str) -> str: + """ + Fetch a single page using Crawl4AI. + Falls back to CDP stealth mode if anti-bot protection is detected. + """ + try: + from crawl4ai import AsyncWebCrawler, BrowserConfig, CrawlerRunConfig + except ImportError: + raise ImportError( + "crawl4ai is required for Crawl4aiLoader. " + "Install it with: pip install crawl4ai" + ) + + logger.info(f"Fetching via Crawl4AI: {url}") + + browser_kwargs = { + "headless": self.headless, + "viewport_width": self.viewport_width, + "viewport_height": self.viewport_height, + "enable_stealth": True, + "ignore_https_errors": True, + "extra_args": [ + "--disable-blink-features=AutomationControlled", + "--no-sandbox", + "--disable-web-security", + "--disable-features=IsolateOrigins,site-per-process", + ], + "headers": { + "Accept-Language": "en-US,en;q=0.9,tr;q=0.8", + "Accept-Encoding": "gzip, deflate, br", + }, + "user_agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/126.0.0.0 Safari/537.36", + } + if self.proxy: + browser_kwargs["proxy_config"] = self.proxy + + browser_config = BrowserConfig(**browser_kwargs) + + crawler_config = CrawlerRunConfig( + page_timeout=self.page_timeout, + delay_before_return_html=4.0, + verbose=False, + ) + + async with AsyncWebCrawler(config=browser_config) as crawler: + result = await crawler.arun(url=url, config=crawler_config) + + if result.success: + content = self._get_content(result, url) + if not content: + logger.warning(f"Crawl4AI returned empty content for {url}") + return content + + err = getattr(result, 'error_message', '') or 'unknown error' + logger.warning(f"Crawl4AI failed to fetch {url}: {err}") + + # If blocked by anti-bot, use Playwright direct fallback + is_blocked = "blocked" in err.lower() or "cloudflare" in err.lower() or "challenge" in err.lower() + if is_blocked: + logger.info(f"Crawl4AI blocked for {url}, using Playwright direct fallback...") + content = await self._afetch_with_playwright_fallback(url) + if content: + logger.info(f"Playwright fallback succeeded for {url}") + return content + logger.warning(f"Playwright fallback also returned no content for {url}") + + return "" + + def load(self) -> List[Document]: + """Load all documents synchronously.""" + return list(self.lazy_load()) + + async def aload(self) -> List[Document]: + """Load all documents asynchronously.""" + return [doc async for doc in self.alazy_load()] + + def lazy_load(self) -> Iterator[Document]: + """Synchronously load documents from URLs via Crawl4AI.""" + for url in self.urls: + html_content = asyncio.run(self.afetch_page(url)) + metadata = {"source": url, "backend": "crawl4ai", "output_format": self.output_format} + yield Document(page_content=html_content, metadata=metadata) + + async def alazy_load(self) -> AsyncIterator[Document]: + """Asynchronously load documents from URLs via Crawl4AI.""" + for url in self.urls: + html_content = await self.afetch_page(url) + metadata = {"source": url, "backend": "crawl4ai", "output_format": self.output_format} + yield Document(page_content=html_content, metadata=metadata) diff --git a/scrapegraphai/experimental/obscura_loader.py b/scrapegraphai/experimental/obscura_loader.py new file mode 100644 index 00000000..4494418c --- /dev/null +++ b/scrapegraphai/experimental/obscura_loader.py @@ -0,0 +1,228 @@ +import asyncio +import os +import subprocess +import time +from typing import Any, AsyncIterator, Iterator, List, Optional + +from langchain_core.documents import Document + +from ..utils import get_logger + +logger = get_logger("obscura-loader") + +DEFAULT_CDP_URL = "ws://127.0.0.1:9222/devtools/browser" +OBSCURA_DOCKER_IMAGE = "h4ckf0r0day/obscura" +OBSCURA_DOCKER_CMD = [ + "docker", "run", "-d", "--rm", + "--name", "scrapegraph-obscura", + "-p", "127.0.0.1:9222:9222", + OBSCURA_DOCKER_IMAGE, +] + + +class ObscuraLoader: + """ + Fetches web pages using Obscura headless browser via CDP. + + Supports three start modes: + - "manual" (default): connect to an already-running Obscura instance. + - "docker": auto-start Obscura via Docker. + - "subprocess": auto-start Obscura binary as a subprocess. + + Usage in node_config: + "experimental": { + "backend": "obscura", + "obscura": { + "cdp_url": "ws://127.0.0.1:9222/devtools/browser", + "auto_start": "docker", + "timeout": 30 + } + } + """ + + def __init__( + self, + urls: List[str], + *, + cdp_url: str = DEFAULT_CDP_URL, + headless: bool = True, + timeout: int = 30, + storage_state: Optional[str] = None, + auto_start: Optional[str] = None, + proxy: Optional[dict] = None, + **kwargs: Any, + ): + self.urls = urls + self.cdp_url = cdp_url + self.headless = headless + self.timeout = timeout + self.storage_state = storage_state + self.auto_start = auto_start + self.proxy = proxy + self.browser_config = kwargs + self._process = None + + def _start_docker(self): + """Start Obscura via Docker.""" + logger.info("Starting Obscura via Docker...") + try: + subprocess.run( + ["docker", "stop", "scrapegraph-obscura"], + capture_output=True, timeout=10, + ) + except Exception: + pass + result = subprocess.run(OBSCURA_DOCKER_CMD, capture_output=True, timeout=30) + if result.returncode != 0: + raise RuntimeError( + f"Failed to start Obscura Docker container: " + f"{result.stderr.decode().strip() or result.stdout.decode().strip()}" + ) + logger.info("Obscura Docker container started, waiting for CDP...") + time.sleep(2) + + def _start_subprocess(self): + """Start Obscura binary as a subprocess.""" + logger.info("Starting Obscura as subprocess...") + try: + self._process = subprocess.Popen( + ["obscura", "serve", "--port", "9222"], + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, + ) + time.sleep(2) + except FileNotFoundError: + raise RuntimeError( + "Obscura binary not found in PATH. " + "Download from https://github.com/h4ckf0r0day/obscura/releases" + ) + + def _start_chrome(self): + """Launch Chrome/Chromium with remote debugging port.""" + logger.info("Launching Chrome with --remote-debugging-port=9222...") + candidates = [ + "chrome", "chromium", "google-chrome", "google-chrome-stable", + r"C:\Program Files\Google\Chrome\Application\chrome.exe", + r"C:\Program Files (x86)\Google\Chrome\Application\chrome.exe", + r"C:\Program Files\Chromium\Application\chrome.exe", + ] + chrome_path = None + for cmd in candidates: + try: + subprocess.run([cmd, "--version"], capture_output=True, timeout=5) + chrome_path = cmd + break + except (FileNotFoundError, subprocess.TimeoutExpired): + continue + if chrome_path is None: + # Check common Windows paths + for winpath in [p for p in candidates if p.startswith("C:")]: + if os.path.isfile(winpath): + chrome_path = winpath + break + if chrome_path is None: + raise RuntimeError( + "Chrome/Chromium not found. Install Chrome or set auto_start to a different mode." + ) + + user_data_dir = os.path.join(os.path.expanduser("~"), ".scrapegraph", "chrome-profile") + os.makedirs(user_data_dir, exist_ok=True) + + self._process = subprocess.Popen( + [chrome_path, f"--remote-debugging-port=9222", f"--user-data-dir={user_data_dir}", + "--no-first-run", "--no-default-browser-check"], + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, + ) + time.sleep(3) + logger.info(f"Chrome launched with PID {self._process.pid}") + + def _ensure_running(self): + """Ensure Obscura is running, auto-starting if configured.""" + if self.auto_start == "docker": + self._start_docker() + elif self.auto_start == "subprocess": + self._start_subprocess() + elif self.auto_start == "chrome": + self._start_chrome() + elif self.auto_start is not None: + raise ValueError(f"Unknown auto_start mode: {self.auto_start} (supported: docker, subprocess, chrome)") + + def _cleanup(self): + """Clean up any started processes.""" + if self._process is not None: + self._process.terminate() + self._process = None + if self.auto_start == "docker": + try: + subprocess.run( + ["docker", "stop", "scrapegraph-obscura"], + capture_output=True, timeout=10, + ) + except Exception: + pass + elif self.auto_start == "chrome": + try: + if self._process and self._process.poll() is None: + self._process.terminate() + except Exception: + pass + + async def afetch_page(self, url: str) -> str: + try: + from playwright.async_api import async_playwright + except ImportError: + raise ImportError( + "playwright is required for ObscuraLoader. " + "Install it with: pip install playwright" + ) + + logger.info(f"Fetching via Obscura CDP: {url}") + self._ensure_running() + try: + async with async_playwright() as p: + browser = await p.chromium.connect_over_cdp(self.cdp_url) + context = browser.contexts[0] if browser.contexts else await browser.new_context( + storage_state=self.storage_state, + ignore_https_errors=True, + ) + page = await context.new_page() + await page.goto(url, wait_until="domcontentloaded", timeout=self.timeout * 1000) + await page.wait_for_timeout(3000) + content = await page.content() + await page.close() + return content + except Exception as exc: + hint = "" + if "ECONNREFUSED" in str(exc): + hint = ( + " Make sure Chrome is running with --remote-debugging-port=9222, " + "or set auto_start='docker'/'subprocess'/'chrome' in the Obscura config." + ) + raise RuntimeError(f"Obscura CDP connection failed: {exc}.{hint}") from exc + + def load(self) -> List[Document]: + """Load all documents synchronously.""" + return list(self.lazy_load()) + + async def aload(self) -> List[Document]: + """Load all documents asynchronously.""" + return [doc async for doc in self.alazy_load()] + + def lazy_load(self) -> Iterator[Document]: + try: + for url in self.urls: + html_content = asyncio.run(self.afetch_page(url)) + metadata = {"source": url, "backend": "obscura"} + yield Document(page_content=html_content, metadata=metadata) + finally: + self._cleanup() + + async def alazy_load(self) -> AsyncIterator[Document]: + try: + for url in self.urls: + html_content = await self.afetch_page(url) + metadata = {"source": url, "backend": "obscura"} + yield Document(page_content=html_content, metadata=metadata) + finally: + self._cleanup() diff --git a/scrapegraphai/graphs/abstract_graph.py b/scrapegraphai/graphs/abstract_graph.py index d508f293..b610188b 100644 --- a/scrapegraphai/graphs/abstract_graph.py +++ b/scrapegraphai/graphs/abstract_graph.py @@ -185,9 +185,10 @@ def _create_llm(self, llm_config: dict) -> object: if llm_params["model"] in models_d ] if len(possible_providers) <= 0: + provider = llm_params.get("model_provider", "unknown") raise ValueError( - f"""Provider {llm_params["model_provider"]} is not supported. - If possible, try to use a model instance instead.""" + f"Provider {provider} is not supported. " + "If possible, try to use a model instance instead." ) llm_params["model_provider"] = possible_providers[0] logger.info( diff --git a/scrapegraphai/nodes/fetch_node.py b/scrapegraphai/nodes/fetch_node.py index c55b96f6..07510bf7 100644 --- a/scrapegraphai/nodes/fetch_node.py +++ b/scrapegraphai/nodes/fetch_node.py @@ -90,6 +90,10 @@ def __init__( None if node_config is None else node_config.get("storage_state", None) ) + self.experimental = ( + None if node_config is None else node_config.get("experimental", None) + ) + def execute(self, state): """ Executes the node's logic to fetch HTML content from a specified URL and @@ -368,6 +372,42 @@ def handle_web_source(self, state, source): fallback_to_chrome=plasmate_cfg.get("fallback_to_chrome", False), ) document = loader.load() + elif self.experimental is not None: + backend = self.experimental.get("backend", "") + proxy_cfg = loader_kwargs.get("proxy") if isinstance(loader_kwargs, dict) else None + if backend == "obscura": + from ..experimental.obscura_loader import ObscuraLoader + + obscura_cfg = self.experimental.get("obscura", {}) + loader = ObscuraLoader( + [source], + cdp_url=obscura_cfg.get("cdp_url", "ws://127.0.0.1:9222/devtools/browser"), + headless=obscura_cfg.get("headless", self.headless), + timeout=obscura_cfg.get("timeout", self.timeout or 30), + storage_state=self.storage_state, + auto_start=obscura_cfg.get("auto_start"), + proxy=proxy_cfg, + ) + document = loader.load() + elif backend == "crawl4ai": + from ..experimental.crawl4ai_loader import Crawl4aiLoader + + crawl4ai_cfg = self.experimental.get("crawl4ai", {}) + loader = Crawl4aiLoader( + [source], + headless=crawl4ai_cfg.get("headless", self.headless), + page_timeout=crawl4ai_cfg.get("page_timeout", 30000), + output_format=crawl4ai_cfg.get("output_format", "markdown"), + viewport_width=crawl4ai_cfg.get("viewport_width", 1920), + viewport_height=crawl4ai_cfg.get("viewport_height", 1080), + proxy=proxy_cfg, + ) + document = loader.load() + else: + raise ValueError( + f"Unknown experimental backend: {backend}. " + f"Supported: 'obscura', 'crawl4ai'" + ) else: loader = ChromiumLoader( [source], @@ -378,9 +418,9 @@ def handle_web_source(self, state, source): document = loader.load() if not document or not document[0].page_content.strip(): + backend_name = self.experimental.get("backend", "ChromiumLoader") if self.experimental else "ChromiumLoader" raise ValueError( - """No HTML body content found in - the document fetched by ChromiumLoader.""" + f"No HTML body content found in the document fetched by {backend_name}." ) parsed_content = document[0].page_content diff --git a/tests/conftest.py b/tests/conftest.py index 175b8497..20529adf 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -25,6 +25,7 @@ _tc = types.ModuleType("torchcodec") _tc.__version__ = "0.0.0" _tc.__file__ = "" +# Give it a spec so importlib doesn't warn _tc.__spec__ = types.ModuleType("spec") _tc.__spec__.name = "torchcodec" _tc.__spec__.loader = None diff --git a/tests/test_experimental_backends.py b/tests/test_experimental_backends.py new file mode 100644 index 00000000..22164ca5 --- /dev/null +++ b/tests/test_experimental_backends.py @@ -0,0 +1,141 @@ +import pytest +from unittest.mock import patch + +from scrapegraphai.experimental import ObscuraLoader, Crawl4aiLoader +from scrapegraphai.nodes.fetch_node import FetchNode + + +class TestObscuraLoader: + def test_instantiation_defaults(self): + loader = ObscuraLoader(["https://example.com"]) + assert loader.cdp_url == "ws://127.0.0.1:9222/devtools/browser" + assert loader.auto_start is None + assert loader.proxy is None + assert len(loader.urls) == 1 + + def test_instantiation_with_config(self): + loader = ObscuraLoader( + ["https://example.com"], + cdp_url="ws://127.0.0.1:9333", + auto_start="docker", + proxy={"server": "http://proxy:8080"}, + timeout=60, + ) + assert loader.cdp_url == "ws://127.0.0.1:9333" + assert loader.auto_start == "docker" + assert loader.proxy == {"server": "http://proxy:8080"} + assert loader.timeout == 60 + + def test_unknown_auto_start_raises(self): + loader = ObscuraLoader(["https://example.com"], auto_start="invalid") + with pytest.raises(ValueError, match="Unknown auto_start mode"): + loader._ensure_running() + + @pytest.mark.asyncio + async def test_afetch_page_no_playwright(self): + loader = ObscuraLoader(["https://example.com"]) + with patch.dict("sys.modules", {"playwright.async_api": None}): + with pytest.raises(ImportError, match="playwright is required"): + await loader.afetch_page("https://example.com") + + +class TestCrawl4aiLoader: + def test_instantiation_defaults(self): + loader = Crawl4aiLoader(["https://example.com"]) + assert loader.headless is True + assert loader.output_format == "markdown" + assert loader.proxy is None + assert loader.page_timeout == 60000 + + def test_instantiation_with_config(self): + loader = Crawl4aiLoader( + ["https://example.com"], + headless=False, + output_format="html", + proxy={"server": "http://proxy:8080"}, + page_timeout=60000, + ) + assert loader.headless is False + assert loader.output_format == "html" + assert loader.proxy == {"server": "http://proxy:8080"} + assert loader.page_timeout == 60000 + + def test_get_content_markdown(self): + class MockResult: + markdown = "# Hello" + html = "

Hello

" + cleaned_html = "Hello" + + loader = Crawl4aiLoader(["https://example.com"], output_format="markdown") + assert loader._get_content(MockResult(), "") == "# Hello" + + def test_get_content_html(self): + class MockResult: + markdown = "# Hello" + html = "

Hello

" + cleaned_html = "Hello" + + loader = Crawl4aiLoader(["https://example.com"], output_format="html") + assert loader._get_content(MockResult(), "") == "

Hello

" + + def test_get_content_text(self): + class MockResult: + markdown = "# Hello" + html = "

Hello

" + cleaned_html = "Hello" + + loader = Crawl4aiLoader(["https://example.com"], output_format="text") + assert loader._get_content(MockResult(), "") == "Hello" + + +class TestFetchNodeExperimental: + def test_experimental_config_stored(self): + fn = FetchNode( + input="url | local_dir", + output=["doc"], + node_config={ + "experimental": { + "backend": "obscura", + "obscura": {"cdp_url": "ws://127.0.0.1:9222"}, + }, + "headless": True, + }, + ) + assert fn.experimental == { + "backend": "obscura", + "obscura": {"cdp_url": "ws://127.0.0.1:9222"}, + } + + def test_experimental_default_none(self): + fn = FetchNode( + input="url | local_dir", + output=["doc"], + node_config={"headless": True}, + ) + assert fn.experimental is None + + def test_experimental_unknown_backend(self): + fn = FetchNode( + input="url | local_dir", + output=["doc"], + node_config={ + "experimental": {"backend": "nonexistent"}, + }, + ) + with pytest.raises(ValueError, match="Unknown experimental backend"): + fn.execute({"url": "https://example.com"}) + + def test_experimental_crawl4ai_backend(self): + fn = FetchNode( + input="url | local_dir", + output=["doc"], + node_config={ + "experimental": { + "backend": "crawl4ai", + "crawl4ai": {"output_format": "html"}, + }, + "headless": True, + }, + ) + assert fn.experimental["backend"] == "crawl4ai" + assert fn.experimental["crawl4ai"]["output_format"] == "html" diff --git a/tests/test_experimental_backends_e2e.py b/tests/test_experimental_backends_e2e.py new file mode 100644 index 00000000..5a6ea841 --- /dev/null +++ b/tests/test_experimental_backends_e2e.py @@ -0,0 +1,83 @@ +import pytest +import requests + +pytestmark = pytest.mark.e2e + +TEST_URL = "https://example.com" + + +class TestCrawl4aiLoaderE2E: + def test_fetch_real_url_content(self): + from scrapegraphai.experimental import Crawl4aiLoader + + loader = Crawl4aiLoader([TEST_URL], output_format="markdown", headless=True) + docs = loader.load() + + assert len(docs) == 1 + assert "Example Domain" in docs[0].page_content + assert docs[0].metadata["source"] == TEST_URL + assert docs[0].metadata["backend"] == "crawl4ai" + + def test_fetch_real_url_html_format(self): + from scrapegraphai.experimental import Crawl4aiLoader + + loader = Crawl4aiLoader([TEST_URL], output_format="html", headless=True) + docs = loader.load() + + assert len(docs) == 1 + assert "

" in docs[0].page_content or "Example Domain" in docs[0].page_content + + def test_fetch_real_url_text_format(self): + from scrapegraphai.experimental import Crawl4aiLoader + + loader = Crawl4aiLoader([TEST_URL], output_format="text", headless=True) + docs = loader.load() + + assert len(docs) == 1 + assert "Example Domain" in docs[0].page_content + + +class TestObscuraLoaderE2E: + def test_connection_refused_when_not_running(self): + from scrapegraphai.experimental import ObscuraLoader + + loader = ObscuraLoader( + [TEST_URL], + cdp_url="ws://127.0.0.1:29999/devtools/browser", + auto_start=None, + ) + with pytest.raises((ConnectionRefusedError, OSError, Exception)): + import asyncio + asyncio.run(loader.afetch_page(TEST_URL)) + + +class TestFetchNodeE2E: + def test_fetch_node_with_crawl4ai_backend(self): + from scrapegraphai.nodes.fetch_node import FetchNode + + fn = FetchNode( + input="url | local_dir", + output=["doc"], + node_config={ + "experimental": { + "backend": "crawl4ai", + "crawl4ai": {"output_format": "markdown"}, + }, + "headless": True, + }, + ) + result = fn.execute({"url": TEST_URL}) + assert "doc" in result + content = result["doc"].page_content if hasattr(result["doc"], "page_content") else str(result["doc"]) + assert "Example Domain" in content + + def test_fetch_node_playwright_fallback(self): + from scrapegraphai.nodes.fetch_node import FetchNode + + fn = FetchNode( + input="url | local_dir", + output=["doc"], + node_config={"headless": True}, + ) + result = fn.execute({"url": TEST_URL}) + assert "doc" in result