diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java
index eee5fa6083..98b49835a6 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java
@@ -39,6 +39,7 @@
 import org.apache.parquet.compression.CompressionCodecFactory;
 import org.apache.parquet.conf.HadoopParquetConfiguration;
 import org.apache.parquet.conf.ParquetConfiguration;
+import org.apache.parquet.hadoop.codec.Lz4RawCodec;
 import org.apache.parquet.hadoop.codec.ZstandardCodec;
 import org.apache.parquet.hadoop.metadata.CompressionCodecName;
 import org.apache.parquet.hadoop.util.ConfigurationUtil;
@@ -170,11 +171,12 @@ public BytesInput decompress(BytesInput bytes, int decompressedSize) throws IOEx
 
       }
       InputStream is = codec.createInputStream(bytes.toInputStream(), decompressor);
-      // We need to explicitly close the ZstdDecompressorStream here to release the resources it holds to
-      // avoid off-heap memory fragmentation issue, see https://github.com/apache/parquet-format/issues/398.
-      // This change will load the decompressor stream into heap a little earlier, since the problem it solves
-      // only happens in the ZSTD codec, so this modification is only made for ZSTD streams.
-      if (codec instanceof ZstandardCodec) {
+      // Eagerly materialize the decompressed stream for codecs that require all input in a single buffer.
+      // ZSTD: releases off-heap resources early to avoid fragmentation (see parquet-format#398).
+      // LZ4_RAW: requires one-shot decompression; the lazy StreamBytesInput.writeInto() path reads via
+      // Channels.newChannel() in ~8KB chunks, causing the decompressor to be called with an undersized
+      // output buffer (see #3478).
+      if (codec instanceof ZstandardCodec || codec instanceof Lz4RawCodec) {
         decompressed = BytesInput.copy(BytesInput.from(is, decompressedSize));
         is.close();
       } else {
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/codec/TestCompressionCodec.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/codec/TestCompressionCodec.java
index f202c63501..ae2bd87ac9 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/codec/TestCompressionCodec.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/codec/TestCompressionCodec.java
@@ -33,7 +33,13 @@
 import org.apache.hadoop.io.compress.CompressionOutputStream;
 import org.apache.hadoop.io.compress.Compressor;
 import org.apache.hadoop.io.compress.Decompressor;
+import org.apache.parquet.bytes.ByteBufferReleaser;
 import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.bytes.DirectByteBufferAllocator;
+import org.apache.parquet.bytes.TrackingByteBufferAllocator;
+import org.apache.parquet.compression.CompressionCodecFactory.BytesInputCompressor;
+import org.apache.parquet.compression.CompressionCodecFactory.BytesInputDecompressor;
+import org.apache.parquet.hadoop.CodecFactory;
 import org.apache.parquet.hadoop.metadata.CompressionCodecName;
 import org.junit.Assert;
 import org.junit.Test;
@@ -41,6 +47,8 @@
 public class TestCompressionCodec {
 
+  private final int pageSize = 64 * 1024;
+
   @Test
   public void testLz4RawBlock() throws IOException {
     testBlock(CompressionCodecName.LZ4_RAW);
   }
@@ -177,6 +185,35 @@ private CompressionCodec getCodec(CompressionCodecName codecName, int bufferSize
     }
   }
 
+  /**
+   * Regression test for #3478: LZ4_RAW heap decompression fails when the decompressed page
+   * exceeds the ~8KB chunk size used by stream materialization in BytesInput.copy().
+   */
+  @Test
+  public void testLz4RawHeapDecompressorCanCopyLargePage() throws IOException {
+    final int size = 16 * 1024;
+    final byte[] raw = new byte[size];
+    new Random(42).nextBytes(raw);
+
+    try (TrackingByteBufferAllocator allocator =
+            TrackingByteBufferAllocator.wrap(new DirectByteBufferAllocator());
+        ByteBufferReleaser releaser = new ByteBufferReleaser(allocator)) {
+      CodecFactory heapCodecFactory = new CodecFactory(new Configuration(), pageSize);
+      BytesInputCompressor compressor = heapCodecFactory.getCompressor(CompressionCodecName.LZ4_RAW);
+      BytesInputDecompressor decompressor = heapCodecFactory.getDecompressor(CompressionCodecName.LZ4_RAW);
+
+      BytesInput compressed = compressor.compress(BytesInput.from(raw));
+      BytesInput decompressed = decompressor.decompress(compressed, size);
+
+      BytesInput copied = decompressed.copy(releaser);
+      Assert.assertArrayEquals(raw, copied.toByteArray());
+
+      compressor.release();
+      decompressor.release();
+      heapCodecFactory.release();
+    }
+  }
+
   @Test
   public void TestDecompressorInvalidState() throws IOException {
     // Create a mock Decompressor that returns 0 when decompress is called.