Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/tiny-buses-hear.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"client-sdk-android": patch
---

Fix exception when resending data channel messages after a resume
Original file line number Diff line number Diff line change
Expand Up @@ -790,7 +790,12 @@ internal constructor(
}

internal suspend fun resendReliableMessagesForResume(lastMessageSeq: Int): Result<Unit> {
ensurePublisherConnected(LivekitModels.DataPacket.Kind.RELIABLE)
try {
ensurePublisherConnected(LivekitModels.DataPacket.Kind.RELIABLE)
} catch (e: Exception) {
e.rethrowIfCancellationSignal()
return Result.failure(e)
}
val channel = dataChannelForKind(LivekitModels.DataPacket.Kind.RELIABLE)
?: return Result.failure(NullPointerException("reliable channel not established!"))

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2023-2025 LiveKit, Inc.
* Copyright 2023-2026 LiveKit, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,8 +16,10 @@

package io.livekit.android.room

import com.google.protobuf.ByteString
import io.livekit.android.test.MockE2ETest
import io.livekit.android.test.events.FlowCollector
import io.livekit.android.test.mock.MockDataChannel
import io.livekit.android.test.mock.MockPeerConnection
import io.livekit.android.test.mock.SignalRequestHandler
import io.livekit.android.test.mock.TestData
Expand All @@ -27,6 +29,7 @@ import io.livekit.android.util.toOkioByteString
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.test.advanceUntilIdle
import livekit.LivekitModels
import livekit.LivekitModels.DataPacket
import livekit.LivekitRtc
import livekit.org.webrtc.PeerConnection
import org.junit.Assert
Expand Down Expand Up @@ -366,4 +369,64 @@ class RTCEngineMockE2ETest : MockE2ETest() {
val sid = wsFactory.request.url.queryParameter(SignalClient.CONNECT_QUERY_PARTICIPANT_SID)
assertEquals(TestData.JOIN.join.participant.sid, sid)
}

/**
* After a soft reconnect, the server reports [LivekitRtc.ReconnectResponse.lastMessageSeq]. The engine
* drops buffered reliable payloads up to that sequence (inclusive) and re-sends the remainder on the
* reliable data channel — see [RTCEngine.resendReliableMessagesForResume].
*/
@Test
fun softReconnectResendsBufferedReliableData() = runTest {
room.setReconnectionType(ReconnectType.FORCE_SOFT_RECONNECT)

val publisherOfferHandler: SignalRequestHandler = { request ->
if (request.hasOffer()) {
val answer = with(LivekitRtc.SignalResponse.newBuilder()) {
answer = with(LivekitRtc.SessionDescription.newBuilder()) {
sdp = "remote_answer"
type = "answer"
id = request.offer.id
build()
}
build()
}
wsFactory.receiveMessage(answer)
true
} else {
false
}
}
wsFactory.registerSignalRequestHandler(publisherOfferHandler)
connect()

val lastMessageSeq = TestData.RECONNECT.reconnect.lastMessageSeq
val pubDataChannel =
getPublisherPeerConnection().dataChannels[RTCEngine.RELIABLE_DATA_CHANNEL_LABEL] as MockDataChannel

val payloads = listOf(byteArrayOf(1), byteArrayOf(2), byteArrayOf(3))
for (payload in payloads) {
assertTrue(room.localParticipant.publishData(payload).isSuccess)
}
assertEquals(3, pubDataChannel.sentBuffers.size)

disconnectPeerConnection()
testScheduler.advanceTimeBy(1000)
wsFactory.listener.onOpen(wsFactory.ws, createOpenResponse(wsFactory.request))
simulateMessageFromServer(TestData.RECONNECT)
connectPeerConnection()

advanceUntilIdle()

val expectedResentSequences = listOf(1, 2, 3).filter { it > lastMessageSeq }
assertEquals(3 + expectedResentSequences.size, pubDataChannel.sentBuffers.size)
expectedResentSequences.forEachIndexed { index, sequence ->
val packet = DataPacket.parseFrom(
ByteString.copyFrom(pubDataChannel.sentBuffers[3 + index].data),
)
assertEquals(sequence, packet.sequence)
assertTrue(
packet.user.payload.toByteArray().contentEquals(byteArrayOf(sequence.toByte())),
)
}
}
}
Loading