Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -144,4 +144,10 @@ default String getNameAsString() {

/** Returns current prefetch ratio of this region on this server */
float getCurrentRegionCachedRatio();

/**
 * Returns the ratio of tiered cold data size to total region (HFiles) size on this server, in the
 * range [0,1]. A value of 0 indicates no data in this region is currently classified as cold.
 */
float getCurrentRegionColdDataRatio();
}
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,8 @@ public static RegionMetrics toRegionMetrics(ClusterStatusProtos.RegionLoad regio
.setUncompressedStoreFileSize(
new Size(regionLoadPB.getStoreUncompressedSizeMB(), Size.Unit.MEGABYTE))
.setRegionSizeMB(new Size(regionLoadPB.getRegionSizeMB(), Size.Unit.MEGABYTE))
.setCurrentRegionCachedRatio(regionLoadPB.getCurrentRegionCachedRatio()).build();
.setCurrentRegionCachedRatio(regionLoadPB.getCurrentRegionCachedRatio())
.setCurrentRegionColdDataRatio(regionLoadPB.getCurrentRegionColdDataRatio()).build();
}

private static List<ClusterStatusProtos.StoreSequenceId>
Expand Down Expand Up @@ -122,7 +123,8 @@ public static ClusterStatusProtos.RegionLoad toRegionLoad(RegionMetrics regionMe
.setStoreUncompressedSizeMB(
(int) regionMetrics.getUncompressedStoreFileSize().get(Size.Unit.MEGABYTE))
.setRegionSizeMB((int) regionMetrics.getRegionSizeMB().get(Size.Unit.MEGABYTE))
.setCurrentRegionCachedRatio(regionMetrics.getCurrentRegionCachedRatio()).build();
.setCurrentRegionCachedRatio(regionMetrics.getCurrentRegionCachedRatio())
.setCurrentRegionColdDataRatio(regionMetrics.getCurrentRegionColdDataRatio()).build();
}

