Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/quiet-ducks-deliver.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"client-sdk-android": patch
---

Fixed RTCEngine.sendData reporting Result.success when DataChannel.send returned false. The boolean result is now checked and surfaced as Result.failure(RoomException.ConnectException), allowing callers to retry locally rejected data packets. For RELIABLE packets, reliableMessageBuffer queueing and sequence counter advancement now happen only after a successful send, so a failed send does not burn a sequence number or leave a stale resume entry; caller retries reuse the same sequence. The RECONNECTING fast path still queues without sending. Additionally, DataPacketItem now stores the packet payload as a ByteArray instead of a ByteBuffer, and resendReliableMessagesForResume wraps a fresh ByteBuffer per send. Previously the underlying WebRTC wrapper drained the buffer's position via ByteBuffer.get(byte[]), so a buffered item that survived multiple resume attempts (e.g., back-to-back reconnects with no server-side progress on lastMessageSeq) would replay an empty payload on the second and subsequent attempts while still reporting success. resendReliableMessagesForResume also now checks DataChannel.send's boolean return per item and surfaces a Result.failure(RoomException.ConnectException) on the first rejected replay; the reconnect caller logs the failure and proceeds, so buffered items remain queued for the next resume rather than being silently dropped.
Original file line number Diff line number Diff line change
Expand Up @@ -671,7 +671,12 @@ internal constructor(
connectionState = ConnectionState.CONNECTED
}
if (lastMessageSeq != null) {
resendReliableMessagesForResume(lastMessageSeq)
resendReliableMessagesForResume(lastMessageSeq).onFailure { e ->
LKLog.w(e) {
"Reliable data replay did not complete on resume; " +
"buffered items remain queued for the next resume."
}
}
}
// Is connected, notify and return.
regionUrlProvider?.clearAttemptedRegions()
Expand Down Expand Up @@ -750,29 +755,37 @@ internal constructor(
}
}

