* This number is 8 MiB, and is derived from the limitations of internal histograms.
*/
- private static final int MAX_CHUNK_SIZE = 8 * 1024 * 1024; // 8 MiB.
+ private static final int MAX_CHUNK_SIZE = IS_LOW_MEM ?
+ 2 * 1024 * 1024 : // 2 MiB for systems with small heaps.
+ 8 * 1024 * 1024; // 8 MiB.
private static final int MAX_POOLED_BUF_SIZE = MAX_CHUNK_SIZE / BUFS_PER_CHUNK;
/**
@@ -150,21 +166,9 @@ final class AdaptivePoolingAllocator implements AdaptiveByteBufAllocator.Adaptiv
16384,
16896, // 16384 + 512
};
- private static final ChunkReleasePredicate CHUNK_RELEASE_ALWAYS = new ChunkReleasePredicate() {
- @Override
- public boolean shouldReleaseChunk(int chunkSize) {
- return true;
- }
- };
- private static final ChunkReleasePredicate CHUNK_RELEASE_NEVER = new ChunkReleasePredicate() {
- @Override
- public boolean shouldReleaseChunk(int chunkSize) {
- return false;
- }
- };
private static final int SIZE_CLASSES_COUNT = SIZE_CLASSES.length;
- private static final byte[] SIZE_INDEXES = new byte[(SIZE_CLASSES[SIZE_CLASSES_COUNT - 1] / 32) + 1];
+ private static final byte[] SIZE_INDEXES = new byte[SIZE_CLASSES[SIZE_CLASSES_COUNT - 1] / 32 + 1];
static {
if (MAGAZINE_BUFFER_QUEUE_CAPACITY < 2) {
@@ -175,7 +179,7 @@ public boolean shouldReleaseChunk(int chunkSize) {
for (int i = 0; i < SIZE_CLASSES_COUNT; i++) {
int sizeClass = SIZE_CLASSES[i];
//noinspection ConstantValue
- assert (sizeClass & 5) == 0 : "Size class must be a multiple of 32";
+ assert (sizeClass & 31) == 0 : "Size class must be a multiple of 32";
int sizeIndex = sizeIndexOf(sizeClass);
Arrays.fill(SIZE_INDEXES, lastIndex + 1, sizeIndex + 1, (byte) i);
lastIndex = sizeIndex;
@@ -193,8 +197,10 @@ public boolean shouldReleaseChunk(int chunkSize) {
chunkRegistry = new ChunkRegistry();
sizeClassedMagazineGroups = createMagazineGroupSizeClasses(this, false);
largeBufferMagazineGroup = new MagazineGroup(
- this, chunkAllocator, new HistogramChunkControllerFactory(true), false);
- threadLocalGroup = new FastThreadLocal() {
+ this, chunkAllocator, new BuddyChunkManagementStrategy(), false);
+
+ boolean disableThreadLocalGroups = IS_LOW_MEM && DISABLE_THREAD_LOCAL_MAGAZINES_ON_LOW_MEM;
+ threadLocalGroup = disableThreadLocalGroups ? null : new FastThreadLocal() {
@Override
protected MagazineGroup[] initialValue() {
if (useCacheForNonEventLoopThreads || ThreadExecutorMap.currentExecutor() != null) {
@@ -220,7 +226,7 @@ private static MagazineGroup[] createMagazineGroupSizeClasses(
for (int i = 0; i < SIZE_CLASSES.length; i++) {
int segmentSize = SIZE_CLASSES[i];
groups[i] = new MagazineGroup(allocator, allocator.chunkAllocator,
- new SizeClassChunkControllerFactory(segmentSize), isThreadLocal);
+ new SizeClassChunkManagementStrategy(segmentSize), isThreadLocal);
}
return groups;
}
@@ -245,7 +251,7 @@ private static MagazineGroup[] createMagazineGroupSizeClasses(
*
* @return A new multi-producer, multi-consumer queue.
*/
- private static Queue createSharedChunkQueue() {
+ private static Queue createSharedChunkQueue() {
return PlatformDependent.newFixedMpmcQueue(CHUNK_REUSE_QUEUE);
}
@@ -259,13 +265,14 @@ private AdaptiveByteBuf allocate(int size, int maxCapacity, Thread currentThread
if (size <= MAX_POOLED_BUF_SIZE) {
final int index = sizeClassIndexOf(size);
MagazineGroup[] magazineGroups;
- if (!FastThreadLocalThread.willCleanupFastThreadLocals(currentThread) ||
+ if (!FastThreadLocalThread.willCleanupFastThreadLocals(Thread.currentThread()) ||
+ IS_LOW_MEM ||
(magazineGroups = threadLocalGroup.get()) == null) {
magazineGroups = sizeClassedMagazineGroups;
}
if (index < magazineGroups.length) {
allocated = magazineGroups[index].allocate(size, maxCapacity, currentThread, buf);
- } else {
+ } else if (!IS_LOW_MEM) {
allocated = largeBufferMagazineGroup.allocate(size, maxCapacity, currentThread, buf);
}
}
@@ -292,8 +299,7 @@ static int[] getSizeClasses() {
return SIZE_CLASSES.clone();
}
- private AdaptiveByteBuf allocateFallback(int size, int maxCapacity, Thread currentThread,
- AdaptiveByteBuf buf) {
+ private AdaptiveByteBuf allocateFallback(int size, int maxCapacity, Thread currentThread, AdaptiveByteBuf buf) {
// If we don't already have a buffer, obtain one from the most conveniently available magazine.
Magazine magazine;
if (buf != null) {
@@ -307,10 +313,11 @@ private AdaptiveByteBuf allocateFallback(int size, int maxCapacity, Thread curre
}
// Create a one-off chunk for this allocation.
AbstractByteBuf innerChunk = chunkAllocator.allocate(size, maxCapacity);
- Chunk chunk = new Chunk(innerChunk, magazine, false, CHUNK_RELEASE_ALWAYS);
+ Chunk chunk = new Chunk(innerChunk, magazine);
chunkRegistry.add(chunk);
try {
- chunk.readInitInto(buf, size, size, maxCapacity);
+ boolean success = chunk.readInitInto(buf, size, size, maxCapacity);
+ assert success: "Failed to initialize ByteBuf with dedicated chunk";
} finally {
// As the chunk is an one-off we need to always call release explicitly as readInitInto(...)
// will take care of retain once when successful. Once The AdaptiveByteBuf is released it will
@@ -355,38 +362,37 @@ private void free() {
largeBufferMagazineGroup.free();
}
- static int sizeToBucket(int size) {
- return HistogramChunkController.sizeToBucket(size);
- }
-
@SuppressJava6Requirement(reason = "Guarded by version check")
private static final class MagazineGroup {
private final AdaptivePoolingAllocator allocator;
private final ChunkAllocator chunkAllocator;
- private final ChunkControllerFactory chunkControllerFactory;
- private final Queue chunkReuseQueue;
+ private final ChunkManagementStrategy chunkManagementStrategy;
+ private final ChunkCache chunkCache;
private final StampedLock magazineExpandLock;
private final Magazine threadLocalMagazine;
+ private Thread ownerThread;
private volatile Magazine[] magazines;
private volatile boolean freed;
MagazineGroup(AdaptivePoolingAllocator allocator,
ChunkAllocator chunkAllocator,
- ChunkControllerFactory chunkControllerFactory,
+ ChunkManagementStrategy chunkManagementStrategy,
boolean isThreadLocal) {
this.allocator = allocator;
this.chunkAllocator = chunkAllocator;
- this.chunkControllerFactory = chunkControllerFactory;
- chunkReuseQueue = createSharedChunkQueue();
+ this.chunkManagementStrategy = chunkManagementStrategy;
+ chunkCache = chunkManagementStrategy.createChunkCache(isThreadLocal);
if (isThreadLocal) {
+ ownerThread = Thread.currentThread();
magazineExpandLock = null;
- threadLocalMagazine = new Magazine(this, false, chunkReuseQueue, chunkControllerFactory.create(this));
+ threadLocalMagazine = new Magazine(this, false, chunkManagementStrategy.createController(this));
} else {
+ ownerThread = null;
magazineExpandLock = new StampedLock();
threadLocalMagazine = null;
Magazine[] mags = new Magazine[INITIAL_MAGAZINES];
for (int i = 0; i < mags.length; i++) {
- mags[i] = new Magazine(this, true, chunkReuseQueue, chunkControllerFactory.create(this));
+ mags[i] = new Magazine(this, true, chunkManagementStrategy.createController(this));
}
magazines = mags;
}
@@ -446,12 +452,9 @@ private boolean tryExpandMagazines(int currentLength) {
if (mags.length >= MAX_STRIPES || mags.length > currentLength || freed) {
return true;
}
- Magazine firstMagazine = mags[0];
Magazine[] expanded = new Magazine[mags.length * 2];
for (int i = 0, l = expanded.length; i < l; i++) {
- Magazine m = new Magazine(this, true, chunkReuseQueue, chunkControllerFactory.create(this));
- firstMagazine.initializeSharedStateIn(m);
- expanded[i] = m;
+ expanded[i] = new Magazine(this, true, chunkManagementStrategy.createController(this));
}
magazines = expanded;
} finally {
@@ -464,22 +467,32 @@ private boolean tryExpandMagazines(int currentLength) {
return true;
}
- boolean offerToQueue(Chunk buffer) {
+ Chunk pollChunk(int size) {
+ return chunkCache.pollChunk(size);
+ }
+
+ boolean offerChunk(Chunk chunk) {
if (freed) {
return false;
}
- boolean isAdded = chunkReuseQueue.offer(buffer);
+ if (chunk.hasUnprocessedFreelistEntries()) {
+ chunk.processFreelistEntries();
+ }
+ boolean isAdded = chunkCache.offerChunk(chunk);
+
if (freed && isAdded) {
// Help to free the reuse queue.
- freeChunkReuseQueue();
+ freeChunkReuseQueue(ownerThread);
}
return isAdded;
}
private void free() {
freed = true;
+ Thread ownerThread = this.ownerThread;
if (threadLocalMagazine != null) {
+ this.ownerThread = null;
threadLocalMagazine.free();
} else {
long stamp = magazineExpandLock.writeLock();
@@ -492,22 +505,153 @@ private void free() {
magazineExpandLock.unlockWrite(stamp);
}
}
- freeChunkReuseQueue();
+ freeChunkReuseQueue(ownerThread);
}
- private void freeChunkReuseQueue() {
- for (;;) {
- Chunk chunk = chunkReuseQueue.poll();
+ private void freeChunkReuseQueue(Thread ownerThread) {
+ Chunk chunk;
+ while ((chunk = chunkCache.pollChunk(0)) != null) {
+ if (ownerThread != null && chunk instanceof SizeClassedChunk) {
+ SizeClassedChunk threadLocalChunk = (SizeClassedChunk) chunk;
+ assert ownerThread == threadLocalChunk.ownerThread;
+ // no release segment can ever happen from the owner Thread since it's not running anymore
+ // This is required to let the ownerThread to be GC'ed despite there are AdaptiveByteBuf
+ // that reference some thread local chunk
+ threadLocalChunk.ownerThread = null;
+ }
+ chunk.markToDeallocate();
+ }
+ }
+ }
+
+ /**
+ * A cache of partially-free {@code Chunk}s that magazines can reuse instead of allocating
+ * new chunks. Implementations differ in how they match a chunk to a request:
+ * {@code ConcurrentQueueChunkCache} ignores the requested size (fixed-size segments),
+ * while {@code ConcurrentSkipListChunkCache} does a best-fit lookup keyed by remaining capacity.
+ */
+ private interface ChunkCache {
+ // Returns a cached chunk suitable for a request of {@code size} bytes, or null if none is available.
+ Chunk pollChunk(int size);
+ // Offers a chunk back to the cache; returns false if the cache did not retain it.
+ boolean offerChunk(Chunk chunk);
+ }
+
+ /**
+ * {@link ChunkCache} backed by a bounded MPMC queue, used by size-classed magazine groups.
+ * Every chunk in this cache serves segments of a single fixed size, so any chunk with
+ * remaining capacity can satisfy any request and no best-fit search is needed.
+ * NOTE(review): generic type parameters (e.g. {@code Queue<SizeClassedChunk>}) appear to have
+ * been stripped from this patch text by extraction — restore them before applying.
+ */
+ private static final class ConcurrentQueueChunkCache implements ChunkCache {
+ private final Queue queue;
+
+ private ConcurrentQueueChunkCache() {
+ queue = createSharedChunkQueue();
+ }
+
+ @Override
+ public SizeClassedChunk pollChunk(int size) {
+ // The size argument is deliberately ignored: all chunks in this cache serve segments
+ // of one fixed size class, so any chunk that still has free segments will do.
+ Queue queue = this.queue;
+ // Bound the scan by the queue capacity (CHUNK_REUSE_QUEUE is also the queue's fixed
+ // capacity, see createSharedChunkQueue) so the loop always terminates even if every
+ // cached chunk is currently full and gets rotated to the back.
+ for (int i = 0; i < CHUNK_REUSE_QUEUE; i++) {
+ SizeClassedChunk chunk = queue.poll();
if (chunk == null) {
+ return null;
+ }
+ if (chunk.hasRemainingCapacity()) {
+ return chunk;
+ }
+ // Chunk is currently full: rotate it to the back of the queue and keep scanning.
+ // NOTE(review): the offer's result is ignored — if the queue filled up concurrently
+ // the chunk is dropped here without being marked for deallocation; confirm this
+ // cannot leak the chunk's memory.
+ queue.offer(chunk);
+ }
+ return null;
+ }
+
+ @Override
+ public boolean offerChunk(Chunk chunk) {
+ // Size-classed groups only ever offer SizeClassedChunk instances; the cast makes
+ // that invariant explicit (a ClassCastException here indicates a caller bug).
+ return queue.offer((SizeClassedChunk) chunk);
+ }
+ }
+
+ /**
+ * {@link ChunkCache} for variably-sized (buddy) chunks. Chunks are stored in a concurrent
+ * skip-list multimap keyed by their remaining capacity at insertion time, which enables a
+ * ceiling (best-fit) lookup for a requested size.
+ * NOTE(review): a chunk's key can understate its true free space — freed segments are only
+ * folded in when {@code processFreelistEntries()} runs — hence the fallback scan in
+ * {@code pollChunk}.
+ */
+ private static final class ConcurrentSkipListChunkCache implements ChunkCache {
+ private final ConcurrentSkipListIntObjMultimap chunks;
+
+ private ConcurrentSkipListChunkCache() {
+ chunks = new ConcurrentSkipListIntObjMultimap(-1);
+ }
+
+ @Override
+ public Chunk pollChunk(int size) {
+ if (chunks.isEmpty()) {
+ return null;
+ }
+ // Fast path: remove the chunk whose keyed capacity is the smallest value >= size.
+ IntEntry entry = chunks.pollCeilingEntry(size);
+ if (entry != null) {
+ Chunk chunk = entry.getValue();
+ if (chunk.hasUnprocessedFreelistEntries()) {
+ chunk.processFreelistEntries();
+ }
+ return chunk;
+ }
+
+ // Slow path: no chunk was keyed large enough, but stale keys may hide usable space.
+ // Re-process pending freelist entries and track the largest candidate that fits.
+ Chunk bestChunk = null;
+ int bestRemainingCapacity = 0;
+ Iterator> itr = chunks.iterator();
+ while (itr.hasNext()) {
+ entry = itr.next();
+ final Chunk chunk;
+ if (entry != null && (chunk = entry.getValue()).hasUnprocessedFreelistEntries()) {
+ // Claim the entry before mutating the chunk; remove(key, value) can lose a
+ // race with a concurrent poll, in which case we simply move on.
+ if (!chunks.remove(entry.getKey(), entry.getValue())) {
+ continue;
+ }
+ chunk.processFreelistEntries();
+ int remainingCapacity = chunk.remainingCapacity();
+ if (remainingCapacity >= size &&
+ (bestChunk == null || remainingCapacity > bestRemainingCapacity)) {
+ // Found a better fit: return the previous best to the map under its fresh key.
+ if (bestChunk != null) {
+ chunks.put(bestRemainingCapacity, bestChunk);
+ }
+ bestChunk = chunk;
+ bestRemainingCapacity = remainingCapacity;
+ } else {
+ // Too small or not better than the current best: re-insert under its
+ // updated remaining capacity so future lookups see an accurate key.
+ chunks.put(remainingCapacity, chunk);
+ }
+ }
+ }
+
+ return bestChunk;
+ }
+
+ @Override
+ public boolean offerChunk(Chunk chunk) {
+ chunks.put(chunk.remainingCapacity(), chunk);
+
+ // Keep the cache bounded: evict until at most CHUNK_REUSE_QUEUE chunks remain.
+ int size = chunks.size();
+ while (size > CHUNK_REUSE_QUEUE) {
+ // Deallocate the chunk with the fewest incoming references.
+ // Ties are broken by preferring the smaller-capacity chunk.
+ int key = -1;
+ Chunk toDeallocate = null;
+ for (IntEntry entry : chunks) {
+ Chunk candidate = entry.getValue();
+ if (candidate != null) {
+ if (toDeallocate == null) {
+ toDeallocate = candidate;
+ key = entry.getKey();
+ } else {
+ int candidateRefCnt = candidate.refCnt();
+ int toDeallocateRefCnt = toDeallocate.refCnt();
+ if (candidateRefCnt < toDeallocateRefCnt ||
+ candidateRefCnt == toDeallocateRefCnt &&
+ candidate.capacity() < toDeallocate.capacity()) {
+ toDeallocate = candidate;
+ key = entry.getKey();
+ }
+ }
+ }
+ }
+ if (toDeallocate == null) {
break;
}
- chunk.release();
+ // remove(key, value) guards against racing with a concurrent poll of the same chunk.
+ if (chunks.remove(key, toDeallocate)) {
+ toDeallocate.markToDeallocate();
+ }
+ size = chunks.size();
}
+ return true;
}
}
- private interface ChunkControllerFactory {
- ChunkController create(MagazineGroup group);
+ /**
+ * Bundles the two policy pieces a {@code MagazineGroup} needs: a {@link ChunkController}
+ * that sizes buffers and allocates new chunks, and a {@link ChunkCache} that stores
+ * reusable ones.
+ */
+ private interface ChunkManagementStrategy {
+ // Creates a per-magazine controller that decides buffer capacities and allocates chunks.
+ ChunkController createController(MagazineGroup group);
+
+ // Creates the chunk cache shared by the group's magazines.
+ ChunkCache createChunkCache(boolean isThreadLocal);
}
private interface ChunkController {
@@ -516,66 +660,75 @@ private interface ChunkController {
*/
int computeBufferCapacity(int requestedSize, int maxCapacity, boolean isReallocation);
- /**
- * Initialize the given chunk factory with shared statistics state (if any) from this factory.
- */
- void initializeSharedStateIn(ChunkController chunkController);
-
/**
* Allocate a new {@link Chunk} for the given {@link Magazine}.
*/
Chunk newChunkAllocation(int promptingSize, Magazine magazine);
}
- private interface ChunkReleasePredicate {
- boolean shouldReleaseChunk(int chunkSize);
- }
-
- private static final class SizeClassChunkControllerFactory implements ChunkControllerFactory {
+ /**
+ * {@link ChunkManagementStrategy} for magazine groups that serve one fixed segment size.
+ * Chunk size is derived from the segment size (at least MIN_SEGMENTS_PER_CHUNK segments,
+ * and at least MIN_CHUNK_SIZE), and chunks are cached in a simple queue since every
+ * segment is interchangeable.
+ */
+ private static final class SizeClassChunkManagementStrategy implements ChunkManagementStrategy {
// To amortize activation/deactivation of chunks, we should have a minimum number of segments per chunk.
// We choose 32 because it seems neither too small nor too big.
// For segments of 16 KiB, the chunks will be half a megabyte.
private static final int MIN_SEGMENTS_PER_CHUNK = 32;
private final int segmentSize;
private final int chunkSize;
- private final int[] segmentOffsets;
- private SizeClassChunkControllerFactory(int segmentSize) {
+ private SizeClassChunkManagementStrategy(int segmentSize) {
this.segmentSize = ObjectUtil.checkPositive(segmentSize, "segmentSize");
chunkSize = Math.max(MIN_CHUNK_SIZE, segmentSize * MIN_SEGMENTS_PER_CHUNK);
- int segmentsCount = chunkSize / segmentSize;
- segmentOffsets = new int[segmentsCount];
- for (int i = 0; i < segmentsCount; i++) {
- segmentOffsets[i] = i * segmentSize;
- }
}
@Override
- public ChunkController create(MagazineGroup group) {
- return new SizeClassChunkController(group, segmentSize, chunkSize, segmentOffsets);
+ public ChunkController createController(MagazineGroup group) {
+ return new SizeClassChunkController(group, segmentSize, chunkSize);
+ }
+
+ @Override
+ public ChunkCache createChunkCache(boolean isThreadLocal) {
+ // Fixed-size segments never need best-fit matching, so a plain queue suffices.
+ return new ConcurrentQueueChunkCache();
+ }
}
}
private static final class SizeClassChunkController implements ChunkController {
- private static final ChunkReleasePredicate FALSE_PREDICATE = new ChunkReleasePredicate() {
- @Override
- public boolean shouldReleaseChunk(int chunkSize) {
- return false;
- }
- };
private final ChunkAllocator chunkAllocator;
private final int segmentSize;
private final int chunkSize;
private final ChunkRegistry chunkRegistry;
- private final int[] segmentOffsets;
- private SizeClassChunkController(MagazineGroup group, int segmentSize, int chunkSize, int[] segmentOffsets) {
+ private SizeClassChunkController(MagazineGroup group, int segmentSize, int chunkSize) {
chunkAllocator = group.chunkAllocator;
this.segmentSize = segmentSize;
this.chunkSize = chunkSize;
chunkRegistry = group.allocator.chunkRegistry;
- this.segmentOffsets = segmentOffsets;
+ }
+
+ // Creates an initially-empty MPSC free-list sized for this controller's segment count.
+ // NOTE(review): presumably consumed by SizeClassedChunk — confirm the caller.
+ private MpscIntQueue createEmptyFreeList() {
+ return new MpscAtomicIntegerArrayQueue(chunkSize / segmentSize, SizeClassedChunk.FREE_LIST_EMPTY);
+ }
+
+ // Creates an MPSC free-list pre-populated with every segment offset in ascending order
+ // (0, segmentSize, 2*segmentSize, ...).
+ private MpscIntQueue createFreeList() {
+ final int segmentsCount = chunkSize / segmentSize;
+ final MpscIntQueue freeList = new MpscAtomicIntegerArrayQueue(
+ segmentsCount, SizeClassedChunk.FREE_LIST_EMPTY);
+ int segmentOffset = 0;
+ for (int i = 0; i < segmentsCount; i++) {
+ freeList.offer(segmentOffset);
+ segmentOffset += segmentSize;
+ }
+ return freeList;
+ }
+
+ // Creates a single-threaded free-list (stack) pre-populated with every segment offset.
+ // Offsets are stored in descending order (chunkSize - segmentSize down to 0); the pop
+ // order therefore depends on IntStack's semantics — NOTE(review): confirm pops yield
+ // ascending offsets as intended.
+ private IntStack createLocalFreeList() {
+ final int segmentsCount = chunkSize / segmentSize;
+ int segmentOffset = chunkSize;
+ int[] offsets = new int[segmentsCount];
+ for (int i = 0; i < segmentsCount; i++) {
+ segmentOffset -= segmentSize;
+ offsets[i] = segmentOffset;
+ }
+ return new IntStack(offsets);
}
@Override
@@ -584,235 +737,59 @@ public int computeBufferCapacity(
return Math.min(segmentSize, maxCapacity);
}
- @Override
- public void initializeSharedStateIn(ChunkController chunkController) {
- // NOOP
- }
-
@Override
public Chunk newChunkAllocation(int promptingSize, Magazine magazine) {
AbstractByteBuf chunkBuffer = chunkAllocator.allocate(chunkSize, chunkSize);
assert chunkBuffer.capacity() == chunkSize;
- SizeClassedChunk chunk = new SizeClassedChunk(chunkBuffer, magazine, true,
- segmentSize, segmentOffsets, FALSE_PREDICATE);
+ SizeClassedChunk chunk = new SizeClassedChunk(chunkBuffer, magazine, this);
chunkRegistry.add(chunk);
return chunk;
}
}
- private static final class HistogramChunkControllerFactory implements ChunkControllerFactory {
- private final boolean shareable;
+ /**
+ * {@link ChunkManagementStrategy} for the large-buffer magazine group, replacing the
+ * histogram-driven sizing with buddy-style power-of-two chunks. Chunks vary in size,
+ * so the cache is a skip-list keyed by remaining capacity for best-fit reuse.
+ */
+ private static final class BuddyChunkManagementStrategy implements ChunkManagementStrategy {
+ // Sticky high-water mark for chunk sizes, shared by all controllers created from this
+ // strategy so the group converges on one chunk size (updated racily; see controller).
+ private final AtomicInteger maxChunkSize = new AtomicInteger();
- private HistogramChunkControllerFactory(boolean shareable) {
- this.shareable = shareable;
+ @Override
+ public ChunkController createController(MagazineGroup group) {
+ return new BuddyChunkController(group, maxChunkSize);
}
@Override
- public ChunkController create(MagazineGroup group) {
- return new HistogramChunkController(group, shareable);
+ public ChunkCache createChunkCache(boolean isThreadLocal) {
+ // Variably-sized chunks need best-fit matching by remaining capacity.
+ return new ConcurrentSkipListChunkCache();
}
}
- private static final class HistogramChunkController implements ChunkController, ChunkReleasePredicate {
- private static final int MIN_DATUM_TARGET = 1024;
- private static final int MAX_DATUM_TARGET = 65534;
- private static final int INIT_DATUM_TARGET = 9;
- private static final int HISTO_BUCKET_COUNT = 16;
- private static final int[] HISTO_BUCKETS = {
- 16 * 1024,
- 24 * 1024,
- 32 * 1024,
- 48 * 1024,
- 64 * 1024,
- 96 * 1024,
- 128 * 1024,
- 192 * 1024,
- 256 * 1024,
- 384 * 1024,
- 512 * 1024,
- 768 * 1024,
- 1024 * 1024,
- 1792 * 1024,
- 2048 * 1024,
- 3072 * 1024
- };
-
- private final MagazineGroup group;
- private final boolean shareable;
- private final short[][] histos = {
- new short[HISTO_BUCKET_COUNT], new short[HISTO_BUCKET_COUNT],
- new short[HISTO_BUCKET_COUNT], new short[HISTO_BUCKET_COUNT],
- };
+ private static final class BuddyChunkController implements ChunkController {
+ private final ChunkAllocator chunkAllocator;
private final ChunkRegistry chunkRegistry;
- private short[] histo = histos[0];
- private final int[] sums = new int[HISTO_BUCKET_COUNT];
-
- private int histoIndex;
- private int datumCount;
- private int datumTarget = INIT_DATUM_TARGET;
- private boolean hasHadRotation;
- private volatile int sharedPrefChunkSize = MIN_CHUNK_SIZE;
- private volatile int localPrefChunkSize = MIN_CHUNK_SIZE;
- private volatile int localUpperBufSize;
-
- private HistogramChunkController(MagazineGroup group, boolean shareable) {
- this.group = group;
- this.shareable = shareable;
- chunkRegistry = group.allocator.chunkRegistry;
- }
-
- @Override
- public int computeBufferCapacity(
- int requestedSize, int maxCapacity, boolean isReallocation) {
- if (!isReallocation) {
- // Only record allocation size if it's not caused by a reallocation that was triggered by capacity
- // change of the buffer.
- recordAllocationSize(requestedSize);
- }
+ private final AtomicInteger maxChunkSize;
- // Predict starting capacity from localUpperBufSize, but place limits on the max starting capacity
- // based on the requested size, because localUpperBufSize can potentially be quite large.
- int startCapLimits;
- if (requestedSize <= 32768) { // Less than or equal to 32 KiB.
- startCapLimits = 65536; // Use at most 64 KiB, which is also the AdaptiveRecvByteBufAllocator max.
- } else {
- startCapLimits = requestedSize * 2; // Otherwise use at most twice the requested memory.
- }
- int startingCapacity = Math.min(startCapLimits, localUpperBufSize);
- startingCapacity = Math.max(requestedSize, Math.min(maxCapacity, startingCapacity));
- return startingCapacity;
- }
-
- private void recordAllocationSize(int bufferSizeToRecord) {
- // Use the preserved size from the reused AdaptiveByteBuf, if available.
- // Otherwise, use the requested buffer size.
- // This way, we better take into account
- if (bufferSizeToRecord == 0) {
- return;
- }
- int bucket = sizeToBucket(bufferSizeToRecord);
- histo[bucket]++;
- if (datumCount++ == datumTarget) {
- rotateHistograms();
- }
- }
-
- static int sizeToBucket(int size) {
- int index = binarySearchInsertionPoint(Arrays.binarySearch(HISTO_BUCKETS, size));
- return index >= HISTO_BUCKETS.length ? HISTO_BUCKETS.length - 1 : index;
- }
-
- private static int binarySearchInsertionPoint(int index) {
- if (index < 0) {
- index = -(index + 1);
- }
- return index;
- }
-
- static int bucketToSize(int sizeBucket) {
- return HISTO_BUCKETS[sizeBucket];
- }
-
- private void rotateHistograms() {
- short[][] hs = histos;
- for (int i = 0; i < HISTO_BUCKET_COUNT; i++) {
- sums[i] = (hs[0][i] & 0xFFFF) + (hs[1][i] & 0xFFFF) + (hs[2][i] & 0xFFFF) + (hs[3][i] & 0xFFFF);
- }
- int sum = 0;
- for (int count : sums) {
- sum += count;
- }
- int targetPercentile = (int) (sum * 0.99);
- int sizeBucket = 0;
- for (; sizeBucket < sums.length; sizeBucket++) {
- if (sums[sizeBucket] > targetPercentile) {
- break;
- }
- targetPercentile -= sums[sizeBucket];
- }
- hasHadRotation = true;
- int percentileSize = bucketToSize(sizeBucket);
- int prefChunkSize = Math.max(percentileSize * BUFS_PER_CHUNK, MIN_CHUNK_SIZE);
- localUpperBufSize = percentileSize;
- localPrefChunkSize = prefChunkSize;
- if (shareable) {
- for (Magazine mag : group.magazines) {
- HistogramChunkController statistics = (HistogramChunkController) mag.chunkController;
- prefChunkSize = Math.max(prefChunkSize, statistics.localPrefChunkSize);
- }
- }
- if (sharedPrefChunkSize != prefChunkSize) {
- // Preferred chunk size changed. Increase check frequency.
- datumTarget = Math.max(datumTarget >> 1, MIN_DATUM_TARGET);
- sharedPrefChunkSize = prefChunkSize;
- } else {
- // Preferred chunk size did not change. Check less often.
- datumTarget = Math.min(datumTarget << 1, MAX_DATUM_TARGET);
- }
-
- histoIndex = histoIndex + 1 & 3;
- histo = histos[histoIndex];
- datumCount = 0;
- Arrays.fill(histo, (short) 0);
- }
-
- /**
- * Get the preferred chunk size, based on statistics from the {@linkplain #recordAllocationSize(int) recorded}
- * allocation sizes.
- *
- * This method must be thread-safe.
- *
- * @return The currently preferred chunk allocation size.
- */
- int preferredChunkSize() {
- return sharedPrefChunkSize;
+ // The maxChunkSize reference is shared across all controllers of the owning strategy.
+ BuddyChunkController(MagazineGroup group, AtomicInteger maxChunkSize) {
+ chunkAllocator = group.chunkAllocator;
+ chunkRegistry = group.allocator.chunkRegistry;
+ this.maxChunkSize = maxChunkSize;
}
@Override
- public void initializeSharedStateIn(ChunkController chunkController) {
- HistogramChunkController statistics = (HistogramChunkController) chunkController;
- int sharedPrefChunkSize = this.sharedPrefChunkSize;
- statistics.localPrefChunkSize = sharedPrefChunkSize;
- statistics.sharedPrefChunkSize = sharedPrefChunkSize;
+ // Buddy allocation: round every buffer capacity up to the next power of two.
+ public int computeBufferCapacity(int requestedSize, int maxCapacity, boolean isReallocation) {
+ return MathUtil.safeFindNextPositivePowerOfTwo(requestedSize);
}
@Override
public Chunk newChunkAllocation(int promptingSize, Magazine magazine) {
- int size = Math.max(promptingSize * BUFS_PER_CHUNK, preferredChunkSize());
- int minChunks = size / MIN_CHUNK_SIZE;
- if (MIN_CHUNK_SIZE * minChunks < size) {
- // Round up to nearest whole MIN_CHUNK_SIZE unit. The MIN_CHUNK_SIZE is an even multiple of many
- // popular small page sizes, like 4k, 16k, and 64k, which makes it easier for the system allocator
- // to manage the memory in terms of whole pages. This reduces memory fragmentation,
- // but without the potentially high overhead that power-of-2 chunk sizes would bring.
- size = MIN_CHUNK_SIZE * (1 + minChunks);
- }
-
- // Limit chunks to the max size, even if the histogram suggests to go above it.
- size = Math.min(size, MAX_CHUNK_SIZE);
-
- // If we haven't rotated the histogram yet, optimisticly record this chunk size as our preferred.
- if (!hasHadRotation && sharedPrefChunkSize == MIN_CHUNK_SIZE) {
- sharedPrefChunkSize = size;
- }
-
- ChunkAllocator chunkAllocator = group.chunkAllocator;
- Chunk chunk = new Chunk(chunkAllocator.allocate(size, size), magazine, true, this);
+ // Chunk size: power-of-two of BUFS_PER_CHUNK * promptingSize, never shrinking below
+ // the shared high-water mark, and capped at MAX_CHUNK_SIZE.
+ int maxChunkSize = this.maxChunkSize.get();
+ int proposedChunkSize = MathUtil.safeFindNextPositivePowerOfTwo(BUFS_PER_CHUNK * promptingSize);
+ int chunkSize = Math.min(MAX_CHUNK_SIZE, Math.max(maxChunkSize, proposedChunkSize));
+ if (chunkSize > maxChunkSize) {
+ // Update our stored max chunk size. It's fine that this is racy:
+ // a lost update only means another thread's (also valid) size wins.
+ this.maxChunkSize.set(chunkSize);
+ }
+ BuddyChunk chunk = new BuddyChunk(chunkAllocator.allocate(chunkSize, chunkSize), magazine);
chunkRegistry.add(chunk);
return chunk;
}
-
- @Override
- public boolean shouldReleaseChunk(int chunkSize) {
- int preferredSize = preferredChunkSize();
- int givenChunks = chunkSize / MIN_CHUNK_SIZE;
- int preferredChunks = preferredSize / MIN_CHUNK_SIZE;
- int deviation = Math.abs(givenChunks - preferredChunks);
-
- // Retire chunks with a 5% probability per unit of MIN_CHUNK_SIZE deviation from preference.
- return deviation != 0 &&
- ThreadLocalRandom.current().nextDouble() * 20.0 < deviation;
- }
}
@SuppressJava6Requirement(reason = "Guarded by version check")
@@ -823,13 +800,31 @@ private static final class Magazine {
}
private static final Chunk MAGAZINE_FREED = new Chunk();
- private static final ObjectPool EVENT_LOOP_LOCAL_BUFFER_POOL = ObjectPool.newPool(
- new ObjectPool.ObjectCreator() {
- @Override
- public AdaptiveByteBuf newObject(ObjectPool.Handle handle) {
- return new AdaptiveByteBuf(handle);
- }
- });
+ /**
+ * {@code Recycler}-based pool of {@link AdaptiveByteBuf} instances, replacing the previous
+ * ObjectPool/bufferQueue scheme. Two variants exist: {@link #threadLocal()} with the
+ * Recycler's default configuration, and {@link #sharedWith(int)} with a bounded capacity
+ * for magazines that are shared across threads.
+ * NOTE(review): generic type parameters (e.g. {@code Recycler<AdaptiveByteBuf>}) appear to
+ * have been stripped from this patch text — restore them before applying.
+ */
+ private static final class AdaptiveRecycler extends Recycler {
+
+ private AdaptiveRecycler() {
+ }
+
+ private AdaptiveRecycler(int maxCapacity) {
+ // Shared (cross-thread) variant with bounded capacity; does not use fast thread-locals.
+ super(maxCapacity);
+ }
+
+ @Override
+ protected AdaptiveByteBuf newObject(final Handle handle) {
+ return new AdaptiveByteBuf((EnhancedHandle) handle);
+ }
+
+ public static AdaptiveRecycler threadLocal() {
+ return new AdaptiveRecycler();
+ }
+
+ public static AdaptiveRecycler sharedWith(int maxCapacity) {
+ return new AdaptiveRecycler(maxCapacity);
+ }
+ }
+
+ // Fallback pool used by magazines that have no per-magazine recycler (non-shareable ones).
+ private static final AdaptiveRecycler EVENT_LOOP_LOCAL_BUFFER_POOL = AdaptiveRecycler.threadLocal();
private Chunk current;
@SuppressWarnings("unused") // updated via NEXT_IN_LINE
@@ -837,31 +832,20 @@ public AdaptiveByteBuf newObject(ObjectPool.Handle handle) {
private final MagazineGroup group;
private final ChunkController chunkController;
private final StampedLock allocationLock;
- private final Queue bufferQueue;
- private final ObjectPool.Handle handle;
- private final Queue sharedChunkQueue;
+ private final AdaptiveRecycler recycler;
- Magazine(MagazineGroup group, boolean shareable, Queue sharedChunkQueue,
- ChunkController chunkController) {
+ Magazine(MagazineGroup group, boolean shareable, ChunkController chunkController) {
this.group = group;
this.chunkController = chunkController;
if (shareable) {
// We only need the StampedLock if this Magazine will be shared across threads.
allocationLock = new StampedLock();
- bufferQueue = PlatformDependent.newFixedMpmcQueue(MAGAZINE_BUFFER_QUEUE_CAPACITY);
- handle = new ObjectPool.Handle() {
- @Override
- public void recycle(AdaptiveByteBuf self) {
- bufferQueue.offer(self);
- }
- };
+ recycler = AdaptiveRecycler.sharedWith(MAGAZINE_BUFFER_QUEUE_CAPACITY);
} else {
allocationLock = null;
- bufferQueue = null;
- handle = null;
+ recycler = null;
}
- this.sharedChunkQueue = sharedChunkQueue;
}
public boolean tryAllocate(int size, int maxCapacity, AdaptiveByteBuf buf, boolean reallocate) {
@@ -890,7 +874,7 @@ private boolean allocateWithoutLock(int size, int maxCapacity, AdaptiveByteBuf b
return false;
}
if (curr == null) {
- curr = sharedChunkQueue.poll();
+ curr = group.pollChunk(size);
if (curr == null) {
return false;
}
@@ -900,9 +884,10 @@ private boolean allocateWithoutLock(int size, int maxCapacity, AdaptiveByteBuf b
int remainingCapacity = curr.remainingCapacity();
int startingCapacity = chunkController.computeBufferCapacity(
size, maxCapacity, true /* never update stats as we don't hold the magazine lock */);
- if (remainingCapacity >= size) {
- curr.readInitInto(buf, size, Math.min(remainingCapacity, startingCapacity), maxCapacity);
+ if (remainingCapacity >= size &&
+ curr.readInitInto(buf, size, Math.min(remainingCapacity, startingCapacity), maxCapacity)) {
allocated = true;
+ remainingCapacity = curr.remainingCapacity();
}
try {
if (remainingCapacity >= RETIRE_CAPACITY) {
@@ -921,33 +906,17 @@ private boolean allocate(int size, int maxCapacity, AdaptiveByteBuf buf, boolean
int startingCapacity = chunkController.computeBufferCapacity(size, maxCapacity, reallocate);
Chunk curr = current;
if (curr != null) {
- // We have a Chunk that has some space left.
+ boolean success = curr.readInitInto(buf, size, startingCapacity, maxCapacity);
int remainingCapacity = curr.remainingCapacity();
- if (remainingCapacity > startingCapacity) {
- curr.readInitInto(buf, size, startingCapacity, maxCapacity);
- // We still have some bytes left that we can use for the next allocation, just early return.
- return true;
- }
-
- // At this point we know that this will be the last time current will be used, so directly set it to
- // null and release it once we are done.
- current = null;
- if (remainingCapacity >= size) {
- try {
- curr.readInitInto(buf, size, remainingCapacity, maxCapacity);
- return true;
- } finally {
- curr.releaseFromMagazine();
- }
- }
-
- // Check if we either retain the chunk in the nextInLine cache or releasing it.
- if (remainingCapacity < RETIRE_CAPACITY) {
- curr.releaseFromMagazine();
- } else {
- // See if it makes sense to transfer the Chunk to the nextInLine cache for later usage.
- // This method will release curr if this is not the case
+ if (!success && remainingCapacity > 0) {
+ current = null;
transferToNextInLineOrRelease(curr);
+ } else if (remainingCapacity == 0) {
+ current = null;
+ curr.releaseFromMagazine();
+ }
+ if (success) {
+ return true;
}
}
@@ -969,32 +938,28 @@ private boolean allocate(int size, int maxCapacity, AdaptiveByteBuf buf, boolean
}
int remainingCapacity = curr.remainingCapacity();
- if (remainingCapacity > startingCapacity) {
+ if (remainingCapacity > startingCapacity &&
+ curr.readInitInto(buf, size, startingCapacity, maxCapacity)) {
// We have a Chunk that has some space left.
- curr.readInitInto(buf, size, startingCapacity, maxCapacity);
current = curr;
return true;
}
- if (remainingCapacity >= size) {
- // At this point we know that this will be the last time curr will be used, so directly set it to
- // null and release it once we are done.
- try {
- curr.readInitInto(buf, size, remainingCapacity, maxCapacity);
- return true;
- } finally {
- // Release in a finally block so even if readInitInto(...) would throw we would still correctly
- // release the current chunk before null it out.
- curr.releaseFromMagazine();
+ try {
+ if (remainingCapacity >= size) {
+ // At this point we know that this will be the last time curr will be used, so directly set it
+ // to null and release it once we are done.
+ return curr.readInitInto(buf, size, remainingCapacity, maxCapacity);
}
- } else {
- // Release it as it's too small.
+ } finally {
+ // Release in a finally block so even if readInitInto(...) would throw we would still correctly
+ // release the current chunk before null it out.
curr.releaseFromMagazine();
}
}
// Now try to poll from the central queue first
- curr = sharedChunkQueue.poll();
+ curr = group.pollChunk(size);
if (curr == null) {
curr = chunkController.newChunkAllocation(size, this);
} else {
@@ -1015,14 +980,15 @@ private boolean allocate(int size, int maxCapacity, AdaptiveByteBuf buf, boolean
}
current = curr;
+ boolean success;
try {
int remainingCapacity = curr.remainingCapacity();
assert remainingCapacity >= size;
if (remainingCapacity > startingCapacity) {
- curr.readInitInto(buf, size, startingCapacity, maxCapacity);
+ success = curr.readInitInto(buf, size, startingCapacity, maxCapacity);
curr = null;
} else {
- curr.readInitInto(buf, size, remainingCapacity, maxCapacity);
+ success = curr.readInitInto(buf, size, remainingCapacity, maxCapacity);
}
} finally {
if (curr != null) {
@@ -1032,7 +998,7 @@ private boolean allocate(int size, int maxCapacity, AdaptiveByteBuf buf, boolean
current = null;
}
}
- return true;
+ return success;
}
private void restoreMagazineFreed() {
@@ -1063,10 +1029,6 @@ private void transferToNextInLineOrRelease(Chunk chunk) {
chunk.releaseFromMagazine();
}
- boolean trySetNextInLine(Chunk chunk) {
- return NEXT_IN_LINE.compareAndSet(this, null, chunk);
- }
-
void free() {
// Release the current Chunk and the next that was stored for later usage.
restoreMagazineFreed();
@@ -1084,26 +1046,15 @@ void free() {
}
public AdaptiveByteBuf newBuffer() {
- AdaptiveByteBuf buf;
- if (handle == null) {
- buf = EVENT_LOOP_LOCAL_BUFFER_POOL.get();
- } else {
- buf = bufferQueue.poll();
- if (buf == null) {
- buf = new AdaptiveByteBuf(handle);
- }
- }
+ AdaptiveRecycler recycler = this.recycler;
+ AdaptiveByteBuf buf = recycler == null? EVENT_LOOP_LOCAL_BUFFER_POOL.get() : recycler.get();
buf.resetRefCnt();
buf.discardMarks();
return buf;
}
boolean offerToQueue(Chunk chunk) {
- return group.offerToQueue(chunk);
- }
-
- public void initializeSharedStateIn(Magazine other) {
- chunkController.initializeSharedStateIn(other.chunkController);
+ return group.offerChunk(chunk);
}
}
@@ -1133,9 +1084,7 @@ private static class Chunk implements ReferenceCounted {
protected final AbstractByteBuf delegate;
protected Magazine magazine;
private final AdaptivePoolingAllocator allocator;
- private final ChunkReleasePredicate chunkReleasePredicate;
private final int capacity;
- private final boolean pooled;
protected int allocatedBytes;
private static final ReferenceCountUpdater updater =
@@ -1161,23 +1110,17 @@ protected long unsafeOffset() {
delegate = null;
magazine = null;
allocator = null;
- chunkReleasePredicate = null;
capacity = 0;
- pooled = false;
}
- Chunk(AbstractByteBuf delegate, Magazine magazine, boolean pooled,
- ChunkReleasePredicate chunkReleasePredicate) {
+ Chunk(AbstractByteBuf delegate, Magazine magazine) {
this.delegate = delegate;
- this.pooled = pooled;
capacity = delegate.capacity();
updater.setInitialValue(this);
attachToMagazine(magazine);
// We need the top-level allocator so ByteBuf.capacity(int) can call reallocate()
allocator = magazine.group.allocator;
-
- this.chunkReleasePredicate = chunkReleasePredicate;
}
Magazine currentMagazine() {
@@ -1241,46 +1184,33 @@ public boolean release(int decrement) {
/**
* Called when a magazine is done using this chunk, probably because it was emptied.
*/
- boolean releaseFromMagazine() {
- return release();
+ void releaseFromMagazine() {
+ // Chunks can be reused before they become empty.
+ // We can therefore put them in the shared queue as soon as the magazine is done with this chunk.
+ Magazine mag = magazine;
+ detachFromMagazine();
+ if (!mag.offerToQueue(this)) {
+ markToDeallocate();
+ }
}
/**
* Called when a ByteBuf is done using its allocation in this chunk.
*/
- boolean releaseSegment(int ignoredSegmentId) {
- return release();
+ void releaseSegment(int ignoredSegmentId, int size) {
+ release();
}
- private void deallocate() {
- Magazine mag = magazine;
- int chunkSize = delegate.capacity();
- if (!pooled || chunkReleasePredicate.shouldReleaseChunk(chunkSize) || mag == null) {
- // Drop the chunk if the parent allocator is closed,
- // or if the chunk deviates too much from the preferred chunk size.
- detachFromMagazine();
- allocator.chunkRegistry.remove(this);
- delegate.release();
- } else {
- updater.resetRefCnt(this);
- delegate.setIndex(0, 0);
- allocatedBytes = 0;
- if (!mag.trySetNextInLine(this)) {
- // As this Chunk does not belong to the mag anymore we need to decrease the used memory .
- detachFromMagazine();
- if (!mag.offerToQueue(this)) {
- // The central queue is full. Ensure we release again as we previously did use resetRefCnt()
- // which did increase the reference count by 1.
- boolean released = updater.release(this);
- allocator.chunkRegistry.remove(this);
- delegate.release();
- assert released;
- }
- }
- }
+ void markToDeallocate() {
+ release();
}
- public void readInitInto(AdaptiveByteBuf buf, int size, int startingCapacity, int maxCapacity) {
+ protected void deallocate() {
+ allocator.chunkRegistry.remove(this);
+ delegate.release();
+ }
+
+ public boolean readInitInto(AdaptiveByteBuf buf, int size, int startingCapacity, int maxCapacity) {
int startIndex = allocatedBytes;
allocatedBytes = startIndex + startingCapacity;
Chunk chunk = this;
@@ -1297,101 +1227,408 @@ public void readInitInto(AdaptiveByteBuf buf, int size, int startingCapacity, in
chunk.release();
}
}
+ return true;
}
public int remainingCapacity() {
return capacity - allocatedBytes;
}
+ public boolean hasUnprocessedFreelistEntries() {
+ return false;
+ }
+
+ public void processFreelistEntries() {
+ }
+
public int capacity() {
return capacity;
}
}
+ private static final class IntStack {
+
+ private final int[] stack;
+ private int top;
+
+ IntStack(int[] initialValues) {
+ stack = initialValues;
+ top = initialValues.length - 1;
+ }
+
+ public boolean isEmpty() {
+ return top == -1;
+ }
+
+ public int pop() {
+ final int last = stack[top];
+ top--;
+ return last;
+ }
+
+ public void push(int value) {
+ stack[top + 1] = value;
+ top++;
+ }
+
+ public int size() {
+ return top + 1;
+ }
+ }
+
+ /**
+ * Removes per-allocation retain()/release() atomic ops from the hot path by replacing ref counting
+ * with a segment-count state machine. Atomics are only needed on the cold deallocation path
+ * ({@link #markToDeallocate()}), which is rare for long-lived chunks that cycle segments many times.
+ * The tradeoff is a {@link MpscIntQueue#size()} call (volatile reads, no RMW) per remaining segment
+ * return after mark — acceptable since it avoids atomic RMWs entirely.
+ *
+ * State transitions:
+ * <ul>
+ * <li>{@link #AVAILABLE} (-1): chunk is in use, no deallocation tracking needed</li>
+ * <li>0..N: local free list size at the time {@link #markToDeallocate()} was called;
+ * used to track when all segments have been returned</li>
+ * <li>{@link #DEALLOCATED} (Integer.MIN_VALUE): all segments returned, chunk deallocated</li>
+ * </ul>
+ *
+ * Ordering: external {@link #releaseSegment} pushes to the MPSC queue (which has an implicit
+ * StoreLoad barrier via its {@code offer()}), then reads {@code state} — this guarantees
+ * visibility of any preceding {@link #markToDeallocate()} write.
+ */
private static final class SizeClassedChunk extends Chunk {
private static final int FREE_LIST_EMPTY = -1;
+ private static final int AVAILABLE = -1;
+ // Integer.MIN_VALUE so that `DEALLOCATED + externalFreeList.size()` can never equal `segments`,
+ // making late-arriving releaseSegment calls on external threads arithmetically harmless.
+ private static final int DEALLOCATED = Integer.MIN_VALUE;
+ private static final AtomicIntegerFieldUpdater STATE =
+ AtomicIntegerFieldUpdater.newUpdater(SizeClassedChunk.class, "state");
+ private volatile int state;
+ private final int segments;
private final int segmentSize;
- private final MpscIntQueue freeList;
-
- SizeClassedChunk(AbstractByteBuf delegate, Magazine magazine, boolean pooled, int segmentSize,
- final int[] segmentOffsets, ChunkReleasePredicate shouldReleaseChunk) {
- super(delegate, magazine, pooled, shouldReleaseChunk);
- this.segmentSize = segmentSize;
- int segmentCount = segmentOffsets.length;
- assert delegate.capacity() / segmentSize == segmentCount;
- assert segmentCount > 0: "Chunk must have a positive number of segments";
- freeList = new MpscAtomicIntegerArrayQueue(segmentCount, FREE_LIST_EMPTY);
- freeList.fill(segmentCount, new IntSupplier() {
- int counter;
- @Override
- public int get() {
- return segmentOffsets[counter++];
- }
- });
+ private final MpscIntQueue externalFreeList;
+ private final IntStack localFreeList;
+ private Thread ownerThread;
+
+ SizeClassedChunk(AbstractByteBuf delegate, Magazine magazine,
+ SizeClassChunkController controller) {
+ super(delegate, magazine);
+ segmentSize = controller.segmentSize;
+ segments = controller.chunkSize / segmentSize;
+ STATE.lazySet(this, AVAILABLE);
+ ownerThread = magazine.group.ownerThread;
+ if (ownerThread == null) {
+ externalFreeList = controller.createFreeList();
+ localFreeList = null;
+ } else {
+ externalFreeList = controller.createEmptyFreeList();
+ localFreeList = controller.createLocalFreeList();
+ }
}
@Override
- public void readInitInto(AdaptiveByteBuf buf, int size, int startingCapacity, int maxCapacity) {
- int startIndex = freeList.poll();
+ public boolean readInitInto(AdaptiveByteBuf buf, int size, int startingCapacity, int maxCapacity) {
+ assert state == AVAILABLE;
+ final int startIndex = nextAvailableSegmentOffset();
if (startIndex == FREE_LIST_EMPTY) {
- throw new IllegalStateException("Free list is empty");
+ return false;
}
allocatedBytes += segmentSize;
+ try {
+ buf.init(delegate, this, 0, 0, startIndex, size, startingCapacity, maxCapacity);
+ } catch (Throwable t) {
+ allocatedBytes -= segmentSize;
+ releaseSegmentOffsetIntoFreeList(startIndex);
+ PlatformDependent.throwException(t);
+ }
+ return true;
+ }
+
+ private int nextAvailableSegmentOffset() {
+ final int startIndex;
+ IntStack localFreeList = this.localFreeList;
+ if (localFreeList != null) {
+ assert Thread.currentThread() == ownerThread;
+ if (localFreeList.isEmpty()) {
+ startIndex = externalFreeList.poll();
+ } else {
+ startIndex = localFreeList.pop();
+ }
+ } else {
+ startIndex = externalFreeList.poll();
+ }
+ return startIndex;
+ }
+
+ // this can be used by the ConcurrentQueueChunkCache to find the first buffer to use:
+ // it doesn't update the remaining capacity, and it doesn't treat the single remaining
+ // segmentSize case as unsuitable for reuse
+ public boolean hasRemainingCapacity() {
+ int remaining = super.remainingCapacity();
+ if (remaining > 0) {
+ return true;
+ }
+ if (localFreeList != null) {
+ return !localFreeList.isEmpty();
+ }
+ return !externalFreeList.isEmpty();
+ }
+
+ @Override
+ public int remainingCapacity() {
+ int remaining = super.remainingCapacity();
+ return remaining > segmentSize ? remaining : updateRemainingCapacity(remaining);
+ }
+
+ private int updateRemainingCapacity(int snapshotted) {
+ int freeSegments = externalFreeList.size();
+ IntStack localFreeList = this.localFreeList;
+ if (localFreeList != null) {
+ freeSegments += localFreeList.size();
+ }
+ int updated = freeSegments * segmentSize;
+ if (updated != snapshotted) {
+ allocatedBytes = capacity() - updated;
+ }
+ return updated;
+ }
+
+ private void releaseSegmentOffsetIntoFreeList(int startIndex) {
+ IntStack localFreeList = this.localFreeList;
+ if (localFreeList != null && Thread.currentThread() == ownerThread) {
+ localFreeList.push(startIndex);
+ } else {
+ boolean segmentReturned = externalFreeList.offer(startIndex);
+ assert segmentReturned : "Unable to return segment " + startIndex + " to free list";
+ }
+ }
+
+ @Override
+ void releaseSegment(int startIndex, int size) {
+ IntStack localFreeList = this.localFreeList;
+ if (localFreeList != null && Thread.currentThread() == ownerThread) {
+ localFreeList.push(startIndex);
+ int state = this.state;
+ if (state != AVAILABLE) {
+ updateStateOnLocalReleaseSegment(state, localFreeList);
+ }
+ } else {
+ boolean segmentReturned = externalFreeList.offer(startIndex);
+ assert segmentReturned;
+ // implicit StoreLoad barrier from MPSC offer()
+ int state = this.state;
+ if (state != AVAILABLE) {
+ deallocateIfNeeded(state);
+ }
+ }
+ }
+
+ private void updateStateOnLocalReleaseSegment(int previousLocalSize, IntStack localFreeList) {
+ int newLocalSize = localFreeList.size();
+ boolean alwaysTrue = STATE.compareAndSet(this, previousLocalSize, newLocalSize);
+ assert alwaysTrue : "this shouldn't happen unless double release in the local free list";
+ deallocateIfNeeded(newLocalSize);
+ }
+
+ private void deallocateIfNeeded(int localSize) {
+ // Check if all segments have been returned.
+ int totalFreeSegments = localSize + externalFreeList.size();
+ if (totalFreeSegments == segments && STATE.compareAndSet(this, localSize, DEALLOCATED)) {
+ deallocate();
+ }
+ }
+
+ @Override
+ void markToDeallocate() {
+ IntStack localFreeList = this.localFreeList;
+ int localSize = localFreeList != null ? localFreeList.size() : 0;
+ STATE.set(this, localSize);
+ deallocateIfNeeded(localSize);
+ }
+ }
+
+ private static final class BuddyChunk extends Chunk implements IntConsumer {
+ private static final int MIN_BUDDY_SIZE = 32768;
+ private static final byte IS_CLAIMED = (byte) (1 << 7);
+ private static final byte HAS_CLAIMED_CHILDREN = 1 << 6;
+ private static final byte SHIFT_MASK = ~(IS_CLAIMED | HAS_CLAIMED_CHILDREN);
+ private static final int PACK_OFFSET_MASK = 0xFFFF;
+ private static final int PACK_SIZE_SHIFT = Integer.SIZE - Integer.numberOfLeadingZeros(PACK_OFFSET_MASK);
+
+ private final MpscIntQueue freeList;
+ // The bits of each buddy: [1: is claimed][1: has claimed children][6: MIN_BUDDY_SIZE shift to get size]
+ private final byte[] buddies;
+ private final int freeListCapacity;
+
+ BuddyChunk(AbstractByteBuf delegate, Magazine magazine) {
+ super(delegate, magazine);
+ freeListCapacity = delegate.capacity() / MIN_BUDDY_SIZE;
+ int maxShift = Integer.numberOfTrailingZeros(freeListCapacity);
+ assert maxShift <= 30; // The top 2 bits are used for marking.
+ // At most half of tree (all leaf nodes) can be freed.
+ freeList = new MpscAtomicIntegerArrayQueue(freeListCapacity, -1);
+ buddies = new byte[freeListCapacity << 1];
+
+ // Generate the buddies entries.
+ int index = 1;
+ int runLength = 1;
+ int currentRun = 0;
+ while (maxShift > 0) {
+ buddies[index++] = (byte) maxShift;
+ if (++currentRun == runLength) {
+ currentRun = 0;
+ runLength <<= 1;
+ maxShift--;
+ }
+ }
+ }
+
+ @Override
+ public boolean readInitInto(AdaptiveByteBuf buf, int size, int startingCapacity, int maxCapacity) {
+ if (!freeList.isEmpty()) {
+ freeList.drain(freeListCapacity, this);
+ }
+ int startIndex = chooseFirstFreeBuddy(1, startingCapacity, 0);
+ if (startIndex == -1) {
+ return false;
+ }
Chunk chunk = this;
chunk.retain();
try {
- buf.init(delegate, chunk, 0, 0, startIndex, size, startingCapacity, maxCapacity);
+ buf.init(delegate, this, 0, 0, startIndex, size, startingCapacity, maxCapacity);
+ allocatedBytes += startingCapacity;
chunk = null;
} finally {
if (chunk != null) {
+ unreserveMatchingBuddy(1, startingCapacity, startIndex, 0);
// If chunk is not null we know that buf.init(...) failed and so we need to manually release
- // the chunk again as we retained it before calling buf.init(...). Beside this we also need to
- // restore the old allocatedBytes value.
- allocatedBytes -= segmentSize;
- chunk.releaseSegment(startIndex);
+ // the chunk again as we retained it before calling buf.init(...).
+ chunk.release();
}
}
+ return true;
+ }
+
+ @Override
+ public void accept(int packed) {
+ // Called by allocating thread when draining freeList.
+ int size = MIN_BUDDY_SIZE << (packed >> PACK_SIZE_SHIFT);
+ int offset = (packed & PACK_OFFSET_MASK) * MIN_BUDDY_SIZE;
+ unreserveMatchingBuddy(1, size, offset, 0);
+ allocatedBytes -= size;
+ }
+
+ @Override
+ void releaseSegment(int startingIndex, int size) {
+ int packedOffset = startingIndex / MIN_BUDDY_SIZE;
+ int packedSize = Integer.numberOfTrailingZeros(size / MIN_BUDDY_SIZE) << PACK_SIZE_SHIFT;
+ int packed = packedOffset | packedSize;
+ freeList.offer(packed);
+ release();
}
@Override
public int remainingCapacity() {
- int remainingCapacity = super.remainingCapacity();
- if (remainingCapacity > segmentSize) {
- return remainingCapacity;
+ if (!freeList.isEmpty()) {
+ freeList.drain(freeListCapacity, this);
}
- int updatedRemainingCapacity = freeList.size() * segmentSize;
- if (updatedRemainingCapacity == remainingCapacity) {
- return remainingCapacity;
- }
- // update allocatedBytes based on what's available in the free list
- allocatedBytes = capacity() - updatedRemainingCapacity;
- return updatedRemainingCapacity;
+ return super.remainingCapacity();
}
@Override
- boolean releaseFromMagazine() {
- // Size-classed chunks can be reused before they become empty.
- // We can therefor put them in the shared queue as soon as the magazine is done with this chunk.
- Magazine mag = magazine;
- detachFromMagazine();
- if (!mag.offerToQueue(this)) {
- return super.releaseFromMagazine();
+ public boolean hasUnprocessedFreelistEntries() {
+ return !freeList.isEmpty();
+ }
+
+ @Override
+ public void processFreelistEntries() {
+ freeList.drain(freeListCapacity, this);
+ }
+
+ /**
+ * Claim a suitable buddy and return its start offset into the delegate chunk, or return -1 if nothing claimed.
+ */
+ private int chooseFirstFreeBuddy(int index, int size, int currOffset) {
+ byte[] buddies = this.buddies;
+ while (index < buddies.length) {
+ byte buddy = buddies[index];
+ int currValue = MIN_BUDDY_SIZE << (buddy & SHIFT_MASK);
+ if (currValue < size || (buddy & IS_CLAIMED) == IS_CLAIMED) {
+ return -1;
+ }
+ if (currValue == size && (buddy & HAS_CLAIMED_CHILDREN) == 0) {
+ buddies[index] |= IS_CLAIMED;
+ return currOffset;
+ }
+ int found = chooseFirstFreeBuddy(index << 1, size, currOffset);
+ if (found != -1) {
+ buddies[index] |= HAS_CLAIMED_CHILDREN;
+ return found;
+ }
+ index = (index << 1) + 1;
+ currOffset += currValue >> 1; // Bump offset to skip first half of this layer.
}
- return false;
+ return -1;
+ }
+
+ /**
+ * Un-reserve the matching buddy and return whether there are any other child or sibling reservations.
+ */
+ private boolean unreserveMatchingBuddy(int index, int size, int offset, int currOffset) {
+ byte[] buddies = this.buddies;
+ if (buddies.length <= index) {
+ return false;
+ }
+ byte buddy = buddies[index];
+ int currSize = MIN_BUDDY_SIZE << (buddy & SHIFT_MASK);
+
+ if (currSize == size) {
+ // We're at the right size level.
+ if (currOffset == offset) {
+ buddies[index] &= SHIFT_MASK;
+ return false;
+ }
+ throw new IllegalStateException("The intended segment was not found at index " +
+ index + ", for size " + size + " and offset " + offset);
+ }
+
+ // We're at a parent size level. Use the target offset to guide our drill-down path.
+ boolean claims;
+ int siblingIndex;
+ if (offset < currOffset + (currSize >> 1)) {
+ // Must be down the left path.
+ claims = unreserveMatchingBuddy(index << 1, size, offset, currOffset);
+ siblingIndex = (index << 1) + 1;
+ } else {
+ // Must be down the right path.
+ claims = unreserveMatchingBuddy((index << 1) + 1, size, offset, currOffset + (currSize >> 1));
+ siblingIndex = index << 1;
+ }
+ if (!claims) {
+ // No other claims down the path we took. Check if the sibling has claims.
+ byte sibling = buddies[siblingIndex];
+ if ((sibling & SHIFT_MASK) == sibling) {
+ // No claims in the sibling. We can clear this level as well.
+ buddies[index] &= SHIFT_MASK;
+ return false;
+ }
+ }
+ return true;
}
@Override
- boolean releaseSegment(int startIndex) {
- boolean released = release();
- boolean segmentReturned = freeList.offer(startIndex);
- assert segmentReturned: "Unable to return segment " + startIndex + " to free list";
- return released;
+ public String toString() {
+ int capacity = delegate.capacity();
+ int remaining = capacity - allocatedBytes;
+ return "BuddyChunk[capacity: " + capacity +
+ ", remaining: " + remaining +
+ ", free list: " + freeList.size() + ']';
}
}
static final class AdaptiveByteBuf extends AbstractReferenceCountedByteBuf {
- private final ObjectPool.Handle handle;
+ private final EnhancedHandle handle;
// this both act as adjustment and the start index for a free list segment allocation
private int startIndex;
@@ -1403,7 +1640,7 @@ static final class AdaptiveByteBuf extends AbstractReferenceCountedByteBuf {
private boolean hasArray;
private boolean hasMemoryAddress;
- AdaptiveByteBuf(ObjectPool.Handle recyclerHandle) {
+ AdaptiveByteBuf(EnhancedHandle recyclerHandle) {
super(0);
handle = ObjectUtil.checkNotNull(recyclerHandle, "recyclerHandle");
}
@@ -1442,12 +1679,11 @@ public int maxFastWritableBytes() {
@Override
public ByteBuf capacity(int newCapacity) {
+ checkNewCapacity(newCapacity);
if (length <= newCapacity && newCapacity <= maxFastCapacity) {
- ensureAccessible();
length = newCapacity;
return this;
}
- checkNewCapacity(newCapacity);
if (newCapacity < capacity()) {
length = newCapacity;
trimIndicesToCapacity(newCapacity);
@@ -1460,11 +1696,14 @@ public ByteBuf capacity(int newCapacity) {
int readerIndex = this.readerIndex;
int writerIndex = this.writerIndex;
int baseOldRootIndex = startIndex;
- int oldCapacity = length;
+ int oldLength = length;
+ int oldCapacity = maxFastCapacity;
AbstractByteBuf oldRoot = rootParent();
allocator.reallocate(newCapacity, maxCapacity(), this);
- oldRoot.getBytes(baseOldRootIndex, this, 0, oldCapacity);
- chunk.releaseSegment(baseOldRootIndex);
+ oldRoot.getBytes(baseOldRootIndex, this, 0, oldLength);
+ chunk.releaseSegment(baseOldRootIndex, oldCapacity);
+ assert oldCapacity < maxFastCapacity && newCapacity <= maxFastCapacity:
+ "Capacity increase failed";
this.readerIndex = readerIndex;
this.writerIndex = writerIndex;
return this;
@@ -1475,6 +1714,7 @@ public ByteBufAllocator alloc() {
return rootParent().alloc();
}
+ @SuppressWarnings("deprecation")
@Override
public ByteOrder order() {
return rootParent().order();
@@ -1841,17 +2081,12 @@ private int idx(int index) {
@Override
protected void deallocate() {
if (chunk != null) {
- chunk.releaseSegment(startIndex);
+ chunk.releaseSegment(startIndex, maxFastCapacity);
}
tmpNioBuf = null;
chunk = null;
rootParent = null;
- if (handle instanceof EnhancedHandle) {
- EnhancedHandle enhancedHandle = (EnhancedHandle) handle;
- enhancedHandle.unguardedRecycle(this);
- } else {
- handle.recycle(this);
- }
+ handle.unguardedRecycle(this);
}
}
diff --git a/buffer/src/main/java/io/netty/buffer/CompositeByteBuf.java b/buffer/src/main/java/io/netty/buffer/CompositeByteBuf.java
index 4ad86136888..4786724dc0b 100644
--- a/buffer/src/main/java/io/netty/buffer/CompositeByteBuf.java
+++ b/buffer/src/main/java/io/netty/buffer/CompositeByteBuf.java
@@ -2360,4 +2360,17 @@ private void shiftComps(int i, int count) {
}
componentCount = newSize;
}
+
+ /**
+ * Decreases the reference count by the specified {@code decrement} and deallocates this object if the reference
+ * count reaches at {@code 0}. At this point it will also decrement the reference count of each internal
+ * component by {@code 1}.
+ *
+ * @param decrement the number by which the reference count should be decreased
+ * @return {@code true} if and only if the reference count became {@code 0} and this object has been deallocated
+ */
+ @Override
+ public boolean release(final int decrement) {
+ return super.release(decrement);
+ }
}
diff --git a/buffer/src/main/java/io/netty/buffer/SizeClasses.java b/buffer/src/main/java/io/netty/buffer/SizeClasses.java
index b42d455d5e6..d1fa1389855 100644
--- a/buffer/src/main/java/io/netty/buffer/SizeClasses.java
+++ b/buffer/src/main/java/io/netty/buffer/SizeClasses.java
@@ -107,7 +107,7 @@ final class SizeClasses implements SizeClassesMetric {
private final int[] pageIdx2sizeTab;
- // lookup table for sizeIdx <= smallMaxSizeIdx
+ // lookup table for sizeIdx < nSizes
private final int[] sizeIdx2sizeTab;
// lookup table used for size <= lookupMaxClass
diff --git a/buffer/src/test/java/io/netty/buffer/AbstractByteBufAllocatorTest.java b/buffer/src/test/java/io/netty/buffer/AbstractByteBufAllocatorTest.java
index c32183fa707..a5f3675ba66 100644
--- a/buffer/src/test/java/io/netty/buffer/AbstractByteBufAllocatorTest.java
+++ b/buffer/src/test/java/io/netty/buffer/AbstractByteBufAllocatorTest.java
@@ -17,6 +17,7 @@
import io.netty.util.internal.PlatformDependent;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.function.Executable;
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadMXBean;
@@ -26,6 +27,7 @@
import static org.assertj.core.api.Assumptions.assumeThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.fail;
import static org.junit.jupiter.api.Assumptions.abort;
import static org.junit.jupiter.api.Assumptions.assumeTrue;
@@ -196,6 +198,27 @@ public void shouldReuseChunks() throws Exception {
.isLessThan(8 * 1024 * 1024);
}
+ @Test
+ public void testCapacityNotGreaterThanMaxCapacity() {
+ testCapacityNotGreaterThanMaxCapacity(true);
+ testCapacityNotGreaterThanMaxCapacity(false);
+ }
+
+ private void testCapacityNotGreaterThanMaxCapacity(boolean preferDirect) {
+ final int maxSize = 100000;
+ final ByteBuf buf = newAllocator(preferDirect).newDirectBuffer(maxSize, maxSize);
+ try {
+ assertThrows(IllegalArgumentException.class, new Executable() {
+ @Override
+ public void execute() throws Throwable {
+ buf.capacity(maxSize + 1);
+ }
+ });
+ } finally {
+ buf.release();
+ }
+ }
+
protected long expectedUsedMemory(T allocator, int capacity) {
return capacity;
}
diff --git a/buffer/src/test/java/io/netty/buffer/AbstractByteBufTest.java b/buffer/src/test/java/io/netty/buffer/AbstractByteBufTest.java
index 58a4ae82e75..d8ff780f517 100644
--- a/buffer/src/test/java/io/netty/buffer/AbstractByteBufTest.java
+++ b/buffer/src/test/java/io/netty/buffer/AbstractByteBufTest.java
@@ -57,6 +57,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -74,7 +75,6 @@
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotSame;
-import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -2290,7 +2290,7 @@ public void testToString() {
}
@Test
- @Timeout(value = 10000, unit = TimeUnit.MILLISECONDS)
+ @Timeout(30)
public void testToStringMultipleThreads() throws Throwable {
buffer.clear();
buffer.writeBytes("Hello, World!".getBytes(CharsetUtil.ISO_8859_1));
@@ -2300,7 +2300,7 @@ public void testToStringMultipleThreads() throws Throwable {
static void testToStringMultipleThreads0(final ByteBuf buffer) throws Throwable {
final String expected = buffer.toString(CharsetUtil.ISO_8859_1);
- final AtomicInteger counter = new AtomicInteger(30000);
+ final CyclicBarrier startBarrier = new CyclicBarrier(10);
final AtomicReference errorRef = new AtomicReference();
List threads = new ArrayList();
for (int i = 0; i < 10; i++) {
@@ -2308,11 +2308,15 @@ static void testToStringMultipleThreads0(final ByteBuf buffer) throws Throwable
@Override
public void run() {
try {
- while (errorRef.get() == null && counter.decrementAndGet() > 0) {
+ startBarrier.await(10, TimeUnit.SECONDS);
+ int counter = 3000;
+ while (errorRef.get() == null && counter-- > 0) {
assertEquals(expected, buffer.toString(CharsetUtil.ISO_8859_1));
}
} catch (Throwable cause) {
- errorRef.compareAndSet(null, cause);
+ if (!errorRef.compareAndSet(null, cause)) {
+ ThrowableUtil.addSuppressed(errorRef.get(), cause);
+ }
}
}
});
@@ -2322,13 +2326,27 @@ public void run() {
thread.start();
}
- for (Thread thread : threads) {
- thread.join();
- }
+ joinAllAndReportErrors(threads, errorRef);
+ }
- Throwable error = errorRef.get();
- if (error != null) {
- throw error;
+ private static void joinAllAndReportErrors(List threads, AtomicReference errorRef)
+ throws Throwable {
+ try {
+ for (Thread thread : threads) {
+ thread.join();
+ }
+
+ Throwable error = errorRef.get();
+ if (error != null) {
+ throw error;
+ }
+ } catch (Throwable e) {
+ for (Thread thread : threads) {
+ if (thread.isAlive()) {
+ ThrowableUtil.interruptAndAttachAsyncStackTrace(thread, e);
+ }
+ }
+ throw e;
}
}
@@ -2345,7 +2363,7 @@ public void testCopyMultipleThreads0() throws Throwable {
static void testCopyMultipleThreads0(final ByteBuf buffer) throws Throwable {
final ByteBuf expected = buffer.copy();
try {
- final AtomicInteger counter = new AtomicInteger(30000);
+ final CyclicBarrier startBarrier = new CyclicBarrier(10);
final AtomicReference errorRef = new AtomicReference();
List threads = new ArrayList();
for (int i = 0; i < 10; i++) {
@@ -2353,7 +2371,9 @@ static void testCopyMultipleThreads0(final ByteBuf buffer) throws Throwable {
@Override
public void run() {
try {
- while (errorRef.get() == null && counter.decrementAndGet() > 0) {
+ startBarrier.await(10, TimeUnit.SECONDS);
+ int counter = 3000;
+ while (errorRef.get() == null && counter-- > 0) {
ByteBuf copy = buffer.copy();
try {
assertEquals(expected, copy);
@@ -2372,14 +2392,7 @@ public void run() {
thread.start();
}
- for (Thread thread : threads) {
- thread.join();
- }
-
- Throwable error = errorRef.get();
- if (error != null) {
- throw error;
- }
+ joinAllAndReportErrors(threads, errorRef);
} finally {
expected.release();
}
@@ -2879,43 +2892,54 @@ public void testSliceBytesInArrayMultipleThreads() throws Exception {
static void testBytesInArrayMultipleThreads(
final ByteBuf buffer, final byte[] expectedBytes, final boolean slice) throws Exception {
- final AtomicReference cause = new AtomicReference();
- final CountDownLatch latch = new CountDownLatch(60000);
- final CyclicBarrier barrier = new CyclicBarrier(11);
- for (int i = 0; i < 10; i++) {
- new Thread(new Runnable() {
- @Override
- public void run() {
- while (cause.get() == null && latch.getCount() > 0) {
- ByteBuf buf;
- if (slice) {
- buf = buffer.slice();
- } else {
- buf = buffer.duplicate();
- }
-
- byte[] array = new byte[8];
- buf.readBytes(array);
+ final CyclicBarrier startBarrier = new CyclicBarrier(10);
+ final CyclicBarrier endBarrier = new CyclicBarrier(11);
+ Callable callable = new Callable() {
+ @Override
+ public Void call() throws Exception {
+ startBarrier.await();
+ for (int i = 0; i < 6000; i++) {
+ ByteBuf buf;
+ if (slice) {
+ buf = buffer.slice();
+ } else {
+ buf = buffer.duplicate();
+ }
- assertArrayEquals(expectedBytes, array);
+ byte[] array = new byte[8];
+ buf.readBytes(array);
- Arrays.fill(array, (byte) 0);
- buf.getBytes(0, array);
- assertArrayEquals(expectedBytes, array);
+ assertArrayEquals(expectedBytes, array);
- latch.countDown();
- }
- try {
- barrier.await();
- } catch (Exception e) {
- // ignore
- }
+ Arrays.fill(array, (byte) 0);
+ buf.getBytes(0, array);
+ assertArrayEquals(expectedBytes, array);
}
- }).start();
+ endBarrier.await();
+ return null;
+ }
+ };
+ List> tasks = new ArrayList>();
+ for (int i = 0; i < 10; i++) {
+ FutureTask task = new FutureTask(callable);
+ new Thread(task).start();
+ tasks.add(task);
+ }
+ try {
+ endBarrier.await(30, TimeUnit.SECONDS);
+ } catch (Exception e) {
+ for (FutureTask task : tasks) {
+ try {
+ task.get(100, TimeUnit.MILLISECONDS);
+ } catch (Exception ex) {
+ e.addSuppressed(ex);
+ }
+ }
+ throw e;
+ }
+ for (FutureTask task : tasks) {
+ task.get(1, TimeUnit.SECONDS);
}
- latch.await(10, TimeUnit.SECONDS);
- barrier.await(5, TimeUnit.SECONDS);
- assertNull(cause.get());
}
public static Object[][] setCharSequenceCombinations() {
diff --git a/buffer/src/test/java/io/netty/buffer/AdaptiveByteBufAllocatorTest.java b/buffer/src/test/java/io/netty/buffer/AdaptiveByteBufAllocatorTest.java
index 448930a3189..4c212410d88 100644
--- a/buffer/src/test/java/io/netty/buffer/AdaptiveByteBufAllocatorTest.java
+++ b/buffer/src/test/java/io/netty/buffer/AdaptiveByteBufAllocatorTest.java
@@ -16,10 +16,17 @@
package io.netty.buffer;
import io.netty.util.NettyRuntime;
+import org.junit.jupiter.api.RepeatedTest;
+import org.junit.jupiter.api.RepetitionInfo;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
+
+import java.lang.reflect.Array;
+import java.util.ArrayDeque;
+import java.util.Deque;
+import java.util.SplittableRandom;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicReference;
@@ -111,24 +118,29 @@ public void testUsedHeapMemory() {
@Test
void adaptiveChunkMustDeallocateOrReuseWthBufferRelease() throws Exception {
AdaptiveByteBufAllocator allocator = newAllocator(false);
- ByteBuf a = allocator.heapBuffer(28 * 1024);
- assertEquals(262144, allocator.usedHeapMemory());
- ByteBuf b = allocator.heapBuffer(100 * 1024);
- assertEquals(262144, allocator.usedHeapMemory());
- b.release();
- a.release();
- assertEquals(262144, allocator.usedHeapMemory());
- a = allocator.heapBuffer(28 * 1024);
- assertEquals(262144, allocator.usedHeapMemory());
- b = allocator.heapBuffer(100 * 1024);
- assertEquals(262144, allocator.usedHeapMemory());
- a.release();
- ByteBuf c = allocator.heapBuffer(28 * 1024);
- assertEquals(2 * 262144, allocator.usedHeapMemory());
- c.release();
- assertEquals(2 * 262144, allocator.usedHeapMemory());
- b.release();
- assertEquals(2 * 262144, allocator.usedHeapMemory());
+ Deque bufs = new ArrayDeque();
+ assertEquals(0, allocator.usedHeapMemory());
+ assertEquals(0, allocator.usedHeapMemory());
+ bufs.add(allocator.heapBuffer(256));
+ long usedHeapMemory = allocator.usedHeapMemory();
+ int buffersPerChunk = Math.toIntExact(usedHeapMemory / 256);
+ for (int i = 0; i < buffersPerChunk; i++) {
+ bufs.add(allocator.heapBuffer(256));
+ }
+ assertEquals(2 * usedHeapMemory, allocator.usedHeapMemory());
+ bufs.pop().release();
+ assertEquals(2 * usedHeapMemory, allocator.usedHeapMemory());
+ while (!bufs.isEmpty()) {
+ bufs.pop().release();
+ }
+ assertEquals(2 * usedHeapMemory, allocator.usedHeapMemory());
+ for (int i = 0; i < 2 * buffersPerChunk; i++) {
+ bufs.add(allocator.heapBuffer(256));
+ }
+ assertEquals(2 * usedHeapMemory, allocator.usedHeapMemory());
+ while (!bufs.isEmpty()) {
+ bufs.pop().release();
+ }
}
@ParameterizedTest
@@ -198,4 +210,71 @@ public void run() {
fail("Expected no exception, but got", throwable);
}
}
+
+ @RepeatedTest(100)
+ void buddyAllocationConsistency(RepetitionInfo info) {
+ SplittableRandom rng = new SplittableRandom(info.getCurrentRepetition());
+ AdaptiveByteBufAllocator allocator = newAllocator(true);
+ int small = 32768;
+ int large = 2 * small;
+ int xlarge = 2 * large;
+
+ int[] allocationSizes = {
+ small, small, small, small, small, small, small, small,
+ large, large, large, large,
+ xlarge, xlarge,
+ };
+
+ shuffle(rng, allocationSizes);
+
+ ByteBuf[] bufs = new ByteBuf[allocationSizes.length];
+ for (int i = 0; i < bufs.length; i++) {
+ bufs[i] = allocator.buffer(allocationSizes[i], allocationSizes[i]);
+ }
+
+ shuffle(rng, bufs);
+
+ int[] reallocations = new int[bufs.length / 2];
+ for (int i = 0; i < reallocations.length; i++) {
+ reallocations[i] = bufs[i].capacity();
+ bufs[i].release();
+ bufs[i] = null;
+ }
+ for (int i = 0; i < reallocations.length; i++) {
+ assertNull(bufs[i]);
+ bufs[i] = allocator.buffer(reallocations[i], reallocations[i]);
+ }
+
+ for (int i = 0; i < bufs.length; i++) {
+ while (bufs[i].isWritable()) {
+ bufs[i].writeByte(i + 1);
+ }
+ }
+ try {
+ for (int i = 0; i < bufs.length; i++) {
+ while (bufs[i].isReadable()) {
+ int b = Byte.toUnsignedInt(bufs[i].readByte());
+ if (b != i + 1) {
+ fail("Expected byte " + (i + 1) +
+ " at index " + (bufs[i].readerIndex() - 1) +
+ " but got " + b);
+ }
+ }
+ }
+ } finally {
+ for (ByteBuf buf : bufs) {
+ buf.release();
+ }
+ }
+ }
+
+ private static void shuffle(SplittableRandom rng, Object array) {
+ int len = Array.getLength(array);
+ for (int i = 0; i < len; i++) {
+ int n = rng.nextInt(i, len);
+ Object value = Array.get(array, i);
+ Array.set(array, i, Array.get(array, n));
+ Array.set(array, n, value);
+ }
+ }
}
diff --git a/buffer/src/test/java/io/netty/buffer/AdaptivePoolingAllocatorTest.java b/buffer/src/test/java/io/netty/buffer/AdaptivePoolingAllocatorTest.java
index ab47050c641..4a4c28deebf 100644
--- a/buffer/src/test/java/io/netty/buffer/AdaptivePoolingAllocatorTest.java
+++ b/buffer/src/test/java/io/netty/buffer/AdaptivePoolingAllocatorTest.java
@@ -15,52 +15,11 @@
*/
package io.netty.buffer;
-import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
-import java.util.function.Supplier;
-
import static org.junit.jupiter.api.Assertions.assertEquals;
-class AdaptivePoolingAllocatorTest implements Supplier {
- private int i;
-
- @BeforeEach
- void setUp() {
- i = 0;
- }
-
- @Override
- public String get() {
- return "i = " + i;
- }
-
- @Test
- void sizeBucketComputations() throws Exception {
- assertSizeBucket(0, 16 * 1024);
- assertSizeBucket(1, 24 * 1024);
- assertSizeBucket(2, 32 * 1024);
- assertSizeBucket(3, 48 * 1024);
- assertSizeBucket(4, 64 * 1024);
- assertSizeBucket(5, 96 * 1024);
- assertSizeBucket(6, 128 * 1024);
- assertSizeBucket(7, 192 * 1024);
- assertSizeBucket(8, 256 * 1024);
- assertSizeBucket(9, 384 * 1024);
- assertSizeBucket(10, 512 * 1024);
- assertSizeBucket(11, 768 * 1024);
- assertSizeBucket(12, 1024 * 1024);
- assertSizeBucket(13, 1792 * 1024);
- assertSizeBucket(14, 2048 * 1024);
- assertSizeBucket(15, 3072 * 1024);
- // The sizeBucket function will be used for sizes up to 8 MiB
- assertSizeBucket(15, 4 * 1024 * 1024);
- assertSizeBucket(15, 5 * 1024 * 1024);
- assertSizeBucket(15, 6 * 1024 * 1024);
- assertSizeBucket(15, 7 * 1024 * 1024);
- assertSizeBucket(15, 8 * 1024 * 1024);
- }
-
+class AdaptivePoolingAllocatorTest {
@Test
void sizeClassComputations() throws Exception {
final int[] sizeClasses = AdaptivePoolingAllocator.getSizeClasses();
@@ -75,20 +34,7 @@ void sizeClassComputations() throws Exception {
private static void assertSizeClassOf(int expectedSizeClass, int previousSizeIncluded, int maxSizeIncluded) {
for (int size = previousSizeIncluded; size <= maxSizeIncluded; size++) {
- final int sizeToTest = size;
- Supplier messageSupplier = new Supplier() {
- @Override
- public String get() {
- return "size = " + sizeToTest;
- }
- };
- assertEquals(expectedSizeClass, AdaptivePoolingAllocator.sizeClassIndexOf(size), messageSupplier);
- }
- }
-
- private void assertSizeBucket(int expectedSizeBucket, int maxSizeIncluded) {
- for (; i <= maxSizeIncluded; i++) {
- assertEquals(expectedSizeBucket, AdaptivePoolingAllocator.sizeToBucket(i), this);
+ assertEquals(expectedSizeClass, AdaptivePoolingAllocator.sizeClassIndexOf(size), "size = " + size);
}
}
}
diff --git a/buffer/src/test/java/io/netty/buffer/PooledByteBufAllocatorTest.java b/buffer/src/test/java/io/netty/buffer/PooledByteBufAllocatorTest.java
index ecc01065210..64638f8e1cb 100644
--- a/buffer/src/test/java/io/netty/buffer/PooledByteBufAllocatorTest.java
+++ b/buffer/src/test/java/io/netty/buffer/PooledByteBufAllocatorTest.java
@@ -20,6 +20,7 @@
import io.netty.util.concurrent.FastThreadLocalThread;
import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.SystemPropertyUtil;
+import io.netty.util.internal.ThrowableUtil;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
@@ -30,6 +31,7 @@
import java.util.Random;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.FutureTask;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
@@ -349,13 +351,13 @@ public void testAllocateSmallOffset() {
}
@Test
- @Timeout(value = 10, threadMode = Timeout.ThreadMode.SEPARATE_THREAD)
+ @Timeout(value = 20, threadMode = Timeout.ThreadMode.SEPARATE_THREAD)
public void testThreadCacheDestroyedByThreadCleaner() throws InterruptedException {
testThreadCacheDestroyed(false);
}
@Test
- @Timeout(value = 10, threadMode = Timeout.ThreadMode.SEPARATE_THREAD)
+ @Timeout(value = 20, threadMode = Timeout.ThreadMode.SEPARATE_THREAD)
public void testThreadCacheDestroyedAfterExitRun() throws InterruptedException {
testThreadCacheDestroyed(true);
}
@@ -408,7 +410,6 @@ public void run() {
while (allocator.metric().numThreadLocalCaches() > 0) {
// Signal we want to have a GC run to ensure we can process our ThreadCleanerReference
System.gc();
- System.runFinalization();
LockSupport.parkNanos(MILLISECONDS.toNanos(100));
}
@@ -416,8 +417,8 @@ public void run() {
}
@Test
- @Timeout(value = 3000, unit = MILLISECONDS)
- public void testNumThreadCachesWithNoDirectArenas() throws InterruptedException {
+ @Timeout(10)
+ public void testNumThreadCachesWithNoDirectArenas() throws Exception {
int numHeapArenas = 1;
final PooledByteBufAllocator allocator =
new PooledByteBufAllocator(numHeapArenas, 0, 8192, 1);
@@ -436,11 +437,11 @@ public void testNumThreadCachesWithNoDirectArenas() throws InterruptedException
}
@Test
- @Timeout(value = 3000, unit = MILLISECONDS)
- public void testNumThreadCachesAccountForDirectAndHeapArenas() throws InterruptedException {
- int numHeapArenas = 1;
+ @Timeout(10)
+ public void testNumThreadCachesAccountForDirectAndHeapArenas() throws Exception {
+ int numArenas = 1;
final PooledByteBufAllocator allocator =
- new PooledByteBufAllocator(numHeapArenas, 0, 8192, 1);
+ new PooledByteBufAllocator(numArenas, numArenas, 8192, 1);
ThreadCache tcache0 = createNewThreadCache(allocator, false);
assertEquals(1, allocator.metric().numThreadLocalCaches());
@@ -456,8 +457,8 @@ public void testNumThreadCachesAccountForDirectAndHeapArenas() throws Interrupte
}
@Test
- @Timeout(value = 3000, unit = MILLISECONDS)
- public void testThreadCacheToArenaMappings() throws InterruptedException {
+ @Timeout(10)
+ public void testThreadCacheToArenaMappings() throws Exception {
int numArenas = 2;
final PooledByteBufAllocator allocator =
new PooledByteBufAllocator(numArenas, numArenas, 8192, 1);
@@ -500,8 +501,7 @@ private static ThreadCache createNewThreadCache(final PooledByteBufAllocator all
throws InterruptedException {
final CountDownLatch latch = new CountDownLatch(1);
final CountDownLatch cacheLatch = new CountDownLatch(1);
- final Thread t = new FastThreadLocalThread(new Runnable() {
-
+ final FutureTask<Void> task = new FutureTask<Void>(new Runnable() {
@Override
public void run() {
final ByteBuf buf;
@@ -527,23 +527,35 @@ public void run() {
FastThreadLocal.removeAll();
}
- });
+ }, null);
+ final Thread t = new FastThreadLocalThread(task);
t.start();
// Wait until we allocated a buffer and so be sure the thread was started and the cache exists.
- cacheLatch.await();
+ try {
+ cacheLatch.await();
+ } catch (InterruptedException e) {
+ ThrowableUtil.interruptAndAttachAsyncStackTrace(t, e);
+ throw e;
+ }
return new ThreadCache() {
@Override
- public void destroy() throws InterruptedException {
+ public void destroy() throws Exception {
latch.countDown();
- t.join();
+ try {
+ task.get();
+ t.join();
+ } catch (InterruptedException e) {
+ ThrowableUtil.interruptAndAttachAsyncStackTrace(t, e);
+ throw e;
+ }
}
};
}
private interface ThreadCache {
- void destroy() throws InterruptedException;
+ void destroy() throws Exception;
}
@Test
diff --git a/buffer/src/test/java/io/netty/buffer/UnpooledTest.java b/buffer/src/test/java/io/netty/buffer/UnpooledTest.java
index efc1dafd1ed..fd705f1bed0 100644
--- a/buffer/src/test/java/io/netty/buffer/UnpooledTest.java
+++ b/buffer/src/test/java/io/netty/buffer/UnpooledTest.java
@@ -476,7 +476,7 @@ public void testUnmodifiableBuffer() throws Exception {
} catch (UnsupportedOperationException e) {
// Expected
}
- Mockito.verifyZeroInteractions(inputStream);
+ Mockito.verifyNoInteractions(inputStream);
ScatteringByteChannel scatteringByteChannel = Mockito.mock(ScatteringByteChannel.class);
try {
@@ -485,7 +485,7 @@ public void testUnmodifiableBuffer() throws Exception {
} catch (UnsupportedOperationException e) {
// Expected
}
- Mockito.verifyZeroInteractions(scatteringByteChannel);
+ Mockito.verifyNoInteractions(scatteringByteChannel);
buf.release();
}
diff --git a/codec-dns/pom.xml b/codec-dns/pom.xml
index 39846136332..00ed2a59064 100644
--- a/codec-dns/pom.xml
+++ b/codec-dns/pom.xml
@@ -20,7 +20,7 @@
io.nettynetty-parent
- 4.1.128.1.dse
+ 4.1.132.1.dsenetty-codec-dns
diff --git a/codec-dns/src/main/java/io/netty/handler/codec/dns/DefaultDnsRecordDecoder.java b/codec-dns/src/main/java/io/netty/handler/codec/dns/DefaultDnsRecordDecoder.java
index 2aea39159fe..80cf862ab6a 100644
--- a/codec-dns/src/main/java/io/netty/handler/codec/dns/DefaultDnsRecordDecoder.java
+++ b/codec-dns/src/main/java/io/netty/handler/codec/dns/DefaultDnsRecordDecoder.java
@@ -16,6 +16,8 @@
package io.netty.handler.codec.dns;
import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.handler.codec.CorruptedFrameException;
/**
* The default {@link DnsRecordDecoder} implementation.
@@ -99,6 +101,30 @@ protected DnsRecord decodeRecord(
DnsCodecUtil.decompressDomainName(
in.duplicate().setIndex(offset, offset + length)));
}
+ if (type == DnsRecordType.MX) {
+ // MX RDATA: 16-bit preference + exchange (domain name, possibly compressed)
+ if (length < 3) {
+ throw new CorruptedFrameException("MX record RDATA is too short: " + length);
+ }
+ final int pref = in.getUnsignedShort(offset);
+ ByteBuf exchange = null;
+ try {
+ exchange = DnsCodecUtil.decompressDomainName(
+ in.duplicate().setIndex(offset + 2, offset + length));
+
+ // Build decompressed RDATA = [preference][expanded exchange name]
+ final ByteBuf out = in.alloc().buffer(2 + exchange.readableBytes());
+ out.writeShort(pref);
+ out.writeBytes(exchange);
+
+ return new DefaultDnsRawRecord(name, type, dnsClass, timeToLive, out);
+ } finally {
+ if (exchange != null) {
+ exchange.release();
+ }
+ }
+ }
+
return new DefaultDnsRawRecord(
name, type, dnsClass, timeToLive, in.retainedDuplicate().setIndex(offset, offset + length));
}
diff --git a/codec-dns/src/test/java/io/netty/handler/codec/dns/DefaultDnsRecordDecoderTest.java b/codec-dns/src/test/java/io/netty/handler/codec/dns/DefaultDnsRecordDecoderTest.java
index a8379f6d8d7..d66b994b604 100644
--- a/codec-dns/src/test/java/io/netty/handler/codec/dns/DefaultDnsRecordDecoderTest.java
+++ b/codec-dns/src/test/java/io/netty/handler/codec/dns/DefaultDnsRecordDecoderTest.java
@@ -166,6 +166,51 @@ public void testDecodeCompressionRDataPointer() throws Exception {
}
}
+ @Test
+ public void testDecodeCompressionRDataPointerMX() throws Exception {
+ DefaultDnsRecordDecoder decoder = new DefaultDnsRecordDecoder();
+ byte[] compressionPointer = {
+ 5, 'n', 'e', 't', 't', 'y', 2, 'i', 'o', 0,
+ 0, 10, // preference = 10
+ (byte) 0xC0, 0 // record is a pointer to netty.io
+ };
+
+ byte[] expected = {
+ 0, 10, // pref = 10
+ 5, 'n', 'e', 't', 't', 'y', 2, 'i', 'o', 0
+ };
+ ByteBuf buffer = Unpooled.wrappedBuffer(compressionPointer);
+ DefaultDnsRawRecord mxRecord = null;
+ ByteBuf expectedBuf = null;
+ try {
+ mxRecord = (DefaultDnsRawRecord) decoder.decodeRecord(
+ "mail.example.com",
+ DnsRecordType.MX,
+ DnsRecord.CLASS_IN,
+ 60,
+ buffer,
+ 10,
+ 4);
+
+ expectedBuf = Unpooled.wrappedBuffer(expected);
+
+ assertEquals(0, ByteBufUtil.compare(expectedBuf, mxRecord.content()),
+ "The rdata of MX-type record should be decompressed in advance");
+ assertEquals(10, mxRecord.content().getUnsignedShort(0));
+
+ ByteBuf exchangerName = mxRecord.content().duplicate().setIndex(2, mxRecord.content().writerIndex());
+ assertEquals("netty.io.", DnsCodecUtil.decodeDomainName(exchangerName));
+ } finally {
+ buffer.release();
+ if (expectedBuf != null) {
+ expectedBuf.release();
+ }
+ if (mxRecord != null) {
+ mxRecord.release();
+ }
+ }
+ }
+
@Test
public void testDecodeMessageCompression() throws Exception {
// See https://www.ietf.org/rfc/rfc1035 [4.1.4. Message compression]
diff --git a/codec-haproxy/pom.xml b/codec-haproxy/pom.xml
index b7a0e7202a1..139ef90e84d 100644
--- a/codec-haproxy/pom.xml
+++ b/codec-haproxy/pom.xml
@@ -20,7 +20,7 @@
io.nettynetty-parent
- 4.1.128.1.dse
+ 4.1.132.1.dsenetty-codec-haproxy
diff --git a/codec-http/pom.xml b/codec-http/pom.xml
index 9d37046e74e..0c6b82ac520 100644
--- a/codec-http/pom.xml
+++ b/codec-http/pom.xml
@@ -20,7 +20,7 @@
io.nettynetty-parent
- 4.1.128.1.dse
+ 4.1.132.1.dsenetty-codec-http
diff --git a/codec-http/src/main/java/io/netty/handler/codec/http/DefaultFullHttpRequest.java b/codec-http/src/main/java/io/netty/handler/codec/http/DefaultFullHttpRequest.java
index 3cd8d0c6985..a4762516846 100644
--- a/codec-http/src/main/java/io/netty/handler/codec/http/DefaultFullHttpRequest.java
+++ b/codec-http/src/main/java/io/netty/handler/codec/http/DefaultFullHttpRequest.java
@@ -92,7 +92,15 @@ public DefaultFullHttpRequest(HttpVersion httpVersion, HttpMethod method, String
*/
public DefaultFullHttpRequest(HttpVersion httpVersion, HttpMethod method, String uri,
ByteBuf content, HttpHeaders headers, HttpHeaders trailingHeader) {
- super(httpVersion, method, uri, headers);
+ this(httpVersion, method, uri, content, headers, trailingHeader, true);
+ }
+
+ /**
+ * Create a full HTTP request with the given HTTP version, method, URI, contents, and header and trailer objects.
+ */
+ public DefaultFullHttpRequest(HttpVersion httpVersion, HttpMethod method, String uri,
+ ByteBuf content, HttpHeaders headers, HttpHeaders trailingHeader, boolean validateRequestLine) {
+ super(httpVersion, method, uri, headers, validateRequestLine);
this.content = checkNotNull(content, "content");
this.trailingHeader = checkNotNull(trailingHeader, "trailingHeader");
}
diff --git a/codec-http/src/main/java/io/netty/handler/codec/http/DefaultHttpRequest.java b/codec-http/src/main/java/io/netty/handler/codec/http/DefaultHttpRequest.java
index 271b6069a02..437598503e6 100644
--- a/codec-http/src/main/java/io/netty/handler/codec/http/DefaultHttpRequest.java
+++ b/codec-http/src/main/java/io/netty/handler/codec/http/DefaultHttpRequest.java
@@ -75,9 +75,25 @@ public DefaultHttpRequest(HttpVersion httpVersion, HttpMethod method, String uri
* @param headers the Headers for this Request
*/
public DefaultHttpRequest(HttpVersion httpVersion, HttpMethod method, String uri, HttpHeaders headers) {
+ this(httpVersion, method, uri, headers, true);
+ }
+
+ /**
+ * Creates a new instance.
+ *
+ * @param httpVersion the HTTP version of the request
+ * @param method the HTTP method of the request
+ * @param uri the URI or path of the request
+ * @param headers the Headers for this Request
+ */
+ public DefaultHttpRequest(HttpVersion httpVersion, HttpMethod method, String uri, HttpHeaders headers,
+ boolean validateRequestLine) {
super(httpVersion, headers);
this.method = checkNotNull(method, "method");
this.uri = checkNotNull(uri, "uri");
+ if (validateRequestLine) {
+ HttpUtil.validateRequestLineTokens(method, uri);
+ }
}
@Override
diff --git a/codec-http/src/main/java/io/netty/handler/codec/http/HttpChunkLineValidatingByteProcessor.java b/codec-http/src/main/java/io/netty/handler/codec/http/HttpChunkLineValidatingByteProcessor.java
new file mode 100644
index 00000000000..6839ce8d8db
--- /dev/null
+++ b/codec-http/src/main/java/io/netty/handler/codec/http/HttpChunkLineValidatingByteProcessor.java
@@ -0,0 +1,170 @@
+/*
+ * Copyright 2026 The Netty Project
+ *
+ * The Netty Project licenses this file to you under the Apache License,
+ * version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at:
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package io.netty.handler.codec.http;
+
+import io.netty.util.ByteProcessor;
+
+import java.util.BitSet;
+
+/**
+ * Validates the chunk start line. That is, the chunk size and chunk extensions, until the CR LF pair.
+ * See RFC 9112 section 7.1.
+ *
+ *