From 4fa88e8c165f5ce950aaa3d10f196a7b4f4d702d Mon Sep 17 00:00:00 2001 From: davidliu Date: Tue, 5 May 2026 21:09:16 +0900 Subject: [PATCH 1/7] More fixes for SignalClient on websocket failure when reconnecting --- .../io/livekit/android/room/SignalClient.kt | 11 +-- .../livekit/android/room/SignalClientTest.kt | 78 +++++++++++++++++++ 2 files changed, 84 insertions(+), 5 deletions(-) diff --git a/livekit-android-sdk/src/main/java/io/livekit/android/room/SignalClient.kt b/livekit-android-sdk/src/main/java/io/livekit/android/room/SignalClient.kt index 70e7434ef..e5b982848 100644 --- a/livekit-android-sdk/src/main/java/io/livekit/android/room/SignalClient.kt +++ b/livekit-android-sdk/src/main/java/io/livekit/android/room/SignalClient.kt @@ -192,8 +192,8 @@ constructor( // If the coroutine is cancelled, websocket needs to be cancelled. // onFailure will handle cleanup. LKLog.v { "connect cancelled, abort websocket" } - currentWs?.cancel() joinContinuation = null + currentWs?.cancel() } currentWs = websocketFactory.newWebSocket(request, this@SignalClient) } @@ -350,7 +350,7 @@ constructor( } val wasConnected = isConnected - val wasConnecting = joinContinuation != null + val wasReconnectHandshake = joinContinuation != null && isReconnecting if (reason != null) { LKLog.e(t) { "websocket failure: $reason" } @@ -364,11 +364,12 @@ constructor( } joinContinuation = null - if (wasConnected || wasConnecting) { + if (wasConnected || wasReconnectHandshake) { // onClosing/onClosed will not be called after onFailure. // Handle websocket closure here. - // Also handle the case where failure occurs during a reconnect attempt (wasConnecting), - // where isConnected is already false but the upper layer still needs to be notified. + // Also handle failure during a soft reconnect handshake (reconnect query): isConnected is + // still false but the upper layer should be notified like a close. 
Initial join handshake + // failures do not call onClose so RTCEngine can surface them via onError / onFailToConnect. handleWebSocketClose( reason = reason ?: response?.toString() ?: t.localizedMessage ?: "websocket failure", code = response?.code ?: CLOSE_REASON_WEBSOCKET_FAILURE, diff --git a/livekit-android-test/src/test/java/io/livekit/android/room/SignalClientTest.kt b/livekit-android-test/src/test/java/io/livekit/android/room/SignalClientTest.kt index de50fb7c7..b2c8b7ccf 100644 --- a/livekit-android-test/src/test/java/io/livekit/android/room/SignalClientTest.kt +++ b/livekit-android-test/src/test/java/io/livekit/android/room/SignalClientTest.kt @@ -30,6 +30,8 @@ import io.livekit.android.test.util.toPBByteString import io.livekit.android.util.toOkioByteString import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.async +import kotlinx.coroutines.supervisorScope +import kotlinx.coroutines.yield import kotlinx.serialization.json.Json import livekit.LivekitRtc import livekit.org.webrtc.SessionDescription @@ -45,9 +47,11 @@ import org.junit.Before import org.junit.Test import org.mockito.Mock import org.mockito.Mockito +import org.mockito.Mockito.clearInvocations import org.mockito.Mockito.inOrder import org.mockito.kotlin.any import org.mockito.kotlin.argThat +import org.mockito.kotlin.never import org.mockito.kotlin.times @ExperimentalCoroutinesApi @@ -201,6 +205,80 @@ class SignalClientTest : BaseTest() { .onClose(any(), any()) } + /** + * Cold [join] handshake failure: [isConnected] is still false and this is not a reconnect attempt, + * so the listener must not get [SignalClient.Listener.onClose] (RTCEngine uses [onError] while + * CONNECTING). OkHttp does not call [WebSocketListener.onClosed] after [WebSocketListener.onFailure]. 
+ */ + @Test + fun onCloseNotInvokedWhenJoinHandshakeFailsBeforeJoinMessage() = runTest { + supervisorScope { + val job = async { + client.join(EXAMPLE_URL, "") + } + yield() + client.onOpen(wsFactory.ws, createOpenResponse(wsFactory.request)) + client.onFailure(wsFactory.ws, Exception("handshake failure"), null) + + runCatching { job.await() } + } + + Mockito.verify(listener, never()).onClose(any(), any()) + Mockito.verify(listener, times(1)).onError(any()) + } + + /** + * Mirrors [RTCEngine] soft reconnect, and then tests when the reconnect handshake fails before + * the reconnect message is delivered (no join response yet). + */ + @Test + fun onCloseInvokedWhenReconnectHandshakeFailsBeforeReconnectMessage() = runTest { + val joinJob = async { + client.join(EXAMPLE_URL, "") + } + connectWebsocketAndJoin() + joinJob.await() + + val connectedWs = wsFactory.ws + client.onFailure(connectedWs, Exception("initial connection lost"), null) + Mockito.verify(listener).onClose(any(), any()) + + supervisorScope { + val reconnectJob = async { + client.reconnect(EXAMPLE_URL, "", "participant_sid") + } + yield() + clearInvocations(listener) + + client.onOpen(wsFactory.ws, createOpenResponse(wsFactory.request)) + client.onFailure(wsFactory.ws, Exception("reconnect handshake failure"), null) + + runCatching { reconnectJob.await() } + } + + Mockito.verify(listener).onClose( + argThat { reason: String -> reason.contains("reconnect handshake failure") }, + any(), + ) + } + + /** + * When the join coroutine is cancelled, [MockWebSocket.cancel] runs and delivers [onFailure]. + * That path must not report a normal signal close to the listener (no duplicate reconnect churn). 
+ */ + @Test + fun onCloseNotInvokedWhenJoinJobCancelled() = runTest { + val job = async { + client.join(EXAMPLE_URL, "") + } + yield() + job.cancel() + + runCatching { job.await() } + + Mockito.verify(listener, never()).onClose(any(), any()) + } + /** * Ensure responses that come in before [SignalClient.onReadyForResponses] are queued. */ From 892b1ccef23b8e5afeca30842bf67e42afc4a289 Mon Sep 17 00:00:00 2001 From: davidliu Date: Tue, 5 May 2026 22:08:50 +0900 Subject: [PATCH 2/7] Make MockWebSocket.cancel idempotent (like OkHttp websocket) --- .../main/java/io/livekit/android/test/mock/MockWebSocket.kt | 3 +++ 1 file changed, 3 insertions(+) diff --git a/livekit-android-test/src/main/java/io/livekit/android/test/mock/MockWebSocket.kt b/livekit-android-test/src/main/java/io/livekit/android/test/mock/MockWebSocket.kt index c13e59965..c9d92305c 100644 --- a/livekit-android-test/src/main/java/io/livekit/android/test/mock/MockWebSocket.kt +++ b/livekit-android-test/src/main/java/io/livekit/android/test/mock/MockWebSocket.kt @@ -36,6 +36,9 @@ class MockWebSocket( get() = mutableSentRequests override fun cancel() { + if (isClosed) { + return + } isClosed = true listener.onFailure(this, IOException("cancelled"), null) } From 855df14dabfba319d0ca1dac6840557686baeb59 Mon Sep 17 00:00:00 2001 From: davidliu Date: Tue, 5 May 2026 22:10:02 +0900 Subject: [PATCH 3/7] Signal client tests --- .../android/room/RTCEngineMockE2ETest.kt | 34 +++++++++++++++++++ .../livekit/android/room/SignalClientTest.kt | 12 +++---- 2 files changed, 40 insertions(+), 6 deletions(-) diff --git a/livekit-android-test/src/test/java/io/livekit/android/room/RTCEngineMockE2ETest.kt b/livekit-android-test/src/test/java/io/livekit/android/room/RTCEngineMockE2ETest.kt index 87ed6ff4e..7c3acca09 100644 --- a/livekit-android-test/src/test/java/io/livekit/android/room/RTCEngineMockE2ETest.kt +++ b/livekit-android-test/src/test/java/io/livekit/android/room/RTCEngineMockE2ETest.kt @@ -33,6 +33,8 @@ import 
kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.SupervisorJob import kotlinx.coroutines.async import kotlinx.coroutines.cancel +import kotlinx.coroutines.flow.collect +import kotlinx.coroutines.flow.takeWhile import kotlinx.coroutines.test.advanceUntilIdle import kotlinx.coroutines.test.runCurrent import livekit.LivekitModels @@ -81,6 +83,38 @@ class RTCEngineMockE2ETest : MockE2ETest() { assertEquals(sentIceServers, subPeerConnection.rtcConfig.iceServers) } + @Test + fun roomConnectDoesNotHangOnWebSocketFailure() = runTest { + val connectJob = async { + try { + room.connect( + url = TestData.EXAMPLE_URL, + token = "token", + ) + null + } catch (e: Throwable) { + e + } + } + + room::state.flow + .takeWhile { it != Room.State.CONNECTING } + .collect() + runCurrent() + + wsFactory.listener.onOpen(wsFactory.ws, createOpenResponse(wsFactory.request)) + // simulate websocket failure + wsFactory.ws.cancel() + advanceUntilIdle() + + val connectResult = connectJob.await() + assertTrue("connect job should fail on websocket cancel", connectResult != null) + assertTrue( + "Expected RoomException.ConnectException, got $connectResult", + connectResult is RoomException.ConnectException, + ) + } + @Test fun iceSubscriberConnect() = runTest { connect() diff --git a/livekit-android-test/src/test/java/io/livekit/android/room/SignalClientTest.kt b/livekit-android-test/src/test/java/io/livekit/android/room/SignalClientTest.kt index b2c8b7ccf..aa2abfb63 100644 --- a/livekit-android-test/src/test/java/io/livekit/android/room/SignalClientTest.kt +++ b/livekit-android-test/src/test/java/io/livekit/android/room/SignalClientTest.kt @@ -150,7 +150,7 @@ class SignalClientTest : BaseTest() { } } - client.onFailure(wsFactory.ws, Exception(), null) + wsFactory.ws.cancel() job.await() assertTrue(failed) @@ -199,7 +199,7 @@ class SignalClientTest : BaseTest() { connectWebsocketAndJoin() job.await() - client.onFailure(wsFactory.ws, Exception(), null) + wsFactory.ws.cancel() 
Mockito.verify(listener) .onClose(any(), any()) @@ -218,7 +218,7 @@ class SignalClientTest : BaseTest() { } yield() client.onOpen(wsFactory.ws, createOpenResponse(wsFactory.request)) - client.onFailure(wsFactory.ws, Exception("handshake failure"), null) + wsFactory.ws.cancel() runCatching { job.await() } } @@ -240,7 +240,7 @@ class SignalClientTest : BaseTest() { joinJob.await() val connectedWs = wsFactory.ws - client.onFailure(connectedWs, Exception("initial connection lost"), null) + connectedWs.cancel() Mockito.verify(listener).onClose(any(), any()) supervisorScope { @@ -251,13 +251,13 @@ class SignalClientTest : BaseTest() { clearInvocations(listener) client.onOpen(wsFactory.ws, createOpenResponse(wsFactory.request)) - client.onFailure(wsFactory.ws, Exception("reconnect handshake failure"), null) + wsFactory.ws.cancel() runCatching { reconnectJob.await() } } Mockito.verify(listener).onClose( - argThat { reason: String -> reason.contains("reconnect handshake failure") }, + argThat { reason: String -> reason.contains("cancelled") }, any(), ) } From 4bda1f0e883d0376caa4a6e6a1fa520a8a847e01 Mon Sep 17 00:00:00 2001 From: davidliu Date: Tue, 5 May 2026 22:28:33 +0900 Subject: [PATCH 4/7] Have websocket onFailure resumeWithException instead of cancelling job --- .../io/livekit/android/room/SignalClient.kt | 28 +++++++++++++++---- 1 file changed, 22 insertions(+), 6 deletions(-) diff --git a/livekit-android-sdk/src/main/java/io/livekit/android/room/SignalClient.kt b/livekit-android-sdk/src/main/java/io/livekit/android/room/SignalClient.kt index e5b982848..65c0cc500 100644 --- a/livekit-android-sdk/src/main/java/io/livekit/android/room/SignalClient.kt +++ b/livekit-android-sdk/src/main/java/io/livekit/android/room/SignalClient.kt @@ -62,6 +62,7 @@ import java.util.Date import javax.inject.Inject import javax.inject.Named import javax.inject.Singleton +import kotlin.coroutines.resumeWithException /** * SignalClient to LiveKit WS servers @@ -352,17 +353,30 @@ 
constructor( val wasConnected = isConnected val wasReconnectHandshake = joinContinuation != null && isReconnecting + // Clear joinContinuation before resuming joinCont with the failure so a synchronous nested onFailure (e.g. from + // WebSocket.cancel) does not treat this as an in-flight join/reconnect handshake. + val joinCont = joinContinuation + joinContinuation = null + if (reason != null) { LKLog.e(t) { "websocket failure: $reason" } - val error = Exception(reason) + val error = if (joinCont != null) { + RoomException.ConnectException(reason, t) + } else { + Exception(reason) + } listener?.onError(error) - joinContinuation?.cancel(error) + joinCont?.resumeWithException(error) } else { LKLog.e(t) { "websocket failure: $response" } - listener?.onError(t) - joinContinuation?.cancel(t) + val error = if (joinCont != null) { + RoomException.ConnectException(t.localizedMessage, t) + } else { + t + } + listener?.onError(error) + joinCont?.resumeWithException(error) } - joinContinuation = null if (wasConnected || wasReconnectHandshake) { // onClosing/onClosed will not be called after onFailure. @@ -887,8 +901,10 @@ constructor( pongJob = null currentWs?.close(code, reason) currentWs = null - joinContinuation?.cancel() + // Same ordering as [onFailure]: clear the field before cancel() for synchronous listener callbacks. 
+ val joinCont = joinContinuation joinContinuation = null + joinCont?.cancel() if (shouldClearQueuedRequests) { requestFlow.resetReplayCache() } From 23778c20ccfc7b96369efefe6dd1d402b126d5f8 Mon Sep 17 00:00:00 2001 From: davidliu Date: Wed, 6 May 2026 00:14:19 +0900 Subject: [PATCH 5/7] spotless --- .../src/main/java/io/livekit/android/test/mock/MockWebSocket.kt | 2 +- .../src/test/java/io/livekit/android/room/SignalClientTest.kt | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/livekit-android-test/src/main/java/io/livekit/android/test/mock/MockWebSocket.kt b/livekit-android-test/src/main/java/io/livekit/android/test/mock/MockWebSocket.kt index c9d92305c..3b1aa84ee 100644 --- a/livekit-android-test/src/main/java/io/livekit/android/test/mock/MockWebSocket.kt +++ b/livekit-android-test/src/main/java/io/livekit/android/test/mock/MockWebSocket.kt @@ -1,5 +1,5 @@ /* - * Copyright 2023-2025 LiveKit, Inc. + * Copyright 2023-2026 LiveKit, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/livekit-android-test/src/test/java/io/livekit/android/room/SignalClientTest.kt b/livekit-android-test/src/test/java/io/livekit/android/room/SignalClientTest.kt index aa2abfb63..262def2fa 100644 --- a/livekit-android-test/src/test/java/io/livekit/android/room/SignalClientTest.kt +++ b/livekit-android-test/src/test/java/io/livekit/android/room/SignalClientTest.kt @@ -1,5 +1,5 @@ /* - * Copyright 2023-2025 LiveKit, Inc. + * Copyright 2023-2026 LiveKit, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. 
From b9df87ae5d3a3adf2b5961b06d2353aa1a108ee2 Mon Sep 17 00:00:00 2001 From: davidliu Date: Wed, 6 May 2026 01:06:15 +0900 Subject: [PATCH 6/7] detekt baseline --- livekit-android-sdk/detekt-baseline-release.xml | 1 + 1 file changed, 1 insertion(+) diff --git a/livekit-android-sdk/detekt-baseline-release.xml b/livekit-android-sdk/detekt-baseline-release.xml index 89e10ca0b..913ef5b22 100644 --- a/livekit-android-sdk/detekt-baseline-release.xml +++ b/livekit-android-sdk/detekt-baseline-release.xml @@ -26,6 +26,7 @@ CyclomaticComplexMethod:RTCEngine.kt$RTCEngine$private fun makeRTCConfig( serverResponse: Either<JoinResponse, ReconnectResponse>, connectOptions: ConnectOptions, ): RTCConfiguration CyclomaticComplexMethod:Room.kt$Room$@Throws(Exception::class) suspend fun connect(url: String, token: String, options: ConnectOptions = ConnectOptions()) CyclomaticComplexMethod:RoomEvent.kt$fun LivekitModels.DisconnectReason?.convert(): DisconnectReason + CyclomaticComplexMethod:SignalClient.kt$SignalClient$override fun onFailure(webSocket: WebSocket, t: Throwable, response: Response?) 
CyclomaticComplexMethod:SignalClient.kt$SignalClient$private fun handleSignalResponseImpl(ws: WebSocket, response: LivekitRtc.SignalResponse) EmptyFunctionBlock:RTCEngine.kt$RTCEngine${ } HasPlatformType:DataChannelManager.kt$DataChannelManager$@get:FlowObservable var state by flowDelegate(dataChannel.state()) private set From 14de74bc0602d394bbaca7323c35900ca629a0ea Mon Sep 17 00:00:00 2001 From: davidliu Date: Wed, 6 May 2026 01:27:46 +0900 Subject: [PATCH 7/7] changesets --- .changeset/big-points-happen.md | 5 +++++ .changeset/khaki-trains-roll.md | 5 +++++ 2 files changed, 10 insertions(+) create mode 100644 .changeset/big-points-happen.md create mode 100644 .changeset/khaki-trains-roll.md diff --git a/.changeset/big-points-happen.md b/.changeset/big-points-happen.md new file mode 100644 index 000000000..0e2741fe6 --- /dev/null +++ b/.changeset/big-points-happen.md @@ -0,0 +1,5 @@ +--- +"client-sdk-android": patch +--- + +Fix Room.connect not properly throwing ConnectException for websocket connection failures during Room.join() diff --git a/.changeset/khaki-trains-roll.md b/.changeset/khaki-trains-roll.md new file mode 100644 index 000000000..389deebac --- /dev/null +++ b/.changeset/khaki-trains-roll.md @@ -0,0 +1,5 @@ +--- +"client-sdk-android": patch +--- + +Fix reconnect potentially getting cancelled by websocket failure