From 9c7b961f5904a6087114f133257dbb6acc9f1b31 Mon Sep 17 00:00:00 2001 From: Marco Antonio Gil Date: Mon, 11 May 2026 12:34:49 +0200 Subject: [PATCH 1/2] PTHMINT-119: SSE event stream support and EventManager Introduce Server-Sent Events support and higher-level helpers for subscribing to order event streams. Adds multisafepay.client.sse (ServerSentEvent, ServerSentEventStream, StreamingResponse) and an EventStream adapter under api.paths.events.stream to deserialize SSE payloads into Event/EventData models. Adds EventManager for subscribing by token or from an Order response, and exposes get_event_manager() on the Sdk. Extend Order response to include events_token/events_url/events_stream_url with normalization for backward compatibility with legacy event_token/event_stream_url fields. Include an example (examples/event_manager/subscribe_events.py), update examples/order_manager/cloud_pos_order.py, update README with usage and env vars, export SSE types from client.__init__, and add unit and e2e tests for stream and manager behavior. Also contains small formatting/.env and test tweaks. --- .env.example | 2 +- README.md | 63 ++++- examples/event_manager/subscribe_events.py | 107 +++++++++ examples/order_manager/cloud_pos_order.py | 10 +- src/multisafepay/api/paths/events/__init__.py | 14 ++ .../api/paths/events/event_manager.py | 87 +++++++ .../api/paths/events/stream/__init__.py | 132 +++++++++++ .../paths/events/stream/response/__init__.py | 16 ++ .../stream/response/components/__init__.py | 18 ++ .../stream/response/components/event_data.py | 55 +++++ .../api/paths/events/stream/response/event.py | 50 ++++ .../paths/orders/response/order_response.py | 27 ++- src/multisafepay/client/__init__.py | 3 + src/multisafepay/client/client.py | 8 + src/multisafepay/client/sse.py | 224 ++++++++++++++++++ src/multisafepay/sdk.py | 13 + .../e2e/examples/event_manager/conftest.py | 211 +++++++++++++++++ .../event_manager/test_subscribe_events.py | 88 +++++++ .../events/stream/test_unit_event_stream.py | 186 +++++++++++++++ .../path/events/test_unit_event_manager.py | 121 ++++++++++ .../response/test_unit_order_response.py | 74 ++++++ .../unit/client/test_unit_client.py | 16 ++ tests/multisafepay/unit/test_unit_sdk.py | 14 ++ 23 files changed, 1534 insertions(+), 5 deletions(-) create mode 100644 examples/event_manager/subscribe_events.py create mode 100644 src/multisafepay/api/paths/events/__init__.py create mode 100644 src/multisafepay/api/paths/events/event_manager.py create mode 100644 src/multisafepay/api/paths/events/stream/__init__.py create mode 100644 src/multisafepay/api/paths/events/stream/response/__init__.py create mode 100644 src/multisafepay/api/paths/events/stream/response/components/__init__.py create mode 100644 src/multisafepay/api/paths/events/stream/response/components/event_data.py create mode 100644 src/multisafepay/api/paths/events/stream/response/event.py create mode 100644 src/multisafepay/client/sse.py create mode 100644 tests/multisafepay/e2e/examples/event_manager/conftest.py create mode 100644 tests/multisafepay/e2e/examples/event_manager/test_subscribe_events.py create mode 100644 tests/multisafepay/unit/api/path/events/stream/test_unit_event_stream.py create mode 100644 tests/multisafepay/unit/api/path/events/test_unit_event_manager.py create mode 100644 tests/multisafepay/unit/api/path/orders/response/test_unit_order_response.py diff --git a/.env.example b/.env.example index 1e17f89..5acb224 100644 --- a/.env.example +++ b/.env.example @@ -22,4 +22,4 @@ E2E_TERMINAL_GROUP_API_KEY_GROUP_DEFAULT=your-e2e-terminal-group-api-key-for-def # Optional custom base URL for dev-backed examples and terminal endpoint E2E. # MSP_SDK_BUILD_PROFILE=dev # MSP_SDK_ALLOW_CUSTOM_BASE_URL=1 -# MSP_SDK_CUSTOM_BASE_URL=your-custom-base-url \ No newline at end of file +# MSP_SDK_CUSTOM_BASE_URL=your-custom-base-url diff --git a/README.md b/README.md index b59392f..4f76991 100644 --- a/README.md +++ b/README.md @@ -87,8 +87,9 @@ from multisafepay.client import ScopedCredentialResolver credential_resolver = ScopedCredentialResolver( default_api_key="", + partner_affiliate_api_key="", terminal_group_api_keys={ - "Default": "", + "": "", }, ) @@ -98,6 +99,43 @@ sdk = Sdk( ) ``` +### Event stream subscriptions + +Use `EventManager` to subscribe to MultiSafepay SSE streams directly, or to subscribe from an order response that already contains event credentials. + +```python +from multisafepay import Sdk +from multisafepay.client import ScopedCredentialResolver + + +credential_resolver = ScopedCredentialResolver( + default_api_key="", + terminal_group_api_keys={ + "": "", + }, +) + +sdk = Sdk( + is_production=False, + credential_resolver=credential_resolver, +) + +order_manager = sdk.get_order_manager() +event_manager = sdk.get_event_manager() + +create_response = order_manager.create( + request_order=order_request, + terminal_group_id="", +) +order = create_response.get_data() + +with event_manager.subscribe_order_events(order, timeout=45.0) as stream: + for event in stream: + print(event) +``` + +Use `subscribe_events(events_token=..., events_stream_url=...)` when the token and stream URL are already available separately. + ### Development-only custom base URL override By default, the SDK only targets: @@ -143,6 +181,29 @@ In any non-dev profile (including default `release`), custom base URLs are block Go to the folder `examples` to see how to use the SDK. +The event-stream example in `examples/event_manager/subscribe_events.py` requires: + +```bash +export API_KEY="" +export TERMINAL_GROUP_API_KEY_GROUP_DEFAULT="" +export CLOUD_POS_TERMINAL_GROUP_ID="" +export CLOUD_POS_TERMINAL_ID="" +``` + +The SSE E2E test can also run against a dev-backed base URL and optionally resolve the terminal group automatically: + +```bash +export E2E_NO_SANDBOX_BASE_URL="https://dev-api.example.com/v1/" +export MSP_SDK_BUILD_PROFILE=dev +export MSP_SDK_ALLOW_CUSTOM_BASE_URL=1 +export MSP_SDK_CUSTOM_BASE_URL="https://dev-api.example.com/v1/" +export E2E_API_KEY="" +export E2E_TERMINAL_GROUP_API_KEY_GROUP_DEFAULT="" +export E2E_CLOUD_POS_TERMINAL_ID="" +# Optional when CLOUD_POS_TERMINAL_GROUP_ID is not set +export E2E_PARTNER_API_KEY="" +``` + ## Code quality checks ### Linting diff --git a/examples/event_manager/subscribe_events.py b/examples/event_manager/subscribe_events.py new file mode 100644 index 0000000..826fa89 --- /dev/null +++ b/examples/event_manager/subscribe_events.py @@ -0,0 +1,107 @@ +# Copyright (c) MultiSafepay, Inc. All rights reserved. + +# This file is licensed under the Open Software License (OSL) version 3.0. +# For a copy of the license, see the LICENSE.txt file in the project root. + +# See the DISCLAIMER.md file for disclaimer details. + +"""Create a Cloud POS order and subscribe to its event stream.""" + +import os +import time + +from dotenv import load_dotenv +from multisafepay import Sdk +from multisafepay.api.paths.orders.request import OrderRequest +from multisafepay.client import ScopedCredentialResolver + +# Load environment variables from a .env file +load_dotenv() + + +def _get_first_env(*names: str) -> str: + for name in names: + value = os.getenv(name, "").strip() + if value: + return value + + return "" + + +def _require_first_env(*names: str) -> str: + value = _get_first_env(*names) + if value: + return value + + raise RuntimeError( + f"Missing required environment variable. Set one of: {', '.join(names)}", + ) + + +DEFAULT_ACCOUNT_API_KEY = _require_first_env("API_KEY", "E2E_API_KEY") +TERMINAL_GROUP_DEFAULT_API_KEY = _require_first_env( + "TERMINAL_GROUP_API_KEY_GROUP_DEFAULT", + "E2E_TERMINAL_GROUP_API_KEY_GROUP_DEFAULT", +) +CLOUD_POS_TERMINAL_GROUP_ID = _require_first_env( + "CLOUD_POS_TERMINAL_GROUP_ID", +) +TERMINAL_ID = _require_first_env( + "CLOUD_POS_TERMINAL_ID", + "E2E_CLOUD_POS_TERMINAL_ID", +) + +if __name__ == "__main__": + # This example executes Cloud POS calls with terminal-group scope. + scoped_terminal_group_id = CLOUD_POS_TERMINAL_GROUP_ID + resolver_kwargs = { + "default_api_key": DEFAULT_ACCOUNT_API_KEY, + } + if scoped_terminal_group_id: + resolver_kwargs["terminal_group_api_keys"] = { + scoped_terminal_group_id: TERMINAL_GROUP_DEFAULT_API_KEY, + } + + credential_resolver = ScopedCredentialResolver(**resolver_kwargs) + + multisafepay_sdk = Sdk( + is_production=False, + credential_resolver=credential_resolver, + ) + order_manager = multisafepay_sdk.get_order_manager() + event_manager = multisafepay_sdk.get_event_manager() + + order_id = f"cloud-pos-{int(time.time())}" + + order_request = ( + OrderRequest() + .add_type("redirect") + .add_order_id(order_id) + .add_description("Cloud POS order") + .add_amount(100) + .add_currency("EUR") + .add_gateway_info( + { + "terminal_id": TERMINAL_ID, + }, + ) + ) + + create_response = order_manager.create( + order_request, + terminal_group_id=scoped_terminal_group_id, + ) + order = create_response.get_data() + + if order is None: + raise RuntimeError("Order creation did not return order data") + + print(f"Created Cloud POS order: {order.order_id}") + print("Listening for events. Press Ctrl+C to stop.") + + try: + with event_manager.subscribe_order_events(order, timeout=45.0) as stream: + for event in stream: + print(event) + except KeyboardInterrupt: + print("Stream interrupted by user.") diff --git a/examples/order_manager/cloud_pos_order.py b/examples/order_manager/cloud_pos_order.py index 095302c..61906f7 100644 --- a/examples/order_manager/cloud_pos_order.py +++ b/examples/order_manager/cloud_pos_order.py @@ -37,14 +37,20 @@ ) order_manager = multisafepay_sdk.get_order_manager() + order_id = f"cloud-pos-{int(time.time())}" + order_request = ( OrderRequest() .add_type("redirect") - .add_order_id(f"cloud-pos-{int(time.time())}") + .add_order_id(order_id) .add_description("Cloud POS order") .add_amount(100) .add_currency("EUR") - .add_gateway_info({"terminal_id": TERMINAL_ID}) + .add_gateway_info( + { + "terminal_id": TERMINAL_ID, + }, + ) ) create_response = order_manager.create( diff --git a/src/multisafepay/api/paths/events/__init__.py b/src/multisafepay/api/paths/events/__init__.py new file mode 100644 index 0000000..cf12cf7 --- /dev/null +++ b/src/multisafepay/api/paths/events/__init__.py @@ -0,0 +1,14 @@ +# Copyright (c) MultiSafepay, Inc. All rights reserved. + +# This file is licensed under the Open Software License (OSL) version 3.0. +# For a copy of the license, see the LICENSE.txt file in the project root. + +# See the DISCLAIMER.md file for disclaimer details. + +"""Events API endpoints.""" + +from multisafepay.api.paths.events.event_manager import EventManager + +__all__ = [ + "EventManager", +] diff --git a/src/multisafepay/api/paths/events/event_manager.py b/src/multisafepay/api/paths/events/event_manager.py new file mode 100644 index 0000000..8cd490a --- /dev/null +++ b/src/multisafepay/api/paths/events/event_manager.py @@ -0,0 +1,87 @@ +# Copyright (c) MultiSafepay, Inc. All rights reserved. + +# This file is licensed under the Open Software License (OSL) version 3.0. +# For a copy of the license, see the LICENSE.txt file in the project root. + +# See the DISCLAIMER.md file for disclaimer details. + +"""Event manager for event stream subscription helpers.""" + +from __future__ import annotations + +from multisafepay.api.base.abstract_manager import AbstractManager +from multisafepay.api.paths.events.stream import EventStream +from multisafepay.api.paths.orders.response.order_response import Order +from multisafepay.client.client import Client + + +class EventManager(AbstractManager): + """Manages event stream subscriptions for order events.""" + + def __init__(self: EventManager, client: Client) -> None: + """Initialize the EventManager with a client.""" + super().__init__(client) + + def subscribe_events( + self: EventManager, + events_token: str, + events_stream_url: str, + last_event_id: str | None = None, + timeout: float = 30.0, + ) -> EventStream: + """ + Subscribe to order events using the SSE stream endpoint. + + Parameters + ---------- + events_token (str): Token returned by order creation for event auth. + events_stream_url (str): Full SSE stream URL. + last_event_id (str | None): Optional resume cursor. + timeout (float): Socket timeout in seconds. + + Returns + ------- + EventStream: An iterator over incoming SSE messages. + + """ + return EventStream.open( + events_token=events_token, + events_stream_url=events_stream_url, + last_event_id=last_event_id, + timeout=timeout, + ) + + def subscribe_order_events( + self: EventManager, + order: Order, + last_event_id: str | None = None, + timeout: float = 30.0, + ) -> EventStream: + """ + Subscribe to events for an existing order response object. + + Parameters + ---------- + order (Order): Order response that contains event credentials. + last_event_id (str | None): Optional resume cursor. + timeout (float): Socket timeout in seconds. + + Returns + ------- + EventStream: An iterator over incoming SSE messages. + + """ + events_token = order.events_token or order.event_token + events_stream_url = order.events_stream_url or order.event_stream_url + + if not events_token or not events_stream_url: + raise ValueError( + "Order does not contain events_token/events_stream_url.", + ) + + return self.subscribe_events( + events_token=events_token, + events_stream_url=events_stream_url, + last_event_id=last_event_id, + timeout=timeout, + ) diff --git a/src/multisafepay/api/paths/events/stream/__init__.py b/src/multisafepay/api/paths/events/stream/__init__.py new file mode 100644 index 0000000..84b2268 --- /dev/null +++ b/src/multisafepay/api/paths/events/stream/__init__.py @@ -0,0 +1,132 @@ +# Copyright (c) MultiSafepay, Inc. All rights reserved. + +# This file is licensed under the Open Software License (OSL) version 3.0. +# For a copy of the license, see the LICENSE.txt file in the project root. + +# See the DISCLAIMER.md file for disclaimer details. + +"""Event stream contracts for the events path.""" + +from __future__ import annotations + +import json + +from multisafepay.api.paths.events.stream.response import Event, EventData +from multisafepay.client.sse import ( + ServerSentEvent, + ServerSentEventStream, + StreamingResponse, +) +from typing_extensions import Self + + +def _deserialize_event_payload(raw_payload: str | None) -> object | None: + """Parse raw SSE data for this path and fall back to plain text.""" + if raw_payload is None: + return None + + try: + return json.loads(raw_payload) + except json.JSONDecodeError: + return raw_payload + + +def _to_event(server_sent_event: ServerSentEvent) -> Event: + """Adapt a generic SSE message into the events-path response model.""" + event = Event.from_dict( + { + "event": server_sent_event.event, + "data": _deserialize_event_payload(server_sent_event.data), + "event_id": server_sent_event.event_id, + "retry": server_sent_event.retry, + "raw_data": server_sent_event.raw_data, + }, + ) + if event is None: + raise ValueError("Unable to adapt SSE payload to Event.") + return event + + +class EventStream: + """Iterator over events received from the MultiSafepay SSE endpoint.""" + + def __init__( + self: EventStream, + response: StreamingResponse | None = None, + stream: ServerSentEventStream | None = None, + ) -> None: + """Initialize the stream from an HTTP response or generic SSE stream.""" + if stream is not None: + self._stream = stream + return + + if response is None: + raise ValueError( + "response is required when stream is not provided.", + ) + + self._stream = ServerSentEventStream(response=response) + + @classmethod + def _from_stream( + cls: type[EventStream], + stream: ServerSentEventStream, + ) -> EventStream: + """Build an EventStream around an already-open generic SSE stream.""" + return cls(stream=stream) + + @classmethod + def open( + cls: type[EventStream], + events_token: str, + events_stream_url: str, + last_event_id: str | None = None, + timeout: float = 30.0, + ) -> EventStream: + """Open a new SSE stream using the event token and stream URL.""" + headers = { + "Accept": "text/event-stream", + "Cache-Control": "no-cache", + "Authorization": f"Bearer {events_token}", + } + if last_event_id is not None: + headers["Last-Event-ID"] = last_event_id + + stream = ServerSentEventStream.open( + url=events_stream_url, + headers=headers, + timeout=timeout, + ) + return cls._from_stream(stream) + + @property + def closed(self: EventStream) -> bool: + """Return whether this stream is already closed.""" + return self._stream.closed + + def close(self: EventStream) -> None: + """Close the underlying HTTP response stream.""" + self._stream.close() + + def __iter__(self: EventStream) -> EventStream: + """Return self as an iterator over events.""" + return self + + def __next__(self: EventStream) -> Event: + """Read the next SSE message and return it as an Event.""" + return _to_event(next(self._stream)) + + def __enter__(self: Self) -> Self: + """Support context manager protocol.""" + return self + + def __exit__(self: EventStream, *args: object) -> None: + """Close stream when exiting context manager.""" + self.close() + + +__all__ = [ + "Event", + "EventData", + "EventStream", +] diff --git a/src/multisafepay/api/paths/events/stream/response/__init__.py b/src/multisafepay/api/paths/events/stream/response/__init__.py new file mode 100644 index 0000000..bd51138 --- /dev/null +++ b/src/multisafepay/api/paths/events/stream/response/__init__.py @@ -0,0 +1,16 @@ +# Copyright (c) MultiSafepay, Inc. All rights reserved. + +# This file is licensed under the Open Software License (OSL) version 3.0. +# For a copy of the license, see the LICENSE.txt file in the project root. + +# See the DISCLAIMER.md file for disclaimer details. + +"""Response models for event stream payloads.""" + +from multisafepay.api.paths.events.stream.response.components import EventData +from multisafepay.api.paths.events.stream.response.event import Event + +__all__ = [ + "Event", + "EventData", +] diff --git a/src/multisafepay/api/paths/events/stream/response/components/__init__.py b/src/multisafepay/api/paths/events/stream/response/components/__init__.py new file mode 100644 index 0000000..9b13bc9 --- /dev/null +++ b/src/multisafepay/api/paths/events/stream/response/components/__init__.py @@ -0,0 +1,18 @@ +# Copyright (c) MultiSafepay, Inc. All rights reserved. + +# This file is licensed under the Open Software License (OSL) version 3.0. +# For a copy of the license, see the LICENSE.txt file in the project root. + +# See the DISCLAIMER.md file for disclaimer details. + +"""Component models for event stream response payloads.""" + +from multisafepay.api.paths.events.stream.response.components.event_data import ( + EventData, + EventDataPayload, +) + +__all__ = [ + "EventData", + "EventDataPayload", +] diff --git a/src/multisafepay/api/paths/events/stream/response/components/event_data.py b/src/multisafepay/api/paths/events/stream/response/components/event_data.py new file mode 100644 index 0000000..67eedb1 --- /dev/null +++ b/src/multisafepay/api/paths/events/stream/response/components/event_data.py @@ -0,0 +1,55 @@ +# Copyright (c) MultiSafepay, Inc. All rights reserved. + +# This file is licensed under the Open Software License (OSL) version 3.0. +# For a copy of the license, see the LICENSE.txt file in the project root. + +# See the DISCLAIMER.md file for disclaimer details. + +"""Component response models for event stream payloads.""" + +from __future__ import annotations + +from typing import Optional, Union + +from multisafepay.model.response_model import ResponseModel + +# ruff: noqa: UP007 + +EventDataPayload = Union[ + "EventData", + str, + int, + float, + bool, + list[object], + None, +] + + +class EventData(ResponseModel): + """Structured nested event payload for known order-event attributes.""" + + status: Optional[str] + order_id: Optional[str] + type: Optional[str] + data: EventDataPayload + + @staticmethod + def from_dict(d: dict) -> Optional[EventData]: + """Create EventData from dictionary data.""" + if d is None: + return None + + args = d.copy() + payload = d.get("data") + if isinstance(payload, dict): + args["data"] = EventData.from_dict(payload) + elif isinstance(payload, list): + args["data"] = [ + EventData.from_dict(item) if isinstance(item, dict) else item + for item in payload + ] + else: + args["data"] = payload + + return EventData(**args) diff --git a/src/multisafepay/api/paths/events/stream/response/event.py b/src/multisafepay/api/paths/events/stream/response/event.py new file mode 100644 index 0000000..52af229 --- /dev/null +++ b/src/multisafepay/api/paths/events/stream/response/event.py @@ -0,0 +1,50 @@ +# Copyright (c) MultiSafepay, Inc. All rights reserved. + +# This file is licensed under the Open Software License (OSL) version 3.0. +# For a copy of the license, see the LICENSE.txt file in the project root. + +# See the DISCLAIMER.md file for disclaimer details. + +"""Response models for order event stream messages.""" + +from __future__ import annotations + +from typing import Optional + +from multisafepay.api.paths.events.stream.response.components import ( + EventData, + EventDataPayload, +) +from multisafepay.model.response_model import ResponseModel + +# ruff: noqa: UP007 + + +class Event(ResponseModel): + """Structured event message returned by the MultiSafepay SSE stream.""" + + event: Optional[str] + data: EventDataPayload + event_id: Optional[str] + retry: Optional[int] + raw_data: Optional[str] + + @staticmethod + def from_dict(d: dict) -> Optional[Event]: + """Create Event from dictionary data.""" + if d is None: + return None + + args = d.copy() + payload = d.get("data") + if isinstance(payload, dict): + args["data"] = EventData.from_dict(payload) + elif isinstance(payload, list): + args["data"] = [ + EventData.from_dict(item) if isinstance(item, dict) else item + for item in payload + ] + else: + args["data"] = payload + + return Event(**args) diff --git a/src/multisafepay/api/paths/orders/response/order_response.py b/src/multisafepay/api/paths/orders/response/order_response.py index 4581929..89e8f04 100644 --- a/src/multisafepay/api/paths/orders/response/order_response.py +++ b/src/multisafepay/api/paths/orders/response/order_response.py @@ -103,6 +103,11 @@ class Order(ResponseModel): payment_url: Optional[str] cancel_url: Optional[str] session_id: Optional[str] + events_token: Optional[str] + events_url: Optional[str] + events_stream_url: Optional[str] + + # Backward compatibility aliases for older API payloads. event_token: Optional[str] event_url: Optional[str] event_stream_url: Optional[str] @@ -118,6 +123,23 @@ def get_order_id(self: "Order") -> str: """ return self.order_id + @staticmethod + def _normalize_event_fields(d: dict) -> dict: + """Normalize singular/plural event keys for compatibility.""" + mapping = [ + ("events_token", "event_token"), + ("events_url", "event_url"), + ("events_stream_url", "event_stream_url"), + ] + + for plural_key, singular_key in mapping: + if d.get(plural_key) is None and d.get(singular_key) is not None: + d[plural_key] = d[singular_key] + if d.get(singular_key) is None and d.get(plural_key) is not None: + d[singular_key] = d[plural_key] + + return d + @staticmethod def from_dict(d: dict) -> Optional["Order"]: """ @@ -134,7 +156,10 @@ def from_dict(d: dict) -> Optional["Order"]: """ if d is None: return None - order_dependency_adapter = Decorator(dependencies=d) + normalized_dependencies = Order._normalize_event_fields(d.copy()) + order_dependency_adapter = Decorator( + dependencies=normalized_dependencies, + ) dependencies = ( order_dependency_adapter.adapt_order_adjustment( d.get("order_adjustment"), diff --git a/src/multisafepay/client/__init__.py b/src/multisafepay/client/__init__.py index 922e1d2..30fdbf6 100644 --- a/src/multisafepay/client/__init__.py +++ b/src/multisafepay/client/__init__.py @@ -3,9 +3,12 @@ from multisafepay.client.api_key import ApiKey from multisafepay.client.client import Client from multisafepay.client.credential_resolver import ScopedCredentialResolver +from multisafepay.client.sse import ServerSentEvent, ServerSentEventStream __all__ = [ "ApiKey", "Client", "ScopedCredentialResolver", + "ServerSentEvent", + "ServerSentEventStream", ] diff --git a/src/multisafepay/client/client.py b/src/multisafepay/client/client.py index ec83993..5854e30 100644 --- a/src/multisafepay/client/client.py +++ b/src/multisafepay/client/client.py @@ -44,6 +44,14 @@ class Client: CUSTOM_BASE_URL_ENV = "MSP_SDK_CUSTOM_BASE_URL" ALLOW_CUSTOM_BASE_URL_ENV = "MSP_SDK_ALLOW_CUSTOM_BASE_URL" + AUTH_SCOPE_DEFAULT = ScopedCredentialResolver.AUTH_SCOPE_DEFAULT + AUTH_SCOPE_PARTNER_AFFILIATE = ( + ScopedCredentialResolver.AUTH_SCOPE_PARTNER_AFFILIATE + ) + AUTH_SCOPE_TERMINAL_GROUP = ( + ScopedCredentialResolver.AUTH_SCOPE_TERMINAL_GROUP + ) + METHOD_POST = "POST" METHOD_GET = "GET" METHOD_PATCH = "PATCH" diff --git a/src/multisafepay/client/sse.py b/src/multisafepay/client/sse.py new file mode 100644 index 0000000..16c0586 --- /dev/null +++ b/src/multisafepay/client/sse.py @@ -0,0 +1,224 @@ +# Copyright (c) MultiSafepay, Inc. All rights reserved. + +# This file is licensed under the Open Software License (OSL) version 3.0. +# For a copy of the license, see the LICENSE.txt file in the project root. + +# See the DISCLAIMER.md file for disclaimer details. + +"""Generic Server-Sent Events support utilities.""" + +from __future__ import annotations + +from dataclasses import dataclass, field +from typing import ClassVar, Protocol +from urllib.parse import urlparse +from urllib.request import Request, urlopen + +from typing_extensions import Self + + +class StreamingResponse(Protocol): + """Protocol for the minimal stream response interface used by SSE streams.""" + + def readline(self: StreamingResponse) -> bytes: + """Read one line from the stream response.""" + + def close(self: StreamingResponse) -> None: + """Close the stream response.""" + + +@dataclass(frozen=True) +class ServerSentEvent: + """Generic representation of one SSE message.""" + + event: str | None = None + data: str | None = None + event_id: str | None = None + retry: int | None = None + raw_data: str | None = None + + +@dataclass +class _ServerSentEventBuilder: + """Mutable builder used while parsing one SSE message.""" + + _FIELD_HANDLERS: ClassVar[dict[str, str]] = { + "event": "_consume_event", + "data": "_consume_data", + "id": "_consume_id", + "retry": "_consume_retry", + } + + event_name: str | None = None + event_id: str | None = None + event_retry: int | None = None + data_lines: list[str] = field(default_factory=list) + has_fields: bool = False + + def consume_line(self: _ServerSentEventBuilder, line: str) -> None: + """Consume one SSE line and update the builder state.""" + if line.startswith(":"): + return + + field_name, field_value = _parse_line(line) + if field_name is None: + return + + handler_name = self._FIELD_HANDLERS.get(field_name) + if handler_name is None: + return + + getattr(self, handler_name)(field_value) + + def _consume_event( + self: _ServerSentEventBuilder, + field_value: str, + ) -> None: + """Consume the SSE event field.""" + self.has_fields = True + self.event_name = field_value + + def _consume_data(self: _ServerSentEventBuilder, field_value: str) -> None: + """Consume one SSE data line.""" + self.has_fields = True + self.data_lines.append(field_value) + + def _consume_id(self: _ServerSentEventBuilder, field_value: str) -> None: + """Consume the SSE id field.""" + self.has_fields = True + self.event_id = field_value + + def _consume_retry( + self: _ServerSentEventBuilder, + field_value: str, + ) -> None: + """Consume the SSE retry field when it is a valid integer.""" + try: + self.event_retry = int(field_value) + except ValueError: + return + + self.has_fields = True + + def build(self: _ServerSentEventBuilder) -> ServerSentEvent | None: + """Build an immutable SSE message or None when no message exists.""" + if not self.has_fields and not self.data_lines: + return None + + raw_data = "\n".join(self.data_lines) if self.data_lines else None + return ServerSentEvent( + event=self.event_name, + data=raw_data, + event_id=self.event_id, + retry=self.event_retry, + raw_data=raw_data, + ) + + +class ServerSentEventStream: + """Iterator over messages received from a generic SSE endpoint.""" + + def __init__( + self: ServerSentEventStream, + response: StreamingResponse, + ) -> None: + """Initialize the stream from an already-open HTTP response.""" + self._response = response + self._closed = False + + @classmethod + def open( + cls: type[ServerSentEventStream], + url: str, + headers: dict[str, str] | None = None, + timeout: float = 30.0, + ) -> ServerSentEventStream: + """Open a new SSE stream using a URL and optional headers.""" + cls._validate_url(url) + + request = Request( # noqa: S310 + url=url, + headers=headers or {}, + method="GET", + ) + # Keep the response open; close manages the lifecycle. + # pylint: disable=consider-using-with + response = urlopen(request, timeout=timeout) # noqa: S310 + # pylint: enable=consider-using-with + + return cls(response=response) + + @staticmethod + def _validate_url(url: str) -> None: + """Validate the stream URL before opening the network connection.""" + parsed = urlparse(url) + if parsed.scheme not in {"http", "https"} or not parsed.netloc: + raise ValueError("Invalid SSE URL.") + + @property + def closed(self: ServerSentEventStream) -> bool: + """Return whether this stream is already closed.""" + return self._closed + + def close(self: ServerSentEventStream) -> None: + """Close the underlying HTTP response stream.""" + if self._closed: + return + + self._response.close() + self._closed = True + + def __iter__(self: ServerSentEventStream) -> ServerSentEventStream: + """Return self as an iterator over SSE messages.""" + return self + + def __next__(self: ServerSentEventStream) -> ServerSentEvent: + """Read the next SSE message and return it.""" + if self._closed: + raise StopIteration + + builder = _ServerSentEventBuilder() + while True: + line = self._read_line() + if line is None: + self.close() + raise StopIteration + + if line == "": + event = builder.build() + if event is not None: + return event + builder = _ServerSentEventBuilder() + continue + + builder.consume_line(line) + + def _read_line(self: ServerSentEventStream) -> str | None: + """Read and decode one line from the underlying stream response.""" + raw_line = self._response.readline() + if not raw_line: + return None + + return raw_line.decode("utf-8", errors="replace").rstrip("\r\n") + + def __enter__(self: Self) -> Self: + """Support context manager protocol.""" + return self + + def __exit__(self: ServerSentEventStream, *args: object) -> None: + """Close stream when exiting context manager.""" + self.close() + + +def _parse_line(line: str) -> tuple[str | None, str]: + """Parse one SSE line into field and value parts.""" + if ":" not in line: + return line or None, "" + + field_name, field_value = line.split(":", 1) + if field_name == "": + return None, "" + if field_value.startswith(" "): + field_value = field_value[1:] + + return field_name, field_value diff --git a/src/multisafepay/sdk.py b/src/multisafepay/sdk.py index 28f42b6..74abe03 100644 --- a/src/multisafepay/sdk.py +++ b/src/multisafepay/sdk.py @@ -11,6 +11,7 @@ from multisafepay.api.paths.auth.auth_manager import AuthManager from multisafepay.api.paths.categories.category_manager import CategoryManager +from multisafepay.api.paths.events.event_manager import EventManager from multisafepay.api.paths.gateways.gateway_manager import GatewayManager from multisafepay.api.paths.issuers.issuer_manager import IssuerManager from multisafepay.api.paths.orders.order_manager import OrderManager @@ -178,6 +179,18 @@ def get_category_manager(self: "Sdk") -> CategoryManager: """ return CategoryManager(self.client) + def get_event_manager(self: "Sdk") -> EventManager: + """ + Get the event manager. + + Returns + ------- + EventManager + The event manager instance. + + """ + return EventManager(self.client) + def get_order_manager(self: "Sdk") -> OrderManager: """ Get the order manager. diff --git a/tests/multisafepay/e2e/examples/event_manager/conftest.py b/tests/multisafepay/e2e/examples/event_manager/conftest.py new file mode 100644 index 0000000..8fb1792 --- /dev/null +++ b/tests/multisafepay/e2e/examples/event_manager/conftest.py @@ -0,0 +1,211 @@ +"""Event-manager-specific E2E fixtures and selective skip behavior.""" + +import os +from urllib.parse import urlparse + +import pytest + +from multisafepay.client import ScopedCredentialResolver +from multisafepay.client.client import Client +from multisafepay.client.credential_resolver import AuthScope +from multisafepay.sdk import Sdk + +EVENT_MANAGER_E2E_NODE_PREFIXES = ( + "tests/multisafepay/e2e/examples/event_manager/", +) +DEFAULT_API_KEY_ENVS = ("E2E_API_KEY", "API_KEY") +PARTNER_API_KEY_ENVS = ("E2E_PARTNER_API_KEY", "PARTNER_API_KEY") +TERMINAL_GROUP_API_KEY_ENVS = ( + "E2E_TERMINAL_GROUP_API_KEY_GROUP_DEFAULT", + "TERMINAL_GROUP_API_KEY_GROUP_DEFAULT", +) +TERMINAL_GROUP_ID_ENVS = ("CLOUD_POS_TERMINAL_GROUP_ID",) +TERMINAL_ID_ENVS = ("E2E_CLOUD_POS_TERMINAL_ID", "CLOUD_POS_TERMINAL_ID") +BASE_URL_ENVS = ("E2E_NO_SANDBOX_BASE_URL", "MSP_SDK_CUSTOM_BASE_URL") + + +def _get_first_env(*names: str) -> str: + for name in names: + value = os.getenv(name, "").strip() + if value: + return value + + return "" + + +def _require_first_env(*names: str) -> str: + value = _get_first_env(*names) + if value: + return value + + msg = f"SSE E2E tests require one of: {', '.join(names)}" + raise pytest.UsageError(msg) + + +def _validate_base_url(base_url: str, *env_names: str) -> str: + parsed = urlparse(base_url) + if parsed.scheme != "https" or not parsed.netloc: + msg = f"{', '.join(env_names)} must be a valid https URL" + raise pytest.UsageError(msg) + + path = parsed.path.rstrip("/") + normalized_path = "/" if not path else f"{path}/" + return f"{parsed.scheme}://{parsed.netloc}{normalized_path}" + + +def _has_event_manager_e2e_env() -> bool: + has_base = bool( + _get_first_env(*DEFAULT_API_KEY_ENVS) + and _get_first_env(*TERMINAL_GROUP_API_KEY_ENVS) + and _get_first_env(*TERMINAL_ID_ENVS) + and _get_first_env(*BASE_URL_ENVS), + ) + if not has_base: + return False + + return bool( + _get_first_env(*TERMINAL_GROUP_ID_ENVS) + or _get_first_env(*PARTNER_API_KEY_ENVS), + ) + + +def _resolve_terminal_group_id( + base_url: str, + default_api_key: str, + partner_api_key: str, + terminal_id: str, +) -> str: + credential_resolver = ScopedCredentialResolver( + default_api_key=default_api_key, + partner_affiliate_api_key=partner_api_key, + ) + sdk = Sdk( + is_production=False, + credential_resolver=credential_resolver, + ) + sdk.get_client().url = base_url + + limit = 100 + max_pages = 10 + for page in range(1, max_pages + 1): + response = sdk.get_client().create_get_request( + "json/terminals", + params={ + "limit": limit, + "page": page, + }, + auth_scope=AuthScope( + scope=Client.AUTH_SCOPE_PARTNER_AFFILIATE, + ), + ) + if ( + response.get_status_code() != 200 + or not response.get_body_success() + ): + raise pytest.UsageError( + "Unable to resolve Cloud POS terminal group id: " + "GET /json/terminals did not return a successful response", + ) + + listing = response.get_body_data() + if not isinstance(listing, list) or not listing: + break + + for terminal in listing: + listed_terminal_id = terminal.get("id") + terminal_code = terminal.get("code") + if terminal_id not in {listed_terminal_id, terminal_code}: + continue + + group_id = terminal.get("group_id") + if group_id is None: + raise pytest.UsageError( + f"Unable to resolve Cloud POS terminal group id: " + f"terminal {terminal_id} has no group_id", + ) + return str(group_id) + + if len(listing) < limit: + break + + raise pytest.UsageError( + f"Unable to resolve Cloud POS terminal group id for terminal {terminal_id}", + ) + + +@pytest.fixture(scope="session") +def cloud_pos_terminal_id() -> str: + """Return terminal id used by SSE E2E tests.""" + return _require_first_env(*TERMINAL_ID_ENVS) + + +@pytest.fixture(scope="session") +def cloud_pos_base_url() -> str: + """Return dev-backed base URL used by SSE E2E tests.""" + return _validate_base_url( + _require_first_env(*BASE_URL_ENVS), + *BASE_URL_ENVS, + ) + + +@pytest.fixture(scope="session") +def cloud_pos_terminal_group_id( + cloud_pos_base_url: str, + cloud_pos_terminal_id: str, +) -> str: + """Return terminal group id from env or resolve it via /json/terminals.""" + explicit_group_id = _get_first_env(*TERMINAL_GROUP_ID_ENVS) + if explicit_group_id: + return explicit_group_id + + return _resolve_terminal_group_id( + base_url=cloud_pos_base_url, + default_api_key=_require_first_env(*DEFAULT_API_KEY_ENVS), + partner_api_key=_require_first_env(*PARTNER_API_KEY_ENVS), + terminal_id=cloud_pos_terminal_id, + ) + + +@pytest.fixture(scope="session") +def cloud_pos_events_sdk( + cloud_pos_base_url: str, + cloud_pos_terminal_group_id: str, +) -> Sdk: + """Return SDK configured for Cloud POS creation plus SSE subscription.""" + credential_resolver = ScopedCredentialResolver( + default_api_key=_require_first_env(*DEFAULT_API_KEY_ENVS), + terminal_group_api_keys={ + cloud_pos_terminal_group_id: _require_first_env( + *TERMINAL_GROUP_API_KEY_ENVS, + ), + }, + ) + sdk = Sdk( + is_production=False, + credential_resolver=credential_resolver, + ) + sdk.get_client().url = cloud_pos_base_url + return sdk + + +def pytest_collection_modifyitems( + config: pytest.Config, # noqa: ARG001 + items: list[pytest.Item], +) -> None: + """Skip SSE E2E tests when the required credentials are missing.""" + if _has_event_manager_e2e_env(): + return + + skip = pytest.mark.skip( + reason=( + "SSE E2E tests require E2E_API_KEY or API_KEY, " + "E2E_TERMINAL_GROUP_API_KEY_GROUP_DEFAULT or " + "TERMINAL_GROUP_API_KEY_GROUP_DEFAULT, " + "E2E_CLOUD_POS_TERMINAL_ID or CLOUD_POS_TERMINAL_ID, " + "E2E_NO_SANDBOX_BASE_URL or MSP_SDK_CUSTOM_BASE_URL, " + "and either CLOUD_POS_TERMINAL_GROUP_ID or PARTNER_API_KEY" + ), + ) + for item in items: + if item.nodeid.startswith(EVENT_MANAGER_E2E_NODE_PREFIXES): + item.add_marker(skip) diff --git a/tests/multisafepay/e2e/examples/event_manager/test_subscribe_events.py b/tests/multisafepay/e2e/examples/event_manager/test_subscribe_events.py new file mode 100644 index 0000000..c7d137b --- /dev/null +++ b/tests/multisafepay/e2e/examples/event_manager/test_subscribe_events.py @@ -0,0 +1,88 @@ +# Copyright (c) MultiSafepay, Inc. All rights reserved. + +# This file is licensed under the Open Software License (OSL) version 3.0. +# For a copy of the license, see the LICENSE.txt file in the project root. + +# See the DISCLAIMER.md file for disclaimer details. + +"""E2E coverage for examples/event_manager/subscribe_events.py.""" + +import time + +import pytest + +from multisafepay.api.base.response.custom_api_response import ( + CustomApiResponse, +) +from multisafepay.api.paths.events.event_manager import EventManager +from multisafepay.api.paths.events.stream import EventStream +from multisafepay.api.paths.orders.order_manager import OrderManager +from multisafepay.api.paths.orders.request.order_request import OrderRequest +from multisafepay.api.paths.orders.response.order_response import Order +from multisafepay.sdk import Sdk + + +@pytest.fixture(scope="module") +def order_manager(cloud_pos_events_sdk: Sdk) -> OrderManager: + """Return OrderManager for Cloud POS event-stream tests.""" + return cloud_pos_events_sdk.get_order_manager() + + +@pytest.fixture(scope="module") +def event_manager(cloud_pos_events_sdk: Sdk) -> EventManager: + """Return EventManager for Cloud POS event-stream tests.""" + return cloud_pos_events_sdk.get_event_manager() + + +def _build_cloud_pos_order_request( + order_id: str, + terminal_id: str, +) -> OrderRequest: + """Create the Cloud POS order request used by the SSE example.""" + return ( + OrderRequest() + .add_type("redirect") + .add_order_id(order_id) + .add_description("Cloud POS order for event streaming") + .add_amount(100) + .add_currency("EUR") + .add_gateway_info( + { + "terminal_id": terminal_id, + }, + ) + ) + + +def test_subscribe_order_events_opens_stream( + order_manager: OrderManager, + event_manager: EventManager, + cloud_pos_terminal_group_id: str, + cloud_pos_terminal_id: str, +) -> None: + """Create a Cloud POS order and verify the SSE stream opens successfully.""" + order_id = f"cloud-pos-events-e2e-{int(time.time())}" + + create_response = order_manager.create( + request_order=_build_cloud_pos_order_request( + order_id=order_id, + terminal_id=cloud_pos_terminal_id, + ), + terminal_group_id=cloud_pos_terminal_group_id, + ) + + assert isinstance(create_response, CustomApiResponse) + assert create_response.get_status_code() == 200 + assert create_response.get_body_success() is True + + order = create_response.get_data() + assert isinstance(order, Order) + assert order.order_id == order_id + assert order.events_token or order.event_token + assert order.events_stream_url or order.event_stream_url + + with event_manager.subscribe_order_events(order, timeout=10.0) as stream: + assert isinstance(stream, EventStream) + assert stream.closed is False + + assert stream.closed is True diff --git a/tests/multisafepay/unit/api/path/events/stream/test_unit_event_stream.py b/tests/multisafepay/unit/api/path/events/stream/test_unit_event_stream.py new file mode 100644 index 0000000..eb09647 --- /dev/null +++ b/tests/multisafepay/unit/api/path/events/stream/test_unit_event_stream.py @@ -0,0 +1,186 @@ +# Copyright (c) MultiSafepay, Inc. All rights reserved. + +# This file is licensed under the Open Software License (OSL) version 3.0. +# For a copy of the license, see the LICENSE.txt file in the project root. + +# See the DISCLAIMER.md file for disclaimer details. + +"""Unit tests for event-path stream contracts.""" + +from __future__ import annotations + +import io + +import pytest + +from multisafepay.api.paths.events.stream import Event, EventData, EventStream +from multisafepay.client.sse import ServerSentEventStream + +EVENTS_STREAM_URL = "https://testapi.multisafepay.com/events/stream/" +EVENTS_TOKEN = "events-token" +LAST_EVENT_ID = "last-10" +PING_PAYLOAD = b"data: ping\n\n" + + +class _FakeStreamingResponse: + """Small streaming response stub used for unit testing.""" + + def __init__(self: _FakeStreamingResponse, payload: bytes) -> None: + self._buffer = io.BytesIO(payload) + self.closed = False + + def readline(self: _FakeStreamingResponse) -> bytes: + return self._buffer.readline() + + def close(self: _FakeStreamingResponse) -> None: + self._buffer.close() + self.closed = True + + +def test_open_builds_expected_headers(monkeypatch: pytest.MonkeyPatch) -> None: + """Build event-specific auth headers before opening the generic SSE stream.""" + captured: dict[str, object] = {} + + def fake_open( + url: str, + headers: dict[str, str] | None = None, + timeout: float = 30.0, + ) -> ServerSentEventStream: + captured["url"] = url + captured["headers"] = headers + captured["timeout"] = timeout + return ServerSentEventStream( + response=_FakeStreamingResponse(PING_PAYLOAD), + ) + + monkeypatch.setattr( + "multisafepay.api.paths.events.stream.ServerSentEventStream.open", + fake_open, + ) + + stream = EventStream.open( + events_token=EVENTS_TOKEN, + events_stream_url=EVENTS_STREAM_URL, + last_event_id=LAST_EVENT_ID, + timeout=9.5, + ) + event = next(stream) + + assert isinstance(event, Event) + assert event.data == "ping" + assert captured["url"] == EVENTS_STREAM_URL + assert captured["timeout"] == 9.5 + headers = captured["headers"] + assert headers["Authorization"] == f"Bearer {EVENTS_TOKEN}" + assert headers["Accept"] == "text/event-stream" + assert headers["Cache-Control"] == "no-cache" + assert headers["Last-Event-ID"] == LAST_EVENT_ID + + +def test_wraps_generic_sse_messages_as_event_contracts() -> None: + """Adapt generic SSE messages into the events-path Event contract.""" + payload = ( + b"event: order.updated\n" + b"id: 15\n" + b"retry: 1000\n" + b'data: {"status": "completed", "order_id": "123"}\n\n' + ) + stream = EventStream(response=_FakeStreamingResponse(payload)) + + event = next(stream) + + assert isinstance(event, Event) + assert event.event == "order.updated" + assert event.event_id == "15" + assert event.retry == 1000 + assert isinstance(event.data, EventData) + assert event.data.status == "completed" + assert event.data.order_id == "123" + + +def test_event_from_dict_adapts_nested_payloads() -> None: + """Build nested event payload models through the common from_dict path.""" + event = Event.from_dict( + { + "event": "order.updated", + "data": { + "status": "processing", + "data": { + "status": "completed", + "order_id": "nested-1", + }, + }, + }, + ) + + assert event is not None + assert isinstance(event.data, EventData) + assert event.data.status == "processing" + assert isinstance(event.data.data, EventData) + assert event.data.data.status == "completed" + assert event.data.data.order_id == "nested-1" + + +def test_event_data_from_dict_adapts_nested_list_payloads() -> None: + """Build nested EventData items when payload data contains a list.""" + payload = EventData.from_dict( + { + "status": "processing", + "data": [ + { + "status": "completed", + "order_id": "nested-2", + }, + "keep-me", + ], + }, + ) + + assert payload is not None + assert payload.status == "processing" + assert isinstance(payload.data, list) + assert isinstance(payload.data[0], EventData) + assert payload.data[0].status == "completed" + assert payload.data[0].order_id == "nested-2" + assert payload.data[1] == "keep-me" + + +def test_event_from_dict_adapts_list_payload_items() -> None: + """Build top-level event payload lists through the common from_dict path.""" + event = Event.from_dict( + { + "event": "order.batch", + "data": [ + { + "status": "completed", + "order_id": "batch-1", + }, + "tail", + ], + }, + ) + + assert event is not None + assert isinstance(event.data, list) + assert isinstance(event.data[0], EventData) + assert event.data[0].status == "completed" + assert event.data[0].order_id == "batch-1" + assert event.data[1] == "tail" + + +def test_from_dict_returns_none_for_none_payload() -> None: + """Return None when from_dict receives None input.""" + assert Event.from_dict(None) is None + assert EventData.from_dict(None) is None + + +def test_closes_wrapped_stream_on_eof() -> None: + """Close the wrapped generic stream when EOF is reached.""" + response = _FakeStreamingResponse(payload=b"") + stream = EventStream(response=response) + + with pytest.raises(StopIteration): + next(stream) + + assert response.closed is True + assert stream.closed is True diff --git a/tests/multisafepay/unit/api/path/events/test_unit_event_manager.py b/tests/multisafepay/unit/api/path/events/test_unit_event_manager.py new file mode 100644 index 0000000..efbe3d0 --- /dev/null +++ b/tests/multisafepay/unit/api/path/events/test_unit_event_manager.py @@ -0,0 +1,121 @@ +# Copyright (c) MultiSafepay, Inc. All rights reserved. + +# This file is licensed under the Open Software License (OSL) version 3.0. +# For a copy of the license, see the LICENSE.txt file in the project root. + +# See the DISCLAIMER.md file for disclaimer details. + +"""Unit tests for event manager subscription helpers.""" + +from typing import Optional +from unittest.mock import MagicMock + +import pytest + +from multisafepay.api.paths.events.event_manager import EventManager +from multisafepay.api.paths.orders.response.order_response import Order + +TEST_EVENTS_STREAM_URL = "https://testapi.multisafepay.com/events/stream/" +ORDER_EVENTS_STREAM_URL = "https://stream.example/events/stream/" +LEGACY_EVENTS_STREAM_URL = "https://legacy.example/events/stream/" +MISSING_EVENTS_ERROR = "events_token/events_stream_url" + + +def _patch_event_stream_open( + monkeypatch: pytest.MonkeyPatch, +) -> tuple[dict[str, object], object]: + """Patch EventStream.open and return capture dict plus sentinel stream.""" + captured: dict[str, object] = {} + expected_stream = object() + + def fake_open( + events_token: str, + events_stream_url: str, + last_event_id: Optional[str] = None, + timeout: float = 30.0, + ) -> object: + captured["events_token"] = events_token + captured["events_stream_url"] = events_stream_url + captured["last_event_id"] = last_event_id + captured["timeout"] = timeout + return expected_stream + + monkeypatch.setattr( + "multisafepay.api.paths.events.event_manager.EventStream.open", + fake_open, + ) + + return captured, expected_stream + + +def test_subscribe_events_delegates_to_stream_open( + monkeypatch: pytest.MonkeyPatch, +) -> None: + """Delegate direct subscriptions to EventStream.open.""" + captured, expected_stream = _patch_event_stream_open(monkeypatch) + + manager = EventManager(MagicMock()) + stream = manager.subscribe_events( + events_token="token-abc", + events_stream_url=TEST_EVENTS_STREAM_URL, + last_event_id="last-15", + timeout=10.0, + ) + + assert stream is expected_stream + assert captured["events_token"] == "token-abc" + assert captured["events_stream_url"] == TEST_EVENTS_STREAM_URL + assert captured["last_event_id"] == "last-15" + assert captured["timeout"] == 10.0 + + +def test_subscribe_order_events_uses_plural_fields( + monkeypatch: pytest.MonkeyPatch, +) -> None: + """Read events credentials from events_* fields when present.""" + captured, expected_stream = _patch_event_stream_open(monkeypatch) + + manager = EventManager(MagicMock()) + order = Order( + order_id="order-1", + events_token="events-token", + events_stream_url=ORDER_EVENTS_STREAM_URL, + ) + + stream = manager.subscribe_order_events(order) + + assert stream is expected_stream + assert captured["events_token"] == "events-token" + assert captured["events_stream_url"] == ORDER_EVENTS_STREAM_URL + + +def test_subscribe_order_events_falls_back_to_legacy_fields( + monkeypatch: pytest.MonkeyPatch, +) -> None: + """Support old event_* field names for backward compatibility.""" + captured, expected_stream = _patch_event_stream_open(monkeypatch) + + manager = EventManager(MagicMock()) + order = Order( + order_id="order-2", + event_token="legacy-token", + event_stream_url=LEGACY_EVENTS_STREAM_URL, + ) + + stream = manager.subscribe_order_events(order) + + assert stream is expected_stream + assert captured["events_token"] == "legacy-token" + assert captured["events_stream_url"] == LEGACY_EVENTS_STREAM_URL + + +def test_subscribe_order_events_requires_token_and_stream_url() -> None: + """Raise a clear error when event credentials are missing in order.""" + manager = EventManager(MagicMock()) + order = Order(order_id="order-3") + + with pytest.raises( + ValueError, + match=MISSING_EVENTS_ERROR, + ): + manager.subscribe_order_events(order) diff --git a/tests/multisafepay/unit/api/path/orders/response/test_unit_order_response.py b/tests/multisafepay/unit/api/path/orders/response/test_unit_order_response.py new file mode 100644 index 0000000..60a5b5c --- /dev/null +++ b/tests/multisafepay/unit/api/path/orders/response/test_unit_order_response.py @@ -0,0 +1,74 @@ +# Copyright (c) MultiSafepay, Inc. All rights reserved. + +# This file is licensed under the Open Software License (OSL) version 3.0. +# For a copy of the license, see the LICENSE.txt file in the project root. + +# See the DISCLAIMER.md file for disclaimer details. + +"""Unit tests for order response event fields compatibility.""" + +from typing import Optional + +from multisafepay.api.paths.orders.response.order_response import Order + +PLURAL_EVENTS_TOKEN = "token-123" +PLURAL_EVENTS_URL = "wss://testapi.multisafepay.com/events/" +PLURAL_EVENTS_STREAM_URL = "https://testapi.multisafepay.com/events/stream/" +LEGACY_EVENTS_TOKEN = "legacy-token" +LEGACY_EVENTS_URL = "wss://legacy.example.com/events/" +LEGACY_EVENTS_STREAM_URL = "https://legacy.example.com/events/stream/" + + +def _assert_event_fields( + order: Optional[Order], + expected_token: str, + expected_url: str, + expected_stream_url: str, +) -> None: + """Assert both plural and legacy event fields are populated consistently.""" + assert order is not None + + assert order.events_token == expected_token + assert order.events_url == expected_url + assert order.events_stream_url == expected_stream_url + assert order.event_token == expected_token + assert order.event_url == expected_url + assert order.event_stream_url == expected_stream_url + + +def test_from_dict_maps_plural_event_fields_to_legacy_aliases() -> None: + """Map events_* fields to both plural and legacy singular attributes.""" + data = { + "order_id": "order-1", + "events_token": PLURAL_EVENTS_TOKEN, + "events_url": PLURAL_EVENTS_URL, + "events_stream_url": PLURAL_EVENTS_STREAM_URL, + } + + order = Order.from_dict(data) + + _assert_event_fields( + order=order, + expected_token=PLURAL_EVENTS_TOKEN, + expected_url=PLURAL_EVENTS_URL, + expected_stream_url=PLURAL_EVENTS_STREAM_URL, + ) + + +def test_from_dict_maps_legacy_event_fields_to_plural_names() -> None: + """Map event_* fields to newer plural names for consistency.""" + data = { + "order_id": "order-2", + "event_token": LEGACY_EVENTS_TOKEN, + "event_url": LEGACY_EVENTS_URL, + "event_stream_url": LEGACY_EVENTS_STREAM_URL, + } + + order = Order.from_dict(data) + + _assert_event_fields( + order=order, + expected_token=LEGACY_EVENTS_TOKEN, + expected_url=LEGACY_EVENTS_URL, + expected_stream_url=LEGACY_EVENTS_STREAM_URL, + ) diff --git a/tests/multisafepay/unit/client/test_unit_client.py b/tests/multisafepay/unit/client/test_unit_client.py index b4bbc90..8f6049d 100644 --- a/tests/multisafepay/unit/client/test_unit_client.py +++ b/tests/multisafepay/unit/client/test_unit_client.py @@ -88,6 +88,22 @@ def test_initializes_with_custom_requests_session_via_transport(): session.close() +def test_exposes_auth_scope_aliases_for_backward_compatibility(): + """Test that Client keeps the public auth scope aliases in sync.""" + assert ( + Client.AUTH_SCOPE_DEFAULT + == ScopedCredentialResolver.AUTH_SCOPE_DEFAULT + ) + assert ( + Client.AUTH_SCOPE_PARTNER_AFFILIATE + == ScopedCredentialResolver.AUTH_SCOPE_PARTNER_AFFILIATE + ) + assert ( + Client.AUTH_SCOPE_TERMINAL_GROUP + == ScopedCredentialResolver.AUTH_SCOPE_TERMINAL_GROUP + ) + + def test_defaults_to_test_url(monkeypatch: pytest.MonkeyPatch): """Test that client defaults to test URL when not in production.""" monkeypatch.delenv("MSP_SDK_BUILD_PROFILE", raising=False) diff --git a/tests/multisafepay/unit/test_unit_sdk.py b/tests/multisafepay/unit/test_unit_sdk.py index f160905..a7fa84c 100644 --- a/tests/multisafepay/unit/test_unit_sdk.py +++ b/tests/multisafepay/unit/test_unit_sdk.py @@ -7,9 +7,12 @@ """Unit tests for SDK-level environment/base URL guardrails.""" +from unittest.mock import MagicMock + import pytest from multisafepay import Sdk +from multisafepay.api.paths.events.event_manager import EventManager from multisafepay.client.client import Client from multisafepay.client.credential_resolver import ScopedCredentialResolver @@ -122,6 +125,17 @@ def test_sdk_requires_api_key_or_resolver() -> None: Sdk(is_production=False) +def test_sdk_returns_event_manager() -> None: + """Expose EventManager through SDK convenience getter.""" + sdk = Sdk( + api_key="mock_api_key", + is_production=False, + transport=MagicMock(), + ) + + assert isinstance(sdk.get_event_manager(), EventManager) + + def test_sdk_uses_credential_resolver_with_custom_transport() -> None: """Wire resolver + transport together and use resolved auth header.""" transport = _CaptureTransport() From e36297e1b129e8e8745b28afed629080c64bc027 Mon Sep 17 00:00:00 2001 From: Marco Antonio Gil Date: Mon, 11 May 2026 13:24:43 +0200 Subject: [PATCH 2/2] PTHMINT-119: Add streaming support to transport and SSE Introduce streaming HTTP support across the SDK: add HTTPStreamResponse and open_stream(...) to the HTTPTransport protocol and export HTTPStreamResponse from transport package. Update RequestsTransport to share request preparation with a new open_stream implementation and return a _RequestsStreamResponse adapter that exposes readline(), close(), and raise_for_status(). Modify SSE client and EventStream to use the configured transport.open_stream(...) (and raise a clear error if the transport lacks streaming support), and pass the client transport from EventManager to EventStream.open. Update README and unit tests to reflect streaming behavior and ensure streams are closed on failing status checks. --- README.md | 5 + .../api/paths/events/event_manager.py | 1 + .../api/paths/events/stream/__init__.py | 6 +- src/multisafepay/client/sse.py | 46 +++--- src/multisafepay/transport/__init__.py | 3 +- src/multisafepay/transport/http_transport.py | 152 ++++++++++++------ .../transport/requests_transport.py | 116 ++++++++++++- .../events/stream/test_unit_event_stream.py | 122 ++++++++++++-- .../path/events/test_unit_event_manager.py | 18 ++- .../unit/transport/test_unit_transport.py | 35 ++++ 10 files changed, 408 insertions(+), 96 deletions(-) diff --git a/README.md b/README.md index 4f76991..08c0cc2 100644 --- a/README.md +++ b/README.md @@ -41,11 +41,14 @@ The SDK uses a small transport abstraction so you can choose (and swap) the unde ### How it works - The SDK expects an object implementing the `HTTPTransport` / `HTTPResponse` protocols defined in `src/multisafepay/transport/http_transport.py`. +- Event stream subscriptions also require the transport to implement `open_stream(...)` and return an `HTTPStreamResponse` with `readline()`, `close()`, and `raise_for_status()`. - If you do not provide a transport, the SDK defaults to `RequestsTransport`. - `requests` is an optional extra: - To use the default transport, install `multisafepay[requests]`. - To avoid `requests`, inject your own transport (for example, `httpx` or `urllib3`). +The built-in `RequestsTransport` supports both regular requests and SSE streams through the same configured `requests.Session`. Custom transports that only implement `request(...)` can still be used for regular API calls, but SSE subscriptions fail explicitly until `open_stream(...)` is added. The SDK does not fall back to another HTTP library for event streams. + ### Custom transport example ```bash @@ -136,6 +139,8 @@ with event_manager.subscribe_order_events(order, timeout=45.0) as stream: Use `subscribe_events(events_token=..., events_stream_url=...)` when the token and stream URL are already available separately. +SSE subscriptions use the same configured SDK transport as regular API calls. With the default transport this reuses the same `requests.Session`; with a custom transport, implement `open_stream(...)` on that transport instead of opening a separate HTTP connection path. + ### Development-only custom base URL override By default, the SDK only targets: diff --git a/src/multisafepay/api/paths/events/event_manager.py b/src/multisafepay/api/paths/events/event_manager.py index 8cd490a..dfc0cdc 100644 --- a/src/multisafepay/api/paths/events/event_manager.py +++ b/src/multisafepay/api/paths/events/event_manager.py @@ -47,6 +47,7 @@ def subscribe_events( return EventStream.open( events_token=events_token, events_stream_url=events_stream_url, + transport=self.client.transport, last_event_id=last_event_id, timeout=timeout, ) diff --git a/src/multisafepay/api/paths/events/stream/__init__.py b/src/multisafepay/api/paths/events/stream/__init__.py index 84b2268..4ad2fca 100644 --- a/src/multisafepay/api/paths/events/stream/__init__.py +++ b/src/multisafepay/api/paths/events/stream/__init__.py @@ -17,6 +17,7 @@ ServerSentEventStream, StreamingResponse, ) +from multisafepay.transport import HTTPTransport from typing_extensions import Self @@ -80,10 +81,12 @@ def open( cls: type[EventStream], events_token: str, events_stream_url: str, + *, + transport: HTTPTransport, last_event_id: str | None = None, timeout: float = 30.0, ) -> EventStream: - """Open a new SSE stream using the event token and stream URL.""" + """Open a new SSE stream using the event token and configured transport.""" headers = { "Accept": "text/event-stream", "Cache-Control": "no-cache", @@ -96,6 +99,7 @@ def open( url=events_stream_url, headers=headers, timeout=timeout, + transport=transport, ) return cls._from_stream(stream) diff --git a/src/multisafepay/client/sse.py b/src/multisafepay/client/sse.py index 16c0586..ec0e807 100644 --- a/src/multisafepay/client/sse.py +++ b/src/multisafepay/client/sse.py @@ -10,21 +10,20 @@ from __future__ import annotations from dataclasses import dataclass, field -from typing import ClassVar, Protocol +from typing import ClassVar from urllib.parse import urlparse -from urllib.request import Request, urlopen +from multisafepay.transport.http_transport import ( + HTTPStreamResponse as StreamingResponse, +) +from multisafepay.transport.http_transport import HTTPTransport from typing_extensions import Self - -class StreamingResponse(Protocol): - """Protocol for the minimal stream response interface used by SSE streams.""" - - def readline(self: StreamingResponse) -> bytes: - """Read one line from the stream response.""" - - def close(self: StreamingResponse) -> None: - """Close the stream response.""" +_STREAMING_TRANSPORT_REQUIRED_MESSAGE = ( + "Configured HTTP transport does not support streaming. " + "Implement open_stream(method, url, headers=None, data=None, **kwargs) " + "to use SSE subscriptions." +) @dataclass(frozen=True) @@ -132,19 +131,28 @@ def open( url: str, headers: dict[str, str] | None = None, timeout: float = 30.0, + transport: HTTPTransport | None = None, ) -> ServerSentEventStream: - """Open a new SSE stream using a URL and optional headers.""" + """Open a new SSE stream using the configured HTTP transport.""" cls._validate_url(url) - request = Request( # noqa: S310 - url=url, - headers=headers or {}, + if transport is None: + raise NotImplementedError(_STREAMING_TRANSPORT_REQUIRED_MESSAGE) + + if not hasattr(transport, "open_stream"): + raise NotImplementedError(_STREAMING_TRANSPORT_REQUIRED_MESSAGE) + + response = transport.open_stream( method="GET", + url=url, + headers=headers, + timeout=timeout, ) - # Keep the response open; close manages the lifecycle. - # pylint: disable=consider-using-with - response = urlopen(request, timeout=timeout) # noqa: S310 - # pylint: enable=consider-using-with + try: + response.raise_for_status() + except Exception: + response.close() + raise return cls(response=response) diff --git a/src/multisafepay/transport/__init__.py b/src/multisafepay/transport/__init__.py index 02c5651..a7f734b 100644 --- a/src/multisafepay/transport/__init__.py +++ b/src/multisafepay/transport/__init__.py @@ -7,11 +7,12 @@ """Transport layer module for HTTP communication abstraction.""" -from .http_transport import HTTPResponse, HTTPTransport +from .http_transport import HTTPResponse, HTTPStreamResponse, HTTPTransport from .requests_transport import RequestsTransport __all__ = [ "HTTPTransport", "HTTPResponse", + "HTTPStreamResponse", "RequestsTransport", ] diff --git a/src/multisafepay/transport/http_transport.py b/src/multisafepay/transport/http_transport.py index c1e2747..1b64183 100644 --- a/src/multisafepay/transport/http_transport.py +++ b/src/multisafepay/transport/http_transport.py @@ -10,57 +10,6 @@ from typing import Optional, Protocol -class HTTPTransport(Protocol): - """ - Protocol defining the interface for HTTP transport implementations. - - This abstraction allows the SDK to be decoupled from specific HTTP client - libraries, enabling flexibility to switch between different implementations - (e.g., requests, httpx, urllib) or to provide mock implementations for testing. - - The transport layer follows the Dependency Inversion Principle, allowing - business logic to depend on abstractions rather than concrete implementations. - """ - - def request( - self: "HTTPTransport", - method: str, - url: str, - headers: Optional[dict[str, str]] = None, - data: Optional[str] = None, - **kwargs: object, - ) -> "HTTPResponse": - """ - Execute an HTTP request. - - Parameters - ---------- - method (str): - The HTTP method (GET, POST, PATCH, DELETE, etc.). - url (str): - The full URL for the request. - headers (Optional[dict[str, str]]): - HTTP headers to include in the request, by default None. - data (Optional[str]): - Request body data, by default None. - **kwargs (object): - Additional keyword arguments for transport-specific options, - such as query params, timeout, SSL options, etc. - - Returns - ------- - HTTPResponse - The HTTP response object. - - Raises - ------ - Exception - If the request fails or encounters an error. - - """ - raise NotImplementedError - - class HTTPResponse(Protocol): """ Protocol defining the interface for HTTP response objects. @@ -123,3 +72,104 @@ def raise_for_status(self: "HTTPResponse") -> None: """ raise NotImplementedError + + +class HTTPStreamResponse(HTTPResponse, Protocol): + """Protocol for HTTP responses that can be consumed as a byte stream.""" + + def readline(self: "HTTPStreamResponse") -> bytes: + """Read one line from the streaming response.""" + raise NotImplementedError + + def close(self: "HTTPStreamResponse") -> None: + """Close the streaming response and release resources.""" + raise NotImplementedError + + +class HTTPTransport(Protocol): + """ + Protocol defining the interface for HTTP transport implementations. + + This abstraction allows the SDK to be decoupled from specific HTTP client + libraries, enabling flexibility to switch between different implementations + (e.g., requests, httpx, urllib) or to provide mock implementations for testing. + + The transport layer follows the Dependency Inversion Principle, allowing + business logic to depend on abstractions rather than concrete implementations. + """ + + def request( + self: "HTTPTransport", + method: str, + url: str, + headers: Optional[dict[str, str]] = None, + data: Optional[str] = None, + **kwargs: object, + ) -> "HTTPResponse": + """ + Execute an HTTP request. + + Parameters + ---------- + method (str): + The HTTP method (GET, POST, PATCH, DELETE, etc.). + url (str): + The full URL for the request. + headers (Optional[dict[str, str]]): + HTTP headers to include in the request, by default None. + data (Optional[str]): + Request body data, by default None. + **kwargs (object): + Additional keyword arguments for transport-specific options, + such as query params, timeout, SSL options, etc. + + Returns + ------- + HTTPResponse + The HTTP response object. + + Raises + ------ + Exception + If the request fails or encounters an error. + + """ + raise NotImplementedError + + def open_stream( + self: "HTTPTransport", + method: str, + url: str, + headers: Optional[dict[str, str]] = None, + data: Optional[str] = None, + **kwargs: object, + ) -> "HTTPStreamResponse": + """ + Open an HTTP response stream. + + Parameters + ---------- + method (str): + The HTTP method (GET, POST, etc.). + url (str): + The full URL for the request. + headers (Optional[dict[str, str]]): + HTTP headers to include in the request, by default None. + data (Optional[str]): + Request body data, by default None. + **kwargs (object): + Additional keyword arguments for transport-specific send options, + such as timeout, SSL options, and proxy configuration. + + Returns + ------- + HTTPStreamResponse + The open streaming HTTP response. + + Raises + ------ + Exception + If the stream cannot be opened or encounters an error. + + """ + raise NotImplementedError diff --git a/src/multisafepay/transport/requests_transport.py b/src/multisafepay/transport/requests_transport.py index f2ba58c..2a566c7 100644 --- a/src/multisafepay/transport/requests_transport.py +++ b/src/multisafepay/transport/requests_transport.py @@ -11,12 +11,23 @@ from typing import TYPE_CHECKING, cast +from multisafepay.transport.http_transport import HTTPStreamResponse from typing_extensions import Self _REQUESTS_IMPORT_ERROR: ImportError | None = None +_REQUEST_KWARG_NAMES = frozenset( + { + "auth", + "cookies", + "files", + "hooks", + "json", + "params", + }, +) if TYPE_CHECKING: # pragma: no cover - from requests import Request, Session + from requests import PreparedRequest, Request, Session from requests.models import Response try: @@ -39,6 +50,46 @@ def _raise_requests_missing() -> None: ) from _REQUESTS_IMPORT_ERROR +class _RequestsStreamResponse: + """Adapter exposing a requests streaming response through the SDK contract.""" + + def __init__(self: _RequestsStreamResponse, response: Response) -> None: + self._response = response + + @property + def status_code(self: _RequestsStreamResponse) -> int: + """Return the wrapped response status code.""" + return int(self._response.status_code) + + @property + def headers(self: _RequestsStreamResponse) -> dict[str, str]: + """Return normalized response headers.""" + return { + str(key): str(value) + for key, value in dict(self._response.headers).items() + } + + def json(self: _RequestsStreamResponse) -> object: + """Parse the wrapped response body as JSON.""" + return self._response.json() + + def raise_for_status(self: _RequestsStreamResponse) -> None: + """Raise requests' HTTP error for non-success status codes.""" + self._response.raise_for_status() + + def readline(self: _RequestsStreamResponse) -> bytes: + """Read one line from the underlying streaming body.""" + raw_stream = self._response.raw + if raw_stream is None: + return b"" + + return cast(bytes, raw_stream.readline()) + + def close(self: _RequestsStreamResponse) -> None: + """Close the wrapped streaming response.""" + self._response.close() + + class RequestsTransport: """ Concrete implementation of HTTPTransport using the requests library. @@ -100,16 +151,73 @@ def request( """ if not _HAS_REQUESTS: # pragma: no cover _raise_requests_missing() - session = cast("Session", self.session) - request = Request( + session, prepared_request, send_kwargs = self._prepare_request( + method=method, + url=url, + headers=headers, + data=data, + **kwargs, + ) + return session.send(prepared_request, **send_kwargs) + + def open_stream( + self: RequestsTransport, + method: str, + url: str, + headers: dict[str, str] | None = None, + data: str | None = None, + **kwargs: object, + ) -> HTTPStreamResponse: + """Open a streaming HTTP response using the shared requests session.""" + if not _HAS_REQUESTS: # pragma: no cover + _raise_requests_missing() + + session, prepared_request, send_kwargs = self._prepare_request( method=method, url=url, headers=headers, data=data, **kwargs, ) + send_kwargs["stream"] = True + response = session.send(prepared_request, **send_kwargs) + return _RequestsStreamResponse(response) + + def _prepare_request( + self: RequestsTransport, + method: str, + url: str, + headers: dict[str, str] | None = None, + data: str | None = None, + **kwargs: object, + ) -> tuple[Session, PreparedRequest, dict[str, object]]: + """Prepare a request once so regular and streaming calls share the same path.""" + if not _HAS_REQUESTS: # pragma: no cover + _raise_requests_missing() + + session = cast("Session", self.session) + request_kwargs: dict[str, object] = {} + send_kwargs: dict[str, object] = {} + + if headers is not None: + request_kwargs["headers"] = headers + + if data is not None: + request_kwargs["data"] = data + + for key, value in kwargs.items(): + if key in _REQUEST_KWARG_NAMES: + request_kwargs[key] = value + else: + send_kwargs[key] = value + + request = Request( + method=method, + url=url, + **request_kwargs, + ) prepared_request = session.prepare_request(request) - return session.send(prepared_request) + return session, prepared_request, send_kwargs def close(self: RequestsTransport) -> None: """ diff --git a/tests/multisafepay/unit/api/path/events/stream/test_unit_event_stream.py b/tests/multisafepay/unit/api/path/events/stream/test_unit_event_stream.py index eb09647..d6c709e 100644 --- a/tests/multisafepay/unit/api/path/events/stream/test_unit_event_stream.py +++ b/tests/multisafepay/unit/api/path/events/stream/test_unit_event_stream.py @@ -10,11 +10,11 @@ from __future__ import annotations import io +import urllib.request import pytest from multisafepay.api.paths.events.stream import Event, EventData, EventStream -from multisafepay.client.sse import ServerSentEventStream EVENTS_STREAM_URL = "https://testapi.multisafepay.com/events/stream/" EVENTS_TOKEN = "events-token" @@ -28,53 +28,141 @@ class _FakeStreamingResponse: def __init__(self: _FakeStreamingResponse, payload: bytes) -> None: self._buffer = io.BytesIO(payload) self.closed = False + self.status_code = 200 + self.headers: dict[str, str] = {} def readline(self: _FakeStreamingResponse) -> bytes: return self._buffer.readline() + def json(self: _FakeStreamingResponse) -> object: + return {} + + def raise_for_status(self: _FakeStreamingResponse) -> None: + return None + def close(self: _FakeStreamingResponse) -> None: self._buffer.close() self.closed = True -def test_open_builds_expected_headers(monkeypatch: pytest.MonkeyPatch) -> None: - """Build event-specific auth headers before opening the generic SSE stream.""" - captured: dict[str, object] = {} +class _FailingStatusStreamingResponse(_FakeStreamingResponse): + """Streaming response that fails HTTP status validation.""" + + def raise_for_status(self: _FailingStatusStreamingResponse) -> None: + raise RuntimeError("stream failed") + + +class _FakeStreamingTransport: + """Capture stream opens and return a configurable streaming response.""" - def fake_open( + def __init__( + self: _FakeStreamingTransport, + response: _FakeStreamingResponse, + ) -> None: + self._response = response + self.calls: list[dict[str, object]] = [] + + def request( + self: _FakeStreamingTransport, + _method: str, + _url: str, + _headers: dict[str, str] | None = None, + _data: str | None = None, + **_kwargs: object, + ) -> object: + raise AssertionError("request() should not be used for SSE") + + def open_stream( + self: _FakeStreamingTransport, + method: str, url: str, headers: dict[str, str] | None = None, - timeout: float = 30.0, - ) -> ServerSentEventStream: - captured["url"] = url - captured["headers"] = headers - captured["timeout"] = timeout - return ServerSentEventStream( - response=_FakeStreamingResponse(PING_PAYLOAD), + data: str | None = None, + **kwargs: object, + ) -> _FakeStreamingResponse: + self.calls.append( + { + "method": method, + "url": url, + "headers": headers, + "data": data, + "kwargs": kwargs, + }, ) + return self._response - monkeypatch.setattr( - "multisafepay.api.paths.events.stream.ServerSentEventStream.open", - fake_open, - ) + +class _TransportWithoutStreaming: + """Transport double that intentionally does not implement stream support.""" + + def request( + self: _TransportWithoutStreaming, + _method: str, + _url: str, + _headers: dict[str, str] | None = None, + _data: str | None = None, + **_kwargs: object, + ) -> object: + raise AssertionError("request() should not be used for SSE") + + +def test_open_builds_expected_headers(monkeypatch: pytest.MonkeyPatch) -> None: + """Build event-specific auth headers before opening the transport stream.""" + + def fail_urlopen(*_args: object, **_kwargs: object) -> object: + raise AssertionError("urllib should not be used for SSE") + + monkeypatch.setattr(urllib.request, "urlopen", fail_urlopen) + transport = _FakeStreamingTransport(_FakeStreamingResponse(PING_PAYLOAD)) stream = EventStream.open( events_token=EVENTS_TOKEN, events_stream_url=EVENTS_STREAM_URL, + transport=transport, last_event_id=LAST_EVENT_ID, timeout=9.5, ) event = next(stream) + captured = transport.calls[0] assert isinstance(event, Event) assert event.data == "ping" assert captured["url"] == EVENTS_STREAM_URL - assert captured["timeout"] == 9.5 + assert captured["method"] == "GET" headers = captured["headers"] assert headers["Authorization"] == f"Bearer {EVENTS_TOKEN}" assert headers["Accept"] == "text/event-stream" assert headers["Cache-Control"] == "no-cache" assert headers["Last-Event-ID"] == LAST_EVENT_ID + assert captured["kwargs"]["timeout"] == 9.5 + + +def test_open_fails_clearly_when_transport_has_no_stream_support() -> None: + """Fail explicitly instead of bypassing the configured transport.""" + with pytest.raises( + NotImplementedError, + match="does not support streaming", + ): + EventStream.open( + events_token=EVENTS_TOKEN, + events_stream_url=EVENTS_STREAM_URL, + transport=_TransportWithoutStreaming(), + ) + + +def test_open_closes_stream_when_status_validation_fails() -> None: + """Close an opened transport stream when HTTP status validation fails.""" + response = _FailingStatusStreamingResponse(PING_PAYLOAD) + transport = _FakeStreamingTransport(response) + + with pytest.raises(RuntimeError, match="stream failed"): + EventStream.open( + events_token=EVENTS_TOKEN, + events_stream_url=EVENTS_STREAM_URL, + transport=transport, + ) + + assert response.closed is True def test_wraps_generic_sse_messages_as_event_contracts() -> None: diff --git a/tests/multisafepay/unit/api/path/events/test_unit_event_manager.py b/tests/multisafepay/unit/api/path/events/test_unit_event_manager.py index efbe3d0..935ce73 100644 --- a/tests/multisafepay/unit/api/path/events/test_unit_event_manager.py +++ b/tests/multisafepay/unit/api/path/events/test_unit_event_manager.py @@ -31,11 +31,14 @@ def _patch_event_stream_open( def fake_open( events_token: str, events_stream_url: str, + *, + transport: object, last_event_id: Optional[str] = None, timeout: float = 30.0, ) -> object: captured["events_token"] = events_token captured["events_stream_url"] = events_stream_url + captured["transport"] = transport captured["last_event_id"] = last_event_id captured["timeout"] = timeout return expected_stream @@ -53,8 +56,10 @@ def test_subscribe_events_delegates_to_stream_open( ) -> None: """Delegate direct subscriptions to EventStream.open.""" captured, expected_stream = _patch_event_stream_open(monkeypatch) + client = MagicMock() + client.transport = object() - manager = EventManager(MagicMock()) + manager = EventManager(client) stream = manager.subscribe_events( events_token="token-abc", events_stream_url=TEST_EVENTS_STREAM_URL, @@ -65,6 +70,7 @@ def test_subscribe_events_delegates_to_stream_open( assert stream is expected_stream assert captured["events_token"] == "token-abc" assert captured["events_stream_url"] == TEST_EVENTS_STREAM_URL + assert captured["transport"] is client.transport assert captured["last_event_id"] == "last-15" assert captured["timeout"] == 10.0 @@ -74,8 +80,10 @@ def test_subscribe_order_events_uses_plural_fields( ) -> None: """Read events credentials from events_* fields when present.""" captured, expected_stream = _patch_event_stream_open(monkeypatch) + client = MagicMock() + client.transport = object() - manager = EventManager(MagicMock()) + manager = EventManager(client) order = Order( order_id="order-1", events_token="events-token", @@ -87,6 +95,7 @@ def test_subscribe_order_events_uses_plural_fields( assert stream is expected_stream assert captured["events_token"] == "events-token" assert captured["events_stream_url"] == ORDER_EVENTS_STREAM_URL + assert captured["transport"] is client.transport def test_subscribe_order_events_falls_back_to_legacy_fields( @@ -94,8 +103,10 @@ def test_subscribe_order_events_falls_back_to_legacy_fields( ) -> None: """Support old event_* field names for backward compatibility.""" captured, expected_stream = _patch_event_stream_open(monkeypatch) + client = MagicMock() + client.transport = object() - manager = EventManager(MagicMock()) + manager = EventManager(client) order = Order( order_id="order-2", event_token="legacy-token", @@ -107,6 +118,7 @@ def test_subscribe_order_events_falls_back_to_legacy_fields( assert stream is expected_stream assert captured["events_token"] == "legacy-token" assert captured["events_stream_url"] == LEGACY_EVENTS_STREAM_URL + assert captured["transport"] is client.transport def test_subscribe_order_events_requires_token_and_stream_url() -> None: diff --git a/tests/multisafepay/unit/transport/test_unit_transport.py b/tests/multisafepay/unit/transport/test_unit_transport.py index 1735ed5..0648483 100644 --- a/tests/multisafepay/unit/transport/test_unit_transport.py +++ b/tests/multisafepay/unit/transport/test_unit_transport.py @@ -78,6 +78,41 @@ def test_context_manager_closes_session( mock_session.close.assert_called_once() + def test_open_stream_uses_shared_session_send( + self: "TestRequestsTransportWithRequests", + requires_requests: object, + ) -> None: + """Open streaming responses through the same configured session.""" + assert requires_requests is not None + mock_session = Mock() + prepared = object() + mock_response = Mock() + mock_response.status_code = 200 + mock_response.headers = {"Content-Type": "text/event-stream"} + mock_response.raw.readline.return_value = b"data: ping\n" + mock_session.prepare_request.return_value = prepared + mock_session.send.return_value = mock_response + + transport = RequestsTransport(session=mock_session) + response = transport.open_stream( + method="GET", + url="https://api.example.com/events/stream", + headers={"Accept": "text/event-stream"}, + timeout=9.5, + ) + + assert response.readline() == b"data: ping\n" + request_obj = mock_session.prepare_request.call_args.args[0] + assert isinstance(request_obj, requires_requests.Request) + mock_session.send.assert_called_once_with( + prepared, + timeout=9.5, + stream=True, + ) + + response.close() + mock_response.close.assert_called_once() + class TestRequestsTransportWithoutRequests: """Failure modes when `requests` isn't installed and no transport is injected."""