Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 49 additions & 0 deletions sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@
# pytype: skip-file

import logging
import subprocess
import sys
import textwrap
import unittest

import mock
Expand All @@ -38,6 +41,52 @@
# pylint: enable=wrong-import-order, wrong-import-position


@unittest.skipIf(gcsfilesystem is None, 'GCSFileSystem could not be imported')
class GCSFileSystemLazyImportTest(unittest.TestCase):
def test_get_filesystem_without_gcp_extra(self):
# Regression test for https://github.com/apache/beam/issues/37445.
#
# Without the gcp extra installed, GCSFileSystem must still import and
# register so FileSystems.get_filesystem('gs://...') returns it (like
# S3FileSystem), deferring the dependency error to usage. The deferred
# behavior now lives in gcsio (see gcsio_test); here we only assert that the
# filesystem still registers and that using it raises a clear error.
#
# This runs in a subprocess because the behavior is decided at import time
# and this test environment has the gcp extra installed; we simulate its
# absence by blocking the google.cloud import in a fresh interpreter.
script = textwrap.dedent(
"""
import sys
# Simulate the gcp extra not being installed.
sys.modules['google.cloud'] = None

from apache_beam.io.filesystems import FileSystems
fs = FileSystems.get_filesystem('gs://bucket/object')
assert type(fs).__name__ == 'GCSFileSystem', type(fs).__name__
assert fs.scheme() == 'gs'

# Using the filesystem raises a clear error (deferred validation).
try:
fs._gcsIO()
except RuntimeError:
pass
else:
raise AssertionError('expected RuntimeError using GCS without gcp')
print('OK')
""")
result = subprocess.run([sys.executable, '-c', script],
capture_output=True,
text=True,
check=False)
self.assertEqual(
result.returncode,
0,
msg='subprocess failed:\nstdout=%s\nstderr=%s' %
(result.stdout, result.stderr))
self.assertIn('OK', result.stdout)


@unittest.skipIf(gcsfilesystem is None, 'GCP dependencies are not installed')
class GCSFileSystemTest(unittest.TestCase):
def setUp(self):
Expand Down
45 changes: 35 additions & 10 deletions sdks/python/apache_beam/io/gcp/gcsio.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,21 +35,41 @@
from typing import Optional
from typing import Union

from google.api_core.exceptions import RetryError
from google.cloud import storage
from google.cloud.exceptions import NotFound
from google.cloud.exceptions import from_http_response
from google.cloud.storage.fileio import BlobReader
from google.cloud.storage.fileio import BlobWriter
from google.cloud.storage.retry import DEFAULT_RETRY

from apache_beam import version as beam_version
from apache_beam.internal.gcp import auth
from apache_beam.io.gcp import gcsio_retry
from apache_beam.metrics.metric import Metrics
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.options.pipeline_options import PipelineOptions

try:
# pylint: disable=wrong-import-order, wrong-import-position
# pylint: disable=ungrouped-imports
from google.api_core.exceptions import RetryError
from google.cloud import storage
from google.cloud.exceptions import NotFound
from google.cloud.exceptions import from_http_response
from google.cloud.storage.fileio import BlobReader
from google.cloud.storage.fileio import BlobWriter
from google.cloud.storage.retry import DEFAULT_RETRY
from apache_beam.io.gcp import gcsio_retry
GOOGLE_CLOUD_STORAGE_INSTALLED = True
except ImportError:
# The gcp extra (google-cloud-storage and friends) may not be installed. The
# module must still import so GcsIO can be looked up; the dependency error is
# deferred until a GcsIO is actually created (mirroring s3io's BOTO3_INSTALLED
# behavior). See https://github.com/apache/beam/issues/37445.
RetryError = None
storage = None
NotFound = None
from_http_response = None
# BlobReader/BlobWriter are used as base classes below, so they need a
# concrete fallback to keep the class definitions importable.
BlobReader = object
BlobWriter = object
DEFAULT_RETRY = None
gcsio_retry = None
GOOGLE_CLOUD_STORAGE_INSTALLED = False

__all__ = ['GcsIO', 'create_storage_client']

_LOGGER = logging.getLogger(__name__)
Expand Down Expand Up @@ -147,13 +167,18 @@ class GcsIO(object):
"""Google Cloud Storage I/O client."""
def __init__(
self,
storage_client: Optional[storage.Client] = None,
storage_client: 'Optional[storage.Client]' = None,
pipeline_options: Optional[Union[dict, PipelineOptions]] = None) -> None:
if pipeline_options is None:
pipeline_options = PipelineOptions()
elif isinstance(pipeline_options, dict):
pipeline_options = PipelineOptions.from_dictionary(pipeline_options)
if storage_client is None:
if not GOOGLE_CLOUD_STORAGE_INSTALLED:
raise RuntimeError(
'Google Cloud Storage dependencies are not installed, and no '
'alternative client was provided to GcsIO. Please install the gcp '
'dependencies with: pip install apache-beam[gcp]')
storage_client = create_storage_client(pipeline_options)

google_cloud_options = pipeline_options.view_as(GoogleCloudOptions)
Expand Down
50 changes: 50 additions & 0 deletions sdks/python/apache_beam/io/gcp/gcsio_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@
import logging
import os
import random
import subprocess
import sys
import textwrap
import unittest
from datetime import datetime

Expand Down Expand Up @@ -238,6 +241,53 @@ def exists(self, **kwargs):
return self.bucket.get_blob(self.name) is not None


class TestGcsioWithoutGcpExtra(unittest.TestCase):
"""Lazy-import behavior when the gcp extra is not installed.

Mirrors s3io's BOTO3_INSTALLED handling: importing the module must succeed
without the optional dependency, and the error is deferred until a GcsIO is
actually created. See https://github.com/apache/beam/issues/37445.

This runs in a subprocess because this test environment has the gcp extra
installed; we simulate its absence by blocking the google.cloud import in a
fresh interpreter.
"""
def test_import_succeeds_and_creation_raises(self):
script = textwrap.dedent(
"""
import sys
# Simulate the gcp extra not being installed.
sys.modules['google.cloud'] = None

from apache_beam.io.gcp import gcsio
assert gcsio.GOOGLE_CLOUD_STORAGE_INSTALLED is False

# Module-level constants and pure helpers remain usable.
assert gcsio.MAX_BATCH_OPERATION_SIZE == 100
assert gcsio.parse_gcs_path('gs://bucket/object') == (
'bucket', 'object')

# Creating a GcsIO without a client raises a clear, actionable error.
try:
gcsio.GcsIO()
except RuntimeError as e:
assert 'apache-beam[gcp]' in str(e), str(e)
else:
raise AssertionError('expected RuntimeError creating GcsIO')
print('OK')
""")
result = subprocess.run([sys.executable, '-c', script],
capture_output=True,
text=True,
check=False)
self.assertEqual(
result.returncode,
0,
msg='subprocess failed:\nstdout=%s\nstderr=%s' %
(result.stdout, result.stderr))
self.assertIn('OK', result.stdout)


@unittest.skipIf(NotFound is None, 'GCP dependencies are not installed')
class TestGCSPathParser(unittest.TestCase):

Expand Down
Loading