Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
*/
package org.apache.arrow.vector.ipc.message;

import java.util.Map;

/** Interface for Arrow IPC messages (https://arrow.apache.org/docs/format/IPC.html). */
public interface ArrowMessage extends FBSerializable, AutoCloseable {

Expand All @@ -26,6 +28,15 @@ public interface ArrowMessage extends FBSerializable, AutoCloseable {
/** Returns the flatbuffer enum value indicating the type of the message. */
byte getMessageType();

/**
 * Returns application-defined custom metadata attached to this message.
 *
 * <p>The interface default reports no metadata by returning {@code null}; implementations that
 * carry metadata (e.g. a record batch deserialized from a Message with a custom_metadata field)
 * override this to return their key/value pairs.
 *
 * @return custom metadata map, or null if no custom metadata is present
 */
default Map<String, String> getCustomMetadata() {
  return null;
}

/**
* Visitor interface for implementations of {@link ArrowMessage}.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@
import com.google.flatbuffers.FlatBufferBuilder;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.arrow.flatbuf.RecordBatch;
import org.apache.arrow.memory.ArrowBuf;
Expand Down Expand Up @@ -52,6 +54,8 @@ public class ArrowRecordBatch implements ArrowMessage {

private final List<Long> variadicBufferCounts;

private final Map<String, String> customMetadata;

private boolean closed = false;

public ArrowRecordBatch(int length, List<ArrowFieldNode> nodes, List<ArrowBuf> buffers) {
Expand All @@ -66,6 +70,30 @@ public ArrowRecordBatch(
this(length, nodes, buffers, bodyCompression, null, true);
}

/**
 * Construct a record batch from nodes with custom metadata.
 *
 * <p>Delegates to the full constructor with the default (no-op) compression codec, no variadic
 * buffer counts, buffer alignment enabled, and buffer retention enabled.
 *
 * @param length how many rows in this batch
 * @param nodes field level info
 * @param buffers will be retained until this recordBatch is closed
 * @param customMetadata custom metadata for this record batch; may be null for none. An
 *     unmodifiable defensive copy is stored, so later changes to the caller's map are not
 *     reflected.
 */
public ArrowRecordBatch(
    int length,
    List<ArrowFieldNode> nodes,
    List<ArrowBuf> buffers,
    Map<String, String> customMetadata) {
  this(
      length,
      nodes,
      buffers,
      NoCompressionCodec.DEFAULT_BODY_COMPRESSION,
      /*variadicBufferCounts*/ null,
      /*alignBuffers*/ true,
      /*retainBuffers*/ true,
      customMetadata);
}

/**
* Construct a record batch from nodes.
*
Expand Down Expand Up @@ -132,6 +160,36 @@ public ArrowRecordBatch(
true);
}

/**
 * Construct a record batch from nodes, with custom metadata.
 *
 * @param length how many rows in this batch
 * @param nodes field level info
 * @param buffers will be retained until this recordBatch is closed
 * @param bodyCompression compression info.
 * @param variadicBufferCounts the number of buffers in each variadic section.
 * @param alignBuffers Whether to align buffers to an 8 byte boundary.
 * @param customMetadata custom metadata for this record batch; may be null for none.
 */
public ArrowRecordBatch(
    int length,
    List<ArrowFieldNode> nodes,
    List<ArrowBuf> buffers,
    ArrowBodyCompression bodyCompression,
    List<Long> variadicBufferCounts,
    boolean alignBuffers,
    Map<String, String> customMetadata) {
  this(
      length,
      nodes,
      buffers,
      bodyCompression,
      variadicBufferCounts,
      alignBuffers,
      // retainBuffers: this convenience overload always retains the source buffers.
      /*retainBuffers*/ true,
      customMetadata);
}

/**
* Construct a record batch from nodes.
*
Expand All @@ -152,13 +210,50 @@ public ArrowRecordBatch(
List<Long> variadicBufferCounts,
boolean alignBuffers,
boolean retainBuffers) {
this(
length,
nodes,
buffers,
bodyCompression,
variadicBufferCounts,
alignBuffers,
retainBuffers,
null);
}

/**
* Construct a record batch from nodes.
*
* @param length how many rows in this batch
* @param nodes field level info
* @param buffers will be retained until this recordBatch is closed
* @param bodyCompression compression info.
* @param variadicBufferCounts the number of buffers in each variadic section.
* @param alignBuffers Whether to align buffers to an 8 byte boundary.
* @param retainBuffers Whether to retain() each source buffer in the constructor. If false, the
* caller is responsible for retaining the buffers beforehand.
* @param customMetadata custom metadata for this record batch.
*/
public ArrowRecordBatch(
int length,
List<ArrowFieldNode> nodes,
List<ArrowBuf> buffers,
ArrowBodyCompression bodyCompression,
List<Long> variadicBufferCounts,
boolean alignBuffers,
boolean retainBuffers,
Map<String, String> customMetadata) {
super();
this.length = length;
this.nodes = nodes;
this.buffers = buffers;
Preconditions.checkArgument(bodyCompression != null, "body compression cannot be null");
this.bodyCompression = bodyCompression;
this.variadicBufferCounts = variadicBufferCounts;
this.customMetadata =
customMetadata == null
? Collections.emptyMap()
: Collections.unmodifiableMap(new HashMap<>(customMetadata));
List<ArrowBuffer> arrowBuffers = new ArrayList<>(buffers.size());
long offset = 0;
for (ArrowBuf arrowBuf : buffers) {
Expand Down Expand Up @@ -188,13 +283,18 @@ private ArrowRecordBatch(
List<ArrowFieldNode> nodes,
List<ArrowBuf> buffers,
ArrowBodyCompression bodyCompression,
List<Long> variadicBufferCounts) {
List<Long> variadicBufferCounts,
Map<String, String> customMetadata) {
this.length = length;
this.nodes = nodes;
this.buffers = buffers;
Preconditions.checkArgument(bodyCompression != null, "body compression cannot be null");
this.bodyCompression = bodyCompression;
this.variadicBufferCounts = variadicBufferCounts;
this.customMetadata =
customMetadata == null
? Collections.emptyMap()
: Collections.unmodifiableMap(new HashMap<>(customMetadata));
this.closed = false;
List<ArrowBuffer> arrowBuffers = new ArrayList<>();
long offset = 0;
Expand All @@ -218,6 +318,16 @@ public ArrowBodyCompression getBodyCompression() {
return bodyCompression;
}

/**
 * Get the custom metadata for this record batch.
 *
 * <p>Unlike the {@link ArrowMessage} default (which returns null when absent), this never
 * returns null: the constructor normalizes a null argument to an empty map, and otherwise
 * stores an unmodifiable copy.
 *
 * @return the custom metadata as an unmodifiable map (possibly empty, never null)
 */
@Override
public Map<String, String> getCustomMetadata() {
  return customMetadata;
}

/**
* Get the nodes in this record batch.
*
Expand Down Expand Up @@ -268,7 +378,7 @@ public ArrowRecordBatch cloneWithTransfer(final BufferAllocator allocator) {
.collect(Collectors.toList());
close();
return new ArrowRecordBatch(
false, length, nodes, newBufs, bodyCompression, variadicBufferCounts);
false, length, nodes, newBufs, bodyCompression, variadicBufferCounts, customMetadata);
}

/**
Expand Down Expand Up @@ -346,6 +456,7 @@ public String toString() {
if (variadicBufferCounts != null && !variadicBufferCounts.isEmpty()) {
variadicBufCount = variadicBufferCounts.size();
}
String meta = customMetadata.isEmpty() ? "" : "(metadata: " + customMetadata + ")";
return "ArrowRecordBatch [length="
+ length
+ ", nodes="
Expand All @@ -358,7 +469,8 @@ public String toString() {
+ buffersLayout
+ ", closed="
+ closed
+ "]";
+ "]"
+ meta;
}

/** Computes the size of the serialized body for this recordBatch. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,13 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.arrow.flatbuf.Buffer;
import org.apache.arrow.flatbuf.DictionaryBatch;
import org.apache.arrow.flatbuf.FieldNode;
import org.apache.arrow.flatbuf.KeyValue;
import org.apache.arrow.flatbuf.Message;
import org.apache.arrow.flatbuf.MessageHeader;
import org.apache.arrow.flatbuf.MetadataVersion;
Expand Down Expand Up @@ -326,7 +329,12 @@ public static ByteBuffer serializeMetadata(ArrowMessage message, IpcOption write
FlatBufferBuilder builder = new FlatBufferBuilder();
int batchOffset = message.writeTo(builder);
return serializeMessage(
builder, message.getMessageType(), batchOffset, message.computeBodyLength(), writeOption);
builder,
message.getMessageType(),
batchOffset,
message.computeBodyLength(),
writeOption,
message.getCustomMetadata());
}

/**
Expand All @@ -340,7 +348,8 @@ public static ByteBuffer serializeMetadata(ArrowMessage message, IpcOption write
public static ArrowRecordBatch deserializeRecordBatch(
    Message recordBatchMessage, ArrowBuf bodyBuffer) throws IOException {
  // Extract the RecordBatch header table from the enclosing Message, then carry the
  // Message-level custom metadata through to the resulting batch.
  final RecordBatch batchHeader = (RecordBatch) recordBatchMessage.header(new RecordBatch());
  final Map<String, String> metadata = deserializeCustomMetadata(recordBatchMessage);
  return deserializeRecordBatch(batchHeader, bodyBuffer, metadata);
}

/**
Expand Down Expand Up @@ -398,7 +407,7 @@ public static ArrowRecordBatch deserializeRecordBatch(
// Now read the body
final ArrowBuf body =
buffer.slice(block.getMetadataLength(), totalLen - block.getMetadataLength());
return deserializeRecordBatch(recordBatchFB, body);
return deserializeRecordBatch(recordBatchFB, body, deserializeCustomMetadata(messageFB));
}

/**
Expand All @@ -411,6 +420,23 @@ public static ArrowRecordBatch deserializeRecordBatch(
*/
public static ArrowRecordBatch deserializeRecordBatch(RecordBatch recordBatchFB, ArrowBuf body)
    throws IOException {
  // Custom metadata lives on the enclosing Message; a bare RecordBatch flatbuffer has no
  // Message wrapper, so no metadata is available on this path.
  final Map<String, String> noCustomMetadata = null;
  return deserializeRecordBatch(recordBatchFB, body, noCustomMetadata);
}

/**
* Deserializes an ArrowRecordBatch given the Flatbuffer metadata, in-memory body, and custom
* metadata.
*
* @param recordBatchFB Deserialized FlatBuffer record batch
* @param body Read body of the record batch
* @param customMetadata Custom metadata from the Message
* @return ArrowRecordBatch from metadata and in-memory body
* @throws IOException on error
*/
public static ArrowRecordBatch deserializeRecordBatch(
RecordBatch recordBatchFB, ArrowBuf body, Map<String, String> customMetadata)
throws IOException {
// Now read the body
int nodesLength = recordBatchFB.nodesLength();
List<ArrowFieldNode> nodes = new ArrayList<>();
Expand Down Expand Up @@ -452,7 +478,8 @@ public static ArrowRecordBatch deserializeRecordBatch(RecordBatch recordBatchFB,
buffers,
bodyCompression,
variadicBufferCounts,
/*alignBuffers*/ true);
/*alignBuffers*/ true,
customMetadata);
body.getReferenceManager().release();
return arrowRecordBatch;
}
Expand Down Expand Up @@ -676,11 +703,37 @@ public static ByteBuffer serializeMessage(
int headerOffset,
long bodyLength,
IpcOption writeOption) {
return serializeMessage(builder, headerType, headerOffset, bodyLength, writeOption, null);
}

/**
 * Serializes an Arrow message with metadata and custom metadata into a ByteBuffer.
 *
 * @param builder to write the flatbuf to
 * @param headerType the type of the header
 * @param headerOffset the offset in the buffer where the header starts
 * @param bodyLength the length of the body
 * @param writeOption IPC write options
 * @param customMetadata custom metadata to attach to the message; null or empty means none
 * @return the corresponding ByteBuffer
 */
public static ByteBuffer serializeMessage(
    FlatBufferBuilder builder,
    byte headerType,
    int headerOffset,
    long bodyLength,
    IpcOption writeOption,
    Map<String, String> customMetadata) {
  // The KeyValue tables and their vector must be fully built before startMessage:
  // flatbuffers does not allow constructing nested tables while a table is in progress.
  final int metadataVectorOffset = getCustomMetadataOffset(builder, customMetadata);

  Message.startMessage(builder);
  Message.addHeaderType(builder, headerType);
  Message.addHeader(builder, headerOffset);
  Message.addVersion(builder, writeOption.metadataVersion.toFlatbufID());
  Message.addBodyLength(builder, bodyLength);
  // An offset of 0 means no metadata vector was built; skip the optional field entirely.
  if (metadataVectorOffset != 0) {
    Message.addCustomMetadata(builder, metadataVectorOffset);
  }
  builder.finish(Message.endMessage(builder));
  return builder.dataBuffer();
}
Expand Down Expand Up @@ -754,4 +807,34 @@ public static ArrowBuf readMessageBody(ReadChannel in, long bodyLength, BufferAl
}
return bodyBuffer;
}

/**
 * Reads the custom_metadata key/value pairs from a Message flatbuffer.
 *
 * @param message the deserialized Message flatbuffer
 * @return a mutable map of the pairs, or null when the message carries no custom metadata
 */
private static Map<String, String> deserializeCustomMetadata(Message message) {
  final int entryCount = message.customMetadataLength();
  if (entryCount == 0) {
    // Preserve "no metadata" as null rather than an empty map.
    return null;
  }
  final Map<String, String> metadata = new HashMap<>();
  for (int i = 0; i < entryCount; i++) {
    final KeyValue pair = message.customMetadata(i);
    // Flatbuffer strings may deserialize as null; normalize to the empty string.
    final String key = pair.key();
    final String value = pair.value();
    metadata.put(key != null ? key : "", value != null ? value : "");
  }
  return metadata;
}

/**
 * Builds the custom_metadata vector of KeyValue tables in the given builder.
 *
 * <p>Must be called before the enclosing table's start method, since flatbuffers forbids
 * building nested tables while another table is under construction.
 *
 * @param builder the flatbuffer builder to write into
 * @param customMetadata the pairs to encode; null or empty means none
 * @return the offset of the created vector, or 0 when there is no metadata to encode
 */
private static int getCustomMetadataOffset(
    FlatBufferBuilder builder, Map<String, String> customMetadata) {
  if (customMetadata == null || customMetadata.isEmpty()) {
    // 0 is never a valid flatbuffer offset; callers treat it as "absent".
    return 0;
  }
  final int[] entryOffsets = new int[customMetadata.size()];
  int index = 0;
  for (Map.Entry<String, String> entry : customMetadata.entrySet()) {
    final int keyOffset = builder.createString(entry.getKey());
    final int valueOffset = builder.createString(entry.getValue());
    entryOffsets[index] = KeyValue.createKeyValue(builder, keyOffset, valueOffset);
    index++;
  }
  return Message.createCustomMetadataVector(builder, entryOffsets);
}
}
Loading
Loading