From 546dcc32be850f1da8b2ca5bbe506ffe17eb1289 Mon Sep 17 00:00:00 2001 From: davidliu Date: Tue, 5 May 2026 03:07:23 +0900 Subject: [PATCH 1/2] Fix exception when resending data channel messages after a resume --- .changeset/tiny-buses-hear.md | 5 +++++ .../src/main/java/io/livekit/android/room/RTCEngine.kt | 7 ++++++- 2 files changed, 11 insertions(+), 1 deletion(-) create mode 100644 .changeset/tiny-buses-hear.md diff --git a/.changeset/tiny-buses-hear.md b/.changeset/tiny-buses-hear.md new file mode 100644 index 000000000..de52c30c0 --- /dev/null +++ b/.changeset/tiny-buses-hear.md @@ -0,0 +1,5 @@ +--- +"client-sdk-android": patch +--- + +Fix exception when resending data channel messages after a resume diff --git a/livekit-android-sdk/src/main/java/io/livekit/android/room/RTCEngine.kt b/livekit-android-sdk/src/main/java/io/livekit/android/room/RTCEngine.kt index ed1f5fbe4..723262305 100644 --- a/livekit-android-sdk/src/main/java/io/livekit/android/room/RTCEngine.kt +++ b/livekit-android-sdk/src/main/java/io/livekit/android/room/RTCEngine.kt @@ -790,7 +790,12 @@ internal constructor( } internal suspend fun resendReliableMessagesForResume(lastMessageSeq: Int): Result { - ensurePublisherConnected(LivekitModels.DataPacket.Kind.RELIABLE) + try { + ensurePublisherConnected(LivekitModels.DataPacket.Kind.RELIABLE) + } catch (e: Exception) { + e.rethrowIfCancellationSignal() + return Result.failure(e) + } val channel = dataChannelForKind(LivekitModels.DataPacket.Kind.RELIABLE) ?: return Result.failure(NullPointerException("reliable channel not established!")) From 9df6404cac5eb89a3a87f15aaa1aeedbd27d79c1 Mon Sep 17 00:00:00 2001 From: davidliu Date: Tue, 5 May 2026 03:27:39 +0900 Subject: [PATCH 2/2] test for resendReliableData --- .../android/room/RTCEngineMockE2ETest.kt | 65 ++++++++++++++++++- 1 file changed, 64 insertions(+), 1 deletion(-) diff --git a/livekit-android-test/src/test/java/io/livekit/android/room/RTCEngineMockE2ETest.kt b/livekit-android-test/src/test/java/io/livekit/android/room/RTCEngineMockE2ETest.kt index c881a2df9..96e6b943b 100644 --- a/livekit-android-test/src/test/java/io/livekit/android/room/RTCEngineMockE2ETest.kt +++ b/livekit-android-test/src/test/java/io/livekit/android/room/RTCEngineMockE2ETest.kt @@ -1,5 +1,5 @@ /* - * Copyright 2023-2025 LiveKit, Inc. + * Copyright 2023-2026 LiveKit, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,8 +16,10 @@ package io.livekit.android.room +import com.google.protobuf.ByteString import io.livekit.android.test.MockE2ETest import io.livekit.android.test.events.FlowCollector +import io.livekit.android.test.mock.MockDataChannel import io.livekit.android.test.mock.MockPeerConnection import io.livekit.android.test.mock.SignalRequestHandler import io.livekit.android.test.mock.TestData @@ -27,6 +29,7 @@ import io.livekit.android.util.toOkioByteString import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.test.advanceUntilIdle import livekit.LivekitModels +import livekit.LivekitModels.DataPacket import livekit.LivekitRtc import livekit.org.webrtc.PeerConnection import org.junit.Assert @@ -366,4 +369,64 @@ class RTCEngineMockE2ETest : MockE2ETest() { val sid = wsFactory.request.url.queryParameter(SignalClient.CONNECT_QUERY_PARTICIPANT_SID) assertEquals(TestData.JOIN.join.participant.sid, sid) } + + /** + * After a soft reconnect, the server reports [LivekitRtc.ReconnectResponse.lastMessageSeq]. The engine + * drops buffered reliable payloads up to that sequence (inclusive) and re-sends the remainder on the + * reliable data channel — see [RTCEngine.resendReliableMessagesForResume]. + */ + @Test + fun softReconnectResendsBufferedReliableData() = runTest { + room.setReconnectionType(ReconnectType.FORCE_SOFT_RECONNECT) + + val publisherOfferHandler: SignalRequestHandler = { request -> + if (request.hasOffer()) { + val answer = with(LivekitRtc.SignalResponse.newBuilder()) { + answer = with(LivekitRtc.SessionDescription.newBuilder()) { + sdp = "remote_answer" + type = "answer" + id = request.offer.id + build() + } + build() + } + wsFactory.receiveMessage(answer) + true + } else { + false + } + } + wsFactory.registerSignalRequestHandler(publisherOfferHandler) + connect() + + val lastMessageSeq = TestData.RECONNECT.reconnect.lastMessageSeq + val pubDataChannel = + getPublisherPeerConnection().dataChannels[RTCEngine.RELIABLE_DATA_CHANNEL_LABEL] as MockDataChannel + + val payloads = listOf(byteArrayOf(1), byteArrayOf(2), byteArrayOf(3)) + for (payload in payloads) { + assertTrue(room.localParticipant.publishData(payload).isSuccess) + } + assertEquals(3, pubDataChannel.sentBuffers.size) + + disconnectPeerConnection() + testScheduler.advanceTimeBy(1000) + wsFactory.listener.onOpen(wsFactory.ws, createOpenResponse(wsFactory.request)) + simulateMessageFromServer(TestData.RECONNECT) + connectPeerConnection() + + advanceUntilIdle() + + val expectedResentSequences = listOf(1, 2, 3).filter { it > lastMessageSeq } + assertEquals(3 + expectedResentSequences.size, pubDataChannel.sentBuffers.size) + expectedResentSequences.forEachIndexed { index, sequence -> + val packet = DataPacket.parseFrom( + ByteString.copyFrom(pubDataChannel.sentBuffers[3 + index].data), + ) + assertEquals(sequence, packet.sequence) + assertTrue( + packet.user.payload.toByteArray().contentEquals(byteArrayOf(sequence.toByte())), + ) + } + } }