From cf8577f42876fadd5df9dbbc3afa38a3c076696a Mon Sep 17 00:00:00 2001 From: chesnokoff Date: Tue, 31 Mar 2026 08:59:52 +0300 Subject: [PATCH 1/3] IGNITE-28057 Refactor communication test messages --- .../src/test/config/io-manager-benchmark.xml | 6 ++ .../GridCommunicationSendMessageSelfTest.java | 63 ++---------- ...ommunicationSendOverByteIdTestMessage.java | 28 ++++++ .../GridCommunicationSendTestMessage.java | 28 ++++++ .../communication/GridIoManagerSelfTest.java | 25 +---- .../GridIoManagerSelfTestMessage.java | 36 +++++++ .../IgniteMessageFactoryImplTest.java | 99 ++++--------------- .../IgniteMessageFactoryImplTestMessage1.java | 36 +++++++ .../IgniteMessageFactoryImplTestMessage2.java | 36 +++++++ ...IgniteMessageFactoryImplTestMessage42.java | 36 +++++++ .../MessageDirectTypeIdConflictTest.java | 26 +---- ...essageDirectTypeIdConflictTestMessage.java | 37 +++++++ .../GridIoManagerBenchmark0.java | 2 + .../communication/GridTestMessage.java | 30 +++--- .../LoadTestsCommunicationPluginProvider.java | 39 ++++++++ ...ommunicationConnectionPoolMetricsTest.java | 71 ++++++------- ...ationConnectionPoolMetricsTestMessage.java | 40 ++++++++ 17 files changed, 397 insertions(+), 241 deletions(-) create mode 100644 modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridCommunicationSendOverByteIdTestMessage.java create mode 100644 modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridCommunicationSendTestMessage.java create mode 100644 modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridIoManagerSelfTestMessage.java create mode 100644 modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteMessageFactoryImplTestMessage1.java create mode 100644 modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteMessageFactoryImplTestMessage2.java create mode 100644 modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteMessageFactoryImplTestMessage42.java create mode 100644 modules/core/src/test/java/org/apache/ignite/internal/managers/communication/MessageDirectTypeIdConflictTestMessage.java create mode 100644 modules/core/src/test/java/org/apache/ignite/loadtests/communication/LoadTestsCommunicationPluginProvider.java create mode 100644 modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/CommunicationConnectionPoolMetricsTestMessage.java diff --git a/modules/core/src/test/config/io-manager-benchmark.xml b/modules/core/src/test/config/io-manager-benchmark.xml index f04aaa987ea80..5f3bd89e16cc9 100644 --- a/modules/core/src/test/config/io-manager-benchmark.xml +++ b/modules/core/src/test/config/io-manager-benchmark.xml @@ -34,6 +34,12 @@ + + + + + + diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridCommunicationSendMessageSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridCommunicationSendMessageSelfTest.java index c813901fd5ff9..bdc0cc85ae7f8 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridCommunicationSendMessageSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridCommunicationSendMessageSelfTest.java @@ -17,7 +17,6 @@ package org.apache.ignite.internal.managers.communication; -import java.nio.ByteBuffer; import java.util.UUID; import java.util.concurrent.CountDownLatch; import org.apache.ignite.configuration.IgniteConfiguration; @@ -27,8 +26,6 @@ import org.apache.ignite.plugin.extensions.communication.Message; import org.apache.ignite.plugin.extensions.communication.MessageFactory; import org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider; -import org.apache.ignite.plugin.extensions.communication.MessageReader; -import org.apache.ignite.plugin.extensions.communication.MessageWriter; import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import org.junit.Test; @@ -71,7 +68,7 @@ public void testSendMessage() throws Exception { try { startGridsMultiThreaded(2); - doSend(new TestMessage(), TestMessage.class); + doSend(new GridCommunicationSendTestMessage(), GridCommunicationSendTestMessage.class); } finally { stopAllGrids(); @@ -86,7 +83,7 @@ public void testSendMessageOverByteId() throws Exception { try { startGridsMultiThreaded(2); - doSend(new TestOverByteIdMessage(), TestOverByteIdMessage.class); + doSend(new GridCommunicationSendOverByteIdTestMessage(), GridCommunicationSendOverByteIdTestMessage.class); } finally { stopAllGrids(); @@ -101,7 +98,7 @@ public void testSendMessageWithBuffer() throws Exception { try { startGridsMultiThreaded(2); - doSend(new TestMessage(), TestMessage.class); + doSend(new GridCommunicationSendTestMessage(), GridCommunicationSendTestMessage.class); } finally { stopAllGrids(); @@ -147,53 +144,7 @@ private void doSend(Message msg, final Class msgCls) throws Exception { info(">>>"); } - /** */ - private static class TestMessage implements Message { - /** {@inheritDoc} */ - @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { - writer.setBuffer(buf); - - if (!writer.writeHeader(directType())) - return false; - - return true; - } - - /** {@inheritDoc} */ - @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { - return true; - } - - /** {@inheritDoc} */ - @Override public short directType() { - return DIRECT_TYPE; - } - } - - /** */ - private static class TestOverByteIdMessage implements Message { - /** {@inheritDoc} */ - @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { - writer.setBuffer(buf); - - if (!writer.writeHeader(directType())) - return false; - - return true; - } - - /** {@inheritDoc} */ - @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { - return true; - } - - /** {@inheritDoc} */ - @Override public short directType() { - return DIRECT_TYPE_OVER_BYTE; - } - } - - /** */ + /** Registers test messages with explicit direct types to cover both byte-range and over-byte ids. */ public static class TestPluginProvider extends AbstractTestPluginProvider { /** {@inheritDoc} */ @Override public String name() { @@ -204,8 +155,10 @@ public static class TestPluginProvider extends AbstractTestPluginProvider { @Override public void initExtensions(PluginContext ctx, ExtensionRegistry registry) { registry.registerExtension(MessageFactoryProvider.class, new MessageFactoryProvider() { @Override public void registerAll(MessageFactory factory) { - factory.register(DIRECT_TYPE, TestMessage::new); - factory.register(DIRECT_TYPE_OVER_BYTE, TestOverByteIdMessage::new); + factory.register(DIRECT_TYPE, GridCommunicationSendTestMessage::new, + new GridCommunicationSendTestMessageSerializer()); + factory.register(DIRECT_TYPE_OVER_BYTE, GridCommunicationSendOverByteIdTestMessage::new, + new GridCommunicationSendOverByteIdTestMessageSerializer()); } }); } diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridCommunicationSendOverByteIdTestMessage.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridCommunicationSendOverByteIdTestMessage.java new file mode 100644 index 0000000000000..07df2655c43f4 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridCommunicationSendOverByteIdTestMessage.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.managers.communication; + +import org.apache.ignite.internal.Order; +import org.apache.ignite.plugin.extensions.communication.Message; + +/** Test message. */ +public class GridCommunicationSendOverByteIdTestMessage implements Message { + /** */ + @Order(0) + byte marker; +} diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridCommunicationSendTestMessage.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridCommunicationSendTestMessage.java new file mode 100644 index 0000000000000..09a360ea6d455 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridCommunicationSendTestMessage.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.managers.communication; + +import org.apache.ignite.internal.Order; +import org.apache.ignite.plugin.extensions.communication.Message; + +/** Test message. */ +public class GridCommunicationSendTestMessage implements Message { + /** */ + @Order(0) + byte marker; +} diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridIoManagerSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridIoManagerSelfTest.java index 593d2b305a800..742223eb68ec9 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridIoManagerSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridIoManagerSelfTest.java @@ -17,7 +17,6 @@ package org.apache.ignite.internal.managers.communication; -import java.nio.ByteBuffer; import java.util.Collection; import java.util.UUID; import java.util.concurrent.Callable; @@ -29,8 +28,6 @@ import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.plugin.extensions.communication.Message; -import org.apache.ignite.plugin.extensions.communication.MessageReader; -import org.apache.ignite.plugin.extensions.communication.MessageWriter; import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; import org.apache.ignite.testframework.GridTestNode; import org.apache.ignite.testframework.GridTestUtils; @@ -93,7 +90,8 @@ public GridIoManagerSelfTest() throws IgniteCheckedException { public void testSendIfOneOfNodesIsLocalAndTopicIsEnum() throws Exception { GridTestUtils.assertThrows(log, new Callable() { @Override public Object call() throws Exception { - new GridIoManager(ctx).sendToGridTopic(F.asList(locNode, rmtNode), GridTopic.TOPIC_CACHE, new TestMessage(), + new GridIoManager(ctx).sendToGridTopic(F.asList(locNode, rmtNode), GridTopic.TOPIC_CACHE, + new GridIoManagerSelfTestMessage(), GridIoPolicy.P2P_POOL); return null; @@ -224,23 +222,4 @@ private static class IsEqualCollection implements ArgumentMatcher f.register(10_000, GridTestMessage::new, new GridTestMessageSerializer()) + ); + } +} diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/CommunicationConnectionPoolMetricsTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/CommunicationConnectionPoolMetricsTest.java index 5d18e781e1e73..9cedf1035bfa3 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/CommunicationConnectionPoolMetricsTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/CommunicationConnectionPoolMetricsTest.java @@ -17,11 +17,9 @@ package org.apache.ignite.spi.communication.tcp; -import java.nio.ByteBuffer; import java.util.Collection; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Supplier; import org.apache.ignite.Ignite; @@ -47,6 +45,7 @@ import org.apache.ignite.plugin.extensions.communication.MessageFactory; import org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider; import org.apache.ignite.plugin.extensions.communication.MessageReader; +import org.apache.ignite.plugin.extensions.communication.MessageSerializer; import org.apache.ignite.plugin.extensions.communication.MessageWriter; import org.apache.ignite.spi.communication.tcp.internal.ConnectionClientPool; import org.apache.ignite.spi.metric.IntMetric; @@ -76,6 +75,9 @@ public class CommunicationConnectionPoolMetricsTest extends GridCommonAbstractTe /** */ private static final int MIN_LOAD_THREADS = 2; + /** Artificial write delay reproducing the original pending queue scenario. */ + static final int WRITE_DELAY_MS = 50; + /** */ private volatile long maxConnIdleTimeout = TcpCommunicationSpi.DFLT_IDLE_CONN_TIMEOUT; @@ -154,7 +156,7 @@ public void testRemovedConnectionMetrics() throws Exception { Ignite ldr = clientLdr ? cli : srvr; AtomicBoolean runFlag = new AtomicBoolean(true); - TestMessage msg = new TestMessage(); + CommunicationConnectionPoolMetricsTestMessage msg = new CommunicationConnectionPoolMetricsTestMessage(); IgniteInternalFuture loadFut = runLoad(ldr, runFlag, () -> msg, null); @@ -206,7 +208,7 @@ public void testIdleRemovedConnectionMetricsUnderLazyLoad() throws Exception { Ignite ldr = clientLdr ? cli : srvr; AtomicBoolean runFlag = new AtomicBoolean(true); - Message msg = new TestMessage(); + Message msg = new CommunicationConnectionPoolMetricsTestMessage(); IgniteInternalFuture loadFut = runLoad(ldr, runFlag, () -> msg, null, maxConnIdleTimeout, maxConnIdleTimeout * 4); @@ -257,7 +259,7 @@ public void testMetricsBasics() throws Exception { AtomicBoolean runFlag = new AtomicBoolean(true); AtomicLong loadCnt = new AtomicLong(preloadCnt); - TestMessage msg = new TestMessage(); + CommunicationConnectionPoolMetricsTestMessage msg = new CommunicationConnectionPoolMetricsTestMessage(); long loadMillis0 = System.currentTimeMillis(); @@ -368,7 +370,7 @@ public void testAcquiringThreadsCntMetric() throws Exception { } }); - IgniteInternalFuture loadFut = runLoad(ldr, runFlag, () -> new TestMessage((int)maxConnIdleTimeout * 3), null); + IgniteInternalFuture loadFut = runLoad(ldr, runFlag, CommunicationConnectionPoolMetricsTestMessage::new, null); monFut.get(getTestTimeout()); @@ -390,12 +392,12 @@ public void testPendingMessagesMetric() throws Exception { AtomicBoolean runFlag = new AtomicBoolean(true); AtomicLong loadCnt = new AtomicLong(preloadCnt); - AtomicInteger writeDelay = new AtomicInteger(); + AtomicBoolean delayWrite = new AtomicBoolean(); IgniteInternalFuture loadFut = runLoad( ldr, runFlag, - () -> new TestMessage(writeDelay.get()), + () -> new CommunicationConnectionPoolMetricsTestMessage(delayWrite.get() ? WRITE_DELAY_MS : 0), loadCnt ); @@ -404,8 +406,8 @@ public void testPendingMessagesMetric() throws Exception { // Ensure that preloaded without a failure. assertTrue(runFlag.get()); - // Will delay message queue processing but not network i/o. - writeDelay.set(50); + // Delay actual socket writes for the test message to build up the outbound queue. + delayWrite.set(true); long checkPeriod = U.nanosToMillis(ConnectionClientPool.METRICS_UPDATE_THRESHOLD / 3); @@ -422,7 +424,7 @@ public void testPendingMessagesMetric() throws Exception { ); } - writeDelay.set(0); + delayWrite.set(false); dumpMetrics(ldr); @@ -528,7 +530,11 @@ public static class TestCommunicationMessagePluginProvider extends AbstractTestP @Override public void initExtensions(PluginContext ctx, ExtensionRegistry registry) { registry.registerExtension(MessageFactoryProvider.class, new MessageFactoryProvider() { @Override public void registerAll(MessageFactory factory) { - factory.register(TestMessage.DIRECT_TYPE, TestMessage::new); + factory.register( + 10_000, + CommunicationConnectionPoolMetricsTestMessage::new, + new TestMessageSerializer() + ); } }); } @@ -541,51 +547,32 @@ public static MetricRegistryImpl metricsForCommunicationConnection(Ignite from, .registry(metricName(SHARED_METRICS_REGISTRY_NAME, ((IgniteEx)to).context().localNodeId().toString())); } - /** */ - private static class TestMessage implements Message { - /** */ - public static final short DIRECT_TYPE = 200; - - /** */ - private final int writeDelay; - - /** */ - public TestMessage(int writeDelay) { - this.writeDelay = writeDelay; - } - - /** */ - public TestMessage() { - this(0); - } - + /** Serializer preserving the original write delay semantics of the test message. */ + private static class TestMessageSerializer implements MessageSerializer { /** {@inheritDoc} */ - @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { - if (writeDelay > 0) { + @Override public boolean writeTo(CommunicationConnectionPoolMetricsTestMessage msg, MessageWriter writer) { + if (msg.writeDelay > 0) { try { - U.sleep(writeDelay); + U.sleep(msg.writeDelay); } catch (IgniteInterruptedCheckedException ignored) { // No-op. } } - writer.setBuffer(buf); + if (!writer.isHeaderWritten()) { + if (!writer.writeHeader(msg.directType())) + return false; - if (!writer.writeHeader(directType())) - return false; + writer.onHeaderWritten(); + } return true; } /** {@inheritDoc} */ - @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { + @Override public boolean readFrom(CommunicationConnectionPoolMetricsTestMessage msg, MessageReader reader) { return true; } - - /** {@inheritDoc} */ - @Override public short directType() { - return DIRECT_TYPE; - } } } diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/CommunicationConnectionPoolMetricsTestMessage.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/CommunicationConnectionPoolMetricsTestMessage.java new file mode 100644 index 0000000000000..901b0aa2f0f94 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/CommunicationConnectionPoolMetricsTestMessage.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.communication.tcp; + +import org.apache.ignite.plugin.extensions.communication.Message; + +/** Test message. */ +public class CommunicationConnectionPoolMetricsTestMessage implements Message { + /** Delay before writing the message header. */ + final int writeDelay; + + /** + * @param writeDelay Delay before writing the message header. + */ + public CommunicationConnectionPoolMetricsTestMessage(int writeDelay) { + this.writeDelay = writeDelay; + } + + /** + * Default constructor. + */ + public CommunicationConnectionPoolMetricsTestMessage() { + this(0); + } +} From 62070e1622e65bdcbeb8ef31d0242889b14135b5 Mon Sep 17 00:00:00 2001 From: chesnokoff Date: Fri, 10 Apr 2026 10:05:00 +0300 Subject: [PATCH 2/3] IGNITE-28057 Refactor communication test messages --- .../GridCommunicationSendMessageSelfTest.java | 31 ++++------------- .../MessageDirectTypeIdConflictTest.java | 26 ++------------- .../ignite/spi/MessagesPluginProvider.java | 33 +++++++++++++++---- ...ommunicationConnectionPoolMetricsTest.java | 7 +++- 4 files changed, 42 insertions(+), 55 deletions(-) diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridCommunicationSendMessageSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridCommunicationSendMessageSelfTest.java index bdc0cc85ae7f8..b201563814638 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridCommunicationSendMessageSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridCommunicationSendMessageSelfTest.java @@ -17,15 +17,12 @@ package org.apache.ignite.internal.managers.communication; +import java.util.Map; import java.util.UUID; import java.util.concurrent.CountDownLatch; import org.apache.ignite.configuration.IgniteConfiguration; -import org.apache.ignite.plugin.AbstractTestPluginProvider; -import org.apache.ignite.plugin.ExtensionRegistry; -import org.apache.ignite.plugin.PluginContext; import org.apache.ignite.plugin.extensions.communication.Message; -import org.apache.ignite.plugin.extensions.communication.MessageFactory; -import org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider; +import org.apache.ignite.spi.MessagesPluginProvider; import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import org.junit.Test; @@ -51,7 +48,10 @@ public class GridCommunicationSendMessageSelfTest extends GridCommonAbstractTest @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { IgniteConfiguration c = super.getConfiguration(igniteInstanceName); - c.setPluginProviders(new TestPluginProvider()); + c.setPluginProviders(new MessagesPluginProvider(Map.of( + DIRECT_TYPE, GridCommunicationSendTestMessage.class, + DIRECT_TYPE_OVER_BYTE, GridCommunicationSendOverByteIdTestMessage.class + ))); TcpCommunicationSpi commSpi = new TcpCommunicationSpi(); @@ -144,23 +144,4 @@ private void doSend(Message msg, final Class msgCls) throws Exception { info(">>>"); } - /** Registers test messages with explicit direct types to cover both byte-range and over-byte ids. */ - public static class TestPluginProvider extends AbstractTestPluginProvider { - /** {@inheritDoc} */ - @Override public String name() { - return "TEST_PLUGIN"; - } - - /** {@inheritDoc} */ - @Override public void initExtensions(PluginContext ctx, ExtensionRegistry registry) { - registry.registerExtension(MessageFactoryProvider.class, new MessageFactoryProvider() { - @Override public void registerAll(MessageFactory factory) { - factory.register(DIRECT_TYPE, GridCommunicationSendTestMessage::new, - new GridCommunicationSendTestMessageSerializer()); - factory.register(DIRECT_TYPE_OVER_BYTE, GridCommunicationSendOverByteIdTestMessage::new, - new GridCommunicationSendOverByteIdTestMessageSerializer()); - } - }); - } - } } diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/MessageDirectTypeIdConflictTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/MessageDirectTypeIdConflictTest.java index 03e998dd7b47d..c9cb02e5a9fca 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/MessageDirectTypeIdConflictTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/MessageDirectTypeIdConflictTest.java @@ -17,14 +17,11 @@ package org.apache.ignite.internal.managers.communication; +import java.util.Map; import java.util.concurrent.Callable; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.configuration.IgniteConfiguration; -import org.apache.ignite.plugin.AbstractTestPluginProvider; -import org.apache.ignite.plugin.ExtensionRegistry; -import org.apache.ignite.plugin.PluginContext; -import org.apache.ignite.plugin.extensions.communication.MessageFactory; -import org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider; +import org.apache.ignite.spi.MessagesPluginProvider; import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import org.junit.Test; @@ -43,7 +40,7 @@ public class MessageDirectTypeIdConflictTest extends GridCommonAbstractTest { @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); - cfg.setPluginProviders(new TestPluginProvider()); + cfg.setPluginProviders(new MessagesPluginProvider(Map.of(MSG_DIRECT_TYPE, MessageDirectTypeIdConflictTestMessage.class))); return cfg; } @@ -73,21 +70,4 @@ public void testRegisterMessageFactoryWithConflictDirectTypeId() throws Exceptio "Message factory is already registered for direct type: " + MSG_DIRECT_TYPE); } - /** */ - public static class TestPluginProvider extends AbstractTestPluginProvider { - /** {@inheritDoc} */ - @Override public String name() { - return "TEST_PLUGIN"; - } - - /** {@inheritDoc} */ - @Override public void initExtensions(PluginContext ctx, ExtensionRegistry registry) { - registry.registerExtension(MessageFactoryProvider.class, new MessageFactoryProvider() { - @Override public void registerAll(MessageFactory factory) { - factory.register(MSG_DIRECT_TYPE, MessageDirectTypeIdConflictTestMessage::new, - new MessageDirectTypeIdConflictTestMessageSerializer()); - } - }); - } - } } diff --git a/modules/core/src/test/java/org/apache/ignite/spi/MessagesPluginProvider.java b/modules/core/src/test/java/org/apache/ignite/spi/MessagesPluginProvider.java index 5134383656e71..791e9f413fdbe 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/MessagesPluginProvider.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/MessagesPluginProvider.java @@ -17,6 +17,8 @@ package org.apache.ignite.spi; +import java.util.HashMap; +import java.util.Map; import java.util.function.Supplier; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.util.typedef.internal.U; @@ -38,22 +40,25 @@ public class MessagesPluginProvider extends AbstractTestPluginProvider { /** */ @SafeVarargs public MessagesPluginProvider(Class... msgs) { + this(registrations(msgs)); + } + + /** */ + public MessagesPluginProvider(Map> msgs) { msgFactoryProvider = f -> { - short directType = 10_000; + for (Map.Entry> msg : msgs.entrySet()) { + Class msgCls = msg.getValue(); - for (Class msg : msgs) { Supplier msgSupp = () -> { try { - return U.newInstance(msg); + return U.newInstance(msgCls); } catch (IgniteCheckedException e) { throw new RuntimeException(e); } }; - f.register(directType, msgSupp, loadSerializer(msg)); - - directType++; + f.register(msg.getKey(), msgSupp, loadSerializer(msgCls)); } }; } @@ -89,4 +94,20 @@ private MessageSerializer loadSerializer(Class> registrations(Class... msgs) { + Map> regs = new HashMap<>(); + + short directType = 10_000; + + for (Class msg : msgs) + regs.put(directType++, msg); + + return regs; + } } diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/CommunicationConnectionPoolMetricsTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/CommunicationConnectionPoolMetricsTest.java index 9cedf1035bfa3..8cdd7aa19d2bd 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/CommunicationConnectionPoolMetricsTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/CommunicationConnectionPoolMetricsTest.java @@ -370,7 +370,12 @@ public void testAcquiringThreadsCntMetric() throws Exception { } }); - IgniteInternalFuture loadFut = runLoad(ldr, runFlag, CommunicationConnectionPoolMetricsTestMessage::new, null); + IgniteInternalFuture loadFut = runLoad( + ldr, + runFlag, + () -> new CommunicationConnectionPoolMetricsTestMessage((int)maxConnIdleTimeout * 3), + null + ); monFut.get(getTestTimeout()); From 660adc2a25e33530b71926a745100bca42abcffa Mon Sep 17 00:00:00 2001 From: chesnokoff Date: Thu, 4 Jun 2026 23:54:10 +0300 Subject: [PATCH 3/3] IGNITE-28057 Fix load test message provider direct type --- .../LoadTestsCommunicationPluginProvider.java | 20 +++++++------------ 1 file changed, 7 insertions(+), 13 deletions(-) diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/communication/LoadTestsCommunicationPluginProvider.java b/modules/core/src/test/java/org/apache/ignite/loadtests/communication/LoadTestsCommunicationPluginProvider.java index 3aa249737ef83..6aa8bf38b3daa 100644 --- a/modules/core/src/test/java/org/apache/ignite/loadtests/communication/LoadTestsCommunicationPluginProvider.java +++ b/modules/core/src/test/java/org/apache/ignite/loadtests/communication/LoadTestsCommunicationPluginProvider.java @@ -17,23 +17,17 @@ package org.apache.ignite.loadtests.communication; -import org.apache.ignite.plugin.AbstractTestPluginProvider; -import org.apache.ignite.plugin.ExtensionRegistry; -import org.apache.ignite.plugin.PluginContext; -import org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider; +import org.apache.ignite.spi.MessagesPluginProvider; /** Registers communication messages used by load tests. */ -public class LoadTestsCommunicationPluginProvider extends AbstractTestPluginProvider { - /** {@inheritDoc} */ - @Override public String name() { - return "LOAD_TESTS_COMMUNICATION_PLUGIN"; +public class LoadTestsCommunicationPluginProvider extends MessagesPluginProvider { + /** */ + public LoadTestsCommunicationPluginProvider() { + super(GridTestMessage.class); } /** {@inheritDoc} */ - @Override public void initExtensions(PluginContext ctx, ExtensionRegistry registry) { - registry.registerExtension( - MessageFactoryProvider.class, - f -> f.register(10_000, GridTestMessage::new, new GridTestMessageSerializer()) - ); + @Override public String name() { + return "LOAD_TESTS_COMMUNICATION_PLUGIN"; } }