Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,4 @@ E2E_TERMINAL_GROUP_API_KEY_GROUP_DEFAULT=your-e2e-terminal-group-api-key-for-def
# Optional custom base URL for dev-backed examples and terminal endpoint E2E.
# MSP_SDK_BUILD_PROFILE=dev
# MSP_SDK_ALLOW_CUSTOM_BASE_URL=1
# MSP_SDK_CUSTOM_BASE_URL=your-custom-base-url
# MSP_SDK_CUSTOM_BASE_URL=your-custom-base-url
68 changes: 67 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,14 @@ The SDK uses a small transport abstraction so you can choose (and swap) the unde
### How it works

- The SDK expects an object implementing the `HTTPTransport` / `HTTPResponse` protocols defined in `src/multisafepay/transport/http_transport.py`.
- Event stream subscriptions also require the transport to implement `open_stream(...)` and return an `HTTPStreamResponse` with `readline()`, `close()`, and `raise_for_status()`.
- If you do not provide a transport, the SDK defaults to `RequestsTransport`.
- `requests` is an optional extra:
- To use the default transport, install `multisafepay[requests]`.
- To avoid `requests`, inject your own transport (for example, `httpx` or `urllib3`).

The built-in `RequestsTransport` supports both regular requests and SSE streams through the same configured `requests.Session`. Custom transports that only implement `request(...)` can still be used for regular API calls, but SSE subscriptions fail explicitly until `open_stream(...)` is added. The SDK does not fall back to another HTTP library for event streams.

### Custom transport example

```bash
Expand Down Expand Up @@ -85,19 +88,59 @@ from multisafepay import Sdk
from multisafepay.client import ScopedCredentialResolver


credential_resolver = ScopedCredentialResolver(
default_api_key="<default_api_key>",
partner_affiliate_api_key="<partner_api_key>",
terminal_group_api_keys={
"<terminal_group_id>": "<terminal_group_api_key>",
},
)

sdk = Sdk(
is_production=False,
credential_resolver=credential_resolver,
)
```

### Event stream subscriptions

Use `EventManager` to subscribe to MultiSafepay SSE streams directly, or to subscribe from an order response that already contains event credentials.

```python
from multisafepay import Sdk
from multisafepay.client import ScopedCredentialResolver


credential_resolver = ScopedCredentialResolver(
default_api_key="<default_api_key>",
terminal_group_api_keys={
"Default": "<terminal_group_api_key>",
"<terminal_group_id>": "<terminal_group_api_key>",
},
)

sdk = Sdk(
is_production=False,
credential_resolver=credential_resolver,
)

order_manager = sdk.get_order_manager()
event_manager = sdk.get_event_manager()

create_response = order_manager.create(
request_order=order_request,
terminal_group_id="<terminal_group_id>",
)
order = create_response.get_data()

with event_manager.subscribe_order_events(order, timeout=45.0) as stream:
for event in stream:
print(event)
```

Use `subscribe_events(events_token=..., events_stream_url=...)` when the token and stream URL are already available separately.

SSE subscriptions use the same configured SDK transport as regular API calls. With the default transport this reuses the same `requests.Session`; with a custom transport, implement `open_stream(...)` on that transport instead of opening a separate HTTP connection path.

### Development-only custom base URL override

By default, the SDK only targets:
Expand Down Expand Up @@ -143,6 +186,29 @@ In any non-dev profile (including default `release`), custom base URLs are block

Go to the folder `examples` to see how to use the SDK.

The event-stream example in `examples/event_manager/subscribe_events.py` requires:

```bash
export API_KEY="<account_api_key>"
export TERMINAL_GROUP_API_KEY_GROUP_DEFAULT="<terminal_group_api_key>"
export CLOUD_POS_TERMINAL_GROUP_ID="<terminal_group_id>"
export CLOUD_POS_TERMINAL_ID="<terminal_id>"
```

The SSE E2E test can also run against a dev-backed base URL and optionally resolve the terminal group automatically:

```bash
export E2E_NO_SANDBOX_BASE_URL="https://dev-api.example.com/v1/"
export MSP_SDK_BUILD_PROFILE=dev
export MSP_SDK_ALLOW_CUSTOM_BASE_URL=1
export MSP_SDK_CUSTOM_BASE_URL="https://dev-api.example.com/v1/"
export E2E_API_KEY="<account_api_key>"
export E2E_TERMINAL_GROUP_API_KEY_GROUP_DEFAULT="<terminal_group_api_key>"
export E2E_CLOUD_POS_TERMINAL_ID="<terminal_id>"
# Optional when CLOUD_POS_TERMINAL_GROUP_ID is not set
export E2E_PARTNER_API_KEY="<partner_api_key>"
```

## Code quality checks

### Linting
Expand Down
107 changes: 107 additions & 0 deletions examples/event_manager/subscribe_events.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
# Copyright (c) MultiSafepay, Inc. All rights reserved.

# This file is licensed under the Open Software License (OSL) version 3.0.
# For a copy of the license, see the LICENSE.txt file in the project root.

# See the DISCLAIMER.md file for disclaimer details.

"""Create a Cloud POS order and subscribe to its event stream."""

import os
import time

from dotenv import load_dotenv
from multisafepay import Sdk
from multisafepay.api.paths.orders.request import OrderRequest
from multisafepay.client import ScopedCredentialResolver

# Load environment variables from a .env file
load_dotenv()


def _get_first_env(*names: str) -> str:
for name in names:
value = os.getenv(name, "").strip()
if value:
return value

return ""


def _require_first_env(*names: str) -> str:
value = _get_first_env(*names)
if value:
return value

raise RuntimeError(
f"Missing required environment variable. Set one of: {', '.join(names)}",
)


