diff --git a/.changeset/sturdy-foxes-publish.md b/.changeset/sturdy-foxes-publish.md new file mode 100644 index 00000000..3aae73bb --- /dev/null +++ b/.changeset/sturdy-foxes-publish.md @@ -0,0 +1,5 @@ +--- +"client-sdk-android": patch +--- + +Fixed RTCEngine.addTrack leaking pendingTrackResolvers entries on timeout or caller cancellation, which previously caused subsequent publishes of the same track to fail with DuplicateTrackException until the connection was torn down. diff --git a/livekit-android-sdk/src/main/java/io/livekit/android/room/RTCEngine.kt b/livekit-android-sdk/src/main/java/io/livekit/android/room/RTCEngine.kt index 72326230..323b8001 100644 --- a/livekit-android-sdk/src/main/java/io/livekit/android/room/RTCEngine.kt +++ b/livekit-android-sdk/src/main/java/io/livekit/android/room/RTCEngine.kt @@ -403,6 +403,13 @@ internal constructor( synchronized(pendingTrackResolvers) { pendingTrackResolvers[cid] = cont } + cont.invokeOnCancellation { + synchronized(pendingTrackResolvers) { + if (pendingTrackResolvers[cid] === cont) { + pendingTrackResolvers.remove(cid) + } + } + } client.sendAddTrack( cid = cid, name = name, diff --git a/livekit-android-test/src/test/java/io/livekit/android/room/RTCEngineMockE2ETest.kt b/livekit-android-test/src/test/java/io/livekit/android/room/RTCEngineMockE2ETest.kt index 96e6b943..87ed6ff4 100644 --- a/livekit-android-test/src/test/java/io/livekit/android/room/RTCEngineMockE2ETest.kt +++ b/livekit-android-test/src/test/java/io/livekit/android/room/RTCEngineMockE2ETest.kt @@ -17,6 +17,7 @@ package io.livekit.android.room import com.google.protobuf.ByteString +import io.livekit.android.room.track.TrackException import io.livekit.android.test.MockE2ETest import io.livekit.android.test.events.FlowCollector import io.livekit.android.test.mock.MockDataChannel @@ -24,16 +25,23 @@ import io.livekit.android.test.mock.MockPeerConnection import io.livekit.android.test.mock.SignalRequestHandler import io.livekit.android.test.mock.TestData import io.livekit.android.test.util.toPBByteString +import io.livekit.android.util.TimeoutException import io.livekit.android.util.flow import io.livekit.android.util.toOkioByteString +import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.SupervisorJob +import kotlinx.coroutines.async +import kotlinx.coroutines.cancel import kotlinx.coroutines.test.advanceUntilIdle +import kotlinx.coroutines.test.runCurrent import livekit.LivekitModels import livekit.LivekitModels.DataPacket import livekit.LivekitRtc import livekit.org.webrtc.PeerConnection import org.junit.Assert import org.junit.Assert.assertEquals +import org.junit.Assert.assertFalse import org.junit.Assert.assertTrue import org.junit.Before import org.junit.Test @@ -370,6 +378,121 @@ class RTCEngineMockE2ETest : MockE2ETest() { assertEquals(TestData.JOIN.join.participant.sid, sid) } + /** + * Regression: an AddTrack timeout used to leave the cid in pendingTrackResolvers, + * poisoning every subsequent publish of the same track with DuplicateTrackException + * until the connection was torn down (or the server eventually responded. + */ + @Test + fun addTrackTimeoutDoesNotPoisonRetry() = runTest { + connect() + // The default mock handler auto-replies to ADD_TRACK; remove it so we can + // simulate the "server never responds" case that triggers the timeout. + wsFactory.unregisterSignalRequestHandler(wsFactory.defaultSignalRequestHandler) + + // Use a SupervisorJob so timeout/cancellation in addTrack does not cancel the test scope. + val supervisor = CoroutineScope(coroutineRule.dispatcher + SupervisorJob()) + try { + val cid = TestData.LOCAL_TRACK_PUBLISHED.trackPublished.cid + + val firstPublish = supervisor.async { + rtcEngine.addTrack(cid, "audio", LivekitModels.TrackType.AUDIO, stream = null) + } + runCurrent() + + // Push past the 20s AddTrack deadline without the server replying. + testScheduler.advanceTimeBy(21_000) + runCurrent() + + assertTrue("firstPublish should be completed", firstPublish.isCompleted) + val firstFailure = firstPublish.getCompletionExceptionOrNull() + assertTrue( + "Expected TimeoutException, got $firstFailure", + firstFailure is TimeoutException, + ) + + // Retry with the same cid — must not be rejected by the duplicate guard. + val secondPublish = supervisor.async { + rtcEngine.addTrack(cid, "audio", LivekitModels.TrackType.AUDIO, stream = null) + } + // runCurrent (not advanceUntilIdle): otherwise the new 20s deadline fires + // before the simulated server response is delivered. + runCurrent() + + if (secondPublish.isCompleted) { + val syncFailure = secondPublish.getCompletionExceptionOrNull() + assertFalse( + "Retry must not fail synchronously with DuplicateTrackException, got $syncFailure", + syncFailure is TrackException.DuplicateTrackException, + ) + } + + // Server now replies for the retry; the second publish should resolve cleanly. + simulateMessageFromServer(TestData.LOCAL_TRACK_PUBLISHED) + runCurrent() + + assertTrue("secondPublish should be completed", secondPublish.isCompleted) + val secondFailure = secondPublish.getCompletionExceptionOrNull() + assertTrue("Retry should have succeeded, got $secondFailure", secondFailure == null) + assertEquals( + TestData.LOCAL_TRACK_PUBLISHED.trackPublished.track.sid, + secondPublish.getCompleted().sid, + ) + } finally { + supervisor.cancel() + } + } + + /** + * Regression: caller cancellation of an in-flight addTrack must also clean up + * the pendingTrackResolvers entry so the same cid can be retried. + */ + @Test + fun addTrackCallerCancellationDoesNotPoisonRetry() = runTest { + connect() + wsFactory.unregisterSignalRequestHandler(wsFactory.defaultSignalRequestHandler) + + val supervisor = CoroutineScope(coroutineRule.dispatcher + SupervisorJob()) + try { + val cid = TestData.LOCAL_TRACK_PUBLISHED.trackPublished.cid + + val firstPublish = supervisor.async { + rtcEngine.addTrack(cid, "audio", LivekitModels.TrackType.AUDIO, stream = null) + } + runCurrent() + assertFalse("firstPublish should still be in-flight", firstPublish.isCompleted) + + firstPublish.cancel() + runCurrent() + assertTrue("firstPublish should be cancelled", firstPublish.isCancelled) + + val secondPublish = supervisor.async { + rtcEngine.addTrack(cid, "audio", LivekitModels.TrackType.AUDIO, stream = null) + } + runCurrent() + + if (secondPublish.isCompleted) { + val syncFailure = secondPublish.getCompletionExceptionOrNull() + assertFalse( + "Retry must not fail synchronously with DuplicateTrackException, got $syncFailure", + syncFailure is TrackException.DuplicateTrackException, + ) + } + + simulateMessageFromServer(TestData.LOCAL_TRACK_PUBLISHED) + runCurrent() + + assertTrue("secondPublish should be completed", secondPublish.isCompleted) + val secondFailure = secondPublish.getCompletionExceptionOrNull() + assertTrue( + "Retry after cancellation should have succeeded, got $secondFailure", + secondFailure == null, + ) + } finally { + supervisor.cancel() + } + } + /** * After a soft reconnect, the server reports [LivekitRtc.ReconnectResponse.lastMessageSeq]. The engine * drops buffered reliable payloads up to that sequence (inclusive) and re-sends the remainder on the