diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java index f7a41a3178aa..89460900e21d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java @@ -1171,9 +1171,13 @@ private void writeIntermediateBlock(FSDataOutputStream out, BlockIndexChunk pare if (getCacheOnWrite()) { cacheConf.getBlockCache().ifPresent(cache -> { HFileBlock blockForCaching = blockWriter.getBlockForCaching(cacheConf); - cache.cacheBlock( - new BlockCacheKey(nameForCaching, beginOffset, true, blockForCaching.getBlockType()), - blockForCaching); + try { + cache.cacheBlock( + new BlockCacheKey(nameForCaching, beginOffset, true, blockForCaching.getBlockType()), + blockForCaching); + } finally { + blockForCaching.release(); + } }); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java index b8b147f32fe5..b635c2cfec63 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java @@ -578,11 +578,11 @@ private void writeInlineBlocks(boolean closing) throws IOException { private void doCacheOnWrite(long offset) { cacheConf.getBlockCache().ifPresent(cache -> { HFileBlock cacheFormatBlock = blockWriter.getBlockForCaching(cacheConf); - BlockCacheKey key = buildCacheBlockKey(offset, cacheFormatBlock.getBlockType()); - if (!shouldCacheBlock(cache, key)) { - return; - } try { + BlockCacheKey key = buildCacheBlockKey(offset, cacheFormatBlock.getBlockType()); + if (!shouldCacheBlock(cache, key)) { + return; + } cache.cacheBlock(key, cacheFormatBlock, cacheConf.isInMemory(), true); } finally { // refCnt will auto increase when block add to Cache, see RAMCache#putIfAbsent diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFile.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFile.java index fbc7f3bedd1d..bb5ab50dc143 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFile.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFile.java @@ -38,8 +38,10 @@ import java.util.Arrays; import java.util.List; import java.util.Objects; +import java.util.Optional; import java.util.Random; import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; @@ -76,6 +78,7 @@ import org.apache.hadoop.hbase.io.hfile.ReaderContext.ReaderType; import org.apache.hadoop.hbase.monitoring.ThreadLocalServerSideScanMetrics; import org.apache.hadoop.hbase.nio.ByteBuff; +import org.apache.hadoop.hbase.nio.RefCnt; import org.apache.hadoop.hbase.regionserver.StoreFileWriter; import org.apache.hadoop.hbase.testclassification.IOTests; import org.apache.hadoop.hbase.testclassification.SmallTests; @@ -93,6 +96,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hbase.thirdparty.io.netty.util.ResourceLeakDetector; + /** * test hfile features. */ @@ -202,6 +207,60 @@ public void testReaderWithLRUBlockCache() throws Exception { lru.shutdown(); } + @Test + public void testWriterCacheOnWriteSkipDoesNotLeak() throws Exception { + int bufCount = 32; + int blockSize = 4 * 1024; + ByteBuffAllocator alloc = initAllocator(true, blockSize, bufCount, 0); + fillByteBuffAllocator(alloc, bufCount); + ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.PARANOID); + Configuration myConf = HBaseConfiguration.create(conf); + myConf.setBoolean(CacheConfig.CACHE_BLOCKS_ON_WRITE_KEY, true); + myConf.setBoolean(CacheConfig.CACHE_INDEX_BLOCKS_ON_WRITE_KEY, false); + myConf.setBoolean(CacheConfig.CACHE_BLOOM_BLOCKS_ON_WRITE_KEY, false); + final AtomicInteger counter = new AtomicInteger(); + RefCnt.detector.setLeakListener(new ResourceLeakDetector.LeakListener() { + @Override + public void onLeak(String s, String s1) { + counter.incrementAndGet(); + } + }); + BlockCache cache = Mockito.mock(BlockCache.class); + Mockito.when(cache.shouldCacheBlock(Mockito.any(), Mockito.anyLong(), Mockito.any())) + .thenReturn(Optional.of(false)); + Path hfilePath = new Path(TEST_UTIL.getDataTestDir(), "testWriterCacheOnWriteSkipDoesNotLeak"); + HFileContext context = new HFileContextBuilder().withBlockSize(blockSize).build(); + + try { + Writer writer = new HFile.WriterFactory(myConf, new CacheConfig(myConf, null, cache, alloc)) + .withPath(fs, hfilePath).withFileContext(context).create(); + try { + writer.append(new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("cf"), Bytes.toBytes("q"), + HConstants.LATEST_TIMESTAMP, Bytes.toBytes("value"))); + } finally { + writer.close(); + } + + Mockito.verify(cache).shouldCacheBlock(Mockito.any(), Mockito.anyLong(), Mockito.any()); + Mockito.verify(cache, Mockito.never()).cacheBlock(Mockito.any(), Mockito.any(), + Mockito.anyBoolean(), Mockito.anyBoolean()); + for (int i = 0; i < 30 && counter.get() == 0; i++) { + System.gc(); + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + break; + } + alloc.allocate(128 * 1024).release(); + } + assertEquals(0, counter.get()); + } finally { + fs.delete(hfilePath, false); + alloc.clean(); + } + } + private void assertBytesReadFromCache(boolean isScanMetricsEnabled) throws Exception { assertBytesReadFromCache(isScanMetricsEnabled, DataBlockEncoding.NONE); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockIndex.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockIndex.java index 7ad83ba99cb2..c257b1967962 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockIndex.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockIndex.java @@ -207,13 +207,78 @@ public void onLeak(String s, String s1) { new HFileBlockIndex.BlockIndexWriter(hbw, cacheConfig, path.getName(), null); writeDataBlocksAndCreateIndex(hbw, outputStream, biw); + for (int i = 0; i < 30 && counter.get() == 0; i++) { + System.gc(); + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + break; + } + allocator.allocate(128 * 1024).release(); + } + assertEquals(0, counter.get()); + } - System.gc(); - Thread.sleep(1000); + @Test + public void testIntermediateIndexCacheOnWriteDoesNotLeak() throws Exception { + Configuration localConf = new Configuration(TEST_UTIL.getConfiguration()); + localConf.setInt(HFile.FORMAT_VERSION_KEY, HFile.MAX_FORMAT_VERSION); + localConf.setBoolean(CacheConfig.CACHE_INDEX_BLOCKS_ON_WRITE_KEY, true); + localConf.setInt(ByteBuffAllocator.BUFFER_SIZE_KEY, 4096); + localConf.setInt(ByteBuffAllocator.MAX_BUFFER_COUNT_KEY, 32); + localConf.setInt(ByteBuffAllocator.MIN_ALLOCATE_SIZE_KEY, 0); + ByteBuffAllocator allocator = ByteBuffAllocator.create(localConf, true); + List buffers = new ArrayList<>(); + for (int i = 0; i < allocator.getTotalBufferCount(); i++) { + buffers.add(allocator.allocateOneBuffer()); + assertEquals(0, allocator.getFreeBufferCount()); + } + buffers.forEach(ByteBuff::release); + assertEquals(allocator.getTotalBufferCount(), allocator.getFreeBufferCount()); + ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.PARANOID); + final AtomicInteger counter = new AtomicInteger(); + RefCnt.detector.setLeakListener(new ResourceLeakDetector.LeakListener() { + @Override + public void onLeak(String s, String s1) { + counter.incrementAndGet(); + } + }); - allocator.allocate(128 * 1024).release(); + Path localPath = new Path(TEST_UTIL.getDataTestDir(), + "block_index_testIntermediateIndexCacheOnWriteDoesNotLeak_" + compr); + HFileContext meta = new HFileContextBuilder().withHBaseCheckSum(true) + .withIncludesMvcc(includesMemstoreTS).withIncludesTags(true).withCompression(compr) + .withBytesPerCheckSum(HFile.DEFAULT_BYTES_PER_CHECKSUM).build(); + HFileBlock.Writer hbw = + new HFileBlock.Writer(localConf, null, meta, allocator, meta.getBlocksize()); + FSDataOutputStream outputStream = fs.create(localPath); + LruBlockCache cache = new LruBlockCache(8 * 1024 * 1024, 1024, true, localConf); + CacheConfig cacheConfig = new CacheConfig(localConf, null, cache, allocator); + HFileBlockIndex.BlockIndexWriter biw = + new HFileBlockIndex.BlockIndexWriter(hbw, cacheConfig, localPath.getName(), null); + biw.setMaxChunkSize(512); - assertEquals(0, counter.get()); + try { + writeDataBlocksAndCreateIndex(hbw, outputStream, biw); + assertTrue(biw.getNumLevels() >= 3); + for (int i = 0; i < 30 && counter.get() == 0; i++) { + System.gc(); + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + break; + } + allocator.allocate(128 * 1024).release(); + } + assertEquals(0, counter.get()); + } finally { + hbw.release(); + cache.shutdown(); + allocator.clean(); + fs.delete(localPath, false); + } } private void clear() throws IOException {