Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ public class Http2ConnectionState {
private final AtomicBoolean draining = new AtomicBoolean(false);
private volatile int lastGoAwayStreamId = Integer.MAX_VALUE;
private final ConcurrentLinkedQueue<Runnable> pendingOpeners = new ConcurrentLinkedQueue<>();
private final Object pendingLock = new Object();
private volatile Object partitionKey;

public boolean tryAcquireStream() {
Expand All @@ -54,22 +55,29 @@ public boolean tryAcquireStream() {

public void releaseStream() {
activeStreams.decrementAndGet();
// Try to dequeue and run a pending opener
Runnable pending = pendingOpeners.poll();
if (pending != null) {
pending.run();
}
drainPendingOpeners();
}

public void addPendingOpener(Runnable opener) {
pendingOpeners.add(opener);
// Re-check in case a stream was released between the failed tryAcquire and this enqueue
if (tryAcquireStream()) {
Runnable dequeued = pendingOpeners.poll();
if (dequeued != null) {
dequeued.run();
synchronized (pendingLock) {
if (tryAcquireStream()) {
opener.run();
} else {
releaseStream();
pendingOpeners.add(opener);
}
}
}

private void drainPendingOpeners() {
synchronized (pendingLock) {
Runnable pending = pendingOpeners.poll();
if (pending != null) {
if (tryAcquireStream()) {
pending.run();
} else {
// Put it back — another releaseStream() will pick it up
pendingOpeners.offer(pending);
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -302,9 +302,16 @@ private void readFailed(Channel channel, NettyResponseFuture<?> future, Throwabl

@Override
public void handleException(NettyResponseFuture<?> future, Throwable error) {
if (!future.isDone()) {
readFailed(future.channel(), future, error);
}
}

@Override
public void handleChannelInactive(NettyResponseFuture<?> future) {
if (!future.isDone()) {
readFailed(future.channel(), future,
new IOException("HTTP/2 stream channel closed unexpectedly"));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -276,8 +276,12 @@ private <T> ListenableFuture<T> sendRequestWithOpenChannel(NettyResponseFuture<T

// channelInactive might be called between isChannelValid and writeRequest
// so if we don't store the Future now, channelInactive won't perform
// handleUnexpectedClosedChannel
Channels.setAttribute(channel, future);
// handleUnexpectedClosedChannel.
// For HTTP/2 parent channels, skip this — each stream channel gets its own attribute
// in openHttp2Stream(), and setting it on the parent would corrupt multiplexed state.
if (!ChannelManager.isHttp2(channel)) {
Channels.setAttribute(channel, future);
}

if (Channels.isChannelActive(channel)) {
writeRequest(future, channel);
Expand Down Expand Up @@ -485,7 +489,6 @@ public <T> void writeRequest(NettyResponseFuture<T> future, Channel channel) {
*/
private <T> void writeHttp2Request(NettyResponseFuture<T> future, Channel parentChannel) {
Http2ConnectionState state = parentChannel.attr(Http2ConnectionState.HTTP2_STATE_KEY).get();
Runnable openStream = () -> openHttp2Stream(future, parentChannel);

if (state != null && !state.tryAcquireStream()) {
if (state.isDraining()) {
Expand All @@ -495,13 +498,13 @@ private <T> void writeHttp2Request(NettyResponseFuture<T> future, Channel parent
return;
}
// Queue for later when a stream slot opens up
state.addPendingOpener(openStream);
state.addPendingOpener(() -> openHttp2Stream(future, parentChannel, state));
return;
}
openStream.run();
openHttp2Stream(future, parentChannel, state);
}

private <T> void openHttp2Stream(NettyResponseFuture<T> future, Channel parentChannel) {
private <T> void openHttp2Stream(NettyResponseFuture<T> future, Channel parentChannel, Http2ConnectionState state) {
new Http2StreamChannelBootstrap(parentChannel)
.handler(new ChannelInitializer<Http2StreamChannel>() {
@Override
Expand All @@ -519,6 +522,7 @@ protected void initChannel(Http2StreamChannel streamCh) {
Http2StreamChannel streamChannel = f.getNow();
channelManager.registerOpenChannel(streamChannel);
Channels.setAttribute(streamChannel, future);
Channels.setActiveToken(streamChannel);
future.attachChannel(streamChannel, false);
try {
AsyncHandler<T> asyncHandler = future.getAsyncHandler();
Expand All @@ -541,6 +545,10 @@ protected void initChannel(Http2StreamChannel streamCh) {
abort(streamChannel, future, e);
}
} else {
// Stream channel was never opened — release the acquired stream slot
if (state != null) {
state.releaseStream();
}
abort(parentChannel, future, f.cause());
}
});
Expand Down
Loading
Loading