if (dataPacket.kind == LivekitModels.DataPacket.Kind.RELIABLE) {
val isReliable = dataPacket.kind == LivekitModels.DataPacket.Kind.RELIABLE
if (isReliable) {
dataPacket = dataPacket.toBuilder()
.setSequence(reliableDataSequence)
.build()
reliableDataSequence++
}

val byteBuffer = ByteBuffer.wrap(dataPacket.toByteArray())
val packetBytes = dataPacket.toByteArray()

if (dataPacket.kind == LivekitModels.DataPacket.Kind.RELIABLE) {
reliableMessageBuffer.queue(DataPacketItem(byteBuffer, dataPacket.sequence))
if (this.connectionState == ConnectionState.RECONNECTING) {
return Result.success(Unit)
}
if (isReliable && this.connectionState == ConnectionState.RECONNECTING) {
reliableMessageBuffer.queue(DataPacketItem(packetBytes, dataPacket.sequence))
reliableDataSequence++
return Result.success(Unit)
}
val buf = DataChannel.Buffer(
byteBuffer,
ByteBuffer.wrap(packetBytes),
true,
)
val channel = dataChannelForKind(dataPacket.kind)
?: throw RoomException.ConnectException("channel not established for ${dataPacket.kind.name}")

channel.send(buf)
if (!channel.send(buf)) {
return Result.failure(
RoomException.ConnectException("failed to send data packet for ${dataPacket.kind.name}"),
)
}

if (isReliable) {
reliableMessageBuffer.queue(DataPacketItem(packetBytes, dataPacket.sequence))
reliableDataSequence++
}
} catch (e: Exception) {
e.rethrowIfCancellationSignal()
return Result.failure(e)
Expand All @@ -789,15 +802,22 @@ internal constructor(
}
}

internal suspend fun resendReliableMessagesForResume(lastMessageSeq: Int): Result<Unit> {
@VisibleForTesting(otherwise = VisibleForTesting.PACKAGE_PRIVATE)
suspend fun resendReliableMessagesForResume(lastMessageSeq: Int): Result<Unit> {
ensurePublisherConnected(LivekitModels.DataPacket.Kind.RELIABLE)
val channel = dataChannelForKind(LivekitModels.DataPacket.Kind.RELIABLE)
?: return Result.failure(NullPointerException("reliable channel not established!"))

synchronized(reliableStateLock) {
reliableMessageBuffer.popToSequence(lastMessageSeq)
reliableMessageBuffer.getAll().forEach { item ->
channel.send(DataChannel.Buffer(item.data, true))
for (item in reliableMessageBuffer.getAll()) {
if (!channel.send(DataChannel.Buffer(ByteBuffer.wrap(item.data), true))) {
return Result.failure(
RoomException.ConnectException(
"failed to replay reliable data packet at sequence ${item.sequence}",
),
)
}
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2025 LiveKit, Inc.
* Copyright 2025-2026 LiveKit, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,12 +16,11 @@

package io.livekit.android.webrtc

import java.nio.ByteBuffer
import java.util.Deque
import java.util.LinkedList

/** @suppress */
data class DataPacketItem(val data: ByteBuffer, val sequence: Int)
data class DataPacketItem(val data: ByteArray, val sequence: Int)

/** @suppress */
class DataPacketBuffer(private val extraCapacity: Long = 0L) {
Expand All @@ -37,7 +36,7 @@ class DataPacketBuffer(private val extraCapacity: Long = 0L) {
@Synchronized
fun queue(item: DataPacketItem) {
buffer.add(item)
totalSize += item.data.capacity()
totalSize += item.data.size
}

@Synchronized
Expand All @@ -46,7 +45,7 @@ class DataPacketBuffer(private val extraCapacity: Long = 0L) {
return null
}
val item = buffer.removeFirst()
totalSize -= item.data.capacity()
totalSize -= item.data.size
return item
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,17 @@ class MockDataChannel(private val label: String?) : DataChannel(1L) {
var observer: Observer? = null
var sentBuffers = mutableListOf<Buffer>()

/** Snapshot of the bytes visible at send time, captured via the Buffer's current position/limit. */
var sentPayloads = mutableListOf<ByteArray>()
var sendResult = true

/**
* When true, [send] advances the buffer's position to its limit, mirroring
* the real WebRTC wrapper which drains the buffer via `ByteBuffer.get(byte[])`.
* Off by default to keep existing tests that read from `sentBuffers` working.
*/
var consumeSentBuffer = false

private var stateBacking: State = State.OPEN
var state: State
get() {
Expand Down Expand Up @@ -57,6 +68,7 @@ class MockDataChannel(private val label: String?) : DataChannel(1L) {

fun clearSentBuffers() {
sentBuffers.clear()
sentPayloads.clear()
}

override fun registerObserver(observer: Observer?) {
Expand Down Expand Up @@ -92,7 +104,15 @@ class MockDataChannel(private val label: String?) : DataChannel(1L) {
override fun send(buffer: Buffer): Boolean {
ensureNotDisposed()
sentBuffers.add(buffer)
return true
// Capture what native WebRTC would receive at this moment (position..limit).
val savedPos = buffer.data.position()
val payload = ByteArray(buffer.data.remaining())
buffer.data.get(payload)
sentPayloads.add(payload)
if (!consumeSentBuffer) {
buffer.data.position(savedPos)
}
return sendResult
}

override fun close() {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2023-2025 LiveKit, Inc.
* Copyright 2023-2026 LiveKit, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -18,6 +18,7 @@ package io.livekit.android.room

import io.livekit.android.test.MockE2ETest
import io.livekit.android.test.events.FlowCollector
import io.livekit.android.test.mock.MockDataChannel
import io.livekit.android.test.mock.MockPeerConnection
import io.livekit.android.test.mock.SignalRequestHandler
import io.livekit.android.test.mock.TestData
Expand All @@ -27,9 +28,11 @@ import io.livekit.android.util.toOkioByteString
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.test.advanceUntilIdle
import livekit.LivekitModels
import livekit.LivekitModels.DataPacket
import livekit.LivekitRtc
import livekit.org.webrtc.PeerConnection
import org.junit.Assert
import org.junit.Assert.assertArrayEquals
import org.junit.Assert.assertEquals
import org.junit.Assert.assertTrue
import org.junit.Before
Expand Down Expand Up @@ -366,4 +369,47 @@ class RTCEngineMockE2ETest : MockE2ETest() {
val sid = wsFactory.request.url.queryParameter(SignalClient.CONNECT_QUERY_PARTICIPANT_SID)
assertEquals(TestData.JOIN.join.participant.sid, sid)
}

@Test
fun resendReliableMessagesReplaysFullPayloadAcrossMultipleResumes() = runTest {
connect()
val pubPeerConnection = getPublisherPeerConnection()
val pubDataChannel = pubPeerConnection
.dataChannels[RTCEngine.RELIABLE_DATA_CHANNEL_LABEL] as MockDataChannel
pubDataChannel.consumeSentBuffer = true

val payload = "hello-resume".toByteArray()
val publishResult = room.localParticipant.publishData(payload)
assertTrue(publishResult.isSuccess)

// Two consecutive resumes without server progress (lastMessageSeq=0 pops nothing).
rtcEngine.resendReliableMessagesForResume(0)
rtcEngine.resendReliableMessagesForResume(0)

// 1 initial publish + 2 replays.
assertEquals(3, pubDataChannel.sentPayloads.size)

// Both replays must carry the original payload, not an empty buffer.
val replay1 = DataPacket.parseFrom(pubDataChannel.sentPayloads[1])
val replay2 = DataPacket.parseFrom(pubDataChannel.sentPayloads[2])
assertArrayEquals(payload, replay1.user.payload.toByteArray())
assertArrayEquals(payload, replay2.user.payload.toByteArray())
}

@Test
fun resendReliableMessagesReturnsFailureWhenReplaySendFails() = runTest {
connect()
val pubPeerConnection = getPublisherPeerConnection()
val pubDataChannel = pubPeerConnection
.dataChannels[RTCEngine.RELIABLE_DATA_CHANNEL_LABEL] as MockDataChannel

val publishResult = room.localParticipant.publishData("queued".toByteArray())
assertTrue(publishResult.isSuccess)

pubDataChannel.sendResult = false

val resendResult = rtcEngine.resendReliableMessagesForResume(0)
assertTrue(resendResult.isFailure)
assertTrue(resendResult.exceptionOrNull() is RoomException.ConnectException)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -899,4 +899,26 @@ class LocalParticipantMockE2ETest : MockE2ETest() {
assertTrue(result.isFailure)
assertTrue(result.exceptionOrNull() is RoomException.ConnectException)
}

@Test
fun publishDataReturnsFailureWhenDataChannelSendFails() = runTest {
connect()
val pubPeerConnection = getPublisherPeerConnection()
val pubDataChannel = pubPeerConnection.dataChannels[RTCEngine.RELIABLE_DATA_CHANNEL_LABEL] as MockDataChannel
pubDataChannel.sendResult = false

val result = room.localParticipant.publishData("hello".toByteArray())

assertTrue(result.isFailure)
assertTrue(result.exceptionOrNull() is RoomException.ConnectException)

pubDataChannel.sendResult = true
val retryResult = room.localParticipant.publishData("hello".toByteArray())

assertTrue(retryResult.isSuccess)
assertEquals(2, pubDataChannel.sentBuffers.size)

val retriedPacket = DataPacket.parseFrom(ByteString.copyFrom(pubDataChannel.sentBuffers[1].data))
assertEquals(1, retriedPacket.sequence)
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2025 LiveKit, Inc.
* Copyright 2025-2026 LiveKit, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -23,7 +23,6 @@ import org.junit.Assert.assertEquals
import org.junit.Assert.assertNotNull
import org.junit.Before
import org.junit.Test
import java.nio.ByteBuffer

class DataPacketBufferTest : BaseTest() {

Expand All @@ -37,7 +36,7 @@ class DataPacketBufferTest : BaseTest() {
fun fillWithTestValues() {
for (i in 1..5) {
val bytes = ByteArray(i)
buffer.queue(DataPacketItem(ByteBuffer.wrap(bytes), i))
buffer.queue(DataPacketItem(bytes, i))
}
}

Expand All @@ -61,7 +60,7 @@ class DataPacketBufferTest : BaseTest() {

list.forEachIndexed { index, item ->
assertEquals(index + 1, item.sequence)
assertEquals(index + 1, item.data.capacity())
assertEquals(index + 1, item.data.size)
}
}

Expand All @@ -81,7 +80,7 @@ class DataPacketBufferTest : BaseTest() {
assertEquals(5, list.size)
list.forEachIndexed { index, item ->
assertEquals(index + 1, item.sequence)
assertEquals(index + 1, item.data.capacity())
assertEquals(index + 1, item.data.size)
}
}

Expand Down
Loading