diff --git a/xrspatial/geotiff/_reader.py b/xrspatial/geotiff/_reader.py
index 338e7d06..d991f123 100644
--- a/xrspatial/geotiff/_reader.py
+++ b/xrspatial/geotiff/_reader.py
@@ -19,6 +19,27 @@
from ._geotags import GeoInfo, GeoTransform, extract_geo_info
from ._header import IFD, TIFFHeader, parse_all_ifds, parse_header
+# ---------------------------------------------------------------------------
+# Allocation guard: reject TIFF dimensions that would exhaust memory
+# ---------------------------------------------------------------------------
+
+#: Default maximum total pixel count (width * height * samples).
+#: ~1 billion pixels, which is ~4 GB for float32 single-band.
+#: Override per-call via the ``max_pixels`` keyword argument.
+MAX_PIXELS_DEFAULT = 1_000_000_000
+
+
+def _check_dimensions(width, height, samples, max_pixels):
+ """Raise ValueError if the requested allocation exceeds *max_pixels*."""
+ total = width * height * samples
+ if total > max_pixels:
+ raise ValueError(
+ f"TIFF image dimensions ({width} x {height} x {samples} = "
+ f"{total:,} pixels) exceed the safety limit of "
+ f"{max_pixels:,} pixels. Pass a larger max_pixels value to "
+ f"read_to_array() if this file is legitimate."
+ )
+
# ---------------------------------------------------------------------------
# Data source abstraction
@@ -292,7 +313,8 @@ def _decode_strip_or_tile(data_slice, compression, width, height, samples,
# ---------------------------------------------------------------------------
def _read_strips(data: bytes, ifd: IFD, header: TIFFHeader,
- dtype: np.dtype, window=None) -> np.ndarray:
+ dtype: np.dtype, window=None,
+ max_pixels: int = MAX_PIXELS_DEFAULT) -> np.ndarray:
"""Read a strip-organized TIFF image.
Parameters
@@ -307,6 +329,8 @@ def _read_strips(data: bytes, ifd: IFD, header: TIFFHeader,
Output pixel dtype.
window : tuple or None
(row_start, col_start, row_stop, col_stop) or None for full image.
+ max_pixels : int
+ Maximum allowed pixel count (width * height * samples).
Returns
-------
@@ -344,6 +368,8 @@ def _read_strips(data: bytes, ifd: IFD, header: TIFFHeader,
out_h = r1 - r0
out_w = c1 - c0
+ _check_dimensions(out_w, out_h, samples, max_pixels)
+
if samples > 1:
result = np.empty((out_h, out_w, samples), dtype=dtype)
else:
@@ -408,7 +434,8 @@ def _read_strips(data: bytes, ifd: IFD, header: TIFFHeader,
# ---------------------------------------------------------------------------
def _read_tiles(data: bytes, ifd: IFD, header: TIFFHeader,
- dtype: np.dtype, window=None) -> np.ndarray:
+ dtype: np.dtype, window=None,
+ max_pixels: int = MAX_PIXELS_DEFAULT) -> np.ndarray:
"""Read a tile-organized TIFF image.
Parameters
@@ -423,6 +450,8 @@ def _read_tiles(data: bytes, ifd: IFD, header: TIFFHeader,
Output pixel dtype.
window : tuple or None
(row_start, col_start, row_stop, col_stop) or None for full image.
+ max_pixels : int
+ Maximum allowed pixel count (width * height * samples).
Returns
-------
@@ -462,6 +491,8 @@ def _read_tiles(data: bytes, ifd: IFD, header: TIFFHeader,
out_h = r1 - r0
out_w = c1 - c0
+ _check_dimensions(out_w, out_h, samples, max_pixels)
+
_alloc = np.zeros if window is not None else np.empty
if samples > 1:
result = _alloc((out_h, out_w, samples), dtype=dtype)
@@ -545,7 +576,9 @@ def _decode_one(job):
# ---------------------------------------------------------------------------
def _read_cog_http(url: str, overview_level: int | None = None,
- band: int | None = None) -> tuple[np.ndarray, GeoInfo]:
+ band: int | None = None,
+ max_pixels: int = MAX_PIXELS_DEFAULT,
+ ) -> tuple[np.ndarray, GeoInfo]:
"""Read a COG via HTTP range requests.
Parameters
@@ -556,6 +589,8 @@ def _read_cog_http(url: str, overview_level: int | None = None,
Which overview to read (0 = full res, 1 = first overview, etc.).
band : int
Band index (0-based, for multi-band files).
+ max_pixels : int
+ Maximum allowed pixel count (width * height * samples).
Returns
-------
@@ -613,6 +648,8 @@ def _read_cog_http(url: str, overview_level: int | None = None,
tiles_across = math.ceil(width / tw)
tiles_down = math.ceil(height / th)
+ _check_dimensions(width, height, samples, max_pixels)
+
if samples > 1:
result = np.empty((height, width, samples), dtype=dtype)
else:
@@ -653,7 +690,9 @@ def _read_cog_http(url: str, overview_level: int | None = None,
# ---------------------------------------------------------------------------
def read_to_array(source: str, *, window=None, overview_level: int | None = None,
- band: int | None = None) -> tuple[np.ndarray, GeoInfo]:
+ band: int | None = None,
+ max_pixels: int = MAX_PIXELS_DEFAULT,
+ ) -> tuple[np.ndarray, GeoInfo]:
"""Read a GeoTIFF/COG to a numpy array.
Parameters
@@ -666,13 +705,18 @@ def read_to_array(source: str, *, window=None, overview_level: int | None = None
Overview level (0 = full res).
band : int
Band index for multi-band files.
+ max_pixels : int
+ Maximum allowed total pixel count (width * height * samples).
+ Prevents memory exhaustion from crafted TIFF headers.
+ Default is 1 billion (~4 GB for float32 single-band).
Returns
-------
(np.ndarray, GeoInfo) tuple
"""
if source.startswith(('http://', 'https://')):
- return _read_cog_http(source, overview_level=overview_level, band=band)
+ return _read_cog_http(source, overview_level=overview_level, band=band,
+ max_pixels=max_pixels)
# Local file or cloud storage: read all bytes then parse
if _is_fsspec_uri(source):
@@ -701,9 +745,11 @@ def read_to_array(source: str, *, window=None, overview_level: int | None = None
geo_info = extract_geo_info(ifd, data, header.byte_order)
if ifd.is_tiled:
- arr = _read_tiles(data, ifd, header, dtype, window)
+ arr = _read_tiles(data, ifd, header, dtype, window,
+ max_pixels=max_pixels)
else:
- arr = _read_strips(data, ifd, header, dtype, window)
+ arr = _read_strips(data, ifd, header, dtype, window,
+ max_pixels=max_pixels)
# For multi-band with band selection, extract single band
if arr.ndim == 3 and ifd.samples_per_pixel > 1 and band is not None:
diff --git a/xrspatial/geotiff/_vrt.py b/xrspatial/geotiff/_vrt.py
index ca817f14..616c84b1 100644
--- a/xrspatial/geotiff/_vrt.py
+++ b/xrspatial/geotiff/_vrt.py
@@ -136,6 +136,8 @@ def parse_vrt(xml_str: str, vrt_dir: str = '.') -> VRTDataset:
relative.get('relativeToVRT', '0') == '1')
if is_relative and not os.path.isabs(filename):
filename = os.path.join(vrt_dir, filename)
+ # Canonicalize to prevent path traversal (e.g. ../)
+ filename = os.path.realpath(filename)
src_band = int(_text(src_elem, 'SourceBand') or '1')
diff --git a/xrspatial/geotiff/tests/test_features.py b/xrspatial/geotiff/tests/test_features.py
index 06d76cfd..8aabc689 100644
--- a/xrspatial/geotiff/tests/test_features.py
+++ b/xrspatial/geotiff/tests/test_features.py
@@ -1,6 +1,8 @@
"""Tests for new features: multi-band, integer nodata, packbits, zstd, dask, BigTIFF."""
from __future__ import annotations
+import os
+
import numpy as np
import pytest
import xarray as xr
@@ -823,12 +825,10 @@ def test_vrt_parser(self):
assert vrt.bands[0].nodata == 0.0
assert len(vrt.bands[0].sources) == 1
src = vrt.bands[0].sources[0]
- assert src.filename == '/data/tile.tif'
+ assert src.filename == os.path.realpath('/data/tile.tif')
assert src.src_rect.x_off == 10
-import os
-
class TestCloudStorage:
def test_cloud_scheme_detection(self):
diff --git a/xrspatial/geotiff/tests/test_security.py b/xrspatial/geotiff/tests/test_security.py
new file mode 100644
index 00000000..a3dd8c88
--- /dev/null
+++ b/xrspatial/geotiff/tests/test_security.py
@@ -0,0 +1,194 @@
+"""Security tests for the geotiff subpackage.
+
+Tests for:
+- Unbounded allocation guard (issue #1184)
+- VRT path traversal prevention (issue #1185)
+"""
+from __future__ import annotations
+
+import os
+import struct
+import tempfile
+
+import numpy as np
+import pytest
+
+from xrspatial.geotiff._reader import (
+ MAX_PIXELS_DEFAULT,
+ _check_dimensions,
+ _read_strips,
+ _read_tiles,
+ read_to_array,
+)
+from xrspatial.geotiff._header import parse_header, parse_all_ifds
+from xrspatial.geotiff._dtypes import tiff_dtype_to_numpy
+from .conftest import make_minimal_tiff
+
+
+# ---------------------------------------------------------------------------
+# Cat 1: Unbounded allocation guard
+# ---------------------------------------------------------------------------
+
+class TestDimensionGuard:
+ def test_check_dimensions_rejects_oversized(self):
+ """_check_dimensions raises when total pixels exceed the limit."""
+ with pytest.raises(ValueError, match="exceed the safety limit"):
+ _check_dimensions(100_000, 100_000, 1, MAX_PIXELS_DEFAULT)
+
+ def test_check_dimensions_accepts_normal(self):
+ """_check_dimensions does not raise for normal sizes."""
+ _check_dimensions(1000, 1000, 1, MAX_PIXELS_DEFAULT)
+
+ def test_check_dimensions_considers_samples(self):
+ """Multi-band images multiply the pixel budget."""
+ # 50_000 x 50_000 x 3 = 7.5 billion, should be rejected
+ with pytest.raises(ValueError, match="exceed the safety limit"):
+ _check_dimensions(50_000, 50_000, 3, MAX_PIXELS_DEFAULT)
+
+ def test_custom_limit(self):
+ """A custom max_pixels lets callers tighten or relax the limit."""
+ # Tight limit: 100 pixels
+ with pytest.raises(ValueError, match="exceed the safety limit"):
+ _check_dimensions(20, 20, 1, max_pixels=100)
+
+ # Relaxed: passes with large limit
+ _check_dimensions(100_000, 100_000, 1, max_pixels=100_000_000_000)
+
+ def test_read_strips_rejects_huge_header(self):
+ """_read_strips refuses to allocate when header claims huge dims."""
+ # Build a valid TIFF with small pixel data but huge header dimensions.
+ # We fake the header to claim 100000x100000 but only provide 4x4 data.
+ data = make_minimal_tiff(4, 4, np.dtype('float32'))
+ header = parse_header(data)
+ ifds = parse_all_ifds(data, header)
+ ifd = ifds[0]
+
+ # Monkey-patch the IFD width/height to simulate a crafted header
+ from xrspatial.geotiff._header import IFDEntry
+ ifd.entries[256] = IFDEntry(tag=256, type_id=3, count=1, value=100_000)
+ ifd.entries[257] = IFDEntry(tag=257, type_id=3, count=1, value=100_000)
+
+ dtype = tiff_dtype_to_numpy(ifd.bits_per_sample, ifd.sample_format)
+
+ with pytest.raises(ValueError, match="exceed the safety limit"):
+ _read_strips(data, ifd, header, dtype, max_pixels=1_000_000)
+
+ def test_read_tiles_rejects_huge_header(self):
+ """_read_tiles refuses to allocate when header claims huge dims."""
+ data = make_minimal_tiff(8, 8, np.dtype('float32'), tiled=True, tile_size=4)
+ header = parse_header(data)
+ ifds = parse_all_ifds(data, header)
+ ifd = ifds[0]
+
+ from xrspatial.geotiff._header import IFDEntry
+ ifd.entries[256] = IFDEntry(tag=256, type_id=3, count=1, value=100_000)
+ ifd.entries[257] = IFDEntry(tag=257, type_id=3, count=1, value=100_000)
+
+ dtype = tiff_dtype_to_numpy(ifd.bits_per_sample, ifd.sample_format)
+
+ with pytest.raises(ValueError, match="exceed the safety limit"):
+ _read_tiles(data, ifd, header, dtype, max_pixels=1_000_000)
+
+ def test_read_to_array_max_pixels_kwarg(self, tmp_path):
+ """read_to_array passes max_pixels through to the internal readers."""
+ expected = np.arange(16, dtype=np.float32).reshape(4, 4)
+ data = make_minimal_tiff(4, 4, np.dtype('float32'), pixel_data=expected)
+ path = str(tmp_path / "small.tif")
+ with open(path, 'wb') as f:
+ f.write(data)
+
+ # Should succeed with a generous limit
+ arr, _ = read_to_array(path, max_pixels=1_000_000)
+ np.testing.assert_array_equal(arr, expected)
+
+ # Should fail with a tiny limit
+ with pytest.raises(ValueError, match="exceed the safety limit"):
+ read_to_array(path, max_pixels=10)
+
+ def test_normal_read_unaffected(self, tmp_path):
+ """Normal reads within the default limit are not affected."""
+ expected = np.arange(64, dtype=np.float32).reshape(8, 8)
+ data = make_minimal_tiff(8, 8, np.dtype('float32'), pixel_data=expected)
+ path = str(tmp_path / "normal.tif")
+ with open(path, 'wb') as f:
+ f.write(data)
+
+ arr, _ = read_to_array(path)
+ np.testing.assert_array_equal(arr, expected)
+
+
+# ---------------------------------------------------------------------------
+# Cat 5: VRT path traversal
+# ---------------------------------------------------------------------------
+
+class TestVRTPathTraversal:
+ def test_relative_path_canonicalized(self, tmp_path):
+ """Relative paths in VRT SourceFilename are canonicalized."""
+ from xrspatial.geotiff._vrt import parse_vrt
+
+ vrt_xml = '''
+
+
+ ../../../etc/shadow
+ 1
+
+
+
+
+'''
+
+ vrt_dir = str(tmp_path / "subdir")
+ os.makedirs(vrt_dir)
+
+ vrt = parse_vrt(vrt_xml, vrt_dir)
+ source_path = vrt.bands[0].sources[0].filename
+
+ # After canonicalization, the path should NOT contain ".."
+ assert ".." not in source_path
+ # It should be an absolute path
+ assert os.path.isabs(source_path)
+ # Verify it was resolved through realpath
+ assert source_path == os.path.realpath(source_path)
+
+ def test_normal_relative_path_still_works(self, tmp_path):
+ """Normal relative paths without traversal still resolve correctly."""
+ from xrspatial.geotiff._vrt import parse_vrt
+
+ vrt_xml = '''
+
+
+ data/tile.tif
+ 1
+
+
+
+
+'''
+
+ vrt_dir = str(tmp_path)
+ vrt = parse_vrt(vrt_xml, vrt_dir)
+ source_path = vrt.bands[0].sources[0].filename
+
+ expected = os.path.realpath(os.path.join(vrt_dir, "data", "tile.tif"))
+ assert source_path == expected
+
+ def test_absolute_path_also_canonicalized(self, tmp_path):
+ """Absolute paths in VRT are also canonicalized."""
+ from xrspatial.geotiff._vrt import parse_vrt
+
+ vrt_xml = '''
+
+
+ /tmp/../tmp/test.tif
+ 1
+
+
+
+
+'''
+
+ vrt = parse_vrt(vrt_xml, str(tmp_path))
+ source_path = vrt.bands[0].sources[0].filename
+
+ assert ".." not in source_path
+ assert source_path == os.path.realpath("/tmp/../tmp/test.tif")