diff --git a/compression/pom.xml b/compression/pom.xml index 29f8b41788..81af86ef0f 100644 --- a/compression/pom.xml +++ b/compression/pom.xml @@ -57,5 +57,10 @@ under the License. zstd-jni 1.5.7-6 + + org.xerial.snappy + snappy-java + 1.1.10.7 + diff --git a/compression/src/main/java/module-info.java b/compression/src/main/java/module-info.java index 113a1dba9d..13bae6951d 100644 --- a/compression/src/main/java/module-info.java +++ b/compression/src/main/java/module-info.java @@ -24,6 +24,7 @@ requires org.apache.arrow.memory.core; requires org.apache.arrow.vector; requires org.apache.commons.compress; + requires snappy.java; // Also defined under META-INF/services to support non-modular applications provides CompressionCodec.Factory with diff --git a/compression/src/main/java/org/apache/arrow/compression/CommonsCompressionFactory.java b/compression/src/main/java/org/apache/arrow/compression/CommonsCompressionFactory.java index f15c139df1..6ab2603922 100644 --- a/compression/src/main/java/org/apache/arrow/compression/CommonsCompressionFactory.java +++ b/compression/src/main/java/org/apache/arrow/compression/CommonsCompressionFactory.java @@ -33,6 +33,8 @@ public CompressionCodec createCodec(CompressionUtil.CodecType codecType) { switch (codecType) { case LZ4_FRAME: return new Lz4CompressionCodec(); + case SNAPPY: + return new SnappyCompressionCodec(); case ZSTD: return new ZstdCompressionCodec(); default: @@ -45,6 +47,8 @@ public CompressionCodec createCodec(CompressionUtil.CodecType codecType, int com switch (codecType) { case LZ4_FRAME: return new Lz4CompressionCodec(); + case SNAPPY: + return new SnappyCompressionCodec(); case ZSTD: return new ZstdCompressionCodec(compressionLevel); default: diff --git a/compression/src/main/java/org/apache/arrow/compression/SnappyCompressionCodec.java b/compression/src/main/java/org/apache/arrow/compression/SnappyCompressionCodec.java new file mode 100644 index 0000000000..161d7e10d8 --- /dev/null +++ b/compression/src/main/java/org/apache/arrow/compression/SnappyCompressionCodec.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.arrow.compression; + +import org.apache.arrow.memory.ArrowBuf; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.util.Preconditions; +import org.apache.arrow.vector.compression.AbstractCompressionCodec; +import org.apache.arrow.vector.compression.CompressionUtil; +import org.xerial.snappy.Snappy; + +/** Compression codec for the Snappy algorithm. */ +public class SnappyCompressionCodec extends AbstractCompressionCodec { + + @Override + protected ArrowBuf doCompress(BufferAllocator allocator, ArrowBuf uncompressedBuffer) { + Preconditions.checkArgument( + uncompressedBuffer.writerIndex() <= Integer.MAX_VALUE, + "The uncompressed buffer size exceeds the integer limit %s.", + Integer.MAX_VALUE); + + byte[] inBytes = new byte[(int) uncompressedBuffer.writerIndex()]; + uncompressedBuffer.getBytes(/* index= */ 0, inBytes); + + final byte[] outBytes; + try { + outBytes = Snappy.compress(inBytes); + } catch (Exception e) { + throw new RuntimeException("Error compressing with Snappy", e); + } + + ArrowBuf compressedBuffer = + allocator.buffer(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH + outBytes.length); + compressedBuffer.setBytes(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH, outBytes); + compressedBuffer.writerIndex(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH + outBytes.length); + return compressedBuffer; + } + + @Override + protected ArrowBuf doDecompress(BufferAllocator allocator, ArrowBuf compressedBuffer) { + Preconditions.checkArgument( + compressedBuffer.writerIndex() <= Integer.MAX_VALUE, + "The compressed buffer size exceeds the integer limit %s", + Integer.MAX_VALUE); + + long decompressedLength = readUncompressedLength(compressedBuffer); + + byte[] inBytes = + new byte + [(int) (compressedBuffer.writerIndex() - CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH)]; + compressedBuffer.getBytes(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH, inBytes); + + final byte[] outBytes; + try { + outBytes = Snappy.uncompress(inBytes); + } catch (Exception e) { + throw new RuntimeException("Error decompressing with Snappy", e); + } + + if (outBytes.length != decompressedLength) { + throw new RuntimeException( + "Expected != actual decompressed length: " + + decompressedLength + + " != " + + outBytes.length); + } + + ArrowBuf decompressedBuffer = allocator.buffer(decompressedLength); + decompressedBuffer.setBytes(/* index= */ 0, outBytes); + decompressedBuffer.writerIndex(decompressedLength); + return decompressedBuffer; + } + + @Override + public CompressionUtil.CodecType getCodecType() { + return CompressionUtil.CodecType.SNAPPY; + } +} diff --git a/compression/src/test/java/org/apache/arrow/compression/TestCompressionCodec.java b/compression/src/test/java/org/apache/arrow/compression/TestCompressionCodec.java index b8fb4e28b9..d326e11b6d 100644 --- a/compression/src/test/java/org/apache/arrow/compression/TestCompressionCodec.java +++ b/compression/src/test/java/org/apache/arrow/compression/TestCompressionCodec.java @@ -88,6 +88,9 @@ static Collection codecs() { CompressionCodec lz4Codec = new Lz4CompressionCodec(); params.add(Arguments.arguments(len, lz4Codec)); + CompressionCodec snappyCodec = new SnappyCompressionCodec(); + params.add(Arguments.arguments(len, snappyCodec)); + CompressionCodec zstdCodec = new ZstdCompressionCodec(); params.add(Arguments.arguments(len, zstdCodec)); diff --git a/format/src/main/java/org/apache/arrow/flatbuf/CompressionType.java b/format/src/main/java/org/apache/arrow/flatbuf/CompressionType.java index 7b3b27701b..7aec4fb255 100644 --- a/format/src/main/java/org/apache/arrow/flatbuf/CompressionType.java +++ b/format/src/main/java/org/apache/arrow/flatbuf/CompressionType.java @@ -21,8 +21,9 @@ public final class CompressionType { private CompressionType() { } public static final byte LZ4_FRAME = 0; public static final byte ZSTD = 1; + public static final byte SNAPPY = 2; - public static final String[] names = { "LZ4_FRAME", "ZSTD", }; + public static final String[] names = { "LZ4_FRAME", "ZSTD", "SNAPPY", }; public static String name(int e) { return names[e]; } } diff --git a/vector/src/main/java/org/apache/arrow/vector/compression/CompressionUtil.java b/vector/src/main/java/org/apache/arrow/vector/compression/CompressionUtil.java index 03763611e0..4a26e356ae 100644 --- a/vector/src/main/java/org/apache/arrow/vector/compression/CompressionUtil.java +++ b/vector/src/main/java/org/apache/arrow/vector/compression/CompressionUtil.java @@ -33,7 +33,9 @@ public enum CodecType { LZ4_FRAME(org.apache.arrow.flatbuf.CompressionType.LZ4_FRAME), - ZSTD(org.apache.arrow.flatbuf.CompressionType.ZSTD); + ZSTD(org.apache.arrow.flatbuf.CompressionType.ZSTD), + + SNAPPY(org.apache.arrow.flatbuf.CompressionType.SNAPPY); private final byte type; diff --git a/vector/src/main/java/org/apache/arrow/vector/compression/NoCompressionCodec.java b/vector/src/main/java/org/apache/arrow/vector/compression/NoCompressionCodec.java index 4debce335a..e699febc4b 100644 --- a/vector/src/main/java/org/apache/arrow/vector/compression/NoCompressionCodec.java +++ b/vector/src/main/java/org/apache/arrow/vector/compression/NoCompressionCodec.java @@ -59,6 +59,7 @@ public CompressionCodec createCodec(CompressionUtil.CodecType codecType) { case NO_COMPRESSION: return NoCompressionCodec.INSTANCE; case LZ4_FRAME: + case SNAPPY: case ZSTD: throw new IllegalArgumentException( "Please add arrow-compression module to use CommonsCompressionFactory for "