From 550ffa229c23f840be6e8e134790930f5a9b1307 Mon Sep 17 00:00:00 2001 From: Francesc Alted Date: Thu, 23 Apr 2026 08:01:12 +0200 Subject: [PATCH 1/6] New plan for variable length columns based on new ListArray --- plans/ctable-varlen-cols.md | 1080 +++++++++++++++++++++++++++++++++++ 1 file changed, 1080 insertions(+) create mode 100644 plans/ctable-varlen-cols.md diff --git a/plans/ctable-varlen-cols.md b/plans/ctable-varlen-cols.md new file mode 100644 index 00000000..451c4885 --- /dev/null +++ b/plans/ctable-varlen-cols.md @@ -0,0 +1,1080 @@ +# CTable Variable-Length Columns Implementation Plan + +## Summary + +Add support for variable-length list columns to `CTable` via a new logical list type: + +- public schema API: `b2.list(...)` +- physical row-oriented container: `blosc2.ListArray` +- internal storage backends: + - `VLArray` for row-oriented point updates + - `BatchArray` for append/read efficiency + +The design goal is to let users declare typed list columns in a `CTable` schema without exposing backend details unless they want to tune them. + +`ListArray` should also be treated as a first-class public container for row-oriented list-valued data, not merely as an internal `CTable` helper. At the same time, it should not be positioned as a replacement for `VLArray` or `BatchArray`; those remain the lower-level variable-length building blocks for more generic or explicitly batch-oriented workloads. + +Example target API: + +```python +from dataclasses import dataclass +import blosc2 as b2 + + +@dataclass +class Product: + code: str = b2.field(b2.string(max_length=32)) + ingredients: list[str] = b2.field(b2.list(b2.string(), nullable=True)) + allergens: list[str] = b2.field( + b2.list(b2.string(), storage="batch", serializer="msgpack") + ) +``` + +--- + +## Final decisions already made + +### Public API + +- Use `b2.list(...)`, not `b2.list_(...)`. +- Use `cell` only as a documentation/design term when helpful, not as a formal API term. 
+- `ListArray` is the public row-oriented container abstraction for variable-length list columns. +- `ListArray` should be documented as a first-class container in its own right, useful both inside and outside `CTable`. +- `ListArray` should not be presented as replacing `VLArray` or `BatchArray`; instead, it should be positioned as the natural high-level container for row-oriented list data, while `VLArray` and `BatchArray` remain the lower-level building blocks. + +### Defaults + +- default `storage="batch"` +- default `serializer="msgpack"` +- default `nullable=False` +- `serializer="arrow"` remains optional and must not introduce a hard `pyarrow` dependency +- `serializer="arrow"` is only allowed with `storage="batch"` + +### Null semantics + +For V1, distinguish: + +- `None` → null list cell +- `[]` → empty list cell + +Do not support nullable items inside the list by default. + +So V1 supports: + +- `nullable=True|False` for the whole list cell +- no `item_nullable=True` behavior yet + +### Update semantics + +Support **explicit whole-cell replacement only**: + +```python +t.ingredients[5] = ["salt", "sugar"] +``` + +Do not support implicit write-through mutation of returned Python objects: + +```python +x = t.ingredients[5] +x.append("salt") # local only +# user must reassign +``` + +### Batch layout policy + +- default `batch_rows` should follow the column chunk size +- batch-backed list columns should use an internal append buffer in `ListArray` +- buffering lives in `ListArray`, not in `BatchArray` +- flushes occur: + - when buffer reaches `batch_rows` + - on explicit `flush()` + - on persistence boundaries such as `save()` / `close()` + - before exports that must observe all rows (e.g. 
`to_arrow()`) + +### V1 scope + +Support in V1: + +- schema declaration via `b2.list(...)` +- append / extend +- row reads +- whole-cell replacement +- persistence (`save`, `open`, `load`) +- standalone `ListArray` reopen through `blosc2.open()` / `blosc2.from_cframe()` +- `head`, `tail`, `select`, and scalar-driven `where()`/view operations +- `compact()` +- `to_arrow()` / `from_arrow()` +- display / info support + +Explicitly out of scope for V1: + +- indexes on list columns +- computed columns over list columns +- sorting by list columns +- list-aware predicates such as contains / overlaps +- nullable items inside a list +- nested list-of-list / struct / map types +- standalone insert/delete API on `ListArray` + +--- + +## Main design principle + +Do **not** force list columns into the current scalar `np.dtype` model. + +Today the schema/compiler/storage path assumes that every column: + +- has a scalar `np.dtype` +- is physically stored as an `NDArray` +- can be coerced with scalar NumPy conversion rules + +That is not true for list columns. + +Instead, the refactor should distinguish: + +- logical scalar columns +- logical list columns + +and separately distinguish their physical storage: + +- scalar column → `NDArray` +- list column → `ListArray` + +This keeps the scalar path fast and clean while adding a first-class path for variable-length lists. + +--- + +## High-level architecture + +### Logical layer + +Add a new schema descriptor: + +- `ListSpec` + +Keep existing scalar specs, but conceptually move toward: + +- `ColumnSpec` + - `ScalarSpec` + - `ListSpec` + +This can be implemented either by introducing explicit base classes or by broadening the meaning of the current spec system. The important part is that `CompiledColumn` and `CTable` must stop assuming every spec has a scalar `dtype`. 
+ +### Physical layer + +Add a new container: + +- `blosc2.ListArray` + +`ListArray` is cell-oriented: + +- `arr[i]` returns one list cell or `None` +- `arr[i:j]` returns a Python `list` of cells +- `arr[i] = value` replaces one cell +- `append(value)` appends one cell +- `extend(values)` appends many cells + +Internally it wraps one of: + +- `VLArray` +- `BatchArray` + +### CTable layer + +Teach `CTable` to manage two families of physical columns: + +- scalar columns backed by `NDArray` +- list columns backed by `ListArray` + +`CTable` should understand list columns at the schema and storage levels, but should not need backend-specific logic beyond creation/open/flush/update hooks. + +--- + +## Phase 1: Schema system changes + +## 1.1 Add `b2.list(...)` + +Add a new public builder in `src/blosc2/schema.py`: + +```python +def list( + item_spec, + *, + nullable=False, + storage="batch", + serializer="msgpack", + batch_rows=None, + items_per_block=None, +): ... +``` + +Initial accepted parameters: + +- `item_spec` + - typically a scalar spec such as `b2.string()` or `b2.int32()` +- `nullable` + - whether the whole list cell may be `None` +- `storage` + - `"batch"`, `"vl"` +- `serializer` + - `"msgpack"`, `"arrow"` +- `batch_rows` + - optional row count per persisted batch for batch backend +- `items_per_block` + - forwarded to `BatchArray` when backend is batch + +Validation rules for V1: + +- `storage` must be `"batch"` or `"vl"` +- `serializer` must be `"msgpack"` or `"arrow"` +- if `storage == "vl"`, serializer must be `"msgpack"` +- if `serializer == "arrow"`, storage must be `"batch"` +- `item_spec` should initially be restricted to scalar specs + +## 1.2 Introduce `ListSpec` + +Add a new schema descriptor class with at least: + +- `python_type = list` +- `item_spec` +- `nullable` +- `storage` +- `serializer` +- `batch_rows` +- `items_per_block` + +Methods analogous to existing specs: + +- `to_metadata_dict()` +- optional `display_label()` helper or equivalent + 
+Suggested serialized form: + +```json +{ + "kind": "list", + "item": {"kind": "string", "max_length": 64}, + "nullable": true, + "storage": "batch", + "serializer": "msgpack", + "batch_rows": 65536, + "items_per_block": 256 +} +``` + +## 1.3 Broaden `field()` acceptance + +`b2.field(...)` should accept any valid column spec, not just scalar specs. + +The implementation contract becomes: + +- scalar field spec allowed +- list field spec allowed + +No user-visible API change beyond this. + +--- + +## Phase 2: Schema compiler changes + +## 2.1 Relax `CompiledColumn` + +`CompiledColumn` currently assumes a scalar `dtype`. Refactor it so list columns are first-class. + +Target shape: + +- `name` +- `py_type` +- `spec` +- `default` +- `config` +- `display_width` +- optional scalar dtype information only when applicable + +There are two acceptable implementation styles: + +### Option A: minimal change + +Keep `dtype` on `CompiledColumn`, but allow it to be `None` for non-scalar columns. + +### Option B: cleaner long-term change + +Replace mandatory `dtype` with something like: + +- `storage_dtype: np.dtype | None` +- `logical_kind` +- convenience properties such as `is_scalar`, `is_list` + +Recommendation: choose the smallest refactor that avoids fake object dtypes. + +## 2.2 Update annotation validation + +Current annotation validation is scalar-oriented. Extend it to support: + +```python +ingredients: list[str] = b2.field(b2.list(b2.string())) +``` + +Compiler responsibilities: + +- inspect `typing.get_origin(annotation)` +- inspect `typing.get_args(annotation)` +- validate that `list[...]` annotations match `ListSpec` +- validate that the item annotation matches `item_spec` + +For V1, support: + +- built-in `list[T]` +- likely `typing.List[T]` as a compatibility path if desired + +Restrict V1 item annotations to scalar item types. 
+ +## 2.3 Schema serialization/deserialization + +Extend `schema_to_dict()` / `schema_from_dict()` so list specs round-trip through stored schema metadata. + +This includes: + +- emitting `kind="list"` +- recursively serializing `item_spec` +- restoring `ListSpec` on reopen/load + +--- + +## Phase 3: Add `ListArray` + +## 3.1 Public role + +Create a new file: + +- `src/blosc2/list_array.py` + +And export it from: + +- `src/blosc2/__init__.py` + +`ListArray` should be the row-oriented facade used by `CTable` and also a standalone public container for users working with row-oriented list-valued data. + +It should not expose `BatchArray`'s native batch-oriented semantics. + +Documentation should encourage `ListArray` for typed, row-oriented list data, while still keeping `VLArray` and `BatchArray` visible as the lower-level containers for arbitrary object payloads and explicitly batch-oriented workflows. + +## 3.2 Core API + +Initial API: + +- constructor taking list spec / backend hints / storage kwargs +- `append(value)` +- `extend(values)` +- `flush()` +- `close()` +- `__enter__()` / `__exit__()` +- `__getitem__(index|slice)` +- `__setitem__(index, value)` +- `__len__()` +- `__iter__()` +- `copy(**kwargs)` +- `info` +- `to_arrow()` when possible +- `from_arrow()` if useful as a constructor helper + +V1 read behavior: + +- `arr[i]` → `list | None` +- `arr[i:j]` → `list[list | None]` + +No item-level API like `arr.items` is required for V1. + +## 3.3 Validation/coercion inside `ListArray` + +`ListArray` should validate cell values against the provided `ListSpec`. + +Rules for V1: + +- `None` allowed only if `nullable=True` +- otherwise value must be list-like +- strings / bytes are not accepted as list-like cells +- each item must satisfy `item_spec` +- `None` items rejected for V1 + +The goal is that `ListArray` can be safely used both inside and outside `CTable`. 
+ +## 3.4 Backend selection + +Selection policy: + +- `storage="batch"` → batch backend +- `storage="vl"` → VL backend +- `serializer="arrow"` only valid with batch backend +- default backend = batch +- default serializer = msgpack + +Implementation note: backend choice should be explicit in metadata and persisted schema, not inferred later from heuristics. + +--- + +## Phase 4: `ListArray` backend implementation + +## 4.1 VL backend + +Map one logical cell to one VLArray entry. + +Properties: + +- simplest implementation +- natural row-level replacement +- no internal buffer needed + +Implementation behavior: + +- `append(cell)` → `VLArray.append(cell)` +- `extend(cells)` → `VLArray.extend(cells)` +- `__getitem__(i)` → `VLArray[i]` +- `__setitem__(i, cell)` → `VLArray[i] = cell` + +This backend is the easiest one and should be implemented first to stabilize list semantics. + +## 4.2 Batch backend + +Map many logical cells to one persisted batch in `BatchArray`. + +Persisted representation for V1 msgpack path: + +- one batch = list of cells +- each cell = `None` or Python list of scalar items + +Arrow-serializer path, implemented after the msgpack path but within the same overall design: + +- one batch = Arrow array where each slot corresponds to one list cell +- type = `list` + +## 4.3 Internal append buffer + +`ListArray(storage="batch")` maintains: + +- persisted batches in a `BatchArray` +- a pending in-memory Python list of cells not yet flushed + +Suggested internal state: + +- `_store`: `BatchArray` +- `_pending_cells: list` +- `_batch_rows: int` +- mapping helpers derived from persisted batch lengths + +Append flow: + +- validate cell +- append to `_pending_cells` +- if `len(_pending_cells) >= batch_rows`, flush full batches + +Extend flow: + +- validate each cell +- fill pending buffer +- flush full batches as needed + +Flush flow: + +- write full `batch_rows` groups to `BatchArray.append(...)` +- keep any tail cells pending unless caller requested a full 
final flush +- on explicit final flush / close / save, write tail as last batch + +## 4.4 Logical indexing in batch backend + +`ListArray` must expose row-level indexing across: + +- persisted cells in `BatchArray` +- pending cells in `_pending_cells` + +This requires row → batch lookup for persisted rows. + +Suggested approach: + +- rely on `BatchArray`'s stored batch lengths and prefix sums for persisted portion +- append pending tail logically after persisted rows + +Point update behavior: + +- if target row is in pending cells: replace in memory +- if target row is persisted: load batch, replace cell, rewrite entire batch + +This should be supported but documented as more expensive than VL-backed updates. + +## 4.5 Flush semantics + +`ListArray.flush()` should: + +- write all pending cells, including the tail +- leave `_pending_cells` empty + +Automatic flushes should occur on: + +- buffer full +- explicit `flush()` +- `close()` / context-manager exit +- `CTable.save()` +- `CTable.to_arrow()` +- any persistence-sensitive operation that must see all rows on disk + +--- + +## Phase 5: `CTable` storage abstraction changes + +## 5.1 Broaden `TableStorage` + +Current `TableStorage.create_column()` / `open_column()` assume `NDArray`. Extend storage abstraction to support list columns. + +Recommended shape: + +- `create_scalar_column(...)` +- `open_scalar_column(...)` +- `create_list_column(...)` +- `open_list_column(...)` + +Alternative minimal path: + +- keep `create_column(...)` but branch on compiled spec kind + +Recommendation: use explicit separate methods if the diff stays manageable; it makes the abstraction cleaner. + +## 5.2 File-backed layout + +Current scalar layout is under: + +- `/_cols/` + +Keep that logical namespace for list columns too. 
+ +Possible physical forms: + +- `/_cols/` points directly to the underlying backend object (`VLArray` or `BatchArray`), tagged so it reopens logically as `ListArray` +- `/_cols/` plus optional side metadata stored in schema + +For V1, prefer storing backend configuration in the schema metadata and keeping the on-disk object itself as the concrete backend container. + +## 5.3 In-memory layout + +In-memory `CTable` should keep list columns as live `ListArray` objects. + +No persistence is needed there beyond normal `save()` behavior. + +--- + +## Phase 6: `CTable` core changes + +## 6.1 Column creation/opening + +During table creation/open/open-from-schema: + +- scalar compiled columns create/open `NDArray` +- list compiled columns create/open `ListArray` + +`self._cols` may continue to map names to physical column objects, but code using `self._cols[name]` must stop assuming every entry is an `NDArray`. + +### 6.1.1 Physical length vs capacity + +Scalar columns remain capacity-based and grow with `_valid_rows`. + +List columns should instead be treated as append-sized physical stores: + +- their physical length tracks the number of written physical rows +- they are not preallocated out to `len(_valid_rows)` +- `_grow()` should continue to resize scalar columns and `_valid_rows`, but should not pad list columns with placeholder cells + +This means `CTable` internals must stop assuming every stored column has physical length equal to `len(_valid_rows)`. + +Logical row resolution should always go through physical row positions that are actually written. + +## 6.2 Row coercion path + +Current `_coerce_row_to_storage()` is scalar-only. Refactor it to branch on column kind. + +For scalar columns: + +- keep current NumPy scalar coercion + +For list columns: + +- validate and normalize through list-spec logic +- store Python list / `None` as-is for handoff to `ListArray` + +## 6.3 Append path + +Current `append()` loops over columns and assigns directly into scalar arrays. 
Refactor: + +- scalar column: `ndarray[pos] = scalar` +- list column: `list_array.append(cell)` + +Important invariants: + +- all stored columns must stay logically aligned by row index +- list columns append one physical cell per newly written physical row position +- `_last_pos` remains the source of truth for the next physical row id + +This means `append()` must ensure each list column receives exactly one new cell per appended row. + +### Write coordination / partial-failure safety + +This deserves explicit care because mixed scalar/list writes are no longer a single homogeneous NDArray operation. + +V1 should guarantee at least: + +- validate and normalize the full incoming row or batch before mutating storage +- mark `_valid_rows` only after all column writes for the row(s) succeed +- avoid leaving logically visible partial rows after a failure + +For list columns in particular, the implementation should prefer staging writes in memory where possible, or otherwise provide best-effort rollback for per-row appends that fail after some columns were already updated. + +## 6.4 Extend path + +Current `extend()` materializes column arrays and writes slices into NDArrays. For list columns, this should become backend-aware. + +Suggested behavior: + +- scalar columns: keep vectorized path +- list columns: collect Python list of cells and call `ListArray.extend(cells)` + +This may be less vectorized than the scalar path, which is acceptable for V1. + +## 6.5 Delete behavior + +No special physical delete support is required for V1. + +`CTable.delete()` already uses `_valid_rows`, so list columns can simply remain append-only physically while logical row deletion is controlled by the validity mask. + +## 6.6 Row reads + +`_Row.__getitem__` and column reads must support list columns. 
+ +Expected behavior: + +- row access on a list column returns the corresponding list cell or `None` +- sliced column access on a list column returns a Python `list` of cells + +## 6.7 Column wrapper behavior + +Current `Column` logic is NumPy/NDArray-centric. For V1, update it carefully for list columns. + +Key points: + +- `Column.dtype` is not meaningful for list columns and should return `None` rather than a fake NumPy dtype +- many scalar comparison/aggregate methods should reject list columns cleanly +- `Column.__getitem__` must work for logical row indexing on list columns +- `Column.__setitem__` should support whole-cell replacement for list columns + +Recommendation: + +- branch internally on compiled spec kind +- do not try to make list columns mimic NumPy arrays + +## 6.8 String representation and info + +Update display and info methods so list columns show a logical type label like: + +- `list[string]` +- `list[int32]` + +For previews, show Python repr-style cells with truncation. + +Statistics in `describe()` for list columns can be omitted in V1 or show a message like: + +- `(stats not available for list columns)` + +--- + +## Phase 7: Arrow interoperability + +## 7.1 Keep Arrow optional + +`pyarrow` must remain optional. + +Rules: + +- importing / using list columns with msgpack default must not require `pyarrow` +- `serializer="arrow"` should raise a clear `ImportError` when `pyarrow` is absent +- `CTable.to_arrow()` / `from_arrow()` may already be optional-import based and should stay that way + +## 7.2 `CTable.to_arrow()` + +Extend `to_arrow()` so list columns export to Arrow list arrays. 
+For msgpack-backed list columns:
+
+- materialize Python cells
+- build `pa.array(values)` with appropriate list type when possible
+
+For Arrow-backed batch list columns later:
+
+- a faster path may reuse Arrow-native data more directly if feasible
+
+V1 success criterion:
+
+- correct Arrow output, even if implemented through Python materialization
+
+## 7.3 `CTable.from_arrow()`
+
+Extend Arrow import to recognize Arrow list fields and create `ListSpec` columns.
+
+Initial supported Arrow list cases for V1:
+
+- `list<string>`
+- possibly `large_list<string>` if trivial to normalize
+- optionally numeric item types if easy to map consistently
+
+Import policy:
+
+- create `b2.list(item_spec, storage="batch", serializer="msgpack")` by default unless caller later gains a way to override
+- append row cells into `ListArray`
+
+The important part for V1 is round-tripping list columns through `CTable`.
+
+---
+
+## Phase 8: Persistence and reopen behavior
+
+## 8.1 Schema metadata
+
+Persist list-specific metadata in the table schema.
+
+Each list column should carry at least:
+
+- `kind="list"`
+- serialized `item_spec`
+- `nullable`
+- `storage`
+- `serializer`
+- `batch_rows` if relevant
+- `items_per_block` if relevant
+
+## 8.2 Container reopen behavior
+
+When a table is reopened:
+
+- compiled schema reconstructs `ListSpec`
+- storage layer opens the concrete list column backend
+- `CTable` re-wraps it as `ListArray`
+
+For standalone use, persistent `ListArray` containers should also reopen as `ListArray` through the generic dispatch path.
+
+### 8.2.1 Layered metadata tagging
+
+`ListArray` should have its own explicit container tag in fixed metadata, while preserving the underlying backend tag. 
+ +Examples: + +- batch-backed `ListArray`: + - `meta["batcharray"] = {...}` + - `meta["listarray"] = {...}` +- VL-backed `ListArray`: + - `meta["vlarray"] = {...}` + - `meta["listarray"] = {...}` + +This keeps both identities available: + +- physical identity: the underlying storage container (`BatchArray` or `VLArray`) +- logical identity: the object should reopen as `ListArray` + +Suggested `meta["listarray"]` payload: + +```json +{ + "version": 1, + "backend": "batch", + "serializer": "msgpack", + "nullable": false, + "item_spec": {"kind": "string", "max_length": 64}, + "batch_rows": 65536, + "items_per_block": 256 +} +``` + +The exact payload can be trimmed, but it should at least record: + +- format version +- backend kind +- serializer +- nullability +- serialized item spec +- backend-specific layout hints when relevant + +Recommendation: store this in `schunk.meta`, not only in `vlmeta`, because it defines the container kind used for reopen dispatch. + +### 8.2.2 Generic reopen dispatch + +`blosc2.open()` and `blosc2.from_cframe()` should prefer `ListArray` when `meta["listarray"]` is present. + +Suggested dispatch priority: + +1. `listarray` +2. `batcharray` +3. `vlarray` +4. existing fallback behavior + +This makes the generic open path return the logical container type rather than exposing the raw backend by default. + +Advanced users can still reach the lower-level backend explicitly if needed. 
+ +### 8.2.3 `ListArray` reopen constructor + +`ListArray` should support an internal reopen hook such as: + +```python +ListArray(_from_schunk=schunk) +``` + +This path should: + +- validate the `listarray` tag +- validate consistency with the backend tag (`batcharray` or `vlarray`) +- reconstruct the correct backend wrapper +- return a row-oriented `ListArray` object + +## 8.3 Save/load and flush coordination + +Ensure all list columns are flushed before: + +- saving to disk +- serializing table data for export if needed +- closing persistent stores + +Add a helper on `CTable` such as an internal `_flush_varlen_columns()` used by: + +- `save()` +- `to_arrow()` +- close/discard paths + +--- + +## Phase 9: Testing plan + +Add focused tests rather than trying to cover the entire matrix at once. + +## 9.1 Schema/compiler tests + +New tests for: + +- `b2.list(...)` construction +- invalid storage/serializer combinations +- annotation matching for `list[str]` +- schema serialization/deserialization of `ListSpec` + +## 9.2 `ListArray` tests + +Standalone tests covering both backends: + +- append / extend +- `None` vs `[]` +- reject nullable items in V1 +- row reads +- slice reads +- whole-cell replacement +- negative indexing +- flush behavior, including `close()` / context-manager flush-on-exit +- reopen behavior for persistent stores +- `blosc2.open()` / `blosc2.from_cframe()` dispatch returning `ListArray` +- read-only handling where applicable + +Batch backend specific tests: + +- pending-buffer reads before flush +- automatic flush on buffer full +- update in pending region +- update in persisted region +- correct length across persisted + pending regions + +VL backend specific tests: + +- direct row replacement + +## 9.3 `CTable` tests + +Add `CTable` tests for: + +- schema with one list column +- schema with scalar + list columns mixed +- append row with list column +- extend rows with list column +- read rows back +- `head`, `tail`, `select` +- scalar-driven 
`where()` / view operations with list columns carried through correctly +- `compact()` with list columns +- row deletion via `_valid_rows` +- reopen persistent table +- `save()` / `load()` round-trip +- `to_arrow()` / `from_arrow()` with list columns + +## 9.4 Non-goal tests for V1 + +Do not add tests for unsupported features such as: + +- list column indexes +- sorting by list column +- computed expressions over lists +- nullable items inside lists + +Instead, add clear failure-path tests where appropriate. + +--- + +## Phase 10: Documentation plan + +Update documentation incrementally. + +## 10.1 Reference docs + +Add docs for: + +- `b2.list(...)` +- `ListArray` +- `CTable` list column support + +## 10.2 Tutorial/examples + +Add at least one example such as: + +- products with `ingredients: list[str]` + +Show: + +- schema declaration +- append / extend +- distinction between `None` and `[]` +- whole-cell replacement +- save/reopen +- Arrow export if `pyarrow` is installed + +## 10.3 Design notes in docs + +Explain briefly: + +- `cell` as a descriptive concept only +- batch vs VL backend tradeoffs +- why `msgpack` is the default +- why returned Python lists must be reassigned after mutation + +--- + +## Recommended implementation order + +To reduce risk, implement in this order: + +1. **Schema groundwork** + - add `ListSpec` + - add `b2.list(...)` + - update schema serialization and compiler + +2. **Standalone `ListArray` with VL backend first** + - easiest path to stabilize list semantics + - validates `None` vs `[]`, whole-cell replacement, persistence + - includes layered tagging and standalone reopen through `blosc2.open()` / `from_cframe()` + +3. **`CTable` integration for VL-backed list columns** + - proves scalar/list coexistence in compiler and core table paths + +4. **Batch-backed `ListArray` with pending buffer** + - implement `batch_rows` buffering + - add flush semantics and persisted/pending indexing + +5. 
**`CTable` integration for batch-backed list columns** + - update save/open/load and flush coordination + +6. **Arrow import/export support for list columns** + - keep optional + - start with materialization-based path + +7. **Docs and broader test coverage** + +This staged rollout makes it easier to separate: + +- logical list semantics +- `CTable` schema/compiler changes +- batch buffering complexity + +--- + +## Practical code touch points + +Expected Python files to update or add: + +### New files + +- `src/blosc2/list_array.py` +- `tests/test_list_array.py` +- `tests/test_ctable_varlen.py` or equivalent +- `doc/reference/list_array.rst` or a combined varlen/list reference page +- example file under `examples/ctable/` + +### Existing files likely to change + +- `src/blosc2/__init__.py` +- `src/blosc2/schema.py` +- `src/blosc2/schema_compiler.py` +- `src/blosc2/ctable.py` +- `src/blosc2/ctable_storage.py` +- likely `src/blosc2/schema_validation.py` +- likely `src/blosc2/schema_vectorized.py` +- `src/blosc2/schunk.py` for standalone reopen dispatch +- `src/blosc2/core.py` for generic `open()` / `from_cframe()` dispatch for `ListArray` +- docs under `doc/reference/ctable.rst` and related tutorial pages + +--- + +## Open follow-up items after V1 + +These are intentionally postponed, not rejected: + +- `item_nullable=True` +- nested list-of-list +- struct / map item types +- list-aware query predicates +- indexing for membership tests +- sorting/grouping semantics for list columns +- optimized Arrow-native batch representation +- convenience APIs for explicitly opening the raw backend (`VLArray` / `BatchArray`) behind a `ListArray` when advanced users want to bypass the logical container + +--- + +## Short rationale for the chosen defaults + +### Why `b2.list(...)` + +It reads naturally next to `list[str]` annotations and fits the rest of the schema-builder API. 
+ +### Why `ListArray` should be first-class + +It gives users a clear row-oriented abstraction for list-valued data that is useful both on its own and as the natural physical model for list columns inside `CTable`, while still preserving `VLArray` and `BatchArray` as lower-level containers. + +### Why `msgpack` default + +It avoids a hard `pyarrow` dependency and works uniformly with both `VLArray` and `BatchArray`. + +### Why `storage="batch"` default + +It better matches append/scan-oriented table workloads and Parquet-like usage, while `VLArray` remains available for update-heavy cases. + +### Why whole-cell replacement only + +It keeps behavior explicit and avoids surprising write-through mutation of returned Python lists. + +### Why `None` vs `[]` + +The distinction is valuable and common, while item-level nullability can be added later without breaking the model. + +--- + +## Outcome expected from V1 + +After this work: + +- `CTable` should be able to host list columns in a way that is: + - typed at the schema level + - persistent + - row-addressable + - backend-tunable + - install-light by default + - compatible with optional Arrow export/import + +- `ListArray` should stand as a public reusable container for row-oriented list-valued data. + +This should happen without compromising the current scalar-column fast path or displacing `VLArray` / `BatchArray` from their lower-level roles. 
From e100d76fcbeff4c1254d80d1f874d6dba3118d99 Mon Sep 17 00:00:00 2001 From: Francesc Alted Date: Thu, 23 Apr 2026 09:12:08 +0200 Subject: [PATCH 2/6] First implementation for varlen cols in CTable --- doc/reference/classes.rst | 2 + doc/reference/ctable.rst | 31 +- doc/reference/list_array.rst | 78 ++++ examples/ctable/varlen_columns.py | 38 ++ src/blosc2/__init__.py | 4 + src/blosc2/core.py | 10 +- src/blosc2/ctable.py | 343 ++++++++++---- src/blosc2/ctable_storage.py | 71 ++- src/blosc2/list_array.py | 480 ++++++++++++++++++++ src/blosc2/schema.py | 102 +++++ src/blosc2/schema_compiler.py | 57 ++- src/blosc2/schema_validation.py | 24 +- src/blosc2/schema_vectorized.py | 7 + src/blosc2/schunk.py | 5 + tests/ctable/test_varlen_columns.py | 82 ++++ tests/ctable/test_varlen_schema_compiler.py | 48 ++ tests/test_list_array.py | 76 ++++ 17 files changed, 1346 insertions(+), 112 deletions(-) create mode 100644 doc/reference/list_array.rst create mode 100644 examples/ctable/varlen_columns.py create mode 100644 src/blosc2/list_array.py create mode 100644 tests/ctable/test_varlen_columns.py create mode 100644 tests/ctable/test_varlen_schema_compiler.py create mode 100644 tests/test_list_array.py diff --git a/doc/reference/classes.rst b/doc/reference/classes.rst index 39e22b6a..933da9ef 100644 --- a/doc/reference/classes.rst +++ b/doc/reference/classes.rst @@ -15,6 +15,7 @@ Main Classes C2Array Array BatchArray + ListArray VLArray SChunk DictStore @@ -41,6 +42,7 @@ Main Classes tree_store embed_store batch_array + list_array vlarray ndfield ref diff --git a/doc/reference/ctable.rst b/doc/reference/ctable.rst index 0cca3749..8082f87e 100644 --- a/doc/reference/ctable.rst +++ b/doc/reference/ctable.rst @@ -3,10 +3,12 @@ CTable ====== -A columnar compressed table backed by one :class:`~blosc2.NDArray` per column. 
-Each column is stored, compressed, and queried independently; rows are never -materialised in their entirety unless you explicitly call :meth:`~blosc2.CTable.to_arrow` -or iterate with :meth:`~blosc2.CTable.__iter__`. +A columnar compressed table backed by one physical container per column. +Scalar columns use :class:`~blosc2.NDArray`; list-valued columns use +:class:`~blosc2.ListArray`. Each column is stored, compressed, and queried +independently; rows are never materialised in their entirety unless you +explicitly call :meth:`~blosc2.CTable.to_arrow` or iterate with +:meth:`~blosc2.CTable.__iter__`. .. currentmodule:: blosc2 @@ -361,6 +363,27 @@ Text & binary string bytes + list .. autoclass:: string .. autoclass:: bytes +.. autofunction:: list + +List columns +------------ + +List columns are declared with :func:`blosc2.list`, for example:: + + from dataclasses import dataclass + import blosc2 as b2 + + @dataclass + class Product: + code: str = b2.field(b2.string(max_length=8)) + tags: list[str] = b2.field(b2.list(b2.string(), nullable=True)) + +Whole-cell replacement is supported, so users should reassign modified lists:: + + row_tags = table.tags[0] + row_tags.append("extra") # local Python list only + table.tags[0] = row_tags # explicit write-back diff --git a/doc/reference/list_array.rst b/doc/reference/list_array.rst new file mode 100644 index 00000000..1b2c9f5d --- /dev/null +++ b/doc/reference/list_array.rst @@ -0,0 +1,78 @@ +.. _ListArray: + +ListArray +========= + +Overview +-------- +ListArray is a row-oriented container for variable-length list cells. +It is the natural public container for list-valued :class:`blosc2.CTable` +columns, but it is also useful on its own whenever you want typed, +row-addressable list data. 
+ +Internally, ListArray uses one of two lower-level backends: + +- :class:`blosc2.BatchArray` for append/scan-oriented workloads +- :class:`blosc2.VLArray` for simpler row-level replacement semantics + +Quick example +------------- + +.. code-block:: python + + import blosc2 + + arr = blosc2.ListArray( + item_spec=blosc2.string(max_length=16), + nullable=True, + storage="batch", + urlpath="ingredients.b2b", + mode="w", + ) + arr.append(["salt", "sugar"]) + arr.append([]) + arr.append(None) + + print(arr[0]) + print(arr[1:]) + + reopened = blosc2.open("ingredients.b2b", mode="r") + print(type(reopened).__name__) + +.. note:: + Returned Python lists are detached values. Mutating them locally does not + write back to the container; reassign the whole cell instead. + +.. currentmodule:: blosc2 + +.. autoclass:: ListArray + + Constructors + ------------ + .. automethod:: __init__ + .. automethod:: from_arrow + + Row Interface + ------------- + .. automethod:: __getitem__ + .. automethod:: __setitem__ + .. automethod:: __len__ + .. automethod:: __iter__ + + Mutation + -------- + .. automethod:: append + .. automethod:: extend + .. automethod:: flush + .. automethod:: copy + .. automethod:: close + + Context Manager + --------------- + .. automethod:: __enter__ + .. automethod:: __exit__ + + Public Members + -------------- + .. automethod:: to_arrow + .. 
automethod:: to_cframe diff --git a/examples/ctable/varlen_columns.py b/examples/ctable/varlen_columns.py new file mode 100644 index 00000000..1a666133 --- /dev/null +++ b/examples/ctable/varlen_columns.py @@ -0,0 +1,38 @@ +from __future__ import annotations + +from dataclasses import dataclass + +import blosc2 as b2 + + +@dataclass +class Product: + code: str = b2.field(b2.string(max_length=8)) + ingredients: list[str] = b2.field( # noqa: RUF009 + b2.list(b2.string(max_length=16), nullable=True, batch_rows=2) + ) + + +products = b2.CTable(Product) +products.append(("A1", ["salt", "water"])) +products.append(("B2", [])) +products.append(("C3", None)) +products.extend( + [ + ("D4", ["flour", "oil"]), + ("E5", ["cocoa"]), + ] +) + +print("ingredients:", products.ingredients[:]) +print("tail:", products.tail(2).ingredients[:]) + +# Whole-cell replacement is explicit. +ing = products.ingredients[0] +ing.append("pepper") +products.ingredients[0] = ing +print("updated first row:", products.ingredients[0]) + +standalone = b2.ListArray(item_spec=b2.string(max_length=16), nullable=True) +standalone.extend([["a", "b"], [], None]) +print("standalone list array:", standalone[:]) diff --git a/src/blosc2/__init__.py b/src/blosc2/__init__.py index b28cb218..aaa818fe 100644 --- a/src/blosc2/__init__.py +++ b/src/blosc2/__init__.py @@ -565,6 +565,7 @@ def _raise(exc): from .dict_store import DictStore from .tree_store import TreeStore from .batch_array import Batch, BatchArray +from .list_array import ListArray from .vlarray import VLArray, vlarray_from_cframe from .ref import Ref from .b2objects import open_b2object @@ -744,6 +745,7 @@ def _raise(exc): int16, int32, int64, + list, string, uint8, uint16, @@ -782,6 +784,7 @@ def _raise(exc): "int16", "int32", "int64", + "list", "string", "uint8", "uint16", @@ -804,6 +807,7 @@ def _raise(exc): "DSLSyntaxError", "LazyExpr", "LazyUDF", + "ListArray", "NDArray", "NDField", "Operand", diff --git a/src/blosc2/core.py b/src/blosc2/core.py 
index ceb78acd..ffe7b7da 100644 --- a/src/blosc2/core.py +++ b/src/blosc2/core.py @@ -1922,7 +1922,13 @@ def ndarray_from_cframe(cframe: bytes | str, copy: bool = False) -> blosc2.NDArr def from_cframe( cframe: bytes | str, copy: bool = True ) -> ( - blosc2.EmbedStore | blosc2.NDArray | blosc2.SChunk | blosc2.BatchArray | blosc2.VLArray | blosc2.C2Array + blosc2.EmbedStore + | blosc2.NDArray + | blosc2.SChunk + | blosc2.ListArray + | blosc2.BatchArray + | blosc2.VLArray + | blosc2.C2Array ): """Create a :ref:`EmbedStore `, :ref:`NDArray `, :ref:`SChunk `, :ref:`BatchArray ` or :ref:`VLArray ` instance @@ -1957,6 +1963,8 @@ def from_cframe( # Check the metalayer to determine the type if "b2embed" in schunk.meta: return blosc2.estore_from_cframe(cframe, copy=copy) + if "listarray" in schunk.meta: + return blosc2.ListArray(_from_schunk=schunk_from_cframe(cframe, copy=copy)) if "batcharray" in schunk.meta: return blosc2.BatchArray(_from_schunk=schunk_from_cframe(cframe, copy=copy)) if "vlarray" in schunk.meta: diff --git a/src/blosc2/ctable.py b/src/blosc2/ctable.py index f6f11879..ef52d284 100644 --- a/src/blosc2/ctable.py +++ b/src/blosc2/ctable.py @@ -28,6 +28,7 @@ from blosc2 import compute_chunks_blocks from blosc2.ctable_storage import FileTableStorage, InMemoryTableStorage, TableStorage +from blosc2.list_array import ListArray, coerce_list_cell from blosc2.schema_compiler import schema_from_dict, schema_to_dict try: @@ -45,6 +46,7 @@ def wrapper(*args, **kwargs): import blosc2 from blosc2.info import InfoReporter, format_nbytes_info from blosc2.schema import ( + ListSpec, SchemaSpec, complex64, complex128, @@ -578,6 +580,11 @@ def is_computed(self) -> bool: """True if this column is a virtual computed column (read-only).""" return self._col_name in self._table._computed_cols + @property + def is_list(self) -> bool: + col = self._table._schema.columns_by_name.get(self._col_name) + return col is not None and isinstance(col.spec, ListSpec) + @property def 
_valid_rows(self): if self._mask is None: @@ -597,7 +604,7 @@ def __getitem__(self, key: int | slice | list | np.ndarray): """ return self._values_from_key(key) - def _values_from_key(self, key): + def _values_from_key(self, key): # noqa: C901 """Materialise values for a logical index key.""" if isinstance(key, int): n_rows = len(self) @@ -613,12 +620,14 @@ def _values_from_key(self, key): real_pos = blosc2.where(valid, _arange(len(valid))).compute() start, stop, step = key.indices(len(real_pos)) if start >= stop: - return np.array([], dtype=self.dtype) + return [] if self.is_list else np.array([], dtype=self.dtype) selected_pos = real_pos[start:stop:step] # physical row positions if self.is_computed: lo, hi = int(selected_pos.min()), int(selected_pos.max()) chunk = np.asarray(self._raw_col[lo : hi + 1]) return chunk[selected_pos - lo] + if self.is_list: + return [self._raw_col[int(pos)] for pos in selected_pos] return np.asarray(self._raw_col[selected_pos]) elif isinstance(key, np.ndarray) and key.dtype == np.bool_: @@ -632,6 +641,8 @@ def _values_from_key(self, key): if self.is_computed: raw_np = np.asarray(self._raw_col[:]) return raw_np[phys_indices] + if self.is_list: + return [self._raw_col[int(pos)] for pos in phys_indices] return self._raw_col[phys_indices] elif isinstance(key, (list, tuple, np.ndarray)): @@ -640,6 +651,8 @@ def _values_from_key(self, key): if self.is_computed: raw_np = np.asarray(self._raw_col[:]) return raw_np[phys_indices] + if self.is_list: + return [self._raw_col[int(pos)] for pos in phys_indices] return self._raw_col[phys_indices] raise TypeError(f"Invalid index type: {type(key)}") @@ -708,7 +721,7 @@ def view(self) -> ColumnViewIndexer: """ return ColumnViewIndexer(self) - def __setitem__(self, key: int | slice | list | np.ndarray, value): + def __setitem__(self, key: int | slice | list | np.ndarray, value): # noqa: C901 if self._table._read_only: raise ValueError("Table is read-only (opened with mode='r').") if self.is_computed: @@ 
-723,7 +736,6 @@ def __setitem__(self, key: int | slice | list | np.ndarray, value): self._raw_col[int(pos_true)] = value elif isinstance(key, np.ndarray) and key.dtype == np.bool_: - # Boolean mask in logical space. n_live = len(self) if len(key) != n_live: raise IndexError( @@ -731,9 +743,15 @@ def __setitem__(self, key: int | slice | list | np.ndarray, value): ) all_pos = np.where(self._valid_rows[:])[0] phys_indices = all_pos[key] - if isinstance(value, (list, tuple)): - value = np.array(value, dtype=self._raw_col.dtype) - self._raw_col[phys_indices] = value + if self.is_list: + if len(value) != len(phys_indices): + raise ValueError("Length mismatch in list-column assignment") + for pos, cell in zip(phys_indices, value, strict=True): + self._raw_col[int(pos)] = cell + else: + if isinstance(value, (list, tuple)): + value = np.array(value, dtype=self._raw_col.dtype) + self._raw_col[phys_indices] = value elif isinstance(key, (slice, list, tuple, np.ndarray)): real_pos = blosc2.where(self._valid_rows, _arange(len(self._valid_rows))).compute() @@ -743,9 +761,15 @@ def __setitem__(self, key: int | slice | list | np.ndarray, value): else: phys_indices = np.array([real_pos[i] for i in key], dtype=np.int64) - if isinstance(value, (list, tuple)): - value = np.array(value, dtype=self._raw_col.dtype) - self._raw_col[phys_indices] = value + if self.is_list: + if len(value) != len(phys_indices): + raise ValueError("Length mismatch in list-column assignment") + for pos, cell in zip(phys_indices, value, strict=True): + self._raw_col[int(pos)] = cell + else: + if isinstance(value, (list, tuple)): + value = np.array(value, dtype=self._raw_col.dtype) + self._raw_col[phys_indices] = value else: raise TypeError(f"Invalid index type: {type(key)}") @@ -755,6 +779,10 @@ def __iter__(self): if self.is_computed: yield from self._iter_chunks_computed(size=None) return + if self.is_list: + for pos in np.where(self._valid_rows[:])[0]: + yield self._raw_col[int(pos)] + return arr = 
self._valid_rows chunk_size = arr.chunks[0] @@ -814,7 +842,7 @@ def __ge__(self, other): @property def dtype(self): - return self._raw_col.dtype + return getattr(self._raw_col, "dtype", None) def iter_chunks(self, size: int = 65536): """Iterate over live column values in chunks of *size* rows. @@ -840,6 +868,8 @@ def iter_chunks(self, size: int = 65536): if self.is_computed: yield from self._iter_chunks_computed(size=size) return + if self.is_list: + raise TypeError("Column.iter_chunks() is not supported for list columns in V1.") valid = self._valid_rows raw = self._raw_col arr_len = len(valid) @@ -950,6 +980,15 @@ def assign(self, data) -> None: raise ValueError("Table is read-only (opened with mode='r').") if self.is_computed: raise ValueError(f"Column {self._col_name!r} is a computed column and cannot be written to.") + if self.is_list: + values = list(data) + if len(values) != len(self): + raise ValueError(f"assign() requires {len(self)} values (live rows), got {len(values)}.") + live_pos = np.where(self._valid_rows[:])[0] + for pos, cell in zip(live_pos, values, strict=True): + self._raw_col[int(pos)] = cell + self._table._root_table._mark_all_indexes_stale() + return n_live = len(self) arr = np.asarray(data) if len(arr) != n_live: @@ -1286,7 +1325,7 @@ def __init__( self._validate = validate self._table_cparams = cparams self._table_dparams = dparams - self._cols: dict[str, blosc2.NDArray] = {} + self._cols: dict[str, blosc2.NDArray | ListArray] = {} self._computed_cols: dict[str, dict] = {} # virtual/computed columns self._materialized_cols: dict[str, dict] = {} # stored columns auto-filled from expressions self._expr_index_arrays: dict[str, blosc2.NDArray] = {} @@ -1326,9 +1365,12 @@ def __init__( self.col_names = [c["name"] for c in schema_dict["columns"]] self._valid_rows = storage.open_valid_rows() for name in self.col_names: - col = storage.open_column(name) - self._cols[name] = col cc = self._schema.columns_by_name[name] + if self._is_list_column(cc): 
+ col = storage.open_list_column(name) + else: + col = storage.open_column(name) + self._cols[name] = col self._col_widths[name] = max(len(name), cc.display_width) self._n_rows = int(blosc2.count_nonzero(self._valid_rows)) self._last_pos = None # resolve lazily on first write @@ -1366,6 +1408,8 @@ def __init__( def close(self) -> None: """Close any persistent backing store held by this table.""" + with contextlib.suppress(Exception): + self._flush_varlen_columns() storage = getattr(self, "_storage", None) if storage is not None and hasattr(storage, "close"): storage.close() @@ -1385,14 +1429,35 @@ def __del__(self): elif storage is not None and hasattr(storage, "close"): storage.close() + @staticmethod + def _is_list_column(col: CompiledColumn) -> bool: + return isinstance(col.spec, ListSpec) + + @staticmethod + def _is_list_spec(spec: SchemaSpec) -> bool: + return isinstance(spec, ListSpec) + + def _flush_varlen_columns(self) -> None: + for col in self._schema.columns: + if self._is_list_column(col): + self._cols[col.name].flush() + def _init_columns( self, expected_size: int, default_chunks, default_blocks, storage: TableStorage ) -> None: - """Create one NDArray per column using the compiled schema.""" + """Create one physical column per compiled schema column.""" for col in self._schema.columns: self.col_names.append(col.name) self._col_widths[col.name] = max(len(col.name), col.display_width) col_storage = self._resolve_column_storage(col, default_chunks, default_blocks) + if self._is_list_column(col): + self._cols[col.name] = storage.create_list_column( + col.name, + spec=col.spec, + cparams=col_storage.get("cparams"), + dparams=col_storage.get("dparams"), + ) + continue self._cols[col.name] = storage.create_column( col.name, dtype=col.dtype, @@ -1442,17 +1507,22 @@ def _normalize_row_input(self, data: Any) -> dict[str, Any]: return dict(zip(stored, data, strict=False)) if dataclasses.is_dataclass(data) and not isinstance(data, type): return 
dataclasses.asdict(data) + if isinstance(data, _Row): + return {name: data[name] for name in stored} if isinstance(data, (np.void, np.record)): return {name: data[name] for name in stored} # Fallback: try positional indexing return {name: data[i] for i, name in enumerate(stored)} def _coerce_row_to_storage(self, row: dict[str, Any]) -> dict[str, Any]: - """Coerce each value in *row* to the column's storage dtype.""" + """Coerce each value in *row* to the column's storage representation.""" result = {} for col in self._schema.columns: val = row[col.name] - result[col.name] = np.array(val, dtype=col.dtype).item() + if self._is_list_column(col): + result[col.name] = coerce_list_cell(col.spec, val) + else: + result[col.name] = np.array(val, dtype=col.dtype).item() return result def _resolve_last_pos(self) -> int: @@ -1494,9 +1564,11 @@ def _resolve_last_pos(self) -> int: return self._last_pos def _grow(self) -> None: - """Double the physical capacity of all columns and the valid_rows mask.""" + """Double the scalar-column capacity and the valid_rows mask.""" c = len(self._valid_rows) - for col_arr in self._cols.values(): + for name, col_arr in self._cols.items(): + if self._is_list_column(self._schema.columns_by_name[name]): + continue col_arr.resize((c * 2,)) self._valid_rows.resize((c * 2,)) @@ -1526,10 +1598,9 @@ def __str__(self) -> str: # -- per-column display widths -- widths: dict[str, int] = {} for name in self.col_names: - widths[name] = max( - self._col_widths[name], - len(str(self._col_dtype(name))), - ) + spec = self._schema.columns_by_name.get(name) + dtype_label = self._dtype_info_label(self._col_dtype(name), spec.spec if spec else None) + widths[name] = max(self._col_widths[name], len(dtype_label)) sep = " ".join("─" * (w + 2) for w in widths.values()) @@ -1547,11 +1618,26 @@ def rows_to_dicts(positions) -> list[dict]: if len(positions) == 0: return [] col_data = {n: self._fetch_col_at_positions(n, positions) for n in self.col_names} - return [{n: 
col_data[n][i].item() for n in self.col_names} for i in range(len(positions))] + rows = [] + for i in range(len(positions)): + row = {} + for n in self.col_names: + value = col_data[n][i] + row[n] = value.item() if isinstance(value, np.generic) else value + rows.append(row) + return rows lines = [ fmt_row({n: n for n in self.col_names}), - fmt_row({n: str(self._col_dtype(n)) for n in self.col_names}), + fmt_row( + { + n: self._dtype_info_label( + self._col_dtype(n), + self._schema.columns_by_name[n].spec if n in self._schema.columns_by_name else None, + ) + for n in self.col_names + } + ), sep, ] @@ -1632,8 +1718,11 @@ def open(cls, urlpath: str, *, mode: str = "r") -> CTable: obj._valid_rows = storage.open_valid_rows() for name in col_names: - obj._cols[name] = storage.open_column(name) cc = schema.columns_by_name[name] + if obj._is_list_column(cc): + obj._cols[name] = storage.open_list_column(name) + else: + obj._cols[name] = storage.open_column(name) obj._col_widths[name] = max(len(name), cc.display_width) obj._n_rows = int(blosc2.count_nonzero(obj._valid_rows)) @@ -1676,6 +1765,8 @@ def save(self, urlpath: str, *, overwrite: bool = False) -> None: else: os.remove(target_path) + self._flush_varlen_columns() + # Collect live physical positions valid_np = self._valid_rows[:] live_pos = np.where(valid_np)[0] @@ -1696,8 +1787,17 @@ def save(self, urlpath: str, *, overwrite: bool = False) -> None: # --- columns --- for col in self._schema.columns: name = col.name - # Use dtype-aware defaults so large-itemsize columns (e.g. U4096) get - # sensible chunk/block sizes rather than the uint8-based defaults. 
+ if self._is_list_column(col): + disk_col = file_storage.create_list_column( + name, + spec=col.spec, + cparams=col.config.cparams if col.config.cparams is not None else self._table_cparams, + dparams=col.config.dparams if col.config.dparams is not None else self._table_dparams, + ) + if n_live > 0: + disk_col.extend(self._cols[name][int(pos)] for pos in live_pos) + disk_col.flush() + continue dtype_chunks, dtype_blocks = compute_chunks_blocks((capacity,), dtype=col.dtype) col_storage = self._resolve_column_storage(col, dtype_chunks, dtype_blocks) disk_col = file_storage.create_column( @@ -1744,7 +1844,12 @@ def load(cls, urlpath: str) -> CTable: col_names = [c["name"] for c in schema_dict["columns"]] disk_valid = file_storage.open_valid_rows() - disk_cols = {name: file_storage.open_column(name) for name in col_names} + disk_cols = {} + for col in schema.columns: + if cls._is_list_column(col): + disk_cols[col.name] = file_storage.open_list_column(col.name) + else: + disk_cols[col.name] = file_storage.open_column(col.name) phys_size = len(disk_valid) n_live = int(blosc2.count_nonzero(disk_valid)) capacity = max(phys_size, 1) @@ -1760,9 +1865,15 @@ def load(cls, urlpath: str) -> CTable: if phys_size > 0: mem_valid[:phys_size] = disk_valid[:] - mem_cols: dict[str, blosc2.NDArray] = {} + mem_cols: dict[str, blosc2.NDArray | ListArray] = {} for col in schema.columns: name = col.name + if cls._is_list_column(col): + mem_col = mem_storage.create_list_column(name, spec=col.spec, cparams=None, dparams=None) + mem_col.extend(disk_cols[name][:]) + mem_col.flush() + mem_cols[name] = mem_col + continue col_chunks, col_blocks = compute_chunks_blocks((capacity,), dtype=col.dtype) mem_col = mem_storage.create_column( name, @@ -2000,7 +2111,9 @@ def describe(self) -> None: for name in self.col_names: col = self[name] dtype = col.dtype - lines.append(f" {name} [{dtype}]") + spec = self._schema.columns_by_name.get(name) + label = self._dtype_info_label(dtype, spec.spec if spec else 
None) + lines.append(f" {name} [{label}]") if n == 0: lines.append(" (empty)") @@ -2010,7 +2123,10 @@ def describe(self) -> None: nc = col.null_count() n_nonnull = n - nc - if dtype.kind in "biufc" and dtype.kind != "c": + if isinstance(spec.spec, ListSpec) if spec is not None else False: + lines.append(f" count : {n:,}") + lines.append(" (stats not available for list columns)") + elif dtype.kind in "biufc" and dtype.kind != "c": # numeric + bool if dtype.kind == "b": arr = col[:] @@ -2082,7 +2198,7 @@ def cov(self) -> np.ndarray: """ for name in self.col_names: dtype = self._col_dtype(name) - if not ( + if dtype is None or not ( np.issubdtype(dtype, np.integer) or np.issubdtype(dtype, np.floating) or dtype == np.bool_ ): raise TypeError( @@ -2146,12 +2262,14 @@ def to_arrow(self): "pyarrow is required for to_arrow(). Install it with: pip install pyarrow" ) from None + self._flush_varlen_columns() arrays = {} for name in self.col_names: col = self[name] + if col.is_list: + arrays[name] = self._cols[name].to_arrow() + continue arr = col[:] - # Only compute null mask when a sentinel is actually configured — - # avoids allocating a 1M-element zeros array for every non-nullable column. 
nv = col.null_value if nv is not None: null_mask = col._null_mask_for(arr) @@ -2229,6 +2347,14 @@ def _arrow_type_to_spec(pa_type, arrow_col): if pa_type == arrow_t: return spec_cls() + if pa.types.is_list(pa_type) or pa.types.is_large_list(pa_type): + py_values = arrow_col.to_pylist() + flat_values = [item for cell in py_values if cell is not None for item in cell] + item_arrow_col = pa.array(flat_values, type=pa_type.value_type) + item_spec = _arrow_type_to_spec(pa_type.value_type, item_arrow_col) + nullable = any(v is None for v in py_values) + return b2s.list(item_spec, nullable=nullable, storage="batch", serializer="msgpack") + # String types: determine max_length from the data if pa_type in (pa.string(), pa.large_string(), pa.utf8(), pa.large_utf8()): values = [v for v in arrow_col.to_pylist() if v is not None] @@ -2252,7 +2378,7 @@ def _arrow_type_to_spec(pa_type, arrow_col): name=name, py_type=spec.python_type, spec=spec, - dtype=spec.dtype, + dtype=getattr(spec, "dtype", None), default=MISSING, config=col_config, display_width=compute_display_width(spec), @@ -2275,17 +2401,22 @@ def _arrow_type_to_spec(pa_type, arrow_col): chunks=default_chunks, blocks=default_blocks, ) - new_cols: dict[str, blosc2.NDArray] = {} + new_cols: dict[str, blosc2.NDArray | ListArray] = {} for col in columns: - new_cols[col.name] = mem_storage.create_column( - col.name, - dtype=col.dtype, - shape=(capacity,), - chunks=default_chunks, - blocks=default_blocks, - cparams=None, - dparams=None, - ) + if cls._is_list_column(col): + new_cols[col.name] = mem_storage.create_list_column( + col.name, spec=col.spec, cparams=None, dparams=None + ) + else: + new_cols[col.name] = mem_storage.create_column( + col.name, + dtype=col.dtype, + shape=(capacity,), + chunks=default_chunks, + blocks=default_blocks, + cparams=None, + dparams=None, + ) obj = cls.__new__(cls) obj._row_type = None @@ -2315,11 +2446,15 @@ def _arrow_type_to_spec(pa_type, arrow_col): # fixed-width unicode coercion. 
All other types use zero-copy numpy. for col in columns: arrow_col = arrow_table.column(col.name) - if col.dtype.kind in "US": + if cls._is_list_column(col): + new_cols[col.name].extend(arrow_col.to_pylist()) + new_cols[col.name].flush() + elif col.dtype.kind in "US": arr = np.array(arrow_col.to_pylist(), dtype=col.dtype) + new_cols[col.name][:n] = arr else: arr = arrow_col.to_numpy(zero_copy_only=False).astype(col.dtype) - new_cols[col.name][:n] = arr + new_cols[col.name][:n] = arr new_valid[:n] = True obj._n_rows = n @@ -2735,12 +2870,12 @@ def computed_columns(self) -> dict[str, dict]: """ return dict(self._computed_cols) # shallow copy so callers can't mutate - def _col_dtype(self, name: str) -> np.dtype: + def _col_dtype(self, name: str) -> np.dtype | None: """Return the dtype for *name*, routing through computed cols.""" cc = self._computed_cols.get(name) if cc is not None: return cc["dtype"] - return self._cols[name].dtype + return getattr(self._cols[name], "dtype", None) @staticmethod def _readable_computed_expr(cc: dict) -> str: @@ -2758,18 +2893,21 @@ def _sub(m: re.Match) -> str: return re.sub(r"\bo(\d+)\b", _sub, cc["expression"]) - def _fetch_col_at_positions(self, name: str, positions: np.ndarray) -> np.ndarray: + def _fetch_col_at_positions(self, name: str, positions: np.ndarray): """Fetch values at *positions* (physical indices) — used for display.""" cc = self._computed_cols.get(name) if cc is not None: if len(positions) == 0: return np.array([], dtype=cc["dtype"]) - # Evaluate element-by-element for scattered display positions (max ~20). 
return np.array( [np.asarray(cc["lazy"][int(p)]).ravel()[0] for p in positions], dtype=cc["dtype"], ) - return self._cols[name][positions] + col = self._cols[name] + spec = self._schema.columns_by_name[name].spec + if self._is_list_spec(spec): + return [col[int(p)] for p in positions] + return col[positions] def _schema_dict_with_computed(self) -> dict: """Return the schema dict extended with computed/materialized metadata.""" @@ -3172,19 +3310,29 @@ def compact(self): raise ValueError("Table is read-only (opened with mode='r').") if self.base is not None: raise ValueError("Cannot compact a view.") + self._flush_varlen_columns() real_poss = blosc2.where(self._valid_rows, np.array(range(len(self._valid_rows)))).compute() - start = 0 - block_size = self._valid_rows.blocks[0] - end = min(block_size, self._n_rows) - while start < end: - for _k, v in self._cols.items(): + for col in self._schema.columns: + name = col.name + v = self._cols[name] + if self._is_list_column(col): + compacted = [v[int(pos)] for pos in real_poss[: self._n_rows]] + replacement = ListArray(spec=col.spec) + replacement.extend(compacted) + replacement.flush() + self._cols[name] = replacement + continue + start = 0 + block_size = self._valid_rows.blocks[0] + end = min(block_size, self._n_rows) + while start < end: v[start:end] = v[real_poss[start:end]] - start += block_size - end = min(end + block_size, self._n_rows) + start += block_size + end = min(end + block_size, self._n_rows) self._valid_rows[: self._n_rows] = True self._valid_rows[self._n_rows :] = False - self._last_pos = self._n_rows # next write goes right after live rows + self._last_pos = self._n_rows self._mark_all_indexes_stale() def _normalise_sort_keys( @@ -3205,6 +3353,10 @@ def _normalise_sort_keys( if name not in self._cols and name not in self._computed_cols: raise KeyError(f"No column named {name!r}. 
Available: {self.col_names}") dtype = self._col_dtype(name) + if dtype is None: + raise TypeError( + f"Column {name!r} is a list column and does not support sort ordering in V1." + ) if np.issubdtype(dtype, np.complexfloating): raise TypeError( f"Column {name!r} has complex dtype {dtype} which does not support ordering." @@ -3382,8 +3534,14 @@ def sort_by( else: # Build a new in-memory table with the sorted rows result = self._empty_copy() - for col_name, arr in self._cols.items(): - result._cols[col_name][:n] = arr[sorted_pos] + for col in self._schema.columns: + col_name = col.name + arr = self._cols[col_name] + if self._is_list_column(col): + result._cols[col_name].extend(arr[int(pos)] for pos in sorted_pos) + result._cols[col_name].flush() + else: + result._cols[col_name][:n] = arr[sorted_pos] result._valid_rows[:n] = True result._valid_rows[n:] = False result._n_rows = n @@ -3406,15 +3564,23 @@ def _empty_copy(self) -> CTable: new_cols = {} for col in self._schema.columns: col_storage = self._resolve_column_storage(col, default_chunks, default_blocks) - new_cols[col.name] = mem_storage.create_column( - col.name, - dtype=col.dtype, - shape=(capacity,), - chunks=col_storage["chunks"], - blocks=col_storage["blocks"], - cparams=col_storage.get("cparams"), - dparams=col_storage.get("dparams"), - ) + if self._is_list_column(col): + new_cols[col.name] = mem_storage.create_list_column( + col.name, + spec=col.spec, + cparams=col_storage.get("cparams"), + dparams=col_storage.get("dparams"), + ) + else: + new_cols[col.name] = mem_storage.create_column( + col.name, + dtype=col.dtype, + shape=(capacity,), + chunks=col_storage["chunks"], + blocks=col_storage["blocks"], + cparams=col_storage.get("cparams"), + dparams=col_storage.get("dparams"), + ) obj = CTable.__new__(CTable) obj._schema = self._schema @@ -3869,7 +4035,7 @@ def _build_index_persistent( _PERSISTENT_INDEXES.pop(proxy_key, None) # evict proxy to avoid memory leak return result - def create_index( + def 
create_index( # noqa: C901 self, col_name: str | None = None, *, @@ -3962,6 +4128,8 @@ def create_index( ) col_arr = self._cols[col_name] + if isinstance(self._schema.columns_by_name[col_name].spec, ListSpec): + raise ValueError(f"Cannot create an index on list column {col_name!r} in V1.") is_persistent = self._storage.index_anchor_path(col_name) is not None if is_persistent: @@ -4266,7 +4434,12 @@ def info_items(self) -> list[tuple[str, object]]: f"{cc['dtype']} (computed: {self._readable_computed_expr(cc)})" ) else: - schema_summary[name] = _InfoLiteral(self._dtype_info_label(self._cols[name].dtype)) + col_meta = self._schema.columns_by_name.get(name) + schema_summary[name] = _InfoLiteral( + self._dtype_info_label( + getattr(self._cols[name], "dtype", None), col_meta.spec if col_meta else None + ) + ) index_summary = {} for idx in self.indexes: @@ -4304,8 +4477,12 @@ def info_items(self) -> list[tuple[str, object]]: return items @staticmethod - def _dtype_info_label(dtype: np.dtype) -> str: + def _dtype_info_label(dtype: np.dtype | None, spec: SchemaSpec | None = None) -> str: """Return a compact dtype label for info reports.""" + if isinstance(spec, ListSpec): + return spec.display_label() + if dtype is None: + return "None" if dtype.kind == "U": nchars = dtype.itemsize // 4 return f"U{nchars} (Unicode, max {nchars} chars)" @@ -4366,8 +4543,13 @@ def append(self, data: list | np.void | np.ndarray) -> None: if pos >= len(self._valid_rows): self._grow() - for name, col_array in self._cols.items(): - col_array[pos] = row[name] + for col in self._schema.columns: + name = col.name + col_array = self._cols[name] + if self._is_list_column(col): + col_array.append(row[name]) + else: + col_array[pos] = row[name] self._valid_rows[pos] = True self._last_pos = pos + 1 @@ -4396,7 +4578,7 @@ def delete(self, ind: int | slice | str | Iterable) -> None: self._last_pos = None # recalculate on next write self._storage.bump_visibility_epoch() - def extend(self, data: list | CTable 
| Any, *, validate: bool | None = None) -> None: + def extend(self, data: list | CTable | Any, *, validate: bool | None = None) -> None: # noqa: C901 if self._read_only: raise ValueError("Table is read-only (opened with mode='r').") if self.base is not None: @@ -4407,6 +4589,11 @@ def extend(self, data: list | CTable | Any, *, validate: bool | None = None) -> # Resolve effective validate flag: per-call override takes precedence do_validate = self._validate if validate is None else validate + if any(self._is_list_column(col) for col in self._schema.columns): + for row in data: + self.append(row) + return + start_pos = self._resolve_last_pos() current_col_names = self._stored_col_names # skip computed columns diff --git a/src/blosc2/ctable_storage.py b/src/blosc2/ctable_storage.py index 54efa59b..e55033ad 100644 --- a/src/blosc2/ctable_storage.py +++ b/src/blosc2/ctable_storage.py @@ -22,11 +22,15 @@ import copy import json import os -from typing import Any +from typing import TYPE_CHECKING, Any import numpy as np import blosc2 +from blosc2.list_array import ListArray + +if TYPE_CHECKING: + from blosc2.schema import ListSpec # Directory inside the table root that holds per-column index sidecar files. 
_INDEXES_DIR = "_indexes" @@ -56,6 +60,19 @@ def create_column( def open_column(self, name: str) -> blosc2.NDArray: raise NotImplementedError + def create_list_column( + self, + name: str, + *, + spec: ListSpec, + cparams: dict[str, Any] | None, + dparams: dict[str, Any] | None, + ) -> ListArray: + raise NotImplementedError + + def open_list_column(self, name: str) -> ListArray: + raise NotImplementedError + def create_valid_rows( self, *, @@ -151,6 +168,17 @@ def create_column(self, name, *, dtype, shape, chunks, blocks, cparams, dparams) def open_column(self, name): raise RuntimeError("In-memory tables have no on-disk representation to open.") + def create_list_column(self, name, *, spec, cparams, dparams): + kwargs = {} + if cparams is not None: + kwargs["cparams"] = cparams + if dparams is not None: + kwargs["dparams"] = dparams + return ListArray(spec=spec, **kwargs) + + def open_list_column(self, name): + raise RuntimeError("In-memory tables have no on-disk representation to open.") + def create_valid_rows(self, *, shape, chunks, blocks): return blosc2.zeros(shape, dtype=np.bool_, chunks=chunks, blocks=blocks) @@ -249,6 +277,10 @@ def _valid_rows_path(self) -> str: def _col_path(self, name: str) -> str: return self._key_to_path(self._col_key(name)) + def _list_col_path(self, name: str) -> str: + rel_key = self._col_key(name).lstrip("/") + return os.path.join(self._root, rel_key + ".b2b") + def _col_key(self, name: str) -> str: return f"/{_COLS_DIR}/{name}" @@ -299,6 +331,17 @@ def create_column(self, name, *, dtype, shape, chunks, blocks, cparams, dparams) def open_column(self, name: str) -> blosc2.NDArray: return self._open_store()[self._col_key(name)] + def create_list_column(self, name, *, spec, cparams, dparams): + kwargs: dict[str, Any] = {"urlpath": self._list_col_path(name), "mode": "w", "contiguous": True} + if cparams is not None: + kwargs["cparams"] = cparams + if dparams is not None: + kwargs["dparams"] = dparams + return ListArray(spec=spec, 
**kwargs) + + def open_list_column(self, name: str) -> ListArray: + return blosc2.open(self._list_col_path(name), mode=self._mode) + def create_valid_rows(self, *, shape, chunks, blocks): valid_rows = blosc2.zeros( shape, @@ -358,15 +401,31 @@ def column_names_from_schema(self) -> list[str]: return [c["name"] for c in d["columns"]] def delete_column(self, name: str) -> None: - del self._open_store()[self._col_key(name)] + key = self._col_key(name) + if key in self._open_store(): + del self._open_store()[key] + return + list_path = self._list_col_path(name) + if os.path.exists(list_path): + blosc2.remove_urlpath(list_path) + return + raise KeyError(name) - def rename_column(self, old: str, new: str) -> blosc2.NDArray: + def rename_column(self, old: str, new: str): store = self._open_store() old_key = self._col_key(old) new_key = self._col_key(new) - store[new_key] = store[old_key] - del store[old_key] - return store[new_key] + if old_key in store: + store[new_key] = store[old_key] + del store[old_key] + return store[new_key] + old_path = self._list_col_path(old) + new_path = self._list_col_path(new) + if os.path.exists(old_path): + os.makedirs(os.path.dirname(new_path), exist_ok=True) + os.replace(old_path, new_path) + return blosc2.open(new_path, mode=self._mode) + raise KeyError(old) def close(self) -> None: if self._store is not None: diff --git a/src/blosc2/list_array.py b/src/blosc2/list_array.py new file mode 100644 index 00000000..a3d7d0d5 --- /dev/null +++ b/src/blosc2/list_array.py @@ -0,0 +1,480 @@ +####################################################################### +# Copyright (c) 2019-present, Blosc Development Team +# All rights reserved. 
+# +# SPDX-License-Identifier: BSD-3-Clause +####################################################################### + +from __future__ import annotations + +import copy +from bisect import bisect_right +from collections.abc import Iterable, Iterator +from functools import lru_cache +from typing import Any + +import numpy as np + +import blosc2 +from blosc2.batch_array import BatchArray +from blosc2.info import InfoReporter, format_nbytes_info +from blosc2.schema import ListSpec, SchemaSpec +from blosc2.schema import list as list_spec_builder +from blosc2.vlarray import VLArray + +_SUPPORTED_SERIALIZERS = {"msgpack", "arrow"} +_SUPPORTED_STORAGES = {"batch", "vl"} + + +def _spec_label(spec: SchemaSpec) -> str: + if isinstance(spec, ListSpec): + return spec.display_label() + meta = spec.to_metadata_dict() + kind = meta.get("kind", type(spec).__name__) + if kind == "string": + return "string" + if kind == "bytes": + return "bytes" + return str(kind) + + +@lru_cache(maxsize=1) +def _require_pyarrow(): + try: + import pyarrow as pa + except ImportError as exc: + raise ImportError("ListArray serializer='arrow' requires pyarrow") from exc + return pa + + +def _validate_list_spec(spec: ListSpec) -> None: + if spec.storage not in _SUPPORTED_STORAGES: + raise ValueError(f"Unsupported list storage: {spec.storage!r}") + if spec.serializer not in _SUPPORTED_SERIALIZERS: + raise ValueError(f"Unsupported list serializer: {spec.serializer!r}") + if spec.storage == "vl" and spec.serializer != "msgpack": + raise ValueError("ListArray storage='vl' only supports serializer='msgpack'") + if spec.serializer == "arrow" and spec.storage != "batch": + raise ValueError("ListArray serializer='arrow' requires storage='batch'") + if isinstance(spec.item_spec, ListSpec): + raise TypeError("Nested list item specs are not supported in V1") + if spec.batch_rows is not None and spec.batch_rows <= 0: + raise ValueError("batch_rows must be a positive integer") + if spec.items_per_block is not None 
and spec.items_per_block <= 0: + raise ValueError("items_per_block must be a positive integer") + + +def _coerce_scalar_item(spec: SchemaSpec, value: Any) -> Any: + if value is None: + raise ValueError("ListArray does not support nullable items inside a list in V1") + + if getattr(spec, "python_type", None) is str: + if not isinstance(value, str): + value = str(value) + elif getattr(spec, "python_type", None) is bytes: + if isinstance(value, str): + value = value.encode() + elif not isinstance(value, (bytes, bytearray, memoryview)): + value = bytes(value) + value = bytes(value) + else: + dtype = getattr(spec, "dtype", None) + if dtype is None: + raise TypeError(f"Unsupported list item spec {type(spec).__name__!r}") + value = np.array(value, dtype=dtype).item() + + ge = getattr(spec, "ge", None) + if ge is not None and value < ge: + raise ValueError(f"List item {value!r} violates ge={ge}") + gt = getattr(spec, "gt", None) + if gt is not None and value <= gt: + raise ValueError(f"List item {value!r} violates gt={gt}") + le = getattr(spec, "le", None) + if le is not None and value > le: + raise ValueError(f"List item {value!r} violates le={le}") + lt = getattr(spec, "lt", None) + if lt is not None and value >= lt: + raise ValueError(f"List item {value!r} violates lt={lt}") + + max_length = getattr(spec, "max_length", None) + min_length = getattr(spec, "min_length", None) + if max_length is not None and len(value) > max_length: + raise ValueError(f"List item {value!r} exceeds max_length={max_length}") + if min_length is not None and len(value) < min_length: + raise ValueError(f"List item {value!r} is shorter than min_length={min_length}") + return value + + +def coerce_list_cell(spec: ListSpec, value: Any) -> list[Any] | None: + _validate_list_spec(spec) + if value is None: + if not spec.nullable: + raise ValueError("Null list cells are not allowed for this column") + return None + if isinstance(value, (str, bytes, bytearray, memoryview)): + raise TypeError("ListArray 
cells must be list-like, not strings or bytes") + if not isinstance(value, Iterable): + raise TypeError("ListArray cells must be list-like") + return [_coerce_scalar_item(spec.item_spec, item) for item in list(value)] + + +class ListArray: + """A row-oriented container for list-valued data. + + Backed internally by either :class:`blosc2.VLArray` or + :class:`blosc2.BatchArray`. + """ + + def __init__( + self, + spec: ListSpec | None = None, + *, + item_spec: SchemaSpec | None = None, + nullable: bool = False, + storage: str = "batch", + serializer: str = "msgpack", + batch_rows: int | None = None, + items_per_block: int | None = None, + _from_schunk=None, + **kwargs: Any, + ) -> None: + if _from_schunk is not None: + if spec is not None or item_spec is not None or kwargs: + raise ValueError("Cannot pass schema/storage arguments together with _from_schunk") + self._init_from_schunk(_from_schunk) + return + + if spec is None: + if item_spec is None: + raise ValueError("ListArray requires either spec=... 
or item_spec=...") + spec = list_spec_builder( + item_spec, + nullable=nullable, + storage=storage, + serializer=serializer, + batch_rows=batch_rows, + items_per_block=items_per_block, + ) + self.spec = spec + _validate_list_spec(self.spec) + self._pending_cells: list[list[Any] | None] = [] + self._persisted_row_count = 0 + + storage_obj = self._coerce_storage(kwargs) + fixed_meta = dict(storage_obj.meta or {}) + fixed_meta["listarray"] = self.spec.to_listarray_metadata() + storage_obj.meta = fixed_meta + + if self.spec.storage == "vl": + self._backend = VLArray(storage=storage_obj, **kwargs) + else: + self._backend = BatchArray( + storage=storage_obj, + serializer=self.spec.serializer, + items_per_block=self.spec.items_per_block, + **kwargs, + ) + self._persisted_row_count = self._persisted_rows_count() + + @staticmethod + def _coerce_storage(kwargs: dict[str, Any]) -> blosc2.Storage: + storage = kwargs.pop("storage", None) + if storage is None: + storage_kwargs = { + name: kwargs.pop(name) for name in list(blosc2.Storage.__annotations__) if name in kwargs + } + return blosc2.Storage(**storage_kwargs) + if isinstance(storage, blosc2.Storage): + return copy.deepcopy(storage) + return blosc2.Storage(**storage) + + def _init_from_schunk(self, schunk) -> None: + meta = schunk.meta + if "listarray" not in meta: + raise ValueError("The supplied SChunk is not tagged as a ListArray") + la_meta = meta["listarray"] + self.spec = ListSpec.from_metadata_dict(la_meta) + self._pending_cells = [] + if self.spec.storage == "vl": + if "vlarray" not in meta: + raise ValueError("ListArray metadata says backend='vl' but VLArray tag is missing") + self._backend = VLArray(_from_schunk=schunk) + self._persisted_row_count = len(self._backend) + else: + if "batcharray" not in meta: + raise ValueError("ListArray metadata says backend='batch' but BatchArray tag is missing") + self._backend = BatchArray(_from_schunk=schunk) + self._persisted_row_count = self._persisted_rows_count() + + def 
_persisted_rows_count(self) -> int: + if self.spec.storage == "vl": + return len(self._backend) + lengths = self._backend._load_or_compute_batch_lengths() + return int(sum(lengths)) + + def _persisted_prefix_sums(self) -> list[int]: + lengths = self._backend._load_or_compute_batch_lengths() + prefix = [0] + total = 0 + for length in lengths: + total += int(length) + prefix.append(total) + return prefix + + def _normalize_index(self, index: int) -> int: + if not isinstance(index, int): + raise TypeError("ListArray indices must be integers") + n = len(self) + if index < 0: + index += n + if index < 0 or index >= n: + raise IndexError("ListArray index out of range") + return index + + def _locate_persisted_row(self, row_index: int) -> tuple[int, int]: + prefix = self._persisted_prefix_sums() + batch_index = bisect_right(prefix, row_index) - 1 + inner_index = row_index - prefix[batch_index] + return batch_index, inner_index + + def _flush_full_batches(self) -> None: + if self.spec.storage != "batch": + return + batch_rows = self.batch_rows + if batch_rows is None: + return + while len(self._pending_cells) >= batch_rows: + batch = self._pending_cells[:batch_rows] + self._backend.append(batch) + self._pending_cells = self._pending_cells[batch_rows:] + self._persisted_row_count += len(batch) + + def append(self, value: Any) -> int: + cell = coerce_list_cell(self.spec, value) + if self.spec.storage == "vl": + self._backend.append(cell) + self._persisted_row_count = len(self._backend) + return len(self) + self._pending_cells.append(cell) + self._flush_full_batches() + return len(self) + + def extend(self, values: Iterable[Any]) -> None: + if self.spec.storage == "vl": + self._backend.extend(coerce_list_cell(self.spec, value) for value in values) + self._persisted_row_count = len(self._backend) + return + for value in values: + self._pending_cells.append(coerce_list_cell(self.spec, value)) + self._flush_full_batches() + + def flush(self) -> None: + if self.spec.storage != 
"batch": + return + if self._pending_cells: + batch = list(self._pending_cells) + self._backend.append(batch) + self._persisted_row_count += len(batch) + self._pending_cells.clear() + + def close(self) -> None: + self.flush() + + def __getitem__(self, index: int | slice) -> Any: + if isinstance(index, slice): + return [self[i] for i in range(*index.indices(len(self)))] + index = self._normalize_index(index) + if self.spec.storage == "vl": + return self._backend[index] + if index >= self._persisted_row_count: + return self._pending_cells[index - self._persisted_row_count] + batch_index, inner_index = self._locate_persisted_row(index) + return self._backend[batch_index][inner_index] + + def __setitem__(self, index: int, value: Any) -> None: + cell = coerce_list_cell(self.spec, value) + index = self._normalize_index(index) + if self.spec.storage == "vl": + self._backend[index] = cell + return + if index >= self._persisted_row_count: + self._pending_cells[index - self._persisted_row_count] = cell + return + batch_index, inner_index = self._locate_persisted_row(index) + batch = self._backend[batch_index][:] + batch[inner_index] = cell + self._backend[batch_index] = batch + + def __len__(self) -> int: + if self.spec.storage == "vl": + return len(self._backend) + return self._persisted_row_count + len(self._pending_cells) + + def __iter__(self) -> Iterator[Any]: + for i in range(len(self)): + yield self[i] + + def copy(self, **kwargs: Any) -> ListArray: + out = ListArray(spec=self.spec, **kwargs) + out.extend(self) + if self.spec.storage == "batch": + out.flush() + return out + + @property + def schunk(self): + return self._backend.schunk + + @property + def meta(self): + return self._backend.meta + + @property + def vlmeta(self): + return self._backend.vlmeta + + @property + def cparams(self): + return self._backend.cparams + + @property + def dparams(self): + return self._backend.dparams + + @property + def urlpath(self) -> str | None: + return self._backend.urlpath + + 
@property + def contiguous(self) -> bool: + return self._backend.contiguous + + @property + def batch_rows(self) -> int | None: + if self.spec.batch_rows is not None: + return self.spec.batch_rows + return None + + @property + def nbytes(self) -> int: + return self._backend.nbytes + + @property + def cbytes(self) -> int: + return self._backend.cbytes + + @property + def cratio(self) -> float: + return self._backend.cratio + + @property + def info(self) -> InfoReporter: + return InfoReporter(self) + + @property + def info_items(self) -> list: + return [ + ("type", "ListArray"), + ("logical_type", self.spec.display_label()), + ("backend", self.spec.storage), + ("serializer", self.spec.serializer), + ("rows", len(self)), + ("pending_rows", len(self._pending_cells) if self.spec.storage == "batch" else 0), + ("nbytes", format_nbytes_info(self.nbytes)), + ("cbytes", format_nbytes_info(self.cbytes)), + ("cratio", f"{self.cratio:.2f}"), + ] + + def to_cframe(self) -> bytes: + self.flush() + return self._backend.to_cframe() + + def _arrow_item_type(self): + pa = _require_pyarrow() + kind = self.spec.item_spec.to_metadata_dict()["kind"] + mapping = { + "int8": pa.int8(), + "int16": pa.int16(), + "int32": pa.int32(), + "int64": pa.int64(), + "uint8": pa.uint8(), + "uint16": pa.uint16(), + "uint32": pa.uint32(), + "uint64": pa.uint64(), + "float32": pa.float32(), + "float64": pa.float64(), + "bool": pa.bool_(), + "string": pa.string(), + "bytes": pa.large_binary(), + } + return mapping.get(kind) + + def to_arrow(self): + pa = _require_pyarrow() + self.flush() + item_type = self._arrow_item_type() + if item_type is not None: + return pa.array(list(self), type=pa.list_(item_type)) + return pa.array(list(self)) + + @classmethod + def from_arrow( + cls, + arrow_array, + *, + item_spec: SchemaSpec | None = None, + nullable: bool = True, + storage: str = "batch", + serializer: str = "msgpack", + batch_rows: int | None = None, + items_per_block: int | None = None, + **kwargs: Any, + 
) -> ListArray: + pa = _require_pyarrow() + if isinstance(arrow_array, pa.ChunkedArray): + arrow_array = arrow_array.combine_chunks() + if item_spec is None: + value_type = arrow_array.type.value_type + import blosc2.schema as b2s + + mapping = { + pa.int8(): b2s.int8(), + pa.int16(): b2s.int16(), + pa.int32(): b2s.int32(), + pa.int64(): b2s.int64(), + pa.uint8(): b2s.uint8(), + pa.uint16(): b2s.uint16(), + pa.uint32(): b2s.uint32(), + pa.uint64(): b2s.uint64(), + pa.float32(): b2s.float32(), + pa.float64(): b2s.float64(), + pa.bool_(): b2s.bool(), + pa.string(): b2s.string(), + pa.large_string(): b2s.string(), + pa.binary(): b2s.bytes(), + pa.large_binary(): b2s.bytes(), + } + item_spec = mapping.get(value_type) + if item_spec is None: + raise TypeError(f"Unsupported Arrow list item type {value_type!r}") + arr = cls( + item_spec=item_spec, + nullable=nullable, + storage=storage, + serializer=serializer, + batch_rows=batch_rows, + items_per_block=items_per_block, + **kwargs, + ) + arr.extend(arrow_array.to_pylist()) + return arr + + def __enter__(self) -> ListArray: + return self + + def __exit__(self, exc_type, exc_val, exc_tb) -> bool: + self.close() + return False + + def __repr__(self) -> str: + return f"ListArray(type={self.spec.display_label()}, len={len(self)}, urlpath={self.urlpath!r})" diff --git a/src/blosc2/schema.py b/src/blosc2/schema.py index 560d7117..60da49de 100644 --- a/src/blosc2/schema.py +++ b/src/blosc2/schema.py @@ -22,6 +22,7 @@ # after our spec classes shadow them. 
_builtin_bool = bool
_builtin_bytes = bytes
_builtin_list = list


# ---------------------------------------------------------------------------
# List spec
# ---------------------------------------------------------------------------


class ListSpec(SchemaSpec):
    """Logical schema descriptor for a list-valued column.

    Parameters
    ----------
    item_spec:
        Spec of the items inside each list cell.  Must be a non-list
        :class:`SchemaSpec` (nested lists are not supported in V1).
    nullable:
        Whether a whole cell may be ``None`` (distinct from ``[]``).
    storage:
        Physical backend, ``"batch"`` (default) or ``"vl"``.
    serializer:
        ``"msgpack"`` (default) or ``"arrow"``; ``"arrow"`` requires
        ``storage="batch"``.
    batch_rows / items_per_block:
        Optional positive layout tuning knobs for the batch backend.
    """

    python_type = _builtin_list
    dtype = None  # list columns have no fixed numpy dtype

    def __init__(
        self,
        item_spec: SchemaSpec,
        *,
        nullable: bool = False,
        storage: str = "batch",
        serializer: str = "msgpack",
        batch_rows: int | None = None,
        items_per_block: int | None = None,
    ):
        if not isinstance(item_spec, SchemaSpec):
            raise TypeError("ListSpec item_spec must be a SchemaSpec instance")
        if isinstance(item_spec, ListSpec):
            raise TypeError("Nested list item specs are not supported in V1")
        if storage not in {"batch", "vl"}:
            raise ValueError("storage must be 'batch' or 'vl'")
        if serializer not in {"msgpack", "arrow"}:
            raise ValueError("serializer must be 'msgpack' or 'arrow'")
        if storage == "vl" and serializer != "msgpack":
            raise ValueError("storage='vl' only supports serializer='msgpack'")
        if serializer == "arrow" and storage != "batch":
            raise ValueError("serializer='arrow' requires storage='batch'")
        # Fail fast on invalid layout knobs, consistent with the checks
        # ListArray performs; otherwise bad specs surface only much later.
        if batch_rows is not None and batch_rows <= 0:
            raise ValueError("batch_rows must be a positive integer")
        if items_per_block is not None and items_per_block <= 0:
            raise ValueError("items_per_block must be a positive integer")
        self.item_spec = item_spec
        self.nullable = nullable
        self.storage = storage
        self.serializer = serializer
        self.batch_rows = batch_rows
        self.items_per_block = items_per_block

    def to_pydantic_kwargs(self) -> dict[str, Any]:
        # List-level constraints are enforced by coerce_list_cell, not pydantic.
        return {}

    def to_metadata_dict(self) -> dict[str, Any]:
        """Serialize this spec to a JSON-compatible dict (schema sidecar)."""
        d = {
            "kind": "list",
            "item": self.item_spec.to_metadata_dict(),
            "nullable": self.nullable,
            "storage": self.storage,
            "serializer": self.serializer,
        }
        if self.batch_rows is not None:
            d["batch_rows"] = self.batch_rows
        if self.items_per_block is not None:
            d["items_per_block"] = self.items_per_block
        return d

    def to_listarray_metadata(self) -> dict[str, Any]:
        """Variant of the metadata dict stored in the ListArray SChunk tag.

        Renames ``storage`` to ``backend`` and adds a format version.
        """
        d = {"version": 1, **self.to_metadata_dict()}
        d["backend"] = d.pop("storage")
        return d

    def display_label(self) -> str:
        """Compact label like ``list[string]`` for info reports."""
        item_kind = self.item_spec.to_metadata_dict().get("kind", type(self.item_spec).__name__)
        return f"list[{item_kind}]"

    @classmethod
    def from_metadata_dict(cls, data: dict[str, Any]) -> ListSpec:
        """Reconstruct a ListSpec from either serialization flavor.

        Accepts both the schema-sidecar form (``storage``/``item``) and the
        SChunk-tag form (``backend``/``item``), for forward compatibility.
        """
        from blosc2.schema_compiler import spec_from_metadata_dict

        backend = data.get("backend")
        return cls(
            spec_from_metadata_dict(data["item_spec"] if "item_spec" in data else data["item"]),
            nullable=data.get("nullable", False),
            storage=backend if backend is not None else data.get("storage", "batch"),
            serializer=data.get("serializer", "msgpack"),
            batch_rows=data.get("batch_rows"),
            items_per_block=data.get("items_per_block"),
        )


def list(
    item_spec: SchemaSpec,
    *,
    nullable: bool = False,
    storage: str = "batch",
    serializer: str = "msgpack",
    batch_rows: int | None = None,
    items_per_block: int | None = None,
) -> ListSpec:
    """Build a list-valued schema descriptor for CTable and ListArray."""
    return ListSpec(
        item_spec,
        nullable=nullable,
        storage=storage,
        serializer=serializer,
        batch_rows=batch_rows,
        items_per_block=items_per_block,
    )


# ---------------------------------------------------------------------------
# Field helper
# ---------------------------------------------------------------------------
len(spec.display_label()) + 4) dtype = spec.dtype if dtype.kind == "U": # fixed-width unicode (string spec) return max(10, min(dtype.itemsize // 4, 50)) @@ -132,7 +135,7 @@ class CompiledColumn: name: str py_type: Any spec: SchemaSpec - dtype: np.dtype + dtype: np.dtype | None default: Any # MISSING means required (no default) config: ColumnConfig display_width: int = 20 # terminal column width for __str__ / info() @@ -183,17 +186,25 @@ def infer_spec_from_annotation(annotation: Any) -> SchemaSpec: def validate_annotation_matches_spec(name: str, annotation: Any, spec: SchemaSpec) -> None: - """Raise :exc:`TypeError` if *annotation* is incompatible with *spec*. + """Raise :exc:`TypeError` if *annotation* is incompatible with *spec*.""" + if isinstance(spec, ListSpec): + origin = typing.get_origin(annotation) + if origin not in (list, list): + raise TypeError( + f"Column {name!r}: annotation {annotation!r} is incompatible with list spec; expected list[T]." + ) + args = typing.get_args(annotation) + if len(args) != 1: + raise TypeError(f"Column {name!r}: list annotations must specify exactly one item type.") + item_annotation = args[0] + expected = spec.item_spec.python_type + if item_annotation is not expected: + raise TypeError( + f"Column {name!r}: list item annotation {item_annotation!r} is incompatible with " + f"item spec {type(spec.item_spec).__name__!r} (expected {expected.__name__!r})." + ) + return - Parameters - ---------- - name: - Column name, used only in the error message. - annotation: - The resolved Python type from the dataclass field. - spec: - The :class:`SchemaSpec` attached via ``b2.field(...)``. 
- """ expected = spec.python_type if annotation is not expected: raise TypeError( @@ -305,7 +316,7 @@ def compile_schema(row_cls: type[Any]) -> CompiledSchema: name=name, py_type=annotation, spec=spec, - dtype=spec.dtype, + dtype=getattr(spec, "dtype", None), default=default, config=config, display_width=compute_display_width(spec), @@ -342,6 +353,19 @@ def _default_from_json(value: Any) -> Any: return value +def spec_from_metadata_dict(data: dict[str, Any]) -> SchemaSpec: + """Reconstruct one SchemaSpec from serialized metadata.""" + data = dict(data) + kind = data.pop("kind") + if kind == "list": + item_spec = spec_from_metadata_dict(data.pop("item")) + return ListSpec(item_spec, **data) + spec_cls = _KIND_TO_SPEC.get(kind) + if spec_cls is None: + raise ValueError(f"Unknown column kind {kind!r}") + return spec_cls(**data) + + def schema_to_dict(schema: CompiledSchema) -> dict[str, Any]: """Serialize *schema* to a JSON-compatible dict. @@ -410,19 +434,14 @@ def schema_from_dict(data: dict[str, Any]) -> CompiledSchema: chunks = tuple(entry.pop("chunks")) if "chunks" in entry else None blocks = tuple(entry.pop("blocks")) if "blocks" in entry else None - spec_cls = _KIND_TO_SPEC.get(kind) - if spec_cls is None: - raise ValueError(f"Unknown column kind {kind!r}") - - # Remaining keys in entry are constraint kwargs (ge, le, max_length, …) - spec = spec_cls(**entry) + spec = spec_from_metadata_dict({"kind": kind, **entry}) columns.append( CompiledColumn( name=name, py_type=spec.python_type, spec=spec, - dtype=spec.dtype, + dtype=getattr(spec, "dtype", None), default=default, config=ColumnConfig(cparams=cparams, dparams=dparams, chunks=chunks, blocks=blocks), display_width=compute_display_width(spec), diff --git a/src/blosc2/schema_validation.py b/src/blosc2/schema_validation.py index 91f157f7..88260701 100644 --- a/src/blosc2/schema_validation.py +++ b/src/blosc2/schema_validation.py @@ -19,6 +19,8 @@ from pydantic import BaseModel, Field, ValidationError, create_model 
+from blosc2.list_array import coerce_list_cell +from blosc2.schema import ListSpec from blosc2.schema_compiler import CompiledSchema # noqa: TC001 @@ -41,8 +43,14 @@ def build_validator_model(schema: CompiledSchema) -> type[BaseModel]: field_definitions: dict[str, Any] = {} for col in schema.columns: pydantic_kwargs = col.spec.to_pydantic_kwargs() - is_nullable = getattr(col.spec, "null_value", None) is not None - py_type = col.py_type | None if is_nullable else col.py_type + is_nullable = getattr(col.spec, "null_value", None) is not None or ( + isinstance(col.spec, ListSpec) and col.spec.nullable + ) + if isinstance(col.spec, ListSpec): + item_type = col.spec.item_spec.python_type + py_type = list[item_type] | None if is_nullable else list[item_type] + else: + py_type = col.py_type | None if is_nullable else col.py_type if col.default is MISSING: default = None if is_nullable else MISSING @@ -118,7 +126,11 @@ def validate_row(schema: CompiledSchema, row: dict[str, Any]) -> dict[str, Any]: name and the violated constraint. 
""" model_cls = build_validator_model(schema) - masked_row, nulled = _mask_nulls(schema, row) + normalized = dict(row) + for col in schema.columns: + if isinstance(col.spec, ListSpec) and col.name in normalized: + normalized[col.name] = coerce_list_cell(col.spec, normalized[col.name]) + masked_row, nulled = _mask_nulls(schema, normalized) try: instance = model_cls(**masked_row) except ValidationError as exc: @@ -148,7 +160,11 @@ def validate_rows_rowwise(schema: CompiledSchema, rows: list[dict[str, Any]]) -> model_cls = build_validator_model(schema) result = [] for i, row in enumerate(rows): - masked_row, nulled = _mask_nulls(schema, row) + normalized = dict(row) + for col in schema.columns: + if isinstance(col.spec, ListSpec) and col.name in normalized: + normalized[col.name] = coerce_list_cell(col.spec, normalized[col.name]) + masked_row, nulled = _mask_nulls(schema, normalized) try: instance = model_cls(**masked_row) except ValidationError as exc: diff --git a/src/blosc2/schema_vectorized.py b/src/blosc2/schema_vectorized.py index c15f2dd2..e2e6de6b 100644 --- a/src/blosc2/schema_vectorized.py +++ b/src/blosc2/schema_vectorized.py @@ -19,6 +19,8 @@ import numpy as np +from blosc2.list_array import coerce_list_cell +from blosc2.schema import ListSpec from blosc2.schema_compiler import CompiledColumn, CompiledSchema # noqa: TC001 @@ -75,6 +77,11 @@ def validate_column_values(col: CompiledColumn, values: Any) -> None: If any value violates a constraint declared on the column's spec. 
""" spec = col.spec + if isinstance(spec, ListSpec): + for value in values: + coerce_list_cell(spec, value) + return + arr = np.asarray(values) # Compute null mask so sentinels bypass constraint checks diff --git a/src/blosc2/schunk.py b/src/blosc2/schunk.py index 55bf31db..4d2731d0 100644 --- a/src/blosc2/schunk.py +++ b/src/blosc2/schunk.py @@ -1653,6 +1653,11 @@ def process_opened_object(res): if "b2o" in meta: return blosc2.open_b2object(res) + if "listarray" in meta: + from blosc2.list_array import ListArray + + return ListArray(_from_schunk=getattr(res, "schunk", res)) + if "vlarray" in meta: from blosc2.vlarray import VLArray diff --git a/tests/ctable/test_varlen_columns.py b/tests/ctable/test_varlen_columns.py new file mode 100644 index 00000000..c05efc22 --- /dev/null +++ b/tests/ctable/test_varlen_columns.py @@ -0,0 +1,82 @@ +from __future__ import annotations + +from dataclasses import dataclass + +import pytest + +import blosc2 + + +@dataclass +class Product: + code: str = blosc2.field(blosc2.string(max_length=8)) + qty: int = blosc2.field(blosc2.int32()) + tags: list[str] = blosc2.field( # noqa: RUF009 + blosc2.list(blosc2.string(max_length=16), nullable=True, batch_rows=2) + ) + + +DATA = [ + ("a", 1, ["x", "y"]), + ("b", 2, []), + ("c", 3, None), + ("d", 4, ["z"]), +] + + +def test_ctable_varlen_append_extend_and_reads(): + t = blosc2.CTable(Product) + t.append(DATA[0]) + t.extend(DATA[1:]) + + assert len(t) == 4 + assert t.tags[0] == ["x", "y"] + assert t.tags[1:4] == [[], None, ["z"]] + assert t.row[2].tags[0] is None + + t.tags[2] = ["r", "s"] + assert t.tags[2] == ["r", "s"] + + +def test_ctable_varlen_where_select_head_tail_and_compact(): + t = blosc2.CTable(Product, new_data=DATA) + view = t.where(t.qty >= 2) + assert view.tags[:] == [[], None, ["z"]] + sel = t.select(["code", "tags"]) + assert sel.tags[:] == [["x", "y"], [], None, ["z"]] + assert t.head(2).tags[:] == [["x", "y"], []] + assert t.tail(2).tags[:] == [None, ["z"]] + + 
t.delete([1]) + t.compact() + assert t.tags[:] == [["x", "y"], None, ["z"]] + + +def test_ctable_varlen_persistence_save_load_open(tmp_path): + path = tmp_path / "products.b2d" + t = blosc2.CTable(Product, new_data=DATA, urlpath=str(path), mode="w") + t.close() + + opened = blosc2.CTable.open(str(path), mode="r") + assert opened.tags[:] == [["x", "y"], [], None, ["z"]] + + loaded = blosc2.CTable.load(str(path)) + assert loaded.tags[:] == [["x", "y"], [], None, ["z"]] + loaded.tags[1] = ["changed"] + assert loaded.tags[1] == ["changed"] + + save_path = tmp_path / "products-save.b2d" + loaded.save(str(save_path)) + reopened = blosc2.CTable.open(str(save_path), mode="r") + assert reopened.tags[:] == [["x", "y"], ["changed"], None, ["z"]] + + +def test_ctable_varlen_arrow_roundtrip(): + pytest.importorskip("pyarrow") + + t = blosc2.CTable(Product, new_data=DATA) + arrow = t.to_arrow() + assert arrow.column("tags").to_pylist() == [["x", "y"], [], None, ["z"]] + + roundtrip = blosc2.CTable.from_arrow(arrow) + assert roundtrip.tags[:] == [["x", "y"], [], None, ["z"]] diff --git a/tests/ctable/test_varlen_schema_compiler.py b/tests/ctable/test_varlen_schema_compiler.py new file mode 100644 index 00000000..80e8d3cb --- /dev/null +++ b/tests/ctable/test_varlen_schema_compiler.py @@ -0,0 +1,48 @@ +from __future__ import annotations + +from dataclasses import dataclass + +import pytest + +import blosc2 +from blosc2.schema import ListSpec +from blosc2.schema_compiler import compile_schema, schema_from_dict, schema_to_dict + + +@dataclass +class Product: + code: str = blosc2.field(blosc2.string(max_length=8)) + tags: list[str] = blosc2.field( # noqa: RUF009 + blosc2.list(blosc2.string(max_length=16), nullable=True, batch_rows=32) + ) + + +def test_list_builder_and_compile_schema(): + spec = blosc2.list(blosc2.string(max_length=10), nullable=True, storage="batch", serializer="msgpack") + assert isinstance(spec, ListSpec) + assert spec.nullable is True + assert 
spec.display_label() == "list[string]" + + schema = compile_schema(Product) + assert isinstance(schema.columns_by_name["tags"].spec, ListSpec) + assert schema.columns_by_name["tags"].dtype is None + + +def test_list_schema_roundtrip(): + schema = compile_schema(Product) + d = schema_to_dict(schema) + tags = next(c for c in d["columns"] if c["name"] == "tags") + assert tags["kind"] == "list" + assert tags["item"]["kind"] == "string" + restored = schema_from_dict(d) + assert isinstance(restored.columns_by_name["tags"].spec, ListSpec) + assert restored.columns_by_name["tags"].spec.batch_rows == 32 + + +def test_list_annotation_mismatch_rejected(): + @dataclass + class Bad: + tags: str = blosc2.field(blosc2.list(blosc2.string())) + + with pytest.raises(TypeError, match="list spec"): + compile_schema(Bad) diff --git a/tests/test_list_array.py b/tests/test_list_array.py new file mode 100644 index 00000000..e77bfff6 --- /dev/null +++ b/tests/test_list_array.py @@ -0,0 +1,76 @@ +####################################################################### +# Copyright (c) 2019-present, Blosc Development Team +# All rights reserved. 
+# +# SPDX-License-Identifier: BSD-3-Clause +####################################################################### + +from __future__ import annotations + +import pytest + +import blosc2 + + +@pytest.mark.parametrize("storage", ["vl", "batch"]) +def test_listarray_append_extend_and_replace(storage, tmp_path): + urlpath = tmp_path / f"values-{storage}.b2b" + arr = blosc2.ListArray( + item_spec=blosc2.string(max_length=16), + nullable=True, + storage=storage, + batch_rows=2, + urlpath=str(urlpath), + mode="w", + ) + arr.append(["a", "b"]) + arr.append([]) + arr.append(None) + arr.extend([["c"], ["d", "e"]]) + + assert len(arr) == 5 + assert arr[0] == ["a", "b"] + assert arr[1] == [] + assert arr[2] is None + assert arr[1:4] == [[], None, ["c"]] + + arr[3] = ["x", "y"] + assert arr[3] == ["x", "y"] + + arr.flush() + reopened = blosc2.open(str(urlpath), mode="r") + assert isinstance(reopened, blosc2.ListArray) + assert reopened[:] == [["a", "b"], [], None, ["x", "y"], ["d", "e"]] + + restored = blosc2.from_cframe(arr.to_cframe()) + assert isinstance(restored, blosc2.ListArray) + assert restored[:] == reopened[:] + + +def test_listarray_batch_pending_rows_visible_before_flush(): + arr = blosc2.ListArray(item_spec=blosc2.int32(), storage="batch", batch_rows=4) + arr.append([1, 2]) + arr.append([]) + arr.append([3]) + + assert len(arr) == 3 + assert arr[:] == [[1, 2], [], [3]] + + +def test_listarray_rejects_invalid_cells(): + arr = blosc2.ListArray(item_spec=blosc2.int32(), nullable=False) + with pytest.raises(ValueError): + arr.append(None) + with pytest.raises(TypeError): + arr.append("abc") + with pytest.raises(ValueError): + arr.append([1, None]) + + +def test_listarray_arrow_roundtrip(): + pa = pytest.importorskip("pyarrow") + + values = pa.array([["a"], None, ["b", "c"]]) + arr = blosc2.ListArray.from_arrow(values, item_spec=blosc2.string(), nullable=True) + assert arr[:] == [["a"], None, ["b", "c"]] + assert arr.to_arrow().to_pylist() == [["a"], None, ["b", 
"c"]] From cb18236b159ce3aa1eb6abcbf963c2be6d2471ab Mon Sep 17 00:00:00 2001 From: Francesc Alted Date: Thu, 23 Apr 2026 09:36:29 +0200 Subject: [PATCH 3/6] First optimizations using a new benchmark --- bench/ctable/varlen.py | 241 +++++++++++++++++++++++++++++++++++++++ src/blosc2/ctable.py | 25 ++-- src/blosc2/list_array.py | 30 ++++- 3 files changed, 283 insertions(+), 13 deletions(-) create mode 100644 bench/ctable/varlen.py diff --git a/bench/ctable/varlen.py b/bench/ctable/varlen.py new file mode 100644 index 00000000..59e4d150 --- /dev/null +++ b/bench/ctable/varlen.py @@ -0,0 +1,241 @@ +####################################################################### +# Copyright (c) 2019-present, Blosc Development Team +# All rights reserved. +# +# SPDX-License-Identifier: BSD-3-Clause +####################################################################### + +"""Benchmark variable-length CTable columns. + +Covers: +1. append / extend performance +2. full-scan query performance +3. getitem performance for single rows and small slices + +Examples +-------- +python bench/ctable/varlen.py --rows 200000 --batch-size 1000 +python bench/ctable/varlen.py --rows 500000 --storages batch vl --repeats 5 +""" + +from __future__ import annotations + +import argparse +import gc +import statistics +import time +from dataclasses import dataclass + +import blosc2 + + +@dataclass +class RowBatch: + id: int = blosc2.field(blosc2.int64()) + group: int = blosc2.field(blosc2.int32()) + score: float = blosc2.field(blosc2.float64()) + tags: list[str] = blosc2.field( # noqa: RUF009 + blosc2.list(blosc2.string(max_length=24), nullable=True, storage="batch", batch_rows=1024) + ) + + +@dataclass +class RowVL: + id: int = blosc2.field(blosc2.int64()) + group: int = blosc2.field(blosc2.int32()) + score: float = blosc2.field(blosc2.float64()) + tags: list[str] = blosc2.field( # noqa: RUF009 + blosc2.list(blosc2.string(max_length=24), nullable=True, storage="vl") + ) + + +def make_row(i: int) -> 
tuple[int, int, float, list[str] | None]: + group = i % 97 + score = float((i * 13) % 1000) / 10.0 + mod = i % 11 + if mod == 0: + tags = None + elif mod == 1: + tags = [] + elif mod <= 4: + tags = [f"t{i % 1000}"] + elif mod <= 7: + tags = [f"t{i % 1000}", f"g{group}"] + else: + tags = [f"t{i % 1000}", f"g{group}", f"s{int(score)}"] + return i, group, score, tags + + +def make_rows(nrows: int) -> list[tuple[int, int, float, list[str] | None]]: + return [make_row(i) for i in range(nrows)] + + +def choose_row_type(storage: str): + if storage == "batch": + return RowBatch + if storage == "vl": + return RowVL + raise ValueError(f"Unsupported storage: {storage!r}") + + +def best_time(fn, *, repeats: int) -> float: + best = float("inf") + for _ in range(repeats): + gc.collect() + t0 = time.perf_counter() + fn() + dt = time.perf_counter() - t0 + best = min(best, dt) + return best + + +def median_time(fn, *, repeats: int) -> float: + samples = [] + for _ in range(repeats): + gc.collect() + t0 = time.perf_counter() + fn() + samples.append(time.perf_counter() - t0) + return statistics.median(samples) + + +def build_table_by_append(row_type, rows) -> blosc2.CTable: + t = blosc2.CTable(row_type, expected_size=len(rows)) + for row in rows: + t.append(row) + return t + + +def build_table_by_extend(row_type, rows, batch_size: int) -> blosc2.CTable: + t = blosc2.CTable(row_type, expected_size=len(rows)) + for start in range(0, len(rows), batch_size): + t.extend(rows[start : start + batch_size]) + return t + + +def query_count(table: blosc2.CTable) -> int: + view = table.where((table.group >= 10) & (table.group < 50) & (table.score >= 25.0)) + return len(view) + + +def query_with_list_touch(table: blosc2.CTable) -> int: + view = table.where((table.group >= 10) & (table.group < 50) & (table.score >= 25.0)) + total = 0 + for cell in view.tags: + total += 0 if cell is None else len(cell) + return total + + +def bench_getitem_single(table: blosc2.CTable, indices: list[int]) -> int: + 
total = 0 + col = table.tags + for idx in indices: + cell = col[idx] + total += 0 if cell is None else len(cell) + return total + + +def bench_getitem_slices(table: blosc2.CTable, starts: list[int], width: int) -> int: + total = 0 + col = table.tags + for start in starts: + cells = col[start : start + width] + for cell in cells: + total += 0 if cell is None else len(cell) + return total + + +def format_rate(n: int, seconds: float) -> str: + if seconds <= 0: + return "inf" + return f"{n / seconds:,.0f}/s" + + +def run_storage_bench(storage: str, rows, *, batch_size: int, repeats: int, nsamples: int, slice_width: int) -> None: + row_type = choose_row_type(storage) + print(f"\n=== storage={storage} ===") + + append_time = best_time(lambda: build_table_by_append(row_type, rows), repeats=repeats) + extend_time = best_time(lambda: build_table_by_extend(row_type, rows, batch_size), repeats=repeats) + + table = build_table_by_extend(row_type, rows, batch_size) + + q1 = query_count(table) + scan_count_time = median_time(lambda: query_count(table), repeats=repeats) + + q2 = query_with_list_touch(table) + scan_touch_time = median_time(lambda: query_with_list_touch(table), repeats=repeats) + + max_start = max(1, len(table) - slice_width - 1) + indices = [((i * 104729) % max_start) for i in range(nsamples)] + starts = [((i * 8191) % max_start) for i in range(nsamples)] + + single_sum = bench_getitem_single(table, indices) + single_time = median_time(lambda: bench_getitem_single(table, indices), repeats=repeats) + + slice_sum = bench_getitem_slices(table, starts, slice_width) + slice_time = median_time(lambda: bench_getitem_slices(table, starts, slice_width), repeats=repeats) + + print("append/extend") + print(f" append rows: {append_time:8.4f} s {format_rate(len(rows), append_time)}") + print(f" extend rows: {extend_time:8.4f} s {format_rate(len(rows), extend_time)}") + print(f" append/extend: {append_time / extend_time:8.2f}x slower") + + print("scan queries") + print(f" count 
only: {scan_count_time:8.4f} s matches={q1:,}") + print(f" count+list use: {scan_touch_time:8.4f} s payload={q2:,}") + + print("getitem") + print( + f" single row: {single_time:8.4f} s " + f"{format_rate(nsamples, single_time)} checksum={single_sum}" + ) + print( + f" slice[{slice_width}]: {slice_time:8.4f} s " + f"{format_rate(nsamples, slice_time)} checksum={slice_sum}" + ) + + +def parse_args() -> argparse.Namespace: + parser = argparse.ArgumentParser(description=__doc__) + parser.add_argument("--rows", type=int, default=200_000, help="Number of rows to generate.") + parser.add_argument("--batch-size", type=int, default=1_000, help="Batch size used for extend().") + parser.add_argument("--repeats", type=int, default=5, help="Repetitions per benchmark.") + parser.add_argument( + "--storages", + nargs="+", + choices=("batch", "vl"), + default=["batch", "vl"], + help="List-column storage backends to benchmark.", + ) + parser.add_argument( + "--getitem-samples", + type=int, + default=20_000, + help="Number of random single-row / slice probes.", + ) + parser.add_argument("--slice-width", type=int, default=8, help="Width of small-slice getitem benchmark.") + return parser.parse_args() + + +def main() -> None: + args = parse_args() + rows = make_rows(args.rows) + + print("CTable variable-length column benchmark") + print(f"rows={args.rows:,} batch_size={args.batch_size:,} repeats={args.repeats}") + print(f"getitem_samples={args.getitem_samples:,} slice_width={args.slice_width}") + + for storage in args.storages: + run_storage_bench( + storage, + rows, + batch_size=args.batch_size, + repeats=args.repeats, + nsamples=args.getitem_samples, + slice_width=args.slice_width, + ) + + +if __name__ == "__main__": + main() diff --git a/src/blosc2/ctable.py b/src/blosc2/ctable.py index ef52d284..acf2f349 100644 --- a/src/blosc2/ctable.py +++ b/src/blosc2/ctable.py @@ -4589,11 +4589,6 @@ def extend(self, data: list | CTable | Any, *, validate: bool | None = None) -> # Resolve 
effective validate flag: per-call override takes precedence do_validate = self._validate if validate is None else validate - if any(self._is_list_column(col) for col in self._schema.columns): - for row in data: - self.append(row) - return - start_pos = self._resolve_last_pos() current_col_names = self._stored_col_names # skip computed columns @@ -4632,11 +4627,15 @@ def extend(self, data: list | CTable | Any, *, validate: bool | None = None) -> validate_column_batch(self._schema, raw_columns) - processed_cols = [] + scalar_processed_cols: dict[str, blosc2.NDArray] = {} + list_processed_cols: dict[str, list] = {} for name in current_col_names: - target_dtype = self._cols[name].dtype - b2_arr = blosc2.asarray(raw_columns[name], dtype=target_dtype) - processed_cols.append(b2_arr) + col_meta = self._schema.columns_by_name[name] + if self._is_list_column(col_meta): + list_processed_cols[name] = list(raw_columns[name]) + else: + target_dtype = self._cols[name].dtype + scalar_processed_cols[name] = blosc2.asarray(raw_columns[name], dtype=target_dtype) end_pos = start_pos + new_nrows @@ -4648,8 +4647,12 @@ def extend(self, data: list | CTable | Any, *, validate: bool | None = None) -> while end_pos > len(self._valid_rows): self._grow() - for j, name in enumerate(current_col_names): - self._cols[name][start_pos:end_pos] = processed_cols[j][:] + for name in current_col_names: + col_meta = self._schema.columns_by_name[name] + if self._is_list_column(col_meta): + self._cols[name].extend(list_processed_cols[name]) + else: + self._cols[name][start_pos:end_pos] = scalar_processed_cols[name][:] self._valid_rows[start_pos:end_pos] = True self._last_pos = end_pos diff --git a/src/blosc2/list_array.py b/src/blosc2/list_array.py index a3d7d0d5..dcf3c701 100644 --- a/src/blosc2/list_array.py +++ b/src/blosc2/list_array.py @@ -159,6 +159,9 @@ def __init__( _validate_list_spec(self.spec) self._pending_cells: list[list[Any] | None] = [] self._persisted_row_count = 0 + 
self._persisted_prefix_cache: list[int] | None = None + self._cached_batch_index: int | None = None + self._cached_batch_values: list[list[Any] | None] | None = None storage_obj = self._coerce_storage(kwargs) fixed_meta = dict(storage_obj.meta or {}) @@ -195,6 +198,9 @@ def _init_from_schunk(self, schunk) -> None: la_meta = meta["listarray"] self.spec = ListSpec.from_metadata_dict(la_meta) self._pending_cells = [] + self._persisted_prefix_cache = None + self._cached_batch_index = None + self._cached_batch_values = None if self.spec.storage == "vl": if "vlarray" not in meta: raise ValueError("ListArray metadata says backend='vl' but VLArray tag is missing") @@ -206,6 +212,11 @@ def _init_from_schunk(self, schunk) -> None: self._backend = BatchArray(_from_schunk=schunk) self._persisted_row_count = self._persisted_rows_count() + def _invalidate_batch_caches(self) -> None: + self._persisted_prefix_cache = None + self._cached_batch_index = None + self._cached_batch_values = None + def _persisted_rows_count(self) -> int: if self.spec.storage == "vl": return len(self._backend) @@ -213,14 +224,25 @@ def _persisted_rows_count(self) -> int: return int(sum(lengths)) def _persisted_prefix_sums(self) -> list[int]: + if self._persisted_prefix_cache is not None: + return self._persisted_prefix_cache lengths = self._backend._load_or_compute_batch_lengths() prefix = [0] total = 0 for length in lengths: total += int(length) prefix.append(total) + self._persisted_prefix_cache = prefix return prefix + def _get_batch_values(self, batch_index: int) -> list[list[Any] | None]: + if self._cached_batch_index == batch_index and self._cached_batch_values is not None: + return self._cached_batch_values + batch_values = self._backend[batch_index][:] + self._cached_batch_index = batch_index + self._cached_batch_values = batch_values + return batch_values + def _normalize_index(self, index: int) -> int: if not isinstance(index, int): raise TypeError("ListArray indices must be integers") @@ -248,6 
+270,7 @@ def _flush_full_batches(self) -> None: self._backend.append(batch) self._pending_cells = self._pending_cells[batch_rows:] self._persisted_row_count += len(batch) + self._invalidate_batch_caches() def append(self, value: Any) -> int: cell = coerce_list_cell(self.spec, value) @@ -276,6 +299,7 @@ def flush(self) -> None: self._backend.append(batch) self._persisted_row_count += len(batch) self._pending_cells.clear() + self._invalidate_batch_caches() def close(self) -> None: self.flush() @@ -289,7 +313,7 @@ def __getitem__(self, index: int | slice) -> Any: if index >= self._persisted_row_count: return self._pending_cells[index - self._persisted_row_count] batch_index, inner_index = self._locate_persisted_row(index) - return self._backend[batch_index][inner_index] + return self._get_batch_values(batch_index)[inner_index] def __setitem__(self, index: int, value: Any) -> None: cell = coerce_list_cell(self.spec, value) @@ -301,9 +325,11 @@ def __setitem__(self, index: int, value: Any) -> None: self._pending_cells[index - self._persisted_row_count] = cell return batch_index, inner_index = self._locate_persisted_row(index) - batch = self._backend[batch_index][:] + batch = self._get_batch_values(batch_index).copy() batch[inner_index] = cell self._backend[batch_index] = batch + self._cached_batch_index = batch_index + self._cached_batch_values = batch def __len__(self) -> int: if self.spec.storage == "vl": From 0b66978a090d19c4389c8587e7754feaac123235 Mon Sep 17 00:00:00 2001 From: Francesc Alted Date: Thu, 23 Apr 2026 11:18:10 +0200 Subject: [PATCH 4/6] Support for fancy indexing in CTable --- bench/ctable/varlen.py | 20 ++++++++++++++++++-- src/blosc2/ctable.py | 11 +++++------ src/blosc2/list_array.py | 41 ++++++++++++++++++++++++++++++++++++---- tests/test_list_array.py | 8 ++++++++ 4 files changed, 68 insertions(+), 12 deletions(-) diff --git a/bench/ctable/varlen.py b/bench/ctable/varlen.py index 59e4d150..549f8a08 100644 --- a/bench/ctable/varlen.py +++ 
b/bench/ctable/varlen.py @@ -169,12 +169,20 @@ def run_storage_bench(storage: str, rows, *, batch_size: int, repeats: int, nsam max_start = max(1, len(table) - slice_width - 1) indices = [((i * 104729) % max_start) for i in range(nsamples)] starts = [((i * 8191) % max_start) for i in range(nsamples)] + clustered_indices = [i % min(max_start, 4096) for i in range(nsamples)] + clustered_starts = [i % min(max_start, 2048) for i in range(nsamples)] single_sum = bench_getitem_single(table, indices) single_time = median_time(lambda: bench_getitem_single(table, indices), repeats=repeats) + single_clustered_time = median_time( + lambda: bench_getitem_single(table, clustered_indices), repeats=repeats + ) slice_sum = bench_getitem_slices(table, starts, slice_width) slice_time = median_time(lambda: bench_getitem_slices(table, starts, slice_width), repeats=repeats) + slice_clustered_time = median_time( + lambda: bench_getitem_slices(table, clustered_starts, slice_width), repeats=repeats + ) print("append/extend") print(f" append rows: {append_time:8.4f} s {format_rate(len(rows), append_time)}") @@ -187,13 +195,21 @@ def run_storage_bench(storage: str, rows, *, batch_size: int, repeats: int, nsam print("getitem") print( - f" single row: {single_time:8.4f} s " + f" single row random: {single_time:8.4f} s " f"{format_rate(nsamples, single_time)} checksum={single_sum}" ) print( - f" slice[{slice_width}]: {slice_time:8.4f} s " + f" single row local: {single_clustered_time:8.4f} s " + f"{format_rate(nsamples, single_clustered_time)}" + ) + print( + f" slice[{slice_width}] random: {slice_time:8.4f} s " f"{format_rate(nsamples, slice_time)} checksum={slice_sum}" ) + print( + f" slice[{slice_width}] local: {slice_clustered_time:8.4f} s " + f"{format_rate(nsamples, slice_clustered_time)}" + ) def parse_args() -> argparse.Namespace: diff --git a/src/blosc2/ctable.py b/src/blosc2/ctable.py index acf2f349..dbcc0739 100644 --- a/src/blosc2/ctable.py +++ b/src/blosc2/ctable.py @@ -627,7 
+627,7 @@ def _values_from_key(self, key): # noqa: C901 chunk = np.asarray(self._raw_col[lo : hi + 1]) return chunk[selected_pos - lo] if self.is_list: - return [self._raw_col[int(pos)] for pos in selected_pos] + return self._raw_col[selected_pos] return np.asarray(self._raw_col[selected_pos]) elif isinstance(key, np.ndarray) and key.dtype == np.bool_: @@ -642,7 +642,7 @@ def _values_from_key(self, key): # noqa: C901 raw_np = np.asarray(self._raw_col[:]) return raw_np[phys_indices] if self.is_list: - return [self._raw_col[int(pos)] for pos in phys_indices] + return self._raw_col[phys_indices] return self._raw_col[phys_indices] elif isinstance(key, (list, tuple, np.ndarray)): @@ -652,7 +652,7 @@ def _values_from_key(self, key): # noqa: C901 raw_np = np.asarray(self._raw_col[:]) return raw_np[phys_indices] if self.is_list: - return [self._raw_col[int(pos)] for pos in phys_indices] + return self._raw_col[phys_indices] return self._raw_col[phys_indices] raise TypeError(f"Invalid index type: {type(key)}") @@ -780,8 +780,7 @@ def __iter__(self): yield from self._iter_chunks_computed(size=None) return if self.is_list: - for pos in np.where(self._valid_rows[:])[0]: - yield self._raw_col[int(pos)] + yield from self._raw_col[np.where(self._valid_rows[:])[0]] return arr = self._valid_rows chunk_size = arr.chunks[0] @@ -2906,7 +2905,7 @@ def _fetch_col_at_positions(self, name: str, positions: np.ndarray): col = self._cols[name] spec = self._schema.columns_by_name[name].spec if self._is_list_spec(spec): - return [col[int(p)] for p in positions] + return col[positions] return col[positions] def _schema_dict_with_computed(self) -> dict: diff --git a/src/blosc2/list_array.py b/src/blosc2/list_array.py index dcf3c701..091c3586 100644 --- a/src/blosc2/list_array.py +++ b/src/blosc2/list_array.py @@ -9,6 +9,7 @@ import copy from bisect import bisect_right +from collections import defaultdict from collections.abc import Iterable, Iterator from functools import lru_cache from typing 
import Any @@ -253,6 +254,9 @@ def _normalize_index(self, index: int) -> int: raise IndexError("ListArray index out of range") return index + def _normalize_indices(self, indices: Iterable[int]) -> list[int]: + return [self._normalize_index(int(index)) for index in indices] + def _locate_persisted_row(self, row_index: int) -> tuple[int, int]: prefix = self._persisted_prefix_sums() batch_index = bisect_right(prefix, row_index) - 1 @@ -304,9 +308,39 @@ def flush(self) -> None: def close(self) -> None: self.flush() - def __getitem__(self, index: int | slice) -> Any: + def _get_many(self, indices: list[int]) -> list[Any]: + if self.spec.storage == "vl": + return [self._backend[index] for index in indices] + + out: list[Any] = [None] * len(indices) + grouped: dict[int, list[tuple[int, int]]] = defaultdict(list) + for out_i, index in enumerate(indices): + if index >= self._persisted_row_count: + out[out_i] = self._pending_cells[index - self._persisted_row_count] + else: + batch_index, inner_index = self._locate_persisted_row(index) + grouped[batch_index].append((out_i, inner_index)) + + for batch_index, refs in grouped.items(): + batch_values = self._get_batch_values(batch_index) + for out_i, inner_index in refs: + out[out_i] = batch_values[inner_index] + return out + + def __getitem__(self, index: int | slice | list[int] | tuple[int, ...] 
| np.ndarray) -> Any: if isinstance(index, slice): - return [self[i] for i in range(*index.indices(len(self)))] + indices = list(range(*index.indices(len(self)))) + return self._get_many(indices) + if isinstance(index, np.ndarray): + if index.dtype == np.bool_: + if len(index) != len(self): + raise IndexError( + f"Boolean mask length {len(index)} does not match ListArray length {len(self)}" + ) + return self._get_many(np.flatnonzero(index).tolist()) + return self._get_many(self._normalize_indices(index.tolist())) + if isinstance(index, (list, tuple)): + return self._get_many(self._normalize_indices(index)) index = self._normalize_index(index) if self.spec.storage == "vl": return self._backend[index] @@ -337,8 +371,7 @@ def __len__(self) -> int: return self._persisted_row_count + len(self._pending_cells) def __iter__(self) -> Iterator[Any]: - for i in range(len(self)): - yield self[i] + yield from self[:] def copy(self, **kwargs: Any) -> ListArray: out = ListArray(spec=self.spec, **kwargs) diff --git a/tests/test_list_array.py b/tests/test_list_array.py index e77bfff6..649762a2 100644 --- a/tests/test_list_array.py +++ b/tests/test_list_array.py @@ -33,6 +33,7 @@ def test_listarray_append_extend_and_replace(storage, tmp_path): assert arr[1] == [] assert arr[2] is None assert arr[1:4] == [[], None, ["c"]] + assert arr[[0, 2, 4]] == [["a", "b"], None, ["d", "e"]] arr[3] = ["x", "y"] assert arr[3] == ["x", "y"] @@ -67,6 +68,13 @@ def test_listarray_rejects_invalid_cells(): arr.append([1, None]) +def test_listarray_boolean_fancy_indexing(): + arr = blosc2.ListArray(item_spec=blosc2.int32(), nullable=True, storage="batch", batch_rows=2) + arr.extend([[1], None, [], [2, 3]]) + assert arr[[3, 0]] == [[2, 3], [1]] + assert arr[blosc2.asarray([True, False, True, False])[:]] == [[1], []] + + def test_listarray_arrow_roundtrip(): pa = pytest.importorskip("pyarrow") From 90263755f930e28af316d218ae5a06a464e5c528 Mon Sep 17 00:00:00 2001 From: Francesc Alted Date: Thu, 23 Apr 
2026 11:39:59 +0200 Subject: [PATCH 5/6] Better performance for monotonic indexing --- src/blosc2/list_array.py | 60 ++++++++++++++++++++++++++++++++++++++-- 1 file changed, 57 insertions(+), 3 deletions(-) diff --git a/src/blosc2/list_array.py b/src/blosc2/list_array.py index 091c3586..0a84bd5a 100644 --- a/src/blosc2/list_array.py +++ b/src/blosc2/list_array.py @@ -308,10 +308,48 @@ def flush(self) -> None: def close(self) -> None: self.flush() - def _get_many(self, indices: list[int]) -> list[Any]: - if self.spec.storage == "vl": - return [self._backend[index] for index in indices] + def _get_many_monotonic(self, indices: list[int]) -> list[Any]: + out: list[Any] = [None] * len(indices) + prefix = self._persisted_prefix_sums() + batch_index = 0 + batch_values: list[list[Any] | None] | None = None + + i = 0 + while i < len(indices): + index = indices[i] + if index >= self._persisted_row_count: + pending_start = index - self._persisted_row_count + j = i + 1 + while ( + j < len(indices) + and indices[j] >= self._persisted_row_count + and indices[j] == indices[j - 1] + 1 + ): + j += 1 + span = j - i + out[i:j] = self._pending_cells[pending_start : pending_start + span] + i = j + continue + + while batch_index + 1 < len(prefix) and index >= prefix[batch_index + 1]: + batch_index += 1 + batch_values = None + if batch_values is None: + batch_values = self._get_batch_values(batch_index) + + batch_start = prefix[batch_index] + batch_end = prefix[batch_index + 1] + local_start = index - batch_start + j = i + 1 + while j < len(indices) and indices[j] == indices[j - 1] + 1 and indices[j] < batch_end: + j += 1 + span = j - i + out[i:j] = batch_values[local_start : local_start + span] + i = j + return out + + def _get_many_grouped(self, indices: list[int]) -> list[Any]: out: list[Any] = [None] * len(indices) grouped: dict[int, list[tuple[int, int]]] = defaultdict(list) for out_i, index in enumerate(indices): @@ -327,6 +365,22 @@ def _get_many(self, indices: list[int]) -> 
list[Any]: out[out_i] = batch_values[inner_index] return out + def _get_many(self, indices: list[int]) -> list[Any]: + if self.spec.storage == "vl": + return [self._backend[index] for index in indices] + if len(indices) <= 1: + return self._get_many_grouped(indices) + monotonic = True + prev = indices[0] + for index in indices[1:]: + if index < prev: + monotonic = False + break + prev = index + if monotonic: + return self._get_many_monotonic(indices) + return self._get_many_grouped(indices) + def __getitem__(self, index: int | slice | list[int] | tuple[int, ...] | np.ndarray) -> Any: if isinstance(index, slice): indices = list(range(*index.indices(len(self)))) From e055c53fb28d4bbb41e345e0f4d6a67d61ac5796 Mon Sep 17 00:00:00 2001 From: Francesc Alted Date: Thu, 23 Apr 2026 13:14:18 +0200 Subject: [PATCH 6/6] Add a new SChunk.reorder_offsets() API --- src/blosc2/blosc2_ext.pyx | 7 +++ src/blosc2/schunk.py | 29 +++++++++++ tests/test_schunk_reorder_offsets.py | 73 ++++++++++++++++++++++++++++ 3 files changed, 109 insertions(+) create mode 100644 tests/test_schunk_reorder_offsets.py diff --git a/src/blosc2/blosc2_ext.pyx b/src/blosc2/blosc2_ext.pyx index d1e65037..bddb5a0a 100644 --- a/src/blosc2/blosc2_ext.pyx +++ b/src/blosc2/blosc2_ext.pyx @@ -1963,6 +1963,13 @@ cdef class SChunk: raise RuntimeError("Could not update the desired chunk") return rc + def reorder_offsets(self, order): + cdef np.ndarray[np.int64_t, ndim=1] offsets_order = np.ascontiguousarray(order, dtype=np.int64) + rc = blosc2_schunk_reorder_offsets(self.schunk, offsets_order.data) + if rc < 0: + raise RuntimeError("Could not reorder the chunk offsets") + return None + def update_data(self, nchunk, data, copy): cdef Py_buffer buf PyObject_GetBuffer(data, &buf, PyBUF_SIMPLE) diff --git a/src/blosc2/schunk.py b/src/blosc2/schunk.py index 4d2731d0..7ede9bd0 100644 --- a/src/blosc2/schunk.py +++ b/src/blosc2/schunk.py @@ -876,6 +876,35 @@ def update_chunk(self, nchunk: int, chunk: bytes) -> int: 
blosc2_ext.check_access_mode(self.urlpath, self.mode) return super().update_chunk(nchunk, chunk) + def reorder_offsets(self, order: Any) -> None: + """Reorder the chunk offsets of the SChunk in place. + + This is a low-level storage operation that changes the physical chunk + order of the underlying SChunk. Higher-level containers backed by this + SChunk will observe the reordered chunk traversal afterwards. + + Parameters + ---------- + order: array-like of int + A one-dimensional permutation of ``range(self.nchunks)`` describing + the new chunk order. + + Raises + ------ + ValueError + If ``order`` is not one-dimensional or its length does not match + the number of chunks in the SChunk. + RuntimeError + If the underlying reorder operation fails. + """ + blosc2_ext.check_access_mode(self.urlpath, self.mode) + order = np.asarray(order, dtype=np.int64) + if order.ndim != 1: + raise ValueError("`order` must be a one-dimensional sequence") + if len(order) != self.nchunks: + raise ValueError("`order` must have exactly `self.nchunks` elements") + super().reorder_offsets(order) + def update_data(self, nchunk: int, data: object, copy: bool) -> int: """Update the chunk in the specified position with the given data. diff --git a/tests/test_schunk_reorder_offsets.py b/tests/test_schunk_reorder_offsets.py new file mode 100644 index 00000000..16db3782 --- /dev/null +++ b/tests/test_schunk_reorder_offsets.py @@ -0,0 +1,73 @@ +####################################################################### +# Copyright (c) 2019-present, Blosc Development Team +# All rights reserved. 
+# +# SPDX-License-Identifier: BSD-3-Clause +####################################################################### + +import numpy as np +import pytest + +import blosc2 + + +@pytest.mark.parametrize("contiguous", [True, False]) +@pytest.mark.parametrize("urlpath", [None, "reorder_offsets.b2frame"]) +@pytest.mark.parametrize("nchunks", [1, 5, 12]) +def test_schunk_reorder_offsets(contiguous, urlpath, nchunks): + blosc2.remove_urlpath(urlpath) + schunk = blosc2.SChunk( + chunksize=200 * 1000 * 4, + contiguous=contiguous, + urlpath=urlpath, + cparams={"typesize": 4, "nthreads": 2}, + dparams={"nthreads": 2}, + ) + + for i in range(nchunks): + buffer = np.arange(200 * 1000, dtype=np.int32) + i * 200 * 1000 + assert schunk.append_data(buffer) == (i + 1) + + order = np.array([(i + 3) % nchunks for i in range(nchunks)], dtype=np.int64) + schunk.reorder_offsets(order) + + for i in range(nchunks): + expected = np.arange(200 * 1000, dtype=np.int32) + order[i] * 200 * 1000 + dest = np.empty(200 * 1000, dtype=np.int32) + schunk.decompress_chunk(i, dest) + assert np.array_equal(dest, expected) + + blosc2.remove_urlpath(urlpath) + + +@pytest.mark.parametrize( + "order", + [ + [[0, 1]], + [0, 1], + [0, 0, 1], + [0, 1, 3], + ], +) +def test_schunk_reorder_offsets_invalid_order(order): + schunk = blosc2.SChunk(chunksize=16, cparams={"typesize": 1}) + for payload in (b"a" * 16, b"b" * 16, b"c" * 16): + schunk.append_data(payload) + + if order == [[0, 1]] or order == [0, 1]: + with pytest.raises(ValueError): + schunk.reorder_offsets(order) + else: + with pytest.raises(RuntimeError): + schunk.reorder_offsets(order) + + +def test_schunk_reorder_offsets_read_only(tmp_path): + urlpath = tmp_path / "reorder_offsets_read_only.b2frame" + schunk = blosc2.SChunk(chunksize=16, urlpath=urlpath, contiguous=True, cparams={"typesize": 1}) + schunk.append_data(b"a" * 16) + schunk.append_data(b"b" * 16) + + reopened = blosc2.open(urlpath, mode="r") + with pytest.raises(ValueError, match="reading 
mode"): + reopened.reorder_offsets([1, 0])