Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public class AdaptiveLifoCoDelCallQueue implements BlockingQueue<CallRunner> {
private LinkedBlockingDeque<CallRunner> queue;

// so we can calculate actual threshold to switch to LIFO under load
private int maxCapacity;
private volatile int softLimit;

// metrics (shared across all queues)
private LongAdder numGeneralCallsDropped;
Expand All @@ -71,27 +71,30 @@ public class AdaptiveLifoCoDelCallQueue implements BlockingQueue<CallRunner> {
private AtomicBoolean isOverloaded = new AtomicBoolean(false);

public AdaptiveLifoCoDelCallQueue(int capacity, int targetDelay, int interval,
double lifoThreshold, LongAdder numGeneralCallsDropped, LongAdder numLifoModeSwitches) {
this.maxCapacity = capacity;
double lifoThreshold, LongAdder numGeneralCallsDropped, LongAdder numLifoModeSwitches,
int currentQueueLimit) {
this.queue = new LinkedBlockingDeque<>(capacity);
this.codelTargetDelay = targetDelay;
this.codelInterval = interval;
this.lifoThreshold = lifoThreshold;
this.numGeneralCallsDropped = numGeneralCallsDropped;
this.numLifoModeSwitches = numLifoModeSwitches;
this.softLimit = currentQueueLimit;
}

/**
* Update tunables.
* @param newCodelTargetDelay new CoDel target delay
* @param newCodelInterval new CoDel interval
* @param newLifoThreshold new Adaptive Lifo threshold
* @param currentQueueLimit new soft limit of queue
*/
public void updateTunables(int newCodelTargetDelay, int newCodelInterval,
double newLifoThreshold) {
public void updateTunables(int newCodelTargetDelay, int newCodelInterval, double newLifoThreshold,
int currentQueueLimit) {
this.codelTargetDelay = newCodelTargetDelay;
this.codelInterval = newCodelInterval;
this.lifoThreshold = newLifoThreshold;
this.softLimit = currentQueueLimit;
}

/**
Expand All @@ -104,7 +107,7 @@ public void updateTunables(int newCodelTargetDelay, int newCodelInterval,
public CallRunner take() throws InterruptedException {
CallRunner cr;
while (true) {
if (((double) queue.size() / this.maxCapacity) > lifoThreshold) {
if (((double) queue.size() / this.softLimit) > lifoThreshold) {
numLifoModeSwitches.increment();
cr = queue.takeLast();
} else {
Expand All @@ -124,7 +127,7 @@ public CallRunner poll() {
CallRunner cr;
boolean switched = false;
while (true) {
if (((double) queue.size() / this.maxCapacity) > lifoThreshold) {
if (((double) queue.size() / this.softLimit) > lifoThreshold) {
// Only count once per switch.
if (!switched) {
switched = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ public abstract class RpcExecutor {
public static final String CALL_QUEUE_CODEL_LIFO_THRESHOLD =
"hbase.ipc.server.callqueue.codel.lifo.threshold";

public static final int CALL_QUEUE_CODEL_DEFAULT_TARGET_DELAY = 100;
public static final int CALL_QUEUE_CODEL_DEFAULT_TARGET_DELAY = 5;
public static final int CALL_QUEUE_CODEL_DEFAULT_INTERVAL = 100;
public static final double CALL_QUEUE_CODEL_DEFAULT_LIFO_THRESHOLD = 0.8;

Expand All @@ -107,7 +107,11 @@ public abstract class RpcExecutor {
private final Class<? extends BlockingQueue> queueClass;
private final Object[] queueInitArgs;

// this is soft limit of the queue, not size/capacity.
protected volatile int currentQueueLimit;
// While initializing we will use hard limit as the capacity of queue, it will let us dynamically
// change the queue limit
protected final int queueHardLimit;

private final AtomicInteger activeHandlerCount = new AtomicInteger(0);
private final List<RpcHandler> handlers;
Expand Down Expand Up @@ -161,11 +165,13 @@ public RpcExecutor(final String name, final int handlerCount, final String callQ
int handlerCountPerQueue = this.handlerCount / this.numCallQueues;
maxQueueLength = handlerCountPerQueue * RpcServer.DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER;
}
currentQueueLimit = maxQueueLength;
queueHardLimit = Math.max(maxQueueLength, DEFAULT_CALL_QUEUE_SIZE_HARD_LIMIT);

if (isDeadlineQueueType(callQueueType)) {
this.name += ".Deadline";
this.queueInitArgs =
new Object[] { maxQueueLength, new CallPriorityComparator(conf, priority) };
new Object[] { queueHardLimit, new CallPriorityComparator(conf, priority) };
this.queueClass = BoundedPriorityBlockingQueue.class;
} else if (isCodelQueueType(callQueueType)) {
this.name += ".Codel";
Expand All @@ -174,8 +180,8 @@ public RpcExecutor(final String name, final int handlerCount, final String callQ
int codelInterval = conf.getInt(CALL_QUEUE_CODEL_INTERVAL, CALL_QUEUE_CODEL_DEFAULT_INTERVAL);
double codelLifoThreshold =
conf.getDouble(CALL_QUEUE_CODEL_LIFO_THRESHOLD, CALL_QUEUE_CODEL_DEFAULT_LIFO_THRESHOLD);
this.queueInitArgs = new Object[] { maxQueueLength, codelTargetDelay, codelInterval,
codelLifoThreshold, numGeneralCallsDropped, numLifoModeSwitches };
this.queueInitArgs = new Object[] { queueHardLimit, codelTargetDelay, codelInterval,
codelLifoThreshold, numGeneralCallsDropped, numLifoModeSwitches, currentQueueLimit };
this.queueClass = AdaptiveLifoCoDelCallQueue.class;
} else if (isPluggableQueueType(callQueueType)) {
Optional<Class<? extends BlockingQueue<CallRunner>>> pluggableQueueClass =
Expand All @@ -185,12 +191,12 @@ public RpcExecutor(final String name, final int handlerCount, final String callQ
throw new PluggableRpcQueueNotFound(
"Pluggable call queue failed to load and selected call" + " queue type required");
} else {
this.queueInitArgs = new Object[] { maxQueueLength, priority, conf };
this.queueInitArgs = new Object[] { queueHardLimit, priority, conf };
this.queueClass = pluggableQueueClass.get();
}
} else {
this.name += ".Fifo";
this.queueInitArgs = new Object[] { maxQueueLength };
this.queueInitArgs = new Object[] { queueHardLimit };
this.queueClass = LinkedBlockingQueue.class;
}

Expand Down Expand Up @@ -231,20 +237,7 @@ public Map<String, Long> getCallQueueSizeSummary() {
.collect(Collectors.groupingBy(Pair::getFirst, Collectors.summingLong(Pair::getSecond)));
}

// This method can only be called ONCE per executor instance.
// Before calling: queueInitArgs[0] contains the soft limit (desired queue capacity)
// After calling: queueInitArgs[0] is set to hard limit and currentQueueLimit stores the original
// soft limit.
// Multiple calls would incorrectly use the hard limit as the soft limit.
// As all the queues has same initArgs and queueClass, there should be no need to call this again.
protected void initializeQueues(final int numQueues) {
if (!queues.isEmpty()) {
throw new RuntimeException("Queues are already initialized");
}
if (queueInitArgs.length > 0) {
currentQueueLimit = (int) queueInitArgs[0];
queueInitArgs[0] = Math.max((int) queueInitArgs[0], DEFAULT_CALL_QUEUE_SIZE_HARD_LIMIT);
}
for (int i = 0; i < numQueues; ++i) {
queues.add(ReflectionUtils.newInstance(queueClass, queueInitArgs));
}
Expand Down Expand Up @@ -466,7 +459,15 @@ public void resizeQueues(Configuration conf) {
}
}
final int queueLimit = currentQueueLimit;
currentQueueLimit = conf.getInt(configKey, queueLimit);
int newQueueLimit = conf.getInt(configKey, queueLimit);
if (newQueueLimit > queueHardLimit) {
LOG.warn(
"Requested soft limit {} exceeds queue hard limit/capacity {}. "
+ "A region server restart is required to grow the underlying queue.",
newQueueLimit, queueHardLimit);
newQueueLimit = currentQueueLimit;
}
currentQueueLimit = newQueueLimit;
}

public void onConfigurationChange(Configuration conf) {
Expand All @@ -479,8 +480,10 @@ public void onConfigurationChange(Configuration conf) {

for (BlockingQueue<CallRunner> queue : queues) {
if (queue instanceof AdaptiveLifoCoDelCallQueue) {
// current queue Limit for executor is already updated as part of resizeQueues, we need to
// let codel queue also make aware of it
((AdaptiveLifoCoDelCallQueue) queue).updateTunables(codelTargetDelay, codelInterval,
codelLifoThreshold);
codelLifoThreshold, currentQueueLimit);
} else if (queue instanceof ConfigurationObserver) {
((ConfigurationObserver) queue).onConfigurationChange(conf);
}
Expand Down
Loading