Merged
2 changes: 1 addition & 1 deletion docs/03_guides/06_scrapy.mdx
@@ -46,7 +46,7 @@ The Apify SDK provides several custom components to support integration with the
- <ApiLink to="class/ApifyScheduler">`apify.scrapy.ApifyScheduler`</ApiLink> - Replaces Scrapy's default [scheduler](https://docs.scrapy.org/en/latest/topics/scheduler.html) with one that uses Apify's [request queue](https://docs.apify.com/platform/storage/request-queue) for storing requests. It manages enqueuing, dequeuing, and maintaining the state and priority of requests.
- <ApiLink to="class/ActorDatasetPushPipeline">`apify.scrapy.ActorDatasetPushPipeline`</ApiLink> - A Scrapy [item pipeline](https://docs.scrapy.org/en/latest/topics/item-pipeline.html) that pushes scraped items to Apify's [dataset](https://docs.apify.com/platform/storage/dataset). When enabled, every item produced by the spider is sent to the dataset.
- <ApiLink to="class/ApifyHttpProxyMiddleware">`apify.scrapy.ApifyHttpProxyMiddleware`</ApiLink> - A Scrapy [middleware](https://docs.scrapy.org/en/latest/topics/downloader-middleware.html) that manages proxy configurations. This middleware replaces Scrapy's default `HttpProxyMiddleware` to facilitate the use of Apify's proxy service.
- <ApiLink to="class/ApifyCacheStorage">`apify.scrapy.extensions.ApifyCacheStorage`</ApiLink> - A storage backend for Scrapy's built-in [HTTP cache middleware](https://docs.scrapy.org/en/latest/topics/downloader-middleware.html#module-scrapy.downloadermiddlewares.httpcache). This backend uses Apify's [key-value store](https://docs.apify.com/platform/storage/key-value-store). Make sure to set `HTTPCACHE_ENABLED` and `HTTPCACHE_EXPIRATION_SECS` in your settings, or caching won't work.
- <ApiLink to="class/ApifyCacheStorage">`apify.scrapy.extensions.ApifyCacheStorage`</ApiLink> - A storage backend for Scrapy's built-in [HTTP cache middleware](https://docs.scrapy.org/en/latest/topics/downloader-middleware.html#module-scrapy.downloadermiddlewares.httpcache). This backend uses Apify's [key-value store](https://docs.apify.com/platform/storage/key-value-store). To enable caching, set `HTTPCACHE_ENABLED` and `HTTPCACHE_EXPIRATION_SECS` in your settings. When the spider closes, a best-effort cleanup inspects up to 100 cache entries per run by default and deletes those that are expired or unreadable. To change this limit, set `APIFY_HTTPCACHE_EXPIRATION_MAX_ITEMS`.
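
As a rough illustration, the cache settings mentioned above might be combined in a project's `settings.py` as shown here. The numeric values are placeholders chosen for this sketch, and `HTTPCACHE_STORAGE` is Scrapy's standard setting for selecting a cache backend; adjust all of them to your project.

```python
# settings.py (illustrative values, not recommendations)

# Turn on Scrapy's built-in HTTP cache middleware.
HTTPCACHE_ENABLED = True

# Treat cached responses older than two hours as expired (assumed value).
HTTPCACHE_EXPIRATION_SECS = 2 * 60 * 60

# Store cache entries in the Apify key-value store.
HTTPCACHE_STORAGE = 'apify.scrapy.extensions.ApifyCacheStorage'

# Raise the per-close cleanup limit from the default of 100 (assumed value).
APIFY_HTTPCACHE_EXPIRATION_MAX_ITEMS = 500
```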

Additional helper functions in the [`apify.scrapy`](https://github.com/apify/apify-sdk-python/tree/master/src/apify/scrapy) subpackage include:

33 changes: 33 additions & 0 deletions docs/04_upgrading/upgrading_to_v4.md
@@ -198,3 +198,36 @@ await client.run('my-run').charge('my-event', count=5)
### Async `iterate_*` are no longer coroutine functions

`DatasetClientAsync.iterate_items()` and `KeyValueStoreClientAsync.iterate_keys()` are now plain `def` functions returning `AsyncIterator[T]`. Consumer code (`async for ...`) is unchanged; if you annotate the call's return value, change `AsyncGenerator[T, None]` to `AsyncIterator[T]`.

## Scrapy requests and HTTP cache stored as JSON

This applies only if you use the Scrapy integration (`apify[scrapy]`).

The integration now serializes data as JSON instead of pickle. `ApifyScheduler` stores Scrapy requests in the Apify request queue (under `user_data['scrapy_request']`), and `ApifyCacheStorage` stores HTTP cache entries in the key-value store. Both now hold JSON. Unlike pickle, JSON stays stable across Python and library versions.

### Persisted data from before the upgrade is not read back

Data written by an older SDK uses the pickle format, which v4 does not load. The two storages handle this differently:

- HTTP cache: a legacy entry is treated as a cache miss. Scrapy re-fetches the page and re-stores it as JSON, so the cache heals itself. No action is needed.
- Request queue: a request stored by an older SDK cannot be reconstructed, so it is skipped and the failure is logged. This matters only when pre-upgrade requests are still in the queue, for example after a run is migrated or restarted, or when you reuse a named request queue. A fresh run is not affected.

### `meta` and `cb_kwargs` must be JSON-serializable

Pickle could store arbitrary Python objects. JSON cannot, so the values in a request's `meta` and `cb_kwargs` are now subject to JSON's type system:

- A `tuple` comes back as a `list`.
- Non-string `dict` keys come back as strings, so `{1: 'a'}` becomes `{'1': 'a'}`.
- A value JSON cannot represent (`datetime`, `set`, `Decimal`, a custom object) is not coerced or stored in a degraded form. Serialization fails, the request is skipped, and the failure is logged. Pydantic models are still supported and are dumped with `model_dump()`.

Convert such values to a JSON-friendly form before yielding the request:

```python
from datetime import datetime

import scrapy

# Both snippets belong inside a spider callback, where `url` is already defined.

# Before (v3): relied on pickle to store the datetime object.
yield scrapy.Request(url, meta={'since': datetime(2024, 1, 1)})

# After (v4): store a JSON-serializable value.
yield scrapy.Request(url, meta={'since': datetime(2024, 1, 1).isoformat()})
```
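
The coercions listed above can be checked up front with the standard library alone. The following is a minimal sketch that uses plain `json.dumps`/`json.loads` rather than the SDK's own serializer (which additionally handles `body`, `headers`, and pydantic models); the `meta` dict and variable names are made up for illustration.

```python
import json
from datetime import datetime

# A hypothetical `meta` dict with a tuple value and a non-string key.
meta = {'pair': (1, 2), 1: 'a'}

# A JSON round trip turns the tuple into a list and the int key into a string.
round_tripped = json.loads(json.dumps(meta))
print(round_tripped)  # {'pair': [1, 2], '1': 'a'}

# A datetime has no JSON representation, so plain `json.dumps` raises `TypeError`.
rejected = False
try:
    json.dumps({'since': datetime(2024, 1, 1)})
except TypeError as exc:
    rejected = True
    print(f'Not JSON-serializable: {exc}')
```

Running a check like this on `meta` and `cb_kwargs` in a unit test is one way to catch values that would cause a request to be skipped at runtime.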
138 changes: 138 additions & 0 deletions src/apify/scrapy/_serialization.py
@@ -0,0 +1,138 @@
"""JSON serialization of Scrapy requests and cached responses for storage on the Apify platform.

Scrapy requests and cached responses are stored in the Apify request queue and key-value store which hold JSON,
so they are serialized as JSON here rather than pickled.

Only `body` (`bytes`) and `headers` (`{bytes: [bytes]}`) are not natively JSON-serializable; both sit at fixed keys
and are base64-encoded in place. A `str` `body` is encoded as its UTF-8 bytes and comes back as `bytes`, matching
Scrapy, which always stores `body` as `bytes`. Pydantic models such as Crawlee's `UserData` are dumped via
`model_dump()`. Everything else, notably `meta` and `cb_kwargs`, must already be JSON-serializable, otherwise
serialization fails with a clear error naming the offending value. No in-band sentinel is used, so no user value
can collide with the encoding.

Known limitations of the pickle -> JSON switch (a documented breaking change): JSON has fewer types than pickle,
so values in `meta`/`cb_kwargs` are subject to JSON's coercions. A `tuple` round-trips as a `list` and non-string
`dict` keys round-trip as strings (e.g. `{1: 'a'}` becomes `{'1': 'a'}`). Values JSON cannot represent at all
(`datetime`, `set`, `Decimal`, arbitrary objects, ...) are not coerced silently: serialization raises and the request
is skipped loudly rather than stored in a corrupted form.
"""

from __future__ import annotations

import base64
import json
from typing import Any

from pydantic import BaseModel

# Cap the offending value's repr in a serialization error message so a huge value cannot bloat the log.
_MAX_ERROR_VALUE_REPR_LEN = 200


def encode_to_json(data: dict[str, Any]) -> str:
    """Serialize a Scrapy request/response dict to a JSON string.

    The `body` and `headers` fields are base64-encoded in place (a `str` `body` via its UTF-8 bytes); pydantic
    models are dumped to plain dicts. A `TypeError` is raised if any other value cannot be JSON-encoded.

    Args:
        data: The dict to serialize, e.g. the output of `scrapy.Request.to_dict()`.

    Returns:
        The JSON-encoded string.
    """
    if not isinstance(data, dict):
        raise TypeError(f'Expected a dict to serialize, got {type(data)}')

    safe = dict(data)

    # `body` is base64-encoded so binary payloads survive; a `str` body is taken as its UTF-8 bytes, which keeps
    # encode/decode symmetric (decode always base64-decodes `body` back to `bytes`).
    body = safe.get('body')
    if isinstance(body, (bytes, str)):
        raw_body = body.encode('utf-8') if isinstance(body, str) else body
        safe['body'] = base64.b64encode(raw_body).decode('ascii')

    if isinstance(safe.get('headers'), dict):
        safe['headers'] = _encode_headers(safe['headers'])

    try:
        # `ensure_ascii=False` keeps non-ASCII URLs/meta as their UTF-8 form instead of `\uXXXX` escapes, which
        # would otherwise roughly double the size of non-Latin text in storage.
        return json.dumps(safe, default=_json_default, ensure_ascii=False)
    except TypeError as exc:
        raise TypeError(
            'Failed to JSON-serialize a Scrapy request/response for storage on the Apify platform. '
            'All values in `meta` and `cb_kwargs` must be JSON-serializable (str, int, float, bool, None, '
            'list, dict, or a pydantic model).'
        ) from exc


def decode_from_json(text: str) -> Any:
    """Reconstruct a Scrapy request/response dict from a string produced by `encode_to_json`.

    The base64-encoded `body` and `headers` fields are decoded back to their `bytes` representation.

    Args:
        text: The JSON-encoded string.

    Returns:
        The decoded object (a dict for valid request/response payloads).
    """
    data = json.loads(text)
    if not isinstance(data, dict):
        return data

    # `validate=True` makes a non-base64 body raise loudly instead of silently decoding to garbage.
    if isinstance(data.get('body'), str):
        data['body'] = base64.b64decode(data['body'], validate=True)

    if isinstance(data.get('headers'), dict):
        data['headers'] = _decode_headers(data['headers'])

    return data


def _json_default(obj: Any) -> Any:
    """Fallback for values `json.dumps` cannot serialize: pydantic models are dumped, anything else raises.

    The error names the offending value (type and a truncated repr) so a failed serialization points straight
    at the bad `meta`/`cb_kwargs` entry instead of just reporting that something failed.
    """
    if isinstance(obj, BaseModel):
        return obj.model_dump(by_alias=True)
    value_repr = repr(obj)
    if len(value_repr) > _MAX_ERROR_VALUE_REPR_LEN:
        value_repr = value_repr[:_MAX_ERROR_VALUE_REPR_LEN] + '...'
    raise TypeError(f'Object of type {type(obj).__name__} is not JSON-serializable: {value_repr}')


def _encode_headers(headers: dict[Any, Any]) -> dict[str, list[str]]:
    """Encode a Scrapy `{bytes: [bytes]}` headers mapping to a JSON-safe `{str: [base64-str]}`."""
    encoded: dict[str, list[str]] = {}
    for key, value in headers.items():
        str_key = key.decode('latin-1') if isinstance(key, bytes) else key
        values = value if isinstance(value, (list, tuple)) else [value]
        encoded[str_key] = [_b64encode_value(item) for item in values]
    return encoded


def _decode_headers(headers: dict[str, Any]) -> dict[bytes, list[bytes]]:
    """Reverse `_encode_headers`, restoring the `{bytes: [bytes]}` mapping Scrapy expects."""
    decoded: dict[bytes, list[bytes]] = {}
    for key, value in headers.items():
        bytes_key = key.encode('latin-1') if isinstance(key, str) else key
        values = value if isinstance(value, list) else [value]
        decoded[bytes_key] = [base64.b64decode(item, validate=True) for item in values]
    return decoded


def _b64encode_value(value: Any) -> str:
    """Base64-encode a single header value.

    Scrapy stores header values as `bytes`; a `str` is encoded as its UTF-8 bytes. Any other type is coerced with
    `str()` as a lenient last resort. That coercion is lossy (e.g. `5` becomes `b'5'`), but Scrapy does not produce
    non-`bytes`/`str` header values, so it is not hit on the real path.
    """
    raw = value if isinstance(value, bytes) else str(value).encode('utf-8')
    return base64.b64encode(raw).decode('ascii')
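
The round trip this module aims for can be sketched with the standard library alone. The snippet below is a simplified stand-in, not the module itself: `encode_payload` and `decode_payload` are hypothetical names, and the sketch assumes `body` is always `bytes` and `headers` is always a `{bytes: [bytes]}` mapping, skipping the `str`, pydantic, and error-reporting paths handled above.

```python
import base64
import json


def encode_payload(data: dict) -> str:
    """Hypothetical stand-in for `encode_to_json`: base64-encode `body` and `headers`, then dump JSON."""
    safe = dict(data)
    safe['body'] = base64.b64encode(safe['body']).decode('ascii')
    safe['headers'] = {
        key.decode('latin-1'): [base64.b64encode(value).decode('ascii') for value in values]
        for key, values in safe['headers'].items()
    }
    return json.dumps(safe, ensure_ascii=False)


def decode_payload(text: str) -> dict:
    """Hypothetical stand-in for `decode_from_json`: reverse the base64 encoding of `body` and `headers`."""
    data = json.loads(text)
    data['body'] = base64.b64decode(data['body'], validate=True)
    data['headers'] = {
        key.encode('latin-1'): [base64.b64decode(value, validate=True) for value in values]
        for key, values in data['headers'].items()
    }
    return data


# A made-up cached response with a binary body that plain JSON could not hold.
original = {
    'url': 'https://example.com/',
    'status': 200,
    'headers': {b'Content-Type': [b'text/html']},
    'body': b'\x00\xff payload',
}
restored = decode_payload(encode_payload(original))
print(restored == original)  # True
```

Because `body` and `headers` sit at fixed keys, the decoder knows exactly which fields to base64-decode and needs no marker inside the data.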
82 changes: 49 additions & 33 deletions src/apify/scrapy/extensions/_httpcache.py
@@ -2,7 +2,6 @@

import gzip
import io
import pickle
import re
import struct
from logging import getLogger
@@ -14,6 +13,7 @@

from apify import Configuration
from apify.scrapy._async_thread import AsyncThread
from apify.scrapy._serialization import decode_from_json, encode_to_json
from apify.storage_clients import ApifyStorageClient
from apify.storages import KeyValueStore

@@ -29,14 +29,14 @@
class ApifyCacheStorage:
"""A Scrapy cache storage that uses the Apify `KeyValueStore` to store responses.

It can be set as a storage for Scrapy's built-in `HttpCacheMiddleware`, which caches
responses to requests. See HTTPCache middleware settings (prefixed with `HTTPCACHE_`)
in the Scrapy documentation for more information. Requires the asyncio Twisted reactor
to be installed.
It can be set as a storage for Scrapy's built-in `HttpCacheMiddleware`, which caches responses to requests.
See HTTPCache middleware settings (prefixed with `HTTPCACHE_`) in the Scrapy documentation for more information.
Requires the asyncio Twisted reactor to be installed.
"""

def __init__(self, settings: BaseSettings) -> None:
self._expiration_max_items = 100
# Upper bound on how many keys the per-spider-close cleanup sweeps (best-effort; `close_spider`).
self._expiration_max_items: int = settings.getint('APIFY_HTTPCACHE_EXPIRATION_MAX_ITEMS', 100)
self._expiration_secs: int = settings.getint('HTTPCACHE_EXPIRATION_SECS')
self._spider: Spider | None = None
self._kvs: KeyValueStore | None = None
@@ -79,23 +79,26 @@ def close_spider(self, _: Spider, current_time: int | None = None) -> None:
async def expire_kvs() -> None:
if self._kvs is None:
raise ValueError('Key value store not initialized')
i = 0
# Best-effort cleanup: at most `_expiration_max_items` keys per close, in no guaranteed order,
# so stale entries may linger. This only reclaims storage; `retrieve_response` already treats
# an expired entry as a cache miss.
processed = 0
async for item in self._kvs.iterate_keys():
if processed >= self._expiration_max_items:
break
processed += 1
value = await self._kvs.get_value(item.key)
try:
gzip_time = read_gzip_time(value)
except Exception as e:
logger.warning(f'Malformed cache item {item.key}: {e}')
await self._kvs.set_value(item.key, None)
await self._kvs.delete_value(item.key)
else:
if self._expiration_secs < current_time - gzip_time:
logger.debug(f'Expired cache item {item.key}')
await self._kvs.set_value(item.key, None)
await self._kvs.delete_value(item.key)
else:
logger.debug(f'Valid cache item {item.key}')
if i == self._expiration_max_items:
break
i += 1

self._async_thread.run_coro(expire_kvs())

@@ -127,17 +130,25 @@ def retrieve_response(self, _: Spider, request: Request, current_time: int | Non

if current_time is None:
current_time = int(time())
if 0 < self._expiration_secs < current_time - read_gzip_time(value):
logger.debug('Cache expired', extra={'request': request})

# A malformed or legacy cache entry must not crash retrieval; treat it as a cache miss so Scrapy re-fetches
# and re-stores it in the current format. The field reads stay inside the `try` as well: a value that decodes
# to a dict missing any expected key (a forward/older format, or a truncated-but-valid JSON payload) must
# also degrade to a miss rather than raising an uncaught `KeyError`.
try:
if 0 < self._expiration_secs < current_time - read_gzip_time(value):
logger.debug('Cache expired', extra={'request': request})
return None
data = from_gzip(value)
url = data['url']
status = data['status']
headers = Headers(data['headers'])
body = data['body']
except Exception as exc:
logger.warning(f'Ignoring malformed cache entry {key!r}: {exc}', extra={'request': request})
return None

data = from_gzip(value)
url = data['url']
status = data['status']
headers = Headers(data['headers'])
body = data['body']
respcls = responsetypes.from_args(headers=headers, url=url, body=body)

logger.debug('Cache hit', extra={'request': request})
return respcls(url=url, headers=headers, status=status, body=body)

@@ -162,18 +173,25 @@ def store_response(self, _: Spider, request: Request, response: Response) -> Non


def to_gzip(data: dict, mtime: int | None = None) -> bytes:
"""Dump a dictionary to a gzip-compressed byte stream."""
"""Dump a dictionary to a gzip-compressed JSON byte stream.

Cache entries live in the Apify key-value store, which holds JSON, so they are serialized as JSON rather
than pickled. See `apify.scrapy._serialization` for the encoding.
"""
payload = encode_to_json(data).encode('utf-8')
with io.BytesIO() as byte_stream:
with gzip.GzipFile(fileobj=byte_stream, mode='wb', mtime=mtime) as gzip_file:
pickle.dump(data, gzip_file, protocol=4)
gzip_file.write(payload)
return byte_stream.getvalue()


def from_gzip(gzip_bytes: bytes) -> dict:
"""Load a dictionary from a gzip-compressed byte stream."""
"""Load a dictionary from a gzip-compressed JSON byte stream."""
with io.BytesIO(gzip_bytes) as byte_stream, gzip.GzipFile(fileobj=byte_stream, mode='rb') as gzip_file:
data: dict = pickle.load(gzip_file)
return data
data = decode_from_json(gzip_file.read().decode('utf-8'))
if not isinstance(data, dict):
raise TypeError(f'Expected a dict from the cached payload, got {type(data)}')
return data


def read_gzip_time(gzip_bytes: bytes) -> int:
@@ -187,17 +205,15 @@ def read_gzip_time(gzip_bytes: bytes) -> int:
def get_kvs_name(spider_name: str, max_length: int = 60) -> str:
"""Get the key value store name for a spider.

The key value store name is derived from the spider name by replacing all special characters
with hyphens and trimming leading and trailing hyphens. The resulting name is prefixed with
'httpcache-' and truncated to the maximum length.
The key value store name is derived from the spider name by replacing all special characters with hyphens
and trimming leading and trailing hyphens. The resulting name is prefixed with 'httpcache-' and truncated
to the maximum length.

The documentation
[about storages](https://docs.apify.com/platform/storage/usage#named-and-unnamed-storages)
The documentation [about storages](https://docs.apify.com/platform/storage/usage#named-and-unnamed-storages)
mentions that names can be up to 63 characters long, so the default max length is set to 60.

Such naming isn't unique per spider, but should be sufficiently unique for most use cases.
The name of the key value store should indicate to which spider it belongs, e.g. in
the listing in the Apify's console.
Such naming isn't unique per spider, but should be sufficiently unique for most use cases. The name
of the key-value store should indicate which spider it belongs to, e.g. in the listing in Apify Console.

Args:
spider_name: Value of the Spider instance's name attribute.