Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/big-points-happen.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"client-sdk-android": patch
---

Fix Room.connect not properly throwing ConnectException for websocket connection failures during Room.join()
5 changes: 5 additions & 0 deletions .changeset/khaki-trains-roll.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"client-sdk-android": patch
---

Fix reconnect potentially getting cancelled by websocket failure
1 change: 1 addition & 0 deletions livekit-android-sdk/detekt-baseline-release.xml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
<ID>CyclomaticComplexMethod:RTCEngine.kt$RTCEngine$private fun makeRTCConfig( serverResponse: Either&lt;JoinResponse, ReconnectResponse>, connectOptions: ConnectOptions, ): RTCConfiguration</ID>
<ID>CyclomaticComplexMethod:Room.kt$Room$@Throws(Exception::class) suspend fun connect(url: String, token: String, options: ConnectOptions = ConnectOptions())</ID>
<ID>CyclomaticComplexMethod:RoomEvent.kt$fun LivekitModels.DisconnectReason?.convert(): DisconnectReason</ID>
<ID>CyclomaticComplexMethod:SignalClient.kt$SignalClient$override fun onFailure(webSocket: WebSocket, t: Throwable, response: Response?)</ID>
<ID>CyclomaticComplexMethod:SignalClient.kt$SignalClient$private fun handleSignalResponseImpl(ws: WebSocket, response: LivekitRtc.SignalResponse)</ID>
<ID>EmptyFunctionBlock:RTCEngine.kt$RTCEngine${ }</ID>
<ID>HasPlatformType:DataChannelManager.kt$DataChannelManager$@get:FlowObservable var state by flowDelegate(dataChannel.state()) private set</ID>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ import java.util.Date
import javax.inject.Inject
import javax.inject.Named
import javax.inject.Singleton
import kotlin.coroutines.resumeWithException

/**
* SignalClient to LiveKit WS servers
Expand Down Expand Up @@ -192,8 +193,8 @@ constructor(
// If the coroutine is cancelled, websocket needs to be cancelled.
// onFailure will handle cleanup.
LKLog.v { "connect cancelled, abort websocket" }
currentWs?.cancel()
joinContinuation = null
currentWs?.cancel()
}
currentWs = websocketFactory.newWebSocket(request, this@SignalClient)
}
Expand Down Expand Up @@ -350,25 +351,39 @@ constructor(
}

val wasConnected = isConnected
val wasConnecting = joinContinuation != null
val wasReconnectHandshake = joinContinuation != null && isReconnecting

// Clear joinContinuation before completing the continuation (resumeWithException) so a synchronous
// nested onFailure (e.g. from WebSocket.cancel) does not treat this as an in-flight join/reconnect handshake.
val joinCont = joinContinuation
joinContinuation = null

if (reason != null) {
LKLog.e(t) { "websocket failure: $reason" }
val error = Exception(reason)
val error = if (joinCont != null) {
RoomException.ConnectException(reason, t)
} else {
Exception(reason)
}
listener?.onError(error)
joinContinuation?.cancel(error)
joinCont?.resumeWithException(error)
} else {
LKLog.e(t) { "websocket failure: $response" }
listener?.onError(t)
joinContinuation?.cancel(t)
val error = if (joinCont != null) {
RoomException.ConnectException(t.localizedMessage, t)
} else {
t
}
listener?.onError(error)
joinCont?.resumeWithException(error)
}
joinContinuation = null

if (wasConnected || wasConnecting) {
if (wasConnected || wasReconnectHandshake) {
// onClosing/onClosed will not be called after onFailure.
// Handle websocket closure here.
// Also handle the case where failure occurs during a reconnect attempt (wasConnecting),
// where isConnected is already false but the upper layer still needs to be notified.
// Also handle failure during a soft reconnect handshake (reconnect query): isConnected is
// still false but the upper layer should be notified like a close. Initial join handshake
// failures do not call onClose so RTCEngine can surface them via onError / onFailToConnect.
handleWebSocketClose(
reason = reason ?: response?.toString() ?: t.localizedMessage ?: "websocket failure",
code = response?.code ?: CLOSE_REASON_WEBSOCKET_FAILURE,
Expand Down Expand Up @@ -886,8 +901,10 @@ constructor(
pongJob = null
currentWs?.close(code, reason)
currentWs = null
joinContinuation?.cancel()
// Same ordering as [onFailure]: clear the field before cancel() for synchronous listener callbacks.
val joinCont = joinContinuation
joinContinuation = null
joinCont?.cancel()
if (shouldClearQueuedRequests) {
requestFlow.resetReplayCache()
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2023-2025 LiveKit, Inc.
* Copyright 2023-2026 LiveKit, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -36,6 +36,9 @@ class MockWebSocket(
get() = mutableSentRequests

/**
 * Simulates cancelling the socket: marks it closed and reports an
 * [IOException] with message "cancelled" to the listener through onFailure.
 * Calls made once the socket is already closed are ignored, so the listener
 * gets at most one failure callback from this method.
 */
