diff --git a/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py b/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py
index 08fdd6302887..41ad6653509d 100644
--- a/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py
+++ b/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py
@@ -21,6 +21,9 @@
 # pytype: skip-file
 
 import logging
+import subprocess
+import sys
+import textwrap
 import unittest
 
 import mock
@@ -38,6 +41,52 @@
 # pylint: enable=wrong-import-order, wrong-import-position
 
 
+@unittest.skipIf(gcsfilesystem is None, 'GCSFileSystem could not be imported')
+class GCSFileSystemLazyImportTest(unittest.TestCase):
+  def test_get_filesystem_without_gcp_extra(self):
+    # Regression test for https://github.com/apache/beam/issues/37445.
+    #
+    # Without the gcp extra installed, GCSFileSystem must still import and
+    # register so FileSystems.get_filesystem('gs://...') returns it (like
+    # S3FileSystem), deferring the dependency error to usage. The deferred
+    # behavior now lives in gcsio (see gcsio_test); here we only assert that the
+    # filesystem still registers and that using it raises a clear error.
+    #
+    # This runs in a subprocess because the behavior is decided at import time
+    # and this test environment has the gcp extra installed; we simulate its
+    # absence by blocking the google.cloud import in a fresh interpreter.
+    script = textwrap.dedent(
+        """
+        import sys
+        # Simulate the gcp extra not being installed.
+        sys.modules['google.cloud'] = None
+
+        from apache_beam.io.filesystems import FileSystems
+        fs = FileSystems.get_filesystem('gs://bucket/object')
+        assert type(fs).__name__ == 'GCSFileSystem', type(fs).__name__
+        assert fs.scheme() == 'gs'
+
+        # Using the filesystem raises a clear error (deferred validation).
+        try:
+          fs._gcsIO()
+        except RuntimeError:
+          pass
+        else:
+          raise AssertionError('expected RuntimeError using GCS without gcp')
+        print('OK')
+        """)
+    result = subprocess.run([sys.executable, '-c', script],
+                            capture_output=True,
+                            text=True,
+                            check=False)
+    self.assertEqual(
+        result.returncode,
+        0,
+        msg='subprocess failed:\nstdout=%s\nstderr=%s' %
+        (result.stdout, result.stderr))
+    self.assertIn('OK', result.stdout)
+
+
 @unittest.skipIf(gcsfilesystem is None, 'GCP dependencies are not installed')
 class GCSFileSystemTest(unittest.TestCase):
   def setUp(self):
diff --git a/sdks/python/apache_beam/io/gcp/gcsio.py b/sdks/python/apache_beam/io/gcp/gcsio.py
index da5c20aa0e70..fbb58f61a31a 100644
--- a/sdks/python/apache_beam/io/gcp/gcsio.py
+++ b/sdks/python/apache_beam/io/gcp/gcsio.py
@@ -35,21 +35,41 @@
 from typing import Optional
 from typing import Union
 
-from google.api_core.exceptions import RetryError
-from google.cloud import storage
-from google.cloud.exceptions import NotFound
-from google.cloud.exceptions import from_http_response
-from google.cloud.storage.fileio import BlobReader
-from google.cloud.storage.fileio import BlobWriter
-from google.cloud.storage.retry import DEFAULT_RETRY
-
 from apache_beam import version as beam_version
 from apache_beam.internal.gcp import auth
-from apache_beam.io.gcp import gcsio_retry
 from apache_beam.metrics.metric import Metrics
 from apache_beam.options.pipeline_options import GoogleCloudOptions
 from apache_beam.options.pipeline_options import PipelineOptions
 
+try:
+  # pylint: disable=wrong-import-order, wrong-import-position
+  # pylint: disable=ungrouped-imports
+  from google.api_core.exceptions import RetryError
+  from google.cloud import storage
+  from google.cloud.exceptions import NotFound
+  from google.cloud.exceptions import from_http_response
+  from google.cloud.storage.fileio import BlobReader
+  from google.cloud.storage.fileio import BlobWriter
+  from google.cloud.storage.retry import DEFAULT_RETRY
+  from apache_beam.io.gcp import gcsio_retry
+  GOOGLE_CLOUD_STORAGE_INSTALLED = True
+except ImportError:
+  # The gcp extra (google-cloud-storage and friends) may not be installed. The
+  # module must still import so GcsIO can be looked up; the dependency error is
+  # deferred until a GcsIO is actually created (mirroring s3io's BOTO3_INSTALLED
+  # behavior). See https://github.com/apache/beam/issues/37445.
+  RetryError = None
+  storage = None
+  NotFound = None
+  from_http_response = None
+  # BlobReader/BlobWriter are used as base classes below, so they need a
+  # concrete fallback to keep the class definitions importable.
+  BlobReader = object
+  BlobWriter = object
+  DEFAULT_RETRY = None
+  gcsio_retry = None
+  GOOGLE_CLOUD_STORAGE_INSTALLED = False
+
 __all__ = ['GcsIO', 'create_storage_client']
 
 _LOGGER = logging.getLogger(__name__)