public static RegionMetricsBuilder newBuilder(byte[] name) {
Expand Down Expand Up @@ -158,6 +160,7 @@ public static RegionMetricsBuilder newBuilder(byte[] name) {
private CompactionState compactionState;
private Size regionSizeMB = Size.ZERO;
private float currentRegionCachedRatio;
private float currentRegionColdDataRatio;

private RegionMetricsBuilder(byte[] name) {
this.name = name;
Expand Down Expand Up @@ -303,14 +306,20 @@ public RegionMetricsBuilder setCurrentRegionCachedRatio(float value) {
return this;
}

/**
 * Sets the ratio of tiered cold data size to total region (HFiles) size, expected in the range
 * [0,1]. No validation is performed here; callers supply the raw value.
 * @param value the cold-data ratio for the region
 * @return this builder, for call chaining
 */
public RegionMetricsBuilder setCurrentRegionColdDataRatio(float value) {
this.currentRegionColdDataRatio = value;
return this;
}

public RegionMetrics build() {
return new RegionMetricsImpl(name, storeCount, storeFileCount, storeRefCount,
maxCompactedStoreFileRefCount, compactingCellCount, compactedCellCount, storeFileSize,
memStoreSize, indexSize, rootLevelIndexSize, uncompressedDataIndexSize, bloomFilterSize,
uncompressedStoreFileSize, writeRequestCount, readRequestCount, cpRequestCount,
filteredReadRequestCount, completedSequenceId, storeSequenceIds, dataLocality,
lastMajorCompactionTimestamp, dataLocalityForSsd, blocksLocalWeight, blocksLocalWithSsdWeight,
blocksTotalWeight, compactionState, regionSizeMB, currentRegionCachedRatio);
blocksTotalWeight, compactionState, regionSizeMB, currentRegionCachedRatio,
currentRegionColdDataRatio);
}

private static class RegionMetricsImpl implements RegionMetrics {
Expand Down Expand Up @@ -343,6 +352,7 @@ private static class RegionMetricsImpl implements RegionMetrics {
private final CompactionState compactionState;
private final Size regionSizeMB;
private final float currentRegionCachedRatio;
private final float currentRegionColdDataRatio;

RegionMetricsImpl(byte[] name, int storeCount, int storeFileCount, int storeRefCount,
int maxCompactedStoreFileRefCount, final long compactingCellCount, long compactedCellCount,
Expand All @@ -352,7 +362,8 @@ private static class RegionMetricsImpl implements RegionMetrics {
long filteredReadRequestCount, long completedSequenceId, Map<byte[], Long> storeSequenceIds,
float dataLocality, long lastMajorCompactionTimestamp, float dataLocalityForSsd,
long blocksLocalWeight, long blocksLocalWithSsdWeight, long blocksTotalWeight,
CompactionState compactionState, Size regionSizeMB, float currentRegionCachedRatio) {
CompactionState compactionState, Size regionSizeMB, float currentRegionCachedRatio,
float currentRegionColdDataRatio) {
this.name = Preconditions.checkNotNull(name);
this.storeCount = storeCount;
this.storeFileCount = storeFileCount;
Expand Down Expand Up @@ -382,6 +393,7 @@ private static class RegionMetricsImpl implements RegionMetrics {
this.compactionState = compactionState;
this.regionSizeMB = regionSizeMB;
this.currentRegionCachedRatio = currentRegionCachedRatio;
this.currentRegionColdDataRatio = currentRegionColdDataRatio;
}

@Override
Expand Down Expand Up @@ -529,6 +541,11 @@ public float getCurrentRegionCachedRatio() {
return currentRegionCachedRatio;
}

// Returns the cold-data ratio captured at construction time; this class is immutable.
@Override
public float getCurrentRegionColdDataRatio() {
return currentRegionColdDataRatio;
}

@Override
public String toString() {
StringBuilder sb =
Expand Down Expand Up @@ -571,6 +588,7 @@ public String toString() {
Strings.appendKeyValue(sb, "compactionState", compactionState);
Strings.appendKeyValue(sb, "regionSizeMB", regionSizeMB);
Strings.appendKeyValue(sb, "currentRegionCachedRatio", currentRegionCachedRatio);
Strings.appendKeyValue(sb, "currentRegionColdDataRatio", currentRegionColdDataRatio);
return sb.toString();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -654,6 +654,11 @@ public interface MetricsRegionServerSource extends BaseSource, JvmPauseMonitorSo
String CURRENT_REGION_CACHE_RATIO = "currentRegionCacheRatio";
String CURRENT_REGION_CACHE_RATIO_DESC = "The percentage of caching completed for this region.";

String CURRENT_REGION_COLD_DATA_RATIO = "currentRegionColdDataRatio";

String CURRENT_REGION_COLD_DATA_RATIO_DESC = "The percentage of data in this region that "
+ "is marked as cold by the configured time based priority logic.";

String EXCLUDE_DATA_NODES_COUNT = "excludedDataNodesCount";
String EXCLUDE_DATA_NODES_COUNT_DESC =
"Count of slow/connect error DataNodes excluded during WAL write operation";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,10 @@ void snapshot(MetricsRecordBuilder mrb, boolean ignored) {
Interns.info(regionNamePrefix + MetricsRegionServerSource.CURRENT_REGION_CACHE_RATIO,
MetricsRegionServerSource.CURRENT_REGION_CACHE_RATIO_DESC),
this.regionWrapper.getCurrentRegionCacheRatio());
mrb.addGauge(
Interns.info(regionNamePrefix + MetricsRegionServerSource.CURRENT_REGION_COLD_DATA_RATIO,
MetricsRegionServerSource.CURRENT_REGION_COLD_DATA_RATIO_DESC),
this.regionWrapper.getCurrentRegionColdDataRatio());
mrb.addCounter(
Interns.info(regionNamePrefix + MetricsRegionSource.COMPACTIONS_COMPLETED_COUNT,
MetricsRegionSource.COMPACTIONS_COMPLETED_DESC),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,11 @@ public interface MetricsRegionWrapper {
*/
float getCurrentRegionCacheRatio();

/**
 * Gets the current cold data ratio for this region — presumably the fraction of region data
 * classified as cold, in the range [0,1], mirroring
 * {@code RegionMetrics#getCurrentRegionColdDataRatio} — TODO confirm against implementations.
 */
float getCurrentRegionColdDataRatio();

/**
* Get the total number of read requests that have been issued against this region
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,11 @@ public float getCurrentRegionCacheRatio() {
return 0;
}

// Always reports 0 — consistent with the other metrics in this class, which appear to be
// stub/no-op implementations (verify against the enclosing class's purpose).
@Override
public float getCurrentRegionColdDataRatio() {
return 0;
}

@Override
public long getReadRequestCount() {
return 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,12 @@ message RegionLoad {

/** Current region cache ratio on this server */
optional float current_region_cached_ratio = 29;

/**
* Ratio of tiered cold data size to total region size (HFiles) on this server,
* in the range [0,1]. See DataTieringManager region cold data tracking.
*/
optional float current_region_cold_data_ratio = 30;
}

message UserLoad {
Expand Down Expand Up @@ -316,7 +322,7 @@ message ServerLoad {
* The metrics for write requests on this region server
*/
optional uint64 write_requests_count = 14;

/**
* The active monitored tasks
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,7 @@ public static HFileBlock getBlockForCaching(CacheConfig cacheConf, HFileBlock bl
public static HFileContext cloneContext(HFileContext context) {
HFileContext newContext = new HFileContextBuilder().withBlockSize(context.getBlocksize())
.withBytesPerCheckSum(0).withChecksumType(ChecksumType.NULL) // no checksums in cached data
.withCompression(context.getCompression())
.withCompression(context.getCompression()).withHFileName(context.getHFileName())
.withDataBlockEncoding(context.getDataBlockEncoding())
.withHBaseCheckSum(context.isUseHBaseChecksum()).withCompressTags(context.isCompressTags())
.withIncludesMvcc(context.isIncludesMvcc()).withIncludesTags(context.isIncludesTags())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ public class HFileInfo implements SortedMap<byte[], byte[]> {
static final byte[] KEY_OF_BIGGEST_CELL = Bytes.toBytes(RESERVED_PREFIX + "KEY_OF_BIGGEST_CELL");
static final byte[] LEN_OF_BIGGEST_CELL = Bytes.toBytes(RESERVED_PREFIX + "LEN_OF_BIGGEST_CELL");
public static final byte[] MAX_TAGS_LEN = Bytes.toBytes(RESERVED_PREFIX + "MAX_TAGS_LEN");
public static final byte[] FILE_SIZE = Bytes.toBytes(RESERVED_PREFIX + "FILE_SIZE");
public static final byte[] FILE_PATH = Bytes.toBytes(RESERVED_PREFIX + "FILE_PATH");
private final SortedMap<byte[], byte[]> map = new TreeMap<>(Bytes.BYTES_COMPARATOR);

/**
Expand Down Expand Up @@ -397,6 +399,9 @@ public void initMetaAndIndex(HFile.Reader reader) throws IOException {
}
// close the block reader
context.getInputStreamWrapper().unbuffer();

put(FILE_SIZE, Bytes.toBytes(context.getFileSize()));
put(FILE_PATH, Bytes.toBytes(context.getFilePath().toString()));
} catch (Throwable t) {
IOUtils.closeQuietly(context.getInputStreamWrapper(),
e -> LOG.warn("failed to close input stream wrapper", e));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -784,10 +784,9 @@ private void updateRegionCachedSize(BlockCacheKey key, long cachedSize) {
String regionName = key.getRegionName();
regionCachedSize.merge(regionName, cachedSize,
(previousSize, newBlockSize) -> previousSize + newBlockSize);
LOG.trace("Updating region cached size for region: {}", regionName);
// If all the blocks for a region are evicted from the cache,
// remove the entry for that region from regionCachedSize map.
if (regionCachedSize.get(regionName) <= 0) {
if (regionCachedSize.getOrDefault(regionName, 0L) <= 0) {
regionCachedSize.remove(regionName);
}
}
Expand Down Expand Up @@ -1645,13 +1644,7 @@ private void updateRegionSizeMapWhileRetrievingFromFile() {
dumpPrefetchList();
}
regionCachedSize.clear();
fullyCachedFiles.forEach((hFileName, hFileSize) -> {
// Get the region name for each file
String regionEncodedName = hFileSize.getFirst();
long cachedFileSize = hFileSize.getSecond();
regionCachedSize.merge(regionEncodedName, cachedFileSize,
(oldpf, fileSize) -> oldpf + fileSize);
});
backingMap.forEach((k, v) -> updateRegionCachedSize(k, v.getLength()));
}

private void dumpPrefetchList() {
Expand Down Expand Up @@ -1723,7 +1716,10 @@ private void updateCacheIndex(BucketCacheProtos.BackingMap chunk,
java.util.Map<java.lang.Integer, java.lang.String> deserializer) throws IOException {
Pair<ConcurrentHashMap<BlockCacheKey, BucketEntry>, NavigableSet<BlockCacheKey>> pair2 =
BucketProtoUtils.fromPB(deserializer, chunk, this::createRecycler);
backingMap.putAll(pair2.getFirst());
pair2.getFirst().forEach((k, v) -> {
backingMap.put(k, v);
updateRegionCachedSize(k, v.getLength());
});
blocksByHFile.addAll(pair2.getSecond());
}

Expand Down Expand Up @@ -1774,6 +1770,7 @@ private void retrieveChunkedBackingMap(FileInputStream in) throws IOException {

backingMap.clear();
blocksByHFile.clear();
regionCachedSize.clear();

// Read the backing map entries in batches.
int numChunks = 0;
Expand All @@ -1787,7 +1784,6 @@ private void retrieveChunkedBackingMap(FileInputStream in) throws IOException {
verifyFileIntegrity(cacheEntry);
verifyCapacityAndClasses(cacheEntry.getCacheCapacity(), cacheEntry.getIoClass(),
cacheEntry.getMapClass());
updateRegionSizeMapWhileRetrievingFromFile();
}

/**
Expand Down Expand Up @@ -1949,6 +1945,10 @@ public int evictBlocksRangeByHfileName(String hfileName, long initOffset, long e
// split references, we might be evicting just half of the blocks
LOG.debug("found {} blocks for file {}, starting offset: {}, end offset: {}", keySet.size(),
hfileName, initOffset, endOffset);
return evictBlockSet(keySet);
}

private int evictBlockSet(Set<BlockCacheKey> keySet) {
int numEvicted = 0;
for (BlockCacheKey key : keySet) {
if (evictBlock(key)) {
Expand Down Expand Up @@ -2490,7 +2490,17 @@ public Optional<Boolean> shouldCacheFile(HFileInfo hFileInfo, Configuration conf
String fileName = hFileInfo.getHFileContext().getHFileName();
DataTieringManager dataTieringManager = DataTieringManager.getInstance();
if (dataTieringManager != null && !dataTieringManager.isHotData(hFileInfo, conf)) {
LOG.debug("Data tiering is enabled for file: '{}' and it is not hot data", fileName);
LOG.debug("Custom tiering is enabled for file: '{}' and it is not hot data", fileName);
// If custom tiering has been just enabled for a file that was cached, we now need
// to evict it.
Set<BlockCacheKey> keySet =
getAllCacheKeysForFile(hFileInfo.getHFileContext().getHFileName(), 0, Long.MAX_VALUE);
int evictedBlocks = evictBlockSet(keySet);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The method name shouldCacheFile suggests this is just a check, but it actually evicts the blocks.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, not ideal, but I still think it should be BucketCache's responsibility to evict blocks from files that became classified as cold upon a change to the time-based priority configuration. Moving this to DataTieringManager would couple it more tightly with BucketCache.

if (evictedBlocks > 0) {
LOG.debug(
"Evicted {} blocks for file {} as it is now considered cold by DataTieringManager",
evictedBlocks, fileName);
}
return Optional.of(false);
}
// if we don't have the file in fullyCachedFiles, we should cache it
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,9 +100,16 @@ private static void addEntryToBuilder(Map.Entry<BlockCacheKey, BucketEntry> entr
}

private static BucketCacheProtos.BlockCacheKey toPB(BlockCacheKey key) {
return BucketCacheProtos.BlockCacheKey.newBuilder().setHfilename(key.getHfileName())
.setOffset(key.getOffset()).setPrimaryReplicaBlock(key.isPrimary())
.setBlockType(toPB(key.getBlockType())).build();
BucketCacheProtos.BlockCacheKey.Builder builder = BucketCacheProtos.BlockCacheKey.newBuilder()
.setHfilename(key.getHfileName()).setOffset(key.getOffset())
.setPrimaryReplicaBlock(key.isPrimary()).setBlockType(toPB(key.getBlockType()));
if (key.getCfName() != null) {
builder.setFamilyName(key.getCfName());
}
if (key.getRegionName() != null) {
builder.setRegionName(key.getRegionName());
}
Comment on lines +106 to +111
Copy link
Copy Markdown
Contributor

@taklwu taklwu Apr 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

question: why weren't the cf and regionname filled before ? is it because the cold data needs for log message or other compute usage?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. This is required not only by the new "coldDataRatio" metric we are adding, but also the existing "regionCachedRatio" that is critical for the CacheAwareLoadBalancer. Without this change here, we cannot calculate these metrics when recovering the persistent cache. IMO, it's a bug in the current CacheAwareLoadBalancer implementation.

return builder.build();
}

private static BucketCacheProtos.BlockType toPB(BlockType blockType) {
Expand Down
Loading