diff --git a/parquet-hadoop/pom.xml b/parquet-hadoop/pom.xml
index ed2916703e..6565454a1e 100644
--- a/parquet-hadoop/pom.xml
+++ b/parquet-hadoop/pom.xml
@@ -143,8 +143,8 @@
     <dependency>
       <groupId>io.airlift</groupId>
-      <artifactId>aircompressor</artifactId>
-      <version>2.0.2</version>
+      <artifactId>aircompressor-v3</artifactId>
+      <version>3.5</version>
     </dependency>
     <dependency>
       <groupId>commons-pool</groupId>
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/Lz4RawCompressor.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/Lz4RawCompressor.java
index 3b8d10851b..68f766be61 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/Lz4RawCompressor.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/Lz4RawCompressor.java
@@ -18,25 +18,53 @@
  */
 package org.apache.parquet.hadoop.codec;
 
-import io.airlift.compress.lz4.Lz4Compressor;
+import io.airlift.compress.v3.lz4.Lz4Compressor;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 
 public class Lz4RawCompressor extends NonBlockedCompressor {
 
-  private Lz4Compressor compressor = new Lz4Compressor();
+  private final Lz4Compressor compressor = Lz4Compressor.create();
+
+  /** Reused for direct buffers; lazily allocated and grown when needed. */
+  private byte[] inputBuf;
+  /** Reused for direct buffers; lazily allocated and grown when needed. */
+  private byte[] outputBuf;
 
   @Override
   protected int maxCompressedLength(int byteSize) {
-    return io.airlift.compress.lz4.Lz4RawCompressor.maxCompressedLength(byteSize);
+    return compressor.maxCompressedLength(byteSize);
   }
 
   @Override
   protected int compress(ByteBuffer uncompressed, ByteBuffer compressed) throws IOException {
-    compressor.compress(uncompressed, compressed);
-    int compressedSize = compressed.position();
-    compressed.limit(compressedSize);
-    compressed.rewind();
+    int startPos = compressed.position();
+    int inputLen = uncompressed.remaining();
+    int maxOut = compressed.remaining();
+
+    final int compressedSize;
+    if (uncompressed.hasArray() && compressed.hasArray()) {
+      int inputOffset = uncompressed.arrayOffset() + uncompressed.position();
+      int outputOffset = compressed.arrayOffset() + compressed.position();
+      compressedSize = compressor.compress(
+          uncompressed.array(), inputOffset, inputLen,
+          compressed.array(), outputOffset, maxOut);
+      // Advance positions to match the direct-buffer path (where get/put do this)
+      uncompressed.position(uncompressed.position() + inputLen);
+    } else {
+      if (inputBuf == null || inputBuf.length < inputLen) {
+        inputBuf = new byte[inputLen];
+      }
+      if (outputBuf == null || outputBuf.length < maxOut) {
+        outputBuf = new byte[maxOut];
+      }
+      uncompressed.get(inputBuf, 0, inputLen);
+      compressedSize = compressor.compress(inputBuf, 0, inputLen, outputBuf, 0, maxOut);
+      compressed.put(outputBuf, 0, compressedSize);
+    }
+
+    compressed.limit(startPos + compressedSize);
+    compressed.position(startPos);
     return compressedSize;
   }
 }
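[Reviewer note] For reference, a minimal round-trip sketch of the aircompressor-v3
array API this patch switches the codec onto. It is a standalone illustration, not
part of the patch; the payload and the class name Lz4V3RoundTrip are made up, but
every call (Lz4Compressor.create(), maxCompressedLength, compress,
Lz4Decompressor.create(), decompress) is exactly the v3 API used in the hunk above.

    import io.airlift.compress.v3.lz4.Lz4Compressor;
    import io.airlift.compress.v3.lz4.Lz4Decompressor;

    import java.nio.charset.StandardCharsets;
    import java.util.Arrays;

    public class Lz4V3RoundTrip {
      public static void main(String[] args) {
        byte[] input = "hello hello hello hello".getBytes(StandardCharsets.UTF_8);

        // Compress into a worst-case-sized buffer; compress() returns the actual size
        Lz4Compressor compressor = Lz4Compressor.create();
        byte[] compressed = new byte[compressor.maxCompressedLength(input.length)];
        int compressedSize = compressor.compress(
            input, 0, input.length, compressed, 0, compressed.length);

        // Decompress back; decompress() returns the number of bytes written
        Lz4Decompressor decompressor = Lz4Decompressor.create();
        byte[] restored = new byte[input.length];
        int restoredSize = decompressor.decompress(
            compressed, 0, compressedSize, restored, 0, restored.length);

        if (restoredSize != input.length || !Arrays.equals(input, restored)) {
          throw new AssertionError("round trip failed");
        }
      }
    }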
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/Lz4RawDecompressor.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/Lz4RawDecompressor.java
index 68839d2814..e13fb811c0 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/Lz4RawDecompressor.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/Lz4RawDecompressor.java
@@ -18,14 +18,19 @@
  */
 package org.apache.parquet.hadoop.codec;
 
-import io.airlift.compress.lz4.Lz4Decompressor;
+import io.airlift.compress.v3.lz4.Lz4Decompressor;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import org.apache.hadoop.io.compress.DirectDecompressor;
 
 public class Lz4RawDecompressor extends NonBlockedDecompressor implements DirectDecompressor {
 
-  private Lz4Decompressor decompressor = new Lz4Decompressor();
+  private final Lz4Decompressor decompressor = Lz4Decompressor.create();
+
+  /** Reused for direct buffers; lazily allocated and grown when needed. */
+  private byte[] inputBuf;
+  /** Reused for direct buffers; lazily allocated and grown when needed. */
+  private byte[] outputBuf;
 
   @Override
   protected int maxUncompressedLength(ByteBuffer compressed, int maxUncompressedLength) throws IOException {
@@ -36,10 +41,34 @@ protected int maxUncompressedLength(ByteBuffer compressed, int maxUncompressedLe
 
   @Override
   protected int uncompress(ByteBuffer compressed, ByteBuffer uncompressed) throws IOException {
-    decompressor.decompress(compressed, uncompressed);
-    int uncompressedSize = uncompressed.position();
-    uncompressed.limit(uncompressedSize);
-    uncompressed.rewind();
+    int startPos = uncompressed.position();
+    int compressedLen = compressed.remaining();
+    int maxOut = uncompressed.remaining();
+
+    final int uncompressedSize;
+    if (compressed.hasArray() && uncompressed.hasArray()) {
+      int inputOffset = compressed.arrayOffset() + compressed.position();
+      int outputOffset = uncompressed.arrayOffset() + uncompressed.position();
+      uncompressedSize = decompressor.decompress(
+          compressed.array(), inputOffset, compressedLen,
+          uncompressed.array(), outputOffset, maxOut);
+      // Advance positions to match the direct-buffer path (where get/put do this)
+      compressed.position(compressed.position() + compressedLen);
+    } else {
+      if (inputBuf == null || inputBuf.length < compressedLen) {
+        inputBuf = new byte[compressedLen];
+      }
+      if (outputBuf == null || outputBuf.length < maxOut) {
+        outputBuf = new byte[maxOut];
+      }
+      compressed.get(inputBuf, 0, compressedLen);
+      uncompressedSize = decompressor.decompress(
+          inputBuf, 0, compressedLen, outputBuf, 0, maxOut);
+      uncompressed.put(outputBuf, 0, uncompressedSize);
+    }
+
+    uncompressed.limit(startPos + uncompressedSize);
+    uncompressed.position(startPos);
     return uncompressedSize;
   }
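[Reviewer note] The position/limit bookkeeping above is the risky part of this
change, so here is a hedged sketch of a round trip through these codec classes via
the standard Hadoop Compressor/Decompressor interface that NonBlockedCompressor and
NonBlockedDecompressor implement. The single compress()/decompress() call and the
generous buffer sizing are simplifications for illustration; a production caller
would loop until finished().

    import java.io.IOException;
    import java.util.Arrays;
    import org.apache.parquet.hadoop.codec.Lz4RawCompressor;
    import org.apache.parquet.hadoop.codec.Lz4RawDecompressor;

    public class Lz4RawCodecRoundTrip {
      public static void main(String[] args) throws IOException {
        byte[] data = new byte[4096];
        Arrays.fill(data, (byte) 'a'); // highly compressible sample payload

        Lz4RawCompressor compressor = new Lz4RawCompressor();
        compressor.setInput(data, 0, data.length);
        compressor.finish();
        byte[] compressed = new byte[data.length * 2]; // generous upper bound
        int compressedLen = compressor.compress(compressed, 0, compressed.length);

        Lz4RawDecompressor decompressor = new Lz4RawDecompressor();
        decompressor.setInput(compressed, 0, compressedLen);
        byte[] restored = new byte[data.length];
        int restoredLen = decompressor.decompress(restored, 0, restored.length);

        if (restoredLen != data.length || !Arrays.equals(data, restored)) {
          throw new AssertionError("round trip failed");
        }
      }
    }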