diff --git a/paimon-common/src/main/java/org/apache/paimon/format/SupportsWriterMetadata.java b/paimon-common/src/main/java/org/apache/paimon/format/SupportsWriterMetadata.java
new file mode 100644
index 000000000000..89677a86e7a8
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/format/SupportsWriterMetadata.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.format;
+
+import java.util.Map;
+
+/** Writer capability for adding format-specific file metadata before closing the file. */
+public interface SupportsWriterMetadata {
+
+    /**
+     * Adds raw metadata entries to the file footer.
+     *
+     * <p>This method must be called before {@link FormatWriter#close()}.
+     *
+     * @param metadata metadata keys mapped to their raw byte values
+     */
+    void addMetadata(Map<String, byte[]> metadata);
+}
diff --git a/paimon-format/pom.xml b/paimon-format/pom.xml
index b4136797b8d7..8927cbc77b20 100644
--- a/paimon-format/pom.xml
+++ b/paimon-format/pom.xml
@@ -47,6 +47,12 @@ under the License.
provided
+        <dependency>
+            <groupId>org.apache.arrow</groupId>
+            <artifactId>arrow-vector</artifactId>
+            <version>${arrow.version}</version>
+        </dependency>
+
org.xerial.snappy
snappy-java
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/FormatMetadataUtils.java b/paimon-format/src/main/java/org/apache/paimon/format/FormatMetadataUtils.java
new file mode 100644
index 000000000000..18f29796ea50
--- /dev/null
+++ b/paimon-format/src/main/java/org/apache/paimon/format/FormatMetadataUtils.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.format;
+
+import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.arrow.vector.types.pojo.Schema;
+
+import javax.annotation.Nullable;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * Utilities for format metadata encoded at file boundaries.
+ *
+ * <p>Binary metadata values are exchanged as base64 text so they can be stored in string-valued
+ * file metadata.
+ */
+public final class FormatMetadataUtils {
+
+    /** Metadata key under which the serialized Arrow schema is stored. */
+    public static final String ARROW_SCHEMA_METADATA_KEY = "ARROW:schema";
+
+    private FormatMetadataUtils() {}
+
+    /**
+     * Encodes every metadata value as a base64 string, keeping the iteration order of the input.
+     *
+     * @param metadata metadata keys mapped to raw byte values
+     * @return a new mutable map with the same keys and base64-encoded values
+     */
+    public static Map<String, String> encodeMetadata(Map<String, byte[]> metadata) {
+        Base64.Encoder encoder = Base64.getEncoder();
+        Map<String, String> encoded = new LinkedHashMap<>();
+        for (Map.Entry<String, byte[]> entry : metadata.entrySet()) {
+            encoded.put(entry.getKey(), encoder.encodeToString(entry.getValue()));
+        }
+        return encoded;
+    }
+
+    /**
+     * Decodes base64-encoded metadata values. Values that are not valid base64 are returned as
+     * UTF-8 bytes.
+     *
+     * <p>NOTE: a plain-text value that happens to be valid base64 is decoded as base64 as well.
+     *
+     * @param metadata metadata keys mapped to string values
+     * @return a new mutable map with the same keys and decoded byte values
+     */
+    public static Map<String, byte[]> decodeMetadata(Map<String, String> metadata) {
+        Base64.Decoder decoder = Base64.getDecoder();
+        Map<String, byte[]> decoded = new LinkedHashMap<>();
+        for (Map.Entry<String, String> entry : metadata.entrySet()) {
+            try {
+                decoded.put(entry.getKey(), decoder.decode(entry.getValue()));
+            } catch (IllegalArgumentException e) {
+                // Not base64: keep the value as-is, represented by its UTF-8 bytes.
+                decoded.put(entry.getKey(), entry.getValue().getBytes(StandardCharsets.UTF_8));
+            }
+        }
+        return decoded;
+    }
+
+    /**
+     * Deserializes an Arrow schema from its base64-encoded serialized message.
+     *
+     * @param encodedSchema base64 text of the serialized schema, or {@code null} if absent
+     * @return the deserialized schema, or empty when {@code encodedSchema} is {@code null}
+     * @throws IllegalArgumentException if {@code encodedSchema} is not valid base64
+     */
+    public static Optional<Schema> readArrowSchema(@Nullable String encodedSchema) {
+        if (encodedSchema == null) {
+            return Optional.empty();
+        }
+        byte[] schemaBytes;
+        try {
+            schemaBytes = Base64.getDecoder().decode(encodedSchema);
+        } catch (IllegalArgumentException e) {
+            // Same exception type as before, but say which value was malformed.
+            throw new IllegalArgumentException("Encoded Arrow schema is not valid base64.", e);
+        }
+        return Optional.of(Schema.deserializeMessage(ByteBuffer.wrap(schemaBytes)));
+    }
+
+    /**
+     * Returns metadata for top-level Arrow fields only.
+     *
+     * @param arrowSchema the schema whose top-level fields are inspected
+     * @return an unmodifiable map from field name to an unmodifiable copy of that field's metadata
+     */
+    public static Map<String, Map<String, String>> readFieldMetadata(Schema arrowSchema) {
+        Map<String, Map<String, String>> result = new LinkedHashMap<>();
+        for (Field field : arrowSchema.getFields()) {
+            result.put(
+                    field.getName(),
+                    Collections.unmodifiableMap(new LinkedHashMap<>(field.getMetadata())));
+        }
+        return Collections.unmodifiableMap(result);
+    }
+}
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/SupportsReaderArrowSchema.java b/paimon-format/src/main/java/org/apache/paimon/format/SupportsReaderArrowSchema.java
new file mode 100644
index 000000000000..43e525ddd90b
--- /dev/null
+++ b/paimon-format/src/main/java/org/apache/paimon/format/SupportsReaderArrowSchema.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.format;
+
+import org.apache.arrow.vector.types.pojo.Schema;
+
+import java.io.IOException;
+import java.util.Optional;
+
+/** Reader capability for formats that can recover Arrow schema from file metadata. */
+public interface SupportsReaderArrowSchema {
+
+    /**
+     * Reads the Arrow schema recorded in the file metadata, if any.
+     *
+     * @return the Arrow schema, or empty when the file metadata carries none
+     * @throws IOException if the file metadata cannot be read
+     */
+    Optional<Schema> readArrowSchema() throws IOException;
+}
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java
index 81bfdd318535..c0010b26ce15 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java
@@ -24,8 +24,10 @@
import org.apache.paimon.data.columnar.ColumnarRowIterator;
import org.apache.paimon.data.columnar.VectorizedColumnBatch;
import org.apache.paimon.data.columnar.VectorizedRowIterator;
+import org.apache.paimon.format.FormatMetadataUtils;
import org.apache.paimon.format.FormatReaderFactory;
import org.apache.paimon.format.OrcFormatReaderContext;
+import org.apache.paimon.format.SupportsReaderArrowSchema;
import org.apache.paimon.format.fs.HadoopReadOnlyFileSystem;
import org.apache.paimon.format.orc.filter.OrcFilters;
import org.apache.paimon.fs.FileIO;
@@ -39,6 +41,7 @@
import org.apache.paimon.utils.Pool;
import org.apache.paimon.utils.RoaringBitmap32;
+import org.apache.arrow.vector.types.pojo.Schema;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
@@ -54,7 +57,9 @@
import javax.annotation.Nullable;
import java.io.IOException;
+import java.nio.charset.StandardCharsets;
import java.util.List;
+import java.util.Optional;
import static org.apache.paimon.format.orc.OrcTypeUtil.convertToOrcSchema;
import static org.apache.paimon.format.orc.reader.AbstractOrcColumnVector.createPaimonVector;
@@ -104,7 +109,7 @@ public OrcVectorizedReader createReader(FormatReaderFactory.Context context)
Pool poolOfBatches =
createPoolOfBatches(context.filePath(), poolSize, context.fileIO());
- RecordReader orcReader =
+ OrcRecordReader orcReader =
createRecordReader(
hadoopConfig,
schema,
@@ -224,12 +229,14 @@ private ColumnarRowIterator convertAndGetIterator(
* batch is addressed by the starting row number of the batch, plus the number of records to be
* skipped before.
*/
- private static final class OrcVectorizedReader implements FileRecordReader {
+ private static final class OrcVectorizedReader
+ implements FileRecordReader, SupportsReaderArrowSchema {
- private final RecordReader orcReader;
+ private final OrcRecordReader orcReader;
private final Pool pool;
- private OrcVectorizedReader(final RecordReader orcReader, final Pool pool) {
+ private OrcVectorizedReader(
+ final OrcRecordReader orcReader, final Pool pool) {
this.orcReader = checkNotNull(orcReader, "orcReader");
this.pool = checkNotNull(pool, "pool");
}
@@ -240,8 +247,8 @@ public ColumnarRowIterator readBatch() throws IOException {
final OrcReaderBatch batch = getCachedEntry();
final VectorizedRowBatch orcVectorBatch = batch.orcVectorizedRowBatch();
- long rowNumber = orcReader.getRowNumber();
- if (!nextBatch(orcReader, orcVectorBatch)) {
+ long rowNumber = orcReader.recordReader.getRowNumber();
+ if (!nextBatch(orcReader.recordReader, orcVectorBatch)) {
batch.recycle();
return null;
}
@@ -249,9 +256,31 @@ public ColumnarRowIterator readBatch() throws IOException {
return batch.convertAndGetIterator(orcVectorBatch, rowNumber);
}
+        /**
+         * Reads the Arrow schema stored in the ORC user metadata under {@link
+         * FormatMetadataUtils#ARROW_SCHEMA_METADATA_KEY}.
+         *
+         * @return the Arrow schema, or empty when the file has no such metadata key
+         */
+        @Override
+        public Optional<Schema> readArrowSchema() {
+            org.apache.orc.Reader fileReader = orcReader.fileReader;
+            String key = FormatMetadataUtils.ARROW_SCHEMA_METADATA_KEY;
+            if (!fileReader.getMetadataKeys().contains(key)) {
+                return Optional.empty();
+            }
+            // Decode a duplicate so the position of the buffer handed out by the reader is
+            // not advanced.
+            String encodedSchema =
+                    StandardCharsets.UTF_8
+                            .decode(fileReader.getMetadataValue(key).duplicate())
+                            .toString();
+            return FormatMetadataUtils.readArrowSchema(encodedSchema);
+        }
+
+
@Override
public void close() throws IOException {
- orcReader.close();
+ // Close the row reader first; the finally block still closes the file reader if that throws.
+ try {
+ orcReader.recordReader.close();
+ } finally {
+ orcReader.fileReader.close();
+ }
}
private OrcReaderBatch getCachedEntry() throws IOException {
@@ -264,7 +293,18 @@ private OrcReaderBatch getCachedEntry() throws IOException {
}
}
- private static RecordReader createRecordReader(
+    /**
+     * Holder for an ORC file reader and a record reader, kept together so that the owner can
+     * access file-level metadata and close both.
+     */
+    private static final class OrcRecordReader {
+
+        private final org.apache.orc.Reader fileReader;
+        private final RecordReader recordReader;
+
+        private OrcRecordReader(org.apache.orc.Reader orcFileReader, RecordReader rowsReader) {
+            recordReader = rowsReader;
+            fileReader = orcFileReader;
+        }
+    }
+
+
+ private static OrcRecordReader createRecordReader(
org.apache.hadoop.conf.Configuration conf,
TypeDescription schema,
List conjunctPredicates,
@@ -313,7 +353,7 @@ private static RecordReader createRecordReader(
// assign ids
schema.getId();
- return orcRowsReader;
+ return new OrcRecordReader(orcReader, orcRowsReader);
} catch (IOException e) {
// exception happened, we need to close the reader
IOUtils.closeQuietly(orcReader);
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/orc/writer/OrcBulkWriter.java b/paimon-format/src/main/java/org/apache/paimon/format/orc/writer/OrcBulkWriter.java
index c44e3f26d671..f4cdbaa33f13 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/orc/writer/OrcBulkWriter.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/orc/writer/OrcBulkWriter.java
@@ -20,7 +20,9 @@
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.format.FormatMetadataUtils;
import org.apache.paimon.format.FormatWriter;
+import org.apache.paimon.format.SupportsWriterMetadata;
import org.apache.paimon.fs.PositionOutputStream;
import org.apache.paimon.options.MemorySize;
@@ -28,19 +30,27 @@
import org.apache.orc.Writer;
import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
import static org.apache.paimon.utils.Preconditions.checkNotNull;
+import static org.apache.paimon.utils.Preconditions.checkState;
/** A {@link FormatWriter} implementation that writes data in ORC format. */
-public class OrcBulkWriter implements FormatWriter {
+public class OrcBulkWriter implements FormatWriter, SupportsWriterMetadata {
private final Writer writer;
private final Vectorizer vectorizer;
private final VectorizedRowBatch rowBatch;
private final PositionOutputStream underlyingStream;
+ private final Map metadata;
private long currentBatchMemoryUsage = 0;
private final long memoryLimit;
+ private boolean closed = false;
public OrcBulkWriter(
Vectorizer vectorizer,
@@ -54,6 +64,16 @@ public OrcBulkWriter(
this.rowBatch = vectorizer.getSchema().createRowBatch(batchSize);
this.underlyingStream = underlyingStream;
this.memoryLimit = memoryLimit.getBytes();
+ this.metadata = new HashMap<>();
+ }
+
+    /**
+     * Buffers metadata entries; they are written as ORC user metadata in {@link #close()}.
+     *
+     * <p>Each value array is copied, so later changes to the caller's arrays are not observed.
+     * The state check rejects calls made after the writer has been closed.
+     *
+     * @param metadata metadata keys mapped to their raw byte values
+     */
+    @Override
+    public void addMetadata(Map<String, byte[]> metadata) {
+        checkState(!closed, "Cannot add metadata after writer is closed.");
+        for (Map.Entry<String, byte[]> entry : metadata.entrySet()) {
+            this.metadata.put(
+                    entry.getKey(), Arrays.copyOf(entry.getValue(), entry.getValue().length));
+        }
}
@Override
@@ -75,7 +95,14 @@ private void flush() throws IOException {
@Override
public void close() throws IOException {
+        if (closed) {
+            // Already closed: do not add the user metadata or close the ORC writer a second time.
+            return;
+        }
flush();
+        // Values are base64 text (see FormatMetadataUtils#encodeMetadata), stored as UTF-8 bytes.
+        for (Map.Entry<String, String> entry :
+                FormatMetadataUtils.encodeMetadata(metadata).entrySet()) {
+            writer.addUserMetadata(
+                    entry.getKey(),
+                    ByteBuffer.wrap(entry.getValue().getBytes(StandardCharsets.UTF_8)));
+        }
writer.close();
+        // Set only after a successful close, so a failed close can still be retried.
+        this.closed = true;
}
@Override
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetWriterFactory.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetWriterFactory.java
index 282805897a5f..7747be7a3b0b 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetWriterFactory.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetWriterFactory.java
@@ -34,6 +34,8 @@
import org.apache.parquet.io.OutputFile;
import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
/** A factory that creates a Parquet {@link FormatWriter}. */
public class ParquetWriterFactory implements FormatWriterFactory, SupportsVariantInference {
@@ -57,8 +59,10 @@ public FormatWriter create(PositionOutputStream stream, String compression) thro
compression = null;
}
- final ParquetWriter writer = writerBuilder.createWriter(out, compression);
- return new ParquetBulkWriter(writer);
+ Map metadata = new HashMap<>();
+ final ParquetWriter writer =
+ writerBuilder.createWriter(out, compression, () -> metadata);
+ return new ParquetBulkWriter(writer, metadata);
}
@Override
@@ -73,7 +77,9 @@ public FormatWriter createWithShreddingSchema(
ParquetBuilder newBuilder =
((RowDataParquetBuilder) writerBuilder)
.withShreddingSchemas(inferredShreddingSchema);
- final ParquetWriter writer = newBuilder.createWriter(out, compression);
- return new ParquetBulkWriter(writer);
+ Map metadata = new HashMap<>();
+ final ParquetWriter writer =
+ newBuilder.createWriter(out, compression, () -> metadata);
+ return new ParquetBulkWriter(writer, metadata);
}
}
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/VectorizedParquetRecordReader.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/VectorizedParquetRecordReader.java
index af63a7d9f96b..047027538628 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/VectorizedParquetRecordReader.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/VectorizedParquetRecordReader.java
@@ -20,6 +20,8 @@
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.columnar.writable.WritableColumnVector;
+import org.apache.paimon.format.FormatMetadataUtils;
+import org.apache.paimon.format.SupportsReaderArrowSchema;
import org.apache.paimon.format.parquet.type.ParquetField;
import org.apache.paimon.format.parquet.type.ParquetPrimitiveField;
import org.apache.paimon.fs.FileIO;
@@ -41,6 +43,7 @@
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
+import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
@@ -48,7 +51,8 @@
import static org.apache.paimon.format.parquet.reader.ParquetReaderUtil.createReadableColumnVectors;
/** Record reader for parquet. */
-public class VectorizedParquetRecordReader implements FileRecordReader {
+public class VectorizedParquetRecordReader
+ implements FileRecordReader, SupportsReaderArrowSchema {
private ParquetFileReader reader;
@@ -269,6 +273,16 @@ private void initColumnReader(PageReadStore pages, ParquetColumnVector cv) throw
}
}
+    /**
+     * Reads the Arrow schema stored in the Parquet footer key-value metadata under {@link
+     * FormatMetadataUtils#ARROW_SCHEMA_METADATA_KEY}.
+     *
+     * @return the Arrow schema, or empty when the footer has no such key
+     */
+    @Override
+    public Optional<org.apache.arrow.vector.types.pojo.Schema> readArrowSchema()
+            throws IOException {
+        String encodedSchema =
+                reader.getFooter()
+                        .getFileMetaData()
+                        .getKeyValueMetaData()
+                        .get(FormatMetadataUtils.ARROW_SCHEMA_METADATA_KEY);
+        return FormatMetadataUtils.readArrowSchema(encodedSchema);
+    }
+
+
@Override
public void close() throws IOException {
if (reader != null) {
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/writer/ParquetBuilder.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/writer/ParquetBuilder.java
index 808febd9a2cc..e1dc534969ee 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/writer/ParquetBuilder.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/writer/ParquetBuilder.java
@@ -23,6 +23,8 @@
import java.io.IOException;
import java.io.Serializable;
+import java.util.Map;
+import java.util.function.Supplier;
/**
* A builder to create a {@link ParquetWriter} from a Parquet {@link OutputFile}.
@@ -34,4 +36,11 @@ public interface ParquetBuilder extends Serializable {
/** Creates and configures a parquet writer to the given output file. */
ParquetWriter createWriter(OutputFile out, String compression) throws IOException;
+
+ default ParquetWriter createWriter(
+ OutputFile out, String compression, Supplier