diff --git a/docs/webhooks/webhooks_overview/webhooks_overview.md b/docs/webhooks/webhooks_overview/webhooks_overview.md index e9cff35..58313f0 100644 --- a/docs/webhooks/webhooks_overview/webhooks_overview.md +++ b/docs/webhooks/webhooks_overview/webhooks_overview.md @@ -90,6 +90,79 @@ valid = client.verify_webhook(request.body, request.META['HTTP_X_SIGNATURE']) valid = client.verify_webhook(request.data, request.headers['X-SIGNATURE']) ``` +### Compressed webhook bodies + +GZIP compression can be enabled for hooks payloads from the Dashboard. Enabling compression reduces the payload size significantly (often 70–90% smaller) reducing your bandwidth usage on Stream. The computation overhead introduced by the decompression step is usually negligible and offset by the much smaller payload. + +When payload compression is enabled, webhook HTTP requests will include the `Content-Encoding: gzip` header and the request body will be compressed with GZIP. Some HTTP servers and middleware (Rails, Django, Laravel, Spring Boot, ASP.NET) handle this transparently and strip the header before your handler runs — in that case the body you see is already raw JSON. + +Before enabling compression, make sure that: + +* Your backend integration is using a recent version of our official SDKs with compression support +* If you don't use an official SDK, make sure that your code supports receiving compressed payloads +* The payload signature check is done on the **uncompressed** payload + +The Python SDK ships with `client.verify_and_decode_webhook(...)` which transparently handles plain, gzip-compressed, and base64-wrapped (SQS / SNS firehose) payloads. It returns the raw JSON body as `bytes`, ready to pass to `json.loads`. + +```python +import json +from stream_chat import StreamChat + +client = StreamChat(api_key="STREAM_KEY", api_secret="STREAM_SECRET") + +# Django view +def stream_webhook(request): + body = client.verify_and_decode_webhook( + request.body, + request.headers["X-Signature"], + request.headers.get("Content-Encoding"), + ) + event = json.loads(body) + # ... handle event ... +``` + +```python +import json +from flask import request +from stream_chat import StreamChat + +client = StreamChat(api_key="STREAM_KEY", api_secret="STREAM_SECRET") + +@app.route("/webhooks/stream", methods=["POST"]) +def stream_webhook(): + body = client.verify_and_decode_webhook( + request.get_data(), + request.headers["X-Signature"], + request.headers.get("Content-Encoding"), + ) + event = json.loads(body) + # ... handle event ... +``` + +If your HTTP framework or a middleware already decompressed the body before it reached your handler, the `Content-Encoding` header will be missing (or set to `identity`) and `verify_and_decode_webhook` will be a no-op for the decompression step — the same call works in both cases. + +`verify_and_decode_webhook` raises `stream_chat.base.exceptions.WebhookSignatureError` when the signature does not match or the body cannot be decoded. + +The original `client.verify_webhook(request.body, request.headers["X-Signature"])` is unchanged and still available for handlers that prefer to verify and parse the body separately. + +#### SQS / SNS firehose + +When delivering events through SQS or SNS, Stream base64-wraps the (possibly gzip-compressed) body so the payload stays valid UTF-8 over the queue. Pass `payload_encoding="base64"` so `verify_and_decode_webhook` unwraps the envelope before verifying the HMAC signature, which is always computed over the uncompressed JSON. + +```python +body = client.verify_and_decode_webhook( + sqs_message["Body"], + sqs_message["MessageAttributes"]["X-Signature"]["StringValue"], + content_encoding=sqs_message["MessageAttributes"] + .get("Content-Encoding", {}) + .get("StringValue"), + payload_encoding="base64", +) +event = json.loads(body) +``` + +If you only need to decode the body without checking the signature (for example because you have already verified it elsewhere), use `client.decompress_webhook_body(body, content_encoding, payload_encoding)`. + All webhook requests contain these headers: | Name | Description | Example | diff --git a/stream_chat/base/client.py b/stream_chat/base/client.py index 750fcc8..1230c65 100644 --- a/stream_chat/base/client.py +++ b/stream_chat/base/client.py @@ -133,6 +133,85 @@ def verify_webhook( ).hexdigest() return signature == x_signature + def decompress_webhook_body( + self, + body: Union[bytes, str], + content_encoding: Optional[str] = None, + payload_encoding: Optional[str] = None, + ) -> bytes: + """Decode a (possibly compressed and/or wrapped) webhook payload. + + Stream Chat can compress outbound webhook payloads with gzip and, for + SQS / SNS firehose delivery, also wrap the compressed bytes in base64 + so they remain valid UTF-8 over the queue. This helper applies the + encodings in order: + + 1. ``payload_encoding`` (``"base64"`` / ``"b64"``) is unwrapped first. + 2. ``content_encoding`` (``"gzip"``) is decompressed next. + 3. The raw JSON bytes are returned. The caller can ``.decode("utf-8")`` + or pass the value straight to :func:`json.loads`, which accepts + bytes. + + ``None`` or an empty string for either encoding is a no-op, so the + regular HTTP webhook path stays bytewise identical to today. + + This method does **not** check the ``X-Signature`` header. Use + :meth:`verify_and_decode_webhook` for the combined decode + verify + flow. + + :param body: raw bytes (or str) received from Stream + :param content_encoding: value of the ``Content-Encoding`` header + (only ``"gzip"`` is supported) + :param payload_encoding: wrapper around the compressed bytes + (``"base64"`` / ``"b64"``); used by the SQS / SNS firehose + :returns: the uncompressed JSON body as bytes + """ + from stream_chat.webhook import decompress_webhook_body + + return decompress_webhook_body( + body, + content_encoding=content_encoding, + payload_encoding=payload_encoding, + ) + + def verify_and_decode_webhook( + self, + body: Union[bytes, str], + x_signature: Union[str, bytes], + content_encoding: Optional[str] = None, + payload_encoding: Optional[str] = None, + ) -> bytes: + """Decode a webhook payload and verify its HMAC-SHA256 signature. + + The signature is always computed over the **uncompressed** JSON + payload, so this method first decodes the body via + :meth:`decompress_webhook_body` and then compares the digest with + ``x_signature`` using :func:`hmac.compare_digest`. + + Works for plain HTTP webhooks (pass the ``Content-Encoding`` header + value) and for SQS / SNS firehose envelopes (additionally pass + ``payload_encoding="base64"``). + + :param body: raw bytes (or str) received from Stream + :param x_signature: the ``X-Signature`` header value sent by Stream + :param content_encoding: value of the ``Content-Encoding`` header + (only ``"gzip"`` is supported) + :param payload_encoding: wrapper around the compressed bytes + (``"base64"`` / ``"b64"``); used by the SQS / SNS firehose + :returns: the verified, uncompressed JSON body as bytes + :raises stream_chat.base.exceptions.WebhookSignatureError: on + signature mismatch or any decode error + """ + from stream_chat.webhook import verify_and_decode_webhook + + return verify_and_decode_webhook( + body, + x_signature, + api_secret=self.api_secret, + content_encoding=content_encoding, + payload_encoding=payload_encoding, + ) + @abc.abstractmethod def update_app_settings( self, **settings: Any diff --git a/stream_chat/base/exceptions.py b/stream_chat/base/exceptions.py index 9ed295b..3241f7f 100644 --- a/stream_chat/base/exceptions.py +++ b/stream_chat/base/exceptions.py @@ -25,3 +25,17 @@ def __str__(self) -> str: return f'StreamChat error code {self.error_code}: {self.error_message}"' else: return f"StreamChat error HTTP code: {self.status_code}" + + +class WebhookSignatureError(StreamAPIException): + """Raised when an outbound webhook signature does not match, the + webhook payload cannot be decompressed, or the wrapping (e.g. base64) + cannot be decoded. + """ + + def __init__(self, message: str) -> None: + super().__init__(message, status_code=0) + self.message = message + + def __str__(self) -> str: + return f"WebhookSignatureError: {self.message}" diff --git a/stream_chat/tests/test_webhook_compression.py b/stream_chat/tests/test_webhook_compression.py new file mode 100644 index 0000000..dd72df3 --- /dev/null +++ b/stream_chat/tests/test_webhook_compression.py @@ -0,0 +1,271 @@ +import base64 +import gzip +import hashlib +import hmac + +import pytest + +from stream_chat import StreamChat, StreamChatAsync +from stream_chat.base.exceptions import WebhookSignatureError +from stream_chat.webhook import decompress_webhook_body, verify_and_decode_webhook + +API_KEY = "tkey" +API_SECRET = "tsec2" +JSON_BODY = b'{"type":"message.new","message":{"text":"the quick brown fox"}}' + + +def _sign(body: bytes, secret: str = API_SECRET) -> str: + return hmac.new(key=secret.encode(), msg=body, digestmod=hashlib.sha256).hexdigest() + + +def _gzip(body: bytes) -> bytes: + return gzip.compress(body) + + +def _b64(body: bytes) -> bytes: + return base64.b64encode(body) + + +@pytest.fixture +def sync_client() -> StreamChat: + return StreamChat(api_key=API_KEY, api_secret=API_SECRET) + + +class TestVerifyWebhookBackwardCompat: + def test_verify_webhook_matches_signature(self, sync_client: StreamChat): + signature = _sign(JSON_BODY) + assert sync_client.verify_webhook(JSON_BODY, signature) is True + + def test_verify_webhook_rejects_bad_signature(self, sync_client: StreamChat): + assert sync_client.verify_webhook(JSON_BODY, "0" * 64) is False + + def test_verify_webhook_accepts_bytes_signature(self, sync_client: StreamChat): + signature = _sign(JSON_BODY).encode() + assert sync_client.verify_webhook(JSON_BODY, signature) is True + + +class TestDecompressWebhookBody: + def test_passthrough_when_no_encodings(self): + assert decompress_webhook_body(JSON_BODY) == JSON_BODY + + def test_passthrough_when_encodings_are_empty_strings(self): + assert ( + decompress_webhook_body(JSON_BODY, content_encoding="", payload_encoding="") + == JSON_BODY + ) + + def test_passthrough_when_encodings_are_none(self): + assert ( + decompress_webhook_body( + JSON_BODY, content_encoding=None, payload_encoding=None + ) + == JSON_BODY + ) + + def test_gzip_round_trip_bytes(self): + compressed = _gzip(JSON_BODY) + assert decompress_webhook_body(compressed, content_encoding="gzip") == JSON_BODY + + def test_gzip_round_trip_str_input(self): + compressed = _gzip(JSON_BODY) + wrapped = compressed.decode("latin-1") + assert ( + decompress_webhook_body(wrapped.encode("latin-1"), content_encoding="gzip") + == JSON_BODY + ) + + def test_base64_round_trip_no_compression(self): + wrapped = _b64(JSON_BODY) + assert decompress_webhook_body(wrapped, payload_encoding="base64") == JSON_BODY + + def test_base64_str_input(self): + wrapped_str = _b64(JSON_BODY).decode("ascii") + assert ( + decompress_webhook_body(wrapped_str, payload_encoding="base64") == JSON_BODY + ) + + def test_base64_plus_gzip_round_trip(self): + wrapped = _b64(_gzip(JSON_BODY)) + assert ( + decompress_webhook_body( + wrapped, content_encoding="gzip", payload_encoding="base64" + ) + == JSON_BODY + ) + + @pytest.mark.parametrize( + "content_encoding", + ["GZIP", "Gzip", " gzip ", "gZiP"], + ) + def test_content_encoding_is_case_insensitive(self, content_encoding: str): + compressed = _gzip(JSON_BODY) + assert ( + decompress_webhook_body(compressed, content_encoding=content_encoding) + == JSON_BODY + ) + + @pytest.mark.parametrize( + "payload_encoding", + ["BASE64", "Base64", " base64 ", "B64", "b64", " b64 "], + ) + def test_payload_encoding_aliases_and_case(self, payload_encoding: str): + wrapped = _b64(JSON_BODY) + assert ( + decompress_webhook_body(wrapped, payload_encoding=payload_encoding) + == JSON_BODY + ) + + @pytest.mark.parametrize( + "content_encoding", ["br", "brotli", "zstd", "deflate", "compress", "lz4"] + ) + def test_unsupported_content_encoding(self, content_encoding: str): + with pytest.raises(ValueError) as exc_info: + decompress_webhook_body(JSON_BODY, content_encoding=content_encoding) + message = str(exc_info.value).lower() + assert "unsupported" in message + assert "gzip" in message + + @pytest.mark.parametrize("payload_encoding", ["hex", "url", "binary"]) + def test_unsupported_payload_encoding(self, payload_encoding: str): + with pytest.raises(ValueError) as exc_info: + decompress_webhook_body(JSON_BODY, payload_encoding=payload_encoding) + message = str(exc_info.value).lower() + assert "unsupported" in message + assert "payload_encoding" in message + + def test_invalid_gzip_bytes_raises(self): + with pytest.raises(WebhookSignatureError) as exc_info: + decompress_webhook_body(b"this is not gzip data", content_encoding="gzip") + assert "decompress" in str(exc_info.value).lower() + + def test_invalid_base64_input_raises(self): + with pytest.raises(WebhookSignatureError) as exc_info: + decompress_webhook_body( + b"!!!not-valid-base64!!!", payload_encoding="base64" + ) + assert "payload_encoding" in str(exc_info.value).lower() + + def test_returns_bytes_type(self): + result = decompress_webhook_body(JSON_BODY) + assert isinstance(result, bytes) + + def test_unsupported_message_includes_value(self): + with pytest.raises(ValueError) as exc_info: + decompress_webhook_body(JSON_BODY, content_encoding="brotli") + assert "brotli" in str(exc_info.value) + + +class TestVerifyAndDecodeWebhookHelper: + def test_happy_path_plain(self): + signature = _sign(JSON_BODY) + assert ( + verify_and_decode_webhook(JSON_BODY, signature, api_secret=API_SECRET) + == JSON_BODY + ) + + def test_happy_path_gzip(self): + compressed = _gzip(JSON_BODY) + signature = _sign(JSON_BODY) + assert ( + verify_and_decode_webhook( + compressed, + signature, + api_secret=API_SECRET, + content_encoding="gzip", + ) + == JSON_BODY + ) + + def test_happy_path_base64_plus_gzip(self): + wrapped = _b64(_gzip(JSON_BODY)) + signature = _sign(JSON_BODY) + assert ( + verify_and_decode_webhook( + wrapped, + signature, + api_secret=API_SECRET, + content_encoding="gzip", + payload_encoding="base64", + ) + == JSON_BODY + ) + + def test_signature_mismatch_raises(self): + with pytest.raises(WebhookSignatureError) as exc_info: + verify_and_decode_webhook(JSON_BODY, "0" * 64, api_secret=API_SECRET) + assert "invalid webhook signature" in str(exc_info.value).lower() + + def test_signature_over_compressed_bytes_raises(self): + compressed = _gzip(JSON_BODY) + signature_over_compressed = _sign(compressed) + with pytest.raises(WebhookSignatureError): + verify_and_decode_webhook( + compressed, + signature_over_compressed, + api_secret=API_SECRET, + content_encoding="gzip", + ) + + def test_signature_over_wrapped_bytes_raises(self): + wrapped = _b64(_gzip(JSON_BODY)) + signature_over_wrapped = _sign(wrapped) + with pytest.raises(WebhookSignatureError): + verify_and_decode_webhook( + wrapped, + signature_over_wrapped, + api_secret=API_SECRET, + content_encoding="gzip", + payload_encoding="base64", + ) + + def test_bad_secret_raises(self): + signature = _sign(JSON_BODY, secret="other") + with pytest.raises(WebhookSignatureError): + verify_and_decode_webhook(JSON_BODY, signature, api_secret=API_SECRET) + + def test_signature_can_be_bytes(self): + signature = _sign(JSON_BODY).encode() + assert ( + verify_and_decode_webhook(JSON_BODY, signature, api_secret=API_SECRET) + == JSON_BODY + ) + + +class TestSyncClientMethods: + def test_decompress_via_client(self, sync_client: StreamChat): + wrapped = _b64(_gzip(JSON_BODY)) + assert ( + sync_client.decompress_webhook_body( + wrapped, content_encoding="gzip", payload_encoding="base64" + ) + == JSON_BODY + ) + + def test_verify_and_decode_via_client(self, sync_client: StreamChat): + signature = _sign(JSON_BODY) + compressed = _gzip(JSON_BODY) + assert ( + sync_client.verify_and_decode_webhook( + compressed, signature, content_encoding="gzip" + ) + == JSON_BODY + ) + + def test_verify_and_decode_via_client_signature_mismatch( + self, sync_client: StreamChat + ): + with pytest.raises(WebhookSignatureError): + sync_client.verify_and_decode_webhook(JSON_BODY, "0" * 64) + + +class TestAsyncClientMethods: + async def test_async_verify_and_decode_happy_path(self): + signature = _sign(JSON_BODY) + compressed = _gzip(JSON_BODY) + async with StreamChatAsync(api_key=API_KEY, api_secret=API_SECRET) as client: + assert ( + client.verify_and_decode_webhook( + compressed, signature, content_encoding="gzip" + ) + == JSON_BODY + ) diff --git a/stream_chat/webhook.py b/stream_chat/webhook.py new file mode 100644 index 0000000..941c32b --- /dev/null +++ b/stream_chat/webhook.py @@ -0,0 +1,137 @@ +"""Helpers for verifying and decoding outbound Stream webhook payloads. + +Stream Chat can compress outbound webhook payloads with gzip and, for SQS / SNS +firehose delivery, also wrap the compressed bytes in base64 so they remain +valid UTF-8 over the queue. The helpers in this module mirror the cross-SDK +contract: callers can either decode the body without checking the signature +(:func:`decompress_webhook_body`) or do decode + HMAC verification in one call +(:func:`verify_and_decode_webhook`). + +The functions live outside the client classes so they can be exercised in +isolation, without instantiating an HTTP client. The client methods just +delegate here, passing ``self.api_secret``. +""" + +import base64 +import gzip +import hashlib +import hmac +from typing import Optional, Union + +from stream_chat.base.exceptions import WebhookSignatureError + +_BASE64_ALIASES = frozenset({"base64", "b64"}) +_GZIP_ALIASES = frozenset({"gzip"}) + + +def _to_bytes(body: Union[bytes, str]) -> bytes: + if isinstance(body, str): + return body.encode("utf-8") + if isinstance(body, (bytes, bytearray, memoryview)): + return bytes(body) + raise TypeError(f"webhook body must be bytes or str, got {type(body).__name__}") + + +def _normalize(value: Optional[str]) -> str: + if value is None: + return "" + return value.strip().lower() + + +def decompress_webhook_body( + body: Union[bytes, str], + content_encoding: Optional[str] = None, + payload_encoding: Optional[str] = None, +) -> bytes: + """Decode a (possibly wrapped + compressed) webhook payload. + + Application order: + + 1. ``payload_encoding`` (``"base64"`` / ``"b64"``) is unwrapped first. + This corresponds to the SQS / SNS envelope, which base64-wraps the + compressed bytes so they stay valid UTF-8 over the queue. + 2. ``content_encoding`` (``"gzip"``) is decompressed. + 3. The resulting raw JSON bytes are returned. The caller can decode them + as UTF-8 or pass them straight to :func:`json.loads` (which accepts + bytes). + + ``None`` or an empty string for either encoding is a no-op, so the regular + HTTP webhook path (no compression, no wrapping) is just an identity + function and stays bytewise identical to today. + + :param body: raw bytes or str received from Stream + :param content_encoding: value of the ``Content-Encoding`` header (``"gzip"``) + :param payload_encoding: wrapper around the compressed bytes + (``"base64"`` / ``"b64"``) + :returns: the uncompressed JSON body as bytes + :raises WebhookSignatureError: when the body cannot be decoded with the + requested encodings + :raises ValueError: when an encoding value is not supported by this SDK + """ + data = _to_bytes(body) + + payload_enc = _normalize(payload_encoding) + if payload_enc: + if payload_enc in _BASE64_ALIASES: + try: + data = base64.b64decode(data, validate=True) + except ValueError as exc: + raise WebhookSignatureError( + f"failed to decode webhook body with payload_encoding={payload_enc!r}: {exc}" + ) + else: + raise ValueError( + f"unsupported webhook payload_encoding: {payload_encoding}. " + "This SDK only supports base64." + ) + + content_enc = _normalize(content_encoding) + if content_enc: + if content_enc in _GZIP_ALIASES: + try: + data = gzip.decompress(data) + except (gzip.BadGzipFile, OSError, EOFError) as exc: + raise WebhookSignatureError( + f"failed to decompress webhook body with Content-Encoding={content_enc!r}: {exc}" + ) + else: + raise ValueError( + f"unsupported webhook Content-Encoding: {content_encoding}. " + "This SDK only supports gzip; set webhook_compression_algorithm " + 'to "gzip" on the app config.' + ) + + return data + + +def verify_and_decode_webhook( + body: Union[bytes, str], + x_signature: Union[str, bytes], + api_secret: str, + content_encoding: Optional[str] = None, + payload_encoding: Optional[str] = None, +) -> bytes: + """Decode a webhook payload and verify its HMAC-SHA256 signature. + + The signature is always computed over the **uncompressed** JSON bytes, + so this helper first applies :func:`decompress_webhook_body` and then + compares the digest with ``x_signature`` using :func:`hmac.compare_digest`. + + :returns: the verified, uncompressed JSON body as bytes + :raises WebhookSignatureError: on signature mismatch or any decode error + """ + decoded = decompress_webhook_body( + body, content_encoding=content_encoding, payload_encoding=payload_encoding + ) + + if isinstance(x_signature, bytes): + x_signature = x_signature.decode() + + expected = hmac.new( + key=api_secret.encode(), msg=decoded, digestmod=hashlib.sha256 + ).hexdigest() + + if not hmac.compare_digest(expected, x_signature): + raise WebhookSignatureError("invalid webhook signature") + + return decoded