diff --git a/docs/03_guides/06_scrapy.mdx b/docs/03_guides/06_scrapy.mdx index 12525609a..b1b81c1e7 100644 --- a/docs/03_guides/06_scrapy.mdx +++ b/docs/03_guides/06_scrapy.mdx @@ -46,7 +46,7 @@ The Apify SDK provides several custom components to support integration with the - `apify.scrapy.ApifyScheduler` - Replaces Scrapy's default [scheduler](https://docs.scrapy.org/en/latest/topics/scheduler.html) with one that uses Apify's [request queue](https://docs.apify.com/platform/storage/request-queue) for storing requests. It manages enqueuing, dequeuing, and maintaining the state and priority of requests. - `apify.scrapy.ActorDatasetPushPipeline` - A Scrapy [item pipeline](https://docs.scrapy.org/en/latest/topics/item-pipeline.html) that pushes scraped items to Apify's [dataset](https://docs.apify.com/platform/storage/dataset). When enabled, every item produced by the spider is sent to the dataset. - `apify.scrapy.ApifyHttpProxyMiddleware` - A Scrapy [middleware](https://docs.scrapy.org/en/latest/topics/downloader-middleware.html) that manages proxy configurations. This middleware replaces Scrapy's default `HttpProxyMiddleware` to facilitate the use of Apify's proxy service. -- `apify.scrapy.extensions.ApifyCacheStorage` - A storage backend for Scrapy's built-in [HTTP cache middleware](https://docs.scrapy.org/en/latest/topics/downloader-middleware.html#module-scrapy.downloadermiddlewares.httpcache). This backend uses Apify's [key-value store](https://docs.apify.com/platform/storage/key-value-store). Make sure to set `HTTPCACHE_ENABLED` and `HTTPCACHE_EXPIRATION_SECS` in your settings, or caching won't work. +- `apify.scrapy.extensions.ApifyCacheStorage` - A storage backend for Scrapy's built-in [HTTP cache middleware](https://docs.scrapy.org/en/latest/topics/downloader-middleware.html#module-scrapy.downloadermiddlewares.httpcache). This backend uses Apify's [key-value store](https://docs.apify.com/platform/storage/key-value-store). To enable caching, set `HTTPCACHE_ENABLED` and `HTTPCACHE_EXPIRATION_SECS` in your settings. By default, when the spider closes, up to 100 expired and unreadable entries per run are cleaned up. To change this number, update `APIFY_HTTPCACHE_EXPIRATION_MAX_ITEMS`. Additional helper functions in the [`apify.scrapy`](https://github.com/apify/apify-sdk-python/tree/master/src/apify/scrapy) subpackage include: diff --git a/docs/04_upgrading/upgrading_to_v4.md b/docs/04_upgrading/upgrading_to_v4.md index a69c84882..32cc13ca7 100644 --- a/docs/04_upgrading/upgrading_to_v4.md +++ b/docs/04_upgrading/upgrading_to_v4.md @@ -198,3 +198,36 @@ await client.run('my-run').charge('my-event', count=5) ### Async `iterate_*` are no longer coroutine functions `DatasetClientAsync.iterate_items()` and `KeyValueStoreClientAsync.iterate_keys()` are now plain `def` functions returning `AsyncIterator[T]`. Consumer code (`async for ...`) is unchanged; if you annotate the call's return value, change `AsyncGenerator[T, None]` to `AsyncIterator[T]`. + +## Scrapy requests and HTTP cache stored as JSON + +This applies only if you use the Scrapy integration (`apify[scrapy]`). + +The integration now serializes data as JSON instead of pickle. `ApifyScheduler` stores Scrapy requests in the Apify request queue (under `user_data['scrapy_request']`), and `ApifyCacheStorage` stores HTTP cache entries in the key-value store. Both now hold JSON. Unlike pickle, JSON stays stable across Python and library versions. + +### Persisted data from before the upgrade is not read back + +Data written by an older SDK uses the pickle format, which v4 does not load. The two storages handle this differently: + +- HTTP cache: a legacy entry is treated as a cache miss. Scrapy re-fetches the page and re-stores it as JSON, so the cache heals itself. No action is needed. +- Request queue: a request stored by an older SDK cannot be reconstructed, so it is skipped and the failure is logged. This matters only when pre-upgrade requests are still in the queue, for example after a run is migrated or restarted, or when you reuse a named request queue. A fresh run is not affected. + +### `meta` and `cb_kwargs` must be JSON-serializable + +Pickle could store arbitrary Python objects. JSON cannot, so the values in a request's `meta` and `cb_kwargs` are now subject to JSON's type system: + +- A `tuple` comes back as a `list`. +- Non-string `dict` keys come back as strings, so `{1: 'a'}` becomes `{'1': 'a'}`. +- A value JSON cannot represent (`datetime`, `set`, `Decimal`, a custom object) is no longer stored silently. The request is skipped and the failure is logged. Pydantic models are still supported and are dumped with `model_dump()`. + +Convert such values to a JSON-friendly form before yielding the request: + +```python +from datetime import datetime + +# Before (v3): relied on pickle to store the datetime object. +yield scrapy.Request(url, meta={'since': datetime(2024, 1, 1)}) + +# After (v4): store a JSON-serializable value. +yield scrapy.Request(url, meta={'since': datetime(2024, 1, 1).isoformat()}) +``` diff --git a/src/apify/scrapy/_serialization.py b/src/apify/scrapy/_serialization.py new file mode 100644 index 000000000..0cd66b35d --- /dev/null +++ b/src/apify/scrapy/_serialization.py @@ -0,0 +1,138 @@ +"""JSON serialization of Scrapy requests and cached responses for storage on the Apify platform. + +Scrapy requests and cached responses are stored in the Apify request queue and key-value store which hold JSON, +so they are serialized as JSON here rather than pickled. + +Only `body` (`bytes`) and `headers` (`{bytes: [bytes]}`) are not natively JSON-serializable; both sit at fixed keys +and are base64-encoded in place. A `str` `body` is encoded as its UTF-8 bytes and comes back as `bytes`, matching +Scrapy, which always stores `body` as `bytes`. Pydantic models such as Crawlee's `UserData` are dumped via +`model_dump()`. Everything else, notably `meta` and `cb_kwargs`, must already be JSON-serializable, otherwise +serialization fails with a clear error naming the offending value. No in-band sentinel is used, so no user value +can collide with the encoding. + +Known limitations of the pickle -> JSON switch (a documented breaking change): JSON has fewer types than pickle, +so values in `meta`/`cb_kwargs` are subject to JSON's coercions. A `tuple` round-trips as a `list` and non-string +`dict` keys round-trip as strings (e.g. `{1: 'a'}` becomes `{'1': 'a'}`). Values JSON cannot represent at all +(`datetime`, `set`, `Decimal`, arbitrary objects, ...) are not coerced silently: serialization raises and the request +is skipped loudly rather than stored in a corrupted form. +""" + +from __future__ import annotations + +import base64 +import json +from typing import Any + +from pydantic import BaseModel + +# Cap the offending value's repr in a serialization error message so a huge value cannot bloat the log. +_MAX_ERROR_VALUE_REPR_LEN = 200 + + +def encode_to_json(data: dict[str, Any]) -> str: + """Serialize a Scrapy request/response dict to a JSON string. + + The `body` and `headers` fields are base64-encoded in place (a `str` `body` via its UTF-8 bytes); pydantic + models are dumped to plain dicts. A `TypeError` is raised if any other value cannot be JSON-encoded. + + Args: + data: The dict to serialize, e.g. the output of `scrapy.Request.to_dict()`. + + Returns: + The JSON-encoded string. + """ + if not isinstance(data, dict): + raise TypeError(f'Expected a dict to serialize, got {type(data)}') + + safe = dict(data) + + # `body` is base64-encoded so binary payloads survive; a `str` body is taken as its UTF-8 bytes, which keeps + # encode/decode symmetric (decode always base64-decodes `body` back to `bytes`). + body = safe.get('body') + if isinstance(body, (bytes, str)): + raw_body = body.encode('utf-8') if isinstance(body, str) else body + safe['body'] = base64.b64encode(raw_body).decode('ascii') + + if isinstance(safe.get('headers'), dict): + safe['headers'] = _encode_headers(safe['headers']) + + try: + # `ensure_ascii=False` keeps non-ASCII URLs/meta as their UTF-8 form instead of `\uXXXX` escapes, which + # would otherwise roughly double the size of non-Latin text in storage. + return json.dumps(safe, default=_json_default, ensure_ascii=False) + except TypeError as exc: + raise TypeError( + 'Failed to JSON-serialize a Scrapy request/response for storage on the Apify platform. ' + 'All values in `meta` and `cb_kwargs` must be JSON-serializable (str, int, float, bool, None, ' + 'list, dict, or a pydantic model).' + ) from exc + + +def decode_from_json(text: str) -> Any: + """Reconstruct a Scrapy request/response dict from a string produced by `encode_to_json`. + + The base64-encoded `body` and `headers` fields are decoded back to their `bytes` representation. + + Args: + text: The JSON-encoded string. + + Returns: + The decoded object (a dict for valid request/response payloads). + """ + data = json.loads(text) + if not isinstance(data, dict): + return data + + # `validate=True` makes a non-base64 body raise loudly instead of silently decoding to garbage. + if isinstance(data.get('body'), str): + data['body'] = base64.b64decode(data['body'], validate=True) + + if isinstance(data.get('headers'), dict): + data['headers'] = _decode_headers(data['headers']) + + return data + + +def _json_default(obj: Any) -> Any: + """Fallback for values `json.dumps` cannot serialize: pydantic models are dumped, anything else raises. + + The error names the offending value (type and a truncated repr) so a failed serialization points straight + at the bad `meta`/`cb_kwargs` entry instead of just reporting that something failed. + """ + if isinstance(obj, BaseModel): + return obj.model_dump(by_alias=True) + value_repr = repr(obj) + if len(value_repr) > _MAX_ERROR_VALUE_REPR_LEN: + value_repr = value_repr[:_MAX_ERROR_VALUE_REPR_LEN] + '...' + raise TypeError(f'Object of type {type(obj).__name__} is not JSON-serializable: {value_repr}') + + +def _encode_headers(headers: dict[Any, Any]) -> dict[str, list[str]]: + """Encode a Scrapy `{bytes: [bytes]}` headers mapping to a JSON-safe `{str: [base64-str]}`.""" + encoded: dict[str, list[str]] = {} + for key, value in headers.items(): + str_key = key.decode('latin-1') if isinstance(key, bytes) else key + values = value if isinstance(value, (list, tuple)) else [value] + encoded[str_key] = [_b64encode_value(item) for item in values] + return encoded + + +def _decode_headers(headers: dict[str, Any]) -> dict[bytes, list[bytes]]: + """Reverse `_encode_headers`, restoring the `{bytes: [bytes]}` mapping Scrapy expects.""" + decoded: dict[bytes, list[bytes]] = {} + for key, value in headers.items(): + bytes_key = key.encode('latin-1') if isinstance(key, str) else key + values = value if isinstance(value, list) else [value] + decoded[bytes_key] = [base64.b64decode(item, validate=True) for item in values] + return decoded + + +def _b64encode_value(value: Any) -> str: + """Base64-encode a single header value. + + Scrapy stores header values as `bytes`; a `str` is encoded as its UTF-8 bytes. Any other type is coerced with + `str()` as a lenient last resort. That coercion is lossy (e.g. `5` becomes `b'5'`), but Scrapy does not produce + non-`bytes`/`str` header values, so it is not hit on the real path. + """ + raw = value if isinstance(value, bytes) else str(value).encode('utf-8') + return base64.b64encode(raw).decode('ascii') diff --git a/src/apify/scrapy/extensions/_httpcache.py b/src/apify/scrapy/extensions/_httpcache.py index 14d8753d3..0909c583b 100644 --- a/src/apify/scrapy/extensions/_httpcache.py +++ b/src/apify/scrapy/extensions/_httpcache.py @@ -2,7 +2,6 @@ import gzip import io -import pickle import re import struct from logging import getLogger @@ -14,6 +13,7 @@ from apify import Configuration from apify.scrapy._async_thread import AsyncThread +from apify.scrapy._serialization import decode_from_json, encode_to_json from apify.storage_clients import ApifyStorageClient from apify.storages import KeyValueStore @@ -29,14 +29,14 @@ class ApifyCacheStorage: """A Scrapy cache storage that uses the Apify `KeyValueStore` to store responses. - It can be set as a storage for Scrapy's built-in `HttpCacheMiddleware`, which caches - responses to requests. See HTTPCache middleware settings (prefixed with `HTTPCACHE_`) - in the Scrapy documentation for more information. Requires the asyncio Twisted reactor - to be installed. + It can be set as a storage for Scrapy's built-in `HttpCacheMiddleware`, which caches responses to requests. + See HTTPCache middleware settings (prefixed with `HTTPCACHE_`) in the Scrapy documentation for more information. + Requires the asyncio Twisted reactor to be installed. """ def __init__(self, settings: BaseSettings) -> None: - self._expiration_max_items = 100 + # Upper bound on how many keys the per-spider-close cleanup sweeps (best-effort; `close_spider`). + self._expiration_max_items: int = settings.getint('APIFY_HTTPCACHE_EXPIRATION_MAX_ITEMS', 100) self._expiration_secs: int = settings.getint('HTTPCACHE_EXPIRATION_SECS') self._spider: Spider | None = None self._kvs: KeyValueStore | None = None @@ -79,23 +79,26 @@ def close_spider(self, _: Spider, current_time: int | None = None) -> None: async def expire_kvs() -> None: if self._kvs is None: raise ValueError('Key value store not initialized') - i = 0 + # Best-effort cleanup: at most `_expiration_max_items` keys per close, in no guaranteed order, + # so stale entries may linger. This only reclaims storage; `retrieve_response` already treats + # an expired entry as a cache miss. + processed = 0 async for item in self._kvs.iterate_keys(): + if processed >= self._expiration_max_items: + break + processed += 1 value = await self._kvs.get_value(item.key) try: gzip_time = read_gzip_time(value) except Exception as e: logger.warning(f'Malformed cache item {item.key}: {e}') - await self._kvs.set_value(item.key, None) + await self._kvs.delete_value(item.key) else: if self._expiration_secs < current_time - gzip_time: logger.debug(f'Expired cache item {item.key}') - await self._kvs.set_value(item.key, None) + await self._kvs.delete_value(item.key) else: logger.debug(f'Valid cache item {item.key}') - if i == self._expiration_max_items: - break - i += 1 self._async_thread.run_coro(expire_kvs()) @@ -127,17 +130,25 @@ def retrieve_response(self, _: Spider, request: Request, current_time: int | Non if current_time is None: current_time = int(time()) - if 0 < self._expiration_secs < current_time - read_gzip_time(value): - logger.debug('Cache expired', extra={'request': request}) + + # A malformed or legacy cache entry must not crash retrieval; treat it as a cache miss so Scrapy re-fetches + # and re-stores it in the current format. The field reads stay inside the `try` as well: a value that decodes + # to a dict missing any expected key (a forward/older format, or a truncated-but-valid JSON payload) must + # also degrade to a miss rather than raising an uncaught `KeyError`. + try: + if 0 < self._expiration_secs < current_time - read_gzip_time(value): + logger.debug('Cache expired', extra={'request': request}) + return None + data = from_gzip(value) + url = data['url'] + status = data['status'] + headers = Headers(data['headers']) + body = data['body'] + except Exception as exc: + logger.warning(f'Ignoring malformed cache entry {key!r}: {exc}', extra={'request': request}) return None - data = from_gzip(value) - url = data['url'] - status = data['status'] - headers = Headers(data['headers']) - body = data['body'] respcls = responsetypes.from_args(headers=headers, url=url, body=body) - logger.debug('Cache hit', extra={'request': request}) return respcls(url=url, headers=headers, status=status, body=body) @@ -162,18 +173,25 @@ def store_response(self, _: Spider, request: Request, response: Response) -> Non def to_gzip(data: dict, mtime: int | None = None) -> bytes: - """Dump a dictionary to a gzip-compressed byte stream.""" + """Dump a dictionary to a gzip-compressed JSON byte stream. + + Cache entries live in the Apify key-value store, which holds JSON, so they are serialized as JSON rather + than pickled. See `apify.scrapy._serialization` for the encoding. + """ + payload = encode_to_json(data).encode('utf-8') with io.BytesIO() as byte_stream: with gzip.GzipFile(fileobj=byte_stream, mode='wb', mtime=mtime) as gzip_file: - pickle.dump(data, gzip_file, protocol=4) + gzip_file.write(payload) return byte_stream.getvalue() def from_gzip(gzip_bytes: bytes) -> dict: - """Load a dictionary from a gzip-compressed byte stream.""" + """Load a dictionary from a gzip-compressed JSON byte stream.""" with io.BytesIO(gzip_bytes) as byte_stream, gzip.GzipFile(fileobj=byte_stream, mode='rb') as gzip_file: - data: dict = pickle.load(gzip_file) - return data + data = decode_from_json(gzip_file.read().decode('utf-8')) + if not isinstance(data, dict): + raise TypeError(f'Expected a dict from the cached payload, got {type(data)}') + return data def read_gzip_time(gzip_bytes: bytes) -> int: @@ -187,17 +205,15 @@ def read_gzip_time(gzip_bytes: bytes) -> int: def get_kvs_name(spider_name: str, max_length: int = 60) -> str: """Get the key value store name for a spider. - The key value store name is derived from the spider name by replacing all special characters - with hyphens and trimming leading and trailing hyphens. The resulting name is prefixed with - 'httpcache-' and truncated to the maximum length. + The key value store name is derived from the spider name by replacing all special characters with hyphens + and trimming leading and trailing hyphens. The resulting name is prefixed with 'httpcache-' and truncated + to the maximum length. - The documentation - [about storages](https://docs.apify.com/platform/storage/usage#named-and-unnamed-storages) + The documentation [about storages](https://docs.apify.com/platform/storage/usage#named-and-unnamed-storages) mentions that names can be up to 63 characters long, so the default max length is set to 60. - Such naming isn't unique per spider, but should be sufficiently unique for most use cases. - The name of the key value store should indicate to which spider it belongs, e.g. in - the listing in the Apify's console. + Such naming isn't unique per spider, but should be sufficiently unique for most use cases. The name + of the key-value store should indicate to which spider it belongs, e.g. in the listing in the Apify's console. Args: spider_name: Value of the Spider instance's name attribute. diff --git a/src/apify/scrapy/requests.py b/src/apify/scrapy/requests.py index b5ab0d498..38d6648df 100644 --- a/src/apify/scrapy/requests.py +++ b/src/apify/scrapy/requests.py @@ -1,23 +1,51 @@ from __future__ import annotations -import codecs -import pickle from logging import getLogger from typing import Any, cast from scrapy import Request as ScrapyRequest from scrapy import Spider from scrapy.http.headers import Headers +from scrapy.utils.misc import load_object from scrapy.utils.request import request_from_dict from crawlee._request import UserData from crawlee._types import HttpHeaders +from ._serialization import decode_from_json, encode_to_json from apify import Request as ApifyRequest logger = getLogger(__name__) +def _ensure_known_request_class(request_dict: dict[str, Any]) -> None: + """Validate the optional `_class` entry before `request_from_dict` instantiates it. + + `request_from_dict` resolves `_class` with `load_object` and calls it with the request kwargs. The dotted path + is resolved here first and rejected unless it is a `scrapy.Request` subclass, so a payload can never coerce + reconstruction into instantiating an arbitrary callable. Resolving may import the class's module (the same + import `request_from_dict` would do, and far safer than the arbitrary code execution the previous pickle + format allowed). That import is what lets a custom `Request` subclass be rebuilt in a fresh process after + an Actor migration, before the spider has lazily imported it. + """ + class_path = request_dict.get('_class') + if class_path is None: + return + + if not isinstance(class_path, str): + raise TypeError(f'Invalid scrapy_request `_class`, expected a string, got {type(class_path)}') + + try: + request_cls = load_object(class_path) + except (ImportError, AttributeError, ValueError, NameError) as exc: + raise TypeError(f'Refusing to reconstruct a Scrapy request: cannot resolve `_class` {class_path!r}.') from exc + + if not (isinstance(request_cls, type) and issubclass(request_cls, ScrapyRequest)): + raise TypeError( + f'Refusing to reconstruct a Scrapy request of type {class_path!r}: it is not a scrapy.Request subclass.' + ) + + def to_apify_request(scrapy_request: ScrapyRequest, spider: Spider) -> ApifyRequest | None: """Convert a Scrapy request to an Apify request. @@ -35,6 +63,9 @@ def to_apify_request(scrapy_request: ScrapyRequest, spider: Spider) -> ApifyRequ logger.debug(f'to_apify_request was called (scrapy_request={scrapy_request})...') # Configuration to behave as similarly as possible to Scrapy's default RFPDupeFilter. + # + # The body is stored twice on purpose: as `payload` (used for the extended unique key) and inside the serialized + # Scrapy request below (used to reconstruct it). Both come from `scrapy_request.body`. request_kwargs: dict[str, Any] = { 'url': scrapy_request.url, 'method': scrapy_request.method, @@ -46,50 +77,69 @@ def to_apify_request(scrapy_request: ScrapyRequest, spider: Spider) -> ApifyRequ try: if scrapy_request.dont_filter: request_kwargs['always_enqueue'] = True - else: - if scrapy_request.meta.get('apify_request_unique_key'): - request_kwargs['unique_key'] = scrapy_request.meta['apify_request_unique_key'] - - if scrapy_request.meta.get('apify_request_id'): - request_kwargs['id'] = scrapy_request.meta['apify_request_id'] + elif scrapy_request.meta.get('apify_request_unique_key'): + request_kwargs['unique_key'] = scrapy_request.meta['apify_request_unique_key'] user_data = scrapy_request.meta.get('userData', {}) - # Convert UserData Pydantic model to a plain dict to prevent CrawleeRequestData objects - # from leaking into Request.from_url() during Scrapy-Apify roundtrips. + # Convert UserData Pydantic model to a plain dict to prevent CrawleeRequestData objects from leaking + # into Request.from_url() during Scrapy-Apify roundtrips. if isinstance(user_data, UserData): user_data = user_data.model_dump(by_alias=True) - # Remove internal Crawlee data since it's managed by Request.from_url() and values - # from previous roundtrips cause incorrect state. + # Remove internal Crawlee data since it's managed by Request.from_url() and values from previous roundtrips + # cause incorrect state. if isinstance(user_data, dict): user_data.pop('__crawlee', None) request_kwargs['user_data'] = user_data if isinstance(user_data, dict) else {} - # Convert Scrapy's headers to a HttpHeaders and store them in the apify_request + # Store an Apify-platform view of the headers. The authoritative copy with exact bytes travels in + # the serialized scrapy_request below, so non-UTF-8 headers (which make `to_unicode_dict()` raise) are + # tolerated rather than dropping the whole request. + # + # Trade-off: with `use_extended_unique_key=True` the unique key includes the headers, so when non-UTF-8 + # headers are omitted here two requests differing only in those headers share a unique key and one is + # deduplicated away. This is rare (header values are normally ASCII/UTF-8) and still strictly better than + # the old behavior, which dropped such requests entirely. if isinstance(scrapy_request.headers, Headers): - headers = cast('dict[str, str]', dict(scrapy_request.headers.to_unicode_dict())) - request_kwargs['headers'] = HttpHeaders(headers) + try: + headers = cast('dict[str, str]', dict(scrapy_request.headers.to_unicode_dict())) + request_kwargs['headers'] = HttpHeaders(headers) + except UnicodeDecodeError: + logger.warning( + 'Could not represent Scrapy request headers as Apify request headers (non-UTF-8 values); ' + 'they are preserved in the serialized request instead.' + ) else: logger.warning( f'Invalid scrapy_request.headers type, not scrapy.http.headers.Headers: {scrapy_request.headers}' ) apify_request = ApifyRequest.from_url(**request_kwargs) - - # Serialize the Scrapy ScrapyRequest and store it in the apify_request. - # - This process involves converting the Scrapy ScrapyRequest object into a dictionary, encoding it to base64, - # and storing it as 'scrapy_request' within the 'userData' dictionary of the apify_request. - # - The serialization process can be referenced at: https://stackoverflow.com/questions/30469575/. scrapy_request_dict = scrapy_request.to_dict(spider=spider) - scrapy_request_dict_encoded = codecs.encode(pickle.dumps(scrapy_request_dict), 'base64').decode() - apify_request.user_data['scrapy_request'] = scrapy_request_dict_encoded except Exception as exc: logger.warning(f'Conversion of Scrapy request {scrapy_request} to Apify request failed; {exc}') return None + # Serialize the Scrapy request as JSON under 'scrapy_request'. Kept outside the broad except above so + # a non-JSON-serializable `meta`/`cb_kwargs` is logged with a traceback and the request skipped (returning + # None per this function's contract), rather than crashing the crawl. + try: + scrapy_request_json = encode_to_json(scrapy_request_dict) + except TypeError: + logger.exception( + f'Failed to serialize Scrapy request {scrapy_request} for storage on the Apify platform; skipping it. ' + 'Ensure all values in `meta` and `cb_kwargs` are JSON-serializable.' + ) + return None + + # `scrapy_request_json` is already JSON-safe text (binary fields are base64-encoded inside it), so it is stored + # as-is. The request queue serializes `user_data` to JSON, which escapes the string correctly; wrapping it in + # a second base64 layer would only add ~33% overhead on the enqueue path. + apify_request.user_data['scrapy_request'] = scrapy_request_json + logger.debug(f'scrapy_request was converted to the apify_request={apify_request}') return apify_request @@ -102,32 +152,34 @@ def to_scrapy_request(apify_request: ApifyRequest, spider: Spider) -> ScrapyRequ spider: The Scrapy spider that the request is associated with. Raises: - TypeError: If the Apify request is not an instance of the `ApifyRequest` class. - ValueError: If the Apify request does not contain the required keys. + TypeError: If `apify_request` is not an `ApifyRequest`, if the stored Scrapy request payload is malformed, + or if its `_class` cannot be resolved to a `scrapy.Request` subclass. Returns: The converted Scrapy request. """ if not isinstance(cast('Any', apify_request), ApifyRequest): - raise TypeError('apify_request must be a crawlee.ScrapyRequest instance') + raise TypeError('apify_request must be an apify.Request instance') logger.debug(f'to_scrapy_request was called (apify_request={apify_request})...') # If the apify_request comes from the Scrapy if 'scrapy_request' in apify_request.user_data: - # Deserialize the Scrapy ScrapyRequest from the apify_request. - # - This process involves decoding the base64-encoded request data and reconstructing - # the Scrapy ScrapyRequest object from its dictionary representation. + # Deserialize the Scrapy ScrapyRequest from the apify_request by parsing the stored JSON and reconstructing + # the Scrapy ScrapyRequest object from its dictionary representation. logger.debug('Restoring the Scrapy ScrapyRequest from the apify_request...') - scrapy_request_dict_encoded = apify_request.user_data['scrapy_request'] - if not isinstance(scrapy_request_dict_encoded, str): - raise TypeError('scrapy_request_dict_encoded must be a string') + scrapy_request_json = apify_request.user_data['scrapy_request'] + if not isinstance(scrapy_request_json, str): + raise TypeError('the stored scrapy_request must be a string') - scrapy_request_dict = pickle.loads(codecs.decode(scrapy_request_dict_encoded.encode(), 'base64')) + scrapy_request_dict = decode_from_json(scrapy_request_json) if not isinstance(scrapy_request_dict, dict): raise TypeError('scrapy_request_dict must be a dictionary') + # Validate any `_class` entry before request_from_dict resolves and imports it. + _ensure_known_request_class(scrapy_request_dict) + scrapy_request = request_from_dict(scrapy_request_dict, spider=spider) if not isinstance(scrapy_request, ScrapyRequest): raise TypeError('scrapy_request must be an instance of the ScrapyRequest class') diff --git a/src/apify/scrapy/scheduler.py b/src/apify/scrapy/scheduler.py index 23db7b62b..2b95d30c8 100644 --- a/src/apify/scrapy/scheduler.py +++ b/src/apify/scrapy/scheduler.py @@ -162,20 +162,24 @@ def next_request(self) -> Request | None: if not isinstance(self.spider, Spider): raise TypeError('self.spider must be an instance of the Spider class') - # Let the request queue know that the request is being handled. Every request should - # be marked as handled, retrying is handled by the Scrapy's RetryMiddleware. + # Reconstruct the Scrapy request before consuming the queue entry. A malformed entry must not crash + # the whole run, so on failure it is logged and skipped (None) rather than propagating. + try: + scrapy_request = to_scrapy_request(apify_request, spider=self.spider) + except Exception: + logger.exception(f'Failed to convert Apify request {apify_request} to a Scrapy request; skipping it.') + scrapy_request = None + + # Mark the request as handled. This runs even when reconstruction failed above: an unrecoverable entry + # (a corrupt or legacy payload) must still be consumed, otherwise the queue would keep handing it back + # forever. Retrying genuine failures is the RetryMiddleware's job. try: self._async_thread.run_coro(self._rq.mark_request_as_handled(apify_request)) except Exception: traceback.print_exc() raise - # Reconstruct the Scrapy request. A malformed queue entry must not crash the whole run: it - # has already been marked handled above, so log it and skip it instead of propagating. - try: - scrapy_request = to_scrapy_request(apify_request, spider=self.spider) - except Exception: - logger.exception(f'Failed to convert Apify request {apify_request} to a Scrapy request; skipping it.') + if scrapy_request is None: return None logger.debug(f'Converted to scrapy_request: {scrapy_request}') diff --git a/tests/unit/scrapy/extensions/test_httpcache.py b/tests/unit/scrapy/extensions/test_httpcache.py index 8e250aa05..6f34853b5 100644 --- a/tests/unit/scrapy/extensions/test_httpcache.py +++ b/tests/unit/scrapy/extensions/test_httpcache.py @@ -1,44 +1,279 @@ +from __future__ import annotations + +import asyncio +import gzip +import io +import json +import pickle from time import time +from types import SimpleNamespace +from typing import TYPE_CHECKING, Any, cast import pytest +from scrapy import Request +from scrapy.http import HtmlResponse +from scrapy.settings import Settings + +from apify.scrapy.extensions._httpcache import ApifyCacheStorage, from_gzip, get_kvs_name, read_gzip_time, to_gzip -from apify.scrapy.extensions._httpcache import from_gzip, get_kvs_name, read_gzip_time, to_gzip +if TYPE_CHECKING: + from collections.abc import AsyncIterator FIXTURE_DICT = {'name': 'Alice'} +# Gzip-compressed JSON (the pickle-free format) of FIXTURE_DICT with mtime=0. FIXTURE_BYTES = ( - b'\x1f\x8b\x08\x00\x00\x00\x00\x00\x02\xffk`\x99*\xcc\x00\x01\xb5SzX\xf2\x12s' - b'S\xa7\xf4\xb0:\xe6d&\xa7N)\xd6\x03\x00\x1c\xe8U\x9c\x1e\x00\x00\x00' + b'\x1f\x8b\x08\x00\x00\x00\x00\x00\x02\xff\xabV\xcaK\xccMU\xb2RPr\xcc\xc9LNU' + b'\xaa\x05\x00\x03\x9a\x9d\xb0\x11\x00\x00\x00' ) def test_gzip() -> None: + """`to_gzip` then `from_gzip` round-trips a dict unchanged.""" assert from_gzip(to_gzip(FIXTURE_DICT)) == FIXTURE_DICT def test_to_gzip() -> None: + """`to_gzip` produces the expected gzip-compressed JSON bytes for a fixed mtime.""" data_bytes = to_gzip(FIXTURE_DICT, mtime=0) assert data_bytes == FIXTURE_BYTES def test_from_gzip() -> None: + """`from_gzip` decodes gzip-compressed JSON bytes back into the original dict.""" data_dict = from_gzip(FIXTURE_BYTES) assert data_dict == FIXTURE_DICT def test_read_gzip_time() -> None: + """`read_gzip_time` reads the mtime stored in the gzip header (here 0).""" assert read_gzip_time(FIXTURE_BYTES) == 0 def test_read_gzip_time_non_zero() -> None: + """`read_gzip_time` reads back a non-zero mtime written into the gzip header.""" current_time = int(time()) data_bytes = to_gzip(FIXTURE_DICT, mtime=current_time) assert read_gzip_time(data_bytes) == current_time +def test_gzip_round_trips_binary_response() -> None: + """A cached response with a binary body and Scrapy-style bytes headers round-trips.""" + data = { + 'status': 200, + 'url': 'https://example.com', + 'headers': {b'Content-Type': [b'text/html'], b'X-Bin': [b'\x00\xff']}, + 'body': b'\xff\xfe', + } + + assert from_gzip(to_gzip(data)) == data + + +def test_from_gzip_rejects_pickle_payload() -> None: + """Cache entries are gzip-compressed JSON, so a pickle payload is rejected rather than loaded.""" + with io.BytesIO() as byte_stream: + with gzip.GzipFile(fileobj=byte_stream, mode='wb') as gzip_file: + pickle.dump({'status': 200, 'body': b'x'}, gzip_file, protocol=4) + pickle_payload = byte_stream.getvalue() + + with pytest.raises((UnicodeDecodeError, json.JSONDecodeError, ValueError)): + from_gzip(pickle_payload) + + +class _FakeAsyncThread: + def run_coro(self, coro: Any, *_: Any, **__: Any) -> Any: + loop = asyncio.new_event_loop() + try: + return loop.run_until_complete(coro) + finally: + loop.close() + + def close(self) -> None: + """No-op; `close_spider` calls this when shutting the cache storage down.""" + + +class _FakeKvs: + def __init__(self, value: bytes | None) -> None: + self._value = value + + async def get_value(self, _: str) -> bytes | None: + return self._value + + async def set_value(self, _: str, value: bytes) -> None: + self._value = value + + +class _FakeFingerprinter: + def fingerprint(self, _: Request) -> bytes: + return b'\xab\xcd' + + +def _make_storage(value: bytes | None, *, expiration_secs: int = 0) -> ApifyCacheStorage: + storage = ApifyCacheStorage(Settings({'HTTPCACHE_EXPIRATION_SECS': expiration_secs})) + storage._async_thread = _FakeAsyncThread() # ty: ignore[invalid-assignment] + storage._kvs = _FakeKvs(value) # ty: ignore[invalid-assignment] + storage._fingerprinter = _FakeFingerprinter() # ty: ignore[invalid-assignment] + return storage + + +def test_retrieve_response_returns_cached_response() -> None: + """A stored gzip-JSON entry is returned as a reconstructed Scrapy response.""" + data = {'status': 200, 'url': 'https://example.com', 'headers': {}, 'body': b'hello'} + storage = _make_storage(to_gzip(data)) + response = storage.retrieve_response(None, Request('https://example.com')) # ty: ignore[invalid-argument-type] + assert response is not None + assert response.status == 200 + assert response.body == b'hello' + + +def test_retrieve_response_ignores_legacy_pickle_item() -> None: + """A legacy gzip-pickle entry degrades to a cache miss instead of raising.""" + # A gzip-wrapped pickle payload is the legacy (pre-JSON) cache format that the JSON reader cannot + # load. After the upgrade, such an item must degrade to a cache miss instead of raising and breaking + # the download, so the cache self-heals (re-fetch and re-store as JSON) rather than crashing. + with io.BytesIO() as byte_stream: + with gzip.GzipFile(fileobj=byte_stream, mode='wb') as gzip_file: + pickle.dump({'status': 200, 'body': b'x'}, gzip_file, protocol=4) + legacy_pickle = byte_stream.getvalue() + storage = _make_storage(legacy_pickle) + assert storage.retrieve_response(None, Request('https://example.com')) is None # ty: ignore[invalid-argument-type] + + +def test_retrieve_response_missing_key_is_cache_miss() -> None: + """A valid payload missing an expected field (here `url`) degrades to a cache miss, not a `KeyError`.""" + value = to_gzip({'status': 200, 'headers': {}, 'body': b'x'}) # no 'url' + storage = _make_storage(value) + assert storage.retrieve_response(None, Request('https://example.com')) is None # ty: ignore[invalid-argument-type] + + +def test_store_then_retrieve_round_trips_response() -> None: + """A stored response round-trips through the cache and is persisted as gzip-JSON, never pickle.""" + storage = _make_storage(None) + request = Request('https://example.com') + response = HtmlResponse( + url='https://example.com', + status=200, + headers={'Content-Type': 'text/html'}, + body=b'cached', + ) + + storage.store_response(None, request, response) # ty: ignore[invalid-argument-type] + + # The persisted bytes are gzip-compressed JSON (decodable by `from_gzip`), never a pickle payload. + stored = cast('_FakeKvs', storage._kvs)._value + assert isinstance(stored, bytes) + assert from_gzip(stored)['body'] == b'cached' + + retrieved = storage.retrieve_response(None, request) # ty: ignore[invalid-argument-type] + assert retrieved is not None + assert retrieved.status == 200 + assert retrieved.url == 'https://example.com' + assert retrieved.body == b'cached' + assert retrieved.headers.get('Content-Type') == b'text/html' + + +def test_retrieve_response_treats_expired_entry_as_miss() -> None: + """A cached entry older than `HTTPCACHE_EXPIRATION_SECS` is a cache miss at retrieval time.""" + data = {'status': 200, 'url': 'https://example.com', 'headers': {}, 'body': b'x'} + storage = _make_storage(to_gzip(data, mtime=0), expiration_secs=100) + request = Request('https://example.com') + # Retrieved 1000s after the entry's mtime (0), well past the 100s expiration window. + assert storage.retrieve_response(None, request, current_time=1000) is None # ty: ignore[invalid-argument-type] + + +def test_retrieve_response_returns_fresh_entry_within_expiration() -> None: + """An entry newer than `HTTPCACHE_EXPIRATION_SECS` is still a cache hit at retrieval time.""" + data = {'status': 200, 'url': 'https://example.com', 'headers': {}, 'body': b'hello'} + storage = _make_storage(to_gzip(data, mtime=950), expiration_secs=100) + request = Request('https://example.com') + response = storage.retrieve_response(None, request, current_time=1000) # ty: ignore[invalid-argument-type] + assert response is not None + assert response.body == b'hello' + + +class _KeyIterator: + """A plain async iterator over a snapshot of keys (not a generator, so an early `break` needs no close).""" + + def __init__(self, keys: list[str]) -> None: + self._keys = keys + self._index = 0 + + def __aiter__(self) -> _KeyIterator: + return self + + async def __anext__(self) -> Any: + if self._index >= len(self._keys): + raise StopAsyncIteration + key = self._keys[self._index] + self._index += 1 + return SimpleNamespace(key=key) + + +class _RecordingKvs: + """An in-memory key-value store double for the cleanup sweep that records deletions.""" + + def __init__(self, items: dict[str, bytes | None]) -> None: + self._items = dict(items) + self.deleted: list[str] = [] + + def iterate_keys(self) -> AsyncIterator[Any]: + # Snapshot the keys, mirroring the real clients (so deleting while iterating is safe). + return _KeyIterator(list(self._items)) + + async def get_value(self, key: str) -> bytes | None: + return self._items.get(key) + + async def delete_value(self, key: str) -> None: + self.deleted.append(key) + self._items.pop(key, None) + + +def _make_cleanup_storage( + items: dict[str, bytes | None], + *, + expiration_secs: int, + max_items: int = 100, +) -> tuple[ApifyCacheStorage, _RecordingKvs]: + storage = ApifyCacheStorage( + Settings({'HTTPCACHE_EXPIRATION_SECS': expiration_secs, 'APIFY_HTTPCACHE_EXPIRATION_MAX_ITEMS': max_items}) + ) + storage._async_thread = _FakeAsyncThread() # ty: ignore[invalid-assignment] + kvs = _RecordingKvs(items) + storage._kvs = kvs # ty: ignore[invalid-assignment] + return storage, kvs + + +def test_close_spider_deletes_expired_and_malformed_but_keeps_valid() -> None: + """The close-spider cleanup deletes expired and unreadable entries and leaves fresh ones in place.""" + current_time = 1000 + fresh = {'status': 200, 'url': 'https://example.com', 'headers': {}, 'body': b''} + items: dict[str, bytes | None] = { + 'expired': to_gzip(fresh, mtime=0), # age 1000s > expiration 100s + 'valid': to_gzip(fresh, mtime=current_time), # age 0s + 'malformed': b'bad', # too short for read_gzip_time to parse the mtime header + } + storage, kvs = _make_cleanup_storage(items, expiration_secs=100) + + storage.close_spider(None, current_time=current_time) # ty: ignore[invalid-argument-type] + + assert set(kvs.deleted) == {'expired', 'malformed'} + + +def test_close_spider_respects_max_items() -> None: + """At most `APIFY_HTTPCACHE_EXPIRATION_MAX_ITEMS` entries are swept per close (exactly max, not max+1).""" + current_time = 1000 + expired = to_gzip({'status': 200, 'url': 'https://example.com', 'headers': {}, 'body': b''}, mtime=0) + items: dict[str, bytes | None] = {f'k{i}': expired for i in range(5)} + storage, kvs = _make_cleanup_storage(items, expiration_secs=100, max_items=2) + + storage.close_spider(None, current_time=current_time) # ty: ignore[invalid-argument-type] + + assert len(kvs.deleted) == 2 + + @pytest.mark.parametrize( ('spider_name', 'expected'), [ @@ -55,6 +290,7 @@ def test_read_gzip_time_non_zero() -> None: ], ) def test_get_kvs_name(spider_name: str, expected: str) -> None: + """A spider name is normalized into a valid `httpcache-` key-value store name.""" assert get_kvs_name(spider_name) == expected @@ -67,5 +303,6 @@ def test_get_kvs_name(spider_name: str, expected: str) -> None: ], ) def test_get_kvs_name_raises(spider_name: str) -> None: + """A spider name that normalizes to empty raises `ValueError`.""" with pytest.raises(ValueError, match=r'Unsupported spider name'): assert get_kvs_name(spider_name) diff --git a/tests/unit/scrapy/requests/_custom_request_module.py b/tests/unit/scrapy/requests/_custom_request_module.py new file mode 100644 index 000000000..26f760db3 --- /dev/null +++ b/tests/unit/scrapy/requests/_custom_request_module.py @@ -0,0 +1,9 @@ +from __future__ import annotations + +from scrapy import Request + + +class CustomRequest(Request): + """A custom Scrapy `Request` subclass living in its own module, so a test can drop it from `sys.modules` + and verify reconstruction imports it on demand (mimicking an Actor migrating to a fresh process). + """ diff --git a/tests/unit/scrapy/requests/test_to_apify_request.py b/tests/unit/scrapy/requests/test_to_apify_request.py index 4616af646..482d5c5da 100644 --- a/tests/unit/scrapy/requests/test_to_apify_request.py +++ b/tests/unit/scrapy/requests/test_to_apify_request.py @@ -1,5 +1,6 @@ from __future__ import annotations +import logging from typing import cast import pytest @@ -22,6 +23,7 @@ def spider() -> DummySpider: def test_creates_simple_request(spider: Spider) -> None: + """A simple Scrapy request converts to an Apify request carrying the serialized `scrapy_request`.""" scrapy_request = Request(url='https://example.com') apify_request = to_apify_request(scrapy_request, spider) @@ -34,6 +36,7 @@ def test_creates_simple_request(spider: Spider) -> None: def test_handles_headers(spider: Spider) -> None: + """Scrapy request headers are carried onto the Apify request as `HttpHeaders`.""" scrapy_request_headers = Headers({'Authorization': 'Bearer access_token'}) scrapy_request = Request(url='https://example.com', headers=scrapy_request_headers) @@ -45,6 +48,7 @@ def test_handles_headers(spider: Spider) -> None: def test_without_id_and_unique_key(spider: Spider) -> None: + """A request without an id or unique key converts, preserving its user data.""" scrapy_request = Request( url='https://example.com', method='GET', @@ -65,6 +69,7 @@ def test_without_id_and_unique_key(spider: Spider) -> None: def test_with_id_and_unique_key(spider: Spider) -> None: + """An explicit `apify_request_unique_key` in `meta` becomes the Apify request's unique key.""" scrapy_request = Request( url='https://example.com', method='GET', @@ -89,20 +94,26 @@ def test_with_id_and_unique_key(spider: Spider) -> None: def test_invalid_scrapy_request_returns_none(spider: Spider) -> None: + """A non-Scrapy-request input returns None instead of raising.""" scrapy_request = 'invalid_request' apify_request = to_apify_request(scrapy_request, spider) # ty: ignore[invalid-argument-type] assert apify_request is None -def test_roundtrip_follow_up_request_with_propagated_userdata(spider: Spider) -> None: - """Reproduce: CrawleeRequestData() argument after ** must be a mapping, not CrawleeRequestData. +def test_non_json_serializable_meta_is_skipped(spider: Spider, caplog: pytest.LogCaptureFixture) -> None: + """A non-JSON-serializable value in `meta` is skipped (returns None) and logged, not crashing the crawl.""" + scrapy_request = Request(url='https://example.com', meta={'tags': {'a', 'b'}}) + + with caplog.at_level(logging.ERROR, logger='apify.scrapy.requests'): + apify_request = to_apify_request(scrapy_request, spider) + + assert apify_request is None + assert any('JSON-serializable' in record.getMessage() for record in caplog.records) + - After two roundtrips through to_apify_request/to_scrapy_request with userData propagation, - Request.from_url() writes a CrawleeRequestData object into UserData.__pydantic_extra__['__crawlee']. - On the next roundtrip, this CrawleeRequestData object is found by user_data_dict.get('__crawlee') - and passed to CrawleeRequestData(**obj), which fails because CrawleeRequestData is not a mapping. - """ +def test_roundtrip_follow_up_request_with_propagated_userdata(spider: Spider) -> None: + """Regression: propagating userData across repeated roundtrips must not fail on `__crawlee` data.""" # Step 1: Initial request -> first roundtrip initial_scrapy_request = Request(url='https://example.com/page') apify_request_1 = to_apify_request(initial_scrapy_request, spider) @@ -127,3 +138,29 @@ def test_roundtrip_follow_up_request_with_propagated_userdata(spider: Spider) -> follow_up_apify_request = to_apify_request(follow_up_2, spider) assert follow_up_apify_request is not None assert follow_up_apify_request.url == 'https://example.com/image.png' + + +def test_dont_filter_request_is_always_enqueued(spider: Spider) -> None: + """A `dont_filter=True` request is always enqueued: each conversion gets a fresh unique key, bypassing dedup.""" + first = to_apify_request(Request(url='https://example.com', dont_filter=True), spider) + second = to_apify_request(Request(url='https://example.com', dont_filter=True), spider) + + assert first is not None + assert second is not None + # `always_enqueue` prefixes the unique key with a random token (`|`), so two otherwise-identical + # requests get distinct unique keys and neither is deduplicated against the other. + assert '|' in first.unique_key + assert first.unique_key != second.unique_key + + +def test_apify_request_id_in_meta_is_ignored(spider: Spider) -> None: + """An `apify_request_id` in `meta` is ignored and does not break conversion; the unique key still applies.""" + scrapy_request = Request( + url='https://example.com', + meta={'apify_request_id': 'myCustomId12345', 'apify_request_unique_key': 'https://example.com'}, + ) + + apify_request = to_apify_request(scrapy_request, spider) + + assert apify_request is not None + assert apify_request.unique_key == 'https://example.com' diff --git a/tests/unit/scrapy/requests/test_to_scrapy_request.py b/tests/unit/scrapy/requests/test_to_scrapy_request.py index fadb25249..898312f28 100644 --- a/tests/unit/scrapy/requests/test_to_scrapy_request.py +++ b/tests/unit/scrapy/requests/test_to_scrapy_request.py @@ -1,14 +1,18 @@ from __future__ import annotations -import binascii +import codecs +import json +import pickle +import sys import pytest -from scrapy import Request, Spider +from scrapy import FormRequest, Request, Spider from crawlee._types import HttpHeaders from apify import Request as ApifyRequest -from apify.scrapy.requests import to_scrapy_request +from apify.scrapy._serialization import encode_to_json +from apify.scrapy.requests import to_apify_request, to_scrapy_request class DummySpider(Spider): @@ -21,8 +25,34 @@ def spider() -> DummySpider: return DummySpider() +# A JSON-encoded (pickle-free) Scrapy request for a GET request to https://apify.com, encoded exactly +# the way `to_apify_request` stores it under `user_data['scrapy_request']` (JSON, no outer base64). +_SCRAPY_REQUEST_JSON_ENCODED = encode_to_json( + { + 'url': 'https://apify.com', + 'callback': None, + 'errback': None, + 'headers': { + b'Accept': [b'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8'], + b'Accept-Language': [b'en'], + b'User-Agent': [b'Scrapy/2.11.0 (+https://scrapy.org)'], + b'Accept-Encoding': [b'gzip, deflate'], + }, + 'body': b'', + 'cookies': {}, + 'meta': {'apify_request_id': 'fvwscO2UJLdr10B', 'apify_request_unique_key': 'https://apify.com'}, + 'encoding': 'utf-8', + 'flags': [], + 'cb_kwargs': {}, + 'dont_filter': False, + 'method': 'GET', + 'priority': 0, + } +) + + def test_without_reconstruction(spider: Spider) -> None: - # Without reconstruction of encoded Scrapy request + """An Apify request without a stored `scrapy_request` becomes a fresh Scrapy request (no reconstruction).""" apify_request = ApifyRequest( url='https://example.com', method='GET', @@ -39,7 +69,7 @@ def test_without_reconstruction(spider: Spider) -> None: def test_without_reconstruction_with_optional_fields(spider: Spider) -> None: - # Without reconstruction of encoded Scrapy request + """The without-reconstruction path also carries optional headers and user data to the Scrapy request.""" apify_request = ApifyRequest( url='https://crawlee.dev', method='GET', @@ -62,13 +92,13 @@ def test_without_reconstruction_with_optional_fields(spider: Spider) -> None: def test_with_reconstruction(spider: Spider) -> None: - # With reconstruction of encoded Scrapy request + """An Apify request with a stored `scrapy_request` is reconstructed from its JSON payload.""" apify_request = ApifyRequest( url='https://apify.com', method='GET', unique_key='https://apify.com', user_data={ - 'scrapy_request': 'gASVJgIAAAAAAAB9lCiMA3VybJSMEWh0dHBzOi8vYXBpZnkuY29tlIwIY2FsbGJhY2uUTowHZXJy\nYmFja5ROjAdoZWFkZXJzlH2UKEMGQWNjZXB0lF2UQz90ZXh0L2h0bWwsYXBwbGljYXRpb24veGh0\nbWwreG1sLGFwcGxpY2F0aW9uL3htbDtxPTAuOSwqLyo7cT0wLjiUYUMPQWNjZXB0LUxhbmd1YWdl\nlF2UQwJlbpRhQwpVc2VyLUFnZW50lF2UQyNTY3JhcHkvMi4xMS4wICgraHR0cHM6Ly9zY3JhcHku\nb3JnKZRhQw9BY2NlcHQtRW5jb2RpbmeUXZRDDWd6aXAsIGRlZmxhdGWUYXWMBm1ldGhvZJSMA0dF\nVJSMBGJvZHmUQwCUjAdjb29raWVzlH2UjARtZXRhlH2UKIwQYXBpZnlfcmVxdWVzdF9pZJSMD2Z2\nd3NjTzJVSkxkcjEwQpSMGGFwaWZ5X3JlcXVlc3RfdW5pcXVlX2tleZSMEWh0dHBzOi8vYXBpZnku\nY29tlIwQZG93bmxvYWRfdGltZW91dJRHQGaAAAAAAACMDWRvd25sb2FkX3Nsb3SUjAlhcGlmeS5j\nb22UjBBkb3dubG9hZF9sYXRlbmN5lEc/tYIIAAAAAHWMCGVuY29kaW5nlIwFdXRmLTiUjAhwcmlv\ncml0eZRLAIwLZG9udF9maWx0ZXKUiYwFZmxhZ3OUXZSMCWNiX2t3YXJnc5R9lHUu\n', # noqa: E501 + 'scrapy_request': _SCRAPY_REQUEST_JSON_ENCODED, }, ) @@ -82,7 +112,7 @@ def test_with_reconstruction(spider: Spider) -> None: def test_with_reconstruction_with_optional_fields(spider: Spider) -> None: - # With reconstruction of encoded Scrapy request + """Reconstruction from the stored JSON payload also restores optional headers and user data.""" apify_request = ApifyRequest( url='https://apify.com', method='GET', @@ -90,7 +120,7 @@ def test_with_reconstruction_with_optional_fields(spider: Spider) -> None: headers=HttpHeaders({'Authorization': 'Bearer access_token'}), user_data={ 'some_user_data': 'hello', - 'scrapy_request': 'gASVJgIAAAAAAAB9lCiMA3VybJSMEWh0dHBzOi8vYXBpZnkuY29tlIwIY2FsbGJhY2uUTowHZXJy\nYmFja5ROjAdoZWFkZXJzlH2UKEMGQWNjZXB0lF2UQz90ZXh0L2h0bWwsYXBwbGljYXRpb24veGh0\nbWwreG1sLGFwcGxpY2F0aW9uL3htbDtxPTAuOSwqLyo7cT0wLjiUYUMPQWNjZXB0LUxhbmd1YWdl\nlF2UQwJlbpRhQwpVc2VyLUFnZW50lF2UQyNTY3JhcHkvMi4xMS4wICgraHR0cHM6Ly9zY3JhcHku\nb3JnKZRhQw9BY2NlcHQtRW5jb2RpbmeUXZRDDWd6aXAsIGRlZmxhdGWUYXWMBm1ldGhvZJSMA0dF\nVJSMBGJvZHmUQwCUjAdjb29raWVzlH2UjARtZXRhlH2UKIwQYXBpZnlfcmVxdWVzdF9pZJSMD2Z2\nd3NjTzJVSkxkcjEwQpSMGGFwaWZ5X3JlcXVlc3RfdW5pcXVlX2tleZSMEWh0dHBzOi8vYXBpZnku\nY29tlIwQZG93bmxvYWRfdGltZW91dJRHQGaAAAAAAACMDWRvd25sb2FkX3Nsb3SUjAlhcGlmeS5j\nb22UjBBkb3dubG9hZF9sYXRlbmN5lEc/tYIIAAAAAHWMCGVuY29kaW5nlIwFdXRmLTiUjAhwcmlv\ncml0eZRLAIwLZG9udF9maWx0ZXKUiYwFZmxhZ3OUXZSMCWNiX2t3YXJnc5R9lHUu\n', # noqa: E501 + 'scrapy_request': _SCRAPY_REQUEST_JSON_ENCODED, }, ) @@ -108,6 +138,7 @@ def test_with_reconstruction_with_optional_fields(spider: Spider) -> None: def test_invalid_request_for_reconstruction(spider: Spider) -> None: + """A stored `scrapy_request` that is not valid JSON fails to decode during reconstruction.""" apify_request = ApifyRequest( url='https://example.com', method='GET', @@ -117,5 +148,295 @@ def test_invalid_request_for_reconstruction(spider: Spider) -> None: }, ) - with pytest.raises(binascii.Error): + # The stored value is parsed as JSON directly (no outer base64), so non-JSON text fails to decode. + with pytest.raises(json.JSONDecodeError): + to_scrapy_request(apify_request, spider) + + +def test_pickle_payload_rejected(spider: Spider) -> None: + """Data under 'scrapy_request' is JSON, so a pickle payload is rejected rather than deserialized.""" + # Build a pickle payload like the old code produced. + scrapy_request_dict = { + 'url': 'https://example.com', + 'callback': None, + 'errback': None, + 'headers': {}, + 'method': 'GET', + 'body': b'', + 'cookies': {}, + 'meta': {}, + 'encoding': 'utf-8', + 'priority': 0, + 'dont_filter': False, + 'flags': [], + 'cb_kwargs': {}, + } + pickle_encoded = codecs.encode(pickle.dumps(scrapy_request_dict), 'base64').decode() + + apify_request = ApifyRequest( + url='https://example.com', + method='GET', + unique_key='https://example.com', + user_data={'scrapy_request': pickle_encoded}, + ) + + # The JSON-based reconstruction must reject the pickle payload. + with pytest.raises((json.JSONDecodeError, UnicodeDecodeError, ValueError)): + to_scrapy_request(apify_request, spider) + + +def test_roundtrip_serialization(spider: Spider) -> None: + """A `to_apify_request` -> `to_scrapy_request` round-trip preserves the request under JSON encoding.""" + original_request = Request( + url='https://example.com/test', + method='POST', + body=b'test body content', + headers={'Content-Type': 'application/json', 'X-Custom': 'value'}, + meta={'userData': {'custom_key': 'custom_value'}}, + ) + + apify_request = to_apify_request(original_request, spider) + assert apify_request is not None + + # Verify the stored data is JSON (not pickle) and not wrapped in an outer base64 layer. + encoded = apify_request.user_data['scrapy_request'] + assert isinstance(encoded, str) + decoded_json = json.loads(encoded) + assert isinstance(decoded_json, dict) + assert decoded_json['url'] == 'https://example.com/test' + + # Reconstruct the Scrapy request + restored = to_scrapy_request(apify_request, spider) + assert isinstance(restored, Request) + assert restored.url == original_request.url + assert restored.method == original_request.method + assert restored.body == original_request.body + + +def test_no_pickle_in_serialized_output(spider: Spider) -> None: + """`to_apify_request` produces JSON output, never a pickle payload.""" + scrapy_request = Request(url='https://example.com') + apify_request = to_apify_request(scrapy_request, spider) + assert apify_request is not None + + encoded = apify_request.user_data['scrapy_request'] + assert isinstance(encoded, str) + + # Pickle output starts with the b'\x80' opcode byte; JSON is text starting with '{'. + assert encoded.lstrip().startswith('{'), 'Output must be JSON, not pickle' + json.loads(encoded) # must parse as JSON + + +def _encode_request_dict(request_dict: dict) -> str: + """Encode a raw request dict the same way `to_apify_request` does (JSON, no outer base64).""" + return encode_to_json(request_dict) + + +def test_binary_body_round_trips(spider: Spider) -> None: + """Non-UTF-8 bytes in the request body survive the JSON roundtrip (base64-encoded).""" + original = Request( + url='https://example.com', + method='POST', + body=b'\x00\x01\x02\xff\xfe binary', + headers={'Content-Type': 'application/octet-stream'}, + ) + + apify_request = to_apify_request(original, spider) + assert apify_request is not None + + restored = to_scrapy_request(apify_request, spider) + assert restored.body == b'\x00\x01\x02\xff\xfe binary' + assert restored.headers.get('Content-Type') == b'application/octet-stream' + + +def test_binary_headers_round_trip_and_request_not_dropped(spider: Spider) -> None: + """A request with non-UTF-8 header values is not dropped; the exact header bytes survive the roundtrip.""" + original = Request( + url='https://example.com', + headers={b'Accept': b'text/html', b'X-Bin': b'\xff\xfe\x00'}, + ) + + apify_request = to_apify_request(original, spider) + assert apify_request is not None # must not be dropped + + restored = to_scrapy_request(apify_request, spider) + assert restored.headers.get(b'X-Bin') == b'\xff\xfe\x00' + assert restored.headers.get(b'Accept') == b'text/html' + + +def test_userdata_with_b64_sentinel_key_round_trips(spider: Spider) -> None: + """No in-band sentinel is used, so a user dict like `{'__b64__': ...}` in `meta` round-trips unchanged.""" + original = Request( + url='https://example.com', + meta={'userData': {}, 'looks_like_sentinel': {'__b64__': 'not really base64 !!!'}}, + ) + + apify_request = to_apify_request(original, spider) + assert apify_request is not None + + restored = to_scrapy_request(apify_request, spider) + assert restored.meta['looks_like_sentinel'] == {'__b64__': 'not really base64 !!!'} + + +def test_already_imported_request_subclass_round_trips(spider: Spider) -> None: + """A `_class` referring to an already-imported `scrapy.Request` subclass is reconstructed.""" + original = FormRequest(url='https://example.com', formdata={'key': 'value'}) + + apify_request = to_apify_request(original, spider) + assert apify_request is not None + + restored = to_scrapy_request(apify_request, spider) + assert isinstance(restored, FormRequest) + assert restored.method == 'POST' + + +def test_non_request_class_is_rejected(spider: Spider) -> None: + """A `_class` resolving to a non-`scrapy.Request` type (here a plain `dict`) is rejected before use.""" + request_dict = { + 'url': 'https://example.com', + 'callback': None, + 'errback': None, + 'headers': {}, + 'body': '', + 'cookies': {}, + 'meta': {}, + 'encoding': 'utf-8', + 'priority': 0, + 'dont_filter': False, + 'flags': [], + 'cb_kwargs': {}, + 'method': 'GET', + '_class': 'builtins.dict', + } + apify_request = ApifyRequest( + url='https://example.com', + method='GET', + unique_key='https://example.com', + user_data={'scrapy_request': _encode_request_dict(request_dict)}, + ) + + with pytest.raises(TypeError, match=r'not a scrapy\.Request subclass'): + to_scrapy_request(apify_request, spider) + + +def test_unresolvable_class_is_rejected(spider: Spider) -> None: + """A `_class` whose dotted path cannot be resolved (no such module) is rejected with a clear error.""" + request_dict = { + 'url': 'https://example.com', + 'headers': {}, + 'body': '', + 'cookies': {}, + 'meta': {}, + 'encoding': 'utf-8', + 'priority': 0, + 'dont_filter': False, + 'flags': [], + 'cb_kwargs': {}, + 'method': 'GET', + '_class': 'definitely_not_imported_pkg.some_module.SomeRequest', + } + apify_request = ApifyRequest( + url='https://example.com', + method='GET', + unique_key='https://example.com', + user_data={'scrapy_request': _encode_request_dict(request_dict)}, + ) + + with pytest.raises(TypeError, match='cannot resolve'): + to_scrapy_request(apify_request, spider) + + +def test_custom_request_subclass_reconstructed_after_migration(spider: Spider) -> None: + """Actor-migration case: a not-yet-imported custom `Request` subclass is reconstructed by importing it.""" + module_name = 'tests.unit.scrapy.requests._custom_request_module' + request_dict = { + 'url': 'https://example.com', + 'callback': None, + 'errback': None, + 'headers': {}, + 'body': b'', + 'cookies': {}, + 'meta': {}, + 'encoding': 'utf-8', + 'priority': 0, + 'dont_filter': False, + 'flags': [], + 'cb_kwargs': {}, + 'method': 'GET', + '_class': f'{module_name}.CustomRequest', + } + apify_request = ApifyRequest( + url='https://example.com', + method='GET', + unique_key='https://example.com', + user_data={'scrapy_request': _encode_request_dict(request_dict)}, + ) + + # Simulate a fresh process where the subclass's module has not been imported yet. + sys.modules.pop(module_name, None) + + restored = to_scrapy_request(apify_request, spider) + + from tests.unit.scrapy.requests._custom_request_module import CustomRequest + + assert isinstance(restored, CustomRequest) + assert module_name in sys.modules # it was imported on demand during reconstruction + + +def test_non_apify_request_input_is_rejected(spider: Spider) -> None: + """`to_scrapy_request` raises on non-Apify-request input (unlike `to_apify_request`, which returns None).""" + with pytest.raises(TypeError, match='must be an apify'): + to_scrapy_request('not an apify request', spider) # ty: ignore[invalid-argument-type] + + +def test_non_string_stored_scrapy_request_is_rejected(spider: Spider) -> None: + """A stored `scrapy_request` that is not a string is rejected before any JSON decoding.""" + apify_request = ApifyRequest( + url='https://example.com', + method='GET', + unique_key='https://example.com', + user_data={'scrapy_request': {'unexpected': 'dict'}}, + ) + + with pytest.raises(TypeError, match='must be a string'): + to_scrapy_request(apify_request, spider) + + +def test_stored_scrapy_request_decoding_to_non_dict_is_rejected(spider: Spider) -> None: + """A stored `scrapy_request` that is valid JSON but not a dict (here a JSON array) is rejected.""" + apify_request = ApifyRequest( + url='https://example.com', + method='GET', + unique_key='https://example.com', + user_data={'scrapy_request': '[1, 2, 3]'}, + ) + + with pytest.raises(TypeError, match='must be a dictionary'): + to_scrapy_request(apify_request, spider) + + +def test_non_string_class_is_rejected(spider: Spider) -> None: + """A `_class` entry that is not a string is rejected before any class resolution.""" + request_dict = { + 'url': 'https://example.com', + 'headers': {}, + 'body': '', + 'cookies': {}, + 'meta': {}, + 'encoding': 'utf-8', + 'priority': 0, + 'dont_filter': False, + 'flags': [], + 'cb_kwargs': {}, + 'method': 'GET', + '_class': 123, + } + apify_request = ApifyRequest( + url='https://example.com', + method='GET', + unique_key='https://example.com', + user_data={'scrapy_request': _encode_request_dict(request_dict)}, + ) + + with pytest.raises(TypeError, match='expected a string'): to_scrapy_request(apify_request, spider) diff --git a/tests/unit/scrapy/test_scheduler.py b/tests/unit/scrapy/test_scheduler.py index 3e0db8206..c4a87622c 100644 --- a/tests/unit/scrapy/test_scheduler.py +++ b/tests/unit/scrapy/test_scheduler.py @@ -1,6 +1,7 @@ from __future__ import annotations import logging +from types import SimpleNamespace from typing import cast from unittest import mock @@ -24,11 +25,7 @@ def spider() -> DummySpider: @pytest.fixture def scheduler(monkeypatch: pytest.MonkeyPatch, spider: DummySpider) -> ApifyScheduler: - """Create a scheduler with its reactor check and async thread stubbed out. - - The request queue is a plain mock that satisfies the `isinstance` checks; the `run_coro` results - are set per test via the mocked async thread. - """ + """Create a scheduler with its reactor check and async thread stubbed out.""" monkeypatch.setattr('apify.scrapy.scheduler.is_asyncio_reactor_installed', lambda: True) monkeypatch.setattr('apify.scrapy.scheduler.AsyncThread', mock.MagicMock()) @@ -42,10 +39,62 @@ def scheduler(monkeypatch: pytest.MonkeyPatch, spider: DummySpider) -> ApifySche return scheduler +def test_has_pending_requests_reflects_queue_state(scheduler: ApifyScheduler) -> None: + """`has_pending_requests` is True while the queue is not finished and False once it is.""" + async_thread = cast('mock.MagicMock', scheduler._async_thread) + + async_thread.run_coro.return_value = False # the queue still has work + assert scheduler.has_pending_requests() is True + + async_thread.run_coro.return_value = True # the queue is drained + assert scheduler.has_pending_requests() is False + + +def test_enqueue_request_skips_non_serializable_request( + scheduler: ApifyScheduler, + caplog: pytest.LogCaptureFixture, +) -> None: + """A request that cannot be converted (non-serializable meta) is not enqueued: returns False and logs an error.""" + rq = cast('mock.MagicMock', scheduler._rq) + + # A set in `meta` is not JSON-serializable, so `to_apify_request` returns None. + scrapy_request = Request(url='https://example.com', meta={'tags': {'a', 'b'}}) + + with caplog.at_level(logging.ERROR, logger='apify.scrapy.scheduler'): + result = scheduler.enqueue_request(scrapy_request) + + assert result is False + assert 'could not be converted' in caplog.text + rq.add_request.assert_not_called() + + +def test_enqueue_request_enqueues_converted_request(scheduler: ApifyScheduler) -> None: + """A convertible request is enqueued and reported as newly added when the queue had not seen it.""" + rq = cast('mock.MagicMock', scheduler._rq) + async_thread = cast('mock.MagicMock', scheduler._async_thread) + async_thread.run_coro.return_value = SimpleNamespace(was_already_present=False) + + result = scheduler.enqueue_request(Request(url='https://example.com')) + + assert result is True + rq.add_request.assert_called_once() + + +def test_enqueue_request_returns_false_for_duplicate(scheduler: ApifyScheduler) -> None: + """A request already present in the queue is reported as not newly enqueued (returns False).""" + async_thread = cast('mock.MagicMock', scheduler._async_thread) + async_thread.run_coro.return_value = SimpleNamespace(was_already_present=True) + + result = scheduler.enqueue_request(Request(url='https://example.com')) + + assert result is False + + def test_next_request_skips_request_that_fails_to_convert( scheduler: ApifyScheduler, caplog: pytest.LogCaptureFixture, ) -> None: + """A queue entry that fails to reconstruct is skipped and still marked handled, not retried forever.""" rq = cast('mock.MagicMock', scheduler._rq) async_thread = cast('mock.MagicMock', scheduler._async_thread) @@ -67,11 +116,13 @@ def test_next_request_skips_request_that_fails_to_convert( assert result is None assert 'skipping it' in caplog.text - # It was still marked as handled before the failed conversion, so it is not retried forever. + # Even though conversion failed, the unrecoverable entry is still marked as handled (consumed), so + # the queue does not keep handing it back forever. rq.mark_request_as_handled.assert_called_once_with(malformed_request) def test_next_request_returns_converted_request(scheduler: ApifyScheduler) -> None: + """A valid queue entry is reconstructed into a Scrapy request and marked handled.""" rq = cast('mock.MagicMock', scheduler._rq) async_thread = cast('mock.MagicMock', scheduler._async_thread) @@ -88,3 +139,15 @@ def test_next_request_returns_converted_request(scheduler: ApifyScheduler) -> No assert isinstance(result, Request) assert result.url == apify_request.url rq.mark_request_as_handled.assert_called_once_with(apify_request) + + +def test_next_request_returns_none_when_queue_empty(scheduler: ApifyScheduler) -> None: + """An empty queue makes `next_request` return None and skip marking anything as handled.""" + rq = cast('mock.MagicMock', scheduler._rq) + async_thread = cast('mock.MagicMock', scheduler._async_thread) + async_thread.run_coro.return_value = None + + result = scheduler.next_request() + + assert result is None + rq.mark_request_as_handled.assert_not_called() diff --git a/tests/unit/scrapy/test_serialization.py b/tests/unit/scrapy/test_serialization.py new file mode 100644 index 000000000..e290af986 --- /dev/null +++ b/tests/unit/scrapy/test_serialization.py @@ -0,0 +1,126 @@ +from __future__ import annotations + +from datetime import UTC, datetime + +import pytest +from pydantic import BaseModel, Field + +from apify.scrapy._serialization import _MAX_ERROR_VALUE_REPR_LEN, decode_from_json, encode_to_json + + +def _round_trip(data: dict) -> dict: + """Encode then decode a dict through the serializer.""" + decoded = decode_from_json(encode_to_json(data)) + assert isinstance(decoded, dict) + return decoded + + +class _LongRepr: + """A helper whose repr is long enough to trigger truncation in a serialization error message.""" + + def __repr__(self) -> str: + return 'x' * 500 + + +def test_bytes_body_round_trips() -> None: + """A binary `body` round-trips unchanged (base64-encoded inside the JSON).""" + assert _round_trip({'body': b'\x00\x01\xff binary'})['body'] == b'\x00\x01\xff binary' + + +def test_str_body_round_trips_to_bytes() -> None: + """A `str` body is encoded as its UTF-8 bytes and comes back as `bytes` (Scrapy stores bytes).""" + assert _round_trip({'body': 'hello'})['body'] == b'hello' + + +def test_empty_str_body_round_trips() -> None: + """An empty `str` body round-trips to empty `bytes`.""" + assert _round_trip({'body': ''})['body'] == b'' + + +def test_bytes_headers_round_trip() -> None: + """Scrapy-style `{bytes: [bytes]}` headers, including binary values, round-trip unchanged.""" + data = {'headers': {b'Content-Type': [b'text/html'], b'X-Bin': [b'\x00\xff']}} + assert _round_trip(data)['headers'] == {b'Content-Type': [b'text/html'], b'X-Bin': [b'\x00\xff']} + + +def test_str_header_value_round_trips_to_bytes() -> None: + """A `str` header value is encoded as its UTF-8 bytes and comes back as `bytes` (Scrapy uses bytes).""" + assert _round_trip({'headers': {b'Content-Type': ['text/html']}})['headers'] == {b'Content-Type': [b'text/html']} + + +def test_bare_header_value_is_normalized_to_list() -> None: + """A header value not wrapped in a list is normalized to a single-element list on round-trip.""" + assert _round_trip({'headers': {b'X-Single': b'one'}})['headers'] == {b'X-Single': [b'one']} + + +def test_non_ascii_is_not_escaped() -> None: + """Non-ASCII text stays in its UTF-8 form instead of ASCII escape sequences, which would bloat storage.""" + encoded = encode_to_json({'meta': {'name': 'Ñoño café 日本語'}}) + assert 'Ñoño café 日本語' in encoded + assert '\\u' not in encoded + assert decode_from_json(encoded)['meta']['name'] == 'Ñoño café 日本語' + + +def test_pydantic_model_is_dumped_by_alias() -> None: + """A pydantic model in `meta` is dumped to a plain dict using its serialization aliases.""" + + class Model(BaseModel): + first: int = Field(serialization_alias='First') + + encoded = encode_to_json({'meta': {'m': Model(first=1)}}) + assert decode_from_json(encoded)['meta']['m'] == {'First': 1} + + +def test_tuple_is_coerced_to_list() -> None: + """Documented limitation: JSON has no tuple type, so a tuple round-trips as a list.""" + assert _round_trip({'meta': {'coords': (1, 2, 3)}})['meta']['coords'] == [1, 2, 3] + + +def test_non_string_dict_keys_are_coerced_to_strings() -> None: + """Documented limitation: JSON object keys are strings, so `{1: 'a'}` round-trips as `{'1': 'a'}`.""" + assert _round_trip({'cb_kwargs': {'m': {1: 'a'}}})['cb_kwargs']['m'] == {'1': 'a'} + + +def test_non_serializable_value_raises_with_type_and_repr() -> None: + """A value JSON cannot represent raises a `TypeError` naming the offending type and value.""" + when = datetime(2020, 1, 2, 3, 4, 5, tzinfo=UTC) + with pytest.raises(TypeError) as exc_info: + encode_to_json({'meta': {'when': when}}) + + # The user-facing message points at meta/cb_kwargs; the chained cause names the offending value. + assert 'JSON-serializable' in str(exc_info.value) + assert 'datetime' in str(exc_info.value.__cause__) + + +def test_long_value_repr_is_truncated_in_error() -> None: + """A non-serializable value with a very long repr is truncated (with an ellipsis) so it cannot bloat the log.""" + with pytest.raises(TypeError) as exc_info: + encode_to_json({'meta': {'big': _LongRepr()}}) + + cause = str(exc_info.value.__cause__) + assert cause.endswith('...') + assert cause.count('x') == _MAX_ERROR_VALUE_REPR_LEN + + +def test_encode_rejects_non_dict() -> None: + """Encoding a non-dict top-level value raises `TypeError`.""" + with pytest.raises(TypeError, match='Expected a dict'): + encode_to_json(['not', 'a', 'dict']) # ty: ignore[invalid-argument-type] + + +def test_decode_of_non_dict_json_returns_value_as_is() -> None: + """JSON that decodes to a non-dict (a list, a bare string) is returned unchanged, not coerced or rejected.""" + assert decode_from_json('[1, 2, 3]') == [1, 2, 3] + assert decode_from_json('"hello"') == 'hello' + + +def test_decode_rejects_non_base64_body() -> None: + """A non-base64 `body` is rejected loudly (the `validate=True` guard), not silently decoded to garbage.""" + with pytest.raises(ValueError, match='base64'): + decode_from_json('{"body": "not valid base64 !!!"}') + + +def test_decode_rejects_non_base64_header_value() -> None: + """A non-base64 header value is rejected loudly rather than silently decoded to garbage.""" + with pytest.raises(ValueError, match='base64'): + decode_from_json('{"headers": {"X-Test": ["not valid base64 !!!"]}}')