diff --git a/docs/config.md b/docs/config.md index ccd13ad83..358a65cad 100644 --- a/docs/config.md +++ b/docs/config.md @@ -49,6 +49,23 @@ logger.info("This will be shown") # (2)! | `cacheShards` | `int` | `128` | Number of cache shards for parallel access (immutable) | | `batchSize` | `int` | `1000` | Number of URLs per batch (100–5000) | | `batchPollInterval` | `float` | `0.5` | Batch polling interval in seconds | +| `compression` | `CompressionConfig \| None` | `None` | Per-upload image-compression settings; see [Compression override](#compression-override) below. | + +#### Compression override + +```python +from rapidata import rapidata_config, CompressionConfig + +# Force the asset service to compress images at quality 70 with a max dimension of 1024px, +# regardless of the server-side default (which is currently off in production). +rapidata_config.upload.compression = CompressionConfig( + enabled=True, + quality=70, + max_dimension=1024, +) +``` + +Any field left as `None` falls back to the server-side default. Currently applies to single-asset uploads (`/asset/file` and `/asset/url`); batched URL uploads will pick the override up in a follow-up after the OpenAPI client regenerates. ## Environment Variables diff --git a/src/rapidata/__init__.py b/src/rapidata/__init__.py index 636de10df..fdeab21da 100644 --- a/src/rapidata/__init__.py +++ b/src/rapidata/__init__.py @@ -62,6 +62,7 @@ rapidata_config, logger, managed_print, + CompressionConfig, ) from . 
import types diff --git a/src/rapidata/rapidata_client/__init__.py b/src/rapidata/rapidata_client/__init__.py index 424880cf9..04cc33997 100644 --- a/src/rapidata/rapidata_client/__init__.py +++ b/src/rapidata/rapidata_client/__init__.py @@ -64,3 +64,4 @@ ) from .exceptions import FailedUploadException, FailedUpload from .config import rapidata_config, logger, managed_print +from .config.upload_config import CompressionConfig diff --git a/src/rapidata/rapidata_client/config/upload_config.py b/src/rapidata/rapidata_client/config/upload_config.py index 872b1664b..c1991243e 100644 --- a/src/rapidata/rapidata_client/config/upload_config.py +++ b/src/rapidata/rapidata_client/config/upload_config.py @@ -10,6 +10,74 @@ from rapidata.rapidata_client.config._env_utils import apply_env_overrides +class CompressionConfig(BaseModel): + """ + Per-upload override for the asset service's image-compression behaviour. + + Any field left as ``None`` falls back to the value the asset service has + configured globally. Set ``enabled`` to ``True`` (or ``False``) to force the + behaviour for this client regardless of the server default. Quality is + expected in the 1..100 range and ``max_dimension`` must be at least 1; both + are validated on assignment by this model's validators (and again server-side). + + Currently applies to single-asset uploads (file paths and individual URLs + via the ``/asset/file`` and ``/asset/url`` endpoints). Batched URL uploads + via the orchestrator's ``/asset/batch-upload`` path will gain the same + override in a follow-up once the SDK's OpenAPI client is regenerated to + include the ``compression`` field on the batch input model. + + Attributes: + enabled (bool | None): Force compression on or off. ``None`` to defer to the server default. + quality (int | None): WebP quality (1..100) to use when compression runs. + max_dimension (int | None): Maximum width or height in pixels when compression runs. 
+ """ + + model_config = ConfigDict(validate_assignment=True) + + enabled: bool | None = None + quality: int | None = None + max_dimension: int | None = None + + @field_validator("quality") + @classmethod + def _validate_quality(cls, v: int | None) -> int | None: + if v is not None and not 1 <= v <= 100: + raise ValueError("quality must be between 1 and 100") + return v + + @field_validator("max_dimension") + @classmethod + def _validate_max_dimension(cls, v: int | None) -> int | None: + if v is not None and v < 1: + raise ValueError("max_dimension must be at least 1") + return v + + def is_set(self) -> bool: + """ + Whether any field has been overridden from its default of ``None``. + ``enabled=False`` counts as set — it is the explicit "force compression + off" request, distinct from "defer to server default". + """ + return any( + v is not None for v in (self.enabled, self.quality, self.max_dimension) + ) + + def cache_suffix(self) -> str: + """ + Stable string used as part of the asset upload cache key so that + the same source asset uploaded under different compression settings + does not collide on a single cache entry. + + The separator characters ``|``, ``/`` and ``=`` are reserved — none of + the existing field types (``bool``, ``int``) can produce them, so the + suffix round-trips unambiguously. Revisit this if a free-form string + field is ever added. + """ + if not self.is_set(): + return "" + return f"|c={self.enabled}/{self.quality}/{self.max_dimension}" + + class UploadConfig(BaseModel): """ Holds the configuration for the upload process. @@ -29,6 +97,8 @@ class UploadConfig(BaseModel): enableBatchUpload (bool): Enable batch URL uploading (two-step process). Defaults to True. batchSize (int): Number of URLs per batch (100-5000). Defaults to 1000. batchPollInterval (float): Polling interval in seconds. Defaults to 0.5. + compression (CompressionConfig | None): Per-upload override for the asset service's + image-compression behaviour. 
Defaults to None (use server-side defaults). """ model_config = ConfigDict(validate_assignment=True) @@ -63,6 +133,10 @@ def _apply_env_vars(cls, data: Any) -> Any: default=0.5, description="Polling interval in seconds", ) + compression: CompressionConfig | None = Field( + default=None, + description="Per-upload override for image compression. None uses server defaults.", + ) @field_validator("maxWorkers") @classmethod diff --git a/src/rapidata/rapidata_client/datapoints/_asset_uploader.py b/src/rapidata/rapidata_client/datapoints/_asset_uploader.py index 415a8c959..d93ba937b 100644 --- a/src/rapidata/rapidata_client/datapoints/_asset_uploader.py +++ b/src/rapidata/rapidata_client/datapoints/_asset_uploader.py @@ -3,10 +3,12 @@ import re import os import threading +from typing import Any from rapidata.api_client.models.i_asset_input import IAssetInput from rapidata.service.openapi_service import OpenAPIService from rapidata.rapidata_client.config import logger, rapidata_config, tracer +from rapidata.rapidata_client.config.upload_config import CompressionConfig from rapidata.rapidata_client.datapoints._asset_mapper import AssetMapper from rapidata.rapidata_client.datapoints._single_flight_cache import SingleFlightCache from diskcache import FanoutCache @@ -64,19 +66,70 @@ def _get_file_cache(cls) -> SingleFlightCache: def __init__(self, openapi_service: OpenAPIService) -> None: self.openapi_service = openapi_service + # NOTE: the public ``get_*_cache_key`` helpers read + # ``rapidata_config.upload.compression`` at call time rather than taking a + # snapshot, so the upload-path call sites (which DO snapshot inside + # ``_upload_*_asset``) and external probes can drift if another thread + # mutates the config between the two reads. 
That asymmetry is intentional: + # external probes only ever cause cache misses (not stale uploads), and + # the upload path is the only one that must keep kwargs and cache key in + # lockstep — pass a snapshot to ``_build_*_cache_key`` directly when that + # consistency matters. def get_file_cache_key(self, asset: str) -> str: - """Generate cache key for a file, including environment.""" + """Generate cache key for a file, including environment and current compression settings.""" + return self._build_file_cache_key(asset, rapidata_config.upload.compression) + + def get_url_cache_key(self, url: str) -> str: + """Generate cache key for a URL, including environment and current compression settings.""" + return self._build_url_cache_key(url, rapidata_config.upload.compression) + + def _build_file_cache_key( + self, asset: str, compression: CompressionConfig | None + ) -> str: env = self.openapi_service.environment try: stat = os.stat(asset) except FileNotFoundError: raise FileNotFoundError(f"File not found: {asset}") from None - return f"{env}@{asset}:{stat.st_size}:{stat.st_mtime_ns}" + return ( + f"{env}@{asset}:{stat.st_size}:{stat.st_mtime_ns}" + f"{self._compression_cache_suffix(compression)}" + ) - def get_url_cache_key(self, url: str) -> str: - """Generate cache key for a URL, including environment.""" + def _build_url_cache_key( + self, url: str, compression: CompressionConfig | None + ) -> str: env = self.openapi_service.environment - return f"{env}@{url}" + return f"{env}@{url}{self._compression_cache_suffix(compression)}" + + @staticmethod + def _compression_cache_suffix(compression: CompressionConfig | None) -> str: + return compression.cache_suffix() if compression is not None else "" + + @staticmethod + def _compression_kwargs(compression: CompressionConfig | None) -> dict[str, Any]: + """ + Build the kwargs dict to pass to the OpenAPI upload methods. 
Only includes + fields the user explicitly set so the call shape is unchanged when + no compression override is set — keeping us forward-compatible with older + OpenAPI clients that don't yet expose the params. + + Keys match the wire-format parameter names exposed by the asset service + (``compress``, ``quality``, ``maxdim``) — note that the Pythonic + ``max_dimension`` field on ``CompressionConfig`` is mapped to the + lowercase ``maxdim`` query parameter, matching the existing + ``/asset/compress`` endpoint convention. + """ + if compression is None or not compression.is_set(): + return {} + kwargs: dict[str, Any] = {} + if compression.enabled is not None: + kwargs["compress"] = compression.enabled + if compression.quality is not None: + kwargs["quality"] = compression.quality + if compression.max_dimension is not None: + kwargs["maxdim"] = compression.max_dimension + return kwargs def _upload_url_asset(self, url: str) -> str: """ @@ -85,15 +138,24 @@ def _upload_url_asset(self, url: str) -> str: URLs are always cached in-memory (lightweight, no disk I/O overhead). Caching is required for the two-step upload flow and cannot be disabled. """ + # Snapshot the compression config once so the kwargs we send and the + # cache key we look it up under are guaranteed consistent — without + # this, another thread mutating rapidata_config.upload.compression + # between the two reads could land mismatched kwargs/cache entries. 
+ compression = rapidata_config.upload.compression + kwargs = self._compression_kwargs(compression) + cache_key = self._build_url_cache_key(url, compression) def upload_url() -> str: - response = self.openapi_service.asset.asset_api.asset_url_post(url=url) + response = self.openapi_service.asset.asset_api.asset_url_post( + url=url, **kwargs + ) logger.info( "Asset uploaded from URL: %s, file name: %s", url, response.file_name ) return response.file_name - return self._url_cache.get_or_fetch(self.get_url_cache_key(url), upload_url) + return self._url_cache.get_or_fetch(cache_key, upload_url) def _upload_file_asset(self, file_path: str) -> str: """ @@ -102,9 +164,15 @@ def _upload_file_asset(self, file_path: str) -> str: Caching is always enabled as it's required for the two-step upload flow. Use cacheToDisk config to control whether cache is stored to disk or memory. """ + # See _upload_url_asset for why this single-snapshot pattern matters. + compression = rapidata_config.upload.compression + kwargs = self._compression_kwargs(compression) + cache_key = self._build_file_cache_key(file_path, compression) def upload_file() -> str: - response = self.openapi_service.asset.asset_api.asset_file_post(file=file_path) + response = self.openapi_service.asset.asset_api.asset_file_post( + file=file_path, **kwargs + ) logger.info( "Asset uploaded from file: %s, file name: %s", file_path, @@ -112,9 +180,7 @@ def upload_file() -> str: ) return response.file_name - return self._get_file_cache().get_or_fetch( - self.get_file_cache_key(file_path), upload_file - ) + return self._get_file_cache().get_or_fetch(cache_key, upload_file) # Accept http / https / HTTP / HTTPS / mixed case — people type URLs # by hand or copy them from docs with varying capitalisation.