Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ dependencies = [
"googleapis-common-protos>=1.70.0",
"culsans>=0.11.0 ; python_full_version < '3.13'",
"packaging>=24.0",
"typing-extensions>=4.0.0",
]

classifiers = [
Expand Down
24 changes: 21 additions & 3 deletions src/a2a/server/tasks/base_push_notification_sender.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,7 @@ async def _dispatch_notification(
) -> bool:
url = push_info.url
try:
headers = None
if push_info.token:
headers = {'X-A2A-Notification-Token': push_info.token}
headers = self._build_headers(push_info)

response = await self._client.post(
url,
Expand All @@ -103,3 +101,23 @@ async def _dispatch_notification(
)
return False
return True

@staticmethod
def _authorization_header(
push_info: TaskPushNotificationConfig,
) -> str | None:
auth = push_info.authentication
if not auth.scheme or not auth.credentials:
return None
return f'{auth.scheme} {auth.credentials}'

def _build_headers(
self, push_info: TaskPushNotificationConfig
) -> dict[str, str] | None:
headers: dict[str, str] = {}
if push_info.token:
headers['X-A2A-Notification-Token'] = push_info.token
authorization = self._authorization_header(push_info)
if authorization:
headers['Authorization'] = authorization
return headers or None
61 changes: 60 additions & 1 deletion tests/server/tasks/test_push_notification_sender.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
BasePushNotificationSender,
)
from a2a.types.a2a_pb2 import (
AuthenticationInfo,
StreamResponse,
Task,
TaskArtifactUpdateEvent,
Expand All @@ -34,8 +35,11 @@ def _create_sample_push_config(
url: str = 'http://example.com/callback',
config_id: str = 'cfg1',
token: str | None = None,
authentication: AuthenticationInfo | None = None,
) -> TaskPushNotificationConfig:
return TaskPushNotificationConfig(id=config_id, url=url, token=token)
return TaskPushNotificationConfig(
id=config_id, url=url, token=token, authentication=authentication
)


class TestBasePushNotificationSender(unittest.IsolatedAsyncioTestCase):
Expand Down Expand Up @@ -101,6 +105,61 @@ async def test_send_notification_with_token_success(self) -> None:
)
mock_response.raise_for_status.assert_called_once()

async def test_send_notification_with_auth_header(self) -> None:
task_id = 'task_send_auth'
task_data = _create_sample_task(task_id=task_id)
config = _create_sample_push_config(
url='http://notify.me/here',
token='unique_token',
authentication=AuthenticationInfo(
scheme='Bearer', credentials='token_or_jwt'
),
)
self.mock_config_store.get_info_for_dispatch.return_value = [config]

mock_response = AsyncMock(spec=httpx.Response)
mock_response.status_code = 200
self.mock_httpx_client.post.return_value = mock_response

await self.sender.send_notification(task_id, task_data)

self.mock_config_store.get_info_for_dispatch.assert_awaited_once_with(
task_data.id
)
self.mock_httpx_client.post.assert_awaited_once_with(
config.url,
json=MessageToDict(StreamResponse(task=task_data)),
headers={
'X-A2A-Notification-Token': 'unique_token',
'Authorization': 'Bearer token_or_jwt',
},
)
mock_response.raise_for_status.assert_called_once()
Comment thread
wondr-wclabs marked this conversation as resolved.

def test_authorization_header_requires_scheme_and_credentials(self) -> None:
config = _create_sample_push_config()
self.assertIsNone(self.sender._authorization_header(config))

config = _create_sample_push_config(
authentication=AuthenticationInfo(credentials='token_or_jwt')
)
self.assertIsNone(self.sender._authorization_header(config))

config = _create_sample_push_config(
authentication=AuthenticationInfo(scheme='Bearer')
)
self.assertIsNone(self.sender._authorization_header(config))

config = _create_sample_push_config(
authentication=AuthenticationInfo(
scheme='Basic', credentials='token_or_jwt'
)
)
self.assertEqual(
self.sender._authorization_header(config),
'Basic token_or_jwt',
)

async def test_send_notification_no_config(self) -> None:
task_id = 'task_send_no_config'
task_data = _create_sample_task(task_id=task_id)
Expand Down
2 changes: 2 additions & 0 deletions uv.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading