From 0552bcc3204817d9ad8da585d6584a6d28b30d01 Mon Sep 17 00:00:00 2001 From: Adrian Niculescu <15037449+adrian-niculescu@users.noreply.github.com> Date: Tue, 5 May 2026 17:06:10 +0300 Subject: [PATCH] Fixed silent reliable data send loss --- .changeset/quiet-ducks-deliver.md | 5 ++ .../java/io/livekit/android/room/RTCEngine.kt | 50 ++++++++++++++----- .../android/test/mock/MockDataChannel.kt | 22 +++++++- .../android/room/RTCEngineMockE2ETest.kt | 44 ++++++++++++++++ .../LocalParticipantMockE2ETest.kt | 22 ++++++++ 5 files changed, 129 insertions(+), 14 deletions(-) create mode 100644 .changeset/quiet-ducks-deliver.md diff --git a/.changeset/quiet-ducks-deliver.md b/.changeset/quiet-ducks-deliver.md new file mode 100644 index 000000000..34d771920 --- /dev/null +++ b/.changeset/quiet-ducks-deliver.md @@ -0,0 +1,5 @@ +--- +"client-sdk-android": patch +--- + +Fixed silent loss of reliable data when DataChannel.send returned false and when buffered items were replayed across multiple resumes. diff --git a/livekit-android-sdk/src/main/java/io/livekit/android/room/RTCEngine.kt b/livekit-android-sdk/src/main/java/io/livekit/android/room/RTCEngine.kt index 323b80011..51c5cf5a7 100644 --- a/livekit-android-sdk/src/main/java/io/livekit/android/room/RTCEngine.kt +++ b/livekit-android-sdk/src/main/java/io/livekit/android/room/RTCEngine.kt @@ -678,7 +678,12 @@ internal constructor( connectionState = ConnectionState.CONNECTED } if (lastMessageSeq != null) { - resendReliableMessagesForResume(lastMessageSeq) + resendReliableMessagesForResume(lastMessageSeq).onFailure { e -> + LKLog.w(e) { + "Reliable data replay did not complete on resume; " + + "buffered items remain queued for the next resume." + } + } } // Is connected, notify and return. 
regionUrlProvider?.clearAttemptedRegions() @@ -757,29 +762,39 @@ internal constructor( } } - if (dataPacket.kind == LivekitModels.DataPacket.Kind.RELIABLE) { + val isReliable = dataPacket.kind == LivekitModels.DataPacket.Kind.RELIABLE + if (isReliable) { dataPacket = dataPacket.toBuilder() .setSequence(reliableDataSequence) .build() - reliableDataSequence++ } - val byteBuffer = ByteBuffer.wrap(dataPacket.toByteArray()) + val packetBytes = dataPacket.toByteArray() - if (dataPacket.kind == LivekitModels.DataPacket.Kind.RELIABLE) { - reliableMessageBuffer.queue(DataPacketItem(byteBuffer, dataPacket.sequence)) - if (this.connectionState == ConnectionState.RECONNECTING) { - return Result.success(Unit) - } + if (isReliable && this.connectionState == ConnectionState.RECONNECTING) { + reliableMessageBuffer.queue(DataPacketItem(ByteBuffer.wrap(packetBytes), dataPacket.sequence)) + reliableDataSequence++ + return Result.success(Unit) } val buf = DataChannel.Buffer( - byteBuffer, + ByteBuffer.wrap(packetBytes), true, ) val channel = dataChannelForKind(dataPacket.kind) ?: throw RoomException.ConnectException("channel not established for ${dataPacket.kind.name}") - channel.send(buf) + if (!channel.send(buf)) { + return Result.failure( + RoomException.ConnectException("failed to send data packet for ${dataPacket.kind.name}"), + ) + } + + if (isReliable) { + // Wrap a fresh ByteBuffer so the queued item's position stays at 0; the + // ByteBuffer just sent has been drained by DataChannel.send. 
+ reliableMessageBuffer.queue(DataPacketItem(ByteBuffer.wrap(packetBytes), dataPacket.sequence)) + reliableDataSequence++ + } } catch (e: Exception) { e.rethrowIfCancellationSignal() return Result.failure(e) @@ -808,8 +823,17 @@ internal constructor( synchronized(reliableStateLock) { reliableMessageBuffer.popToSequence(lastMessageSeq) - reliableMessageBuffer.getAll().forEach { item -> - channel.send(DataChannel.Buffer(item.data, true)) + for (item in reliableMessageBuffer.getAll()) { + // Send a duplicate so the underlying buffer keeps position=0 and survives + // multiple resume attempts. DataChannel.send drains the buffer's position to + // its limit; without duplicate(), the second replay would send empty bytes. + if (!channel.send(DataChannel.Buffer(item.data.duplicate(), true))) { + return Result.failure( + RoomException.ConnectException( + "failed to replay reliable data packet at sequence ${item.sequence}", + ), + ) + } } } diff --git a/livekit-android-test/src/main/java/io/livekit/android/test/mock/MockDataChannel.kt b/livekit-android-test/src/main/java/io/livekit/android/test/mock/MockDataChannel.kt index 8a332dc25..61dcd6d0c 100644 --- a/livekit-android-test/src/main/java/io/livekit/android/test/mock/MockDataChannel.kt +++ b/livekit-android-test/src/main/java/io/livekit/android/test/mock/MockDataChannel.kt @@ -23,6 +23,17 @@ class MockDataChannel(private val label: String?) : DataChannel(1L) { var observer: Observer? = null var sentBuffers = mutableListOf<Buffer>() + /** Snapshot of the bytes visible at send time, captured via the Buffer's current position/limit. */ + var sentPayloads = mutableListOf<ByteArray>() + var sendResult = true + + /** + * When true, [send] advances the buffer's position to its limit, mirroring + * the real WebRTC wrapper which drains the buffer via `ByteBuffer.get(byte[])`. + * Off by default to keep existing tests that read from `sentBuffers` working. 
+ */ + var consumeSentBuffer = false + private var stateBacking: State = State.OPEN var state: State get() { @@ -57,6 +68,7 @@ class MockDataChannel(private val label: String?) : DataChannel(1L) { fun clearSentBuffers() { sentBuffers.clear() + sentPayloads.clear() } override fun registerObserver(observer: Observer?) { @@ -92,7 +104,15 @@ class MockDataChannel(private val label: String?) : DataChannel(1L) { override fun send(buffer: Buffer): Boolean { ensureNotDisposed() sentBuffers.add(buffer) - return true + // Capture what native WebRTC would receive at this moment (position..limit). + val savedPos = buffer.data.position() + val payload = ByteArray(buffer.data.remaining()) + buffer.data.get(payload) + sentPayloads.add(payload) + if (!consumeSentBuffer) { + buffer.data.position(savedPos) + } + return sendResult } override fun close() { diff --git a/livekit-android-test/src/test/java/io/livekit/android/room/RTCEngineMockE2ETest.kt b/livekit-android-test/src/test/java/io/livekit/android/room/RTCEngineMockE2ETest.kt index 7c3acca09..9478133b9 100644 --- a/livekit-android-test/src/test/java/io/livekit/android/room/RTCEngineMockE2ETest.kt +++ b/livekit-android-test/src/test/java/io/livekit/android/room/RTCEngineMockE2ETest.kt @@ -42,6 +42,7 @@ import livekit.LivekitModels.DataPacket import livekit.LivekitRtc import livekit.org.webrtc.PeerConnection import org.junit.Assert +import org.junit.Assert.assertArrayEquals import org.junit.Assert.assertEquals import org.junit.Assert.assertFalse import org.junit.Assert.assertTrue @@ -412,6 +413,49 @@ class RTCEngineMockE2ETest : MockE2ETest() { assertEquals(TestData.JOIN.join.participant.sid, sid) } + @Test + fun resendReliableMessagesReplaysFullPayloadAcrossMultipleResumes() = runTest { + connect() + val pubPeerConnection = getPublisherPeerConnection() + val pubDataChannel = pubPeerConnection + .dataChannels[RTCEngine.RELIABLE_DATA_CHANNEL_LABEL] as MockDataChannel + pubDataChannel.consumeSentBuffer = true + + val payload = 
"hello-resume".toByteArray() + val publishResult = room.localParticipant.publishData(payload) + assertTrue(publishResult.isSuccess) + + // Two consecutive resumes without server progress (lastMessageSeq=0 pops nothing). + rtcEngine.resendReliableMessagesForResume(0) + rtcEngine.resendReliableMessagesForResume(0) + + // 1 initial publish + 2 replays. + assertEquals(3, pubDataChannel.sentPayloads.size) + + // Both replays must carry the original payload, not an empty buffer. + val replay1 = DataPacket.parseFrom(pubDataChannel.sentPayloads[1]) + val replay2 = DataPacket.parseFrom(pubDataChannel.sentPayloads[2]) + assertArrayEquals(payload, replay1.user.payload.toByteArray()) + assertArrayEquals(payload, replay2.user.payload.toByteArray()) + } + + @Test + fun resendReliableMessagesReturnsFailureWhenReplaySendFails() = runTest { + connect() + val pubPeerConnection = getPublisherPeerConnection() + val pubDataChannel = pubPeerConnection + .dataChannels[RTCEngine.RELIABLE_DATA_CHANNEL_LABEL] as MockDataChannel + + val publishResult = room.localParticipant.publishData("queued".toByteArray()) + assertTrue(publishResult.isSuccess) + + pubDataChannel.sendResult = false + + val resendResult = rtcEngine.resendReliableMessagesForResume(0) + assertTrue(resendResult.isFailure) + assertTrue(resendResult.exceptionOrNull() is RoomException.ConnectException) + } + /** * Regression: an AddTrack timeout used to leave the cid in pendingTrackResolvers, * poisoning every subsequent publish of the same track with DuplicateTrackException diff --git a/livekit-android-test/src/test/java/io/livekit/android/room/participant/LocalParticipantMockE2ETest.kt b/livekit-android-test/src/test/java/io/livekit/android/room/participant/LocalParticipantMockE2ETest.kt index 00b5ca035..0048872d6 100644 --- a/livekit-android-test/src/test/java/io/livekit/android/room/participant/LocalParticipantMockE2ETest.kt +++ 
b/livekit-android-test/src/test/java/io/livekit/android/room/participant/LocalParticipantMockE2ETest.kt @@ -899,4 +899,26 @@ class LocalParticipantMockE2ETest : MockE2ETest() { assertTrue(result.isFailure) assertTrue(result.exceptionOrNull() is RoomException.ConnectException) } + + @Test + fun publishDataReturnsFailureWhenDataChannelSendFails() = runTest { + connect() + val pubPeerConnection = getPublisherPeerConnection() + val pubDataChannel = pubPeerConnection.dataChannels[RTCEngine.RELIABLE_DATA_CHANNEL_LABEL] as MockDataChannel + pubDataChannel.sendResult = false + + val result = room.localParticipant.publishData("hello".toByteArray()) + + assertTrue(result.isFailure) + assertTrue(result.exceptionOrNull() is RoomException.ConnectException) + + pubDataChannel.sendResult = true + val retryResult = room.localParticipant.publishData("hello".toByteArray()) + + assertTrue(retryResult.isSuccess) + assertEquals(2, pubDataChannel.sentBuffers.size) + + val retriedPacket = DataPacket.parseFrom(ByteString.copyFrom(pubDataChannel.sentBuffers[1].data)) + assertEquals(1, retriedPacket.sequence) + } }