From 618812548ced81d9eef9e76057a927fbeead74e3 Mon Sep 17 00:00:00 2001 From: xuyu <11161569@vivo.com> Date: Thu, 26 Feb 2026 18:21:47 +0800 Subject: [PATCH 1/4] GH-1041: Implement snappy compression --- compression/pom.xml | 5 + compression/src/main/java/module-info.java | 1 + .../CommonsCompressionFactory.java | 4 + .../compression/SnappyCompressionCodec.java | 93 +++++++++++++++++++ .../compression/TestCompressionCodec.java | 3 + .../apache/arrow/flatbuf/CompressionType.java | 3 +- .../vector/compression/CompressionUtil.java | 4 +- .../compression/NoCompressionCodec.java | 1 + 8 files changed, 112 insertions(+), 2 deletions(-) create mode 100644 compression/src/main/java/org/apache/arrow/compression/SnappyCompressionCodec.java diff --git a/compression/pom.xml b/compression/pom.xml index 29f8b41788..81af86ef0f 100644 --- a/compression/pom.xml +++ b/compression/pom.xml @@ -57,5 +57,10 @@ under the License. zstd-jni 1.5.7-6 + + org.xerial.snappy + snappy-java + 1.1.10.7 + diff --git a/compression/src/main/java/module-info.java b/compression/src/main/java/module-info.java index 113a1dba9d..8a91e92754 100644 --- a/compression/src/main/java/module-info.java +++ b/compression/src/main/java/module-info.java @@ -24,6 +24,7 @@ requires org.apache.arrow.memory.core; requires org.apache.arrow.vector; requires org.apache.commons.compress; + requires org.xerial.snappy; // Also defined under META-INF/services to support non-modular applications provides CompressionCodec.Factory with diff --git a/compression/src/main/java/org/apache/arrow/compression/CommonsCompressionFactory.java b/compression/src/main/java/org/apache/arrow/compression/CommonsCompressionFactory.java index f15c139df1..6ab2603922 100644 --- a/compression/src/main/java/org/apache/arrow/compression/CommonsCompressionFactory.java +++ b/compression/src/main/java/org/apache/arrow/compression/CommonsCompressionFactory.java @@ -33,6 +33,8 @@ public CompressionCodec createCodec(CompressionUtil.CodecType codecType) { switch (codecType) { case LZ4_FRAME: return new Lz4CompressionCodec(); + case SNAPPY: + return new SnappyCompressionCodec(); case ZSTD: return new ZstdCompressionCodec(); default: @@ -45,6 +47,8 @@ public CompressionCodec createCodec(CompressionUtil.CodecType codecType, int com switch (codecType) { case LZ4_FRAME: return new Lz4CompressionCodec(); + case SNAPPY: + return new SnappyCompressionCodec(); case ZSTD: return new ZstdCompressionCodec(compressionLevel); default: diff --git a/compression/src/main/java/org/apache/arrow/compression/SnappyCompressionCodec.java b/compression/src/main/java/org/apache/arrow/compression/SnappyCompressionCodec.java new file mode 100644 index 0000000000..ca3891d6b9 --- /dev/null +++ b/compression/src/main/java/org/apache/arrow/compression/SnappyCompressionCodec.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.arrow.compression; + +import org.apache.arrow.memory.ArrowBuf; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.util.Preconditions; +import org.apache.arrow.vector.compression.AbstractCompressionCodec; +import org.apache.arrow.vector.compression.CompressionUtil; +import org.xerial.snappy.Snappy; + +/** Compression codec for the Snappy algorithm. */ +public class SnappyCompressionCodec extends AbstractCompressionCodec { + + @Override + protected ArrowBuf doCompress(BufferAllocator allocator, ArrowBuf uncompressedBuffer) { + Preconditions.checkArgument( + uncompressedBuffer.writerIndex() <= Integer.MAX_VALUE, + "The uncompressed buffer size exceeds the integer limit %s.", + Integer.MAX_VALUE); + + byte[] inBytes = new byte[(int) uncompressedBuffer.writerIndex()]; + uncompressedBuffer.getBytes(/* index= */ 0, inBytes); + + final byte[] outBytes; + try { + outBytes = Snappy.compress(inBytes); + } catch (Exception e) { + throw new RuntimeException("Error compressing with Snappy", e); + } + + ArrowBuf compressedBuffer = + allocator.buffer(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH + outBytes.length); + compressedBuffer.setBytes(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH, outBytes); + compressedBuffer.writerIndex(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH + outBytes.length); + return compressedBuffer; + } + + @Override + protected ArrowBuf doDecompress(BufferAllocator allocator, ArrowBuf compressedBuffer) { + Preconditions.checkArgument( + compressedBuffer.writerIndex() <= Integer.MAX_VALUE, + "The compressed buffer size exceeds the integer limit %s", + Integer.MAX_VALUE); + + long decompressedLength = readUncompressedLength(compressedBuffer); + + byte[] inBytes = + new byte + [(int) (compressedBuffer.writerIndex() - CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH)]; + compressedBuffer.getBytes(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH, inBytes); + + final byte[] outBytes; + try { + outBytes = Snappy.uncompress(inBytes); + } catch (Exception e) { + throw new RuntimeException("Error decompressing with Snappy", e); + } + + if (outBytes.length != decompressedLength) { + throw new RuntimeException( + "Expected != actual decompressed length: " + + decompressedLength + + " != " + + outBytes.length); + } + + ArrowBuf decompressedBuffer = allocator.buffer(decompressedLength); + decompressedBuffer.setBytes(/* index= */ 0, outBytes); + decompressedBuffer.writerIndex(decompressedLength); + return decompressedBuffer; + } + + @Override + public CompressionUtil.CodecType getCodecType() { + return CompressionUtil.CodecType.SNAPPY; + } +} + diff --git a/compression/src/test/java/org/apache/arrow/compression/TestCompressionCodec.java b/compression/src/test/java/org/apache/arrow/compression/TestCompressionCodec.java index b8fb4e28b9..d326e11b6d 100644 --- a/compression/src/test/java/org/apache/arrow/compression/TestCompressionCodec.java +++ b/compression/src/test/java/org/apache/arrow/compression/TestCompressionCodec.java @@ -88,6 +88,9 @@ static Collection codecs() { CompressionCodec lz4Codec = new Lz4CompressionCodec(); params.add(Arguments.arguments(len, lz4Codec)); + CompressionCodec snappyCodec = new SnappyCompressionCodec(); + params.add(Arguments.arguments(len, snappyCodec)); + CompressionCodec zstdCodec = new ZstdCompressionCodec(); params.add(Arguments.arguments(len, zstdCodec)); diff --git a/format/src/main/java/org/apache/arrow/flatbuf/CompressionType.java b/format/src/main/java/org/apache/arrow/flatbuf/CompressionType.java index 7b3b27701b..7aec4fb255 100644 --- a/format/src/main/java/org/apache/arrow/flatbuf/CompressionType.java +++ b/format/src/main/java/org/apache/arrow/flatbuf/CompressionType.java @@ -21,8 +21,9 @@ public final class CompressionType { private CompressionType() { } public static final byte LZ4_FRAME = 0; public static final byte ZSTD = 1; + public static final byte SNAPPY = 2; - public static final String[] names = { "LZ4_FRAME", "ZSTD", }; + public static final String[] names = { "LZ4_FRAME", "ZSTD", "SNAPPY", }; public static String name(int e) { return names[e]; } } diff --git a/vector/src/main/java/org/apache/arrow/vector/compression/CompressionUtil.java b/vector/src/main/java/org/apache/arrow/vector/compression/CompressionUtil.java index 03763611e0..3b24c26fb5 100644 --- a/vector/src/main/java/org/apache/arrow/vector/compression/CompressionUtil.java +++ b/vector/src/main/java/org/apache/arrow/vector/compression/CompressionUtil.java @@ -33,7 +33,9 @@ public enum CodecType { LZ4_FRAME(org.apache.arrow.flatbuf.CompressionType.LZ4_FRAME), - ZSTD(org.apache.arrow.flatbuf.CompressionType.ZSTD); + ZSTD(org.apache.arrow.flatbuf.CompressionType.ZSTD), + + SNAPPY(org.apache.arrow.flatbuf.CompressionType.SNAPPY); private final byte type; diff --git a/vector/src/main/java/org/apache/arrow/vector/compression/NoCompressionCodec.java b/vector/src/main/java/org/apache/arrow/vector/compression/NoCompressionCodec.java index 4debce335a..e699febc4b 100644 --- a/vector/src/main/java/org/apache/arrow/vector/compression/NoCompressionCodec.java +++ b/vector/src/main/java/org/apache/arrow/vector/compression/NoCompressionCodec.java @@ -59,6 +59,7 @@ public CompressionCodec createCodec(CompressionUtil.CodecType codecType) { case NO_COMPRESSION: return NoCompressionCodec.INSTANCE; case LZ4_FRAME: + case SNAPPY: case ZSTD: throw new IllegalArgumentException( "Please add arrow-compression module to use CommonsCompressionFactory for " From 3f6d0e55e7533eba8a6cfe1abbdf5ebf6e07e67b Mon Sep 17 00:00:00 2001 From: xuyu <11161569@vivo.com> Date: Thu, 26 Feb 2026 18:37:08 +0800 Subject: [PATCH 2/4] Addressed --- .../org/apache/arrow/compression/SnappyCompressionCodec.java | 1 - .../org/apache/arrow/vector/compression/CompressionUtil.java | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/compression/src/main/java/org/apache/arrow/compression/SnappyCompressionCodec.java b/compression/src/main/java/org/apache/arrow/compression/SnappyCompressionCodec.java index ca3891d6b9..161d7e10d8 100644 --- a/compression/src/main/java/org/apache/arrow/compression/SnappyCompressionCodec.java +++ b/compression/src/main/java/org/apache/arrow/compression/SnappyCompressionCodec.java @@ -90,4 +90,3 @@ public CompressionUtil.CodecType getCodecType() { return CompressionUtil.CodecType.SNAPPY; } } - diff --git a/vector/src/main/java/org/apache/arrow/vector/compression/CompressionUtil.java b/vector/src/main/java/org/apache/arrow/vector/compression/CompressionUtil.java index 3b24c26fb5..4a26e356ae 100644 --- a/vector/src/main/java/org/apache/arrow/vector/compression/CompressionUtil.java +++ b/vector/src/main/java/org/apache/arrow/vector/compression/CompressionUtil.java @@ -34,7 +34,7 @@ public enum CodecType { LZ4_FRAME(org.apache.arrow.flatbuf.CompressionType.LZ4_FRAME), ZSTD(org.apache.arrow.flatbuf.CompressionType.ZSTD), - + SNAPPY(org.apache.arrow.flatbuf.CompressionType.SNAPPY); private final byte type; From a01400d01e763e805ba0a0be3710c883becdca0e Mon Sep 17 00:00:00 2001 From: xuyu <11161569@vivo.com> Date: Thu, 26 Feb 2026 18:42:17 +0800 Subject: [PATCH 3/4] Addressed --- compression/src/main/java/module-info.java | 1 - 1 file changed, 1 deletion(-) diff --git a/compression/src/main/java/module-info.java b/compression/src/main/java/module-info.java index 8a91e92754..113a1dba9d 100644 --- a/compression/src/main/java/module-info.java +++ b/compression/src/main/java/module-info.java @@ -24,7 +24,6 @@ requires org.apache.arrow.memory.core; requires org.apache.arrow.vector; requires org.apache.commons.compress; - requires org.xerial.snappy; // Also defined under META-INF/services to support non-modular applications provides CompressionCodec.Factory with From caf9052d7d847f731eca094fcb389147a2aae11b Mon Sep 17 00:00:00 2001 From: xuyu <11161569@vivo.com> Date: Thu, 26 Feb 2026 18:49:50 +0800 Subject: [PATCH 4/4] Addressed --- compression/src/main/java/module-info.java | 1 + 1 file changed, 1 insertion(+) diff --git a/compression/src/main/java/module-info.java b/compression/src/main/java/module-info.java index 113a1dba9d..13bae6951d 100644 --- a/compression/src/main/java/module-info.java +++ b/compression/src/main/java/module-info.java @@ -24,6 +24,7 @@ requires org.apache.arrow.memory.core; requires org.apache.arrow.vector; requires org.apache.commons.compress; + requires snappy.java; // Also defined under META-INF/services to support non-modular applications provides CompressionCodec.Factory with