diff --git a/parquet-hadoop/pom.xml b/parquet-hadoop/pom.xml
index ed2916703e..6565454a1e 100644
--- a/parquet-hadoop/pom.xml
+++ b/parquet-hadoop/pom.xml
@@ -143,8 +143,8 @@
       <groupId>io.airlift</groupId>
-      <artifactId>aircompressor</artifactId>
-      <version>2.0.2</version>
+      <artifactId>aircompressor-v3</artifactId>
+      <version>3.5</version>
     </dependency>
     <dependency>
       <groupId>commons-pool</groupId>
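
For context on the dependency swap: aircompressor-v3 relocates the LZ4 classes to the `io.airlift.compress.v3.lz4` package and replaces direct construction with `create()` factories. A minimal round-trip sketch, assuming only the entry points this patch itself calls (`create()`, `maxCompressedLength(int)`, and the six-argument array `compress`/`decompress`):

```java
import io.airlift.compress.v3.lz4.Lz4Compressor;
import io.airlift.compress.v3.lz4.Lz4Decompressor;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class Lz4V3RoundTrip {
  public static void main(String[] args) {
    byte[] input = "hello, parquet".getBytes(StandardCharsets.UTF_8);

    // Size the output with maxCompressedLength, exactly as the codec below does.
    Lz4Compressor compressor = Lz4Compressor.create();
    byte[] compressed = new byte[compressor.maxCompressedLength(input.length)];
    int compressedSize =
        compressor.compress(input, 0, input.length, compressed, 0, compressed.length);

    // Decompress back into an exactly-sized buffer and verify the payload.
    Lz4Decompressor decompressor = Lz4Decompressor.create();
    byte[] output = new byte[input.length];
    int uncompressedSize =
        decompressor.decompress(compressed, 0, compressedSize, output, 0, output.length);

    if (uncompressedSize != input.length || !Arrays.equals(input, output)) {
      throw new AssertionError("round trip failed");
    }
  }
}
```
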
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/Lz4RawCompressor.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/Lz4RawCompressor.java
index 3b8d10851b..68f766be61 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/Lz4RawCompressor.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/Lz4RawCompressor.java
@@ -18,25 +18,53 @@
*/
package org.apache.parquet.hadoop.codec;
-import io.airlift.compress.lz4.Lz4Compressor;
+import io.airlift.compress.v3.lz4.Lz4Compressor;
import java.io.IOException;
import java.nio.ByteBuffer;
public class Lz4RawCompressor extends NonBlockedCompressor {
- private Lz4Compressor compressor = new Lz4Compressor();
+ private final Lz4Compressor compressor = Lz4Compressor.create();
+
+ /** Scratch input buffer, used when either buffer is not array-backed (e.g. direct); lazily allocated and grown on demand. */
+ private byte[] inputBuf;
+ /** Scratch output buffer, used when either buffer is not array-backed (e.g. direct); lazily allocated and grown on demand. */
+ private byte[] outputBuf;
@Override
protected int maxCompressedLength(int byteSize) {
- return io.airlift.compress.lz4.Lz4RawCompressor.maxCompressedLength(byteSize);
+ return compressor.maxCompressedLength(byteSize);
}
@Override
protected int compress(ByteBuffer uncompressed, ByteBuffer compressed) throws IOException {
- compressor.compress(uncompressed, compressed);
- int compressedSize = compressed.position();
- compressed.limit(compressedSize);
- compressed.rewind();
+ int startPos = compressed.position();
+ int inputLen = uncompressed.remaining();
+ int maxOut = compressed.remaining();
+
+ final int compressedSize;
+ if (uncompressed.hasArray() && compressed.hasArray()) {
+ int inputOffset = uncompressed.arrayOffset() + uncompressed.position();
+ int outputOffset = compressed.arrayOffset() + compressed.position();
+ compressedSize = compressor.compress(
+ uncompressed.array(), inputOffset, inputLen,
+ compressed.array(), outputOffset, maxOut);
+ // Advance the input position to match the copy path, where the bulk get() advances it;
+ // the output position stays at startPos for the framing below.
+ uncompressed.position(uncompressed.position() + inputLen);
+ } else {
+ if (inputBuf == null || inputBuf.length < inputLen) {
+ inputBuf = new byte[inputLen];
+ }
+ if (outputBuf == null || outputBuf.length < maxOut) {
+ outputBuf = new byte[maxOut];
+ }
+ uncompressed.get(inputBuf, 0, inputLen);
+ compressedSize = compressor.compress(inputBuf, 0, inputLen, outputBuf, 0, maxOut);
+ compressed.put(outputBuf, 0, compressedSize);
+ }
+
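+ // Expose exactly the written bytes: position at the start of the output, limit at its
+ // end (mirrors the old rewind()-based contract, relative to startPos).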
+ compressed.limit(startPos + compressedSize);
+ compressed.position(startPos);
return compressedSize;
}
}
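
The two-path dispatch above exists because the v3 API apparently drops the `ByteBuffer` overload the old `compressor.compress(uncompressed, compressed)` call relied on: array-backed buffers are handed to the byte[] API zero-copy, everything else is staged through the scratch arrays. A hedged test sketch of the resulting buffer contract, placed in the same package so the protected methods are visible; the class name is illustrative:

```java
package org.apache.parquet.hadoop.codec;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class Lz4RawHeapPathCheck {
  public static void main(String[] args) throws Exception {
    byte[] payload = "parquet lz4_raw heap path".getBytes(StandardCharsets.UTF_8);

    Lz4RawCompressor compressor = new Lz4RawCompressor();
    ByteBuffer input = ByteBuffer.wrap(payload); // heap-backed: takes the zero-copy path
    ByteBuffer output = ByteBuffer.allocate(compressor.maxCompressedLength(payload.length));

    int size = compressor.compress(input, output);

    // Both paths leave the same state behind: input fully consumed, and the
    // compressed bytes framed between output.position() and output.limit().
    if (input.hasRemaining() || output.position() != 0 || output.limit() != size) {
      throw new AssertionError("unexpected buffer state");
    }
  }
}
```
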
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/Lz4RawDecompressor.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/Lz4RawDecompressor.java
index 68839d2814..e13fb811c0 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/Lz4RawDecompressor.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/Lz4RawDecompressor.java
@@ -18,14 +18,19 @@
*/
package org.apache.parquet.hadoop.codec;
-import io.airlift.compress.lz4.Lz4Decompressor;
+import io.airlift.compress.v3.lz4.Lz4Decompressor;
import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.hadoop.io.compress.DirectDecompressor;
public class Lz4RawDecompressor extends NonBlockedDecompressor implements DirectDecompressor {
- private Lz4Decompressor decompressor = new Lz4Decompressor();
+ private final Lz4Decompressor decompressor = Lz4Decompressor.create();
+
+ /** Scratch input buffer, used when either buffer is not array-backed (e.g. direct); lazily allocated and grown on demand. */
+ private byte[] inputBuf;
+ /** Scratch output buffer, used when either buffer is not array-backed (e.g. direct); lazily allocated and grown on demand. */
+ private byte[] outputBuf;
@Override
protected int maxUncompressedLength(ByteBuffer compressed, int maxUncompressedLength) throws IOException {
@@ -36,10 +41,34 @@ protected int maxUncompressedLength(ByteBuffer compressed, int maxUncompressedLength) throws IOException {
@Override
protected int uncompress(ByteBuffer compressed, ByteBuffer uncompressed) throws IOException {
- decompressor.decompress(compressed, uncompressed);
- int uncompressedSize = uncompressed.position();
- uncompressed.limit(uncompressedSize);
- uncompressed.rewind();
+ int startPos = uncompressed.position();
+ int compressedLen = compressed.remaining();
+ int maxOut = uncompressed.remaining();
+
+ final int uncompressedSize;
+ if (compressed.hasArray() && uncompressed.hasArray()) {
+ int inputOffset = compressed.arrayOffset() + compressed.position();
+ int outputOffset = uncompressed.arrayOffset() + uncompressed.position();
+ uncompressedSize = decompressor.decompress(
+ compressed.array(), inputOffset, compressedLen,
+ uncompressed.array(), outputOffset, maxOut);
+ // Advance the input position to match the copy path, where the bulk get() advances it;
+ // the output position stays at startPos for the framing below.
+ compressed.position(compressed.position() + compressedLen);
+ } else {
+ if (inputBuf == null || inputBuf.length < compressedLen) {
+ inputBuf = new byte[compressedLen];
+ }
+ if (outputBuf == null || outputBuf.length < maxOut) {
+ outputBuf = new byte[maxOut];
+ }
+ compressed.get(inputBuf, 0, compressedLen);
+ uncompressedSize = decompressor.decompress(
+ inputBuf, 0, compressedLen, outputBuf, 0, maxOut);
+ uncompressed.put(outputBuf, 0, uncompressedSize);
+ }
+
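+ // Expose exactly the decompressed bytes: position at the start of the output, limit at
+ // its end (mirrors the old rewind()-based contract, relative to startPos).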
+ uncompressed.limit(startPos + uncompressedSize);
+ uncompressed.position(startPos);
return uncompressedSize;
}
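}

And the mirror-image check for the copy path, driving the decompressor with direct buffers (for which `hasArray()` is false). The compressed block is produced with the v3 array API shown earlier; again a hedged sketch with an illustrative class name, in the same package to reach the protected `uncompress(...)`:

```java
package org.apache.parquet.hadoop.codec;

import io.airlift.compress.v3.lz4.Lz4Compressor;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class Lz4RawDirectPathCheck {
  public static void main(String[] args) throws Exception {
    byte[] payload = "parquet lz4_raw direct path".getBytes(StandardCharsets.UTF_8);

    // Build a raw LZ4 block on the heap, then stage it in direct buffers so
    // the scratch-array branch of uncompress(...) is the one exercised.
    Lz4Compressor lz4 = Lz4Compressor.create();
    byte[] block = new byte[lz4.maxCompressedLength(payload.length)];
    int blockLen = lz4.compress(payload, 0, payload.length, block, 0, block.length);

    ByteBuffer compressed = ByteBuffer.allocateDirect(blockLen);
    compressed.put(block, 0, blockLen);
    compressed.flip();
    ByteBuffer uncompressed = ByteBuffer.allocateDirect(payload.length);

    int size = new Lz4RawDecompressor().uncompress(compressed, uncompressed);

    if (size != payload.length || uncompressed.position() != 0 || uncompressed.limit() != size) {
      throw new AssertionError("unexpected buffer state");
    }
  }
}
```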