Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,12 @@
<version>5.14.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<version>5.17.0</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
55 changes: 40 additions & 15 deletions src/main/java/com/github/copilot/sdk/CopilotSession.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,10 @@
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
Expand Down Expand Up @@ -121,6 +124,7 @@ public final class CopilotSession implements AutoCloseable {
private volatile EventErrorHandler eventErrorHandler;
private volatile EventErrorPolicy eventErrorPolicy = EventErrorPolicy.PROPAGATE_AND_LOG_ERRORS;
private volatile Map<String, java.util.function.Function<String, CompletableFuture<String>>> transformCallbacks;
private final ScheduledExecutorService timeoutScheduler;

/** Tracks whether this session instance has been terminated via close(). */
private volatile boolean isTerminated = false;
Expand Down Expand Up @@ -157,6 +161,13 @@ public final class CopilotSession implements AutoCloseable {
this.sessionId = sessionId;
this.rpc = rpc;
this.workspacePath = workspacePath;
var executor = new ScheduledThreadPoolExecutor(1, r -> {
var t = new Thread(r, "sendAndWait-timeout");
t.setDaemon(true);
return t;
});
executor.setRemoveOnCancelPolicy(true);
this.timeoutScheduler = executor;
}

/**
Expand Down Expand Up @@ -407,29 +418,41 @@ public CompletableFuture<AssistantMessageEvent> sendAndWait(MessageOptions optio
return null;
});

// Set up timeout with daemon thread so it doesn't prevent JVM exit
var scheduler = Executors.newSingleThreadScheduledExecutor(r -> {
var t = new Thread(r, "sendAndWait-timeout");
t.setDaemon(true);
return t;
});
scheduler.schedule(() -> {
if (!future.isDone()) {
future.completeExceptionally(new TimeoutException("sendAndWait timed out after " + timeoutMs + "ms"));
}
scheduler.shutdown();
}, timeoutMs, TimeUnit.MILLISECONDS);

var result = new CompletableFuture<AssistantMessageEvent>();

// Schedule timeout on the shared session-level scheduler.
// Per Javadoc, timeoutMs <= 0 means "no timeout".
ScheduledFuture<?> timeoutTask = null;
if (timeoutMs > 0) {
try {
timeoutTask = timeoutScheduler.schedule(() -> {
if (!future.isDone()) {
future.completeExceptionally(
new TimeoutException("sendAndWait timed out after " + timeoutMs + "ms"));
}
}, timeoutMs, TimeUnit.MILLISECONDS);
} catch (RejectedExecutionException e) {
try {
subscription.close();
} catch (IOException closeEx) {
e.addSuppressed(closeEx);
}
result.completeExceptionally(e);
return result;
}
}

// When inner future completes, run cleanup and propagate to result
final ScheduledFuture<?> taskToCancel = timeoutTask;
future.whenComplete((r, ex) -> {
try {
subscription.close();
} catch (IOException e) {
LOG.log(Level.SEVERE, "Error closing subscription", e);
}
scheduler.shutdown();
if (taskToCancel != null) {
taskToCancel.cancel(false);
}
if (!result.isDone()) {
if (ex != null) {
result.completeExceptionally(ex);
Expand Down Expand Up @@ -1303,6 +1326,8 @@ public void close() {
isTerminated = true;
}

timeoutScheduler.shutdownNow();

try {
rpc.invoke("session.destroy", Map.of("sessionId", sessionId), Void.class).get(5, TimeUnit.SECONDS);
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*---------------------------------------------------------------------------------------------
* Copyright (c) Microsoft Corporation. All rights reserved.
*--------------------------------------------------------------------------------------------*/

package com.github.copilot.sdk;

import static org.junit.jupiter.api.Assertions.*;
import static org.mockito.ArgumentMatchers.*;
import static org.mockito.Mockito.*;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.junit.jupiter.api.Test;

import com.github.copilot.sdk.json.MessageOptions;

/**
* Reproduces the race between {@code sendAndWait()} and {@code close()}.
* <p>
* If {@code close()} shuts down the timeout scheduler after
* {@code ensureNotTerminated()} passes but before
* {@code timeoutScheduler.schedule()} executes, the schedule call throws
* {@link RejectedExecutionException}. Without a fix the exception propagates
* uncaught, leaking the event subscription and leaving the returned future
* incomplete.
*/
public class SchedulerShutdownRaceTest {

@SuppressWarnings("unchecked")
@Test
void sendAndWaitShouldReturnFailedFutureWhenSchedulerIsShutDown() throws Exception {
// Build a session via reflection (package-private constructor)
var ctor = CopilotSession.class.getDeclaredConstructor(String.class, JsonRpcClient.class, String.class);
ctor.setAccessible(true);

// Mock JsonRpcClient so send() returns a pending future instead of NPE
var mockRpc = mock(JsonRpcClient.class);
when(mockRpc.invoke(any(), any(), any())).thenReturn(new CompletableFuture<>());

var session = ctor.newInstance("race-test", mockRpc, null);

// Shut down the scheduler without setting isTerminated,
// simulating the race window between ensureNotTerminated() and schedule()
var schedulerField = CopilotSession.class.getDeclaredField("timeoutScheduler");
schedulerField.setAccessible(true);
var scheduler = (ScheduledExecutorService) schedulerField.get(session);
scheduler.shutdownNow();

// With the fix: sendAndWait returns a future that completes exceptionally.
// Without the fix: sendAndWait throws RejectedExecutionException directly.
CompletableFuture<?> result = session.sendAndWait(new MessageOptions().setPrompt("test"), 5000);

assertNotNull(result, "sendAndWait should return a future, not throw");
var ex = assertThrows(ExecutionException.class, () -> result.get(1, TimeUnit.SECONDS));
assertInstanceOf(RejectedExecutionException.class, ex.getCause());
}
}
146 changes: 146 additions & 0 deletions src/test/java/com/github/copilot/sdk/TimeoutEdgeCaseTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
/*---------------------------------------------------------------------------------------------
* Copyright (c) Microsoft Corporation. All rights reserved.
*--------------------------------------------------------------------------------------------*/

package com.github.copilot.sdk;

import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.Socket;
import java.util.concurrent.CompletableFuture;

import org.junit.jupiter.api.Test;

import com.github.copilot.sdk.events.AssistantMessageEvent;
import com.github.copilot.sdk.json.MessageOptions;

/**
* Tests for timeout edge cases in {@link CopilotSession#sendAndWait}.
* <p>
* These tests prove two defects in the current per-call
* {@code ScheduledExecutorService} approach:
* <ol>
* <li>A timeout fires after {@code close()}, leaking a {@code TimeoutException}
* onto the returned future.</li>
* <li>Each {@code sendAndWait} call spawns a new OS thread (~1 MB stack),
* instead of reusing a shared scheduler thread.</li>
* </ol>
*/
public class TimeoutEdgeCaseTest {

/**
* Creates a {@link JsonRpcClient} whose {@code invoke()} returns futures that
* never complete. The reader thread blocks forever on the input stream, and
* writes go to a no-op output stream.
*/
private JsonRpcClient createHangingRpcClient() throws Exception {
InputStream blockingInput = new InputStream() {
@Override
public int read() throws IOException {
try {
Thread.sleep(Long.MAX_VALUE);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return -1;
}
return -1;
}
};
ByteArrayOutputStream sinkOutput = new ByteArrayOutputStream();

var ctor = JsonRpcClient.class.getDeclaredConstructor(InputStream.class, java.io.OutputStream.class,
Socket.class, Process.class);
ctor.setAccessible(true);
return (JsonRpcClient) ctor.newInstance(blockingInput, sinkOutput, null, null);
}

/**
* After {@code close()}, the future returned by {@code sendAndWait} must NOT be
* completed by a stale timeout.
* <p>
* Current buggy behavior: the per-call scheduler is not cancelled by
* {@code close()}, so its 2-second timeout fires during the 5-second
* {@code session.destroy} RPC wait, completing the future with
* {@code TimeoutException}.
* <p>
* Expected behavior after fix: {@code close()} cancels pending timeouts before
* the blocking RPC call, so the future remains incomplete.
*/
@Test
void testTimeoutDoesNotFireAfterSessionClose() throws Exception {
JsonRpcClient rpc = createHangingRpcClient();
try {
try (CopilotSession session = new CopilotSession("test-timeout-id", rpc)) {

CompletableFuture<AssistantMessageEvent> result = session
.sendAndWait(new MessageOptions().setPrompt("hello"), 2000);

assertFalse(result.isDone(), "Future should be pending before timeout fires");

// close() blocks up to 5s on session.destroy RPC. The 2s timeout
// fires during that window with the current per-call scheduler.
session.close();

assertFalse(result.isDone(), "Future should not be completed by a timeout after session is closed. "
+ "The per-call ScheduledExecutorService leaked a TimeoutException.");
}
} finally {
rpc.close();
}
}

/**
* A shared scheduler should reuse a single thread across multiple
* {@code sendAndWait} calls, rather than spawning a new OS thread per call.
* <p>
* Current buggy behavior: two calls create two {@code sendAndWait-timeout}
* threads.
* <p>
* Expected behavior after fix: two calls still use only one scheduler thread.
*/
@Test
void testSendAndWaitReusesTimeoutThread() throws Exception {
JsonRpcClient rpc = createHangingRpcClient();
try {
try (CopilotSession session = new CopilotSession("test-thread-count-id", rpc)) {

long baselineCount = countTimeoutThreads();

CompletableFuture<AssistantMessageEvent> result1 = session
.sendAndWait(new MessageOptions().setPrompt("hello1"), 30000);

Thread.sleep(100);
long afterFirst = countTimeoutThreads();
assertTrue(afterFirst >= baselineCount + 1,
"Expected at least one new sendAndWait-timeout thread after first call. " + "Baseline: "
+ baselineCount + ", after: " + afterFirst);

CompletableFuture<AssistantMessageEvent> result2 = session
.sendAndWait(new MessageOptions().setPrompt("hello2"), 30000);

Thread.sleep(100);
long afterSecond = countTimeoutThreads();
assertTrue(afterSecond == afterFirst,
"Shared scheduler should reuse the same thread — no new threads after second call. "
+ "After first: " + afterFirst + ", after second: " + afterSecond);

result1.cancel(true);
result2.cancel(true);
}
} finally {
rpc.close();
}
}

/**
* Counts the number of live threads whose name contains "sendAndWait-timeout".
*/
private long countTimeoutThreads() {
return Thread.getAllStackTraces().keySet().stream().filter(t -> t.getName().contains("sendAndWait-timeout"))
.filter(Thread::isAlive).count();
}
}
47 changes: 47 additions & 0 deletions src/test/java/com/github/copilot/sdk/ZeroTimeoutContractTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*---------------------------------------------------------------------------------------------
* Copyright (c) Microsoft Corporation. All rights reserved.
*--------------------------------------------------------------------------------------------*/

package com.github.copilot.sdk;

import static org.junit.jupiter.api.Assertions.*;
import static org.mockito.ArgumentMatchers.*;
import static org.mockito.Mockito.*;

import java.util.concurrent.CompletableFuture;

import org.junit.jupiter.api.Test;

import com.github.copilot.sdk.events.AssistantMessageEvent;
import com.github.copilot.sdk.json.MessageOptions;

/**
* Verifies the documented contract that {@code timeoutMs <= 0} means "no
* timeout" in {@link CopilotSession#sendAndWait(MessageOptions, long)}.
*/
public class ZeroTimeoutContractTest {

@SuppressWarnings("unchecked")
@Test
void sendAndWaitWithZeroTimeoutShouldNotTimeOut() throws Exception {
// Build a session via reflection (package-private constructor)
var ctor = CopilotSession.class.getDeclaredConstructor(String.class, JsonRpcClient.class, String.class);
ctor.setAccessible(true);

var mockRpc = mock(JsonRpcClient.class);
when(mockRpc.invoke(any(), any(), any())).thenReturn(new CompletableFuture<>());

var session = ctor.newInstance("zero-timeout-test", mockRpc, null);

// Per the Javadoc: timeoutMs of 0 means "no timeout".
// The future should NOT complete with TimeoutException.
CompletableFuture<AssistantMessageEvent> result = session.sendAndWait(new MessageOptions().setPrompt("test"),
0);

// Give the scheduler a chance to fire if it was (incorrectly) scheduled
Thread.sleep(200);

// The future should still be pending — not timed out
assertFalse(result.isDone(), "Future should not be done; timeoutMs=0 means no timeout per Javadoc");
}
}