From 8d335b3d7760c2a3924c07c49e05a9f8fd9e69b0 Mon Sep 17 00:00:00 2001 From: Adrian Niculescu <15037449+adrian-niculescu@users.noreply.github.com> Date: Fri, 1 May 2026 00:35:19 +0300 Subject: [PATCH] Fixed RTCEngine.addTrack leaking pendingTrackResolvers on cancellation When the AddTrack request timed out via withDeadline (20s) or its caller coroutine was cancelled, the cid stayed in pendingTrackResolvers because suspendCancellableCoroutine had no invokeOnCancellation cleanup. A retry of the same track instance (same cid) hit the duplicate guard and threw DuplicateTrackException, blocking republish until the connection closed, the signal reconnected, or the server eventually replied for that cid. Added cont.invokeOnCancellation that removes the entry under the existing synchronized monitor, with an identity check so a stale handler cannot evict a freshly-registered resolver bound to the same cid. Added two regression tests in RTCEngineMockE2ETest covering the timeout and caller-cancellation paths. --- .changeset/sturdy-foxes-publish.md | 5 + .../java/io/livekit/android/room/RTCEngine.kt | 7 + .../android/room/RTCEngineMockE2ETest.kt | 125 +++++++++++++++++- 3 files changed, 136 insertions(+), 1 deletion(-) create mode 100644 .changeset/sturdy-foxes-publish.md diff --git a/.changeset/sturdy-foxes-publish.md b/.changeset/sturdy-foxes-publish.md new file mode 100644 index 000000000..3aae73bba --- /dev/null +++ b/.changeset/sturdy-foxes-publish.md @@ -0,0 +1,5 @@ +--- +"client-sdk-android": patch +--- + +Fixed RTCEngine.addTrack leaking pendingTrackResolvers entries on timeout or caller cancellation, which previously caused subsequent publishes of the same track to fail with DuplicateTrackException until the connection was torn down. diff --git a/livekit-android-sdk/src/main/java/io/livekit/android/room/RTCEngine.kt b/livekit-android-sdk/src/main/java/io/livekit/android/room/RTCEngine.kt index ed1f5fbe4..1249d156d 100644 --- a/livekit-android-sdk/src/main/java/io/livekit/android/room/RTCEngine.kt +++ b/livekit-android-sdk/src/main/java/io/livekit/android/room/RTCEngine.kt @@ -403,6 +403,13 @@ internal constructor( synchronized(pendingTrackResolvers) { pendingTrackResolvers[cid] = cont } + cont.invokeOnCancellation { + synchronized(pendingTrackResolvers) { + if (pendingTrackResolvers[cid] === cont) { + pendingTrackResolvers.remove(cid) + } + } + } client.sendAddTrack( cid = cid, name = name, diff --git a/livekit-android-test/src/test/java/io/livekit/android/room/RTCEngineMockE2ETest.kt b/livekit-android-test/src/test/java/io/livekit/android/room/RTCEngineMockE2ETest.kt index c881a2df9..70ac17038 100644 --- a/livekit-android-test/src/test/java/io/livekit/android/room/RTCEngineMockE2ETest.kt +++ b/livekit-android-test/src/test/java/io/livekit/android/room/RTCEngineMockE2ETest.kt @@ -1,5 +1,5 @@ /* - * Copyright 2023-2025 LiveKit, Inc. + * Copyright 2023-2026 LiveKit, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,21 +16,29 @@ package io.livekit.android.room +import io.livekit.android.room.track.TrackException import io.livekit.android.test.MockE2ETest import io.livekit.android.test.events.FlowCollector import io.livekit.android.test.mock.MockPeerConnection import io.livekit.android.test.mock.SignalRequestHandler import io.livekit.android.test.mock.TestData import io.livekit.android.test.util.toPBByteString +import io.livekit.android.util.TimeoutException import io.livekit.android.util.flow import io.livekit.android.util.toOkioByteString +import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.SupervisorJob +import kotlinx.coroutines.async +import kotlinx.coroutines.cancel import kotlinx.coroutines.test.advanceUntilIdle +import kotlinx.coroutines.test.runCurrent import livekit.LivekitModels import livekit.LivekitRtc import livekit.org.webrtc.PeerConnection import org.junit.Assert import org.junit.Assert.assertEquals +import org.junit.Assert.assertFalse import org.junit.Assert.assertTrue import org.junit.Before import org.junit.Test @@ -366,4 +374,119 @@ class RTCEngineMockE2ETest : MockE2ETest() { val sid = wsFactory.request.url.queryParameter(SignalClient.CONNECT_QUERY_PARTICIPANT_SID) assertEquals(TestData.JOIN.join.participant.sid, sid) } + + /** + * Regression: an AddTrack timeout used to leave the cid in pendingTrackResolvers, + * poisoning every subsequent publish of the same track with DuplicateTrackException + * until the connection was torn down (or the server eventually responded. + */ + @Test + fun addTrackTimeoutDoesNotPoisonRetry() = runTest { + connect() + // The default mock handler auto-replies to ADD_TRACK; remove it so we can + // simulate the "server never responds" case that triggers the timeout. + wsFactory.unregisterSignalRequestHandler(wsFactory.defaultSignalRequestHandler) + + // Use a SupervisorJob so timeout/cancellation in addTrack does not cancel the test scope. + val supervisor = CoroutineScope(coroutineRule.dispatcher + SupervisorJob()) + try { + val cid = TestData.LOCAL_TRACK_PUBLISHED.trackPublished.cid + + val firstPublish = supervisor.async { + rtcEngine.addTrack(cid, "audio", LivekitModels.TrackType.AUDIO, stream = null) + } + runCurrent() + + // Push past the 20s AddTrack deadline without the server replying. + testScheduler.advanceTimeBy(21_000) + runCurrent() + + assertTrue("firstPublish should be completed", firstPublish.isCompleted) + val firstFailure = firstPublish.getCompletionExceptionOrNull() + assertTrue( + "Expected TimeoutException, got $firstFailure", + firstFailure is TimeoutException, + ) + + // Retry with the same cid — must not be rejected by the duplicate guard. + val secondPublish = supervisor.async { + rtcEngine.addTrack(cid, "audio", LivekitModels.TrackType.AUDIO, stream = null) + } + // runCurrent (not advanceUntilIdle): otherwise the new 20s deadline fires + // before the simulated server response is delivered. + runCurrent() + + if (secondPublish.isCompleted) { + val syncFailure = secondPublish.getCompletionExceptionOrNull() + assertFalse( + "Retry must not fail synchronously with DuplicateTrackException, got $syncFailure", + syncFailure is TrackException.DuplicateTrackException, + ) + } + + // Server now replies for the retry; the second publish should resolve cleanly. + simulateMessageFromServer(TestData.LOCAL_TRACK_PUBLISHED) + runCurrent() + + assertTrue("secondPublish should be completed", secondPublish.isCompleted) + val secondFailure = secondPublish.getCompletionExceptionOrNull() + assertTrue("Retry should have succeeded, got $secondFailure", secondFailure == null) + assertEquals( + TestData.LOCAL_TRACK_PUBLISHED.trackPublished.track.sid, + secondPublish.getCompleted().sid, + ) + } finally { + supervisor.cancel() + } + } + + /** + * Regression: caller cancellation of an in-flight addTrack must also clean up + * the pendingTrackResolvers entry so the same cid can be retried. + */ + @Test + fun addTrackCallerCancellationDoesNotPoisonRetry() = runTest { + connect() + wsFactory.unregisterSignalRequestHandler(wsFactory.defaultSignalRequestHandler) + + val supervisor = CoroutineScope(coroutineRule.dispatcher + SupervisorJob()) + try { + val cid = TestData.LOCAL_TRACK_PUBLISHED.trackPublished.cid + + val firstPublish = supervisor.async { + rtcEngine.addTrack(cid, "audio", LivekitModels.TrackType.AUDIO, stream = null) + } + runCurrent() + assertFalse("firstPublish should still be in-flight", firstPublish.isCompleted) + + firstPublish.cancel() + runCurrent() + assertTrue("firstPublish should be cancelled", firstPublish.isCancelled) + + val secondPublish = supervisor.async { + rtcEngine.addTrack(cid, "audio", LivekitModels.TrackType.AUDIO, stream = null) + } + runCurrent() + + if (secondPublish.isCompleted) { + val syncFailure = secondPublish.getCompletionExceptionOrNull() + assertFalse( + "Retry must not fail synchronously with DuplicateTrackException, got $syncFailure", + syncFailure is TrackException.DuplicateTrackException, + ) + } + + simulateMessageFromServer(TestData.LOCAL_TRACK_PUBLISHED) + runCurrent() + + assertTrue("secondPublish should be completed", secondPublish.isCompleted) + val secondFailure = secondPublish.getCompletionExceptionOrNull() + assertTrue( + "Retry after cancellation should have succeeded, got $secondFailure", + secondFailure == null, + ) + } finally { + supervisor.cancel() + } + } }