Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions paimon-python/pypaimon/common/options/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,22 @@ class S3Options:
S3_REGION = ConfigOptions.key("fs.s3.region").string_type().no_default_value().with_description("S3 region")


class GcsOptions:
    """Configuration keys for Google Cloud Storage (GCS) access."""

    # Bearer token used to authenticate GCS requests. When absent, pyarrow
    # falls back to Application Default Credentials automatically.
    GCS_ACCESS_TOKEN = (
        ConfigOptions.key("gcs.access-token")
        .string_type()
        .no_default_value()
        .with_description(
            "GCS access token. If not set, ADC (Application Default Credentials) is used "
            "automatically."))

    # Expiry timestamp paired with the access token above.
    GCS_ACCESS_TOKEN_EXPIRATION = (
        ConfigOptions.key("gcs.access-token.expiration")
        .string_type()
        .no_default_value()
        .with_description(
            "ISO 8601 expiration datetime for the GCS access token. "
            "Required when gcs.access-token is set."))

    # GCP project billed/targeted for GCS operations.
    GCS_PROJECT_ID = (
        ConfigOptions.key("gcs.project-id")
        .string_type()
        .no_default_value()
        .with_description("GCP project ID for GCS requests."))


class PVFSOptions:
CACHE_ENABLED = ConfigOptions.key("cache-enabled").boolean_type().default_value("true").with_description(
"Enable cache")
Expand Down
29 changes: 29 additions & 0 deletions paimon-python/pypaimon/filesystem/pyarrow_file_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ def __init__(self, path: str, catalog_options: Options):
self.filesystem = self._initialize_s3_fs()
elif scheme in {"hdfs", "viewfs"}:
self.filesystem = self._initialize_hdfs_fs(scheme, netloc)
elif scheme == "gs":
self.filesystem = self._initialize_gcs_fs()
else:
raise ValueError(f"Unrecognized filesystem type in URI: {scheme}")

Expand Down Expand Up @@ -292,6 +294,26 @@ def _initialize_hdfs_fs(self, scheme: str, netloc: Optional[str]) -> FileSystem:
user=os.environ.get('HADOOP_USER_NAME', 'hadoop')
)

def _initialize_gcs_fs(self) -> FileSystem:
    """Build a ``pyarrow.fs.GcsFileSystem`` from catalog options.

    Reads ``gcs.access-token``, ``gcs.access-token.expiration`` and
    ``gcs.project-id``. When no explicit token is configured, the
    filesystem is created with no credentials so pyarrow uses ADC
    (GOOGLE_APPLICATION_CREDENTIALS or GCP metadata server /
    Workload Identity).

    Returns:
        A configured ``GcsFileSystem`` instance.

    Raises:
        ValueError: if ``gcs.access-token.expiration`` is set but is not a
            parseable ISO 8601 datetime.
    """
    access_token = self._get_property("gcs.access-token")
    token_expiry = self._get_property("gcs.access-token.expiration")
    project_id = self._get_property("gcs.project-id")

    kwargs = {}
    if access_token:
        from datetime import datetime
        kwargs["access_token"] = access_token
        if token_expiry:
            # datetime.fromisoformat() rejects a trailing 'Z' (the common
            # UTC designator in ISO 8601 tokens) before Python 3.11 —
            # normalize it to an explicit UTC offset first.
            kwargs["credential_token_expiration"] = datetime.fromisoformat(
                token_expiry.replace("Z", "+00:00"))
        else:
            # No expiration configured: use a far-future timestamp so the
            # token is effectively treated as non-expiring.
            kwargs["credential_token_expiration"] = datetime(9999, 12, 31)
    if project_id:
        kwargs["project_id"] = project_id

    # With no kwargs, GcsFileSystem uses ADC automatically
    # (GOOGLE_APPLICATION_CREDENTIALS or GCP metadata server / Workload Identity)
    return pafs.GcsFileSystem(**kwargs)

@staticmethod
def _kerberos_login_from_keytab(principal: str, keytab: str):
if not os.path.isfile(keytab):
Expand Down Expand Up @@ -724,6 +746,13 @@ def to_filesystem_path(self, path: str) -> str:
else:
return str(path)

from pyarrow.fs import GcsFileSystem
if isinstance(self.filesystem, GcsFileSystem):
if parsed.scheme and parsed.netloc:
path_part = normalized_path.lstrip('/')
return f"{parsed.netloc}/{path_part}" if path_part else parsed.netloc
return str(path)

if parsed.scheme:
if not normalized_path:
return '.'
Expand Down
57 changes: 57 additions & 0 deletions paimon-python/pypaimon/tests/file_io_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -489,5 +489,62 @@ def test_path_on_windows(self):
self.assertNotIn("\\", call[0][0], f"backslash in path: {call[0][0]}")


