From 04825ecc86d3c896d15542090a17f1c7608e6352 Mon Sep 17 00:00:00 2001 From: Chandra Sirimala Date: Mon, 20 Apr 2026 17:12:05 +0000 Subject: [PATCH 1/7] fix: propagate quota_project_id and api_endpoint in AsyncGrpcClient --- .../storage/asyncio/async_grpc_client.py | 10 ++++++ .../unit/asyncio/test_async_grpc_client.py | 34 +++++++++++++++++++ 2 files changed, 44 insertions(+) diff --git a/packages/google-cloud-storage/google/cloud/storage/asyncio/async_grpc_client.py b/packages/google-cloud-storage/google/cloud/storage/asyncio/async_grpc_client.py index 5a5ad5acc918..d48fa07c00d9 100644 --- a/packages/google-cloud-storage/google/cloud/storage/asyncio/async_grpc_client.py +++ b/packages/google-cloud-storage/google/cloud/storage/asyncio/async_grpc_client.py @@ -23,6 +23,8 @@ ) from google.cloud.storage import __version__ +_DEFAULT_HOST = "storage.googleapis.com" + class AsyncGrpcClient: """An asynchronous client for interacting with Google Cloud Storage using the gRPC API. @@ -109,7 +111,15 @@ def _create_async_grpc_client( primary_user_agent = client_info.to_user_agent() + host = _DEFAULT_HOST + quota_project_id = None + if client_options: + host = getattr(client_options, "api_endpoint", None) or _DEFAULT_HOST + quota_project_id = getattr(client_options, "quota_project_id", None) + channel = transport_cls.create_channel( + host=host, + quota_project_id=quota_project_id, attempt_direct_path=attempt_direct_path, credentials=credentials, options=(("grpc.primary_user_agent", primary_user_agent),), diff --git a/packages/google-cloud-storage/tests/unit/asyncio/test_async_grpc_client.py b/packages/google-cloud-storage/tests/unit/asyncio/test_async_grpc_client.py index 1c9400ae32d9..f0d8b5bfb260 100644 --- a/packages/google-cloud-storage/tests/unit/asyncio/test_async_grpc_client.py +++ b/packages/google-cloud-storage/tests/unit/asyncio/test_async_grpc_client.py @@ -53,6 +53,7 @@ def test_constructor_default_options(self, mock_async_storage_client): expected_options = (("grpc.primary_user_agent", primary_user_agent),) mock_transport_cls.create_channel.assert_called_once_with( + host="storage.googleapis.com", attempt_direct_path=True, credentials=mock_creds, options=expected_options, @@ -82,6 +83,37 @@ def test_constructor_with_client_info(self, mock_async_storage_client): expected_options = (("grpc.primary_user_agent", primary_user_agent),) mock_transport_cls.create_channel.assert_called_once_with( + host="storage.googleapis.com", + attempt_direct_path=True, + credentials=mock_creds, + options=expected_options, + ) + + @mock.patch("google.cloud._storage_v2.StorageAsyncClient") + def test_constructor_with_quota_project_and_endpoint(self, mock_async_storage_client): + mock_transport_cls = mock.MagicMock() + mock_async_storage_client.get_transport_class.return_value = mock_transport_cls + mock_creds = _make_credentials() + + from google.api_core import client_options + mock_client_options = client_options.ClientOptions( + api_endpoint="custom-endpoint.com", + quota_project_id="my-quota-project" + ) + + async_grpc_client.AsyncGrpcClient( + credentials=mock_creds, + client_options=mock_client_options + ) + + kwargs = mock_async_storage_client.call_args.kwargs + client_info = kwargs["client_info"] + primary_user_agent = client_info.to_user_agent() + expected_options = (("grpc.primary_user_agent", primary_user_agent),) + + mock_transport_cls.create_channel.assert_called_once_with( + host="custom-endpoint.com", + quota_project_id="my-quota-project", attempt_direct_path=True, credentials=mock_creds, options=expected_options, @@ -105,6 +137,7 @@ def test_constructor_disables_directpath(self, mock_async_storage_client): expected_options = (("grpc.primary_user_agent", primary_user_agent),) mock_transport_cls.create_channel.assert_called_once_with( + host="storage.googleapis.com", attempt_direct_path=False, credentials=mock_creds, options=expected_options, @@ -147,6 +180,7 @@ def test_grpc_client_property(self, mock_grpc_gapic_client): expected_options = (("grpc.primary_user_agent", primary_user_agent),) mock_transport_cls.create_channel.assert_called_once_with( + host="storage.googleapis.com", attempt_direct_path=mock_attempt_direct_path, credentials=mock_creds, options=expected_options, From b8f88281d0e54a75f1a5c2faa6d97e4c9a401ce4 Mon Sep 17 00:00:00 2001 From: Chandra Sirimala Date: Mon, 20 Apr 2026 17:16:02 +0000 Subject: [PATCH 2/7] test: update expected calls in test_async_grpc_client.py after refactor --- .../unit/asyncio/test_async_grpc_client.py | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/packages/google-cloud-storage/tests/unit/asyncio/test_async_grpc_client.py b/packages/google-cloud-storage/tests/unit/asyncio/test_async_grpc_client.py index f0d8b5bfb260..cea0c785788f 100644 --- a/packages/google-cloud-storage/tests/unit/asyncio/test_async_grpc_client.py +++ b/packages/google-cloud-storage/tests/unit/asyncio/test_async_grpc_client.py @@ -54,6 +54,7 @@ def test_constructor_default_options(self, mock_async_storage_client): mock_transport_cls.create_channel.assert_called_once_with( host="storage.googleapis.com", + quota_project_id=None, attempt_direct_path=True, credentials=mock_creds, options=expected_options, @@ -84,26 +85,28 @@ def test_constructor_with_client_info(self, mock_async_storage_client): mock_transport_cls.create_channel.assert_called_once_with( host="storage.googleapis.com", + quota_project_id=None, attempt_direct_path=True, credentials=mock_creds, options=expected_options, ) @mock.patch("google.cloud._storage_v2.StorageAsyncClient") - def test_constructor_with_quota_project_and_endpoint(self, mock_async_storage_client): + def test_constructor_with_quota_project_and_endpoint( + self, mock_async_storage_client + ): mock_transport_cls = mock.MagicMock() mock_async_storage_client.get_transport_class.return_value = mock_transport_cls mock_creds = _make_credentials() - + from google.api_core import client_options + mock_client_options = client_options.ClientOptions( - api_endpoint="custom-endpoint.com", - quota_project_id="my-quota-project" + api_endpoint="custom-endpoint.com", quota_project_id="my-quota-project" ) async_grpc_client.AsyncGrpcClient( - credentials=mock_creds, - client_options=mock_client_options + credentials=mock_creds, client_options=mock_client_options ) kwargs = mock_async_storage_client.call_args.kwargs @@ -138,6 +141,7 @@ def test_constructor_disables_directpath(self, mock_async_storage_client): mock_transport_cls.create_channel.assert_called_once_with( host="storage.googleapis.com", + quota_project_id=None, attempt_direct_path=False, credentials=mock_creds, options=expected_options, @@ -181,6 +185,7 @@ def test_grpc_client_property(self, mock_grpc_gapic_client): mock_transport_cls.create_channel.assert_called_once_with( host="storage.googleapis.com", + quota_project_id=None, attempt_direct_path=mock_attempt_direct_path, credentials=mock_creds, options=expected_options, From 2a68c5384eef546f887e8cf1ee63e61b0c018cdf Mon Sep 17 00:00:00 2001 From: Chandra Sirimala Date: Fri, 24 Apr 2026 10:26:56 +0000 Subject: [PATCH 3/7] feat: Add reads_regional microbenchmark comparing JSON, gRPC DP, and gRPC CP --- .../time_based/reads_regional/config.py | 101 +++++++ .../time_based/reads_regional/config.yaml | 27 ++ .../time_based/reads_regional/parameters.py | 25 ++ .../time_based/reads_regional/test_reads.py | 252 ++++++++++++++++++ 4 files changed, 405 insertions(+) create mode 100644 packages/google-cloud-storage/tests/perf/microbenchmarks/time_based/reads_regional/config.py create mode 100644 packages/google-cloud-storage/tests/perf/microbenchmarks/time_based/reads_regional/config.yaml create mode 100644 packages/google-cloud-storage/tests/perf/microbenchmarks/time_based/reads_regional/parameters.py create mode 100644 packages/google-cloud-storage/tests/perf/microbenchmarks/time_based/reads_regional/test_reads.py diff --git a/packages/google-cloud-storage/tests/perf/microbenchmarks/time_based/reads_regional/config.py b/packages/google-cloud-storage/tests/perf/microbenchmarks/time_based/reads_regional/config.py new file mode 100644 index 000000000000..88b2c1924784 --- /dev/null +++ b/packages/google-cloud-storage/tests/perf/microbenchmarks/time_based/reads_regional/config.py @@ -0,0 +1,101 @@ +# Copyright 2026 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import itertools +import os +from typing import Dict, List + +import yaml + +try: + from tests.perf.microbenchmarks.time_based.reads_regional.parameters import ( + TimeBasedReadParameters, + ) +except ModuleNotFoundError: + from reads_regional.parameters import TimeBasedReadParameters + + +def _get_params() -> Dict[str, List[TimeBasedReadParameters]]: + """Generates a dictionary of benchmark parameters for time based read operations.""" + params: Dict[str, List[TimeBasedReadParameters]] = {} + config_path = os.path.join(os.path.dirname(__file__), "config.yaml") + with open(config_path, "r") as f: + config = yaml.safe_load(f) + + common_params = config["common"] + read_types = common_params["read_types"] + file_sizes_mib = common_params["file_sizes_mib"] + chunk_sizes_kib = common_params["chunk_sizes_kib"] + num_ranges = common_params["num_ranges"] + rounds = common_params["rounds"] + duration = common_params["duration"] + warmup_duration = common_params["warmup_duration"] + + # All read types use the same regional bucket + bucket_name = os.environ.get( + "DEFAULT_STANDARD_BUCKET", config["defaults"]["DEFAULT_STANDARD_BUCKET"] + ) + + for workload in config["workload"]: + workload_name = workload["name"] + params[workload_name] = [] + pattern = workload["pattern"] + processes = workload["processes"] + coros = workload["coros"] + + # Create a product of all parameter combinations + product = itertools.product( + read_types, + file_sizes_mib, + chunk_sizes_kib, + num_ranges, + processes, + coros, + ) + + for ( + read_type, + file_size_mib, + chunk_size_kib, + num_ranges_val, + num_processes, + num_coros, + ) in product: + file_size_bytes = file_size_mib * 1024 * 1024 + chunk_size_bytes = chunk_size_kib * 1024 + + num_files = num_processes + + # Create a descriptive name for the parameter set + name = f"{pattern}_{read_type}_{num_processes}p_{num_coros}c_{file_size_mib}MiB_{chunk_size_kib}KiB_{num_ranges_val}ranges" + + params[workload_name].append( + TimeBasedReadParameters( + name=name, + workload_name=workload_name, + pattern=pattern, + bucket_name=bucket_name, + bucket_type="regional", + read_type=read_type, + num_coros=num_coros, + num_processes=num_processes, + num_files=num_files, + rounds=rounds, + chunk_size_bytes=chunk_size_bytes, + file_size_bytes=file_size_bytes, + duration=duration, + warmup_duration=warmup_duration, + num_ranges=num_ranges_val, + ) + ) + return params diff --git a/packages/google-cloud-storage/tests/perf/microbenchmarks/time_based/reads_regional/config.yaml b/packages/google-cloud-storage/tests/perf/microbenchmarks/time_based/reads_regional/config.yaml new file mode 100644 index 000000000000..0f3a70cbf8a2 --- /dev/null +++ b/packages/google-cloud-storage/tests/perf/microbenchmarks/time_based/reads_regional/config.yaml @@ -0,0 +1,27 @@ +common: + read_types: + - "async_json" + - "async_grpc_dp" + - "async_grpc_cp" + file_sizes_mib: + - 10240 # 10GiB + chunk_sizes_kib: [64] + num_ranges: [1] + rounds: 1 + duration: 30 # seconds + warmup_duration: 5 # seconds + +workload: + ############# multi process multi coroutine ######### + - name: "read_seq_multi_process" + pattern: "seq" + coros: [1] + processes: [96] + + - name: "read_rand_multi_process" + pattern: "rand" + coros: [1, 16] + processes: [1] + +defaults: + DEFAULT_STANDARD_BUCKET: "chandrasiri-benchmarks-rb" diff --git a/packages/google-cloud-storage/tests/perf/microbenchmarks/time_based/reads_regional/parameters.py b/packages/google-cloud-storage/tests/perf/microbenchmarks/time_based/reads_regional/parameters.py new file mode 100644 index 000000000000..f1b1cb3fb8ca --- /dev/null +++ b/packages/google-cloud-storage/tests/perf/microbenchmarks/time_based/reads_regional/parameters.py @@ -0,0 +1,25 @@ +# Copyright 2026 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from dataclasses import dataclass + +from tests.perf.microbenchmarks.parameters import IOBenchmarkParameters + + +@dataclass +class TimeBasedReadParameters(IOBenchmarkParameters): + pattern: str + duration: int + warmup_duration: int + num_ranges: int + read_type: str diff --git a/packages/google-cloud-storage/tests/perf/microbenchmarks/time_based/reads_regional/test_reads.py b/packages/google-cloud-storage/tests/perf/microbenchmarks/time_based/reads_regional/test_reads.py new file mode 100644 index 000000000000..8d37edf5d415 --- /dev/null +++ b/packages/google-cloud-storage/tests/perf/microbenchmarks/time_based/reads_regional/test_reads.py @@ -0,0 +1,252 @@ +# Copyright 2026 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Microbenchmarks for time-based Google Cloud Storage read operations on regional buckets.""" + +import asyncio +import logging +import multiprocessing +import os +import random +import time +from io import BytesIO + +import pytest +import aiohttp +import subprocess +import math + +token = subprocess.run( + ["gcloud", "auth", "print-access-token"], + capture_output=True, + text=True, + check=True, +).stdout.strip() + + +import tests.perf.microbenchmarks.time_based.reads_regional.config as config +from google.cloud.storage.asyncio.async_grpc_client import AsyncGrpcClient +from google.cloud.storage.asyncio.async_multi_range_downloader import ( + AsyncMultiRangeDownloader, +) +from tests.perf.microbenchmarks._utils import ( + get_irq_affinity, + publish_benchmark_extra_info, +) +from tests.perf.microbenchmarks.conftest import ( + publish_resource_metrics, +) + +all_params = config._get_params() + + +async def create_client(attempt_direct_path=True): + """Initializes async client and gets the current event loop.""" + return AsyncGrpcClient(attempt_direct_path=attempt_direct_path) + + +# --- Global Variables for Worker Process --- +worker_loop = None +worker_client = None +worker_session = None + + +def _worker_init(read_type): + """Initializes a persistent event loop and client for each worker process.""" + cpu_affinity = get_irq_affinity() + if cpu_affinity: + os.sched_setaffinity( + 0, {i for i in range(0, os.cpu_count()) if i not in cpu_affinity} + ) + + global worker_loop, worker_client, worker_session + worker_loop = asyncio.new_event_loop() + asyncio.set_event_loop(worker_loop) + + if read_type == "async_json": + async def _init_session(): + return aiohttp.ClientSession() + worker_session = worker_loop.run_until_complete(_init_session()) + + import atexit + def _cleanup_session(): + if not worker_session.closed: + worker_loop.run_until_complete(worker_session.close()) + atexit.register(_cleanup_session) + elif read_type == "async_grpc_dp": + worker_client = worker_loop.run_until_complete(create_client(attempt_direct_path=True)) + elif read_type == "async_grpc_cp": + worker_client = worker_loop.run_until_complete(create_client(attempt_direct_path=False)) + + +async def _download_time_based_json_async(session, filename, params): + """Performs time-based downloads using the JSON API via aiohttp.""" + total_bytes_downloaded = 0 + offset = 0 + is_warming_up = True + start_time = time.monotonic() + warmup_end_time = start_time + params.warmup_duration + test_end_time = warmup_end_time + params.duration + + while time.monotonic() < test_end_time: + current_time = time.monotonic() + if is_warming_up and current_time >= warmup_end_time: + is_warming_up = False + total_bytes_downloaded = 0 # Reset counter after warmup + + bytes_in_iteration = 0 + for _ in range(params.num_ranges): + if params.pattern == "rand": + offset = random.randint( + 0, params.file_size_bytes - params.chunk_size_bytes + ) + + url = f"https://storage.googleapis.com/storage/v1/b/{params.bucket_name}/o/{filename}?alt=media" + headers = { + "Authorization": f"Bearer {token}", + "Range": f"bytes={offset}-{offset + params.chunk_size_bytes - 1}", + } + async with session.get(url, headers=headers) as response: + data = await response.read() + bytes_in_iteration += len(data) + + if params.pattern == "seq": + offset += params.chunk_size_bytes + if offset + params.chunk_size_bytes > params.file_size_bytes: + offset = 0 + + assert bytes_in_iteration == params.chunk_size_bytes * params.num_ranges + + if not is_warming_up: + total_bytes_downloaded += bytes_in_iteration + + return total_bytes_downloaded + + +async def _download_time_based_async(client, filename, params): + mrd = AsyncMultiRangeDownloader(client, params.bucket_name, filename) + await mrd.open() + + async def _worker_coro(): + total_bytes_downloaded = 0 + offset = 0 + is_warming_up = True + start_time = time.monotonic() + warmup_end_time = start_time + params.warmup_duration + test_end_time = warmup_end_time + params.duration + + while time.monotonic() < test_end_time: + current_time = time.monotonic() + if is_warming_up and current_time >= warmup_end_time: + is_warming_up = False + total_bytes_downloaded = 0 # Reset counter after warmup + + ranges = [] + if params.pattern == "rand": + for _ in range(params.num_ranges): + offset = random.randint( + 0, params.file_size_bytes - params.chunk_size_bytes + ) + ranges.append((offset, params.chunk_size_bytes, BytesIO())) + else: # seq + for _ in range(params.num_ranges): + ranges.append((offset, params.chunk_size_bytes, BytesIO())) + offset += params.chunk_size_bytes + if offset + params.chunk_size_bytes > params.file_size_bytes: + offset = 0 # Reset offset if end of file is reached + + await mrd.download_ranges(ranges) + + bytes_in_buffers = sum(r[2].getbuffer().nbytes for r in ranges) + assert bytes_in_buffers == params.chunk_size_bytes * params.num_ranges + + if not is_warming_up: + total_bytes_downloaded += params.chunk_size_bytes * params.num_ranges + return total_bytes_downloaded + + tasks = [asyncio.create_task(_worker_coro()) for _ in range(params.num_coros)] + results = await asyncio.gather(*tasks) + + await mrd.close() + return sum(results) + + +def _download_files_worker(process_idx, filename, params, read_type): + if read_type == "async_json": + return worker_loop.run_until_complete( + _download_time_based_json_async(worker_session, filename, params) + ) + else: # async_grpc_dp or async_grpc_cp + return worker_loop.run_until_complete( + _download_time_based_async(worker_client, filename, params) + ) + + +def download_files_mp_mc_wrapper(pool, files_names, params, read_type): + args = [(i, files_names[i], params, read_type) for i in range(len(files_names))] + + results = pool.starmap(_download_files_worker, args) + return sum(results) + + +@pytest.mark.parametrize( + "workload_params", + all_params["read_seq_multi_process"] + all_params["read_rand_multi_process"], + indirect=True, + ids=lambda p: p.name, +) +def test_downloads_multi_proc_multi_coro( + benchmark, storage_client, monitor, workload_params +): + params, files_names = workload_params + logging.info(f"num files: {len(files_names)}") + + ctx = multiprocessing.get_context("spawn") + pool = ctx.Pool( + processes=params.num_processes, + initializer=_worker_init, + initargs=(params.read_type,), + ) + + download_bytes_list = [] + + def target_wrapper(*args, **kwargs): + download_bytes_list.append(download_files_mp_mc_wrapper(pool, *args, **kwargs)) + return + + try: + with monitor() as m: + benchmark.pedantic( + target=target_wrapper, + iterations=1, + rounds=params.rounds, + args=(files_names, params, params.read_type), + ) + finally: + pool.close() + pool.join() + total_bytes_downloaded = sum(download_bytes_list) + throughput_mib_s = ( + total_bytes_downloaded / params.duration / params.rounds + ) / (1024 * 1024) + benchmark.extra_info["avg_throughput_mib_s"] = f"{throughput_mib_s:.2f}" + print( + f"Avg Throughput of {params.rounds} round(s): {throughput_mib_s:.2f} MiB/s" + ) + publish_benchmark_extra_info( + benchmark, + params, + download_bytes_list=download_bytes_list, + duration=params.duration, + ) + publish_resource_metrics(benchmark, m) From 40b1d7556be490539fe87634d5cb862c61ff53df Mon Sep 17 00:00:00 2001 From: Chandra Sirimala Date: Sat, 25 Apr 2026 07:19:12 +0000 Subject: [PATCH 4/7] feat: Add 'whole' pattern and update config to bytes in reads_regional benchmark --- .../time_based/reads_regional/config.py | 10 +++---- .../time_based/reads_regional/config.yaml | 9 ++++-- .../time_based/reads_regional/test_reads.py | 28 +++++++++++++++++-- 3 files changed, 37 insertions(+), 10 deletions(-) diff --git a/packages/google-cloud-storage/tests/perf/microbenchmarks/time_based/reads_regional/config.py b/packages/google-cloud-storage/tests/perf/microbenchmarks/time_based/reads_regional/config.py index 88b2c1924784..f00a711c9324 100644 --- a/packages/google-cloud-storage/tests/perf/microbenchmarks/time_based/reads_regional/config.py +++ b/packages/google-cloud-storage/tests/perf/microbenchmarks/time_based/reads_regional/config.py @@ -34,7 +34,7 @@ def _get_params() -> Dict[str, List[TimeBasedReadParameters]]: common_params = config["common"] read_types = common_params["read_types"] - file_sizes_mib = common_params["file_sizes_mib"] + file_sizes = common_params["file_sizes"] chunk_sizes_kib = common_params["chunk_sizes_kib"] num_ranges = common_params["num_ranges"] rounds = common_params["rounds"] @@ -56,7 +56,7 @@ def _get_params() -> Dict[str, List[TimeBasedReadParameters]]: # Create a product of all parameter combinations product = itertools.product( read_types, - file_sizes_mib, + file_sizes, chunk_sizes_kib, num_ranges, processes, @@ -65,19 +65,19 @@ def _get_params() -> Dict[str, List[TimeBasedReadParameters]]: for ( read_type, - file_size_mib, + file_size, chunk_size_kib, num_ranges_val, num_processes, num_coros, ) in product: - file_size_bytes = file_size_mib * 1024 * 1024 + file_size_bytes = file_size chunk_size_bytes = chunk_size_kib * 1024 num_files = num_processes # Create a descriptive name for the parameter set - name = f"{pattern}_{read_type}_{num_processes}p_{num_coros}c_{file_size_mib}MiB_{chunk_size_kib}KiB_{num_ranges_val}ranges" + name = f"{pattern}_{read_type}_{num_processes}p_{num_coros}c_{file_size / (1024 * 1024)}MiB_{chunk_size_kib}KiB_{num_ranges_val}ranges" params[workload_name].append( TimeBasedReadParameters( diff --git a/packages/google-cloud-storage/tests/perf/microbenchmarks/time_based/reads_regional/config.yaml b/packages/google-cloud-storage/tests/perf/microbenchmarks/time_based/reads_regional/config.yaml index 0f3a70cbf8a2..6555e4672a84 100644 --- a/packages/google-cloud-storage/tests/perf/microbenchmarks/time_based/reads_regional/config.yaml +++ b/packages/google-cloud-storage/tests/perf/microbenchmarks/time_based/reads_regional/config.yaml @@ -3,8 +3,8 @@ common: - "async_json" - "async_grpc_dp" - "async_grpc_cp" - file_sizes_mib: - - 10240 # 10GiB + file_sizes: + - 10737418240 # 10GiB in bytes chunk_sizes_kib: [64] num_ranges: [1] rounds: 1 @@ -23,5 +23,10 @@ workload: coros: [1, 16] processes: [1] + - name: "read_whole_multi_process" + pattern: "whole" + coros: [1] + processes: [1] + defaults: DEFAULT_STANDARD_BUCKET: "chandrasiri-benchmarks-rb" diff --git a/packages/google-cloud-storage/tests/perf/microbenchmarks/time_based/reads_regional/test_reads.py b/packages/google-cloud-storage/tests/perf/microbenchmarks/time_based/reads_regional/test_reads.py index 8d37edf5d415..6d2e20f419c2 100644 --- a/packages/google-cloud-storage/tests/perf/microbenchmarks/time_based/reads_regional/test_reads.py +++ b/packages/google-cloud-storage/tests/perf/microbenchmarks/time_based/reads_regional/test_reads.py @@ -91,6 +91,15 @@ def _cleanup_session(): async def _download_time_based_json_async(session, filename, params): """Performs time-based downloads using the JSON API via aiohttp.""" + if params.pattern == "whole": + url = f"https://storage.googleapis.com/storage/v1/b/{params.bucket_name}/o/{filename}?alt=media" + headers = { + "Authorization": f"Bearer {token}", + } + async with session.get(url, headers=headers) as response: + data = await response.read() + return len(data) + total_bytes_downloaded = 0 offset = 0 is_warming_up = True @@ -137,6 +146,12 @@ async def _download_time_based_async(client, filename, params): mrd = AsyncMultiRangeDownloader(client, params.bucket_name, filename) await mrd.open() + if params.pattern == "whole": + ranges = [(0, params.file_size_bytes, BytesIO())] + await mrd.download_ranges(ranges) + await mrd.close() + return ranges[0][2].getbuffer().nbytes + async def _worker_coro(): total_bytes_downloaded = 0 offset = 0 @@ -224,21 +239,28 @@ def target_wrapper(*args, **kwargs): download_bytes_list.append(download_files_mp_mc_wrapper(pool, *args, **kwargs)) return + duration_pedantic = 0 try: with monitor() as m: + start_pedantic = time.monotonic() benchmark.pedantic( target=target_wrapper, iterations=1, rounds=params.rounds, args=(files_names, params, params.read_type), ) + end_pedantic = time.monotonic() + duration_pedantic = end_pedantic - start_pedantic finally: pool.close() pool.join() total_bytes_downloaded = sum(download_bytes_list) - throughput_mib_s = ( - total_bytes_downloaded / params.duration / params.rounds - ) / (1024 * 1024) + if params.pattern == "whole": + throughput_mib_s = (total_bytes_downloaded / duration_pedantic) / (1024 * 1024) + else: + throughput_mib_s = ( + total_bytes_downloaded / params.duration / params.rounds + ) / (1024 * 1024) benchmark.extra_info["avg_throughput_mib_s"] = f"{throughput_mib_s:.2f}" print( f"Avg Throughput of {params.rounds} round(s): {throughput_mib_s:.2f} MiB/s" From c6c881edab1c6507d9f2348c96c2ce73903d754c Mon Sep 17 00:00:00 2001 From: Chandra Sirimala Date: Sun, 26 Apr 2026 12:33:35 +0000 Subject: [PATCH 5/7] feat: Implement num_downloads_after_open and ignore_first_download in reads_regional --- .../time_based/reads_regional/config.py | 4 ++++ .../time_based/reads_regional/config.yaml | 2 ++ .../time_based/reads_regional/parameters.py | 2 ++ .../time_based/reads_regional/test_reads.py | 20 +++++++++++++------ 4 files changed, 22 insertions(+), 6 deletions(-) diff --git a/packages/google-cloud-storage/tests/perf/microbenchmarks/time_based/reads_regional/config.py b/packages/google-cloud-storage/tests/perf/microbenchmarks/time_based/reads_regional/config.py index f00a711c9324..714b30dff7d9 100644 --- a/packages/google-cloud-storage/tests/perf/microbenchmarks/time_based/reads_regional/config.py +++ b/packages/google-cloud-storage/tests/perf/microbenchmarks/time_based/reads_regional/config.py @@ -40,6 +40,8 @@ def _get_params() -> Dict[str, List[TimeBasedReadParameters]]: rounds = common_params["rounds"] duration = common_params["duration"] warmup_duration = common_params["warmup_duration"] + num_downloads_after_open = common_params["num_downloads_after_open"] + ignore_first_download = common_params["ignore_first_download"] # All read types use the same regional bucket bucket_name = os.environ.get( @@ -96,6 +98,8 @@ def _get_params() -> Dict[str, List[TimeBasedReadParameters]]: duration=duration, warmup_duration=warmup_duration, num_ranges=num_ranges_val, + num_downloads_after_open=num_downloads_after_open, + ignore_first_download=ignore_first_download, ) ) return params diff --git a/packages/google-cloud-storage/tests/perf/microbenchmarks/time_based/reads_regional/config.yaml b/packages/google-cloud-storage/tests/perf/microbenchmarks/time_based/reads_regional/config.yaml index 6555e4672a84..3dde687558e0 100644 --- a/packages/google-cloud-storage/tests/perf/microbenchmarks/time_based/reads_regional/config.yaml +++ b/packages/google-cloud-storage/tests/perf/microbenchmarks/time_based/reads_regional/config.yaml @@ -8,6 +8,8 @@ common: chunk_sizes_kib: [64] num_ranges: [1] rounds: 1 + num_downloads_after_open: 3 + ignore_first_download: true duration: 30 # seconds warmup_duration: 5 # seconds diff --git a/packages/google-cloud-storage/tests/perf/microbenchmarks/time_based/reads_regional/parameters.py b/packages/google-cloud-storage/tests/perf/microbenchmarks/time_based/reads_regional/parameters.py index f1b1cb3fb8ca..8209d746ab14 100644 --- a/packages/google-cloud-storage/tests/perf/microbenchmarks/time_based/reads_regional/parameters.py +++ b/packages/google-cloud-storage/tests/perf/microbenchmarks/time_based/reads_regional/parameters.py @@ -23,3 +23,5 @@ class TimeBasedReadParameters(IOBenchmarkParameters): warmup_duration: int num_ranges: int read_type: str + num_downloads_after_open: int + ignore_first_download: bool diff --git a/packages/google-cloud-storage/tests/perf/microbenchmarks/time_based/reads_regional/test_reads.py b/packages/google-cloud-storage/tests/perf/microbenchmarks/time_based/reads_regional/test_reads.py index 6d2e20f419c2..9e226e2202e7 100644 --- a/packages/google-cloud-storage/tests/perf/microbenchmarks/time_based/reads_regional/test_reads.py +++ b/packages/google-cloud-storage/tests/perf/microbenchmarks/time_based/reads_regional/test_reads.py @@ -96,9 +96,13 @@ async def _download_time_based_json_async(session, filename, params): headers = { "Authorization": f"Bearer {token}", } - async with session.get(url, headers=headers) as response: - data = await response.read() - return len(data) + total_bytes = 0 + for i in range(params.num_downloads_after_open): + async with session.get(url, headers=headers) as response: + data = await response.read() + if not (params.ignore_first_download and i == 0): + total_bytes += len(data) + return total_bytes total_bytes_downloaded = 0 offset = 0 @@ -147,10 +151,14 @@ async def _download_time_based_async(client, filename, params): await mrd.open() if params.pattern == "whole": - ranges = [(0, params.file_size_bytes, BytesIO())] - await mrd.download_ranges(ranges) + total_bytes = 0 + for i in range(params.num_downloads_after_open): + ranges = [(0, params.file_size_bytes, BytesIO())] + await mrd.download_ranges(ranges) + if not (params.ignore_first_download and i == 0): + total_bytes += ranges[0][2].getbuffer().nbytes await mrd.close() - return ranges[0][2].getbuffer().nbytes + return total_bytes async def _worker_coro(): total_bytes_downloaded = 0 From cc25b4654ff857c067e563b7835aa4763c70b35b Mon Sep 17 00:00:00 2001 From: Chandra Sirimala Date: Sun, 26 Apr 2026 12:35:15 +0000 Subject: [PATCH 6/7] feat: Add explicit Range header for 'whole' pattern in JSON read --- .../perf/microbenchmarks/time_based/reads_regional/test_reads.py | 1 + 1 file changed, 1 insertion(+) diff --git a/packages/google-cloud-storage/tests/perf/microbenchmarks/time_based/reads_regional/test_reads.py b/packages/google-cloud-storage/tests/perf/microbenchmarks/time_based/reads_regional/test_reads.py index 9e226e2202e7..59a422d19e36 100644 --- a/packages/google-cloud-storage/tests/perf/microbenchmarks/time_based/reads_regional/test_reads.py +++ b/packages/google-cloud-storage/tests/perf/microbenchmarks/time_based/reads_regional/test_reads.py @@ -95,6 +95,7 @@ async def _download_time_based_json_async(session, filename, params): url = f"https://storage.googleapis.com/storage/v1/b/{params.bucket_name}/o/{filename}?alt=media" headers = { "Authorization": f"Bearer {token}", + "Range": f"bytes=0-{params.file_size_bytes - 1}", } total_bytes = 0 for i in range(params.num_downloads_after_open): From 62a3c9155239432efb88d5ba0ff663f7167315c4 Mon Sep 17 00:00:00 2001 From: Chandra Sirimala Date: Mon, 27 Apr 2026 05:20:50 +0000 Subject: [PATCH 7/7] feat: Fix syntax error in test_reads.py and update _utils.py for reads_regional benchmark --- .../tests/perf/microbenchmarks/_utils.py | 18 +++++++++++++++++- .../time_based/reads_regional/test_reads.py | 14 ++++++++++++-- 2 files changed, 29 insertions(+), 3 deletions(-) diff --git a/packages/google-cloud-storage/tests/perf/microbenchmarks/_utils.py b/packages/google-cloud-storage/tests/perf/microbenchmarks/_utils.py index 357432dc0c20..3dff5e588af7 100644 --- a/packages/google-cloud-storage/tests/perf/microbenchmarks/_utils.py +++ b/packages/google-cloud-storage/tests/perf/microbenchmarks/_utils.py @@ -46,13 +46,29 @@ def publish_benchmark_extra_info( benchmark.extra_info["bucket_name"] = params.bucket_name benchmark.extra_info["bucket_type"] = params.bucket_type benchmark.extra_info["processes"] = params.num_processes + benchmark.extra_info["num_downloads_after_open"] = params.num_downloads_after_open + benchmark.extra_info["ignore_first_download"] = params.ignore_first_download benchmark.group = benchmark_group if download_bytes_list is not None: assert duration is not None, ( "Duration must be provided if total_bytes_transferred is provided." ) - throughputs_list = [x / duration / (1024 * 1024) for x in download_bytes_list] + effective_downloads = params.num_downloads_after_open + if params.ignore_first_download: + effective_downloads -= 1 + effective_downloads = max(1, effective_downloads) + + if params.pattern == "whole": + # duration is total time for all rounds + duration_per_round = duration / len(download_bytes_list) + throughputs_list = [ + (x / duration_per_round / effective_downloads) / (1024 * 1024) + for x in download_bytes_list + ] + else: + # duration is time per round + throughputs_list = [x / duration / (1024 * 1024) for x in download_bytes_list] min_throughput = min(throughputs_list) max_throughput = max(throughputs_list) mean_throughput = statistics.mean(throughputs_list) diff --git a/packages/google-cloud-storage/tests/perf/microbenchmarks/time_based/reads_regional/test_reads.py b/packages/google-cloud-storage/tests/perf/microbenchmarks/time_based/reads_regional/test_reads.py index 59a422d19e36..f9727d689fca 100644 --- a/packages/google-cloud-storage/tests/perf/microbenchmarks/time_based/reads_regional/test_reads.py +++ b/packages/google-cloud-storage/tests/perf/microbenchmarks/time_based/reads_regional/test_reads.py @@ -265,7 +265,17 @@ def target_wrapper(*args, **kwargs): pool.join() total_bytes_downloaded = sum(download_bytes_list) if params.pattern == "whole": - throughput_mib_s = (total_bytes_downloaded / duration_pedantic) / (1024 * 1024) + effective_downloads = ( + params.num_downloads_after_open - 1 + if params.ignore_first_download + else params.num_downloads_after_open + ) + effective_downloads = max(1, effective_downloads) + throughput_mib_s = ( + total_bytes_downloaded + / duration_pedantic + / effective_downloads + ) / (1024 * 1024) else: throughput_mib_s = ( total_bytes_downloaded / params.duration / params.rounds @@ -278,6 +288,6 @@ def target_wrapper(*args, **kwargs): benchmark, params, download_bytes_list=download_bytes_list, - duration=params.duration, + duration=duration_pedantic if params.pattern == "whole" else params.duration, ) publish_resource_metrics(benchmark, m)