diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java index e0b0d76e0e..24cb925ab7 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java @@ -1150,6 +1150,7 @@ public PageReadStore readNextRowGroup() throws IOException { if (rowGroup == null) { return null; } + closeCurrentRowGroup(); this.currentRowGroup = rowGroup; // avoid re-reading bytes the dictionary reader is used after this call if (nextDictionaryReader != null) { @@ -1406,6 +1407,7 @@ public PageReadStore readNextFilteredRowGroup() throws IOException { return readNextRowGroup(); } + closeCurrentRowGroup(); this.currentRowGroup = internalReadFilteredRowGroup(block, rowRanges, getColumnIndexStore(currentBlock)); // avoid re-reading bytes the dictionary reader is used after this call @@ -1819,11 +1821,19 @@ public void close() throws IOException { f.close(); } } finally { - AutoCloseables.uncheckedClose(nextDictionaryReader, crcAllocator); + AutoCloseables.uncheckedClose(currentRowGroup, nextDictionaryReader, crcAllocator); + currentRowGroup = null; options.getCodecFactory().release(); } } + private void closeCurrentRowGroup() { + if (currentRowGroup != null) { + currentRowGroup.close(); + currentRowGroup = null; + } + } + /* * Builder to concatenate the buffers of the discontinuous parts for the same column. These parts are generated as a * result of the column-index based filtering when some pages might be skipped at reading. 
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileReaderBufferLeak.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileReaderBufferLeak.java new file mode 100644 index 0000000000..6ddb52eb37 --- /dev/null +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileReaderBufferLeak.java @@ -0,0 +1,332 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.parquet.hadoop; + +import static org.apache.parquet.hadoop.ParquetFileWriter.Mode.OVERWRITE; +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +import java.io.File; +import java.io.IOException; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.parquet.ParquetReadOptions; +import org.apache.parquet.bytes.HeapByteBufferAllocator; +import org.apache.parquet.bytes.TrackingByteBufferAllocator; +import org.apache.parquet.column.page.PageReadStore; +import org.apache.parquet.example.data.Group; +import org.apache.parquet.example.data.simple.SimpleGroupFactory; +import org.apache.parquet.filter2.compat.FilterCompat; +import org.apache.parquet.filter2.predicate.FilterApi; +import org.apache.parquet.hadoop.example.ExampleParquetWriter; +import org.apache.parquet.hadoop.example.GroupWriteSupport; +import org.apache.parquet.hadoop.util.HadoopInputFile; +import org.apache.parquet.hadoop.util.HadoopOutputFile; +import org.apache.parquet.io.InputFile; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.Types; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +/** + * Tests for GH-3487: Verify that PageReadStore buffers are properly released when + * reading multiple row groups sequentially. Before the fix, calling readNextRowGroup() + * or readNextFilteredRowGroup() multiple times would leak the ByteBuffers of previous + * row groups because the old currentRowGroup was replaced without being closed first. + *

+ * These tests use {@link TrackingByteBufferAllocator} which throws + * {@link TrackingByteBufferAllocator.LeakedByteBufferException} if any allocated + * ByteBuffers remain unreleased when the allocator is closed. + */ +public class TestParquetFileReaderBufferLeak { + + private static final MessageType SCHEMA = Types.buildMessage() + .required(INT64) + .named("id") + .required(INT64) + .named("value") + .named("msg"); + + private static final Configuration CONF = new Configuration(); + + @Rule + public final TemporaryFolder temp = new TemporaryFolder(); + + /** + * Helper: write a parquet file with multiple row groups using the high-level writer API. + * Uses a small page size and row group size to ensure multiple row groups are created. + * Returns the path to the written file. + */ + private Path writeMultiRowGroupFile(int numRecords) throws IOException { + GroupWriteSupport.setSchema(SCHEMA, CONF); + + File testFile = temp.newFile(); + testFile.delete(); + Path path = new Path(testFile.toURI()); + + try (TrackingByteBufferAllocator writeAllocator = + TrackingByteBufferAllocator.wrap(new HeapByteBufferAllocator())) { + try (ParquetWriter writer = ExampleParquetWriter.builder(HadoopOutputFile.fromPath(path, CONF)) + .withAllocator(writeAllocator) + .withConf(CONF) + .withType(SCHEMA) + .withWriteMode(OVERWRITE) + .withRowGroupSize(200) // Small row group size to force multiple row groups + .withPageSize(100) // Small page size + .build()) { + SimpleGroupFactory factory = new SimpleGroupFactory(SCHEMA); + for (int i = 0; i < numRecords; i++) { + writer.write(factory.newGroup().append("id", (long) i).append("value", (long) (i * 10))); + } + } + } + + return path; + } + + /** + * Helper to read the number of row groups in a file.
+ */ + private int getRowGroupCount(Path path) throws IOException { + InputFile inputFile = HadoopInputFile.fromPath(path, CONF); + try (ParquetFileReader reader = ParquetFileReader.open(inputFile)) { + return reader.getFooter().getBlocks().size(); + } + } + + /** + * Verify that reading all row groups via readNextRowGroup() does not leak buffers. + * Before the fix, each call to readNextRowGroup() would replace currentRowGroup + * without closing the previous one, leaking its ByteBuffers. + */ + @Test + public void testReadNextRowGroupReleasesBuffersOfPreviousRowGroup() throws Exception { + Path path = writeMultiRowGroupFile(500); + int expectedRowGroups = getRowGroupCount(path); + assertTrue("Expected multiple row groups but got " + expectedRowGroups, expectedRowGroups > 1); + + try (TrackingByteBufferAllocator readAllocator = + TrackingByteBufferAllocator.wrap(new HeapByteBufferAllocator())) { + ParquetReadOptions options = + ParquetReadOptions.builder().withAllocator(readAllocator).build(); + InputFile inputFile = HadoopInputFile.fromPath(path, CONF); + + try (ParquetFileReader reader = new ParquetFileReader(inputFile, options)) { + PageReadStore pages; + int rowGroupCount = 0; + while ((pages = reader.readNextRowGroup()) != null) { + assertNotNull(pages); + rowGroupCount++; + } + assertEquals(expectedRowGroups, rowGroupCount); + } + // After reader.close(), all buffers including the last row group's should be released. + // readAllocator.close() will throw LeakedByteBufferException if any leak remains. + } + } + + /** + * Verify that close() releases the buffers of the last row group read via readNextRowGroup(). + * This tests the case where only some row groups are read before the reader is closed. 
+ */ + @Test + public void testCloseReleasesCurrentRowGroupBuffers() throws Exception { + Path path = writeMultiRowGroupFile(500); + int expectedRowGroups = getRowGroupCount(path); + assertTrue("Expected multiple row groups but got " + expectedRowGroups, expectedRowGroups > 1); + + try (TrackingByteBufferAllocator readAllocator = + TrackingByteBufferAllocator.wrap(new HeapByteBufferAllocator())) { + ParquetReadOptions options = + ParquetReadOptions.builder().withAllocator(readAllocator).build(); + InputFile inputFile = HadoopInputFile.fromPath(path, CONF); + + try (ParquetFileReader reader = new ParquetFileReader(inputFile, options)) { + // Read only the first row group + PageReadStore pages = reader.readNextRowGroup(); + assertNotNull(pages); + assertTrue(pages.getRowCount() > 0); + // Don't read remaining row groups - close should release the current row group's buffers + } + } + } + + /** + * Verify that readNextFilteredRowGroup() releases buffers of the previous row group + * when the filter does not trigger column-index filtering (falls back to readNextRowGroup). 
+ */ + @Test + public void testReadNextFilteredRowGroupReleasesBuffersWhenNoFilter() throws Exception { + Path path = writeMultiRowGroupFile(500); + int expectedRowGroups = getRowGroupCount(path); + assertTrue("Expected multiple row groups but got " + expectedRowGroups, expectedRowGroups > 1); + + try (TrackingByteBufferAllocator readAllocator = + TrackingByteBufferAllocator.wrap(new HeapByteBufferAllocator())) { + // No record filter configured -> readNextFilteredRowGroup falls back to readNextRowGroup + ParquetReadOptions options = + ParquetReadOptions.builder().withAllocator(readAllocator).build(); + InputFile inputFile = HadoopInputFile.fromPath(path, CONF); + + try (ParquetFileReader reader = new ParquetFileReader(inputFile, options)) { + PageReadStore pages; + int rowGroupCount = 0; + while ((pages = reader.readNextFilteredRowGroup()) != null) { + assertNotNull(pages); + rowGroupCount++; + } + assertEquals(expectedRowGroups, rowGroupCount); + } + } + } + + /** + * Verify that readNextFilteredRowGroup() with a filter and column-index filtering enabled + * still properly releases previous row group buffers. The filter is set to match all rows, + * so it falls back to readNextRowGroup via the all-rows-match path. 
+ */ + @Test + public void testReadNextFilteredRowGroupReleasesBuffersWithFilter() throws Exception { + Path path = writeMultiRowGroupFile(500); + int expectedRowGroups = getRowGroupCount(path); + assertTrue("Expected multiple row groups but got " + expectedRowGroups, expectedRowGroups > 1); + + try (TrackingByteBufferAllocator readAllocator = + TrackingByteBufferAllocator.wrap(new HeapByteBufferAllocator())) { + // Use a filter that matches all rows: id > -1 (all values are >= 0) + FilterCompat.Filter filter = FilterCompat.get(FilterApi.gt(FilterApi.longColumn("id"), -1L)); + ParquetReadOptions options = ParquetReadOptions.builder() + .withAllocator(readAllocator) + .withRecordFilter(filter) + .useColumnIndexFilter(true) + .useStatsFilter(true) + .build(); + InputFile inputFile = HadoopInputFile.fromPath(path, CONF); + + try (ParquetFileReader reader = new ParquetFileReader(inputFile, options)) { + PageReadStore pages; + int rowGroupCount = 0; + while ((pages = reader.readNextFilteredRowGroup()) != null) { + assertNotNull(pages); + rowGroupCount++; + } + // All row groups should be read since the filter matches everything + assertEquals(expectedRowGroups, rowGroupCount); + } + } + } + + /** + * Verify that readNextFilteredRowGroup() with a filter that eliminates some rows + * through column-index filtering releases buffers properly. This exercises the + * internalReadFilteredRowGroup code path where rowCount != block.getRowCount(). 
+ */ + @Test + public void testReadNextFilteredRowGroupWithColumnIndexFilteringReleasesBuffers() throws Exception { + Path path = writeMultiRowGroupFile(500); + int totalRowGroups = getRowGroupCount(path); + assertTrue("Expected multiple row groups but got " + totalRowGroups, totalRowGroups > 1); + + try (TrackingByteBufferAllocator readAllocator = + TrackingByteBufferAllocator.wrap(new HeapByteBufferAllocator())) { + // Use a filter that eliminates some rows based on statistics/column index: + // id < 10 should only match early row groups and exclude pages with higher values + FilterCompat.Filter filter = FilterCompat.get(FilterApi.lt(FilterApi.longColumn("id"), 10L)); + ParquetReadOptions options = ParquetReadOptions.builder() + .withAllocator(readAllocator) + .withRecordFilter(filter) + .useColumnIndexFilter(true) + .useStatsFilter(true) + .build(); + InputFile inputFile = HadoopInputFile.fromPath(path, CONF); + + try (ParquetFileReader reader = new ParquetFileReader(inputFile, options)) { + PageReadStore pages; + int rowGroupCount = 0; + while ((pages = reader.readNextFilteredRowGroup()) != null) { + assertNotNull(pages); + assertTrue(pages.getRowCount() > 0); + rowGroupCount++; + } + // At least some row groups should be returned (those containing id < 10) + assertTrue("Expected at least 1 row group returned", rowGroupCount >= 1); + } + } + } + + /** + * Verify that buffers are released even when only some row groups are read + * and the reader is closed without reading to the end. 
+ */ + @Test + public void testPartialReadThenCloseReleasesBuffers() throws Exception { + Path path = writeMultiRowGroupFile(500); + int expectedRowGroups = getRowGroupCount(path); + assertTrue("Expected at least 3 row groups but got " + expectedRowGroups, expectedRowGroups >= 3); + + try (TrackingByteBufferAllocator readAllocator = + TrackingByteBufferAllocator.wrap(new HeapByteBufferAllocator())) { + ParquetReadOptions options = + ParquetReadOptions.builder().withAllocator(readAllocator).build(); + InputFile inputFile = HadoopInputFile.fromPath(path, CONF); + + try (ParquetFileReader reader = new ParquetFileReader(inputFile, options)) { + // Read row group 1 + PageReadStore pages = reader.readNextRowGroup(); + assertNotNull(pages); + + // Read row group 2 (should release row group 1 buffers) + pages = reader.readNextRowGroup(); + assertNotNull(pages); + + // Close without reading remaining row groups - close() should release row group 2 buffers + } + } + } + + /** + * Verify that calling readNextRowGroup after all groups are exhausted doesn't cause issues. + * The null return when there are no more row groups should not leak any buffers. + */ + @Test + public void testExhaustedReaderNoLeak() throws Exception { + Path path = writeMultiRowGroupFile(500); + + try (TrackingByteBufferAllocator readAllocator = + TrackingByteBufferAllocator.wrap(new HeapByteBufferAllocator())) { + ParquetReadOptions options = + ParquetReadOptions.builder().withAllocator(readAllocator).build(); + InputFile inputFile = HadoopInputFile.fromPath(path, CONF); + + try (ParquetFileReader reader = new ParquetFileReader(inputFile, options)) { + // Read all row groups + while (reader.readNextRowGroup() != null) { + // drain + } + // Additional calls should return null without leaking + assertNull(reader.readNextRowGroup()); + assertNull(reader.readNextRowGroup()); + } + } + } +}