class GCSFileIOPathTest(unittest.TestCase):
    """Offline path-mapping tests for PyArrowFileIO with a gs:// warehouse.

    No GCS credentials are required; only to_filesystem_path behavior and
    filesystem selection are exercised.
    """

    def setUp(self):
        self.file_io = PyArrowFileIO("gs://my-bucket/warehouse", Options({}))

    def test_gcs_filesystem_type(self):
        """A gs:// warehouse URI must yield a GcsFileSystem."""
        self.assertIsInstance(self.file_io.filesystem, pafs.GcsFileSystem)

    def test_gcs_path_conversion(self):
        """gs://bucket/key maps to bucket/key with no leading slash."""
        expectations = {
            "gs://my-bucket/path/to/file.parquet":
                "my-bucket/path/to/file.parquet",
            "gs://my-bucket/warehouse/db.db/tbl/data-0.parquet":
                "my-bucket/warehouse/db.db/tbl/data-0.parquet",
        }
        for uri, expected in expectations.items():
            self.assertEqual(self.file_io.to_filesystem_path(uri), expected)

    def test_gcs_path_bucket_only(self):
        """A bare gs://bucket URI reduces to just the bucket name."""
        result = self.file_io.to_filesystem_path("gs://my-bucket")
        self.assertEqual(result, "my-bucket")

    def test_gcs_path_normalization(self):
        """Runs of consecutive slashes in the key collapse to single ones."""
        result = self.file_io.to_filesystem_path(
            "gs://my-bucket///path///to///file.parquet")
        self.assertEqual(result, "my-bucket/path/to/file.parquet")

    def test_gcs_path_idempotency(self):
        """Paths already in bucket/key form pass through unchanged."""
        converted = "my-bucket/path/to/file.parquet"
        self.assertEqual(self.file_io.to_filesystem_path(converted), converted)
        parent = str(Path(converted).parent)
        self.assertEqual(self.file_io.to_filesystem_path(parent), parent)

    def test_gcs_path_no_leading_slash(self):
        """to_filesystem_path must never return a path starting with '/'."""
        for uri in (
            "gs://my-bucket/path/to/file.parquet",
            "gs://my-bucket",
            "gs://my-bucket///path",
        ):
            result = self.file_io.to_filesystem_path(uri)
            self.assertFalse(
                result.startswith("/"),
                f"Path must not start with '/' for GcsFileSystem, got: {result!r} (input: {uri!r})"
            )


if __name__ == '__main__':
unittest.main()
184 changes: 184 additions & 0 deletions paimon-python/pypaimon/tests/gcs_file_io_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
import os
import unittest
import uuid

import pyarrow.fs as pafs

from pypaimon.common.options import Options
from pypaimon.filesystem.pyarrow_file_io import PyArrowFileIO


