diff --git a/.changeset/big-points-happen.md b/.changeset/big-points-happen.md
new file mode 100644
index 00000000..0e2741fe
--- /dev/null
+++ b/.changeset/big-points-happen.md
@@ -0,0 +1,5 @@
+---
+"client-sdk-android": patch
+---
+
+Fix Room.connect not properly throwing ConnectException for websocket connection failures during Room.join()
diff --git a/.changeset/khaki-trains-roll.md b/.changeset/khaki-trains-roll.md
new file mode 100644
index 00000000..389deeba
--- /dev/null
+++ b/.changeset/khaki-trains-roll.md
@@ -0,0 +1,5 @@
+---
+"client-sdk-android": patch
+---
+
+Fix reconnect potentially getting cancelled by websocket failure
diff --git a/livekit-android-sdk/detekt-baseline-release.xml b/livekit-android-sdk/detekt-baseline-release.xml
index 89e10ca0..913ef5b2 100644
--- a/livekit-android-sdk/detekt-baseline-release.xml
+++ b/livekit-android-sdk/detekt-baseline-release.xml
@@ -26,6 +26,7 @@
CyclomaticComplexMethod:RTCEngine.kt$RTCEngine$private fun makeRTCConfig( serverResponse: Either<JoinResponse, ReconnectResponse>, connectOptions: ConnectOptions, ): RTCConfiguration
CyclomaticComplexMethod:Room.kt$Room$@Throws(Exception::class) suspend fun connect(url: String, token: String, options: ConnectOptions = ConnectOptions())
CyclomaticComplexMethod:RoomEvent.kt$fun LivekitModels.DisconnectReason?.convert(): DisconnectReason
+ CyclomaticComplexMethod:SignalClient.kt$SignalClient$override fun onFailure(webSocket: WebSocket, t: Throwable, response: Response?)
CyclomaticComplexMethod:SignalClient.kt$SignalClient$private fun handleSignalResponseImpl(ws: WebSocket, response: LivekitRtc.SignalResponse)
EmptyFunctionBlock:RTCEngine.kt$RTCEngine${ }
HasPlatformType:DataChannelManager.kt$DataChannelManager$@get:FlowObservable var state by flowDelegate(dataChannel.state()) private set
diff --git a/livekit-android-sdk/src/main/java/io/livekit/android/room/SignalClient.kt b/livekit-android-sdk/src/main/java/io/livekit/android/room/SignalClient.kt
index 70e7434e..65c0cc50 100644
--- a/livekit-android-sdk/src/main/java/io/livekit/android/room/SignalClient.kt
+++ b/livekit-android-sdk/src/main/java/io/livekit/android/room/SignalClient.kt
@@ -62,6 +62,7 @@ import java.util.Date
import javax.inject.Inject
import javax.inject.Named
import javax.inject.Singleton
+import kotlin.coroutines.resumeWithException
/**
* SignalClient to LiveKit WS servers
@@ -192,8 +193,8 @@ constructor(
// If the coroutine is cancelled, websocket needs to be cancelled.
// onFailure will handle cleanup.
LKLog.v { "connect cancelled, abort websocket" }
- currentWs?.cancel()
joinContinuation = null
+ currentWs?.cancel()
}
currentWs = websocketFactory.newWebSocket(request, this@SignalClient)
}
@@ -350,25 +351,39 @@ constructor(
}
val wasConnected = isConnected
- val wasConnecting = joinContinuation != null
+ val wasReconnectHandshake = joinContinuation != null && isReconnecting
+
+ // Clear joinContinuation before cont.cancel() so a synchronous nested onFailure (e.g. from
+ // WebSocket.cancel) does not treat this as an in-flight join/reconnect handshake.
+ val joinCont = joinContinuation
+ joinContinuation = null
if (reason != null) {
LKLog.e(t) { "websocket failure: $reason" }
- val error = Exception(reason)
+ val error = if (joinCont != null) {
+ RoomException.ConnectException(reason, t)
+ } else {
+ Exception(reason)
+ }
listener?.onError(error)
- joinContinuation?.cancel(error)
+ joinCont?.resumeWithException(error)
} else {
LKLog.e(t) { "websocket failure: $response" }
- listener?.onError(t)
- joinContinuation?.cancel(t)
+ val error = if (joinCont != null) {
+ RoomException.ConnectException(t.localizedMessage, t)
+ } else {
+ t
+ }
+ listener?.onError(error)
+ joinCont?.resumeWithException(error)
}
- joinContinuation = null
- if (wasConnected || wasConnecting) {
+ if (wasConnected || wasReconnectHandshake) {
// onClosing/onClosed will not be called after onFailure.
// Handle websocket closure here.
- // Also handle the case where failure occurs during a reconnect attempt (wasConnecting),
- // where isConnected is already false but the upper layer still needs to be notified.
+ // Also handle failure during a soft reconnect handshake (reconnect query): isConnected is
+ // still false but the upper layer should be notified like a close. Initial join handshake
+ // failures do not call onClose so RTCEngine can surface them via onError / onFailToConnect.
handleWebSocketClose(
reason = reason ?: response?.toString() ?: t.localizedMessage ?: "websocket failure",
code = response?.code ?: CLOSE_REASON_WEBSOCKET_FAILURE,
@@ -886,8 +901,10 @@ constructor(
pongJob = null
currentWs?.close(code, reason)
currentWs = null
- joinContinuation?.cancel()
+ // Same ordering as [onFailure]: clear the field before cancel() for synchronous listener callbacks.
+ val joinCont = joinContinuation
joinContinuation = null
+ joinCont?.cancel()
if (shouldClearQueuedRequests) {
requestFlow.resetReplayCache()
}
diff --git a/livekit-android-test/src/main/java/io/livekit/android/test/mock/MockWebSocket.kt b/livekit-android-test/src/main/java/io/livekit/android/test/mock/MockWebSocket.kt
index c13e5996..3b1aa84e 100644
--- a/livekit-android-test/src/main/java/io/livekit/android/test/mock/MockWebSocket.kt
+++ b/livekit-android-test/src/main/java/io/livekit/android/test/mock/MockWebSocket.kt
@@ -1,5 +1,5 @@
/*
- * Copyright 2023-2025 LiveKit, Inc.
+ * Copyright 2023-2026 LiveKit, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -36,6 +36,9 @@ class MockWebSocket(
get() = mutableSentRequests
override fun cancel() {
+ if (isClosed) {
+ return
+ }
isClosed = true
listener.onFailure(this, IOException("cancelled"), null)
}
diff --git a/livekit-android-test/src/test/java/io/livekit/android/room/RTCEngineMockE2ETest.kt b/livekit-android-test/src/test/java/io/livekit/android/room/RTCEngineMockE2ETest.kt
index 87ed6ff4..7c3acca0 100644
--- a/livekit-android-test/src/test/java/io/livekit/android/room/RTCEngineMockE2ETest.kt
+++ b/livekit-android-test/src/test/java/io/livekit/android/room/RTCEngineMockE2ETest.kt
@@ -33,6 +33,8 @@ import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.async
import kotlinx.coroutines.cancel
+import kotlinx.coroutines.flow.collect
+import kotlinx.coroutines.flow.takeWhile
import kotlinx.coroutines.test.advanceUntilIdle
import kotlinx.coroutines.test.runCurrent
import livekit.LivekitModels
@@ -81,6 +83,38 @@ class RTCEngineMockE2ETest : MockE2ETest() {
assertEquals(sentIceServers, subPeerConnection.rtcConfig.iceServers)
}
+ @Test
+ fun roomConnectDoesNotHangOnWebSocketFailure() = runTest {
+ val connectJob = async {
+ try {
+ room.connect(
+ url = TestData.EXAMPLE_URL,
+ token = "token",
+ )
+ null
+ } catch (e: Throwable) {
+ e
+ }
+ }
+
+ room::state.flow
+ .takeWhile { it != Room.State.CONNECTING }
+ .collect()
+ runCurrent()
+
+ wsFactory.listener.onOpen(wsFactory.ws, createOpenResponse(wsFactory.request))
+ // simulate websocket failure
+ wsFactory.ws.cancel()
+ advanceUntilIdle()
+
+ val connectResult = connectJob.await()
+ assertTrue("connect job should fail on websocket cancel", connectResult != null)
+ assertTrue(
+ "Expected RoomException.ConnectException, got $connectResult",
+ connectResult is RoomException.ConnectException,
+ )
+ }
+
@Test
fun iceSubscriberConnect() = runTest {
connect()
diff --git a/livekit-android-test/src/test/java/io/livekit/android/room/SignalClientTest.kt b/livekit-android-test/src/test/java/io/livekit/android/room/SignalClientTest.kt
index de50fb7c..262def2f 100644
--- a/livekit-android-test/src/test/java/io/livekit/android/room/SignalClientTest.kt
+++ b/livekit-android-test/src/test/java/io/livekit/android/room/SignalClientTest.kt
@@ -1,5 +1,5 @@
/*
- * Copyright 2023-2025 LiveKit, Inc.
+ * Copyright 2023-2026 LiveKit, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -30,6 +30,8 @@ import io.livekit.android.test.util.toPBByteString
import io.livekit.android.util.toOkioByteString
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.async
+import kotlinx.coroutines.supervisorScope
+import kotlinx.coroutines.yield
import kotlinx.serialization.json.Json
import livekit.LivekitRtc
import livekit.org.webrtc.SessionDescription
@@ -45,9 +47,11 @@ import org.junit.Before
import org.junit.Test
import org.mockito.Mock
import org.mockito.Mockito
+import org.mockito.Mockito.clearInvocations
import org.mockito.Mockito.inOrder
import org.mockito.kotlin.any
import org.mockito.kotlin.argThat
+import org.mockito.kotlin.never
import org.mockito.kotlin.times
@ExperimentalCoroutinesApi
@@ -146,7 +150,7 @@ class SignalClientTest : BaseTest() {
}
}
- client.onFailure(wsFactory.ws, Exception(), null)
+ wsFactory.ws.cancel()
job.await()
assertTrue(failed)
@@ -195,12 +199,86 @@ class SignalClientTest : BaseTest() {
connectWebsocketAndJoin()
job.await()
- client.onFailure(wsFactory.ws, Exception(), null)
+ wsFactory.ws.cancel()
Mockito.verify(listener)
.onClose(any(), any())
}
+ /**
+ * Cold [join] handshake failure: [isConnected] is still false and this is not a reconnect attempt,
+ * so the listener must not get [SignalClient.Listener.onClose] (RTCEngine uses [onError] while
+ * CONNECTING). OkHttp does not call [WebSocketListener.onClosed] after [WebSocketListener.onFailure].
+ */
+ @Test
+ fun onCloseNotInvokedWhenJoinHandshakeFailsBeforeJoinMessage() = runTest {
+ supervisorScope {
+ val job = async {
+ client.join(EXAMPLE_URL, "")
+ }
+ yield()
+ client.onOpen(wsFactory.ws, createOpenResponse(wsFactory.request))
+ wsFactory.ws.cancel()
+
+ runCatching { job.await() }
+ }
+
+ Mockito.verify(listener, never()).onClose(any(), any())
+ Mockito.verify(listener, times(1)).onError(any())
+ }
+
+ /**
+ * Mirrors [RTCEngine] soft reconnect, and then tests when the reconnect handshake fails before
+ * the reconnect message is delivered (no join response yet).
+ */
+ @Test
+ fun onCloseInvokedWhenReconnectHandshakeFailsBeforeReconnectMessage() = runTest {
+ val joinJob = async {
+ client.join(EXAMPLE_URL, "")
+ }
+ connectWebsocketAndJoin()
+ joinJob.await()
+
+ val connectedWs = wsFactory.ws
+ connectedWs.cancel()
+ Mockito.verify(listener).onClose(any(), any())
+
+ supervisorScope {
+ val reconnectJob = async {
+ client.reconnect(EXAMPLE_URL, "", "participant_sid")
+ }
+ yield()
+ clearInvocations(listener)
+
+ client.onOpen(wsFactory.ws, createOpenResponse(wsFactory.request))
+ wsFactory.ws.cancel()
+
+ runCatching { reconnectJob.await() }
+ }
+
+ Mockito.verify(listener).onClose(
+ argThat { reason: String -> reason.contains("cancelled") },
+ any(),
+ )
+ }
+
+ /**
+ * When the join coroutine is cancelled, [MockWebSocket.cancel] runs and delivers [onFailure].
+ * That path must not report a normal signal close to the listener (no duplicate reconnect churn).
+ */
+ @Test
+ fun onCloseNotInvokedWhenJoinJobCancelled() = runTest {
+ val job = async {
+ client.join(EXAMPLE_URL, "")
+ }
+ yield()
+ job.cancel()
+
+ runCatching { job.await() }
+
+ Mockito.verify(listener, never()).onClose(any(), any())
+ }
+
/**
* Ensure responses that come in before [SignalClient.onReadyForResponses] are queued.
*/