DEFAULT_ACCOUNT_API_KEY = _require_first_env("API_KEY", "E2E_API_KEY")
TERMINAL_GROUP_DEFAULT_API_KEY = _require_first_env(
"TERMINAL_GROUP_API_KEY_GROUP_DEFAULT",
"E2E_TERMINAL_GROUP_API_KEY_GROUP_DEFAULT",
)
CLOUD_POS_TERMINAL_GROUP_ID = _require_first_env(
"CLOUD_POS_TERMINAL_GROUP_ID",
)
TERMINAL_ID = _require_first_env(
"CLOUD_POS_TERMINAL_ID",
"E2E_CLOUD_POS_TERMINAL_ID",
)

if __name__ == "__main__":
# This example executes Cloud POS calls with terminal-group scope.
scoped_terminal_group_id = CLOUD_POS_TERMINAL_GROUP_ID
resolver_kwargs = {
"default_api_key": DEFAULT_ACCOUNT_API_KEY,
}
if scoped_terminal_group_id:
resolver_kwargs["terminal_group_api_keys"] = {
scoped_terminal_group_id: TERMINAL_GROUP_DEFAULT_API_KEY,
}

credential_resolver = ScopedCredentialResolver(**resolver_kwargs)

multisafepay_sdk = Sdk(
is_production=False,
credential_resolver=credential_resolver,
)
order_manager = multisafepay_sdk.get_order_manager()
event_manager = multisafepay_sdk.get_event_manager()

order_id = f"cloud-pos-{int(time.time())}"

order_request = (
OrderRequest()
.add_type("redirect")
.add_order_id(order_id)
.add_description("Cloud POS order")
.add_amount(100)
.add_currency("EUR")
.add_gateway_info(
{
"terminal_id": TERMINAL_ID,
},
)
)

create_response = order_manager.create(
order_request,
terminal_group_id=scoped_terminal_group_id,
)
order = create_response.get_data()

if order is None:
raise RuntimeError("Order creation did not return order data")

print(f"Created Cloud POS order: {order.order_id}")
print("Listening for events. Press Ctrl+C to stop.")

try:
with event_manager.subscribe_order_events(order, timeout=45.0) as stream:
for event in stream:
print(event)
except KeyboardInterrupt:
print("Stream interrupted by user.")
10 changes: 8 additions & 2 deletions examples/order_manager/cloud_pos_order.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,20 @@
)
order_manager = multisafepay_sdk.get_order_manager()

order_id = f"cloud-pos-{int(time.time())}"

order_request = (
OrderRequest()
.add_type("redirect")
.add_order_id(f"cloud-pos-{int(time.time())}")
.add_order_id(order_id)
.add_description("Cloud POS order")
.add_amount(100)
.add_currency("EUR")
.add_gateway_info({"terminal_id": TERMINAL_ID})
.add_gateway_info(
{
"terminal_id": TERMINAL_ID,
},
)
)

create_response = order_manager.create(
Expand Down
14 changes: 14 additions & 0 deletions src/multisafepay/api/paths/events/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
# Copyright (c) MultiSafepay, Inc. All rights reserved.

# This file is licensed under the Open Software License (OSL) version 3.0.
# For a copy of the license, see the LICENSE.txt file in the project root.

# See the DISCLAIMER.md file for disclaimer details.

"""Events API endpoints."""

from multisafepay.api.paths.events.event_manager import EventManager

__all__ = [
"EventManager",
]
88 changes: 88 additions & 0 deletions src/multisafepay/api/paths/events/event_manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
# Copyright (c) MultiSafepay, Inc. All rights reserved.

# This file is licensed under the Open Software License (OSL) version 3.0.
# For a copy of the license, see the LICENSE.txt file in the project root.

# See the DISCLAIMER.md file for disclaimer details.

"""Event manager for event stream subscription helpers."""

from __future__ import annotations

from multisafepay.api.base.abstract_manager import AbstractManager
from multisafepay.api.paths.events.stream import EventStream
from multisafepay.api.paths.orders.response.order_response import Order
from multisafepay.client.client import Client


class EventManager(AbstractManager):
"""Manages event stream subscriptions for order events."""

def __init__(self: EventManager, client: Client) -> None:
"""Initialize the EventManager with a client."""
super().__init__(client)

def subscribe_events(
self: EventManager,
events_token: str,
events_stream_url: str,
last_event_id: str | None = None,
timeout: float = 30.0,
) -> EventStream:
"""
Subscribe to order events using the SSE stream endpoint.

Parameters
----------
events_token (str): Token returned by order creation for event auth.
events_stream_url (str): Full SSE stream URL.
last_event_id (str | None): Optional resume cursor.
timeout (float): Socket timeout in seconds.

Returns
-------
EventStream: An iterator over incoming SSE messages.

"""
return EventStream.open(
events_token=events_token,
events_stream_url=events_stream_url,
transport=self.client.transport,
last_event_id=last_event_id,
timeout=timeout,
)

def subscribe_order_events(
self: EventManager,
order: Order,
last_event_id: str | None = None,
timeout: float = 30.0,
) -> EventStream:
"""
Subscribe to events for an existing order response object.

Parameters
----------
order (Order): Order response that contains event credentials.
last_event_id (str | None): Optional resume cursor.
timeout (float): Socket timeout in seconds.

Returns
-------
EventStream: An iterator over incoming SSE messages.

"""
events_token = order.events_token or order.event_token
events_stream_url = order.events_stream_url or order.event_stream_url

if not events_token or not events_stream_url:
raise ValueError(
"Order does not contain events_token/events_stream_url.",
)

return self.subscribe_events(
events_token=events_token,
events_stream_url=events_stream_url,
last_event_id=last_event_id,
timeout=timeout,
)
Loading
Loading