Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -417,7 +417,7 @@ public synchronized void startConnectCheckTask(long connectionTimeout) {
connectCheckerTask = new SchedulerTimerTask(connectChecker);

synchronized (AbstractInactivityMonitor.class) {
if (CHECKER_COUNTER == 0) {
if (CHECKER_COUNTER == 0 || READ_CHECK_TIMER == null) {
if (ASYNC_TASKS == null || ASYNC_TASKS.isShutdown()) {
ASYNC_TASKS = createExecutor();
}
Expand All @@ -426,7 +426,19 @@ public synchronized void startConnectCheckTask(long connectionTimeout) {
}
}
CHECKER_COUNTER++;
READ_CHECK_TIMER.schedule(connectCheckerTask, connectionTimeout);
try {
READ_CHECK_TIMER.schedule(connectCheckerTask, connectionTimeout);
} catch (IllegalStateException e) {
// The timer thread may have exited due to an unhandled Error in a previous task.
// Cancel the dead timer so its thread can exit, replace it, and reschedule.
// Note: tasks from other active connections that were on the old timer will no
// longer fire; those connections lose timeout enforcement until they reconnect.
LOG.warn("Read check timer was in a cancelled state, creating new timer instance: {}", e.getMessage());
Timer oldTimer = READ_CHECK_TIMER;
READ_CHECK_TIMER = new Timer(READ_CHECK_THREAD_NAME, true);
READ_CHECK_TIMER.schedule(connectCheckerTask, connectionTimeout);
oldTimer.cancel();
}
}
}
}
Expand Down Expand Up @@ -485,10 +497,26 @@ protected synchronized void startMonitorThreads() throws IOException {

CHECKER_COUNTER++;
if (readCheckTime > 0) {
READ_CHECK_TIMER.schedule(readCheckerTask, initialDelayTime, readCheckTime);
try {
READ_CHECK_TIMER.schedule(readCheckerTask, initialDelayTime, readCheckTime);
} catch (IllegalStateException e) {
LOG.warn("Read check timer was in a cancelled state, creating new timer instance: {}", e.getMessage());
Timer oldTimer = READ_CHECK_TIMER;
READ_CHECK_TIMER = new Timer(READ_CHECK_THREAD_NAME, true);
READ_CHECK_TIMER.schedule(readCheckerTask, initialDelayTime, readCheckTime);
oldTimer.cancel();
}
}
if (writeCheckTime > 0) {
WRITE_CHECK_TIMER.schedule(writeCheckerTask, initialDelayTime, writeCheckTime);
try {
WRITE_CHECK_TIMER.schedule(writeCheckerTask, initialDelayTime, writeCheckTime);
} catch (IllegalStateException e) {
LOG.warn("Write check timer was in a cancelled state, creating new timer instance: {}", e.getMessage());
Timer oldTimer = WRITE_CHECK_TIMER;
WRITE_CHECK_TIMER = new Timer(WRITE_CHECK_THREAD_NAME, true);
WRITE_CHECK_TIMER.schedule(writeCheckerTask, initialDelayTime, writeCheckTime);
oldTimer.cancel();
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,12 @@
import static org.hamcrest.core.IsNot.not;

import java.io.IOException;
import java.lang.reflect.Field;
import java.net.SocketException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.List;
import java.util.Timer;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
Expand All @@ -42,6 +44,7 @@
import org.apache.activemq.CombinationTestSupport;
import org.apache.activemq.command.WireFormatInfo;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.transport.AbstractInactivityMonitor;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportAcceptListener;
import org.apache.activemq.transport.TransportFactory;
Expand Down Expand Up @@ -282,6 +285,174 @@ public void testCheckTimersNotLeakingConcurrentConnections() throws Exception {
}
}

/**
* Verifies that a connection succeeds when READ_CHECK_TIMER is already in a cancelled
* state with CHECKER_COUNTER == 0. This can happen when the timer's internal thread
* dies due to an unhandled Error in a TimerTask: Java marks the Timer as cancelled but
* the static field is not nulled, so subsequent schedule() calls throw
* "Timer already cancelled".
*/
public void testRecoversFromCancelledTimerWhenCheckerCounterIsZero() throws Exception {
Field readCheckTimerField = AbstractInactivityMonitor.class.getDeclaredField("READ_CHECK_TIMER");
readCheckTimerField.setAccessible(true);

// Inject a pre-cancelled (but non-null) timer with CHECKER_COUNTER == 0.
// Without the fix, READ_CHECK_TIMER.schedule() in startConnectCheckTask()
// throws IllegalStateException and the connection is rejected.
Timer cancelledTimer = new Timer(READ_CHECK_THREAD_NAME, true);
cancelledTimer.cancel();
readCheckTimerField.set(null, cancelledTimer);

startClient();

// The recovery in startConnectCheckTask() is synchronous: the new timer is
// installed before startClient() returns, so no wait is required here.
Timer recovered = (Timer) readCheckTimerField.get(null);
assertNotNull("A new timer should have been installed by the recovery path", recovered);
assertNotSame("Recovered timer must be a new instance, not the cancelled one", cancelledTimer, recovered);

// Confirm no errors materialised asynchronously after the connection was set up.
assertFalse("Client should connect without errors",
Wait.waitFor(() -> clientErrorCount.get() > 0, 2000, 50));
assertFalse("Server should accept without errors",
Wait.waitFor(() -> serverErrorCount.get() > 0, 2000, 50));
}

/**
* Verifies that a new connection succeeds when READ_CHECK_TIMER is in a cancelled
* state while CHECKER_COUNTER > 0 (active connections exist). This is the exact
* production scenario logged as WARN "Could not accept connection: Timer already
* cancelled" from TransportConnector when the timer thread dies unexpectedly.
*/
public void testRecoversFromCancelledTimerWithActiveConnections() throws Exception {
// Establish a first connection so CHECKER_COUNTER > 0 with a valid timer.
// startConnectCheckTask() is synchronous: READ_CHECK_TIMER and CHECKER_COUNTER > 0
// are both set before startClient() returns, so no sleep is needed.
startClient();

// Save the first server-side transport before a second connection overwrites the field.
Transport firstServerTransport = serverTransport;

Field readCheckTimerField = AbstractInactivityMonitor.class.getDeclaredField("READ_CHECK_TIMER");
readCheckTimerField.setAccessible(true);

// Replace the valid static timer with a cancelled one while CHECKER_COUNTER > 0.
// This simulates the timer thread dying (e.g. due to OOM) with active connections
// still alive and their tasks silently orphaned on the dead timer.
Timer cancelledTimer = new Timer(READ_CHECK_THREAD_NAME, true);
cancelledTimer.cancel();
readCheckTimerField.set(null, cancelledTimer);

// A second connection attempt with CHECKER_COUNTER > 0 and a cancelled timer
// previously hit the unguarded READ_CHECK_TIMER.schedule() call directly
// (the null-check was skipped) and threw IllegalStateException.
AtomicBoolean secondClientError = new AtomicBoolean(false);
Transport secondClient = TransportFactory.connect(
new URI("tcp://localhost:" + serverPort + "?trace=true&wireFormat.maxInactivityDuration=1000"));
secondClient.setTransportListener(new TransportListener() {
@Override
public void onCommand(Object command) {}
@Override
public void onException(IOException error) {
secondClientError.set(true);
}
@Override
public void transportInterupted() {}
@Override
public void transportResumed() {}
});

try {
secondClient.start();
} catch (Exception e) {
fail("Second connection should not fail after cancelled timer recovery: " + e.getMessage());
}

// The recovery in startConnectCheckTask() is synchronous: the new timer is
// installed before secondClient.start() returns.
Timer recovered = (Timer) readCheckTimerField.get(null);
assertNotNull("A new timer should have been installed", recovered);
assertNotSame("Recovered timer must differ from the cancelled one", cancelledTimer, recovered);

// Confirm no errors materialised asynchronously.
assertFalse("Second client should connect without error",
Wait.waitFor(() -> secondClientError.get(), 2000, 50));

ignoreClientError.set(true);
ignoreServerError.set(true);
secondClient.stop();
firstServerTransport.stop();
}

/**
* Verifies that a new connection succeeds when WRITE_CHECK_TIMER is in a cancelled
* state. The write timer is set in startMonitorThreads(), which is called during the
* WireFormatInfo exchange. This test covers the recovery path in that method.
*/
public void testRecoversFromCancelledWriteCheckTimer() throws Exception {
// Establish a first connection so startMonitorThreads() runs and initialises
// WRITE_CHECK_TIMER. That method is triggered asynchronously by the WireFormatInfo
// exchange, so we wait until the field is populated.
startClient();

Field writeCheckTimerField = AbstractInactivityMonitor.class.getDeclaredField("WRITE_CHECK_TIMER");
writeCheckTimerField.setAccessible(true);

assertTrue("WRITE_CHECK_TIMER should be set after the WireFormatInfo exchange",
Wait.waitFor(() -> writeCheckTimerField.get(null) != null, 3000, 50));

// Save the first server-side transport before a second connection overwrites the field.
Transport firstServerTransport = serverTransport;

// Replace the valid WRITE_CHECK_TIMER with a cancelled one while monitor threads
// are active, simulating the timer thread dying under load.
Timer cancelledTimer = new Timer(WRITE_CHECK_THREAD_NAME, true);
cancelledTimer.cancel();
writeCheckTimerField.set(null, cancelledTimer);

// The second client's WireFormatInfo exchange triggers startMonitorThreads(), which
// calls WRITE_CHECK_TIMER.schedule(). The cancelled timer causes an ISE that the
// fix catches, replacing the timer and rescheduling the write checker.
AtomicBoolean secondClientError = new AtomicBoolean(false);
Transport secondClient = TransportFactory.connect(
new URI("tcp://localhost:" + serverPort + "?trace=true&wireFormat.maxInactivityDuration=1000"));
secondClient.setTransportListener(new TransportListener() {
@Override
public void onCommand(Object command) {}
@Override
public void onException(IOException error) {
secondClientError.set(true);
}
@Override
public void transportInterupted() {}
@Override
public void transportResumed() {}
});

try {
secondClient.start();
} catch (Exception e) {
fail("Second connection should not fail after cancelled write-check timer recovery: " + e.getMessage());
}

// The write-timer recovery happens inside the WireFormatInfo exchange, which is
// asynchronous after start() returns. Wait until the cancelled timer is replaced.
assertTrue("WRITE_CHECK_TIMER should be replaced by the recovery path",
Wait.waitFor(() -> writeCheckTimerField.get(null) != cancelledTimer, 3000, 50));

Timer recovered = (Timer) writeCheckTimerField.get(null);
assertNotNull("A new write check timer should have been installed", recovered);
assertNotSame("Recovered timer must differ from the cancelled one", cancelledTimer, recovered);

assertFalse("Second client should connect without error",
Wait.waitFor(() -> secondClientError.get(), 2000, 50));

ignoreClientError.set(true);
ignoreServerError.set(true);
secondClient.stop();
firstServerTransport.stop();
}

private static boolean hasTimerThread(String name) {
return name.equals(READ_CHECK_THREAD_NAME) || name.equals(WRITE_CHECK_THREAD_NAME);
}
Expand Down