From 7bc5927955ff6eb4532d82bae3f1b22ed6d2b30f Mon Sep 17 00:00:00 2001 From: wilmerdooley Date: Sat, 30 May 2026 07:47:28 -0700 Subject: [PATCH 1/2] Make GCS filesystem lookup lazy to match S3 behavior FileSystems.get_filesystem("gs://...") raised immediately when the gcp extra was not installed, because gcsfilesystem.py imported gcsio (and its google-cloud-storage dependency) at module load time. When that import failed, GCSFileSystem was never registered, unlike S3FileSystem whose s3io imports boto3 lazily. Import gcsio lazily so GCSFileSystem can still be looked up without the gcp extra, deferring the dependency error to usage time (matching S3). A single _get_gcsio_module() helper raises a clear ImportError when the module is unavailable; CHUNK_SIZE, _gcsIO and report_lineage go through it. Add a regression test that simulates the missing extra in a subprocess. Fixes #37445 Signed-off-by: wilmerdooley --- .../apache_beam/io/gcp/gcsfilesystem.py | 55 ++++++++++++++-- .../apache_beam/io/gcp/gcsfilesystem_test.py | 64 ++++++++++++++++++- 2 files changed, 113 insertions(+), 6 deletions(-) diff --git a/sdks/python/apache_beam/io/gcp/gcsfilesystem.py b/sdks/python/apache_beam/io/gcp/gcsfilesystem.py index 3763e21abc9f..003947d84473 100644 --- a/sdks/python/apache_beam/io/gcp/gcsfilesystem.py +++ b/sdks/python/apache_beam/io/gcp/gcsfilesystem.py @@ -34,22 +34,65 @@ from apache_beam.io.filesystem import CompressionTypes from apache_beam.io.filesystem import FileMetadata from apache_beam.io.filesystem import FileSystem -from apache_beam.io.gcp import gcsio + +try: + from apache_beam.io.gcp import gcsio +except ImportError: + # The gcp extra (google-cloud-storage and friends, imported by gcsio) may not + # be installed. Import it lazily so GCSFileSystem can still be looked up via + # FileSystems.get_filesystem() without the gcp extra installed; the dependency + # is only required when the filesystem is actually used, matching the behavior + # of S3FileSystem. See https://github.com/apache/beam/issues/37445. + gcsio = None # type: ignore[assignment] __all__ = ['GCSFileSystem'] +class _classproperty: + """A read-only property that resolves on the class as well as instances. + + Lets ``GCSFileSystem.CHUNK_SIZE`` remain accessible as a class attribute (as + in ``S3FileSystem``) while computing its value lazily, so that importing this + module does not require the gcp extra. See + https://github.com/apache/beam/issues/37445. + """ + def __init__(self, fget): + self._fget = fget + + def __get__(self, obj, owner): + return self._fget(owner) + + class GCSFileSystem(FileSystem): """A GCS ``FileSystem`` implementation for accessing files on GCS. """ - CHUNK_SIZE = gcsio.MAX_BATCH_OPERATION_SIZE # Chuck size in batch operations GCS_PREFIX = 'gs://' def __init__(self, pipeline_options): super().__init__(pipeline_options) self._pipeline_options = pipeline_options + @staticmethod + def _get_gcsio_module(): + """Return the ``gcsio`` module, raising ImportError if it is unavailable. + + ``gcsio`` is imported lazily (see the module-level import) so that this + filesystem can be looked up without the gcp extra installed. The dependency + is only required when the filesystem is actually used. + """ + if gcsio is None: + raise ImportError( + 'Could not import apache_beam.io.gcp.gcsio. This usually means the ' + 'gcp dependencies are not installed. Install them with: ' + 'pip install apache-beam[gcp]') + return gcsio + + @_classproperty + def CHUNK_SIZE(cls): + """Chunk size in batch operations.""" + return cls._get_gcsio_module().MAX_BATCH_OPERATION_SIZE + @classmethod def scheme(cls): """URI scheme for the FileSystem @@ -139,7 +182,8 @@ def _list(self, dir_or_prefix): raise BeamIOError("List operation failed", {dir_or_prefix: e}) def _gcsIO(self): - return gcsio.GcsIO(pipeline_options=self._pipeline_options) + return self._get_gcsio_module().GcsIO( + pipeline_options=self._pipeline_options) def _path_open( self, @@ -370,8 +414,9 @@ def delete(self, paths): def report_lineage(self, path, lineage): try: - components = gcsio.parse_gcs_path(path, object_optional=True) - except ValueError: + components = self._get_gcsio_module().parse_gcs_path( + path, object_optional=True) + except (ImportError, ValueError): # report lineage is fail-safe traceback.print_exc() return diff --git a/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py b/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py index 08fdd6302887..f96d883ca9c7 100644 --- a/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py +++ b/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py @@ -21,6 +21,9 @@ # pytype: skip-file import logging +import subprocess +import sys +import textwrap import unittest import mock @@ -38,7 +41,58 @@ # pylint: enable=wrong-import-order, wrong-import-position -@unittest.skipIf(gcsfilesystem is None, 'GCP dependencies are not installed') +@unittest.skipIf(gcsfilesystem is None, 'GCSFileSystem could not be imported') +class GCSFileSystemLazyImportTest(unittest.TestCase): + def test_get_filesystem_without_gcp_extra(self): + # Regression test for https://github.com/apache/beam/issues/37445. + # + # Without the gcp extra installed, gcsio cannot be imported. GCSFileSystem + # must still import and register so FileSystems.get_filesystem('gs://...') + # returns it (like S3FileSystem), deferring the dependency error to usage. + # + # This runs in a subprocess because the behavior is decided at import time + # and this test environment has the gcp extra installed; we simulate its + # absence by blocking the gcsio import in a fresh interpreter (reloading in + # process would leave a second GCSFileSystem registered for the gs scheme). + script = textwrap.dedent( + """ + import sys + # Simulate the gcp extra not being installed. + sys.modules['apache_beam.io.gcp.gcsio'] = None + + from apache_beam.io.gcp import gcsfilesystem + assert gcsfilesystem.gcsio is None, 'expected gcsio to be unavailable' + + from apache_beam.io.filesystems import FileSystems + fs = FileSystems.get_filesystem('gs://bucket/object') + assert type(fs).__name__ == 'GCSFileSystem', type(fs).__name__ + assert fs.scheme() == 'gs' + + # Using the filesystem raises a clear ImportError (deferred validation). + for use in (lambda: fs.CHUNK_SIZE, fs._gcsIO): + try: + use() + except ImportError: + pass + else: + raise AssertionError('expected ImportError using GCS without gcp') + print('OK') + """) + result = subprocess.run([sys.executable, '-c', script], + capture_output=True, + text=True, + check=False) + self.assertEqual( + result.returncode, + 0, + msg='subprocess failed:\nstdout=%s\nstderr=%s' % + (result.stdout, result.stderr)) + self.assertIn('OK', result.stdout) + + +@unittest.skipIf( + gcsfilesystem is None or gcsfilesystem.gcsio is None, + 'GCP dependencies are not installed') class GCSFileSystemTest(unittest.TestCase): def setUp(self): pipeline_options = PipelineOptions() @@ -48,6 +102,14 @@ def test_scheme(self): self.assertEqual(self.fs.scheme(), 'gs') self.assertEqual(gcsfilesystem.GCSFileSystem.scheme(), 'gs') + def test_chunk_size_on_class_and_instance(self): + # CHUNK_SIZE must resolve both on the class and on instances (see the + # review on #37445): the lazy class-property must preserve the class-level + # access that S3FileSystem provides as a plain class attribute. + expected = gcsfilesystem.gcsio.MAX_BATCH_OPERATION_SIZE + self.assertEqual(gcsfilesystem.GCSFileSystem.CHUNK_SIZE, expected) + self.assertEqual(self.fs.CHUNK_SIZE, expected) + def test_join(self): self.assertEqual( 'gs://bucket/path/to/file', From 187970a86cf49673db4b1f7ab025b7428d10a522 Mon Sep 17 00:00:00 2001 From: wilmerdooley Date: Mon, 22 Jun 2026 18:13:43 +0000 Subject: [PATCH 2/2] [Python] Defer the GCS dependency check to gcsio, matching s3io Move the optional-dependency handling into gcsio so importing it succeeds without the gcp extra (mirroring s3io BOTO3_INSTALLED), and revert the gcsfilesystem-layer lazy-import workaround. The dependency error now surfaces at GcsIO creation rather than at import. Addresses #37445. Signed-off-by: wilmerdooley --- .../apache_beam/io/gcp/gcsfilesystem.py | 55 ++----------------- .../apache_beam/io/gcp/gcsfilesystem_test.py | 43 +++++---------- sdks/python/apache_beam/io/gcp/gcsio.py | 45 +++++++++++---- sdks/python/apache_beam/io/gcp/gcsio_test.py | 50 +++++++++++++++++ 4 files changed, 105 insertions(+), 88 deletions(-) diff --git a/sdks/python/apache_beam/io/gcp/gcsfilesystem.py b/sdks/python/apache_beam/io/gcp/gcsfilesystem.py index 003947d84473..3763e21abc9f 100644 --- a/sdks/python/apache_beam/io/gcp/gcsfilesystem.py +++ b/sdks/python/apache_beam/io/gcp/gcsfilesystem.py @@ -34,65 +34,22 @@ from apache_beam.io.filesystem import CompressionTypes from apache_beam.io.filesystem import FileMetadata from apache_beam.io.filesystem import FileSystem - -try: - from apache_beam.io.gcp import gcsio -except ImportError: - # The gcp extra (google-cloud-storage and friends, imported by gcsio) may not - # be installed. Import it lazily so GCSFileSystem can still be looked up via - # FileSystems.get_filesystem() without the gcp extra installed; the dependency - # is only required when the filesystem is actually used, matching the behavior - # of S3FileSystem. See https://github.com/apache/beam/issues/37445. - gcsio = None # type: ignore[assignment] +from apache_beam.io.gcp import gcsio __all__ = ['GCSFileSystem'] -class _classproperty: - """A read-only property that resolves on the class as well as instances. - - Lets ``GCSFileSystem.CHUNK_SIZE`` remain accessible as a class attribute (as - in ``S3FileSystem``) while computing its value lazily, so that importing this - module does not require the gcp extra. See - https://github.com/apache/beam/issues/37445. - """ - def __init__(self, fget): - self._fget = fget - - def __get__(self, obj, owner): - return self._fget(owner) - - class GCSFileSystem(FileSystem): """A GCS ``FileSystem`` implementation for accessing files on GCS. """ + CHUNK_SIZE = gcsio.MAX_BATCH_OPERATION_SIZE # Chuck size in batch operations GCS_PREFIX = 'gs://' def __init__(self, pipeline_options): super().__init__(pipeline_options) self._pipeline_options = pipeline_options - @staticmethod - def _get_gcsio_module(): - """Return the ``gcsio`` module, raising ImportError if it is unavailable. - - ``gcsio`` is imported lazily (see the module-level import) so that this - filesystem can be looked up without the gcp extra installed. The dependency - is only required when the filesystem is actually used. - """ - if gcsio is None: - raise ImportError( - 'Could not import apache_beam.io.gcp.gcsio. This usually means the ' - 'gcp dependencies are not installed. Install them with: ' - 'pip install apache-beam[gcp]') - return gcsio - - @_classproperty - def CHUNK_SIZE(cls): - """Chunk size in batch operations.""" - return cls._get_gcsio_module().MAX_BATCH_OPERATION_SIZE - @classmethod def scheme(cls): """URI scheme for the FileSystem @@ -182,8 +139,7 @@ def _list(self, dir_or_prefix): raise BeamIOError("List operation failed", {dir_or_prefix: e}) def _gcsIO(self): - return self._get_gcsio_module().GcsIO( - pipeline_options=self._pipeline_options) + return gcsio.GcsIO(pipeline_options=self._pipeline_options) def _path_open( self, @@ -414,9 +370,8 @@ def delete(self, paths): def report_lineage(self, path, lineage): try: - components = self._get_gcsio_module().parse_gcs_path( - path, object_optional=True) - except (ImportError, ValueError): + components = gcsio.parse_gcs_path(path, object_optional=True) + except ValueError: # report lineage is fail-safe traceback.print_exc() return diff --git a/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py b/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py index f96d883ca9c7..41ad6653509d 100644 --- a/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py +++ b/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py @@ -46,36 +46,33 @@ class GCSFileSystemLazyImportTest(unittest.TestCase): def test_get_filesystem_without_gcp_extra(self): # Regression test for https://github.com/apache/beam/issues/37445. # - # Without the gcp extra installed, gcsio cannot be imported. GCSFileSystem - # must still import and register so FileSystems.get_filesystem('gs://...') - # returns it (like S3FileSystem), deferring the dependency error to usage. + # Without the gcp extra installed, GCSFileSystem must still import and + # register so FileSystems.get_filesystem('gs://...') returns it (like + # S3FileSystem), deferring the dependency error to usage. The deferred + # behavior now lives in gcsio (see gcsio_test); here we only assert that the + # filesystem still registers and that using it raises a clear error. # # This runs in a subprocess because the behavior is decided at import time # and this test environment has the gcp extra installed; we simulate its - # absence by blocking the gcsio import in a fresh interpreter (reloading in - # process would leave a second GCSFileSystem registered for the gs scheme). + # absence by blocking the google.cloud import in a fresh interpreter. script = textwrap.dedent( """ import sys # Simulate the gcp extra not being installed. - sys.modules['apache_beam.io.gcp.gcsio'] = None - - from apache_beam.io.gcp import gcsfilesystem - assert gcsfilesystem.gcsio is None, 'expected gcsio to be unavailable' + sys.modules['google.cloud'] = None from apache_beam.io.filesystems import FileSystems fs = FileSystems.get_filesystem('gs://bucket/object') assert type(fs).__name__ == 'GCSFileSystem', type(fs).__name__ assert fs.scheme() == 'gs' - # Using the filesystem raises a clear ImportError (deferred validation). - for use in (lambda: fs.CHUNK_SIZE, fs._gcsIO): - try: - use() - except ImportError: - pass - else: - raise AssertionError('expected ImportError using GCS without gcp') + # Using the filesystem raises a clear error (deferred validation). + try: + fs._gcsIO() + except RuntimeError: + pass + else: + raise AssertionError('expected RuntimeError using GCS without gcp') print('OK') """) result = subprocess.run([sys.executable, '-c', script], @@ -90,9 +87,7 @@ def test_get_filesystem_without_gcp_extra(self): self.assertIn('OK', result.stdout) -@unittest.skipIf( - gcsfilesystem is None or gcsfilesystem.gcsio is None, - 'GCP dependencies are not installed') +@unittest.skipIf(gcsfilesystem is None, 'GCP dependencies are not installed') class GCSFileSystemTest(unittest.TestCase): def setUp(self): pipeline_options = PipelineOptions() @@ -102,14 +97,6 @@ def test_scheme(self): self.assertEqual(self.fs.scheme(), 'gs') self.assertEqual(gcsfilesystem.GCSFileSystem.scheme(), 'gs') - def test_chunk_size_on_class_and_instance(self): - # CHUNK_SIZE must resolve both on the class and on instances (see the - # review on #37445): the lazy class-property must preserve the class-level - # access that S3FileSystem provides as a plain class attribute. - expected = gcsfilesystem.gcsio.MAX_BATCH_OPERATION_SIZE - self.assertEqual(gcsfilesystem.GCSFileSystem.CHUNK_SIZE, expected) - self.assertEqual(self.fs.CHUNK_SIZE, expected) - def test_join(self): self.assertEqual( 'gs://bucket/path/to/file', diff --git a/sdks/python/apache_beam/io/gcp/gcsio.py b/sdks/python/apache_beam/io/gcp/gcsio.py index da5c20aa0e70..fbb58f61a31a 100644 --- a/sdks/python/apache_beam/io/gcp/gcsio.py +++ b/sdks/python/apache_beam/io/gcp/gcsio.py @@ -35,21 +35,41 @@ from typing import Optional from typing import Union -from google.api_core.exceptions import RetryError -from google.cloud import storage -from google.cloud.exceptions import NotFound -from google.cloud.exceptions import from_http_response -from google.cloud.storage.fileio import BlobReader -from google.cloud.storage.fileio import BlobWriter -from google.cloud.storage.retry import DEFAULT_RETRY - from apache_beam import version as beam_version from apache_beam.internal.gcp import auth -from apache_beam.io.gcp import gcsio_retry from apache_beam.metrics.metric import Metrics from apache_beam.options.pipeline_options import GoogleCloudOptions from apache_beam.options.pipeline_options import PipelineOptions +try: + # pylint: disable=wrong-import-order, wrong-import-position + # pylint: disable=ungrouped-imports + from google.api_core.exceptions import RetryError + from google.cloud import storage + from google.cloud.exceptions import NotFound + from google.cloud.exceptions import from_http_response + from google.cloud.storage.fileio import BlobReader + from google.cloud.storage.fileio import BlobWriter + from google.cloud.storage.retry import DEFAULT_RETRY + from apache_beam.io.gcp import gcsio_retry + GOOGLE_CLOUD_STORAGE_INSTALLED = True +except ImportError: + # The gcp extra (google-cloud-storage and friends) may not be installed. The + # module must still import so GcsIO can be looked up; the dependency error is + # deferred until a GcsIO is actually created (mirroring s3io's BOTO3_INSTALLED + # behavior). See https://github.com/apache/beam/issues/37445. + RetryError = None + storage = None + NotFound = None + from_http_response = None + # BlobReader/BlobWriter are used as base classes below, so they need a + # concrete fallback to keep the class definitions importable. + BlobReader = object + BlobWriter = object + DEFAULT_RETRY = None + gcsio_retry = None + GOOGLE_CLOUD_STORAGE_INSTALLED = False + __all__ = ['GcsIO', 'create_storage_client'] _LOGGER = logging.getLogger(__name__) @@ -147,13 +167,18 @@ class GcsIO(object): """Google Cloud Storage I/O client.""" def __init__( self, - storage_client: Optional[storage.Client] = None, + storage_client: 'Optional[storage.Client]' = None, pipeline_options: Optional[Union[dict, PipelineOptions]] = None) -> None: if pipeline_options is None: pipeline_options = PipelineOptions() elif isinstance(pipeline_options, dict): pipeline_options = PipelineOptions.from_dictionary(pipeline_options) if storage_client is None: + if not GOOGLE_CLOUD_STORAGE_INSTALLED: + raise RuntimeError( + 'Google Cloud Storage dependencies are not installed, and no ' + 'alternative client was provided to GcsIO. Please install the gcp ' + 'dependencies with: pip install apache-beam[gcp]') storage_client = create_storage_client(pipeline_options) google_cloud_options = pipeline_options.view_as(GoogleCloudOptions) diff --git a/sdks/python/apache_beam/io/gcp/gcsio_test.py b/sdks/python/apache_beam/io/gcp/gcsio_test.py index ec4ccbf1cf57..5e5957fc26c7 100644 --- a/sdks/python/apache_beam/io/gcp/gcsio_test.py +++ b/sdks/python/apache_beam/io/gcp/gcsio_test.py @@ -21,6 +21,9 @@ import logging import os import random +import subprocess +import sys +import textwrap import unittest from datetime import datetime @@ -238,6 +241,53 @@ def exists(self, **kwargs): return self.bucket.get_blob(self.name) is not None +class TestGcsioWithoutGcpExtra(unittest.TestCase): + """Lazy-import behavior when the gcp extra is not installed. + + Mirrors s3io's BOTO3_INSTALLED handling: importing the module must succeed + without the optional dependency, and the error is deferred until a GcsIO is + actually created. See https://github.com/apache/beam/issues/37445. + + This runs in a subprocess because this test environment has the gcp extra + installed; we simulate its absence by blocking the google.cloud import in a + fresh interpreter. + """ + def test_import_succeeds_and_creation_raises(self): + script = textwrap.dedent( + """ + import sys + # Simulate the gcp extra not being installed. + sys.modules['google.cloud'] = None + + from apache_beam.io.gcp import gcsio + assert gcsio.GOOGLE_CLOUD_STORAGE_INSTALLED is False + + # Module-level constants and pure helpers remain usable. + assert gcsio.MAX_BATCH_OPERATION_SIZE == 100 + assert gcsio.parse_gcs_path('gs://bucket/object') == ( + 'bucket', 'object') + + # Creating a GcsIO without a client raises a clear, actionable error. + try: + gcsio.GcsIO() + except RuntimeError as e: + assert 'apache-beam[gcp]' in str(e), str(e) + else: + raise AssertionError('expected RuntimeError creating GcsIO') + print('OK') + """) + result = subprocess.run([sys.executable, '-c', script], + capture_output=True, + text=True, + check=False) + self.assertEqual( + result.returncode, + 0, + msg='subprocess failed:\nstdout=%s\nstderr=%s' % + (result.stdout, result.stderr)) + self.assertIn('OK', result.stdout) + + @unittest.skipIf(NotFound is None, 'GCP dependencies are not installed') class TestGCSPathParser(unittest.TestCase):