diff --git a/paimon-python/pypaimon/common/options/config.py b/paimon-python/pypaimon/common/options/config.py
index ee11d12723e9..3d53c96f1f81 100644
--- a/paimon-python/pypaimon/common/options/config.py
+++ b/paimon-python/pypaimon/common/options/config.py
@@ -42,6 +42,22 @@ class S3Options:
     S3_REGION = ConfigOptions.key("fs.s3.region").string_type().no_default_value().with_description("S3 region")
 
 
+class GcsOptions:
+    GCS_ACCESS_TOKEN = (
+        ConfigOptions.key("gcs.access-token").string_type().no_default_value()
+        .with_description(
+            "GCS access token. If not set, ADC (Application Default Credentials) is used "
+            "automatically."))
+    GCS_ACCESS_TOKEN_EXPIRATION = (
+        ConfigOptions.key("gcs.access-token.expiration").string_type().no_default_value()
+        .with_description(
+            "ISO 8601 expiration datetime for the GCS access token. "
+            "Required when gcs.access-token is set."))
+    GCS_PROJECT_ID = (
+        ConfigOptions.key("gcs.project-id").string_type().no_default_value()
+        .with_description("GCP project ID for GCS requests."))
+
+
 class PVFSOptions:
     CACHE_ENABLED = ConfigOptions.key("cache-enabled").boolean_type().default_value("true").with_description(
         "Enable cache")
diff --git a/paimon-python/pypaimon/filesystem/pyarrow_file_io.py b/paimon-python/pypaimon/filesystem/pyarrow_file_io.py
index d6af3ca543b3..89ba1bafc341 100644
--- a/paimon-python/pypaimon/filesystem/pyarrow_file_io.py
+++ b/paimon-python/pypaimon/filesystem/pyarrow_file_io.py
@@ -81,6 +81,8 @@ def __init__(self, path: str, catalog_options: Options):
             self.filesystem = self._initialize_s3_fs()
         elif scheme in {"hdfs", "viewfs"}:
             self.filesystem = self._initialize_hdfs_fs(scheme, netloc)
+        elif scheme == "gs":
+            self.filesystem = self._initialize_gcs_fs()
         else:
             raise ValueError(f"Unrecognized filesystem type in URI: {scheme}")
 
@@ -292,6 +294,26 @@ def _initialize_hdfs_fs(self, scheme: str, netloc: Optional[str]) -> FileSystem:
             user=os.environ.get('HADOOP_USER_NAME', 'hadoop')
         )
 
+    def _initialize_gcs_fs(self) -> FileSystem:
+        access_token = self._get_property("gcs.access-token")
+        token_expiry = self._get_property("gcs.access-token.expiration")
+        project_id = self._get_property("gcs.project-id")
+
+        kwargs = {}
+        if access_token:
+            from datetime import datetime
+            kwargs["access_token"] = access_token
+            kwargs["credential_token_expiration"] = (
+                datetime.fromisoformat(token_expiry) if token_expiry
+                else datetime(9999, 12, 31)
+            )
+        if project_id:
+            kwargs["project_id"] = project_id
+
+        # With no kwargs, GcsFileSystem uses ADC automatically
+        # (GOOGLE_APPLICATION_CREDENTIALS or GCP metadata server / Workload Identity)
+        return pafs.GcsFileSystem(**kwargs)
+
     @staticmethod
     def _kerberos_login_from_keytab(principal: str, keytab: str):
         if not os.path.isfile(keytab):
@@ -724,6 +746,13 @@ def to_filesystem_path(self, path: str) -> str:
         else:
             return str(path)
 
+        from pyarrow.fs import GcsFileSystem
+        if isinstance(self.filesystem, GcsFileSystem):
+            if parsed.scheme and parsed.netloc:
+                path_part = normalized_path.lstrip('/')
+                return f"{parsed.netloc}/{path_part}" if path_part else parsed.netloc
+            return str(path)
+
         if parsed.scheme:
             if not normalized_path:
                 return '.'
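For orientation, `_initialize_gcs_fs` above just forwards the three new `GcsOptions` keys to `pyarrow.fs.GcsFileSystem` and otherwise lets pyarrow fall back to Application Default Credentials. A minimal usage sketch under those assumptions; the bucket, token, expiration, and project values below are placeholders, not part of this patch:

```python
from datetime import datetime

import pyarrow.fs as pafs

from pypaimon.common.options import Options
from pypaimon.filesystem.pyarrow_file_io import PyArrowFileIO

# ADC only: no GCS options needed; credentials come from
# GOOGLE_APPLICATION_CREDENTIALS, the GCP metadata server, or Workload Identity.
adc_io = PyArrowFileIO("gs://my-bucket/warehouse", Options({}))

# Static OAuth token obtained out of band; the keys are the new GcsOptions entries.
token_io = PyArrowFileIO("gs://my-bucket/warehouse", Options({
    "gcs.access-token": "ya29.placeholder-token",
    "gcs.access-token.expiration": "2030-01-01T00:00:00",
    "gcs.project-id": "my-gcp-project",
}))

# Roughly the pyarrow call the helper ends up making in the token case
# (a naive datetime mirrors the helper's far-future default expiration).
fs = pafs.GcsFileSystem(
    access_token="ya29.placeholder-token",
    credential_token_expiration=datetime(2030, 1, 1),
    project_id="my-gcp-project",
)
```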
diff --git a/paimon-python/pypaimon/tests/file_io_test.py b/paimon-python/pypaimon/tests/file_io_test.py
index 86c4c28f82aa..66494989c495 100644
--- a/paimon-python/pypaimon/tests/file_io_test.py
+++ b/paimon-python/pypaimon/tests/file_io_test.py
@@ -489,5 +489,62 @@ def test_path_on_windows(self):
             self.assertNotIn("\\", call[0][0], f"backslash in path: {call[0][0]}")
 
 
+class GCSFileIOPathTest(unittest.TestCase):
+    """Unit tests for PyArrowFileIO.to_filesystem_path with GCS (no credentials required)."""
+
+    def setUp(self):
+        self.file_io = PyArrowFileIO("gs://my-bucket/warehouse", Options({}))
+
+    def test_gcs_filesystem_type(self):
+        """GCS warehouse path should produce a GcsFileSystem."""
+        self.assertIsInstance(self.file_io.filesystem, pafs.GcsFileSystem)
+
+    def test_gcs_path_conversion(self):
+        """gs://bucket/key should map to bucket/key (no leading slash)."""
+        self.assertEqual(
+            self.file_io.to_filesystem_path("gs://my-bucket/path/to/file.parquet"),
+            "my-bucket/path/to/file.parquet"
+        )
+        self.assertEqual(
+            self.file_io.to_filesystem_path("gs://my-bucket/warehouse/db.db/tbl/data-0.parquet"),
+            "my-bucket/warehouse/db.db/tbl/data-0.parquet"
+        )
+
+    def test_gcs_path_bucket_only(self):
+        """gs://bucket with no path should return just the bucket name."""
+        self.assertEqual(
+            self.file_io.to_filesystem_path("gs://my-bucket"),
+            "my-bucket"
+        )
+
+    def test_gcs_path_normalization(self):
+        """Multiple consecutive slashes in the path should be collapsed."""
+        self.assertEqual(
+            self.file_io.to_filesystem_path("gs://my-bucket///path///to///file.parquet"),
+            "my-bucket/path/to/file.parquet"
+        )
+
+    def test_gcs_path_idempotency(self):
+        """Already-converted bucket/key paths should pass through unchanged."""
+        converted = "my-bucket/path/to/file.parquet"
+        self.assertEqual(self.file_io.to_filesystem_path(converted), converted)
+        parent = str(Path(converted).parent)
+        self.assertEqual(self.file_io.to_filesystem_path(parent), parent)
+
+    def test_gcs_path_no_leading_slash(self):
+        """to_filesystem_path must never return a path starting with '/'."""
+        cases = [
+            "gs://my-bucket/path/to/file.parquet",
+            "gs://my-bucket",
+            "gs://my-bucket///path",
+        ]
+        for uri in cases:
+            result = self.file_io.to_filesystem_path(uri)
+            self.assertFalse(
+                result.startswith("/"),
+                f"Path must not start with '/' for GcsFileSystem, got: {result!r} (input: {uri!r})"
+            )
+
+
 if __name__ == '__main__':
     unittest.main()
diff --git a/paimon-python/pypaimon/tests/gcs_file_io_test.py b/paimon-python/pypaimon/tests/gcs_file_io_test.py
new file mode 100644
index 000000000000..672786cf57cb
--- /dev/null
+++ b/paimon-python/pypaimon/tests/gcs_file_io_test.py
@@ -0,0 +1,184 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+import os
+import unittest
+import uuid
+
+import pyarrow.fs as pafs
+
+from pypaimon.common.options import Options
+from pypaimon.filesystem.pyarrow_file_io import PyArrowFileIO
+
+
+class GCSFileIOTest(unittest.TestCase):
+    """Integration tests for PyArrowFileIO with GCS.
+
+    Requires the following environment variable to be set:
+        GCS_TEST_BUCKET — name of the GCS bucket to use (without gs:// prefix)
+
+    Credentials are picked up automatically via Application Default Credentials
+    (GOOGLE_APPLICATION_CREDENTIALS, GCP metadata server, or Workload Identity).
+    All tests are skipped when GCS_TEST_BUCKET is not configured.
+    """
+
+    def setUp(self):
+        self.bucket = os.environ.get("GCS_TEST_BUCKET")
+        if not self.bucket:
+            self.skipTest("GCS_TEST_BUCKET is not configured")
+            return
+
+        self.root_path = f"gs://{self.bucket}/"
+        self.file_io = PyArrowFileIO(self.root_path, Options({}))
+        self.test_prefix = f"test-{uuid.uuid4().hex[:8]}/"
+
+    def tearDown(self):
+        if not hasattr(self, 'file_io'):
+            return
+        test_dir = f"{self.root_path}{self.test_prefix}"
+        try:
+            if self.file_io.exists(test_dir):
+                self.file_io.delete(test_dir, recursive=True)
+        except Exception:
+            pass
+
+    def _path(self, name: str) -> str:
+        return f"{self.root_path}{self.test_prefix}{name}"
+
+    def test_gcs_filesystem_type(self):
+        """PyArrowFileIO with gs:// should use GcsFileSystem."""
+        self.assertIsInstance(self.file_io.filesystem, pafs.GcsFileSystem)
+
+    def test_exists(self):
+        """exists() returns False for non-existent paths."""
+        self.assertFalse(self.file_io.exists(self._path("nonexistent.txt")))
+        with self.assertRaises(FileNotFoundError):
+            self.file_io.get_file_status(self._path("nonexistent.txt"))
+
+    def test_write_and_read_file(self):
+        """write_file() and read_file_utf8() round-trip."""
+        test_file = self._path("write_read_test.txt")
+
+        self.file_io.write_file(test_file, "hello gcs")
+        self.assertTrue(self.file_io.exists(test_file))
+        self.assertEqual(self.file_io.read_file_utf8(test_file), "hello gcs")
+
+    def test_write_file_overwrite(self):
+        """write_file() respects the overwrite flag."""
+        test_file = self._path("overwrite_test.txt")
+
+        self.file_io.write_file(test_file, "first")
+        with self.assertRaises(FileExistsError):
+            self.file_io.write_file(test_file, "second", overwrite=False)
+        self.assertEqual(self.file_io.read_file_utf8(test_file), "first")
+
+        self.file_io.write_file(test_file, "overwritten", overwrite=True)
+        self.assertEqual(self.file_io.read_file_utf8(test_file), "overwritten")
+
+    def test_new_input_stream_read(self):
+        """new_output_stream() / new_input_stream() round-trip."""
+        test_data = b"Hello, GCS! This is a test file."
+        test_file = self._path("input_stream_test.bin")
+
+        with self.file_io.new_output_stream(test_file) as out:
+            out.write(test_data)
+
+        with self.file_io.new_input_stream(test_file) as inp:
+            self.assertEqual(inp.read(), test_data)
+
+        with self.assertRaises(FileNotFoundError):
+            self.file_io.new_input_stream(self._path("nonexistent.bin"))
+
+    def test_get_file_status_directory(self):
+        """get_file_status() returns Directory type for a directory."""
+        test_dir = self._path("test-dir/")
+        self.file_io.mkdirs(test_dir)
+        status = self.file_io.get_file_status(test_dir)
+        self.assertIsNotNone(status)
+        self.assertEqual(status.type, pafs.FileType.Directory)
+
+    def test_get_file_status_file(self):
+        """get_file_status() returns File type and non-None size for a file."""
+        test_file = self._path("status_test.txt")
+        self.file_io.write_file(test_file, "content")
+        status = self.file_io.get_file_status(test_file)
+        self.assertEqual(status.type, pafs.FileType.File)
+        self.assertIsNotNone(status.size)
+
+    def test_delete_returns_false_when_not_exists(self):
+        """delete() returns False when the path does not exist."""
+        self.assertFalse(self.file_io.delete(self._path("nonexistent.txt")))
+        self.assertFalse(self.file_io.delete(self._path("nonexistent_dir"), recursive=False))
+
+    def test_delete_non_empty_directory_raises_error(self):
+        """delete() without recursive=True raises OSError for a non-empty directory."""
+        test_dir = self._path("nonempty-dir/")
+        test_file = self._path("nonempty-dir/file.txt")
+        self.file_io.mkdirs(test_dir)
+        with self.file_io.new_output_stream(test_file) as out:
+            out.write(b"data")
+
+        with self.assertRaises(OSError) as ctx:
+            self.file_io.delete(test_dir, recursive=False)
+        self.assertIn("is not empty", str(ctx.exception))
+
+    def test_rename_returns_false_when_dst_exists(self):
+        """rename() returns False when the destination already exists."""
+        src = self._path("src.txt")
+        dst = self._path("dst.txt")
+        with self.file_io.new_output_stream(src) as out:
+            out.write(b"src")
+        with self.file_io.new_output_stream(dst) as out:
+            out.write(b"dst")
+
+        self.assertFalse(self.file_io.rename(src, dst))
+
+    def test_copy_file(self):
+        """copy_file() copies content and respects the overwrite flag."""
+        src = self._path("copy_src.txt")
+        dst = self._path("copy_dst.txt")
+        with self.file_io.new_output_stream(src) as out:
+            out.write(b"source content")
+        with self.file_io.new_output_stream(dst) as out:
+            out.write(b"target content")
+
+        with self.assertRaises(FileExistsError) as ctx:
+            self.file_io.copy_file(src, dst, overwrite=False)
+        self.assertIn("already exists", str(ctx.exception))
+
+        self.file_io.copy_file(src, dst, overwrite=True)
+        with self.file_io.new_input_stream(dst) as inp:
+            self.assertEqual(inp.read(), b"source content")
+
+    def test_try_to_write_atomic(self):
+        """try_to_write_atomic() writes a file and returns True on success."""
+        normal_file = self._path("atomic_file.txt")
+        self.assertTrue(self.file_io.try_to_write_atomic(normal_file, "atomic content"))
+        self.assertEqual(self.file_io.read_file_utf8(normal_file), "atomic content")
+
+    def test_mkdirs_raises_error_when_path_is_file(self):
+        """mkdirs() raises FileExistsError when the path is an existing file."""
+        test_file = self._path("existing_file.txt")
+        self.file_io.write_file(test_file, "data")
+
+        with self.assertRaises(FileExistsError) as ctx:
+            self.file_io.mkdirs(test_file)
+        self.assertIn("is not a directory", str(ctx.exception))
+
+
+if __name__ == '__main__':
+    unittest.main()
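These integration tests only run when a bucket is configured. A hypothetical local driver, assuming a test bucket you own and ADC credentials; the bucket name and key path are placeholders, and with the metadata server or Workload Identity the credentials line can be dropped:

```python
import os
import unittest

os.environ["GCS_TEST_BUCKET"] = "my-test-bucket"  # placeholder bucket name
# Optional: point ADC at a service-account key; otherwise the metadata server is used.
os.environ.setdefault("GOOGLE_APPLICATION_CREDENTIALS", "/path/to/service-account.json")

suite = unittest.defaultTestLoader.loadTestsFromName(
    "pypaimon.tests.gcs_file_io_test.GCSFileIOTest"
)
unittest.TextTestRunner(verbosity=2).run(suite)
```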