Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions auth/README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,13 @@ against Google Cloud. The other ``gcloud-aio-*`` package components accept a
these components or define one for each. Each component corresponds to a given
Google Cloud service and each service requires various "`scopes`_".

The library supports multiple authentication methods:
- Service account credentials
- Authorized user credentials
- GCE metadata credentials
- Impersonated service account credentials
- External account credentials (for workload identity federation)

|pypi| |pythons|

Installation
Expand Down
160 changes: 160 additions & 0 deletions auth/gcloud/aio/auth/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,166 @@
the **roles/iam.serviceAccountTokenCreator** role on the service account that
is specified in the ``target_principal``.

Basic Usage
~~~~~~~~~~~

.. code-block:: python

from gcloud.aio.auth import Token

# Use default credentials (searches for credentials in standard locations)
token = Token()
access_token = await token.get()

# Use a specific service account file
token = Token(service_file='path/to/service-account.json')
access_token = await token.get()

# Use a custom session
import aiohttp
async with aiohttp.ClientSession() as session:
token = Token(session=session)
access_token = await token.get()

Service Account Authentication
~~~~~~~~~~~~~~~~~~~~~~~~~~~~

.. code-block:: python

from gcloud.aio.auth import Token

# Use service account with specific scopes
token = Token(
service_file='path/to/service-account.json',
scopes=['https://www.googleapis.com/auth/cloud-platform']
)
access_token = await token.get()

Authorized User Authentication
~~~~~~~~~~~~~~~~~~~~~~~~~~~~

.. code-block:: python

from gcloud.aio.auth import Token

# Use authorized user credentials (e.g., from gcloud auth application-default login)
token = Token(service_file='~/.config/gcloud/application_default_credentials.json')
access_token = await token.get()

GCE Metadata Authentication
~~~~~~~~~~~~~~~~~~~~~~~~~~

.. code-block:: python

from gcloud.aio.auth import Token

# When running on GCE, the metadata server is used automatically
token = Token()
access_token = await token.get()

Service Account Impersonation
~~~~~~~~~~~~~~~~~~~~~~~~~~~

.. code-block:: python

from gcloud.aio.auth import Token

# Impersonate a service account
token = Token(
service_file='path/to/source-credentials.json',
target_principal='target-service@project.iam.gserviceaccount.com',
scopes=['https://www.googleapis.com/auth/cloud-platform']
)
access_token = await token.get()

# With delegation chain
token = Token(
service_file='path/to/source-credentials.json',
target_principal='target-service@project.iam.gserviceaccount.com',
delegates=['delegate-service@project.iam.gserviceaccount.com'],
scopes=['https://www.googleapis.com/auth/cloud-platform']
)
access_token = await token.get()

External Account Credentials
---------------------------

The library supports external account credentials for workload identity
federation. This allows you to use credentials from external identity providers
(like AWS, Azure, or OIDC) to access Google Cloud resources.

Example configuration file:

.. code-block:: json

{
"type": "external_account",
"audience": "//iam.googleapis.com/projects/123456/locations/global/workloadIdentityPools/pool/subject",
"subject_token_type": "urn:ietf:params:oauth:token-type:jwt",
"token_url": "https://sts.googleapis.com/v1/token",
"credential_source": {
"type": "url",
"url": "http://169.254.169.254/metadata/identity/oauth2/token",
"headers": {
"Metadata": "true"
}
}
}

Usage:

.. code-block:: python

from gcloud.aio.auth import Token

# Basic usage with external account credentials
token = Token(service_file='path/to/external_account_credentials.json')
access_token = await token.get()

# With specific scopes
token = Token(
service_file='path/to/external_account_credentials.json',
scopes=['https://www.googleapis.com/auth/cloud-platform']
)
access_token = await token.get()

The library supports multiple credential source types:
- URL: Fetches token from a URL endpoint (supports both plaintext and JSON)
- File: Reads token from a file
- Environment: Gets token from an environment variable

IAP Token Usage
~~~~~~~~~~~~~

.. code-block:: python

from gcloud.aio.auth import IapToken

# Basic IAP token usage
iap_token = IapToken('https://your-iap-secured-service.com')
id_token = await iap_token.get()

# With service account impersonation
iap_token = IapToken(
'https://your-iap-secured-service.com',
impersonating_service_account='service@project.iam.gserviceaccount.com'
)
id_token = await iap_token.get()

IAM Client Usage
~~~~~~~~~~~~~~

.. code-block:: python

from gcloud.aio.auth import IamClient

# List public keys
client = IamClient()
pubkeys = await client.list_public_keys()

# Get a specific public key
key = await client.get_public_key('key-id')

CLI
---

Expand Down
120 changes: 116 additions & 4 deletions auth/gcloud/aio/auth/token.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,10 @@

class Type(enum.Enum):
AUTHORIZED_USER = 'authorized_user'
EXTERNAL_ACCOUNT = 'external_account'
GCE_METADATA = 'gce_metadata'
SERVICE_ACCOUNT = 'service_account'
IMPERSONATED_SERVICE_ACCOUNT = 'impersonated_service_account'
SERVICE_ACCOUNT = 'service_account'


