Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.parquet.compression.CompressionCodecFactory;
import org.apache.parquet.conf.HadoopParquetConfiguration;
import org.apache.parquet.conf.ParquetConfiguration;
import org.apache.parquet.hadoop.codec.Lz4RawCodec;
import org.apache.parquet.hadoop.codec.ZstandardCodec;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.hadoop.util.ConfigurationUtil;
Expand Down Expand Up @@ -170,11 +171,12 @@ public BytesInput decompress(BytesInput bytes, int decompressedSize) throws IOEx
}
InputStream is = codec.createInputStream(bytes.toInputStream(), decompressor);

// We need to explicitly close the ZstdDecompressorStream here to release the resources it holds to
// avoid off-heap memory fragmentation issue, see https://github.com/apache/parquet-format/issues/398.
// This change will load the decompressor stream into heap a little earlier, since the problem it solves
// only happens in the ZSTD codec, so this modification is only made for ZSTD streams.
if (codec instanceof ZstandardCodec) {
// Eagerly materialize the decompressed stream for codecs that require all input in a single buffer.
// ZSTD: releases off-heap resources early to avoid fragmentation (see parquet-format#398).
// LZ4_RAW: requires one-shot decompression; the lazy StreamBytesInput.writeInto() path reads via
// Channels.newChannel() in ~8KB chunks, causing the decompressor to be called with an undersized
// output buffer (see #3478).
if (codec instanceof ZstandardCodec || codec instanceof Lz4RawCodec) {
decompressed = BytesInput.copy(BytesInput.from(is, decompressedSize));
is.close();
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,22 @@
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.io.compress.Compressor;
import org.apache.hadoop.io.compress.Decompressor;
import org.apache.parquet.bytes.ByteBufferReleaser;
import org.apache.parquet.bytes.BytesInput;
import org.apache.parquet.bytes.DirectByteBufferAllocator;
import org.apache.parquet.bytes.TrackingByteBufferAllocator;
import org.apache.parquet.compression.CompressionCodecFactory.BytesInputCompressor;
import org.apache.parquet.compression.CompressionCodecFactory.BytesInputDecompressor;
import org.apache.parquet.hadoop.CodecFactory;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;

public class TestCompressionCodec {

private final int pageSize = 64 * 1024;

@Test
public void testLz4RawBlock() throws IOException {
testBlock(CompressionCodecName.LZ4_RAW);
Expand Down Expand Up @@ -177,6 +185,35 @@ private CompressionCodec getCodec(CompressionCodecName codecName, int bufferSize
}
}

/**
 * Regression test for #3478: LZ4_RAW heap decompression fails when the decompressed page
 * exceeds the ~8KB chunk size used by stream materialization in BytesInput.copy().
 */
@Test
public void testLz4RawHeapDecompressorCanCopyLargePage() throws IOException {
  // 16 KB payload: deliberately larger than the ~8KB chunk size used by the lazy
  // StreamBytesInput.writeInto() path, so the bug would reproduce without the fix.
  final int size = 16 * 1024;
  final byte[] raw = new byte[size];
  // Fixed seed keeps the payload (and therefore the test) deterministic.
  new Random(42).nextBytes(raw);

  try (TrackingByteBufferAllocator allocator =
          TrackingByteBufferAllocator.wrap(new DirectByteBufferAllocator());
      ByteBufferReleaser releaser = new ByteBufferReleaser(allocator)) {
    CodecFactory heapCodecFactory = new CodecFactory(new Configuration(), pageSize);
    BytesInputCompressor compressor = null;
    BytesInputDecompressor decompressor = null;
    try {
      compressor = heapCodecFactory.getCompressor(CompressionCodecName.LZ4_RAW);
      decompressor = heapCodecFactory.getDecompressor(CompressionCodecName.LZ4_RAW);

      BytesInput compressed = compressor.compress(BytesInput.from(raw));
      BytesInput decompressed = decompressor.decompress(compressed, size);

      // copy() materializes the stream — this is the path that used to fail for LZ4_RAW.
      BytesInput copied = decompressed.copy(releaser);
      Assert.assertArrayEquals(raw, copied.toByteArray());
    } finally {
      // Release in finally so a decompression error or assertion failure does not skip
      // cleanup; otherwise the tracking allocator's close could report a spurious leak
      // that masks the real test failure.
      if (compressor != null) {
        compressor.release();
      }
      if (decompressor != null) {
        decompressor.release();
      }
      heapCodecFactory.release();
    }
  }
}

@Test
public void TestDecompressorInvalidState() throws IOException {
// Create a mock Decompressor that returns 0 when decompress is called.
Expand Down