From 29ffb1ce071fd3fa359ff8f806b2d1d79c21d35d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?JB=20Onofr=C3=A9?= Date: Fri, 24 Apr 2026 15:14:02 +0200 Subject: [PATCH] [AMQ-7452] Fix "Timer already cancelled" on new connections when shared inactivity timer dies The static READ_CHECK_TIMER and WRITE_CHECK_TIMER in AbstractInactivityMonitor are shared across all transport instances. Java's Timer permanently marks itself cancelled when its internal thread exits due to an unhandled Error thrown by a TimerTask. SchedulerTimerTask rethrows Error subclasses, so an OutOfMemoryError in any timer task kills the shared timer thread while the static field remains non-null. All subsequent schedule() calls on new connections then throw IllegalStateException ("Timer already cancelled"), which propagates to TransportConnector.onAcceptError() and is logged as WARN "Could not accept connection: Timer already cancelled." Fix: wrap the schedule() calls in startConnectCheckTask() and startMonitorThreads() in try/catch(IllegalStateException). On catch, the static field is replaced with a fresh Timer instance, the task is rescheduled on it, and the dead timer is then cancelled (to let its thread exit). Tasks that other active connections had queued on the dead timer are not migrated to the new one, so those connections lose timeout enforcement until they reconnect. The guard condition in startConnectCheckTask() is also extended with || READ_CHECK_TIMER == null to cover the case where the field is unexpectedly null when CHECKER_COUNTER > 0. Add three regression tests to InactivityMonitorTest covering: - READ_CHECK_TIMER cancelled with CHECKER_COUNTER == 0 (startConnectCheckTask path) - READ_CHECK_TIMER cancelled with CHECKER_COUNTER > 0 (production scenario) - WRITE_CHECK_TIMER cancelled (startMonitorThreads path) Tests use Wait.waitFor() instead of Thread.sleep() for reliability on slow CI. 
--- .../transport/AbstractInactivityMonitor.java | 36 +++- .../transport/tcp/InactivityMonitorTest.java | 171 ++++++++++++++++++ 2 files changed, 203 insertions(+), 4 deletions(-) diff --git a/activemq-client/src/main/java/org/apache/activemq/transport/AbstractInactivityMonitor.java b/activemq-client/src/main/java/org/apache/activemq/transport/AbstractInactivityMonitor.java index 75cd4079d84..48f3269e16c 100644 --- a/activemq-client/src/main/java/org/apache/activemq/transport/AbstractInactivityMonitor.java +++ b/activemq-client/src/main/java/org/apache/activemq/transport/AbstractInactivityMonitor.java @@ -417,7 +417,7 @@ public synchronized void startConnectCheckTask(long connectionTimeout) { connectCheckerTask = new SchedulerTimerTask(connectChecker); synchronized (AbstractInactivityMonitor.class) { - if (CHECKER_COUNTER == 0) { + if (CHECKER_COUNTER == 0 || READ_CHECK_TIMER == null) { if (ASYNC_TASKS == null || ASYNC_TASKS.isShutdown()) { ASYNC_TASKS = createExecutor(); } @@ -426,7 +426,19 @@ public synchronized void startConnectCheckTask(long connectionTimeout) { } } CHECKER_COUNTER++; - READ_CHECK_TIMER.schedule(connectCheckerTask, connectionTimeout); + try { + READ_CHECK_TIMER.schedule(connectCheckerTask, connectionTimeout); + } catch (IllegalStateException e) { + // The timer thread may have exited due to an unhandled Error in a previous task. + // Cancel the dead timer so its thread can exit, replace it, and reschedule. + // Note: tasks from other active connections that were on the old timer will no + // longer fire; those connections lose timeout enforcement until they reconnect. 
+ LOG.warn("Read check timer was in a cancelled state, creating new timer instance: {}", e.getMessage()); + Timer oldTimer = READ_CHECK_TIMER; + READ_CHECK_TIMER = new Timer(READ_CHECK_THREAD_NAME, true); + READ_CHECK_TIMER.schedule(connectCheckerTask, connectionTimeout); + oldTimer.cancel(); + } } } } @@ -485,10 +497,26 @@ protected synchronized void startMonitorThreads() throws IOException { CHECKER_COUNTER++; if (readCheckTime > 0) { - READ_CHECK_TIMER.schedule(readCheckerTask, initialDelayTime, readCheckTime); + try { + READ_CHECK_TIMER.schedule(readCheckerTask, initialDelayTime, readCheckTime); + } catch (IllegalStateException e) { + LOG.warn("Read check timer was in a cancelled state, creating new timer instance: {}", e.getMessage()); + Timer oldTimer = READ_CHECK_TIMER; + READ_CHECK_TIMER = new Timer(READ_CHECK_THREAD_NAME, true); + READ_CHECK_TIMER.schedule(readCheckerTask, initialDelayTime, readCheckTime); + oldTimer.cancel(); + } } if (writeCheckTime > 0) { - WRITE_CHECK_TIMER.schedule(writeCheckerTask, initialDelayTime, writeCheckTime); + try { + WRITE_CHECK_TIMER.schedule(writeCheckerTask, initialDelayTime, writeCheckTime); + } catch (IllegalStateException e) { + LOG.warn("Write check timer was in a cancelled state, creating new timer instance: {}", e.getMessage()); + Timer oldTimer = WRITE_CHECK_TIMER; + WRITE_CHECK_TIMER = new Timer(WRITE_CHECK_THREAD_NAME, true); + WRITE_CHECK_TIMER.schedule(writeCheckerTask, initialDelayTime, writeCheckTime); + oldTimer.cancel(); + } } } } diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/tcp/InactivityMonitorTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/tcp/InactivityMonitorTest.java index c1ff0264a7e..48363dba2b3 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/tcp/InactivityMonitorTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/tcp/InactivityMonitorTest.java @@ -26,10 +26,12 @@ import static 
org.hamcrest.core.IsNot.not; import java.io.IOException; +import java.lang.reflect.Field; import java.net.SocketException; import java.net.URI; import java.net.URISyntaxException; import java.util.List; +import java.util.Timer; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @@ -42,6 +44,7 @@ import org.apache.activemq.CombinationTestSupport; import org.apache.activemq.command.WireFormatInfo; import org.apache.activemq.openwire.OpenWireFormat; +import org.apache.activemq.transport.AbstractInactivityMonitor; import org.apache.activemq.transport.Transport; import org.apache.activemq.transport.TransportAcceptListener; import org.apache.activemq.transport.TransportFactory; @@ -282,6 +285,174 @@ public void testCheckTimersNotLeakingConcurrentConnections() throws Exception { } } + /** + * Verifies that a connection succeeds when READ_CHECK_TIMER is already in a cancelled + * state with CHECKER_COUNTER == 0. This can happen when the timer's internal thread + * dies due to an unhandled Error in a TimerTask: Java marks the Timer as cancelled but + * the static field is not nulled, so subsequent schedule() calls throw + * "Timer already cancelled". + */ + public void testRecoversFromCancelledTimerWhenCheckerCounterIsZero() throws Exception { + Field readCheckTimerField = AbstractInactivityMonitor.class.getDeclaredField("READ_CHECK_TIMER"); + readCheckTimerField.setAccessible(true); + + // Inject a pre-cancelled (but non-null) timer with CHECKER_COUNTER == 0. + // Without the fix, READ_CHECK_TIMER.schedule() in startConnectCheckTask() + // throws IllegalStateException and the connection is rejected. 
+ Timer cancelledTimer = new Timer(READ_CHECK_THREAD_NAME, true); + cancelledTimer.cancel(); + readCheckTimerField.set(null, cancelledTimer); + + startClient(); + + // The recovery in startConnectCheckTask() is synchronous: the new timer is + // installed before startClient() returns, so no wait is required here. + Timer recovered = (Timer) readCheckTimerField.get(null); + assertNotNull("A new timer should have been installed by the recovery path", recovered); + assertNotSame("Recovered timer must be a new instance, not the cancelled one", cancelledTimer, recovered); + + // Confirm no errors materialised asynchronously after the connection was set up. + assertFalse("Client should connect without errors", + Wait.waitFor(() -> clientErrorCount.get() > 0, 2000, 50)); + assertFalse("Server should accept without errors", + Wait.waitFor(() -> serverErrorCount.get() > 0, 2000, 50)); + } + + /** + * Verifies that a new connection succeeds when READ_CHECK_TIMER is in a cancelled + * state while CHECKER_COUNTER > 0 (active connections exist). This is the exact + * production scenario logged as WARN "Could not accept connection: Timer already + * cancelled" from TransportConnector when the timer thread dies unexpectedly. + */ + public void testRecoversFromCancelledTimerWithActiveConnections() throws Exception { + // Establish a first connection so CHECKER_COUNTER > 0 with a valid timer. + // startConnectCheckTask() is synchronous: READ_CHECK_TIMER and CHECKER_COUNTER > 0 + // are both set before startClient() returns, so no sleep is needed. + startClient(); + + // Save the first server-side transport before a second connection overwrites the field. + Transport firstServerTransport = serverTransport; + + Field readCheckTimerField = AbstractInactivityMonitor.class.getDeclaredField("READ_CHECK_TIMER"); + readCheckTimerField.setAccessible(true); + + // Replace the valid static timer with a cancelled one while CHECKER_COUNTER > 0. + // This simulates the timer thread dying (e.g. 
due to OOM) with active connections + // still alive and their tasks silently orphaned on the dead timer. + Timer cancelledTimer = new Timer(READ_CHECK_THREAD_NAME, true); + cancelledTimer.cancel(); + readCheckTimerField.set(null, cancelledTimer); + + // A second connection attempt with CHECKER_COUNTER > 0 and a cancelled timer + // previously hit the unguarded READ_CHECK_TIMER.schedule() call directly + // (the null-check was skipped) and threw IllegalStateException. + AtomicBoolean secondClientError = new AtomicBoolean(false); + Transport secondClient = TransportFactory.connect( + new URI("tcp://localhost:" + serverPort + "?trace=true&wireFormat.maxInactivityDuration=1000")); + secondClient.setTransportListener(new TransportListener() { + @Override + public void onCommand(Object command) {} + @Override + public void onException(IOException error) { + secondClientError.set(true); + } + @Override + public void transportInterupted() {} + @Override + public void transportResumed() {} + }); + + try { + secondClient.start(); + } catch (Exception e) { + fail("Second connection should not fail after cancelled timer recovery: " + e.getMessage()); + } + + // The recovery in startConnectCheckTask() is synchronous: the new timer is + // installed before secondClient.start() returns. + Timer recovered = (Timer) readCheckTimerField.get(null); + assertNotNull("A new timer should have been installed", recovered); + assertNotSame("Recovered timer must differ from the cancelled one", cancelledTimer, recovered); + + // Confirm no errors materialised asynchronously. + assertFalse("Second client should connect without error", + Wait.waitFor(() -> secondClientError.get(), 2000, 50)); + + ignoreClientError.set(true); + ignoreServerError.set(true); + secondClient.stop(); + firstServerTransport.stop(); + } + + /** + * Verifies that a new connection succeeds when WRITE_CHECK_TIMER is in a cancelled + * state. 
The write timer is set in startMonitorThreads(), which is called during the + * WireFormatInfo exchange. This test covers the recovery path in that method. + */ + public void testRecoversFromCancelledWriteCheckTimer() throws Exception { + // Establish a first connection so startMonitorThreads() runs and initialises + // WRITE_CHECK_TIMER. That method is triggered asynchronously by the WireFormatInfo + // exchange, so we wait until the field is populated. + startClient(); + + Field writeCheckTimerField = AbstractInactivityMonitor.class.getDeclaredField("WRITE_CHECK_TIMER"); + writeCheckTimerField.setAccessible(true); + + assertTrue("WRITE_CHECK_TIMER should be set after the WireFormatInfo exchange", + Wait.waitFor(() -> writeCheckTimerField.get(null) != null, 3000, 50)); + + // Save the first server-side transport before a second connection overwrites the field. + Transport firstServerTransport = serverTransport; + + // Replace the valid WRITE_CHECK_TIMER with a cancelled one while monitor threads + // are active, simulating the timer thread dying under load. + Timer cancelledTimer = new Timer(WRITE_CHECK_THREAD_NAME, true); + cancelledTimer.cancel(); + writeCheckTimerField.set(null, cancelledTimer); + + // The second client's WireFormatInfo exchange triggers startMonitorThreads(), which + // calls WRITE_CHECK_TIMER.schedule(). The cancelled timer causes an ISE that the + // fix catches, replacing the timer and rescheduling the write checker. 
+ AtomicBoolean secondClientError = new AtomicBoolean(false); + Transport secondClient = TransportFactory.connect( + new URI("tcp://localhost:" + serverPort + "?trace=true&wireFormat.maxInactivityDuration=1000")); + secondClient.setTransportListener(new TransportListener() { + @Override + public void onCommand(Object command) {} + @Override + public void onException(IOException error) { + secondClientError.set(true); + } + @Override + public void transportInterupted() {} + @Override + public void transportResumed() {} + }); + + try { + secondClient.start(); + } catch (Exception e) { + fail("Second connection should not fail after cancelled write-check timer recovery: " + e.getMessage()); + } + + // The write-timer recovery happens inside the WireFormatInfo exchange, which is + // asynchronous after start() returns. Wait until the cancelled timer is replaced. + assertTrue("WRITE_CHECK_TIMER should be replaced by the recovery path", + Wait.waitFor(() -> writeCheckTimerField.get(null) != cancelledTimer, 3000, 50)); + + Timer recovered = (Timer) writeCheckTimerField.get(null); + assertNotNull("A new write check timer should have been installed", recovered); + assertNotSame("Recovered timer must differ from the cancelled one", cancelledTimer, recovered); + + assertFalse("Second client should connect without error", + Wait.waitFor(() -> secondClientError.get(), 2000, 50)); + + ignoreClientError.set(true); + ignoreServerError.set(true); + secondClient.stop(); + firstServerTransport.stop(); + } + private static boolean hasTimerThread(String name) { return name.equals(READ_CHECK_THREAD_NAME) || name.equals(WRITE_CHECK_THREAD_NAME); }