Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/sturdy-foxes-publish.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"client-sdk-android": patch
---

Fixed RTCEngine.addTrack leaking pendingTrackResolvers entries on timeout or caller cancellation, which previously caused subsequent publishes of the same track to fail with DuplicateTrackException until the connection was torn down.
Original file line number Diff line number Diff line change
Expand Up @@ -403,6 +403,13 @@ internal constructor(
synchronized(pendingTrackResolvers) {
pendingTrackResolvers[cid] = cont
}
cont.invokeOnCancellation {
synchronized(pendingTrackResolvers) {
if (pendingTrackResolvers[cid] === cont) {
pendingTrackResolvers.remove(cid)
}
}
}
client.sendAddTrack(
cid = cid,
name = name,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,23 +17,31 @@
package io.livekit.android.room

import com.google.protobuf.ByteString
import io.livekit.android.room.track.TrackException
import io.livekit.android.test.MockE2ETest
import io.livekit.android.test.events.FlowCollector
import io.livekit.android.test.mock.MockDataChannel
import io.livekit.android.test.mock.MockPeerConnection
import io.livekit.android.test.mock.SignalRequestHandler
import io.livekit.android.test.mock.TestData
import io.livekit.android.test.util.toPBByteString
import io.livekit.android.util.TimeoutException
import io.livekit.android.util.flow
import io.livekit.android.util.toOkioByteString
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.async
import kotlinx.coroutines.cancel
import kotlinx.coroutines.test.advanceUntilIdle
import kotlinx.coroutines.test.runCurrent
import livekit.LivekitModels
import livekit.LivekitModels.DataPacket
import livekit.LivekitRtc
import livekit.org.webrtc.PeerConnection
import org.junit.Assert
import org.junit.Assert.assertEquals
import org.junit.Assert.assertFalse
import org.junit.Assert.assertTrue
import org.junit.Before
import org.junit.Test
Expand Down Expand Up @@ -370,6 +378,121 @@ class RTCEngineMockE2ETest : MockE2ETest() {
assertEquals(TestData.JOIN.join.participant.sid, sid)
}

/**
* Regression: an AddTrack timeout used to leave the cid in pendingTrackResolvers,
* poisoning every subsequent publish of the same track with DuplicateTrackException
* until the connection was torn down (or the server eventually responded.
*/
@Test
fun addTrackTimeoutDoesNotPoisonRetry() = runTest {
connect()
// The default mock handler auto-replies to ADD_TRACK; remove it so we can
// simulate the "server never responds" case that triggers the timeout.
wsFactory.unregisterSignalRequestHandler(wsFactory.defaultSignalRequestHandler)

// Use a SupervisorJob so timeout/cancellation in addTrack does not cancel the test scope.
val supervisor = CoroutineScope(coroutineRule.dispatcher + SupervisorJob())
try {
val cid = TestData.LOCAL_TRACK_PUBLISHED.trackPublished.cid

val firstPublish = supervisor.async {
rtcEngine.addTrack(cid, "audio", LivekitModels.TrackType.AUDIO, stream = null)
}
runCurrent()

// Push past the 20s AddTrack deadline without the server replying.
testScheduler.advanceTimeBy(21_000)
runCurrent()

assertTrue("firstPublish should be completed", firstPublish.isCompleted)
val firstFailure = firstPublish.getCompletionExceptionOrNull()
assertTrue(
"Expected TimeoutException, got $firstFailure",
firstFailure is TimeoutException,
)

// Retry with the same cid — must not be rejected by the duplicate guard.
val secondPublish = supervisor.async {
rtcEngine.addTrack(cid, "audio", LivekitModels.TrackType.AUDIO, stream = null)
}
// runCurrent (not advanceUntilIdle): otherwise the new 20s deadline fires
// before the simulated server response is delivered.
runCurrent()

if (secondPublish.isCompleted) {
val syncFailure = secondPublish.getCompletionExceptionOrNull()
assertFalse(
"Retry must not fail synchronously with DuplicateTrackException, got $syncFailure",
syncFailure is TrackException.DuplicateTrackException,
)
}

// Server now replies for the retry; the second publish should resolve cleanly.
simulateMessageFromServer(TestData.LOCAL_TRACK_PUBLISHED)
runCurrent()

assertTrue("secondPublish should be completed", secondPublish.isCompleted)
val secondFailure = secondPublish.getCompletionExceptionOrNull()
assertTrue("Retry should have succeeded, got $secondFailure", secondFailure == null)
assertEquals(
TestData.LOCAL_TRACK_PUBLISHED.trackPublished.track.sid,
secondPublish.getCompleted().sid,
)
} finally {
supervisor.cancel()
}
}

/**
* Regression: caller cancellation of an in-flight addTrack must also clean up
* the pendingTrackResolvers entry so the same cid can be retried.
*/
@Test
fun addTrackCallerCancellationDoesNotPoisonRetry() = runTest {
connect()
wsFactory.unregisterSignalRequestHandler(wsFactory.defaultSignalRequestHandler)

val supervisor = CoroutineScope(coroutineRule.dispatcher + SupervisorJob())
try {
val cid = TestData.LOCAL_TRACK_PUBLISHED.trackPublished.cid

val firstPublish = supervisor.async {
rtcEngine.addTrack(cid, "audio", LivekitModels.TrackType.AUDIO, stream = null)
}
runCurrent()
assertFalse("firstPublish should still be in-flight", firstPublish.isCompleted)

firstPublish.cancel()
runCurrent()
assertTrue("firstPublish should be cancelled", firstPublish.isCancelled)

val secondPublish = supervisor.async {
rtcEngine.addTrack(cid, "audio", LivekitModels.TrackType.AUDIO, stream = null)
}
runCurrent()

if (secondPublish.isCompleted) {
val syncFailure = secondPublish.getCompletionExceptionOrNull()
assertFalse(
"Retry must not fail synchronously with DuplicateTrackException, got $syncFailure",
syncFailure is TrackException.DuplicateTrackException,
)
}

simulateMessageFromServer(TestData.LOCAL_TRACK_PUBLISHED)
runCurrent()

assertTrue("secondPublish should be completed", secondPublish.isCompleted)
val secondFailure = secondPublish.getCompletionExceptionOrNull()
assertTrue(
"Retry after cancellation should have succeeded, got $secondFailure",
secondFailure == null,
)
} finally {
supervisor.cancel()
}
}

/**
* After a soft reconnect, the server reports [LivekitRtc.ReconnectResponse.lastMessageSeq]. The engine
* drops buffered reliable payloads up to that sequence (inclusive) and re-sends the remainder on the
Expand Down
Loading