@@ -147,13 +167,18 @@ class GcsIO(object):
   """Google Cloud Storage I/O client."""
   def __init__(
       self,
-      storage_client: Optional[storage.Client] = None,
+      storage_client: 'Optional[storage.Client]' = None,
       pipeline_options: Optional[Union[dict, PipelineOptions]] = None) -> None:
     if pipeline_options is None:
       pipeline_options = PipelineOptions()
     elif isinstance(pipeline_options, dict):
       pipeline_options = PipelineOptions.from_dictionary(pipeline_options)
     if storage_client is None:
+      if not GOOGLE_CLOUD_STORAGE_INSTALLED:
+        raise RuntimeError(
+            'Google Cloud Storage dependencies are not installed, and no '
+            'alternative client was provided to GcsIO. Please install the gcp '
+            'dependencies with: pip install apache-beam[gcp]')
       storage_client = create_storage_client(pipeline_options)
 
     google_cloud_options = pipeline_options.view_as(GoogleCloudOptions)
diff --git a/sdks/python/apache_beam/io/gcp/gcsio_test.py b/sdks/python/apache_beam/io/gcp/gcsio_test.py
index ec4ccbf1cf57..5e5957fc26c7 100644
--- a/sdks/python/apache_beam/io/gcp/gcsio_test.py
+++ b/sdks/python/apache_beam/io/gcp/gcsio_test.py
@@ -21,6 +21,9 @@
 import logging
 import os
 import random
+import subprocess
+import sys
+import textwrap
 import unittest
 from datetime import datetime
 
@@ -238,6 +241,53 @@ def exists(self, **kwargs):
     return self.bucket.get_blob(self.name) is not None
 
 
+class TestGcsioWithoutGcpExtra(unittest.TestCase):
+  """Lazy-import behavior when the gcp extra is not installed.
+
+  Mirrors s3io's BOTO3_INSTALLED handling: importing the module must succeed
+  without the optional dependency, and the error is deferred until a GcsIO is
+  actually created. See https://github.com/apache/beam/issues/37445.
+
+  This runs in a subprocess because this test environment has the gcp extra
+  installed; we simulate its absence by blocking the google.cloud import in a
+  fresh interpreter.
+  """
+  def test_import_succeeds_and_creation_raises(self):
+    script = textwrap.dedent(
+        """
+        import sys
+        # Simulate the gcp extra not being installed.
+        sys.modules['google.cloud'] = None
+
+        from apache_beam.io.gcp import gcsio
+        assert gcsio.GOOGLE_CLOUD_STORAGE_INSTALLED is False
+
+        # Module-level constants and pure helpers remain usable.
+        assert gcsio.MAX_BATCH_OPERATION_SIZE == 100
+        assert gcsio.parse_gcs_path('gs://bucket/object') == (
+            'bucket', 'object')
+
+        # Creating a GcsIO without a client raises a clear, actionable error.
+        try:
+          gcsio.GcsIO()
+        except RuntimeError as e:
+          assert 'apache-beam[gcp]' in str(e), str(e)
+        else:
+          raise AssertionError('expected RuntimeError creating GcsIO')
+        print('OK')
+        """)
+    result = subprocess.run([sys.executable, '-c', script],
+                            capture_output=True,
+                            text=True,
+                            check=False)
+    self.assertEqual(
+        result.returncode,
+        0,
+        msg='subprocess failed:\nstdout=%s\nstderr=%s' %
+        (result.stdout, result.stderr))
+    self.assertIn('OK', result.stdout)
+
+
 @unittest.skipIf(NotFound is None, 'GCP dependencies are not installed')
 class TestGCSPathParser(unittest.TestCase):
 