diff --git a/.env.example b/.env.example index 1e17f89..5acb224 100644 --- a/.env.example +++ b/.env.example @@ -22,4 +22,4 @@ E2E_TERMINAL_GROUP_API_KEY_GROUP_DEFAULT=your-e2e-terminal-group-api-key-for-def # Optional custom base URL for dev-backed examples and terminal endpoint E2E. # MSP_SDK_BUILD_PROFILE=dev # MSP_SDK_ALLOW_CUSTOM_BASE_URL=1 -# MSP_SDK_CUSTOM_BASE_URL=your-custom-base-url \ No newline at end of file +# MSP_SDK_CUSTOM_BASE_URL=your-custom-base-url diff --git a/README.md b/README.md index b59392f..08c0cc2 100644 --- a/README.md +++ b/README.md @@ -41,11 +41,14 @@ The SDK uses a small transport abstraction so you can choose (and swap) the unde ### How it works - The SDK expects an object implementing the `HTTPTransport` / `HTTPResponse` protocols defined in `src/multisafepay/transport/http_transport.py`. +- Event stream subscriptions also require the transport to implement `open_stream(...)` and return an `HTTPStreamResponse` with `readline()`, `close()`, and `raise_for_status()`. - If you do not provide a transport, the SDK defaults to `RequestsTransport`. - `requests` is an optional extra: - To use the default transport, install `multisafepay[requests]`. - To avoid `requests`, inject your own transport (for example, `httpx` or `urllib3`). +The built-in `RequestsTransport` supports both regular requests and SSE streams through the same configured `requests.Session`. Custom transports that only implement `request(...)` can still be used for regular API calls, but SSE subscriptions fail explicitly until `open_stream(...)` is added. The SDK does not fall back to another HTTP library for event streams. + ### Custom transport example ```bash @@ -85,10 +88,33 @@ from multisafepay import Sdk from multisafepay.client import ScopedCredentialResolver +credential_resolver = ScopedCredentialResolver( + default_api_key="", + partner_affiliate_api_key="", + terminal_group_api_keys={ + "": "", + }, +) + +sdk = Sdk( + is_production=False, + credential_resolver=credential_resolver, +) +``` + +### Event stream subscriptions + +Use `EventManager` to subscribe to MultiSafepay SSE streams directly, or to subscribe from an order response that already contains event credentials. + +```python +from multisafepay import Sdk +from multisafepay.client import ScopedCredentialResolver + + credential_resolver = ScopedCredentialResolver( default_api_key="", terminal_group_api_keys={ - "Default": "", + "": "", }, ) @@ -96,8 +122,25 @@ sdk = Sdk( is_production=False, credential_resolver=credential_resolver, ) + +order_manager = sdk.get_order_manager() +event_manager = sdk.get_event_manager() + +create_response = order_manager.create( + request_order=order_request, + terminal_group_id="", +) +order = create_response.get_data() + +with event_manager.subscribe_order_events(order, timeout=45.0) as stream: + for event in stream: + print(event) ``` +Use `subscribe_events(events_token=..., events_stream_url=...)` when the token and stream URL are already available separately. + +SSE subscriptions use the same configured SDK transport as regular API calls. With the default transport this reuses the same `requests.Session`; with a custom transport, implement `open_stream(...)` on that transport instead of opening a separate HTTP connection path. + ### Development-only custom base URL override By default, the SDK only targets: @@ -143,6 +186,29 @@ In any non-dev profile (including default `release`), custom base URLs are block Go to the folder `examples` to see how to use the SDK. +The event-stream example in `examples/event_manager/subscribe_events.py` requires: + +```bash +export API_KEY="" +export TERMINAL_GROUP_API_KEY_GROUP_DEFAULT="" +export CLOUD_POS_TERMINAL_GROUP_ID="" +export CLOUD_POS_TERMINAL_ID="" +``` + +The SSE E2E test can also run against a dev-backed base URL and optionally resolve the terminal group automatically: + +```bash +export E2E_NO_SANDBOX_BASE_URL="https://dev-api.example.com/v1/" +export MSP_SDK_BUILD_PROFILE=dev +export MSP_SDK_ALLOW_CUSTOM_BASE_URL=1 +export MSP_SDK_CUSTOM_BASE_URL="https://dev-api.example.com/v1/" +export E2E_API_KEY="" +export E2E_TERMINAL_GROUP_API_KEY_GROUP_DEFAULT="" +export E2E_CLOUD_POS_TERMINAL_ID="" +# Optional when CLOUD_POS_TERMINAL_GROUP_ID is not set +export E2E_PARTNER_API_KEY="" +``` + ## Code quality checks ### Linting diff --git a/examples/event_manager/subscribe_events.py b/examples/event_manager/subscribe_events.py new file mode 100644 index 0000000..826fa89 --- /dev/null +++ b/examples/event_manager/subscribe_events.py @@ -0,0 +1,107 @@ +# Copyright (c) MultiSafepay, Inc. All rights reserved. + +# This file is licensed under the Open Software License (OSL) version 3.0. +# For a copy of the license, see the LICENSE.txt file in the project root. + +# See the DISCLAIMER.md file for disclaimer details. + +"""Create a Cloud POS order and subscribe to its event stream.""" + +import os +import time + +from dotenv import load_dotenv +from multisafepay import Sdk +from multisafepay.api.paths.orders.request import OrderRequest +from multisafepay.client import ScopedCredentialResolver + +# Load environment variables from a .env file +load_dotenv() + + +def _get_first_env(*names: str) -> str: + for name in names: + value = os.getenv(name, "").strip() + if value: + return value + + return "" + + +def _require_first_env(*names: str) -> str: + value = _get_first_env(*names) + if value: + return value + + raise RuntimeError( + f"Missing required environment variable. Set one of: {', '.join(names)}", + ) + + +DEFAULT_ACCOUNT_API_KEY = _require_first_env("API_KEY", "E2E_API_KEY") +TERMINAL_GROUP_DEFAULT_API_KEY = _require_first_env( + "TERMINAL_GROUP_API_KEY_GROUP_DEFAULT", + "E2E_TERMINAL_GROUP_API_KEY_GROUP_DEFAULT", +) +CLOUD_POS_TERMINAL_GROUP_ID = _require_first_env( + "CLOUD_POS_TERMINAL_GROUP_ID", +) +TERMINAL_ID = _require_first_env( + "CLOUD_POS_TERMINAL_ID", + "E2E_CLOUD_POS_TERMINAL_ID", +) + +if __name__ == "__main__": + # This example executes Cloud POS calls with terminal-group scope. + scoped_terminal_group_id = CLOUD_POS_TERMINAL_GROUP_ID + resolver_kwargs = { + "default_api_key": DEFAULT_ACCOUNT_API_KEY, + } + if scoped_terminal_group_id: + resolver_kwargs["terminal_group_api_keys"] = { + scoped_terminal_group_id: TERMINAL_GROUP_DEFAULT_API_KEY, + } + + credential_resolver = ScopedCredentialResolver(**resolver_kwargs) + + multisafepay_sdk = Sdk( + is_production=False, + credential_resolver=credential_resolver, + ) + order_manager = multisafepay_sdk.get_order_manager() + event_manager = multisafepay_sdk.get_event_manager() + + order_id = f"cloud-pos-{int(time.time())}" + + order_request = ( + OrderRequest() + .add_type("redirect") + .add_order_id(order_id) + .add_description("Cloud POS order") + .add_amount(100) + .add_currency("EUR") + .add_gateway_info( + { + "terminal_id": TERMINAL_ID, + }, + ) + ) + + create_response = order_manager.create( + order_request, + terminal_group_id=scoped_terminal_group_id, + ) + order = create_response.get_data() + + if order is None: + raise RuntimeError("Order creation did not return order data") + + print(f"Created Cloud POS order: {order.order_id}") + print("Listening for events. Press Ctrl+C to stop.") + + try: + with event_manager.subscribe_order_events(order, timeout=45.0) as stream: + for event in stream: + print(event) + except KeyboardInterrupt: + print("Stream interrupted by user.") diff --git a/examples/order_manager/cloud_pos_order.py b/examples/order_manager/cloud_pos_order.py index 095302c..61906f7 100644 --- a/examples/order_manager/cloud_pos_order.py +++ b/examples/order_manager/cloud_pos_order.py @@ -37,14 +37,20 @@ ) order_manager = multisafepay_sdk.get_order_manager() + order_id = f"cloud-pos-{int(time.time())}" + order_request = ( OrderRequest() .add_type("redirect") - .add_order_id(f"cloud-pos-{int(time.time())}") + .add_order_id(order_id) .add_description("Cloud POS order") .add_amount(100) .add_currency("EUR") - .add_gateway_info({"terminal_id": TERMINAL_ID}) + .add_gateway_info( + { + "terminal_id": TERMINAL_ID, + }, + ) ) create_response = order_manager.create( diff --git a/src/multisafepay/api/paths/events/__init__.py b/src/multisafepay/api/paths/events/__init__.py new file mode 100644 index 0000000..cf12cf7 --- /dev/null +++ b/src/multisafepay/api/paths/events/__init__.py @@ -0,0 +1,14 @@ +# Copyright (c) MultiSafepay, Inc. All rights reserved. + +# This file is licensed under the Open Software License (OSL) version 3.0. +# For a copy of the license, see the LICENSE.txt file in the project root. + +# See the DISCLAIMER.md file for disclaimer details. + +"""Events API endpoints.""" + +from multisafepay.api.paths.events.event_manager import EventManager + +__all__ = [ + "EventManager", +] diff --git a/src/multisafepay/api/paths/events/event_manager.py b/src/multisafepay/api/paths/events/event_manager.py new file mode 100644 index 0000000..dfc0cdc --- /dev/null +++ b/src/multisafepay/api/paths/events/event_manager.py @@ -0,0 +1,88 @@ +# Copyright (c) MultiSafepay, Inc. All rights reserved. + +# This file is licensed under the Open Software License (OSL) version 3.0. +# For a copy of the license, see the LICENSE.txt file in the project root. + +# See the DISCLAIMER.md file for disclaimer details. + +"""Event manager for event stream subscription helpers.""" + +from __future__ import annotations + +from multisafepay.api.base.abstract_manager import AbstractManager +from multisafepay.api.paths.events.stream import EventStream +from multisafepay.api.paths.orders.response.order_response import Order +from multisafepay.client.client import Client + + +class EventManager(AbstractManager): + """Manages event stream subscriptions for order events.""" + + def __init__(self: EventManager, client: Client) -> None: + """Initialize the EventManager with a client.""" + super().__init__(client) + + def subscribe_events( + self: EventManager, + events_token: str, + events_stream_url: str, + last_event_id: str | None = None, + timeout: float = 30.0, + ) -> EventStream: + """ + Subscribe to order events using the SSE stream endpoint. + + Parameters + ---------- + events_token (str): Token returned by order creation for event auth. + events_stream_url (str): Full SSE stream URL. + last_event_id (str | None): Optional resume cursor. + timeout (float): Socket timeout in seconds. + + Returns + ------- + EventStream: An iterator over incoming SSE messages. + + """ + return EventStream.open( + events_token=events_token, + events_stream_url=events_stream_url, + transport=self.client.transport, + last_event_id=last_event_id, + timeout=timeout, + ) + + def subscribe_order_events( + self: EventManager, + order: Order, + last_event_id: str | None = None, + timeout: float = 30.0, + ) -> EventStream: + """ + Subscribe to events for an existing order response object. + + Parameters + ---------- + order (Order): Order response that contains event credentials. + last_event_id (str | None): Optional resume cursor. + timeout (float): Socket timeout in seconds. + + Returns + ------- + EventStream: An iterator over incoming SSE messages. + + """ + events_token = order.events_token or order.event_token + events_stream_url = order.events_stream_url or order.event_stream_url + + if not events_token or not events_stream_url: + raise ValueError( + "Order does not contain events_token/events_stream_url.", + ) + + return self.subscribe_events( + events_token=events_token, + events_stream_url=events_stream_url, + last_event_id=last_event_id, + timeout=timeout, + ) diff --git a/src/multisafepay/api/paths/events/stream/__init__.py b/src/multisafepay/api/paths/events/stream/__init__.py new file mode 100644 index 0000000..4ad2fca --- /dev/null +++ b/src/multisafepay/api/paths/events/stream/__init__.py @@ -0,0 +1,136 @@ +# Copyright (c) MultiSafepay, Inc. All rights reserved. + +# This file is licensed under the Open Software License (OSL) version 3.0. +# For a copy of the license, see the LICENSE.txt file in the project root. + +# See the DISCLAIMER.md file for disclaimer details. + +"""Event stream contracts for the events path.""" + +from __future__ import annotations + +import json + +from multisafepay.api.paths.events.stream.response import Event, EventData +from multisafepay.client.sse import ( + ServerSentEvent, + ServerSentEventStream, + StreamingResponse, +) +from multisafepay.transport import HTTPTransport +from typing_extensions import Self + + +def _deserialize_event_payload(raw_payload: str | None) -> object | None: + """Parse raw SSE data for this path and fall back to plain text.""" + if raw_payload is None: + return None + + try: + return json.loads(raw_payload) + except json.JSONDecodeError: + return raw_payload + + +def _to_event(server_sent_event: ServerSentEvent) -> Event: + """Adapt a generic SSE message into the events-path response model.""" + event = Event.from_dict( + { + "event": server_sent_event.event, + "data": _deserialize_event_payload(server_sent_event.data), + "event_id": server_sent_event.event_id, + "retry": server_sent_event.retry, + "raw_data": server_sent_event.raw_data, + }, + ) + if event is None: + raise ValueError("Unable to adapt SSE payload to Event.") + return event + + +class EventStream: + """Iterator over events received from the MultiSafepay SSE endpoint.""" + + def __init__( + self: EventStream, + response: StreamingResponse | None = None, + stream: ServerSentEventStream | None = None, + ) -> None: + """Initialize the stream from an HTTP response or generic SSE stream.""" + if stream is not None: + self._stream = stream + return + + if response is None: + raise ValueError( + "response is required when stream is not provided.", + ) + + self._stream = ServerSentEventStream(response=response) + + @classmethod + def _from_stream( + cls: type[EventStream], + stream: ServerSentEventStream, + ) -> EventStream: + """Build an EventStream around an already-open generic SSE stream.""" + return cls(stream=stream) + + @classmethod + def open( + cls: type[EventStream], + events_token: str, + events_stream_url: str, + *, + transport: HTTPTransport, + last_event_id: str | None = None, + timeout: float = 30.0, + ) -> EventStream: + """Open a new SSE stream using the event token and configured transport.""" + headers = { + "Accept": "text/event-stream", + "Cache-Control": "no-cache", + "Authorization": f"Bearer {events_token}", + } + if last_event_id is not None: + headers["Last-Event-ID"] = last_event_id + + stream = ServerSentEventStream.open( + url=events_stream_url, + headers=headers, + timeout=timeout, + transport=transport, + ) + return cls._from_stream(stream) + + @property + def closed(self: EventStream) -> bool: + """Return whether this stream is already closed.""" + return self._stream.closed + + def close(self: EventStream) -> None: + """Close the underlying HTTP response stream.""" + self._stream.close() + + def __iter__(self: EventStream) -> EventStream: + """Return self as an iterator over events.""" + return self + + def __next__(self: EventStream) -> Event: + """Read the next SSE message and return it as an Event.""" + return _to_event(next(self._stream)) + + def __enter__(self: Self) -> Self: + """Support context manager protocol.""" + return self + + def __exit__(self: EventStream, *args: object) -> None: + """Close stream when exiting context manager.""" + self.close() + + +__all__ = [ + "Event", + "EventData", + "EventStream", +] diff --git a/src/multisafepay/api/paths/events/stream/response/__init__.py b/src/multisafepay/api/paths/events/stream/response/__init__.py new file mode 100644 index 0000000..bd51138 --- /dev/null +++ b/src/multisafepay/api/paths/events/stream/response/__init__.py @@ -0,0 +1,16 @@ +# Copyright (c) MultiSafepay, Inc. All rights reserved. + +# This file is licensed under the Open Software License (OSL) version 3.0. +# For a copy of the license, see the LICENSE.txt file in the project root. + +# See the DISCLAIMER.md file for disclaimer details. + +"""Response models for event stream payloads.""" + +from multisafepay.api.paths.events.stream.response.components import EventData +from multisafepay.api.paths.events.stream.response.event import Event + +__all__ = [ + "Event", + "EventData", +] diff --git a/src/multisafepay/api/paths/events/stream/response/components/__init__.py b/src/multisafepay/api/paths/events/stream/response/components/__init__.py new file mode 100644 index 0000000..9b13bc9 --- /dev/null +++ b/src/multisafepay/api/paths/events/stream/response/components/__init__.py @@ -0,0 +1,18 @@ +# Copyright (c) MultiSafepay, Inc. All rights reserved. + +# This file is licensed under the Open Software License (OSL) version 3.0. +# For a copy of the license, see the LICENSE.txt file in the project root. + +# See the DISCLAIMER.md file for disclaimer details. + +"""Component models for event stream response payloads.""" + +from multisafepay.api.paths.events.stream.response.components.event_data import ( + EventData, + EventDataPayload, +) + +__all__ = [ + "EventData", + "EventDataPayload", +] diff --git a/src/multisafepay/api/paths/events/stream/response/components/event_data.py b/src/multisafepay/api/paths/events/stream/response/components/event_data.py new file mode 100644 index 0000000..67eedb1 --- /dev/null +++ b/src/multisafepay/api/paths/events/stream/response/components/event_data.py @@ -0,0 +1,55 @@ +# Copyright (c) MultiSafepay, Inc. All rights reserved. + +# This file is licensed under the Open Software License (OSL) version 3.0. +# For a copy of the license, see the LICENSE.txt file in the project root. + +# See the DISCLAIMER.md file for disclaimer details. + +"""Component response models for event stream payloads.""" + +from __future__ import annotations + +from typing import Optional, Union + +from multisafepay.model.response_model import ResponseModel + +# ruff: noqa: UP007 + +EventDataPayload = Union[ + "EventData", + str, + int, + float, + bool, + list[object], + None, +] + + +class EventData(ResponseModel): + """Structured nested event payload for known order-event attributes.""" + + status: Optional[str] + order_id: Optional[str] + type: Optional[str] + data: EventDataPayload + + @staticmethod + def from_dict(d: dict) -> Optional[EventData]: + """Create EventData from dictionary data.""" + if d is None: + return None + + args = d.copy() + payload = d.get("data") + if isinstance(payload, dict): + args["data"] = EventData.from_dict(payload) + elif isinstance(payload, list): + args["data"] = [ + EventData.from_dict(item) if isinstance(item, dict) else item + for item in payload + ] + else: + args["data"] = payload + + return EventData(**args) diff --git a/src/multisafepay/api/paths/events/stream/response/event.py b/src/multisafepay/api/paths/events/stream/response/event.py new file mode 100644 index 0000000..52af229 --- /dev/null +++ b/src/multisafepay/api/paths/events/stream/response/event.py @@ -0,0 +1,50 @@ +# Copyright (c) MultiSafepay, Inc. All rights reserved. + +# This file is licensed under the Open Software License (OSL) version 3.0. +# For a copy of the license, see the LICENSE.txt file in the project root. + +# See the DISCLAIMER.md file for disclaimer details. + +"""Response models for order event stream messages.""" + +from __future__ import annotations + +from typing import Optional + +from multisafepay.api.paths.events.stream.response.components import ( + EventData, + EventDataPayload, +) +from multisafepay.model.response_model import ResponseModel + +# ruff: noqa: UP007 + + +class Event(ResponseModel): + """Structured event message returned by the MultiSafepay SSE stream.""" + + event: Optional[str] + data: EventDataPayload + event_id: Optional[str] + retry: Optional[int] + raw_data: Optional[str] + + @staticmethod + def from_dict(d: dict) -> Optional[Event]: + """Create Event from dictionary data.""" + if d is None: + return None + + args = d.copy() + payload = d.get("data") + if isinstance(payload, dict): + args["data"] = EventData.from_dict(payload) + elif isinstance(payload, list): + args["data"] = [ + EventData.from_dict(item) if isinstance(item, dict) else item + for item in payload + ] + else: + args["data"] = payload + + return Event(**args) diff --git a/src/multisafepay/api/paths/orders/response/order_response.py b/src/multisafepay/api/paths/orders/response/order_response.py index 4581929..89e8f04 100644 --- a/src/multisafepay/api/paths/orders/response/order_response.py +++ b/src/multisafepay/api/paths/orders/response/order_response.py @@ -103,6 +103,11 @@ class Order(ResponseModel): payment_url: Optional[str] cancel_url: Optional[str] session_id: Optional[str] + events_token: Optional[str] + events_url: Optional[str] + events_stream_url: Optional[str] + + # Backward compatibility aliases for older API payloads. event_token: Optional[str] event_url: Optional[str] event_stream_url: Optional[str] @@ -118,6 +123,23 @@ def get_order_id(self: "Order") -> str: """ return self.order_id + @staticmethod + def _normalize_event_fields(d: dict) -> dict: + """Normalize singular/plural event keys for compatibility.""" + mapping = [ + ("events_token", "event_token"), + ("events_url", "event_url"), + ("events_stream_url", "event_stream_url"), + ] + + for plural_key, singular_key in mapping: + if d.get(plural_key) is None and d.get(singular_key) is not None: + d[plural_key] = d[singular_key] + if d.get(singular_key) is None and d.get(plural_key) is not None: + d[singular_key] = d[plural_key] + + return d + @staticmethod def from_dict(d: dict) -> Optional["Order"]: """ @@ -134,7 +156,10 @@ def from_dict(d: dict) -> Optional["Order"]: """ if d is None: return None - order_dependency_adapter = Decorator(dependencies=d) + normalized_dependencies = Order._normalize_event_fields(d.copy()) + order_dependency_adapter = Decorator( + dependencies=normalized_dependencies, + ) dependencies = ( order_dependency_adapter.adapt_order_adjustment( d.get("order_adjustment"), diff --git a/src/multisafepay/client/__init__.py b/src/multisafepay/client/__init__.py index 922e1d2..30fdbf6 100644 --- a/src/multisafepay/client/__init__.py +++ b/src/multisafepay/client/__init__.py @@ -3,9 +3,12 @@ from multisafepay.client.api_key import ApiKey from multisafepay.client.client import Client from multisafepay.client.credential_resolver import ScopedCredentialResolver +from multisafepay.client.sse import ServerSentEvent, ServerSentEventStream __all__ = [ "ApiKey", "Client", "ScopedCredentialResolver", + "ServerSentEvent", + "ServerSentEventStream", ] diff --git a/src/multisafepay/client/client.py b/src/multisafepay/client/client.py index ec83993..5854e30 100644 --- a/src/multisafepay/client/client.py +++ b/src/multisafepay/client/client.py @@ -44,6 +44,14 @@ class Client: CUSTOM_BASE_URL_ENV = "MSP_SDK_CUSTOM_BASE_URL" ALLOW_CUSTOM_BASE_URL_ENV = "MSP_SDK_ALLOW_CUSTOM_BASE_URL" + AUTH_SCOPE_DEFAULT = ScopedCredentialResolver.AUTH_SCOPE_DEFAULT + AUTH_SCOPE_PARTNER_AFFILIATE = ( + ScopedCredentialResolver.AUTH_SCOPE_PARTNER_AFFILIATE + ) + AUTH_SCOPE_TERMINAL_GROUP = ( + ScopedCredentialResolver.AUTH_SCOPE_TERMINAL_GROUP + ) + METHOD_POST = "POST" METHOD_GET = "GET" METHOD_PATCH = "PATCH" diff --git a/src/multisafepay/client/sse.py b/src/multisafepay/client/sse.py new file mode 100644 index 0000000..ec0e807 --- /dev/null +++ b/src/multisafepay/client/sse.py @@ -0,0 +1,232 @@ +# Copyright (c) MultiSafepay, Inc. All rights reserved. + +# This file is licensed under the Open Software License (OSL) version 3.0. +# For a copy of the license, see the LICENSE.txt file in the project root. + +# See the DISCLAIMER.md file for disclaimer details. + +"""Generic Server-Sent Events support utilities.""" + +from __future__ import annotations + +from dataclasses import dataclass, field +from typing import ClassVar +from urllib.parse import urlparse + +from multisafepay.transport.http_transport import ( + HTTPStreamResponse as StreamingResponse, +) +from multisafepay.transport.http_transport import HTTPTransport +from typing_extensions import Self + +_STREAMING_TRANSPORT_REQUIRED_MESSAGE = ( + "Configured HTTP transport does not support streaming. " + "Implement open_stream(method, url, headers=None, data=None, **kwargs) " + "to use SSE subscriptions." +) + + +@dataclass(frozen=True) +class ServerSentEvent: + """Generic representation of one SSE message.""" + + event: str | None = None + data: str | None = None + event_id: str | None = None + retry: int | None = None + raw_data: str | None = None + + +@dataclass +class _ServerSentEventBuilder: + """Mutable builder used while parsing one SSE message.""" + + _FIELD_HANDLERS: ClassVar[dict[str, str]] = { + "event": "_consume_event", + "data": "_consume_data", + "id": "_consume_id", + "retry": "_consume_retry", + } + + event_name: str | None = None + event_id: str | None = None + event_retry: int | None = None + data_lines: list[str] = field(default_factory=list) + has_fields: bool = False + + def consume_line(self: _ServerSentEventBuilder, line: str) -> None: + """Consume one SSE line and update the builder state.""" + if line.startswith(":"): + return + + field_name, field_value = _parse_line(line) + if field_name is None: + return + + handler_name = self._FIELD_HANDLERS.get(field_name) + if handler_name is None: + return + + getattr(self, handler_name)(field_value) + + def _consume_event( + self: _ServerSentEventBuilder, + field_value: str, + ) -> None: + """Consume the SSE event field.""" + self.has_fields = True + self.event_name = field_value + + def _consume_data(self: _ServerSentEventBuilder, field_value: str) -> None: + """Consume one SSE data line.""" + self.has_fields = True + self.data_lines.append(field_value) + + def _consume_id(self: _ServerSentEventBuilder, field_value: str) -> None: + """Consume the SSE id field.""" + self.has_fields = True + self.event_id = field_value + + def _consume_retry( + self: _ServerSentEventBuilder, + field_value: str, + ) -> None: + """Consume the SSE retry field when it is a valid integer.""" + try: + self.event_retry = int(field_value) + except ValueError: + return + + self.has_fields = True + + def build(self: _ServerSentEventBuilder) -> ServerSentEvent | None: + """Build an immutable SSE message or None when no message exists.""" + if not self.has_fields and not self.data_lines: + return None + + raw_data = "\n".join(self.data_lines) if self.data_lines else None + return ServerSentEvent( + event=self.event_name, + data=raw_data, + event_id=self.event_id, + retry=self.event_retry, + raw_data=raw_data, + ) + + +class ServerSentEventStream: + """Iterator over messages received from a generic SSE endpoint.""" + + def __init__( + self: ServerSentEventStream, + response: StreamingResponse, + ) -> None: + """Initialize the stream from an already-open HTTP response.""" + self._response = response + self._closed = False + + @classmethod + def open( + cls: type[ServerSentEventStream], + url: str, + headers: dict[str, str] | None = None, + timeout: float = 30.0, + transport: HTTPTransport | None = None, + ) -> ServerSentEventStream: + """Open a new SSE stream using the configured HTTP transport.""" + cls._validate_url(url) + + if transport is None: + raise NotImplementedError(_STREAMING_TRANSPORT_REQUIRED_MESSAGE) + + if not hasattr(transport, "open_stream"): + raise NotImplementedError(_STREAMING_TRANSPORT_REQUIRED_MESSAGE) + + response = transport.open_stream( + method="GET", + url=url, + headers=headers, + timeout=timeout, + ) + try: + response.raise_for_status() + except Exception: + response.close() + raise + + return cls(response=response) + + @staticmethod + def _validate_url(url: str) -> None: + """Validate the stream URL before opening the network connection.""" + parsed = urlparse(url) + if parsed.scheme not in {"http", "https"} or not parsed.netloc: + raise ValueError("Invalid SSE URL.") + + @property + def closed(self: ServerSentEventStream) -> bool: + """Return whether this stream is already closed.""" + return self._closed + + def close(self: ServerSentEventStream) -> None: + """Close the underlying HTTP response stream.""" + if self._closed: + return + + self._response.close() + self._closed = True + + def __iter__(self: ServerSentEventStream) -> ServerSentEventStream: + """Return self as an iterator over SSE messages.""" + return self + + def __next__(self: ServerSentEventStream) -> ServerSentEvent: + """Read the next SSE message and return it.""" + if self._closed: + raise StopIteration + + builder = _ServerSentEventBuilder() + while True: + line = self._read_line() + if line is None: + self.close() + raise StopIteration + + if line == "": + event = builder.build() + if event is not None: + return event + builder = _ServerSentEventBuilder() + continue + + builder.consume_line(line) + + def _read_line(self: ServerSentEventStream) -> str | None: + """Read and decode one line from the underlying stream response.""" + raw_line = self._response.readline() + if not raw_line: + return None + + return raw_line.decode("utf-8", errors="replace").rstrip("\r\n") + + def __enter__(self: Self) -> Self: + """Support context manager protocol.""" + return self + + def __exit__(self: ServerSentEventStream, *args: object) -> None: + """Close stream when exiting context manager.""" + self.close() + + +def _parse_line(line: str) -> tuple[str | None, str]: + """Parse one SSE line into field and value parts.""" + if ":" not in line: + return line or None, "" + + field_name, field_value = line.split(":", 1) + if field_name == "": + return None, "" + if field_value.startswith(" "): + field_value = field_value[1:] + + return field_name, field_value diff --git a/src/multisafepay/sdk.py b/src/multisafepay/sdk.py index 28f42b6..74abe03 100644 --- a/src/multisafepay/sdk.py +++ b/src/multisafepay/sdk.py @@ -11,6 +11,7 @@ from multisafepay.api.paths.auth.auth_manager import AuthManager from multisafepay.api.paths.categories.category_manager import CategoryManager +from multisafepay.api.paths.events.event_manager import EventManager from multisafepay.api.paths.gateways.gateway_manager import GatewayManager from multisafepay.api.paths.issuers.issuer_manager import IssuerManager from multisafepay.api.paths.orders.order_manager import OrderManager @@ -178,6 +179,18 @@ def get_category_manager(self: "Sdk") -> CategoryManager: """ return CategoryManager(self.client) + def get_event_manager(self: "Sdk") -> EventManager: + """ + Get the event manager. + + Returns + ------- + EventManager + The event manager instance. + + """ + return EventManager(self.client) + def get_order_manager(self: "Sdk") -> OrderManager: """ Get the order manager. diff --git a/src/multisafepay/transport/__init__.py b/src/multisafepay/transport/__init__.py index 02c5651..a7f734b 100644 --- a/src/multisafepay/transport/__init__.py +++ b/src/multisafepay/transport/__init__.py @@ -7,11 +7,12 @@ """Transport layer module for HTTP communication abstraction.""" -from .http_transport import HTTPResponse, HTTPTransport +from .http_transport import HTTPResponse, HTTPStreamResponse, HTTPTransport from .requests_transport import RequestsTransport __all__ = [ "HTTPTransport", "HTTPResponse", + "HTTPStreamResponse", "RequestsTransport", ] diff --git a/src/multisafepay/transport/http_transport.py b/src/multisafepay/transport/http_transport.py index c1e2747..1b64183 100644 --- a/src/multisafepay/transport/http_transport.py +++ b/src/multisafepay/transport/http_transport.py @@ -10,57 +10,6 @@ from typing import Optional, Protocol -class HTTPTransport(Protocol): - """ - Protocol defining the interface for HTTP transport implementations. - - This abstraction allows the SDK to be decoupled from specific HTTP client - libraries, enabling flexibility to switch between different implementations - (e.g., requests, httpx, urllib) or to provide mock implementations for testing. - - The transport layer follows the Dependency Inversion Principle, allowing - business logic to depend on abstractions rather than concrete implementations. - """ - - def request( - self: "HTTPTransport", - method: str, - url: str, - headers: Optional[dict[str, str]] = None, - data: Optional[str] = None, - **kwargs: object, - ) -> "HTTPResponse": - """ - Execute an HTTP request. - - Parameters - ---------- - method (str): - The HTTP method (GET, POST, PATCH, DELETE, etc.). - url (str): - The full URL for the request. - headers (Optional[dict[str, str]]): - HTTP headers to include in the request, by default None. - data (Optional[str]): - Request body data, by default None. - **kwargs (object): - Additional keyword arguments for transport-specific options, - such as query params, timeout, SSL options, etc. - - Returns - ------- - HTTPResponse - The HTTP response object. - - Raises - ------ - Exception - If the request fails or encounters an error. - - """ - raise NotImplementedError - - class HTTPResponse(Protocol): """ Protocol defining the interface for HTTP response objects. @@ -123,3 +72,104 @@ def raise_for_status(self: "HTTPResponse") -> None: """ raise NotImplementedError + + +class HTTPStreamResponse(HTTPResponse, Protocol): + """Protocol for HTTP responses that can be consumed as a byte stream.""" + + def readline(self: "HTTPStreamResponse") -> bytes: + """Read one line from the streaming response.""" + raise NotImplementedError + + def close(self: "HTTPStreamResponse") -> None: + """Close the streaming response and release resources.""" + raise NotImplementedError + + +class HTTPTransport(Protocol): + """ + Protocol defining the interface for HTTP transport implementations. + + This abstraction allows the SDK to be decoupled from specific HTTP client + libraries, enabling flexibility to switch between different implementations + (e.g., requests, httpx, urllib) or to provide mock implementations for testing. + + The transport layer follows the Dependency Inversion Principle, allowing + business logic to depend on abstractions rather than concrete implementations. + """ + + def request( + self: "HTTPTransport", + method: str, + url: str, + headers: Optional[dict[str, str]] = None, + data: Optional[str] = None, + **kwargs: object, + ) -> "HTTPResponse": + """ + Execute an HTTP request. + + Parameters + ---------- + method (str): + The HTTP method (GET, POST, PATCH, DELETE, etc.). + url (str): + The full URL for the request. + headers (Optional[dict[str, str]]): + HTTP headers to include in the request, by default None. + data (Optional[str]): + Request body data, by default None. + **kwargs (object): + Additional keyword arguments for transport-specific options, + such as query params, timeout, SSL options, etc. + + Returns + ------- + HTTPResponse + The HTTP response object. + + Raises + ------ + Exception + If the request fails or encounters an error. + + """ + raise NotImplementedError + + def open_stream( + self: "HTTPTransport", + method: str, + url: str, + headers: Optional[dict[str, str]] = None, + data: Optional[str] = None, + **kwargs: object, + ) -> "HTTPStreamResponse": + """ + Open an HTTP response stream. + + Parameters + ---------- + method (str): + The HTTP method (GET, POST, etc.). + url (str): + The full URL for the request. + headers (Optional[dict[str, str]]): + HTTP headers to include in the request, by default None. + data (Optional[str]): + Request body data, by default None. + **kwargs (object): + Additional keyword arguments for transport-specific send options, + such as timeout, SSL options, and proxy configuration. + + Returns + ------- + HTTPStreamResponse + The open streaming HTTP response. + + Raises + ------ + Exception + If the stream cannot be opened or encounters an error. + + """ + raise NotImplementedError diff --git a/src/multisafepay/transport/requests_transport.py b/src/multisafepay/transport/requests_transport.py index f2ba58c..2a566c7 100644 --- a/src/multisafepay/transport/requests_transport.py +++ b/src/multisafepay/transport/requests_transport.py @@ -11,12 +11,23 @@ from typing import TYPE_CHECKING, cast +from multisafepay.transport.http_transport import HTTPStreamResponse from typing_extensions import Self _REQUESTS_IMPORT_ERROR: ImportError | None = None +_REQUEST_KWARG_NAMES = frozenset( + { + "auth", + "cookies", + "files", + "hooks", + "json", + "params", + }, +) if TYPE_CHECKING: # pragma: no cover - from requests import Request, Session + from requests import PreparedRequest, Request, Session from requests.models import Response try: @@ -39,6 +50,46 @@ def _raise_requests_missing() -> None: ) from _REQUESTS_IMPORT_ERROR +class _RequestsStreamResponse: + """Adapter exposing a requests streaming response through the SDK contract.""" + + def __init__(self: _RequestsStreamResponse, response: Response) -> None: + self._response = response + + @property + def status_code(self: _RequestsStreamResponse) -> int: + """Return the wrapped response status code.""" + return int(self._response.status_code) + + @property + def headers(self: _RequestsStreamResponse) -> dict[str, str]: + """Return normalized response headers.""" + return { + str(key): str(value) + for key, value in dict(self._response.headers).items() + } + + def json(self: _RequestsStreamResponse) -> object: + """Parse the wrapped response body as JSON.""" + return self._response.json() + + def raise_for_status(self: _RequestsStreamResponse) -> None: + """Raise requests' HTTP error for non-success status codes.""" + self._response.raise_for_status() + + def readline(self: _RequestsStreamResponse) -> bytes: + """Read one line from the underlying streaming body.""" + raw_stream = self._response.raw + if raw_stream is None: + return b"" + + return cast(bytes, raw_stream.readline()) + + def close(self: _RequestsStreamResponse) -> None: + """Close the wrapped streaming response.""" + self._response.close() + + class RequestsTransport: """ Concrete implementation of HTTPTransport using the requests library. @@ -100,16 +151,73 @@ def request( """ if not _HAS_REQUESTS: # pragma: no cover _raise_requests_missing() - session = cast("Session", self.session) - request = Request( + session, prepared_request, send_kwargs = self._prepare_request( + method=method, + url=url, + headers=headers, + data=data, + **kwargs, + ) + return session.send(prepared_request, **send_kwargs) + + def open_stream( + self: RequestsTransport, + method: str, + url: str, + headers: dict[str, str] | None = None, + data: str | None = None, + **kwargs: object, + ) -> HTTPStreamResponse: + """Open a streaming HTTP response using the shared requests session.""" + if not _HAS_REQUESTS: # pragma: no cover + _raise_requests_missing() + + session, prepared_request, send_kwargs = self._prepare_request( method=method, url=url, headers=headers, data=data, **kwargs, ) + send_kwargs["stream"] = True + response = session.send(prepared_request, **send_kwargs) + return _RequestsStreamResponse(response) + + def _prepare_request( + self: RequestsTransport, + method: str, + url: str, + headers: dict[str, str] | None = None, + data: str | None = None, + **kwargs: object, + ) -> tuple[Session, PreparedRequest, dict[str, object]]: + """Prepare a request once so regular and streaming calls share the same path.""" + if not _HAS_REQUESTS: # pragma: no cover + _raise_requests_missing() + + session = cast("Session", self.session) + request_kwargs: dict[str, object] = {} + send_kwargs: dict[str, object] = {} + + if headers is not None: + request_kwargs["headers"] = headers + + if data is not None: + request_kwargs["data"] = data + + for key, value in kwargs.items(): + if key in _REQUEST_KWARG_NAMES: + request_kwargs[key] = value + else: + send_kwargs[key] = value + + request = Request( + method=method, + url=url, + **request_kwargs, + ) prepared_request = session.prepare_request(request) - return session.send(prepared_request) + return session, prepared_request, send_kwargs def close(self: RequestsTransport) -> None: """ diff --git a/tests/multisafepay/e2e/examples/event_manager/conftest.py b/tests/multisafepay/e2e/examples/event_manager/conftest.py new file mode 100644 index 0000000..8fb1792 --- /dev/null +++ b/tests/multisafepay/e2e/examples/event_manager/conftest.py @@ -0,0 +1,211 @@ +"""Event-manager-specific E2E fixtures and selective skip behavior.""" + +import os +from urllib.parse import urlparse + +import pytest + +from multisafepay.client import ScopedCredentialResolver +from multisafepay.client.client import Client +from multisafepay.client.credential_resolver import AuthScope +from multisafepay.sdk import Sdk + +EVENT_MANAGER_E2E_NODE_PREFIXES = ( + "tests/multisafepay/e2e/examples/event_manager/", +) +DEFAULT_API_KEY_ENVS = ("E2E_API_KEY", "API_KEY") +PARTNER_API_KEY_ENVS = ("E2E_PARTNER_API_KEY", "PARTNER_API_KEY") +TERMINAL_GROUP_API_KEY_ENVS = ( + "E2E_TERMINAL_GROUP_API_KEY_GROUP_DEFAULT", + "TERMINAL_GROUP_API_KEY_GROUP_DEFAULT", +) +TERMINAL_GROUP_ID_ENVS = ("CLOUD_POS_TERMINAL_GROUP_ID",) +TERMINAL_ID_ENVS = ("E2E_CLOUD_POS_TERMINAL_ID", "CLOUD_POS_TERMINAL_ID") +BASE_URL_ENVS = ("E2E_NO_SANDBOX_BASE_URL", "MSP_SDK_CUSTOM_BASE_URL") + + +def _get_first_env(*names: str) -> str: + for name in names: + value = os.getenv(name, "").strip() + if value: + return value + + return "" + + +def _require_first_env(*names: str) -> str: + value = _get_first_env(*names) + if value: + return value + + msg = f"SSE E2E tests require one of: {', '.join(names)}" + raise pytest.UsageError(msg) + + +def _validate_base_url(base_url: str, *env_names: str) -> str: + parsed = urlparse(base_url) + if parsed.scheme != "https" or not parsed.netloc: + msg = f"{', '.join(env_names)} must be a valid https URL" + raise pytest.UsageError(msg) + + path = parsed.path.rstrip("/") + normalized_path = "/" if not path else f"{path}/" + return f"{parsed.scheme}://{parsed.netloc}{normalized_path}" + + +def _has_event_manager_e2e_env() -> bool: + has_base = bool( + _get_first_env(*DEFAULT_API_KEY_ENVS) + and _get_first_env(*TERMINAL_GROUP_API_KEY_ENVS) + and _get_first_env(*TERMINAL_ID_ENVS) + and _get_first_env(*BASE_URL_ENVS), + ) + if not has_base: + return False + + return bool( + _get_first_env(*TERMINAL_GROUP_ID_ENVS) + or _get_first_env(*PARTNER_API_KEY_ENVS), + ) + + +def _resolve_terminal_group_id( + base_url: str, + default_api_key: str, + partner_api_key: str, + terminal_id: str, +) -> str: + credential_resolver = ScopedCredentialResolver( + default_api_key=default_api_key, + partner_affiliate_api_key=partner_api_key, + ) + sdk = Sdk( + is_production=False, + credential_resolver=credential_resolver, + ) + sdk.get_client().url = base_url + + limit = 100 + max_pages = 10 + for page in range(1, max_pages + 1): + response = sdk.get_client().create_get_request( + "json/terminals", + params={ + "limit": limit, + "page": page, + }, + auth_scope=AuthScope( + scope=Client.AUTH_SCOPE_PARTNER_AFFILIATE, + ), + ) + if ( + response.get_status_code() != 200 + or not response.get_body_success() + ): + raise pytest.UsageError( + "Unable to resolve Cloud POS terminal group id: " + "GET /json/terminals did not return a successful response", + ) + + listing = response.get_body_data() + if not isinstance(listing, list) or not listing: + break + + for terminal in listing: + listed_terminal_id = terminal.get("id") + terminal_code = terminal.get("code") + if terminal_id not in {listed_terminal_id, terminal_code}: + continue + + group_id = terminal.get("group_id") + if group_id is None: + raise pytest.UsageError( + f"Unable to resolve Cloud POS terminal group id: " + f"terminal {terminal_id} has no group_id", + ) + return str(group_id) + + if len(listing) < limit: + break + + raise pytest.UsageError( + f"Unable to resolve Cloud POS terminal group id for terminal {terminal_id}", + ) + + +@pytest.fixture(scope="session") +def cloud_pos_terminal_id() -> str: + """Return terminal id used by SSE E2E tests.""" + return _require_first_env(*TERMINAL_ID_ENVS) + + +@pytest.fixture(scope="session") +def cloud_pos_base_url() -> str: + """Return dev-backed base URL used by SSE E2E tests.""" + return _validate_base_url( + _require_first_env(*BASE_URL_ENVS), + *BASE_URL_ENVS, + ) + + +@pytest.fixture(scope="session") +def cloud_pos_terminal_group_id( + cloud_pos_base_url: str, + cloud_pos_terminal_id: str, +) -> str: + """Return terminal group id from env or resolve it via /json/terminals.""" + explicit_group_id = _get_first_env(*TERMINAL_GROUP_ID_ENVS) + if explicit_group_id: + return explicit_group_id + + return _resolve_terminal_group_id( + base_url=cloud_pos_base_url, + default_api_key=_require_first_env(*DEFAULT_API_KEY_ENVS), + partner_api_key=_require_first_env(*PARTNER_API_KEY_ENVS), + terminal_id=cloud_pos_terminal_id, + ) + + +@pytest.fixture(scope="session") +def cloud_pos_events_sdk( + cloud_pos_base_url: str, + cloud_pos_terminal_group_id: str, +) -> Sdk: + """Return SDK configured for Cloud POS creation plus SSE subscription.""" + credential_resolver = ScopedCredentialResolver( + default_api_key=_require_first_env(*DEFAULT_API_KEY_ENVS), + terminal_group_api_keys={ + cloud_pos_terminal_group_id: _require_first_env( + *TERMINAL_GROUP_API_KEY_ENVS, + ), + }, + ) + sdk = Sdk( + is_production=False, + credential_resolver=credential_resolver, + ) + sdk.get_client().url = cloud_pos_base_url + return sdk + + +def pytest_collection_modifyitems( + config: pytest.Config, # noqa: ARG001 + items: list[pytest.Item], +) -> None: + """Skip SSE E2E tests when the required credentials are missing.""" + if _has_event_manager_e2e_env(): + return + + skip = pytest.mark.skip( + reason=( + "SSE E2E tests require E2E_API_KEY or API_KEY, " + "E2E_TERMINAL_GROUP_API_KEY_GROUP_DEFAULT or " + "TERMINAL_GROUP_API_KEY_GROUP_DEFAULT, " + "E2E_CLOUD_POS_TERMINAL_ID or CLOUD_POS_TERMINAL_ID, " + "E2E_NO_SANDBOX_BASE_URL or MSP_SDK_CUSTOM_BASE_URL, " + "and either CLOUD_POS_TERMINAL_GROUP_ID or PARTNER_API_KEY" + ), + ) + for item in items: + if item.nodeid.startswith(EVENT_MANAGER_E2E_NODE_PREFIXES): + item.add_marker(skip) diff --git a/tests/multisafepay/e2e/examples/event_manager/test_subscribe_events.py b/tests/multisafepay/e2e/examples/event_manager/test_subscribe_events.py new file mode 100644 index 0000000..c7d137b --- /dev/null +++ b/tests/multisafepay/e2e/examples/event_manager/test_subscribe_events.py @@ -0,0 +1,88 @@ +# Copyright (c) MultiSafepay, Inc. All rights reserved. + +# This file is licensed under the Open Software License (OSL) version 3.0. +# For a copy of the license, see the LICENSE.txt file in the project root. + +# See the DISCLAIMER.md file for disclaimer details. + +"""E2E coverage for examples/event_manager/subscribe_events.py.""" + +import time + +import pytest + +from multisafepay.api.base.response.custom_api_response import ( + CustomApiResponse, +) +from multisafepay.api.paths.events.event_manager import EventManager +from multisafepay.api.paths.events.stream import EventStream +from multisafepay.api.paths.orders.order_manager import OrderManager +from multisafepay.api.paths.orders.request.order_request import OrderRequest +from multisafepay.api.paths.orders.response.order_response import Order +from multisafepay.sdk import Sdk + + +@pytest.fixture(scope="module") +def order_manager(cloud_pos_events_sdk: Sdk) -> OrderManager: + """Return OrderManager for Cloud POS event-stream tests.""" + return cloud_pos_events_sdk.get_order_manager() + + +@pytest.fixture(scope="module") +def event_manager(cloud_pos_events_sdk: Sdk) -> EventManager: + """Return EventManager for Cloud POS event-stream tests.""" + return cloud_pos_events_sdk.get_event_manager() + + +def _build_cloud_pos_order_request( + order_id: str, + terminal_id: str, +) -> OrderRequest: + """Create the Cloud POS order request used by the SSE example.""" + return ( + OrderRequest() + .add_type("redirect") + .add_order_id(order_id) + .add_description("Cloud POS order for event streaming") + .add_amount(100) + .add_currency("EUR") + .add_gateway_info( + { + "terminal_id": terminal_id, + }, + ) + ) + + +def test_subscribe_order_events_opens_stream( + order_manager: OrderManager, + event_manager: EventManager, + cloud_pos_terminal_group_id: str, + cloud_pos_terminal_id: str, +) -> None: + """Create a Cloud POS order and verify the SSE stream opens successfully.""" + order_id = f"cloud-pos-events-e2e-{int(time.time())}" + + create_response = order_manager.create( + request_order=_build_cloud_pos_order_request( + order_id=order_id, + terminal_id=cloud_pos_terminal_id, + ), + terminal_group_id=cloud_pos_terminal_group_id, + ) + + assert isinstance(create_response, CustomApiResponse) + assert create_response.get_status_code() == 200 + assert create_response.get_body_success() is True + + order = create_response.get_data() + assert isinstance(order, Order) + assert order.order_id == order_id + assert order.events_token or order.event_token + assert order.events_stream_url or order.event_stream_url + + with event_manager.subscribe_order_events(order, timeout=10.0) as stream: + assert isinstance(stream, EventStream) + assert stream.closed is False + + assert stream.closed is True diff --git a/tests/multisafepay/unit/api/path/events/stream/test_unit_event_stream.py b/tests/multisafepay/unit/api/path/events/stream/test_unit_event_stream.py new file mode 100644 index 0000000..d6c709e --- /dev/null +++ b/tests/multisafepay/unit/api/path/events/stream/test_unit_event_stream.py @@ -0,0 +1,274 @@ +# Copyright (c) MultiSafepay, Inc. All rights reserved. + +# This file is licensed under the Open Software License (OSL) version 3.0. +# For a copy of the license, see the LICENSE.txt file in the project root. + +# See the DISCLAIMER.md file for disclaimer details. + +"""Unit tests for event-path stream contracts.""" + +from __future__ import annotations + +import io +import urllib.request + +import pytest + +from multisafepay.api.paths.events.stream import Event, EventData, EventStream + +EVENTS_STREAM_URL = "https://testapi.multisafepay.com/events/stream/" +EVENTS_TOKEN = "events-token" +LAST_EVENT_ID = "last-10" +PING_PAYLOAD = b"data: ping\n\n" + + +class _FakeStreamingResponse: + """Small streaming response stub used for unit testing.""" + + def __init__(self: _FakeStreamingResponse, payload: bytes) -> None: + self._buffer = io.BytesIO(payload) + self.closed = False + self.status_code = 200 + self.headers: dict[str, str] = {} + + def readline(self: _FakeStreamingResponse) -> bytes: + return self._buffer.readline() + + def json(self: _FakeStreamingResponse) -> object: + return {} + + def raise_for_status(self: _FakeStreamingResponse) -> None: + return None + + def close(self: _FakeStreamingResponse) -> None: + self._buffer.close() + self.closed = True + + +class _FailingStatusStreamingResponse(_FakeStreamingResponse): + """Streaming response that fails HTTP status validation.""" + + def raise_for_status(self: _FailingStatusStreamingResponse) -> None: + raise RuntimeError("stream failed") + + +class _FakeStreamingTransport: + """Capture stream opens and return a configurable streaming response.""" + + def __init__( + self: _FakeStreamingTransport, + response: _FakeStreamingResponse, + ) -> None: + self._response = response + self.calls: list[dict[str, object]] = [] + + def request( + self: _FakeStreamingTransport, + _method: str, + _url: str, + _headers: dict[str, str] | None = None, + _data: str | None = None, + **_kwargs: object, + ) -> object: + raise AssertionError("request() should not be used for SSE") + + def open_stream( + self: _FakeStreamingTransport, + method: str, + url: str, + headers: dict[str, str] | None = None, + data: str | None = None, + **kwargs: object, + ) -> _FakeStreamingResponse: + self.calls.append( + { + "method": method, + "url": url, + "headers": headers, + "data": data, + "kwargs": kwargs, + }, + ) + return self._response + + +class _TransportWithoutStreaming: + """Transport double that intentionally does not implement stream support.""" + + def request( + self: _TransportWithoutStreaming, + _method: str, + _url: str, + _headers: dict[str, str] | None = None, + _data: str | None = None, + **_kwargs: object, + ) -> object: + raise AssertionError("request() should not be used for SSE") + + +def test_open_builds_expected_headers(monkeypatch: pytest.MonkeyPatch) -> None: + """Build event-specific auth headers before opening the transport stream.""" + + def fail_urlopen(*_args: object, **_kwargs: object) -> object: + raise AssertionError("urllib should not be used for SSE") + + monkeypatch.setattr(urllib.request, "urlopen", fail_urlopen) + transport = _FakeStreamingTransport(_FakeStreamingResponse(PING_PAYLOAD)) + + stream = EventStream.open( + events_token=EVENTS_TOKEN, + events_stream_url=EVENTS_STREAM_URL, + transport=transport, + last_event_id=LAST_EVENT_ID, + timeout=9.5, + ) + event = next(stream) + captured = transport.calls[0] + + assert isinstance(event, Event) + assert event.data == "ping" + assert captured["url"] == EVENTS_STREAM_URL + assert captured["method"] == "GET" + headers = captured["headers"] + assert headers["Authorization"] == f"Bearer {EVENTS_TOKEN}" + assert headers["Accept"] == "text/event-stream" + assert headers["Cache-Control"] == "no-cache" + assert headers["Last-Event-ID"] == LAST_EVENT_ID + assert captured["kwargs"]["timeout"] == 9.5 + + +def test_open_fails_clearly_when_transport_has_no_stream_support() -> None: + """Fail explicitly instead of bypassing the configured transport.""" + with pytest.raises( + NotImplementedError, + match="does not support streaming", + ): + EventStream.open( + events_token=EVENTS_TOKEN, + events_stream_url=EVENTS_STREAM_URL, + transport=_TransportWithoutStreaming(), + ) + + +def test_open_closes_stream_when_status_validation_fails() -> None: + """Close an opened transport stream when HTTP status validation fails.""" + response = _FailingStatusStreamingResponse(PING_PAYLOAD) + transport = _FakeStreamingTransport(response) + + with pytest.raises(RuntimeError, match="stream failed"): + EventStream.open( + events_token=EVENTS_TOKEN, + events_stream_url=EVENTS_STREAM_URL, + transport=transport, + ) + + assert response.closed is True + + +def test_wraps_generic_sse_messages_as_event_contracts() -> None: + """Adapt generic SSE messages into the events-path Event contract.""" + payload = ( + b"event: order.updated\n" + b"id: 15\n" + b"retry: 1000\n" + b'data: {"status": "completed", "order_id": "123"}\n\n' + ) + stream = EventStream(response=_FakeStreamingResponse(payload)) + + event = next(stream) + + assert isinstance(event, Event) + assert event.event == "order.updated" + assert event.event_id == "15" + assert event.retry == 1000 + assert isinstance(event.data, EventData) + assert event.data.status == "completed" + assert event.data.order_id == "123" + + +def test_event_from_dict_adapts_nested_payloads() -> None: + """Build nested event payload models through the common from_dict path.""" + event = Event.from_dict( + { + "event": "order.updated", + "data": { + "status": "processing", + "data": { + "status": "completed", + "order_id": "nested-1", + }, + }, + }, + ) + + assert event is not None + assert isinstance(event.data, EventData) + assert event.data.status == "processing" + assert isinstance(event.data.data, EventData) + assert event.data.data.status == "completed" + assert event.data.data.order_id == "nested-1" + + +def test_event_data_from_dict_adapts_nested_list_payloads() -> None: + """Build nested EventData items when payload data contains a list.""" + payload = EventData.from_dict( + { + "status": "processing", + "data": [ + { + "status": "completed", + "order_id": "nested-2", + }, + "keep-me", + ], + }, + ) + + assert payload is not None + assert payload.status == "processing" + assert isinstance(payload.data, list) + assert isinstance(payload.data[0], EventData) + assert payload.data[0].status == "completed" + assert payload.data[0].order_id == "nested-2" + assert payload.data[1] == "keep-me" + + +def test_event_from_dict_adapts_list_payload_items() -> None: + """Build top-level event payload lists through the common from_dict path.""" + event = Event.from_dict( + { + "event": "order.batch", + "data": [ + { + "status": "completed", + "order_id": "batch-1", + }, + "tail", + ], + }, + ) + + assert event is not None + assert isinstance(event.data, list) + assert isinstance(event.data[0], EventData) + assert event.data[0].status == "completed" + assert event.data[0].order_id == "batch-1" + assert event.data[1] == "tail" + + +def test_from_dict_returns_none_for_none_payload() -> None: + """Return None when from_dict receives None input.""" + assert Event.from_dict(None) is None + assert EventData.from_dict(None) is None + + +def test_closes_wrapped_stream_on_eof() -> None: + """Close the wrapped generic stream when EOF is reached.""" + response = _FakeStreamingResponse(payload=b"") + stream = EventStream(response=response) + + with pytest.raises(StopIteration): + next(stream) + + assert response.closed is True + assert stream.closed is True diff --git a/tests/multisafepay/unit/api/path/events/test_unit_event_manager.py b/tests/multisafepay/unit/api/path/events/test_unit_event_manager.py new file mode 100644 index 0000000..935ce73 --- /dev/null +++ b/tests/multisafepay/unit/api/path/events/test_unit_event_manager.py @@ -0,0 +1,133 @@ +# Copyright (c) MultiSafepay, Inc. All rights reserved. + +# This file is licensed under the Open Software License (OSL) version 3.0. +# For a copy of the license, see the LICENSE.txt file in the project root. + +# See the DISCLAIMER.md file for disclaimer details. + +"""Unit tests for event manager subscription helpers.""" + +from typing import Optional +from unittest.mock import MagicMock + +import pytest + +from multisafepay.api.paths.events.event_manager import EventManager +from multisafepay.api.paths.orders.response.order_response import Order + +TEST_EVENTS_STREAM_URL = "https://testapi.multisafepay.com/events/stream/" +ORDER_EVENTS_STREAM_URL = "https://stream.example/events/stream/" +LEGACY_EVENTS_STREAM_URL = "https://legacy.example/events/stream/" +MISSING_EVENTS_ERROR = "events_token/events_stream_url" + + +def _patch_event_stream_open( + monkeypatch: pytest.MonkeyPatch, +) -> tuple[dict[str, object], object]: + """Patch EventStream.open and return capture dict plus sentinel stream.""" + captured: dict[str, object] = {} + expected_stream = object() + + def fake_open( + events_token: str, + events_stream_url: str, + *, + transport: object, + last_event_id: Optional[str] = None, + timeout: float = 30.0, + ) -> object: + captured["events_token"] = events_token + captured["events_stream_url"] = events_stream_url + captured["transport"] = transport + captured["last_event_id"] = last_event_id + captured["timeout"] = timeout + return expected_stream + + monkeypatch.setattr( + "multisafepay.api.paths.events.event_manager.EventStream.open", + fake_open, + ) + + return captured, expected_stream + + +def test_subscribe_events_delegates_to_stream_open( + monkeypatch: pytest.MonkeyPatch, +) -> None: + """Delegate direct subscriptions to EventStream.open.""" + captured, expected_stream = _patch_event_stream_open(monkeypatch) + client = MagicMock() + client.transport = object() + + manager = EventManager(client) + stream = manager.subscribe_events( + events_token="token-abc", + events_stream_url=TEST_EVENTS_STREAM_URL, + last_event_id="last-15", + timeout=10.0, + ) + + assert stream is expected_stream + assert captured["events_token"] == "token-abc" + assert captured["events_stream_url"] == TEST_EVENTS_STREAM_URL + assert captured["transport"] is client.transport + assert captured["last_event_id"] == "last-15" + assert captured["timeout"] == 10.0 + + +def test_subscribe_order_events_uses_plural_fields( + monkeypatch: pytest.MonkeyPatch, +) -> None: + """Read events credentials from events_* fields when present.""" + captured, expected_stream = _patch_event_stream_open(monkeypatch) + client = MagicMock() + client.transport = object() + + manager = EventManager(client) + order = Order( + order_id="order-1", + events_token="events-token", + events_stream_url=ORDER_EVENTS_STREAM_URL, + ) + + stream = manager.subscribe_order_events(order) + + assert stream is expected_stream + assert captured["events_token"] == "events-token" + assert captured["events_stream_url"] == ORDER_EVENTS_STREAM_URL + assert captured["transport"] is client.transport + + +def test_subscribe_order_events_falls_back_to_legacy_fields( + monkeypatch: pytest.MonkeyPatch, +) -> None: + """Support old event_* field names for backward compatibility.""" + captured, expected_stream = _patch_event_stream_open(monkeypatch) + client = MagicMock() + client.transport = object() + + manager = EventManager(client) + order = Order( + order_id="order-2", + event_token="legacy-token", + event_stream_url=LEGACY_EVENTS_STREAM_URL, + ) + + stream = manager.subscribe_order_events(order) + + assert stream is expected_stream + assert captured["events_token"] == "legacy-token" + assert captured["events_stream_url"] == LEGACY_EVENTS_STREAM_URL + assert captured["transport"] is client.transport + + +def test_subscribe_order_events_requires_token_and_stream_url() -> None: + """Raise a clear error when event credentials are missing in order.""" + manager = EventManager(MagicMock()) + order = Order(order_id="order-3") + + with pytest.raises( + ValueError, + match=MISSING_EVENTS_ERROR, + ): + manager.subscribe_order_events(order) diff --git a/tests/multisafepay/unit/api/path/orders/response/test_unit_order_response.py b/tests/multisafepay/unit/api/path/orders/response/test_unit_order_response.py new file mode 100644 index 0000000..60a5b5c --- /dev/null +++ b/tests/multisafepay/unit/api/path/orders/response/test_unit_order_response.py @@ -0,0 +1,74 @@ +# Copyright (c) MultiSafepay, Inc. All rights reserved. + +# This file is licensed under the Open Software License (OSL) version 3.0. +# For a copy of the license, see the LICENSE.txt file in the project root. + +# See the DISCLAIMER.md file for disclaimer details. + +"""Unit tests for order response event fields compatibility.""" + +from typing import Optional + +from multisafepay.api.paths.orders.response.order_response import Order + +PLURAL_EVENTS_TOKEN = "token-123" +PLURAL_EVENTS_URL = "wss://testapi.multisafepay.com/events/" +PLURAL_EVENTS_STREAM_URL = "https://testapi.multisafepay.com/events/stream/" +LEGACY_EVENTS_TOKEN = "legacy-token" +LEGACY_EVENTS_URL = "wss://legacy.example.com/events/" +LEGACY_EVENTS_STREAM_URL = "https://legacy.example.com/events/stream/" + + +def _assert_event_fields( + order: Optional[Order], + expected_token: str, + expected_url: str, + expected_stream_url: str, +) -> None: + """Assert both plural and legacy event fields are populated consistently.""" + assert order is not None + + assert order.events_token == expected_token + assert order.events_url == expected_url + assert order.events_stream_url == expected_stream_url + assert order.event_token == expected_token + assert order.event_url == expected_url + assert order.event_stream_url == expected_stream_url + + +def test_from_dict_maps_plural_event_fields_to_legacy_aliases() -> None: + """Map events_* fields to both plural and legacy singular attributes.""" + data = { + "order_id": "order-1", + "events_token": PLURAL_EVENTS_TOKEN, + "events_url": PLURAL_EVENTS_URL, + "events_stream_url": PLURAL_EVENTS_STREAM_URL, + } + + order = Order.from_dict(data) + + _assert_event_fields( + order=order, + expected_token=PLURAL_EVENTS_TOKEN, + expected_url=PLURAL_EVENTS_URL, + expected_stream_url=PLURAL_EVENTS_STREAM_URL, + ) + + +def test_from_dict_maps_legacy_event_fields_to_plural_names() -> None: + """Map event_* fields to newer plural names for consistency.""" + data = { + "order_id": "order-2", + "event_token": LEGACY_EVENTS_TOKEN, + "event_url": LEGACY_EVENTS_URL, + "event_stream_url": LEGACY_EVENTS_STREAM_URL, + } + + order = Order.from_dict(data) + + _assert_event_fields( + order=order, + expected_token=LEGACY_EVENTS_TOKEN, + expected_url=LEGACY_EVENTS_URL, + expected_stream_url=LEGACY_EVENTS_STREAM_URL, + ) diff --git a/tests/multisafepay/unit/client/test_unit_client.py b/tests/multisafepay/unit/client/test_unit_client.py index b4bbc90..8f6049d 100644 --- a/tests/multisafepay/unit/client/test_unit_client.py +++ b/tests/multisafepay/unit/client/test_unit_client.py @@ -88,6 +88,22 @@ def test_initializes_with_custom_requests_session_via_transport(): session.close() +def test_exposes_auth_scope_aliases_for_backward_compatibility(): + """Test that Client keeps the public auth scope aliases in sync.""" + assert ( + Client.AUTH_SCOPE_DEFAULT + == ScopedCredentialResolver.AUTH_SCOPE_DEFAULT + ) + assert ( + Client.AUTH_SCOPE_PARTNER_AFFILIATE + == ScopedCredentialResolver.AUTH_SCOPE_PARTNER_AFFILIATE + ) + assert ( + Client.AUTH_SCOPE_TERMINAL_GROUP + == ScopedCredentialResolver.AUTH_SCOPE_TERMINAL_GROUP + ) + + def test_defaults_to_test_url(monkeypatch: pytest.MonkeyPatch): """Test that client defaults to test URL when not in production.""" monkeypatch.delenv("MSP_SDK_BUILD_PROFILE", raising=False) diff --git a/tests/multisafepay/unit/test_unit_sdk.py b/tests/multisafepay/unit/test_unit_sdk.py index f160905..a7fa84c 100644 --- a/tests/multisafepay/unit/test_unit_sdk.py +++ b/tests/multisafepay/unit/test_unit_sdk.py @@ -7,9 +7,12 @@ """Unit tests for SDK-level environment/base URL guardrails.""" +from unittest.mock import MagicMock + import pytest from multisafepay import Sdk +from multisafepay.api.paths.events.event_manager import EventManager from multisafepay.client.client import Client from multisafepay.client.credential_resolver import ScopedCredentialResolver @@ -122,6 +125,17 @@ def test_sdk_requires_api_key_or_resolver() -> None: Sdk(is_production=False) +def test_sdk_returns_event_manager() -> None: + """Expose EventManager through SDK convenience getter.""" + sdk = Sdk( + api_key="mock_api_key", + is_production=False, + transport=MagicMock(), + ) + + assert isinstance(sdk.get_event_manager(), EventManager) + + def test_sdk_uses_credential_resolver_with_custom_transport() -> None: """Wire resolver + transport together and use resolved auth header.""" transport = _CaptureTransport() diff --git a/tests/multisafepay/unit/transport/test_unit_transport.py b/tests/multisafepay/unit/transport/test_unit_transport.py index 1735ed5..0648483 100644 --- a/tests/multisafepay/unit/transport/test_unit_transport.py +++ b/tests/multisafepay/unit/transport/test_unit_transport.py @@ -78,6 +78,41 @@ def test_context_manager_closes_session( mock_session.close.assert_called_once() + def test_open_stream_uses_shared_session_send( + self: "TestRequestsTransportWithRequests", + requires_requests: object, + ) -> None: + """Open streaming responses through the same configured session.""" + assert requires_requests is not None + mock_session = Mock() + prepared = object() + mock_response = Mock() + mock_response.status_code = 200 + mock_response.headers = {"Content-Type": "text/event-stream"} + mock_response.raw.readline.return_value = b"data: ping\n" + mock_session.prepare_request.return_value = prepared + mock_session.send.return_value = mock_response + + transport = RequestsTransport(session=mock_session) + response = transport.open_stream( + method="GET", + url="https://api.example.com/events/stream", + headers={"Accept": "text/event-stream"}, + timeout=9.5, + ) + + assert response.readline() == b"data: ping\n" + request_obj = mock_session.prepare_request.call_args.args[0] + assert isinstance(request_obj, requires_requests.Request) + mock_session.send.assert_called_once_with( + prepared, + timeout=9.5, + stream=True, + ) + + response.close() + mock_response.close.assert_called_once() + class TestRequestsTransportWithoutRequests: """Failure modes when `requests` isn't installed and no transport is injected."""