diff --git a/docs/03_guides/06_scrapy.mdx b/docs/03_guides/06_scrapy.mdx
index 12525609a..b1b81c1e7 100644
--- a/docs/03_guides/06_scrapy.mdx
+++ b/docs/03_guides/06_scrapy.mdx
@@ -46,7 +46,7 @@ The Apify SDK provides several custom components to support integration with the
- `apify.scrapy.ApifyScheduler` - Replaces Scrapy's default [scheduler](https://docs.scrapy.org/en/latest/topics/scheduler.html) with one that uses Apify's [request queue](https://docs.apify.com/platform/storage/request-queue) for storing requests. It manages enqueuing, dequeuing, and maintaining the state and priority of requests.
- `apify.scrapy.ActorDatasetPushPipeline` - A Scrapy [item pipeline](https://docs.scrapy.org/en/latest/topics/item-pipeline.html) that pushes scraped items to Apify's [dataset](https://docs.apify.com/platform/storage/dataset). When enabled, every item produced by the spider is sent to the dataset.
- `apify.scrapy.ApifyHttpProxyMiddleware` - A Scrapy [middleware](https://docs.scrapy.org/en/latest/topics/downloader-middleware.html) that manages proxy configurations. This middleware replaces Scrapy's default `HttpProxyMiddleware` to facilitate the use of Apify's proxy service.
-- `apify.scrapy.extensions.ApifyCacheStorage` - A storage backend for Scrapy's built-in [HTTP cache middleware](https://docs.scrapy.org/en/latest/topics/downloader-middleware.html#module-scrapy.downloadermiddlewares.httpcache). This backend uses Apify's [key-value store](https://docs.apify.com/platform/storage/key-value-store). Make sure to set `HTTPCACHE_ENABLED` and `HTTPCACHE_EXPIRATION_SECS` in your settings, or caching won't work.
+- `apify.scrapy.extensions.ApifyCacheStorage` - A storage backend for Scrapy's built-in [HTTP cache middleware](https://docs.scrapy.org/en/latest/topics/downloader-middleware.html#module-scrapy.downloadermiddlewares.httpcache). This backend uses Apify's [key-value store](https://docs.apify.com/platform/storage/key-value-store). To enable caching, set `HTTPCACHE_ENABLED` and `HTTPCACHE_EXPIRATION_SECS` in your settings. When the spider closes, a best-effort cleanup checks up to 100 cache entries and deletes those that are expired or unreadable. To change this limit, set `APIFY_HTTPCACHE_EXPIRATION_MAX_ITEMS` in your settings.
Additional helper functions in the [`apify.scrapy`](https://github.com/apify/apify-sdk-python/tree/master/src/apify/scrapy) subpackage include:
diff --git a/docs/04_upgrading/upgrading_to_v4.md b/docs/04_upgrading/upgrading_to_v4.md
index a69c84882..32cc13ca7 100644
--- a/docs/04_upgrading/upgrading_to_v4.md
+++ b/docs/04_upgrading/upgrading_to_v4.md
@@ -198,3 +198,36 @@ await client.run('my-run').charge('my-event', count=5)
### Async `iterate_*` are no longer coroutine functions
`DatasetClientAsync.iterate_items()` and `KeyValueStoreClientAsync.iterate_keys()` are now plain `def` functions returning `AsyncIterator[T]`. Consumer code (`async for ...`) is unchanged; if you annotate the call's return value, change `AsyncGenerator[T, None]` to `AsyncIterator[T]`.
+
+## Scrapy requests and HTTP cache stored as JSON
+
+This applies only if you use the Scrapy integration (`apify[scrapy]`).
+
+The integration now serializes data as JSON instead of pickle. `ApifyScheduler` stores Scrapy requests in the Apify request queue (under `user_data['scrapy_request']`), and `ApifyCacheStorage` stores HTTP cache entries in the key-value store. Both now hold JSON. Unlike pickle, JSON stays stable across Python and library versions.
+
+### Persisted data from before the upgrade is not read back
+
+Data written by an older SDK uses the pickle format, which v4 does not load. The two storages handle this differently:
+
+- HTTP cache: a legacy entry is treated as a cache miss. Scrapy re-fetches the page and re-stores it as JSON, so the cache heals itself. No action is needed.
+- Request queue: a request stored by an older SDK cannot be reconstructed, so it is skipped and the failure is logged. This matters only when pre-upgrade requests are still in the queue, for example after a run is migrated or restarted, or when you reuse a named request queue. A fresh run is not affected.
+
+### `meta` and `cb_kwargs` must be JSON-serializable
+
+Pickle could store arbitrary Python objects. JSON cannot, so the values in a request's `meta` and `cb_kwargs` are now subject to JSON's type system:
+
+- A `tuple` comes back as a `list`.
+- Non-string `dict` keys come back as strings, so `{1: 'a'}` becomes `{'1': 'a'}`.
+- A value JSON cannot represent (`datetime`, `set`, `Decimal`, a custom object) is neither coerced nor stored. The request is skipped and the failure is logged. Pydantic models are still supported and are dumped with `model_dump()`.
+
+Convert such values to a JSON-friendly form before yielding the request:
+
+```python
+from datetime import datetime
+
+# Before (v3): relied on pickle to store the datetime object.
+yield scrapy.Request(url, meta={'since': datetime(2024, 1, 1)})
+
+# After (v4): store a JSON-serializable value.
+yield scrapy.Request(url, meta={'since': datetime(2024, 1, 1).isoformat()})
+```
diff --git a/src/apify/scrapy/_serialization.py b/src/apify/scrapy/_serialization.py
new file mode 100644
index 000000000..0cd66b35d
--- /dev/null
+++ b/src/apify/scrapy/_serialization.py
@@ -0,0 +1,138 @@
+"""JSON serialization of Scrapy requests and cached responses for storage on the Apify platform.
+
+Scrapy requests and cached responses are stored in the Apify request queue and key-value store which hold JSON,
+so they are serialized as JSON here rather than pickled.
+
+Only `body` (`bytes`) and `headers` (`{bytes: [bytes]}`) are not natively JSON-serializable; both sit at fixed keys
+and are base64-encoded in place. A `str` `body` is encoded as its UTF-8 bytes and comes back as `bytes`, matching
+Scrapy, which always stores `body` as `bytes`. Pydantic models such as Crawlee's `UserData` are dumped via
+`model_dump()`. Everything else, notably `meta` and `cb_kwargs`, must already be JSON-serializable, otherwise
+serialization fails with a clear error naming the offending value. No in-band sentinel is used, so no user value
+can collide with the encoding.
+
+Known limitations of the pickle -> JSON switch (a documented breaking change): JSON has fewer types than pickle,
+so values in `meta`/`cb_kwargs` are subject to JSON's coercions. A `tuple` round-trips as a `list` and non-string
+`dict` keys round-trip as strings (e.g. `{1: 'a'}` becomes `{'1': 'a'}`). Values JSON cannot represent at all
+(`datetime`, `set`, `Decimal`, arbitrary objects, ...) are not coerced silently: serialization raises and the request
+is skipped loudly rather than stored in a corrupted form.
+"""
+
+from __future__ import annotations
+
+import base64
+import json
+from typing import Any
+
+from pydantic import BaseModel
+
+# Cap the offending value's repr in a serialization error message so a huge value cannot bloat the log.
+_MAX_ERROR_VALUE_REPR_LEN = 200
+
+
+def encode_to_json(data: dict[str, Any]) -> str:
+ """Serialize a Scrapy request/response dict to a JSON string.
+
+ The `body` and `headers` fields are base64-encoded in place (a `str` `body` via its UTF-8 bytes); pydantic
+ models are dumped to plain dicts. A `TypeError` is raised if any other value cannot be JSON-encoded.
+
+ Args:
+ data: The dict to serialize, e.g. the output of `scrapy.Request.to_dict()`.
+
+ Returns:
+ The JSON-encoded string.
+ """
+ if not isinstance(data, dict):
+ raise TypeError(f'Expected a dict to serialize, got {type(data)}')
+
+ safe = dict(data)
+
+ # `body` is base64-encoded so binary payloads survive; a `str` body is taken as its UTF-8 bytes, which keeps
+ # encode/decode symmetric (decode always base64-decodes `body` back to `bytes`).
+ body = safe.get('body')
+ if isinstance(body, (bytes, str)):
+ raw_body = body.encode('utf-8') if isinstance(body, str) else body
+ safe['body'] = base64.b64encode(raw_body).decode('ascii')
+
+ if isinstance(safe.get('headers'), dict):
+ safe['headers'] = _encode_headers(safe['headers'])
+
+ try:
+ # `ensure_ascii=False` keeps non-ASCII URLs/meta as their UTF-8 form instead of `\uXXXX` escapes, which
+ # would otherwise roughly double the size of non-Latin text in storage.
+ return json.dumps(safe, default=_json_default, ensure_ascii=False)
+ except TypeError as exc:
+ raise TypeError(
+ 'Failed to JSON-serialize a Scrapy request/response for storage on the Apify platform. '
+ 'All values in `meta` and `cb_kwargs` must be JSON-serializable (str, int, float, bool, None, '
+ 'list, dict, or a pydantic model).'
+ ) from exc
+
+
+def decode_from_json(text: str) -> Any:
+ """Reconstruct a Scrapy request/response dict from a string produced by `encode_to_json`.
+
+ The base64-encoded `body` and `headers` fields are decoded back to their `bytes` representation.
+
+ Args:
+ text: The JSON-encoded string.
+
+ Returns:
+ The decoded object (a dict for valid request/response payloads).
+ """
+ data = json.loads(text)
+ if not isinstance(data, dict):
+ return data
+
+ # `validate=True` makes a non-base64 body raise loudly instead of silently decoding to garbage.
+ if isinstance(data.get('body'), str):
+ data['body'] = base64.b64decode(data['body'], validate=True)
+
+ if isinstance(data.get('headers'), dict):
+ data['headers'] = _decode_headers(data['headers'])
+
+ return data
+
+
+def _json_default(obj: Any) -> Any:
+ """Fallback for values `json.dumps` cannot serialize: pydantic models are dumped, anything else raises.
+
+ The error names the offending value (type and a truncated repr) so a failed serialization points straight
+ at the bad `meta`/`cb_kwargs` entry instead of just reporting that something failed.
+ """
+ if isinstance(obj, BaseModel):
+ return obj.model_dump(by_alias=True)
+ value_repr = repr(obj)
+ if len(value_repr) > _MAX_ERROR_VALUE_REPR_LEN:
+ value_repr = value_repr[:_MAX_ERROR_VALUE_REPR_LEN] + '...'
+ raise TypeError(f'Object of type {type(obj).__name__} is not JSON-serializable: {value_repr}')
+
+
+def _encode_headers(headers: dict[Any, Any]) -> dict[str, list[str]]:
+ """Encode a Scrapy `{bytes: [bytes]}` headers mapping to a JSON-safe `{str: [base64-str]}`."""
+ encoded: dict[str, list[str]] = {}
+ for key, value in headers.items():
+ str_key = key.decode('latin-1') if isinstance(key, bytes) else key
+ values = value if isinstance(value, (list, tuple)) else [value]
+ encoded[str_key] = [_b64encode_value(item) for item in values]
+ return encoded
+
+
+def _decode_headers(headers: dict[str, Any]) -> dict[bytes, list[bytes]]:
+ """Reverse `_encode_headers`, restoring the `{bytes: [bytes]}` mapping Scrapy expects."""
+ decoded: dict[bytes, list[bytes]] = {}
+ for key, value in headers.items():
+ bytes_key = key.encode('latin-1') if isinstance(key, str) else key
+ values = value if isinstance(value, list) else [value]
+ decoded[bytes_key] = [base64.b64decode(item, validate=True) for item in values]
+ return decoded
+
+
+def _b64encode_value(value: Any) -> str:
+ """Base64-encode a single header value.
+
+ Scrapy stores header values as `bytes`; a `str` is encoded as its UTF-8 bytes. Any other type is coerced with
+ `str()` as a lenient last resort. That coercion is lossy (e.g. `5` becomes `b'5'`), but Scrapy does not produce
+ non-`bytes`/`str` header values, so it is not hit on the real path.
+ """
+ raw = value if isinstance(value, bytes) else str(value).encode('utf-8')
+ return base64.b64encode(raw).decode('ascii')
diff --git a/src/apify/scrapy/extensions/_httpcache.py b/src/apify/scrapy/extensions/_httpcache.py
index 14d8753d3..0909c583b 100644
--- a/src/apify/scrapy/extensions/_httpcache.py
+++ b/src/apify/scrapy/extensions/_httpcache.py
@@ -2,7 +2,6 @@
import gzip
import io
-import pickle
import re
import struct
from logging import getLogger
@@ -14,6 +13,7 @@
from apify import Configuration
from apify.scrapy._async_thread import AsyncThread
+from apify.scrapy._serialization import decode_from_json, encode_to_json
from apify.storage_clients import ApifyStorageClient
from apify.storages import KeyValueStore
@@ -29,14 +29,14 @@
class ApifyCacheStorage:
"""A Scrapy cache storage that uses the Apify `KeyValueStore` to store responses.
- It can be set as a storage for Scrapy's built-in `HttpCacheMiddleware`, which caches
- responses to requests. See HTTPCache middleware settings (prefixed with `HTTPCACHE_`)
- in the Scrapy documentation for more information. Requires the asyncio Twisted reactor
- to be installed.
+ It can be set as a storage for Scrapy's built-in `HttpCacheMiddleware`, which caches responses to requests.
+ See HTTPCache middleware settings (prefixed with `HTTPCACHE_`) in the Scrapy documentation for more information.
+ Requires the asyncio Twisted reactor to be installed.
"""
def __init__(self, settings: BaseSettings) -> None:
- self._expiration_max_items = 100
+ # Upper bound on how many keys the per-spider-close cleanup sweeps (best-effort; `close_spider`).
+ self._expiration_max_items: int = settings.getint('APIFY_HTTPCACHE_EXPIRATION_MAX_ITEMS', 100)
self._expiration_secs: int = settings.getint('HTTPCACHE_EXPIRATION_SECS')
self._spider: Spider | None = None
self._kvs: KeyValueStore | None = None
@@ -79,23 +79,26 @@ def close_spider(self, _: Spider, current_time: int | None = None) -> None:
async def expire_kvs() -> None:
if self._kvs is None:
raise ValueError('Key value store not initialized')
- i = 0
+ # Best-effort cleanup: at most `_expiration_max_items` keys per close, in no guaranteed order,
+ # so stale entries may linger. This only reclaims storage; `retrieve_response` already treats
+ # an expired entry as a cache miss.
+ processed = 0
async for item in self._kvs.iterate_keys():
+ if processed >= self._expiration_max_items:
+ break
+ processed += 1
value = await self._kvs.get_value(item.key)
try:
gzip_time = read_gzip_time(value)
except Exception as e:
logger.warning(f'Malformed cache item {item.key}: {e}')
- await self._kvs.set_value(item.key, None)
+ await self._kvs.delete_value(item.key)
else:
if self._expiration_secs < current_time - gzip_time:
logger.debug(f'Expired cache item {item.key}')
- await self._kvs.set_value(item.key, None)
+ await self._kvs.delete_value(item.key)
else:
logger.debug(f'Valid cache item {item.key}')
- if i == self._expiration_max_items:
- break
- i += 1
self._async_thread.run_coro(expire_kvs())
@@ -127,17 +130,25 @@ def retrieve_response(self, _: Spider, request: Request, current_time: int | Non
if current_time is None:
current_time = int(time())
- if 0 < self._expiration_secs < current_time - read_gzip_time(value):
- logger.debug('Cache expired', extra={'request': request})
+
+ # A malformed or legacy cache entry must not crash retrieval; treat it as a cache miss so Scrapy re-fetches
+ # and re-stores it in the current format. The field reads stay inside the `try` as well: a value that decodes
+ # to a dict missing any expected key (a forward/older format, or a truncated-but-valid JSON payload) must
+ # also degrade to a miss rather than raising an uncaught `KeyError`.
+ try:
+ if 0 < self._expiration_secs < current_time - read_gzip_time(value):
+ logger.debug('Cache expired', extra={'request': request})
+ return None
+ data = from_gzip(value)
+ url = data['url']
+ status = data['status']
+ headers = Headers(data['headers'])
+ body = data['body']
+ except Exception as exc:
+ logger.warning(f'Ignoring malformed cache entry {key!r}: {exc}', extra={'request': request})
return None
- data = from_gzip(value)
- url = data['url']
- status = data['status']
- headers = Headers(data['headers'])
- body = data['body']
respcls = responsetypes.from_args(headers=headers, url=url, body=body)
-
logger.debug('Cache hit', extra={'request': request})
return respcls(url=url, headers=headers, status=status, body=body)
@@ -162,18 +173,25 @@ def store_response(self, _: Spider, request: Request, response: Response) -> Non
def to_gzip(data: dict, mtime: int | None = None) -> bytes:
- """Dump a dictionary to a gzip-compressed byte stream."""
+ """Dump a dictionary to a gzip-compressed JSON byte stream.
+
+ Cache entries live in the Apify key-value store, which holds JSON, so they are serialized as JSON rather
+ than pickled. See `apify.scrapy._serialization` for the encoding.
+ """
+ payload = encode_to_json(data).encode('utf-8')
with io.BytesIO() as byte_stream:
with gzip.GzipFile(fileobj=byte_stream, mode='wb', mtime=mtime) as gzip_file:
- pickle.dump(data, gzip_file, protocol=4)
+ gzip_file.write(payload)
return byte_stream.getvalue()
def from_gzip(gzip_bytes: bytes) -> dict:
- """Load a dictionary from a gzip-compressed byte stream."""
+ """Load a dictionary from a gzip-compressed JSON byte stream."""
with io.BytesIO(gzip_bytes) as byte_stream, gzip.GzipFile(fileobj=byte_stream, mode='rb') as gzip_file:
- data: dict = pickle.load(gzip_file)
- return data
+ data = decode_from_json(gzip_file.read().decode('utf-8'))
+ if not isinstance(data, dict):
+ raise TypeError(f'Expected a dict from the cached payload, got {type(data)}')
+ return data
def read_gzip_time(gzip_bytes: bytes) -> int:
@@ -187,17 +205,15 @@ def read_gzip_time(gzip_bytes: bytes) -> int:
def get_kvs_name(spider_name: str, max_length: int = 60) -> str:
"""Get the key value store name for a spider.
- The key value store name is derived from the spider name by replacing all special characters
- with hyphens and trimming leading and trailing hyphens. The resulting name is prefixed with
- 'httpcache-' and truncated to the maximum length.
+ The key value store name is derived from the spider name by replacing all special characters with hyphens
+ and trimming leading and trailing hyphens. The resulting name is prefixed with 'httpcache-' and truncated
+ to the maximum length.
- The documentation
- [about storages](https://docs.apify.com/platform/storage/usage#named-and-unnamed-storages)
+ The documentation [about storages](https://docs.apify.com/platform/storage/usage#named-and-unnamed-storages)
mentions that names can be up to 63 characters long, so the default max length is set to 60.
- Such naming isn't unique per spider, but should be sufficiently unique for most use cases.
- The name of the key value store should indicate to which spider it belongs, e.g. in
- the listing in the Apify's console.
+ Such naming isn't unique per spider, but should be sufficiently unique for most use cases. The name
+ of the key-value store should indicate to which spider it belongs, e.g. in the listing in the Apify Console.
Args:
spider_name: Value of the Spider instance's name attribute.
diff --git a/src/apify/scrapy/requests.py b/src/apify/scrapy/requests.py
index b5ab0d498..38d6648df 100644
--- a/src/apify/scrapy/requests.py
+++ b/src/apify/scrapy/requests.py
@@ -1,23 +1,51 @@
from __future__ import annotations
-import codecs
-import pickle
from logging import getLogger
from typing import Any, cast
from scrapy import Request as ScrapyRequest
from scrapy import Spider
from scrapy.http.headers import Headers
+from scrapy.utils.misc import load_object
from scrapy.utils.request import request_from_dict
from crawlee._request import UserData
from crawlee._types import HttpHeaders
+from ._serialization import decode_from_json, encode_to_json
from apify import Request as ApifyRequest
logger = getLogger(__name__)
+def _ensure_known_request_class(request_dict: dict[str, Any]) -> None:
+ """Validate the optional `_class` entry before `request_from_dict` instantiates it.
+
+ `request_from_dict` resolves `_class` with `load_object` and calls it with the request kwargs. The dotted path
+ is resolved here first and rejected unless it is a `scrapy.Request` subclass, so a payload can never coerce
+ reconstruction into instantiating an arbitrary callable. Resolving may import the class's module (the same
+ import `request_from_dict` would do, and far safer than the arbitrary code execution the previous pickle
+ format allowed). That import is what lets a custom `Request` subclass be rebuilt in a fresh process after
+ an Actor migration, before the spider has lazily imported it.
+ """
+ class_path = request_dict.get('_class')
+ if class_path is None:
+ return
+
+ if not isinstance(class_path, str):
+ raise TypeError(f'Invalid scrapy_request `_class`, expected a string, got {type(class_path)}')
+
+ try:
+ request_cls = load_object(class_path)
+ except (ImportError, AttributeError, ValueError, NameError) as exc:
+ raise TypeError(f'Refusing to reconstruct a Scrapy request: cannot resolve `_class` {class_path!r}.') from exc
+
+ if not (isinstance(request_cls, type) and issubclass(request_cls, ScrapyRequest)):
+ raise TypeError(
+ f'Refusing to reconstruct a Scrapy request of type {class_path!r}: it is not a scrapy.Request subclass.'
+ )
+
+
def to_apify_request(scrapy_request: ScrapyRequest, spider: Spider) -> ApifyRequest | None:
"""Convert a Scrapy request to an Apify request.
@@ -35,6 +63,9 @@ def to_apify_request(scrapy_request: ScrapyRequest, spider: Spider) -> ApifyRequ
logger.debug(f'to_apify_request was called (scrapy_request={scrapy_request})...')
# Configuration to behave as similarly as possible to Scrapy's default RFPDupeFilter.
+ #
+ # The body is stored twice on purpose: as `payload` (used for the extended unique key) and inside the serialized
+ # Scrapy request below (used to reconstruct it). Both come from `scrapy_request.body`.
request_kwargs: dict[str, Any] = {
'url': scrapy_request.url,
'method': scrapy_request.method,
@@ -46,50 +77,69 @@ def to_apify_request(scrapy_request: ScrapyRequest, spider: Spider) -> ApifyRequ
try:
if scrapy_request.dont_filter:
request_kwargs['always_enqueue'] = True
- else:
- if scrapy_request.meta.get('apify_request_unique_key'):
- request_kwargs['unique_key'] = scrapy_request.meta['apify_request_unique_key']
-
- if scrapy_request.meta.get('apify_request_id'):
- request_kwargs['id'] = scrapy_request.meta['apify_request_id']
+ elif scrapy_request.meta.get('apify_request_unique_key'):
+ request_kwargs['unique_key'] = scrapy_request.meta['apify_request_unique_key']
user_data = scrapy_request.meta.get('userData', {})
- # Convert UserData Pydantic model to a plain dict to prevent CrawleeRequestData objects
- # from leaking into Request.from_url() during Scrapy-Apify roundtrips.
+ # Convert UserData Pydantic model to a plain dict to prevent CrawleeRequestData objects from leaking
+ # into Request.from_url() during Scrapy-Apify roundtrips.
if isinstance(user_data, UserData):
user_data = user_data.model_dump(by_alias=True)
- # Remove internal Crawlee data since it's managed by Request.from_url() and values
- # from previous roundtrips cause incorrect state.
+ # Remove internal Crawlee data since it's managed by Request.from_url() and values from previous roundtrips
+ # cause incorrect state.
if isinstance(user_data, dict):
user_data.pop('__crawlee', None)
request_kwargs['user_data'] = user_data if isinstance(user_data, dict) else {}
- # Convert Scrapy's headers to a HttpHeaders and store them in the apify_request
+ # Store an Apify-platform view of the headers. The authoritative copy with exact bytes travels in
+ # the serialized scrapy_request below, so non-UTF-8 headers (which make `to_unicode_dict()` raise) are
+ # tolerated rather than dropping the whole request.
+ #
+ # Trade-off: with `use_extended_unique_key=True` the unique key includes the headers, so when non-UTF-8
+ # headers are omitted here two requests differing only in those headers share a unique key and one is
+ # deduplicated away. This is rare (header values are normally ASCII/UTF-8) and still strictly better than
+ # the old behavior, which dropped such requests entirely.
if isinstance(scrapy_request.headers, Headers):
- headers = cast('dict[str, str]', dict(scrapy_request.headers.to_unicode_dict()))
- request_kwargs['headers'] = HttpHeaders(headers)
+ try:
+ headers = cast('dict[str, str]', dict(scrapy_request.headers.to_unicode_dict()))
+ request_kwargs['headers'] = HttpHeaders(headers)
+ except UnicodeDecodeError:
+ logger.warning(
+ 'Could not represent Scrapy request headers as Apify request headers (non-UTF-8 values); '
+ 'they are preserved in the serialized request instead.'
+ )
else:
logger.warning(
f'Invalid scrapy_request.headers type, not scrapy.http.headers.Headers: {scrapy_request.headers}'
)
apify_request = ApifyRequest.from_url(**request_kwargs)
-
- # Serialize the Scrapy ScrapyRequest and store it in the apify_request.
- # - This process involves converting the Scrapy ScrapyRequest object into a dictionary, encoding it to base64,
- # and storing it as 'scrapy_request' within the 'userData' dictionary of the apify_request.
- # - The serialization process can be referenced at: https://stackoverflow.com/questions/30469575/.
scrapy_request_dict = scrapy_request.to_dict(spider=spider)
- scrapy_request_dict_encoded = codecs.encode(pickle.dumps(scrapy_request_dict), 'base64').decode()
- apify_request.user_data['scrapy_request'] = scrapy_request_dict_encoded
except Exception as exc:
logger.warning(f'Conversion of Scrapy request {scrapy_request} to Apify request failed; {exc}')
return None
+ # Serialize the Scrapy request as JSON under 'scrapy_request'. Kept outside the broad except above so
+ # a non-JSON-serializable `meta`/`cb_kwargs` is logged with a traceback and the request skipped (returning
+ # None per this function's contract), rather than crashing the crawl.
+ try:
+ scrapy_request_json = encode_to_json(scrapy_request_dict)
+ except TypeError:
+ logger.exception(
+ f'Failed to serialize Scrapy request {scrapy_request} for storage on the Apify platform; skipping it. '
+ 'Ensure all values in `meta` and `cb_kwargs` are JSON-serializable.'
+ )
+ return None
+
+ # `scrapy_request_json` is already JSON-safe text (binary fields are base64-encoded inside it), so it is stored
+ # as-is. The request queue serializes `user_data` to JSON, which escapes the string correctly; wrapping it in
+ # a second base64 layer would only add ~33% overhead on the enqueue path.
+ apify_request.user_data['scrapy_request'] = scrapy_request_json
+
logger.debug(f'scrapy_request was converted to the apify_request={apify_request}')
return apify_request
@@ -102,32 +152,34 @@ def to_scrapy_request(apify_request: ApifyRequest, spider: Spider) -> ScrapyRequ
spider: The Scrapy spider that the request is associated with.
Raises:
- TypeError: If the Apify request is not an instance of the `ApifyRequest` class.
- ValueError: If the Apify request does not contain the required keys.
+ TypeError: If `apify_request` is not an `ApifyRequest`, if the stored Scrapy request payload is malformed,
+ or if its `_class` cannot be resolved to a `scrapy.Request` subclass.
Returns:
The converted Scrapy request.
"""
if not isinstance(cast('Any', apify_request), ApifyRequest):
- raise TypeError('apify_request must be a crawlee.ScrapyRequest instance')
+ raise TypeError('apify_request must be an apify.Request instance')
logger.debug(f'to_scrapy_request was called (apify_request={apify_request})...')
# If the apify_request comes from the Scrapy
if 'scrapy_request' in apify_request.user_data:
- # Deserialize the Scrapy ScrapyRequest from the apify_request.
- # - This process involves decoding the base64-encoded request data and reconstructing
- # the Scrapy ScrapyRequest object from its dictionary representation.
+ # Deserialize the Scrapy request from the apify_request by parsing the stored JSON and reconstructing
+ # the `ScrapyRequest` object from its dictionary representation.
logger.debug('Restoring the Scrapy ScrapyRequest from the apify_request...')
- scrapy_request_dict_encoded = apify_request.user_data['scrapy_request']
- if not isinstance(scrapy_request_dict_encoded, str):
- raise TypeError('scrapy_request_dict_encoded must be a string')
+ scrapy_request_json = apify_request.user_data['scrapy_request']
+ if not isinstance(scrapy_request_json, str):
+ raise TypeError('the stored scrapy_request must be a string')
- scrapy_request_dict = pickle.loads(codecs.decode(scrapy_request_dict_encoded.encode(), 'base64'))
+ scrapy_request_dict = decode_from_json(scrapy_request_json)
if not isinstance(scrapy_request_dict, dict):
raise TypeError('scrapy_request_dict must be a dictionary')
+ # Validate any `_class` entry before request_from_dict resolves and imports it.
+ _ensure_known_request_class(scrapy_request_dict)
+
scrapy_request = request_from_dict(scrapy_request_dict, spider=spider)
if not isinstance(scrapy_request, ScrapyRequest):
raise TypeError('scrapy_request must be an instance of the ScrapyRequest class')
diff --git a/src/apify/scrapy/scheduler.py b/src/apify/scrapy/scheduler.py
index 23db7b62b..2b95d30c8 100644
--- a/src/apify/scrapy/scheduler.py
+++ b/src/apify/scrapy/scheduler.py
@@ -162,20 +162,24 @@ def next_request(self) -> Request | None:
if not isinstance(self.spider, Spider):
raise TypeError('self.spider must be an instance of the Spider class')
- # Let the request queue know that the request is being handled. Every request should
- # be marked as handled, retrying is handled by the Scrapy's RetryMiddleware.
+ # Reconstruct the Scrapy request before consuming the queue entry. A malformed entry must not crash
+ # the whole run, so on failure it is logged and skipped (None) rather than propagating.
+ try:
+ scrapy_request = to_scrapy_request(apify_request, spider=self.spider)
+ except Exception:
+ logger.exception(f'Failed to convert Apify request {apify_request} to a Scrapy request; skipping it.')
+ scrapy_request = None
+
+ # Mark the request as handled. This runs even when reconstruction failed above: an unrecoverable entry
+ # (a corrupt or legacy payload) must still be consumed, otherwise the queue would keep handing it back
+ # forever. Retrying genuine failures is the RetryMiddleware's job.
try:
self._async_thread.run_coro(self._rq.mark_request_as_handled(apify_request))
except Exception:
traceback.print_exc()
raise
- # Reconstruct the Scrapy request. A malformed queue entry must not crash the whole run: it
- # has already been marked handled above, so log it and skip it instead of propagating.
- try:
- scrapy_request = to_scrapy_request(apify_request, spider=self.spider)
- except Exception:
- logger.exception(f'Failed to convert Apify request {apify_request} to a Scrapy request; skipping it.')
+ if scrapy_request is None:
return None
logger.debug(f'Converted to scrapy_request: {scrapy_request}')
diff --git a/tests/unit/scrapy/extensions/test_httpcache.py b/tests/unit/scrapy/extensions/test_httpcache.py
index 8e250aa05..6f34853b5 100644
--- a/tests/unit/scrapy/extensions/test_httpcache.py
+++ b/tests/unit/scrapy/extensions/test_httpcache.py
@@ -1,44 +1,279 @@
+from __future__ import annotations
+
+import asyncio
+import gzip
+import io
+import json
+import pickle
from time import time
+from types import SimpleNamespace
+from typing import TYPE_CHECKING, Any, cast
import pytest
+from scrapy import Request
+from scrapy.http import HtmlResponse
+from scrapy.settings import Settings
+
+from apify.scrapy.extensions._httpcache import ApifyCacheStorage, from_gzip, get_kvs_name, read_gzip_time, to_gzip
-from apify.scrapy.extensions._httpcache import from_gzip, get_kvs_name, read_gzip_time, to_gzip
+if TYPE_CHECKING:
+ from collections.abc import AsyncIterator
FIXTURE_DICT = {'name': 'Alice'}
+# Gzip-compressed JSON (the pickle-free format) of FIXTURE_DICT with mtime=0.
FIXTURE_BYTES = (
- b'\x1f\x8b\x08\x00\x00\x00\x00\x00\x02\xffk`\x99*\xcc\x00\x01\xb5SzX\xf2\x12s'
- b'S\xa7\xf4\xb0:\xe6d&\xa7N)\xd6\x03\x00\x1c\xe8U\x9c\x1e\x00\x00\x00'
+ b'\x1f\x8b\x08\x00\x00\x00\x00\x00\x02\xff\xabV\xcaK\xccMU\xb2RPr\xcc\xc9LNU'
+ b'\xaa\x05\x00\x03\x9a\x9d\xb0\x11\x00\x00\x00'
)
def test_gzip() -> None:
+ """`to_gzip` then `from_gzip` round-trips a dict unchanged."""
assert from_gzip(to_gzip(FIXTURE_DICT)) == FIXTURE_DICT
def test_to_gzip() -> None:
+ """`to_gzip` produces the expected gzip-compressed JSON bytes for a fixed mtime."""
data_bytes = to_gzip(FIXTURE_DICT, mtime=0)
assert data_bytes == FIXTURE_BYTES
def test_from_gzip() -> None:
+ """`from_gzip` decodes gzip-compressed JSON bytes back into the original dict."""
data_dict = from_gzip(FIXTURE_BYTES)
assert data_dict == FIXTURE_DICT
def test_read_gzip_time() -> None:
+ """`read_gzip_time` reads the mtime stored in the gzip header (here 0)."""
assert read_gzip_time(FIXTURE_BYTES) == 0
def test_read_gzip_time_non_zero() -> None:
+ """`read_gzip_time` reads back a non-zero mtime written into the gzip header."""
current_time = int(time())
data_bytes = to_gzip(FIXTURE_DICT, mtime=current_time)
assert read_gzip_time(data_bytes) == current_time
+def test_gzip_round_trips_binary_response() -> None:
+ """A cached response with a binary body and Scrapy-style bytes headers round-trips."""
+ data = {
+ 'status': 200,
+ 'url': 'https://example.com',
+ 'headers': {b'Content-Type': [b'text/html'], b'X-Bin': [b'\x00\xff']},
+ 'body': b'\xff\xfe',
+ }
+
+ assert from_gzip(to_gzip(data)) == data
+
+
+def test_from_gzip_rejects_pickle_payload() -> None:
+ """Cache entries are gzip-compressed JSON, so a pickle payload is rejected rather than loaded."""
+ with io.BytesIO() as byte_stream:
+ with gzip.GzipFile(fileobj=byte_stream, mode='wb') as gzip_file:
+ pickle.dump({'status': 200, 'body': b'x'}, gzip_file, protocol=4)
+ pickle_payload = byte_stream.getvalue()
+
+ with pytest.raises((UnicodeDecodeError, json.JSONDecodeError, ValueError)):
+ from_gzip(pickle_payload)
+
+
+class _FakeAsyncThread:
+ def run_coro(self, coro: Any, *_: Any, **__: Any) -> Any:
+ loop = asyncio.new_event_loop()
+ try:
+ return loop.run_until_complete(coro)
+ finally:
+ loop.close()
+
+ def close(self) -> None:
+ """No-op; `close_spider` calls this when shutting the cache storage down."""
+
+
+class _FakeKvs:
+ def __init__(self, value: bytes | None) -> None:
+ self._value = value
+
+ async def get_value(self, _: str) -> bytes | None:
+ return self._value
+
+ async def set_value(self, _: str, value: bytes) -> None:
+ self._value = value
+
+
+class _FakeFingerprinter:
+ def fingerprint(self, _: Request) -> bytes:
+ return b'\xab\xcd'
+
+
+def _make_storage(value: bytes | None, *, expiration_secs: int = 0) -> ApifyCacheStorage:
+ storage = ApifyCacheStorage(Settings({'HTTPCACHE_EXPIRATION_SECS': expiration_secs}))
+ storage._async_thread = _FakeAsyncThread() # ty: ignore[invalid-assignment]
+ storage._kvs = _FakeKvs(value) # ty: ignore[invalid-assignment]
+ storage._fingerprinter = _FakeFingerprinter() # ty: ignore[invalid-assignment]
+ return storage
+
+
+def test_retrieve_response_returns_cached_response() -> None:
+ """A stored gzip-JSON entry is returned as a reconstructed Scrapy response."""
+ data = {'status': 200, 'url': 'https://example.com', 'headers': {}, 'body': b'hello'}
+ storage = _make_storage(to_gzip(data))
+ response = storage.retrieve_response(None, Request('https://example.com')) # ty: ignore[invalid-argument-type]
+ assert response is not None
+ assert response.status == 200
+ assert response.body == b'hello'
+
+
+def test_retrieve_response_ignores_legacy_pickle_item() -> None:
+ """A legacy gzip-pickle entry degrades to a cache miss instead of raising."""
+ # A gzip-wrapped pickle payload is the legacy (pre-JSON) cache format that the JSON reader cannot
+ # load. After the upgrade, such an item must degrade to a cache miss instead of raising and breaking
+ # the download, so the cache self-heals (re-fetch and re-store as JSON) rather than crashing.
+ with io.BytesIO() as byte_stream:
+ with gzip.GzipFile(fileobj=byte_stream, mode='wb') as gzip_file:
+ pickle.dump({'status': 200, 'body': b'x'}, gzip_file, protocol=4)
+ legacy_pickle = byte_stream.getvalue()
+ storage = _make_storage(legacy_pickle)
+ assert storage.retrieve_response(None, Request('https://example.com')) is None # ty: ignore[invalid-argument-type]
+
+
+def test_retrieve_response_missing_key_is_cache_miss() -> None:
+ """A valid payload missing an expected field (here `url`) degrades to a cache miss, not a `KeyError`."""
+ value = to_gzip({'status': 200, 'headers': {}, 'body': b'x'}) # no 'url'
+ storage = _make_storage(value)
+ assert storage.retrieve_response(None, Request('https://example.com')) is None # ty: ignore[invalid-argument-type]
+
+
+def test_store_then_retrieve_round_trips_response() -> None:
+ """A stored response round-trips through the cache and is persisted as gzip-JSON, never pickle."""
+ storage = _make_storage(None)
+ request = Request('https://example.com')
+ response = HtmlResponse(
+ url='https://example.com',
+ status=200,
+ headers={'Content-Type': 'text/html'},
+ body=b'cached',
+ )
+
+ storage.store_response(None, request, response) # ty: ignore[invalid-argument-type]
+
+ # The persisted bytes are gzip-compressed JSON (decodable by `from_gzip`), never a pickle payload.
+ stored = cast('_FakeKvs', storage._kvs)._value
+ assert isinstance(stored, bytes)
+ assert from_gzip(stored)['body'] == b'cached'
+
+ retrieved = storage.retrieve_response(None, request) # ty: ignore[invalid-argument-type]
+ assert retrieved is not None
+ assert retrieved.status == 200
+ assert retrieved.url == 'https://example.com'
+ assert retrieved.body == b'cached'
+ assert retrieved.headers.get('Content-Type') == b'text/html'
+
+
+def test_retrieve_response_treats_expired_entry_as_miss() -> None:
+ """A cached entry older than `HTTPCACHE_EXPIRATION_SECS` is a cache miss at retrieval time."""
+ data = {'status': 200, 'url': 'https://example.com', 'headers': {}, 'body': b'x'}
+ storage = _make_storage(to_gzip(data, mtime=0), expiration_secs=100)
+ request = Request('https://example.com')
+ # Retrieved 1000s after the entry's mtime (0), well past the 100s expiration window.
+ assert storage.retrieve_response(None, request, current_time=1000) is None # ty: ignore[invalid-argument-type]
+
+
+def test_retrieve_response_returns_fresh_entry_within_expiration() -> None:
+ """An entry newer than `HTTPCACHE_EXPIRATION_SECS` is still a cache hit at retrieval time."""
+ data = {'status': 200, 'url': 'https://example.com', 'headers': {}, 'body': b'hello'}
+ storage = _make_storage(to_gzip(data, mtime=950), expiration_secs=100)
+ request = Request('https://example.com')
+ response = storage.retrieve_response(None, request, current_time=1000) # ty: ignore[invalid-argument-type]
+ assert response is not None
+ assert response.body == b'hello'
+
+
+class _KeyIterator:
+    """A plain async iterator over a snapshot of keys (not an async generator, so an early `break` leaves nothing to close)."""
+
+ def __init__(self, keys: list[str]) -> None:
+ self._keys = keys
+ self._index = 0
+
+ def __aiter__(self) -> _KeyIterator:
+ return self
+
+ async def __anext__(self) -> Any:
+ if self._index >= len(self._keys):
+ raise StopAsyncIteration
+ key = self._keys[self._index]
+ self._index += 1
+ return SimpleNamespace(key=key)
+
+
+class _RecordingKvs:
+ """An in-memory key-value store double for the cleanup sweep that records deletions."""
+
+ def __init__(self, items: dict[str, bytes | None]) -> None:
+ self._items = dict(items)
+ self.deleted: list[str] = []
+
+ def iterate_keys(self) -> AsyncIterator[Any]:
+ # Snapshot the keys, mirroring the real clients (so deleting while iterating is safe).
+ return _KeyIterator(list(self._items))
+
+ async def get_value(self, key: str) -> bytes | None:
+ return self._items.get(key)
+
+ async def delete_value(self, key: str) -> None:
+ self.deleted.append(key)
+ self._items.pop(key, None)
+
+
+def _make_cleanup_storage(
+ items: dict[str, bytes | None],
+ *,
+ expiration_secs: int,
+ max_items: int = 100,
+) -> tuple[ApifyCacheStorage, _RecordingKvs]:
+ storage = ApifyCacheStorage(
+ Settings({'HTTPCACHE_EXPIRATION_SECS': expiration_secs, 'APIFY_HTTPCACHE_EXPIRATION_MAX_ITEMS': max_items})
+ )
+ storage._async_thread = _FakeAsyncThread() # ty: ignore[invalid-assignment]
+ kvs = _RecordingKvs(items)
+ storage._kvs = kvs # ty: ignore[invalid-assignment]
+ return storage, kvs
+
+
+def test_close_spider_deletes_expired_and_malformed_but_keeps_valid() -> None:
+ """The close-spider cleanup deletes expired and unreadable entries and leaves fresh ones in place."""
+ current_time = 1000
+ fresh = {'status': 200, 'url': 'https://example.com', 'headers': {}, 'body': b''}
+ items: dict[str, bytes | None] = {
+ 'expired': to_gzip(fresh, mtime=0), # age 1000s > expiration 100s
+ 'valid': to_gzip(fresh, mtime=current_time), # age 0s
+ 'malformed': b'bad', # too short for read_gzip_time to parse the mtime header
+ }
+ storage, kvs = _make_cleanup_storage(items, expiration_secs=100)
+
+ storage.close_spider(None, current_time=current_time) # ty: ignore[invalid-argument-type]
+
+ assert set(kvs.deleted) == {'expired', 'malformed'}
+
+
+def test_close_spider_respects_max_items() -> None:
+ """At most `APIFY_HTTPCACHE_EXPIRATION_MAX_ITEMS` entries are swept per close (exactly max, not max+1)."""
+ current_time = 1000
+ expired = to_gzip({'status': 200, 'url': 'https://example.com', 'headers': {}, 'body': b''}, mtime=0)
+ items: dict[str, bytes | None] = {f'k{i}': expired for i in range(5)}
+ storage, kvs = _make_cleanup_storage(items, expiration_secs=100, max_items=2)
+
+ storage.close_spider(None, current_time=current_time) # ty: ignore[invalid-argument-type]
+
+ assert len(kvs.deleted) == 2
+
+
@pytest.mark.parametrize(
('spider_name', 'expected'),
[
@@ -55,6 +290,7 @@ def test_read_gzip_time_non_zero() -> None:
],
)
def test_get_kvs_name(spider_name: str, expected: str) -> None:
+ """A spider name is normalized into a valid `httpcache-` key-value store name."""
assert get_kvs_name(spider_name) == expected
@@ -67,5 +303,6 @@ def test_get_kvs_name(spider_name: str, expected: str) -> None:
],
)
def test_get_kvs_name_raises(spider_name: str) -> None:
+ """A spider name that normalizes to empty raises `ValueError`."""
with pytest.raises(ValueError, match=r'Unsupported spider name'):
assert get_kvs_name(spider_name)
diff --git a/tests/unit/scrapy/requests/_custom_request_module.py b/tests/unit/scrapy/requests/_custom_request_module.py
new file mode 100644
index 000000000..26f760db3
--- /dev/null
+++ b/tests/unit/scrapy/requests/_custom_request_module.py
@@ -0,0 +1,9 @@
+from __future__ import annotations
+
+from scrapy import Request
+
+
+class CustomRequest(Request):
+ """A custom Scrapy `Request` subclass living in its own module, so a test can drop it from `sys.modules`
+ and verify reconstruction imports it on demand (mimicking an Actor migrating to a fresh process).
+ """
diff --git a/tests/unit/scrapy/requests/test_to_apify_request.py b/tests/unit/scrapy/requests/test_to_apify_request.py
index 4616af646..482d5c5da 100644
--- a/tests/unit/scrapy/requests/test_to_apify_request.py
+++ b/tests/unit/scrapy/requests/test_to_apify_request.py
@@ -1,5 +1,6 @@
from __future__ import annotations
+import logging
from typing import cast
import pytest
@@ -22,6 +23,7 @@ def spider() -> DummySpider:
def test_creates_simple_request(spider: Spider) -> None:
+ """A simple Scrapy request converts to an Apify request carrying the serialized `scrapy_request`."""
scrapy_request = Request(url='https://example.com')
apify_request = to_apify_request(scrapy_request, spider)
@@ -34,6 +36,7 @@ def test_creates_simple_request(spider: Spider) -> None:
def test_handles_headers(spider: Spider) -> None:
+ """Scrapy request headers are carried onto the Apify request as `HttpHeaders`."""
scrapy_request_headers = Headers({'Authorization': 'Bearer access_token'})
scrapy_request = Request(url='https://example.com', headers=scrapy_request_headers)
@@ -45,6 +48,7 @@ def test_handles_headers(spider: Spider) -> None:
def test_without_id_and_unique_key(spider: Spider) -> None:
+ """A request without an id or unique key converts, preserving its user data."""
scrapy_request = Request(
url='https://example.com',
method='GET',
@@ -65,6 +69,7 @@ def test_without_id_and_unique_key(spider: Spider) -> None:
def test_with_id_and_unique_key(spider: Spider) -> None:
+ """An explicit `apify_request_unique_key` in `meta` becomes the Apify request's unique key."""
scrapy_request = Request(
url='https://example.com',
method='GET',
@@ -89,20 +94,26 @@ def test_with_id_and_unique_key(spider: Spider) -> None:
def test_invalid_scrapy_request_returns_none(spider: Spider) -> None:
+ """A non-Scrapy-request input returns None instead of raising."""
scrapy_request = 'invalid_request'
apify_request = to_apify_request(scrapy_request, spider) # ty: ignore[invalid-argument-type]
assert apify_request is None
-def test_roundtrip_follow_up_request_with_propagated_userdata(spider: Spider) -> None:
- """Reproduce: CrawleeRequestData() argument after ** must be a mapping, not CrawleeRequestData.
+def test_non_json_serializable_meta_is_skipped(spider: Spider, caplog: pytest.LogCaptureFixture) -> None:
+ """A non-JSON-serializable value in `meta` is skipped (returns None) and logged, not crashing the crawl."""
+ scrapy_request = Request(url='https://example.com', meta={'tags': {'a', 'b'}})
+
+ with caplog.at_level(logging.ERROR, logger='apify.scrapy.requests'):
+ apify_request = to_apify_request(scrapy_request, spider)
+
+ assert apify_request is None
+ assert any('JSON-serializable' in record.getMessage() for record in caplog.records)
+
- After two roundtrips through to_apify_request/to_scrapy_request with userData propagation,
- Request.from_url() writes a CrawleeRequestData object into UserData.__pydantic_extra__['__crawlee'].
- On the next roundtrip, this CrawleeRequestData object is found by user_data_dict.get('__crawlee')
- and passed to CrawleeRequestData(**obj), which fails because CrawleeRequestData is not a mapping.
- """
+def test_roundtrip_follow_up_request_with_propagated_userdata(spider: Spider) -> None:
+ """Regression: propagating userData across repeated roundtrips must not fail on `__crawlee` data."""
# Step 1: Initial request -> first roundtrip
initial_scrapy_request = Request(url='https://example.com/page')
apify_request_1 = to_apify_request(initial_scrapy_request, spider)
@@ -127,3 +138,29 @@ def test_roundtrip_follow_up_request_with_propagated_userdata(spider: Spider) ->
follow_up_apify_request = to_apify_request(follow_up_2, spider)
assert follow_up_apify_request is not None
assert follow_up_apify_request.url == 'https://example.com/image.png'
+
+
+def test_dont_filter_request_is_always_enqueued(spider: Spider) -> None:
+ """A `dont_filter=True` request is always enqueued: each conversion gets a fresh unique key, bypassing dedup."""
+ first = to_apify_request(Request(url='https://example.com', dont_filter=True), spider)
+ second = to_apify_request(Request(url='https://example.com', dont_filter=True), spider)
+
+ assert first is not None
+ assert second is not None
+    # `always_enqueue` prefixes the unique key with a random token and a `|` separator, so two
+    # otherwise-identical requests get distinct unique keys and neither is deduplicated against the other.
+ assert '|' in first.unique_key
+ assert first.unique_key != second.unique_key
+
+
+def test_apify_request_id_in_meta_is_ignored(spider: Spider) -> None:
+ """An `apify_request_id` in `meta` is ignored and does not break conversion; the unique key still applies."""
+ scrapy_request = Request(
+ url='https://example.com',
+ meta={'apify_request_id': 'myCustomId12345', 'apify_request_unique_key': 'https://example.com'},
+ )
+
+ apify_request = to_apify_request(scrapy_request, spider)
+
+ assert apify_request is not None
+ assert apify_request.unique_key == 'https://example.com'
diff --git a/tests/unit/scrapy/requests/test_to_scrapy_request.py b/tests/unit/scrapy/requests/test_to_scrapy_request.py
index fadb25249..898312f28 100644
--- a/tests/unit/scrapy/requests/test_to_scrapy_request.py
+++ b/tests/unit/scrapy/requests/test_to_scrapy_request.py
@@ -1,14 +1,18 @@
from __future__ import annotations
-import binascii
+import codecs
+import json
+import pickle
+import sys
import pytest
-from scrapy import Request, Spider
+from scrapy import FormRequest, Request, Spider
from crawlee._types import HttpHeaders
from apify import Request as ApifyRequest
-from apify.scrapy.requests import to_scrapy_request
+from apify.scrapy._serialization import encode_to_json
+from apify.scrapy.requests import to_apify_request, to_scrapy_request
class DummySpider(Spider):
@@ -21,8 +25,34 @@ def spider() -> DummySpider:
return DummySpider()
+# A JSON-encoded (pickle-free) Scrapy request for a GET request to https://apify.com, encoded exactly
+# the way `to_apify_request` stores it under `user_data['scrapy_request']` (JSON, no outer base64).
+_SCRAPY_REQUEST_JSON_ENCODED = encode_to_json(
+ {
+ 'url': 'https://apify.com',
+ 'callback': None,
+ 'errback': None,
+ 'headers': {
+ b'Accept': [b'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8'],
+ b'Accept-Language': [b'en'],
+ b'User-Agent': [b'Scrapy/2.11.0 (+https://scrapy.org)'],
+ b'Accept-Encoding': [b'gzip, deflate'],
+ },
+ 'body': b'',
+ 'cookies': {},
+ 'meta': {'apify_request_id': 'fvwscO2UJLdr10B', 'apify_request_unique_key': 'https://apify.com'},
+ 'encoding': 'utf-8',
+ 'flags': [],
+ 'cb_kwargs': {},
+ 'dont_filter': False,
+ 'method': 'GET',
+ 'priority': 0,
+ }
+)
+
+
def test_without_reconstruction(spider: Spider) -> None:
- # Without reconstruction of encoded Scrapy request
+ """An Apify request without a stored `scrapy_request` becomes a fresh Scrapy request (no reconstruction)."""
apify_request = ApifyRequest(
url='https://example.com',
method='GET',
@@ -39,7 +69,7 @@ def test_without_reconstruction(spider: Spider) -> None:
def test_without_reconstruction_with_optional_fields(spider: Spider) -> None:
- # Without reconstruction of encoded Scrapy request
+ """The without-reconstruction path also carries optional headers and user data to the Scrapy request."""
apify_request = ApifyRequest(
url='https://crawlee.dev',
method='GET',
@@ -62,13 +92,13 @@ def test_without_reconstruction_with_optional_fields(spider: Spider) -> None:
def test_with_reconstruction(spider: Spider) -> None:
- # With reconstruction of encoded Scrapy request
+ """An Apify request with a stored `scrapy_request` is reconstructed from its JSON payload."""
apify_request = ApifyRequest(
url='https://apify.com',
method='GET',
unique_key='https://apify.com',
user_data={
- 'scrapy_request': 'gASVJgIAAAAAAAB9lCiMA3VybJSMEWh0dHBzOi8vYXBpZnkuY29tlIwIY2FsbGJhY2uUTowHZXJy\nYmFja5ROjAdoZWFkZXJzlH2UKEMGQWNjZXB0lF2UQz90ZXh0L2h0bWwsYXBwbGljYXRpb24veGh0\nbWwreG1sLGFwcGxpY2F0aW9uL3htbDtxPTAuOSwqLyo7cT0wLjiUYUMPQWNjZXB0LUxhbmd1YWdl\nlF2UQwJlbpRhQwpVc2VyLUFnZW50lF2UQyNTY3JhcHkvMi4xMS4wICgraHR0cHM6Ly9zY3JhcHku\nb3JnKZRhQw9BY2NlcHQtRW5jb2RpbmeUXZRDDWd6aXAsIGRlZmxhdGWUYXWMBm1ldGhvZJSMA0dF\nVJSMBGJvZHmUQwCUjAdjb29raWVzlH2UjARtZXRhlH2UKIwQYXBpZnlfcmVxdWVzdF9pZJSMD2Z2\nd3NjTzJVSkxkcjEwQpSMGGFwaWZ5X3JlcXVlc3RfdW5pcXVlX2tleZSMEWh0dHBzOi8vYXBpZnku\nY29tlIwQZG93bmxvYWRfdGltZW91dJRHQGaAAAAAAACMDWRvd25sb2FkX3Nsb3SUjAlhcGlmeS5j\nb22UjBBkb3dubG9hZF9sYXRlbmN5lEc/tYIIAAAAAHWMCGVuY29kaW5nlIwFdXRmLTiUjAhwcmlv\ncml0eZRLAIwLZG9udF9maWx0ZXKUiYwFZmxhZ3OUXZSMCWNiX2t3YXJnc5R9lHUu\n', # noqa: E501
+ 'scrapy_request': _SCRAPY_REQUEST_JSON_ENCODED,
},
)
@@ -82,7 +112,7 @@ def test_with_reconstruction(spider: Spider) -> None:
def test_with_reconstruction_with_optional_fields(spider: Spider) -> None:
- # With reconstruction of encoded Scrapy request
+ """Reconstruction from the stored JSON payload also restores optional headers and user data."""
apify_request = ApifyRequest(
url='https://apify.com',
method='GET',
@@ -90,7 +120,7 @@ def test_with_reconstruction_with_optional_fields(spider: Spider) -> None:
headers=HttpHeaders({'Authorization': 'Bearer access_token'}),
user_data={
'some_user_data': 'hello',
- 'scrapy_request': 'gASVJgIAAAAAAAB9lCiMA3VybJSMEWh0dHBzOi8vYXBpZnkuY29tlIwIY2FsbGJhY2uUTowHZXJy\nYmFja5ROjAdoZWFkZXJzlH2UKEMGQWNjZXB0lF2UQz90ZXh0L2h0bWwsYXBwbGljYXRpb24veGh0\nbWwreG1sLGFwcGxpY2F0aW9uL3htbDtxPTAuOSwqLyo7cT0wLjiUYUMPQWNjZXB0LUxhbmd1YWdl\nlF2UQwJlbpRhQwpVc2VyLUFnZW50lF2UQyNTY3JhcHkvMi4xMS4wICgraHR0cHM6Ly9zY3JhcHku\nb3JnKZRhQw9BY2NlcHQtRW5jb2RpbmeUXZRDDWd6aXAsIGRlZmxhdGWUYXWMBm1ldGhvZJSMA0dF\nVJSMBGJvZHmUQwCUjAdjb29raWVzlH2UjARtZXRhlH2UKIwQYXBpZnlfcmVxdWVzdF9pZJSMD2Z2\nd3NjTzJVSkxkcjEwQpSMGGFwaWZ5X3JlcXVlc3RfdW5pcXVlX2tleZSMEWh0dHBzOi8vYXBpZnku\nY29tlIwQZG93bmxvYWRfdGltZW91dJRHQGaAAAAAAACMDWRvd25sb2FkX3Nsb3SUjAlhcGlmeS5j\nb22UjBBkb3dubG9hZF9sYXRlbmN5lEc/tYIIAAAAAHWMCGVuY29kaW5nlIwFdXRmLTiUjAhwcmlv\ncml0eZRLAIwLZG9udF9maWx0ZXKUiYwFZmxhZ3OUXZSMCWNiX2t3YXJnc5R9lHUu\n', # noqa: E501
+ 'scrapy_request': _SCRAPY_REQUEST_JSON_ENCODED,
},
)
@@ -108,6 +138,7 @@ def test_with_reconstruction_with_optional_fields(spider: Spider) -> None:
def test_invalid_request_for_reconstruction(spider: Spider) -> None:
+ """A stored `scrapy_request` that is not valid JSON fails to decode during reconstruction."""
apify_request = ApifyRequest(
url='https://example.com',
method='GET',
@@ -117,5 +148,295 @@ def test_invalid_request_for_reconstruction(spider: Spider) -> None:
},
)
- with pytest.raises(binascii.Error):
+ # The stored value is parsed as JSON directly (no outer base64), so non-JSON text fails to decode.
+ with pytest.raises(json.JSONDecodeError):
+ to_scrapy_request(apify_request, spider)
+
+
+def test_pickle_payload_rejected(spider: Spider) -> None:
+ """Data under 'scrapy_request' is JSON, so a pickle payload is rejected rather than deserialized."""
+ # Build a pickle payload like the old code produced.
+ scrapy_request_dict = {
+ 'url': 'https://example.com',
+ 'callback': None,
+ 'errback': None,
+ 'headers': {},
+ 'method': 'GET',
+ 'body': b'',
+ 'cookies': {},
+ 'meta': {},
+ 'encoding': 'utf-8',
+ 'priority': 0,
+ 'dont_filter': False,
+ 'flags': [],
+ 'cb_kwargs': {},
+ }
+ pickle_encoded = codecs.encode(pickle.dumps(scrapy_request_dict), 'base64').decode()
+
+ apify_request = ApifyRequest(
+ url='https://example.com',
+ method='GET',
+ unique_key='https://example.com',
+ user_data={'scrapy_request': pickle_encoded},
+ )
+
+ # The JSON-based reconstruction must reject the pickle payload.
+ with pytest.raises((json.JSONDecodeError, UnicodeDecodeError, ValueError)):
+ to_scrapy_request(apify_request, spider)
+
+
+def test_roundtrip_serialization(spider: Spider) -> None:
+ """A `to_apify_request` -> `to_scrapy_request` round-trip preserves the request under JSON encoding."""
+ original_request = Request(
+ url='https://example.com/test',
+ method='POST',
+ body=b'test body content',
+ headers={'Content-Type': 'application/json', 'X-Custom': 'value'},
+ meta={'userData': {'custom_key': 'custom_value'}},
+ )
+
+ apify_request = to_apify_request(original_request, spider)
+ assert apify_request is not None
+
+ # Verify the stored data is JSON (not pickle) and not wrapped in an outer base64 layer.
+ encoded = apify_request.user_data['scrapy_request']
+ assert isinstance(encoded, str)
+ decoded_json = json.loads(encoded)
+ assert isinstance(decoded_json, dict)
+ assert decoded_json['url'] == 'https://example.com/test'
+
+ # Reconstruct the Scrapy request
+ restored = to_scrapy_request(apify_request, spider)
+ assert isinstance(restored, Request)
+ assert restored.url == original_request.url
+ assert restored.method == original_request.method
+ assert restored.body == original_request.body
+
+
+def test_no_pickle_in_serialized_output(spider: Spider) -> None:
+ """`to_apify_request` produces JSON output, never a pickle payload."""
+ scrapy_request = Request(url='https://example.com')
+ apify_request = to_apify_request(scrapy_request, spider)
+ assert apify_request is not None
+
+ encoded = apify_request.user_data['scrapy_request']
+ assert isinstance(encoded, str)
+
+    # The legacy format was base64-encoded pickle (text such as 'gASV...'); JSON is text starting with '{'.
+ assert encoded.lstrip().startswith('{'), 'Output must be JSON, not pickle'
+ json.loads(encoded) # must parse as JSON
+
+
+def _encode_request_dict(request_dict: dict) -> str:
+ """Encode a raw request dict the same way `to_apify_request` does (JSON, no outer base64)."""
+ return encode_to_json(request_dict)
+
+
+def test_binary_body_round_trips(spider: Spider) -> None:
+ """Non-UTF-8 bytes in the request body survive the JSON roundtrip (base64-encoded)."""
+ original = Request(
+ url='https://example.com',
+ method='POST',
+ body=b'\x00\x01\x02\xff\xfe binary',
+ headers={'Content-Type': 'application/octet-stream'},
+ )
+
+ apify_request = to_apify_request(original, spider)
+ assert apify_request is not None
+
+ restored = to_scrapy_request(apify_request, spider)
+ assert restored.body == b'\x00\x01\x02\xff\xfe binary'
+ assert restored.headers.get('Content-Type') == b'application/octet-stream'
+
+
+def test_binary_headers_round_trip_and_request_not_dropped(spider: Spider) -> None:
+ """A request with non-UTF-8 header values is not dropped; the exact header bytes survive the roundtrip."""
+ original = Request(
+ url='https://example.com',
+ headers={b'Accept': b'text/html', b'X-Bin': b'\xff\xfe\x00'},
+ )
+
+ apify_request = to_apify_request(original, spider)
+ assert apify_request is not None # must not be dropped
+
+ restored = to_scrapy_request(apify_request, spider)
+ assert restored.headers.get(b'X-Bin') == b'\xff\xfe\x00'
+ assert restored.headers.get(b'Accept') == b'text/html'
+
+
+def test_userdata_with_b64_sentinel_key_round_trips(spider: Spider) -> None:
+ """No in-band sentinel is used, so a user dict like `{'__b64__': ...}` in `meta` round-trips unchanged."""
+ original = Request(
+ url='https://example.com',
+ meta={'userData': {}, 'looks_like_sentinel': {'__b64__': 'not really base64 !!!'}},
+ )
+
+ apify_request = to_apify_request(original, spider)
+ assert apify_request is not None
+
+ restored = to_scrapy_request(apify_request, spider)
+ assert restored.meta['looks_like_sentinel'] == {'__b64__': 'not really base64 !!!'}
+
+
+def test_already_imported_request_subclass_round_trips(spider: Spider) -> None:
+ """A `_class` referring to an already-imported `scrapy.Request` subclass is reconstructed."""
+ original = FormRequest(url='https://example.com', formdata={'key': 'value'})
+
+ apify_request = to_apify_request(original, spider)
+ assert apify_request is not None
+
+ restored = to_scrapy_request(apify_request, spider)
+ assert isinstance(restored, FormRequest)
+ assert restored.method == 'POST'
+
+
+def test_non_request_class_is_rejected(spider: Spider) -> None:
+ """A `_class` resolving to a non-`scrapy.Request` type (here a plain `dict`) is rejected before use."""
+ request_dict = {
+ 'url': 'https://example.com',
+ 'callback': None,
+ 'errback': None,
+ 'headers': {},
+ 'body': '',
+ 'cookies': {},
+ 'meta': {},
+ 'encoding': 'utf-8',
+ 'priority': 0,
+ 'dont_filter': False,
+ 'flags': [],
+ 'cb_kwargs': {},
+ 'method': 'GET',
+ '_class': 'builtins.dict',
+ }
+ apify_request = ApifyRequest(
+ url='https://example.com',
+ method='GET',
+ unique_key='https://example.com',
+ user_data={'scrapy_request': _encode_request_dict(request_dict)},
+ )
+
+ with pytest.raises(TypeError, match=r'not a scrapy\.Request subclass'):
+ to_scrapy_request(apify_request, spider)
+
+
+def test_unresolvable_class_is_rejected(spider: Spider) -> None:
+ """A `_class` whose dotted path cannot be resolved (no such module) is rejected with a clear error."""
+ request_dict = {
+ 'url': 'https://example.com',
+ 'headers': {},
+ 'body': '',
+ 'cookies': {},
+ 'meta': {},
+ 'encoding': 'utf-8',
+ 'priority': 0,
+ 'dont_filter': False,
+ 'flags': [],
+ 'cb_kwargs': {},
+ 'method': 'GET',
+ '_class': 'definitely_not_imported_pkg.some_module.SomeRequest',
+ }
+ apify_request = ApifyRequest(
+ url='https://example.com',
+ method='GET',
+ unique_key='https://example.com',
+ user_data={'scrapy_request': _encode_request_dict(request_dict)},
+ )
+
+ with pytest.raises(TypeError, match='cannot resolve'):
+ to_scrapy_request(apify_request, spider)
+
+
+def test_custom_request_subclass_reconstructed_after_migration(spider: Spider) -> None:
+ """Actor-migration case: a not-yet-imported custom `Request` subclass is reconstructed by importing it."""
+ module_name = 'tests.unit.scrapy.requests._custom_request_module'
+ request_dict = {
+ 'url': 'https://example.com',
+ 'callback': None,
+ 'errback': None,
+ 'headers': {},
+ 'body': b'',
+ 'cookies': {},
+ 'meta': {},
+ 'encoding': 'utf-8',
+ 'priority': 0,
+ 'dont_filter': False,
+ 'flags': [],
+ 'cb_kwargs': {},
+ 'method': 'GET',
+ '_class': f'{module_name}.CustomRequest',
+ }
+ apify_request = ApifyRequest(
+ url='https://example.com',
+ method='GET',
+ unique_key='https://example.com',
+ user_data={'scrapy_request': _encode_request_dict(request_dict)},
+ )
+
+ # Simulate a fresh process where the subclass's module has not been imported yet.
+ sys.modules.pop(module_name, None)
+
+ restored = to_scrapy_request(apify_request, spider)
+
+ from tests.unit.scrapy.requests._custom_request_module import CustomRequest
+
+ assert isinstance(restored, CustomRequest)
+ assert module_name in sys.modules # it was imported on demand during reconstruction
+
+
+def test_non_apify_request_input_is_rejected(spider: Spider) -> None:
+ """`to_scrapy_request` raises on non-Apify-request input (unlike `to_apify_request`, which returns None)."""
+ with pytest.raises(TypeError, match='must be an apify'):
+ to_scrapy_request('not an apify request', spider) # ty: ignore[invalid-argument-type]
+
+
+def test_non_string_stored_scrapy_request_is_rejected(spider: Spider) -> None:
+ """A stored `scrapy_request` that is not a string is rejected before any JSON decoding."""
+ apify_request = ApifyRequest(
+ url='https://example.com',
+ method='GET',
+ unique_key='https://example.com',
+ user_data={'scrapy_request': {'unexpected': 'dict'}},
+ )
+
+ with pytest.raises(TypeError, match='must be a string'):
+ to_scrapy_request(apify_request, spider)
+
+
+def test_stored_scrapy_request_decoding_to_non_dict_is_rejected(spider: Spider) -> None:
+ """A stored `scrapy_request` that is valid JSON but not a dict (here a JSON array) is rejected."""
+ apify_request = ApifyRequest(
+ url='https://example.com',
+ method='GET',
+ unique_key='https://example.com',
+ user_data={'scrapy_request': '[1, 2, 3]'},
+ )
+
+ with pytest.raises(TypeError, match='must be a dictionary'):
+ to_scrapy_request(apify_request, spider)
+
+
+def test_non_string_class_is_rejected(spider: Spider) -> None:
+ """A `_class` entry that is not a string is rejected before any class resolution."""
+ request_dict = {
+ 'url': 'https://example.com',
+ 'headers': {},
+ 'body': '',
+ 'cookies': {},
+ 'meta': {},
+ 'encoding': 'utf-8',
+ 'priority': 0,
+ 'dont_filter': False,
+ 'flags': [],
+ 'cb_kwargs': {},
+ 'method': 'GET',
+ '_class': 123,
+ }
+ apify_request = ApifyRequest(
+ url='https://example.com',
+ method='GET',
+ unique_key='https://example.com',
+ user_data={'scrapy_request': _encode_request_dict(request_dict)},
+ )
+
+ with pytest.raises(TypeError, match='expected a string'):
to_scrapy_request(apify_request, spider)
diff --git a/tests/unit/scrapy/test_scheduler.py b/tests/unit/scrapy/test_scheduler.py
index 3e0db8206..c4a87622c 100644
--- a/tests/unit/scrapy/test_scheduler.py
+++ b/tests/unit/scrapy/test_scheduler.py
@@ -1,6 +1,7 @@
from __future__ import annotations
import logging
+from types import SimpleNamespace
from typing import cast
from unittest import mock
@@ -24,11 +25,7 @@ def spider() -> DummySpider:
@pytest.fixture
def scheduler(monkeypatch: pytest.MonkeyPatch, spider: DummySpider) -> ApifyScheduler:
- """Create a scheduler with its reactor check and async thread stubbed out.
-
- The request queue is a plain mock that satisfies the `isinstance` checks; the `run_coro` results
- are set per test via the mocked async thread.
- """
+ """Create a scheduler with its reactor check and async thread stubbed out."""
monkeypatch.setattr('apify.scrapy.scheduler.is_asyncio_reactor_installed', lambda: True)
monkeypatch.setattr('apify.scrapy.scheduler.AsyncThread', mock.MagicMock())
@@ -42,10 +39,62 @@ def scheduler(monkeypatch: pytest.MonkeyPatch, spider: DummySpider) -> ApifySche
return scheduler
+def test_has_pending_requests_reflects_queue_state(scheduler: ApifyScheduler) -> None:
+ """`has_pending_requests` is True while the queue is not finished and False once it is."""
+ async_thread = cast('mock.MagicMock', scheduler._async_thread)
+
+ async_thread.run_coro.return_value = False # the queue still has work
+ assert scheduler.has_pending_requests() is True
+
+ async_thread.run_coro.return_value = True # the queue is drained
+ assert scheduler.has_pending_requests() is False
+
+
+def test_enqueue_request_skips_non_serializable_request(
+ scheduler: ApifyScheduler,
+ caplog: pytest.LogCaptureFixture,
+) -> None:
+ """A request that cannot be converted (non-serializable meta) is not enqueued: returns False and logs an error."""
+ rq = cast('mock.MagicMock', scheduler._rq)
+
+ # A set in `meta` is not JSON-serializable, so `to_apify_request` returns None.
+ scrapy_request = Request(url='https://example.com', meta={'tags': {'a', 'b'}})
+
+ with caplog.at_level(logging.ERROR, logger='apify.scrapy.scheduler'):
+ result = scheduler.enqueue_request(scrapy_request)
+
+ assert result is False
+ assert 'could not be converted' in caplog.text
+ rq.add_request.assert_not_called()
+
+
+def test_enqueue_request_enqueues_converted_request(scheduler: ApifyScheduler) -> None:
+ """A convertible request is enqueued and reported as newly added when the queue had not seen it."""
+ rq = cast('mock.MagicMock', scheduler._rq)
+ async_thread = cast('mock.MagicMock', scheduler._async_thread)
+ async_thread.run_coro.return_value = SimpleNamespace(was_already_present=False)
+
+ result = scheduler.enqueue_request(Request(url='https://example.com'))
+
+ assert result is True
+ rq.add_request.assert_called_once()
+
+
+def test_enqueue_request_returns_false_for_duplicate(scheduler: ApifyScheduler) -> None:
+ """A request already present in the queue is reported as not newly enqueued (returns False)."""
+ async_thread = cast('mock.MagicMock', scheduler._async_thread)
+ async_thread.run_coro.return_value = SimpleNamespace(was_already_present=True)
+
+ result = scheduler.enqueue_request(Request(url='https://example.com'))
+
+ assert result is False
+
+
def test_next_request_skips_request_that_fails_to_convert(
scheduler: ApifyScheduler,
caplog: pytest.LogCaptureFixture,
) -> None:
+ """A queue entry that fails to reconstruct is skipped and still marked handled, not retried forever."""
rq = cast('mock.MagicMock', scheduler._rq)
async_thread = cast('mock.MagicMock', scheduler._async_thread)
@@ -67,11 +116,13 @@ def test_next_request_skips_request_that_fails_to_convert(
assert result is None
assert 'skipping it' in caplog.text
- # It was still marked as handled before the failed conversion, so it is not retried forever.
+ # Even though conversion failed, the unrecoverable entry is still marked as handled (consumed), so
+ # the queue does not keep handing it back forever.
rq.mark_request_as_handled.assert_called_once_with(malformed_request)
def test_next_request_returns_converted_request(scheduler: ApifyScheduler) -> None:
+ """A valid queue entry is reconstructed into a Scrapy request and marked handled."""
rq = cast('mock.MagicMock', scheduler._rq)
async_thread = cast('mock.MagicMock', scheduler._async_thread)
@@ -88,3 +139,15 @@ def test_next_request_returns_converted_request(scheduler: ApifyScheduler) -> No
assert isinstance(result, Request)
assert result.url == apify_request.url
rq.mark_request_as_handled.assert_called_once_with(apify_request)
+
+
+def test_next_request_returns_none_when_queue_empty(scheduler: ApifyScheduler) -> None:
+ """An empty queue makes `next_request` return None and skip marking anything as handled."""
+ rq = cast('mock.MagicMock', scheduler._rq)
+ async_thread = cast('mock.MagicMock', scheduler._async_thread)
+ async_thread.run_coro.return_value = None
+
+ result = scheduler.next_request()
+
+ assert result is None
+ rq.mark_request_as_handled.assert_not_called()
diff --git a/tests/unit/scrapy/test_serialization.py b/tests/unit/scrapy/test_serialization.py
new file mode 100644
index 000000000..e290af986
--- /dev/null
+++ b/tests/unit/scrapy/test_serialization.py
@@ -0,0 +1,126 @@
+from __future__ import annotations
+
+from datetime import UTC, datetime
+
+import pytest
+from pydantic import BaseModel, Field
+
+from apify.scrapy._serialization import _MAX_ERROR_VALUE_REPR_LEN, decode_from_json, encode_to_json
+
+
+def _round_trip(data: dict) -> dict:
+ """Encode then decode a dict through the serializer."""
+ decoded = decode_from_json(encode_to_json(data))
+ assert isinstance(decoded, dict)
+ return decoded
+
+
+class _LongRepr:
+ """A helper whose repr is long enough to trigger truncation in a serialization error message."""
+
+ def __repr__(self) -> str:
+ return 'x' * 500
+
+
+def test_bytes_body_round_trips() -> None:
+ """A binary `body` round-trips unchanged (base64-encoded inside the JSON)."""
+ assert _round_trip({'body': b'\x00\x01\xff binary'})['body'] == b'\x00\x01\xff binary'
+
+
+def test_str_body_round_trips_to_bytes() -> None:
+ """A `str` body is encoded as its UTF-8 bytes and comes back as `bytes` (Scrapy stores bytes)."""
+ assert _round_trip({'body': 'hello'})['body'] == b'hello'
+
+
+def test_empty_str_body_round_trips() -> None:
+ """An empty `str` body round-trips to empty `bytes`."""
+ assert _round_trip({'body': ''})['body'] == b''
+
+
+def test_bytes_headers_round_trip() -> None:
+ """Scrapy-style `{bytes: [bytes]}` headers, including binary values, round-trip unchanged."""
+ data = {'headers': {b'Content-Type': [b'text/html'], b'X-Bin': [b'\x00\xff']}}
+ assert _round_trip(data)['headers'] == {b'Content-Type': [b'text/html'], b'X-Bin': [b'\x00\xff']}
+
+
+def test_str_header_value_round_trips_to_bytes() -> None:
+ """A `str` header value is encoded as its UTF-8 bytes and comes back as `bytes` (Scrapy uses bytes)."""
+ assert _round_trip({'headers': {b'Content-Type': ['text/html']}})['headers'] == {b'Content-Type': [b'text/html']}
+
+
+def test_bare_header_value_is_normalized_to_list() -> None:
+ """A header value not wrapped in a list is normalized to a single-element list on round-trip."""
+ assert _round_trip({'headers': {b'X-Single': b'one'}})['headers'] == {b'X-Single': [b'one']}
+
+
+def test_non_ascii_is_not_escaped() -> None:
+ """Non-ASCII text is kept as literal characters instead of ASCII escape sequences, which would bloat storage."""
+ encoded = encode_to_json({'meta': {'name': 'Ñoño café 日本語'}})
+ assert 'Ñoño café 日本語' in encoded
+ assert '\\u' not in encoded
+ assert decode_from_json(encoded)['meta']['name'] == 'Ñoño café 日本語'
+
+
+def test_pydantic_model_is_dumped_by_alias() -> None:
+ """A pydantic model in `meta` is dumped to a plain dict using its serialization aliases."""
+
+ class Model(BaseModel):
+ first: int = Field(serialization_alias='First')
+
+ encoded = encode_to_json({'meta': {'m': Model(first=1)}})
+ assert decode_from_json(encoded)['meta']['m'] == {'First': 1}
+
+
+def test_tuple_is_coerced_to_list() -> None:
+ """Documented limitation: JSON has no tuple type, so a tuple round-trips as a list."""
+ assert _round_trip({'meta': {'coords': (1, 2, 3)}})['meta']['coords'] == [1, 2, 3]
+
+
+def test_non_string_dict_keys_are_coerced_to_strings() -> None:
+ """Documented limitation: JSON object keys are strings, so `{1: 'a'}` round-trips as `{'1': 'a'}`."""
+ assert _round_trip({'cb_kwargs': {'m': {1: 'a'}}})['cb_kwargs']['m'] == {'1': 'a'}
+
+
+def test_non_serializable_value_raises_with_type_and_repr() -> None:
+ """A value JSON cannot represent raises a `TypeError` whose chained cause names the offending type and value."""
+ when = datetime(2020, 1, 2, 3, 4, 5, tzinfo=UTC)
+ with pytest.raises(TypeError) as exc_info:
+ encode_to_json({'meta': {'when': when}})
+
+ # The user-facing message points at meta/cb_kwargs; the chained cause names the offending value.
+ assert 'JSON-serializable' in str(exc_info.value)
+ assert 'datetime' in str(exc_info.value.__cause__)
+
+
+def test_long_value_repr_is_truncated_in_error() -> None:
+ """A very long repr of a non-serializable value is truncated (with an ellipsis) so it cannot bloat the log."""
+ with pytest.raises(TypeError) as exc_info:
+ encode_to_json({'meta': {'big': _LongRepr()}})
+
+ cause = str(exc_info.value.__cause__)
+ assert cause.endswith('...')
+ assert cause.count('x') == _MAX_ERROR_VALUE_REPR_LEN
+
+
+def test_encode_rejects_non_dict() -> None:
+ """Encoding a non-dict top-level value raises `TypeError`."""
+ with pytest.raises(TypeError, match='Expected a dict'):
+ encode_to_json(['not', 'a', 'dict']) # ty: ignore[invalid-argument-type]
+
+
+def test_decode_of_non_dict_json_returns_value_as_is() -> None:
+ """JSON that decodes to a non-dict (a list, a bare string) is returned unchanged, not coerced or rejected."""
+ assert decode_from_json('[1, 2, 3]') == [1, 2, 3]
+ assert decode_from_json('"hello"') == 'hello'
+
+
+def test_decode_rejects_non_base64_body() -> None:
+ """A non-base64 `body` is rejected loudly (the `validate=True` guard), not silently decoded to garbage."""
+ with pytest.raises(ValueError, match='base64'):
+ decode_from_json('{"body": "not valid base64 !!!"}')
+
+
+def test_decode_rejects_non_base64_header_value() -> None:
+ """A non-base64 header value is rejected loudly rather than silently decoded to garbage."""
+ with pytest.raises(ValueError, match='base64'):
+ decode_from_json('{"headers": {"X-Test": ["not valid base64 !!!"]}}')