class GCSFileIOTest(unittest.TestCase):
    """End-to-end tests for PyArrowFileIO against a real GCS bucket.

    Requires the following environment variable to be set:
        GCS_TEST_BUCKET — name of the GCS bucket to use (without gs:// prefix)

    Credentials come from Application Default Credentials
    (GOOGLE_APPLICATION_CREDENTIALS, GCP metadata server, or Workload
    Identity). Every test is skipped when GCS_TEST_BUCKET is unset.
    """

    def setUp(self):
        bucket = os.environ.get("GCS_TEST_BUCKET")
        if not bucket:
            self.skipTest("GCS_TEST_BUCKET is not configured")
            return

        self.bucket = bucket
        self.root_path = f"gs://{bucket}/"
        # Unique per-run prefix keeps concurrent test runs from colliding.
        self.test_prefix = f"test-{uuid.uuid4().hex[:8]}/"
        self.file_io = PyArrowFileIO(self.root_path, Options({}))

    def tearDown(self):
        if not hasattr(self, 'file_io'):
            return
        scratch_dir = f"{self.root_path}{self.test_prefix}"
        try:
            if self.file_io.exists(scratch_dir):
                self.file_io.delete(scratch_dir, recursive=True)
        except Exception:
            # Best-effort cleanup; leftover objects are harmless.
            pass

    def _path(self, name: str) -> str:
        """Return a gs:// URI under this run's scratch prefix."""
        return f"{self.root_path}{self.test_prefix}{name}"

    def test_gcs_filesystem_type(self):
        """A gs:// root must be backed by GcsFileSystem."""
        self.assertIsInstance(self.file_io.filesystem, pafs.GcsFileSystem)

    def test_exists(self):
        """Missing paths report False from exists() and raise on status."""
        missing = self._path("nonexistent.txt")
        self.assertFalse(self.file_io.exists(missing))
        with self.assertRaises(FileNotFoundError):
            self.file_io.get_file_status(missing)

    def test_write_and_read_file(self):
        """Round-trip a small UTF-8 payload through write/read."""
        target = self._path("write_read_test.txt")
        self.file_io.write_file(target, "hello gcs")
        self.assertTrue(self.file_io.exists(target))
        self.assertEqual("hello gcs", self.file_io.read_file_utf8(target))

    def test_write_file_overwrite(self):
        """write_file() honors the overwrite flag in both directions."""
        target = self._path("overwrite_test.txt")
        self.file_io.write_file(target, "first")

        # overwrite=False must refuse and leave the original intact.
        with self.assertRaises(FileExistsError):
            self.file_io.write_file(target, "second", overwrite=False)
        self.assertEqual("first", self.file_io.read_file_utf8(target))

        # overwrite=True replaces the content.
        self.file_io.write_file(target, "overwritten", overwrite=True)
        self.assertEqual("overwritten", self.file_io.read_file_utf8(target))

    def test_new_input_stream_read(self):
        """Binary round-trip via output/input streams; missing files raise."""
        payload = b"Hello, GCS! This is a test file."
        target = self._path("input_stream_test.bin")

        with self.file_io.new_output_stream(target) as sink:
            sink.write(payload)
        with self.file_io.new_input_stream(target) as source:
            self.assertEqual(payload, source.read())

        with self.assertRaises(FileNotFoundError):
            self.file_io.new_input_stream(self._path("nonexistent.bin"))

    def test_get_file_status_directory(self):
        """A created directory reports FileType.Directory."""
        directory = self._path("test-dir/")
        self.file_io.mkdirs(directory)
        status = self.file_io.get_file_status(directory)
        self.assertIsNotNone(status)
        self.assertEqual(pafs.FileType.Directory, status.type)

    def test_get_file_status_file(self):
        """A written file reports FileType.File with a known size."""
        target = self._path("status_test.txt")
        self.file_io.write_file(target, "content")
        status = self.file_io.get_file_status(target)
        self.assertEqual(pafs.FileType.File, status.type)
        self.assertIsNotNone(status.size)

    def test_delete_returns_false_when_not_exists(self):
        """Deleting a missing path returns False rather than raising."""
        self.assertFalse(self.file_io.delete(self._path("nonexistent.txt")))
        self.assertFalse(
            self.file_io.delete(self._path("nonexistent_dir"), recursive=False))

    def test_delete_non_empty_directory_raises_error(self):
        """Non-recursive delete of a non-empty directory raises OSError."""
        directory = self._path("nonempty-dir/")
        child = self._path("nonempty-dir/file.txt")
        self.file_io.mkdirs(directory)
        with self.file_io.new_output_stream(child) as sink:
            sink.write(b"data")

        with self.assertRaises(OSError) as raised:
            self.file_io.delete(directory, recursive=False)
        self.assertIn("is not empty", str(raised.exception))

    def test_rename_returns_false_when_dst_exists(self):
        """rename() refuses to clobber an existing destination."""
        src = self._path("src.txt")
        dst = self._path("dst.txt")
        for uri, payload in ((src, b"src"), (dst, b"dst")):
            with self.file_io.new_output_stream(uri) as sink:
                sink.write(payload)

        self.assertFalse(self.file_io.rename(src, dst))

    def test_copy_file(self):
        """copy_file() respects overwrite and transfers content."""
        src = self._path("copy_src.txt")
        dst = self._path("copy_dst.txt")
        with self.file_io.new_output_stream(src) as sink:
            sink.write(b"source content")
        with self.file_io.new_output_stream(dst) as sink:
            sink.write(b"target content")

        with self.assertRaises(FileExistsError) as raised:
            self.file_io.copy_file(src, dst, overwrite=False)
        self.assertIn("already exists", str(raised.exception))

        self.file_io.copy_file(src, dst, overwrite=True)
        with self.file_io.new_input_stream(dst) as source:
            self.assertEqual(b"source content", source.read())

    def test_try_to_write_atomic(self):
        """try_to_write_atomic() succeeds and persists the content."""
        target = self._path("atomic_file.txt")
        self.assertTrue(self.file_io.try_to_write_atomic(target, "atomic content"))
        self.assertEqual("atomic content", self.file_io.read_file_utf8(target))

    def test_mkdirs_raises_error_when_path_is_file(self):
        """mkdirs() over an existing file raises FileExistsError."""
        target = self._path("existing_file.txt")
        self.file_io.write_file(target, "data")

        with self.assertRaises(FileExistsError) as raised:
            self.file_io.mkdirs(target)
        self.assertIn("is not a directory", str(raised.exception))


if __name__ == '__main__':
unittest.main()
Loading