override fun cancel() {
    if (!isClosed) {
        isClosed = true
        listener.onFailure(this, IOException("cancelled"), null)
    }
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.async
import kotlinx.coroutines.cancel
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.takeWhile
import kotlinx.coroutines.test.advanceUntilIdle
import kotlinx.coroutines.test.runCurrent
import livekit.LivekitModels
Expand Down Expand Up @@ -81,6 +83,38 @@ class RTCEngineMockE2ETest : MockE2ETest() {
assertEquals(sentIceServers, subPeerConnection.rtcConfig.iceServers)
}

/**
 * A websocket failure right after the socket opens (no join response has been
 * delivered) must make [Room.connect] complete with a
 * [RoomException.ConnectException] instead of leaving the caller suspended.
 */
@Test
fun roomConnectDoesNotHangOnWebSocketFailure() = runTest {
    // Capture the thrown error (or null on success) rather than letting it escape async.
    val connectAttempt = async {
        runCatching {
            room.connect(
                url = TestData.EXAMPLE_URL,
                token = "token",
            )
        }.exceptionOrNull()
    }

    // Suspend until the room reports the CONNECTING state.
    room::state.flow
        .takeWhile { state -> state != Room.State.CONNECTING }
        .collect()
    runCurrent()

    wsFactory.listener.onOpen(wsFactory.ws, createOpenResponse(wsFactory.request))
    // simulate websocket failure
    wsFactory.ws.cancel()
    advanceUntilIdle()

    val failure = connectAttempt.await()
    assertTrue("connect job should fail on websocket cancel", failure != null)
    assertTrue(
        "Expected RoomException.ConnectException, got $failure",
        failure is RoomException.ConnectException,
    )
}

@Test
fun iceSubscriberConnect() = runTest {
connect()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2023-2025 LiveKit, Inc.
* Copyright 2023-2026 LiveKit, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -30,6 +30,8 @@ import io.livekit.android.test.util.toPBByteString
import io.livekit.android.util.toOkioByteString
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.async
import kotlinx.coroutines.supervisorScope
import kotlinx.coroutines.yield
import kotlinx.serialization.json.Json
import livekit.LivekitRtc
import livekit.org.webrtc.SessionDescription
Expand All @@ -45,9 +47,11 @@ import org.junit.Before
import org.junit.Test
import org.mockito.Mock
import org.mockito.Mockito
import org.mockito.Mockito.clearInvocations
import org.mockito.Mockito.inOrder
import org.mockito.kotlin.any
import org.mockito.kotlin.argThat
import org.mockito.kotlin.never
import org.mockito.kotlin.times

@ExperimentalCoroutinesApi
Expand Down Expand Up @@ -146,7 +150,7 @@ class SignalClientTest : BaseTest() {
}
}

client.onFailure(wsFactory.ws, Exception(), null)
wsFactory.ws.cancel()
job.await()

assertTrue(failed)
Expand Down Expand Up @@ -195,12 +199,86 @@ class SignalClientTest : BaseTest() {
connectWebsocketAndJoin()
job.await()

client.onFailure(wsFactory.ws, Exception(), null)
wsFactory.ws.cancel()

Mockito.verify(listener)
.onClose(any(), any())
}

/**
 * Initial [join] handshake failure: the websocket is opened and then cancelled before any join
 * response is delivered, so [isConnected] is still false and this is not a reconnect attempt.
 * The listener must see exactly one [SignalClient.Listener.onError] and no
 * [SignalClient.Listener.onClose] (RTCEngine uses [onError] while CONNECTING). OkHttp does not
 * call [WebSocketListener.onClosed] after [WebSocketListener.onFailure].
 */
@Test
fun onCloseNotInvokedWhenJoinHandshakeFailsBeforeJoinMessage() = runTest {
    supervisorScope {
        val joinAttempt = async { client.join(EXAMPLE_URL, "") }
        yield()
        client.onOpen(wsFactory.ws, createOpenResponse(wsFactory.request))
        wsFactory.ws.cancel()

        // Any failure from the join is ignored here; only the listener interactions are checked.
        runCatching { joinAttempt.await() }
    }

    Mockito.verify(listener, never()).onClose(any(), any())
    // Default verification mode is exactly one invocation.
    Mockito.verify(listener).onError(any())
}

/**
 * Mirrors an [RTCEngine] soft reconnect: after a successful join the socket is cancelled, a
 * reconnect is started, and the new socket is cancelled once opened but before a reconnect
 * message is delivered (no join response yet). The listener must be told about the closure
 * through [SignalClient.Listener.onClose] with a reason containing "cancelled".
 */
@Test
fun onCloseInvokedWhenReconnectHandshakeFailsBeforeReconnectMessage() = runTest {
    val initialJoin = async { client.join(EXAMPLE_URL, "") }
    connectWebsocketAndJoin()
    initialJoin.await()

    // Drop the established connection; this first closure must reach the listener.
    wsFactory.ws.cancel()
    Mockito.verify(listener).onClose(any(), any())

    supervisorScope {
        val reconnectAttempt = async {
            client.reconnect(EXAMPLE_URL, "", "participant_sid")
        }
        yield()
        // Forget the earlier onClose so only the reconnect failure is verified below.
        clearInvocations(listener)

        client.onOpen(wsFactory.ws, createOpenResponse(wsFactory.request))
        wsFactory.ws.cancel()

        // Any failure from the reconnect is ignored here; only the listener is checked.
        runCatching { reconnectAttempt.await() }
    }

    Mockito.verify(listener).onClose(
        argThat { closeReason: String -> closeReason.contains("cancelled") },
        any(),
    )
}

/**
 * Cancelling the coroutine that runs [join] causes [MockWebSocket.cancel] to run, which delivers
 * [onFailure]. That path must not be reported to the listener as a normal signal close
 * (no duplicate reconnect churn).
 */
@Test
fun onCloseNotInvokedWhenJoinJobCancelled() = runTest {
    val pendingJoin = async { client.join(EXAMPLE_URL, "") }
    yield()
    pendingJoin.cancel()

    // Any exception from awaiting the cancelled job is swallowed so the verification still runs.
    runCatching { pendingJoin.await() }

    Mockito.verify(listener, never()).onClose(any(), any())
}

/**
* Ensure responses that come in before [SignalClient.onReadyForResponses] are queued.
*/
Expand Down
Loading