diff --git a/.gitignore b/.gitignore index ef8c522482..41dadee2e2 100644 --- a/.gitignore +++ b/.gitignore @@ -45,5 +45,6 @@ htmlcov .ipynb_checkpoints/ pyiceberg/avro/decoder_fast.c +pyiceberg/avro/encoder_fast.c pyiceberg/avro/*.html pyiceberg/avro/*.so diff --git a/MANIFEST.in b/MANIFEST.in index 7215e0d6e3..f01e41c217 100644 --- a/MANIFEST.in +++ b/MANIFEST.in @@ -20,6 +20,7 @@ recursive-include pyiceberg *.pyx *.c # Exclude generated Cython C file exclude pyiceberg/avro/decoder_fast.c +exclude pyiceberg/avro/encoder_fast.c # Include test files in sdist recursive-include tests * diff --git a/pyiceberg/avro/encoder.py b/pyiceberg/avro/encoder.py index 43a711fa6b..f77d6e3757 100644 --- a/pyiceberg/avro/encoder.py +++ b/pyiceberg/avro/encoder.py @@ -78,3 +78,29 @@ def write_uuid(self, uuid: UUID) -> None: def write_unknown(self, _: Any) -> None: """Nulls are written as 0 bytes in avro, so we do nothing.""" + + +class MemoryBinaryEncoder(BinaryEncoder): + """BinaryEncoder that writes to an owned in-memory buffer.""" + + def __init__(self) -> None: + import io + + self._buffer = io.BytesIO() + super().__init__(self._buffer) + + def getvalue(self) -> bytes: + return self._buffer.getvalue() + + +def new_memory_encoder() -> "CythonBinaryEncoder | MemoryBinaryEncoder": # type: ignore[name-defined] # noqa: F821 + try: + from pyiceberg.avro.encoder_fast import CythonBinaryEncoder + + return CythonBinaryEncoder() + except ModuleNotFoundError: + import warnings + + warnings.warn("Falling back to pure Python Avro encoder, missing Cython implementation", stacklevel=2) + + return MemoryBinaryEncoder() diff --git a/pyiceberg/avro/encoder_fast.pyi b/pyiceberg/avro/encoder_fast.pyi new file mode 100644 index 0000000000..ad4d49d9dc --- /dev/null +++ b/pyiceberg/avro/encoder_fast.pyi @@ -0,0 +1,31 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from typing import Any +from uuid import UUID + +class CythonBinaryEncoder: + def __init__(self) -> None: ... + def getvalue(self) -> bytes: ... + def write(self, b: bytes) -> None: ... + def write_boolean(self, v: bool) -> None: ... + def write_int(self, v: int) -> None: ... + def write_float(self, v: float) -> None: ... + def write_double(self, v: float) -> None: ... + def write_bytes(self, b: bytes) -> None: ... + def write_utf8(self, s: str) -> None: ... + def write_uuid(self, uuid: UUID) -> None: ... + def write_unknown(self, _: Any) -> None: ... diff --git a/pyiceberg/avro/encoder_fast.pyx b/pyiceberg/avro/encoder_fast.pyx new file mode 100644 index 0000000000..862639d876 --- /dev/null +++ b/pyiceberg/avro/encoder_fast.pyx @@ -0,0 +1,119 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +import cython +from cpython.bytes cimport PyBytes_FromStringAndSize, PyBytes_AS_STRING, PyBytes_GET_SIZE +from cpython.mem cimport PyMem_Malloc, PyMem_Realloc, PyMem_Free +from libc.string cimport memcpy +from libc.stdint cimport uint8_t, uint64_t, int64_t + +from pyiceberg.avro import STRUCT_DOUBLE, STRUCT_FLOAT + +cdef uint64_t _INITIAL_CAPACITY = 1024 + + +@cython.final +cdef class CythonBinaryEncoder: + """In-memory BinaryEncoder that writes to a growable C buffer. + + Drop-in replacement for BinaryEncoder for the block-encoding path: + exposes the same write_* methods the Writer tree calls, plus + getvalue() to materialise the encoded bytes once at the end. + """ + + cdef unsigned char *_data + cdef uint64_t _size + cdef uint64_t _capacity + + def __cinit__(self): + self._data = PyMem_Malloc(_INITIAL_CAPACITY) + if not self._data: + raise MemoryError() + self._size = 0 + self._capacity = _INITIAL_CAPACITY + + def __dealloc__(self): + PyMem_Free(self._data) + + cdef inline int _ensure(self, uint64_t n) except -1: + cdef uint64_t need = self._size + n + if need <= self._capacity: + return 0 + cdef uint64_t cap = self._capacity + while cap < need: + cap <<= 1 + cdef unsigned char *grown = PyMem_Realloc(self._data, cap) + if not grown: + raise MemoryError() + self._data = grown + self._capacity = cap + return 0 + + cpdef bytes getvalue(self): + return PyBytes_FromStringAndSize( self._data, self._size) + + cpdef void write(self, bytes b): + cdef Py_ssize_t n = PyBytes_GET_SIZE(b) + self._ensure(n) + memcpy(self._data + self._size, PyBytes_AS_STRING(b), n) + self._size += n + + cpdef void write_boolean(self, bint v): + self._ensure(1) + self._data[self._size] = 1 if v else 0 + self._size += 1 + + cpdef void write_int(self, int64_t v): + # zigzag then base-128 varint; a 64-bit value needs at most 10 bytes + self._ensure(10) + cdef uint64_t uv = v + cdef uint64_t datum = (uv << 1) ^ (0 - (uv >> 63)) + cdef unsigned char *p = self._data + self._size + while datum & ~0x7F: + p[0] = ((datum & 0x7F) | 0x80) + p += 1 + datum >>= 7 + p[0] = datum + p += 1 + self._size = p - self._data + + def write_float(self, v: float) -> None: + self.write(STRUCT_FLOAT.pack(v)) + + def write_double(self, v: float) -> None: + self.write(STRUCT_DOUBLE.pack(v)) + + cpdef void write_bytes(self, b): + cdef bytes bb = bytes(b) if type(b) is not bytes else b + cdef Py_ssize_t n = PyBytes_GET_SIZE(bb) + self.write_int(n) + self._ensure(n) + memcpy(self._data + self._size, PyBytes_AS_STRING(bb), n) + self._size += n + + def write_utf8(self, s) -> None: + self.write_bytes(s.encode("utf-8")) + + def write_uuid(self, uuid) -> None: + cdef bytes b = uuid.bytes + if PyBytes_GET_SIZE(b) != 16: + raise ValueError(f"Expected UUID to have 16 bytes, got: len({b!r})") + self._ensure(16) + memcpy(self._data + self._size, PyBytes_AS_STRING(b), 16) + self._size += 16 + + def write_unknown(self, _) -> None: + pass diff --git a/pyiceberg/avro/file.py b/pyiceberg/avro/file.py index 7db92818fe..88e3b94c5e 100644 --- a/pyiceberg/avro/file.py +++ b/pyiceberg/avro/file.py @@ -19,7 +19,6 @@ from __future__ import annotations -import io import json import os from collections.abc import Callable @@ -34,7 +33,7 @@ from pyiceberg.avro.codecs import AVRO_CODEC_KEY, CODEC_MAPPING_ICEBERG_TO_AVRO, KNOWN_CODECS from pyiceberg.avro.codecs.codec import Codec from pyiceberg.avro.decoder import BinaryDecoder, new_decoder -from pyiceberg.avro.encoder import BinaryEncoder +from pyiceberg.avro.encoder import BinaryEncoder, new_memory_encoder from pyiceberg.avro.reader import Reader from pyiceberg.avro.resolver import construct_reader, construct_writer, resolve_reader, resolve_writer from pyiceberg.avro.writer import Writer @@ -300,11 +299,10 @@ def compression_codec(self) -> type[Codec] | None: return KNOWN_CODECS[codec_name] # type: ignore def write_block(self, objects: list[D]) -> None: - in_memory = io.BytesIO() - block_content_encoder = BinaryEncoder(output_stream=in_memory) + block_content_encoder = new_memory_encoder() for obj in objects: self.writer.write(block_content_encoder, obj) - block_content = in_memory.getvalue() + block_content = block_content_encoder.getvalue() self.encoder.write_int(len(objects)) diff --git a/setup.py b/setup.py index 34eee94bbd..d4d504568c 100644 --- a/setup.py +++ b/setup.py @@ -59,7 +59,13 @@ def make_release_tree(self, base_dir: str, files: list[str]) -> None: [os.path.join(package_path, "avro", "decoder_fast.pyx")], extra_compile_args=extra_compile_args, language="c", - ) + ), + Extension( + "pyiceberg.avro.encoder_fast", + [os.path.join(package_path, "avro", "encoder_fast.pyx")], + extra_compile_args=extra_compile_args, + language="c", + ), ] ext_modules = cythonize( diff --git a/tests/avro/test_encoder.py b/tests/avro/test_encoder.py index 5866719434..c5ece6842c 100644 --- a/tests/avro/test_encoder.py +++ b/tests/avro/test_encoder.py @@ -16,38 +16,42 @@ # under the License. from __future__ import annotations -import io import struct import uuid +from typing import Any -from pyiceberg.avro.encoder import BinaryEncoder +import pytest +from pyiceberg.avro.decoder import new_decoder +from pyiceberg.avro.encoder import MemoryBinaryEncoder -def test_write() -> None: - output = io.BytesIO() - encoder = BinaryEncoder(output) +ENCODERS: list[Any] = [MemoryBinaryEncoder] +try: + from pyiceberg.avro.encoder_fast import CythonBinaryEncoder - _input = b"\x12\x34\x56" + ENCODERS.append(CythonBinaryEncoder) +except ModuleNotFoundError: + pass - encoder.write(_input) - assert output.getbuffer() == _input +@pytest.fixture(params=ENCODERS, ids=[c.__name__ for c in ENCODERS]) +def encoder(request: pytest.FixtureRequest) -> Any: + return request.param() + +def test_write(encoder: Any) -> None: + _input = b"\x12\x34\x56" + encoder.write(_input) + assert encoder.getvalue() == _input -def test_write_boolean() -> None: - output = io.BytesIO() - encoder = BinaryEncoder(output) +def test_write_boolean(encoder: Any) -> None: encoder.write_boolean(True) encoder.write_boolean(False) + assert encoder.getvalue() == struct.pack("??", True, False) - assert output.getbuffer() == struct.pack("??", True, False) - - -def test_write_int() -> None: - output = io.BytesIO() - encoder = BinaryEncoder(output) +def test_write_int(encoder: Any) -> None: _1byte_input = 2 _2byte_input = 7466 _3byte_input = 523490 @@ -66,7 +70,7 @@ def test_write_int() -> None: encoder.write_int(_7byte_input) encoder.write_int(_8byte_input) - buffer = output.getbuffer() + buffer = encoder.getvalue() assert buffer[0:1] == b"\x04" assert buffer[1:3] == b"\xd4\x74" @@ -78,57 +82,64 @@ def test_write_int() -> None: assert buffer[28:36] == b"\xc4\xa0\xb2\xae\x83\xf8\xe4\x7c" -def test_write_float() -> None: - output = io.BytesIO() - encoder = BinaryEncoder(output) - +def test_write_float(encoder: Any) -> None: _input = 3.14159265359 - encoder.write_float(_input) - - assert output.getbuffer() == struct.pack(" None: - output = io.BytesIO() - encoder = BinaryEncoder(output) - +def test_write_double(encoder: Any) -> None: _input = 3.14159265359 - encoder.write_double(_input) - - assert output.getbuffer() == struct.pack(" None: - output = io.BytesIO() - encoder = BinaryEncoder(output) - +def test_write_bytes(encoder: Any) -> None: _input = b"\x12\x34\x56" - encoder.write_bytes(_input) + assert encoder.getvalue() == b"".join([b"\x06", _input]) - assert output.getbuffer() == b"".join([b"\x06", _input]) - - -def test_write_utf8() -> None: - output = io.BytesIO() - encoder = BinaryEncoder(output) +def test_write_utf8(encoder: Any) -> None: _input = "That, my liege, is how we know the Earth to be banana-shaped." bin_input = _input.encode() encoder.write_utf8(_input) - - assert output.getbuffer() == b"".join([b"\x7a", bin_input]) + assert encoder.getvalue() == b"".join([b"\x7a", bin_input]) -def test_write_uuid() -> None: - output = io.BytesIO() - encoder = BinaryEncoder(output) - +def test_write_uuid(encoder: Any) -> None: _input = uuid.UUID("12345678-1234-5678-1234-567812345678") encoder.write_uuid(_input) - - buf = output.getbuffer() + buf = encoder.getvalue() assert len(buf) == 16 - assert buf.tobytes() == b"\x124Vx\x124Vx\x124Vx\x124Vx" + assert buf == b"\x124Vx\x124Vx\x124Vx\x124Vx" + + +@pytest.mark.parametrize( + "v", + [0, 1, -1, 63, 64, -64, -65, 127, 128, -128, 2**31 - 1, -(2**31), 2**62, -(2**62), 2**63 - 1, -(2**63)], +) +def test_int_round_trip(encoder: Any, v: int) -> None: + encoder.write_int(v) + decoder = new_decoder(encoder.getvalue()) + assert decoder.read_int() == v + + +def test_encoders_byte_identical() -> None: + """Both encoder implementations must produce identical output.""" + if len(ENCODERS) < 2: + pytest.skip("Cython encoder not built") + + def fill(enc: Any) -> bytes: + enc.write_boolean(True) + for v in [0, 1, -1, 7466, -523490, 2**62, -(2**62)]: + enc.write_int(v) + enc.write_float(1.5) + enc.write_double(3.14159265359) + enc.write_bytes(b"\x00\xff" * 50) + enc.write_utf8("hello ☃ snowman") + enc.write_uuid(uuid.UUID("12345678-1234-5678-1234-567812345678")) + return enc.getvalue() + + outputs = [fill(cls()) for cls in ENCODERS] + assert outputs[0] == outputs[1]