def get_service_data(
Expand All @@ -94,7 +95,7 @@ def get_service_data(
precedence order of various approaches MUST be maintained. It was last
updated to match the following commit:

https://github.com/googleapis/google-auth-library-python/blob/6c1297c4d69ba40a8b9392775c17411253fcd73b/google/auth/_default.py#L504
https://github.com/googleapis/google-auth-library-python/blob/v2.48.0/google/auth/_default.py#L597
"""
# pylint: disable=too-complex
# _get_explicit_environ_credentials()
Expand Down Expand Up @@ -184,6 +185,18 @@ def __init__(
self.service_data = get_service_data(service_file)
if self.service_data:
self.token_type = Type(self.service_data['type'])
if self.token_type == Type.EXTERNAL_ACCOUNT:
required_fields = {
'audience',
'credential_source',
'subject_token_type',
'token_url',
}
if required_fields - self.service_data.keys():
raise ValueError(
'external_account credentials missing required '
f"fields: {', '.join(required_fields)}"
)
self.token_uri = self.service_data.get(
'token_uri', 'https://oauth2.googleapis.com/token',
)
Expand Down Expand Up @@ -366,6 +379,103 @@ async def _refresh_source_authorized_user(
return TokenResponse(value=str(content['access_token']),
expires_in=int(content['expires_in']))

async def _refresh_external_account(self, timeout: int) -> TokenResponse:
if not self.service_data:
raise ValueError('external_account auth requires service_data')

credential_source = self.service_data['credential_source']
subject_token = await self._get_subject_token(
credential_source, timeout,
)

# exchange the subject token for a Google access token
data = {
'audience': self.service_data['audience'],
'grant_type': 'urn:ietf:params:oauth:grant-type:token-exchange',
'requested_token_type': (
'urn:ietf:params:oauth:token-type:access_token'
),
'subject_token': subject_token,
'subject_token_type': self.service_data['subject_token_type'],
}
# add optional service account impersonation if configured
if self.service_data.get('service_account_impersonation_url'):
data['service_account_impersonation_url'] = self.service_data[
'service_account_impersonation_url'
]
# add optional client ID and secret if configured
if self.service_data.get('client_id'):
data['client_id'] = self.service_data['client_id']
if self.service_data.get('client_secret'):
data['client_secret'] = self.service_data['client_secret']
# add scopes if configured
if self.scopes:
data['scope'] = ' '.join(self.scopes)

resp = await self.session.post(
self.service_data['token_url'],
data=urlencode(data),
headers=REFRESH_HEADERS,
timeout=timeout,
)
try:
data = await resp.json()
except (AttributeError, TypeError):
data = json.loads(await resp.text())

return TokenResponse(
value=data['access_token'],
expires_in=data.get('expires_in', self.default_token_ttl),
)

async def _get_subject_token(
self, credential_source: dict[str, Any], timeout: int
) -> str:
# pylint: disable=too-complex
source_type = credential_source.get('type')
if not source_type:
# TODO: looks like sometimes the type can be found elsewhere or
# needs to be infered.
# https://github.com/talkiq/gcloud-aio/pull/906/changes#r2206959538
raise ValueError('credential_source is missing type field')

if source_type == 'url':
url = credential_source['url']
format_ = credential_source.get('format', {})
format_type = format_.get('type', 'text')

resp = await self.session.get(
url,
headers=credential_source.get('headers', {}),
timeout=timeout,
)

if format_type == 'json':
try:
data = await resp.json()
except (AttributeError, TypeError):
data = json.loads(await resp.text())

token: str = data[format_['subject_token_field_name']]
return token

try:
return await resp.text()
except (AttributeError, TypeError):
return str(resp.text)

if source_type == 'file':
try:
with open(credential_source['file'], encoding='utf-8') as f:
return f.read().strip()
except Exception as e:
raise ValueError('failed to read subject token file') from e

if source_type == 'environment':
return os.environ[credential_source['environment_id']]

raise ValueError(f'unsupported credential_source type: {source_type}')

async def _refresh_gce_metadata(self, timeout: int) -> TokenResponse:
resp = await self.session.get(
url=self.token_uri, headers=GCE_METADATA_HEADERS, timeout=timeout,
Expand Down Expand Up @@ -433,13 +543,15 @@ async def _impersonate(self, token: TokenResponse,
async def refresh(self, *, timeout: int) -> TokenResponse:
if self.token_type == Type.AUTHORIZED_USER:
resp = await self._refresh_authorized_user(timeout=timeout)
elif self.token_type == Type.EXTERNAL_ACCOUNT:
resp = await self._refresh_external_account(timeout=timeout)
elif self.token_type == Type.GCE_METADATA:
resp = await self._refresh_gce_metadata(timeout=timeout)
elif self.token_type == Type.SERVICE_ACCOUNT:
resp = await self._refresh_service_account(timeout=timeout)
elif self.token_type == Type.IMPERSONATED_SERVICE_ACCOUNT:
# impersonation requires a source authorized user
resp = await self._refresh_source_authorized_user(timeout=timeout)
elif self.token_type == Type.SERVICE_ACCOUNT:
resp = await self._refresh_service_account(timeout=timeout)
else:
raise Exception(f'unsupported token type {self.token_type}')

Expand Down
Loading
Loading