diff --git a/sdk/spring/CHANGELOG.md b/sdk/spring/CHANGELOG.md index 19b5398d7261..13c2b3c2b539 100644 --- a/sdk/spring/CHANGELOG.md +++ b/sdk/spring/CHANGELOG.md @@ -9,6 +9,17 @@ This section includes changes in `spring-cloud-azure-autoconfigure` module. - Fixed JDBC/Azure Database and Redis passwordless connection scope defaulting using the wrong `azure.scopes` value for Azure China and Azure US Government when `spring.cloud.azure.profile.cloud-type` is set to `azure_china` or `azure_us_government`. The scopes are now correctly derived from the merged cloud type. ([#47096](https://github.com/Azure/azure-sdk-for-java/issues/47096)) +### Spring Cloud Azure Stream Binder Service Bus +This section includes changes in `spring-cloud-azure-stream-binder-servicebus` module. + +#### Features Added + +- Add support for Spring Cloud Stream consumer retry properties (`maxAttempts`, `backOffInitialInterval`, + `backOffMaxInterval`, `backOffMultiplier`) to enable retry with exponential backoff for message processing + failures. [#47135](https://github.com/Azure/azure-sdk-for-java/issues/47135). +- Add support for injecting a custom `RetryTemplate` from Spring context for advanced retry scenarios. + [#47135](https://github.com/Azure/azure-sdk-for-java/issues/47135). + ## 7.2.0 (2026-04-17) - This release is compatible with Spring Boot 4.0.0-4.0.5. (Note: 4.0.x (x>5) should be supported, but they aren't tested with this release.) - This release is compatible with Spring Cloud 2025.1.0-2025.1.1. (Note: 2025.1.x (x>1) should be supported, but they aren't tested with this release.) diff --git a/sdk/spring/spring-cloud-azure-stream-binder-servicebus/pom.xml b/sdk/spring/spring-cloud-azure-stream-binder-servicebus/pom.xml index 15a232e6631c..cd90f2a4611f 100644 --- a/sdk/spring/spring-cloud-azure-stream-binder-servicebus/pom.xml +++ b/sdk/spring/spring-cloud-azure-stream-binder-servicebus/pom.xml @@ -59,6 +59,11 @@ 4.0.5 true + + org.springframework.retry + spring-retry + 2.0.12 + @@ -156,6 +161,7 @@ org.springframework.boot:spring-boot-starter-actuator:[4.0.5] + org.springframework.retry:spring-retry:[2.0.12] diff --git a/sdk/spring/spring-cloud-azure-stream-binder-servicebus/src/main/java/com/azure/spring/cloud/stream/binder/servicebus/implementation/ServiceBusMessageChannelBinder.java b/sdk/spring/spring-cloud-azure-stream-binder-servicebus/src/main/java/com/azure/spring/cloud/stream/binder/servicebus/implementation/ServiceBusMessageChannelBinder.java index b2e84d4e8ecb..e25751160363 100644 --- a/sdk/spring/spring-cloud-azure-stream-binder-servicebus/src/main/java/com/azure/spring/cloud/stream/binder/servicebus/implementation/ServiceBusMessageChannelBinder.java +++ b/sdk/spring/spring-cloud-azure-stream-binder-servicebus/src/main/java/com/azure/spring/cloud/stream/binder/servicebus/implementation/ServiceBusMessageChannelBinder.java @@ -53,6 +53,9 @@ import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageHandler; import org.springframework.messaging.support.ErrorMessage; +import org.springframework.retry.backoff.ExponentialBackOffPolicy; +import org.springframework.retry.policy.SimpleRetryPolicy; +import org.springframework.retry.support.RetryTemplate; import org.springframework.util.Assert; import org.springframework.util.StringUtils; @@ -78,7 +81,6 @@ public class ServiceBusMessageChannelBinder extends private static final Logger LOGGER = LoggerFactory.getLogger(ServiceBusMessageChannelBinder.class); private static final DefaultErrorMessageStrategy DEFAULT_ERROR_MESSAGE_STRATEGY = new DefaultErrorMessageStrategy(); private static final String EXCEPTION_MESSAGE = "exception-message"; - private ServiceBusExtendedBindingProperties bindingProperties = new ServiceBusExtendedBindingProperties(); private NamespaceProperties namespaceProperties; private ServiceBusTemplate serviceBusTemplate; @@ -91,6 +93,7 @@ public class ServiceBusMessageChannelBinder extends private final List producerFactoryCustomizers = new ArrayList<>(); private final List processorFactoryCustomizers = new ArrayList<>(); + private RetryTemplate retryTemplate; /** * Construct a {@link ServiceBusMessageChannelBinder} with the specified headersToEmbed and {@link ServiceBusChannelProvisioner}. @@ -148,6 +151,16 @@ protected MessageProducer createConsumerEndpoint(ConsumerDestination destination inboundAdapter.setInstrumentationId(instrumentationId); inboundAdapter.setErrorChannel(errorInfrastructure.getErrorChannel()); inboundAdapter.setMessageConverter(messageConverter); + + // Configure retry only when retry is enabled by the consumer properties. + if (shouldConfigureRetry(properties)) { + // Once retry is enabled, use the injected RetryTemplate if available; otherwise create one from properties. + RetryTemplate retryTemplateToUse = this.retryTemplate != null + ? this.retryTemplate + : createRetryTemplate(properties); + inboundAdapter.setRetryTemplate(retryTemplateToUse); + } + return inboundAdapter; } @@ -377,4 +390,63 @@ public void addProcessorFactoryCustomizer(ServiceBusProcessorFactoryCustomizer p } } + /** + * Set a custom retry template for message processing retries. + * If not set, a retry template will be created automatically based on consumer properties when maxAttempts > 1. + * + * @param retryTemplate the retry template to use + */ + public void setRetryTemplate(RetryTemplate retryTemplate) { + this.retryTemplate = retryTemplate; + } + + /** + * Get the retry template configured for this binder. + * + * @return the retry template, or {@code null} if none has been set + */ + public RetryTemplate getRetryTemplate() { + return this.retryTemplate; + } + + private boolean shouldConfigureRetry(ExtendedConsumerProperties properties) { + return properties.getMaxAttempts() > 1; + } + + /** + * Create a RetryTemplate based on the consumer properties. + * + * @param properties the extended consumer properties + * @return the configured RetryTemplate + */ + private RetryTemplate createRetryTemplate(ExtendedConsumerProperties properties) { + RetryTemplate retryTemplate = new RetryTemplate(); + + // Configure retry policy + SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy(); + retryPolicy.setMaxAttempts(properties.getMaxAttempts()); + retryTemplate.setRetryPolicy(retryPolicy); + + // Configure backoff policy + retryTemplate.setBackOffPolicy(createExponentialBackOffPolicy(properties)); + + return retryTemplate; + } + + /** + * Create an {@link ExponentialBackOffPolicy} from the consumer properties. + * Package-private to allow direct verification in tests without reflective access to RetryTemplate internals. + * + * @param properties the extended consumer properties + * @return the configured ExponentialBackOffPolicy + */ + ExponentialBackOffPolicy createExponentialBackOffPolicy( + ExtendedConsumerProperties properties) { + ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy(); + backOffPolicy.setInitialInterval(properties.getBackOffInitialInterval()); + backOffPolicy.setMultiplier(properties.getBackOffMultiplier()); + backOffPolicy.setMaxInterval(properties.getBackOffMaxInterval()); + return backOffPolicy; + } + } diff --git a/sdk/spring/spring-cloud-azure-stream-binder-servicebus/src/main/java/com/azure/spring/cloud/stream/binder/servicebus/implementation/config/ServiceBusBinderConfiguration.java b/sdk/spring/spring-cloud-azure-stream-binder-servicebus/src/main/java/com/azure/spring/cloud/stream/binder/servicebus/implementation/config/ServiceBusBinderConfiguration.java index 015d30383c1d..da543fb76938 100644 --- a/sdk/spring/spring-cloud-azure-stream-binder-servicebus/src/main/java/com/azure/spring/cloud/stream/binder/servicebus/implementation/config/ServiceBusBinderConfiguration.java +++ b/sdk/spring/spring-cloud-azure-stream-binder-servicebus/src/main/java/com/azure/spring/cloud/stream/binder/servicebus/implementation/config/ServiceBusBinderConfiguration.java @@ -37,6 +37,7 @@ import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Import; +import org.springframework.retry.support.RetryTemplate; import static com.azure.spring.cloud.autoconfigure.implementation.context.AzureContextUtils.DEFAULT_TOKEN_CREDENTIAL_BEAN_NAME; @@ -96,6 +97,7 @@ ServiceBusChannelProvisioner serviceBusChannelProvisioner() { * @param messageConverter the message converter. * @param producerFactoryCustomizers customizers to customize producer factories. * @param processorFactoryCustomizers customizers to customize processor factories. + * @param retryTemplate optional custom retry template for message processing retries; must be a bean named {@code serviceBusRetryTemplate}. * * @return the {@link ServiceBusMessageChannelBinder} bean. */ @@ -106,7 +108,8 @@ ServiceBusMessageChannelBinder serviceBusBinder(ServiceBusChannelProvisioner cha ObjectProvider namespaceProperties, ObjectProvider messageConverter, ObjectProvider producerFactoryCustomizers, - ObjectProvider processorFactoryCustomizers) { + ObjectProvider processorFactoryCustomizers, + @Qualifier("serviceBusRetryTemplate") ObjectProvider retryTemplate) { ServiceBusMessageChannelBinder binder = new ServiceBusMessageChannelBinder(null, channelProvisioner); binder.setBindingProperties(bindingProperties); @@ -114,6 +117,7 @@ ServiceBusMessageChannelBinder serviceBusBinder(ServiceBusChannelProvisioner cha binder.setMessageConverter(messageConverter.getIfAvailable()); producerFactoryCustomizers.orderedStream().forEach(binder::addProducerFactoryCustomizer); processorFactoryCustomizers.orderedStream().forEach(binder::addProcessorFactoryCustomizer); + retryTemplate.ifAvailable(binder::setRetryTemplate); return binder; } diff --git a/sdk/spring/spring-cloud-azure-stream-binder-servicebus/src/test/java/com/azure/spring/cloud/stream/binder/servicebus/implementation/ServiceBusRetryTest.java b/sdk/spring/spring-cloud-azure-stream-binder-servicebus/src/test/java/com/azure/spring/cloud/stream/binder/servicebus/implementation/ServiceBusRetryTest.java new file mode 100644 index 000000000000..7de33a40cda8 --- /dev/null +++ b/sdk/spring/spring-cloud-azure-stream-binder-servicebus/src/test/java/com/azure/spring/cloud/stream/binder/servicebus/implementation/ServiceBusRetryTest.java @@ -0,0 +1,207 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. +package com.azure.spring.cloud.stream.binder.servicebus.implementation; + +import com.azure.spring.cloud.service.servicebus.properties.ServiceBusEntityType; +import com.azure.spring.cloud.stream.binder.servicebus.core.implementation.provisioning.ServiceBusChannelProvisioner; +import com.azure.spring.cloud.stream.binder.servicebus.core.properties.ServiceBusBindingProperties; +import com.azure.spring.cloud.stream.binder.servicebus.core.properties.ServiceBusConsumerProperties; +import com.azure.spring.cloud.stream.binder.servicebus.core.properties.ServiceBusExtendedBindingProperties; +import com.azure.spring.integration.servicebus.inbound.ServiceBusInboundChannelAdapter; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.springframework.cloud.stream.binder.BinderHeaders; +import org.springframework.cloud.stream.binder.ExtendedConsumerProperties; +import org.springframework.cloud.stream.binder.HeaderMode; +import org.springframework.cloud.stream.provisioning.ConsumerDestination; +import org.springframework.context.support.GenericApplicationContext; +import org.springframework.integration.core.MessageProducer; +import org.springframework.retry.backoff.ExponentialBackOffPolicy; +import org.springframework.retry.support.RetryTemplate; + +import java.time.Duration; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.mockito.Mockito.when; + +/** + * Tests for retry functionality in ServiceBusMessageChannelBinder. + */ +@ExtendWith(MockitoExtension.class) +class ServiceBusRetryTest { + + @Mock + private ConsumerDestination consumerDestination; + + private final ServiceBusExtendedBindingProperties extendedBindingProperties = + new ServiceBusExtendedBindingProperties(); + + private ExtendedConsumerProperties consumerProperties; + + private final ServiceBusConsumerProperties serviceBusConsumerProperties = new ServiceBusConsumerProperties(); + + private ServiceBusMessageChannelTestBinder binder; + + private GenericApplicationContext applicationContext; + + private static final String ENTITY_NAME = "test-entity"; + private static final String GROUP = "test"; + private static final String NAMESPACE_NAME = "test-namespace"; + + @BeforeEach + void init() { + binder = new ServiceBusMessageChannelTestBinder( + BinderHeaders.STANDARD_HEADERS, new ServiceBusChannelProvisioner()); + applicationContext = new GenericApplicationContext(); + applicationContext.refresh(); + binder.setApplicationContext(applicationContext); + } + + @AfterEach + void tearDown() { + if (applicationContext != null) { + applicationContext.close(); + } + } + + @Test + void testRetryTemplateConfiguredWhenMaxAttemptsGreaterThanOne() { + // Arrange + prepareConsumerProperties(); + consumerProperties.setMaxAttempts(3); + consumerProperties.setBackOffInitialInterval(1000); + consumerProperties.setBackOffMultiplier(2.0); + consumerProperties.setBackOffMaxInterval(5000); + when(consumerDestination.getName()).thenReturn(ENTITY_NAME); + + // Act + MessageProducer producer = binder.createConsumerEndpoint(consumerDestination, GROUP, consumerProperties); + + // Assert + assertThat(producer).isInstanceOf(ServiceBusInboundChannelAdapter.class); + ServiceBusInboundChannelAdapter adapter = (ServiceBusInboundChannelAdapter) producer; + RetryTemplate retryTemplate = adapter.getRetryTemplate(); + assertThat(retryTemplate).isNotNull(); + + // Verify maxAttempts=3 by executing the template and counting actual attempts + AtomicInteger callCount = new AtomicInteger(0); + assertThatThrownBy(() -> retryTemplate.execute(ctx -> { + callCount.incrementAndGet(); + throw new RuntimeException("test"); + })).isInstanceOf(RuntimeException.class); + assertThat(callCount.get()).isEqualTo(3); + + // Verify backoff policy configuration via the binder's factory method (no reflection needed) + ExponentialBackOffPolicy backOffPolicy = binder.createExponentialBackOffPolicy(consumerProperties); + assertThat(backOffPolicy.getInitialInterval()).isEqualTo(1000L); + assertThat(backOffPolicy.getMultiplier()).isEqualTo(2.0); + assertThat(backOffPolicy.getMaxInterval()).isEqualTo(5000L); + } + + @Test + void testRetryTemplateNotConfiguredWhenMaxAttemptsIsOne() { + // Arrange + prepareConsumerProperties(); + consumerProperties.setMaxAttempts(1); + when(consumerDestination.getName()).thenReturn(ENTITY_NAME); + + // Act + MessageProducer producer = binder.createConsumerEndpoint(consumerDestination, GROUP, consumerProperties); + + // Assert + assertThat(producer).isInstanceOf(ServiceBusInboundChannelAdapter.class); + assertThat(((ServiceBusInboundChannelAdapter) producer).getRetryTemplate()).isNull(); + } + + @Test + void testRetryTemplateConfiguredWithDefaultSettings() { + // Arrange + prepareConsumerProperties(); + // Spring Cloud Stream default maxAttempts is 3 (> 1), so a RetryTemplate should be created. + when(consumerDestination.getName()).thenReturn(ENTITY_NAME); + + // Act + MessageProducer producer = binder.createConsumerEndpoint(consumerDestination, GROUP, consumerProperties); + + // Assert + assertThat(producer).isInstanceOf(ServiceBusInboundChannelAdapter.class); + ServiceBusInboundChannelAdapter adapter = (ServiceBusInboundChannelAdapter) producer; + RetryTemplate retryTemplate = adapter.getRetryTemplate(); + assertThat(retryTemplate).isNotNull(); + + // Verify maxAttempts matches Spring Cloud Stream's default via observable behavior + int expectedMaxAttempts = new ExtendedConsumerProperties<>(new ServiceBusConsumerProperties()).getMaxAttempts(); + AtomicInteger callCount = new AtomicInteger(0); + assertThatThrownBy(() -> retryTemplate.execute(ctx -> { + callCount.incrementAndGet(); + throw new RuntimeException("test"); + })).isInstanceOf(RuntimeException.class); + assertThat(callCount.get()).isEqualTo(expectedMaxAttempts); + } + + @Test + void testCustomRetryTemplateIsUsed() { + // Arrange + prepareConsumerProperties(); + consumerProperties.setMaxAttempts(3); + when(consumerDestination.getName()).thenReturn(ENTITY_NAME); + + // Create a custom RetryTemplate + RetryTemplate customRetryTemplate = new RetryTemplate(); + binder.setRetryTemplate(customRetryTemplate); + + // Act + MessageProducer producer = binder.createConsumerEndpoint(consumerDestination, GROUP, consumerProperties); + + // Assert + assertThat(producer).isInstanceOf(ServiceBusInboundChannelAdapter.class); + ServiceBusInboundChannelAdapter adapter = (ServiceBusInboundChannelAdapter) producer; + assertThat(adapter.getRetryTemplate()).isNotNull(); + assertThat(adapter.getRetryTemplate()).isSameAs(customRetryTemplate); + } + + @Test + void testCustomRetryTemplateNotAppliedWhenMaxAttemptsIsOne() { + // Arrange: maxAttempts=1 disables retry even when a custom RetryTemplate bean is injected + prepareConsumerProperties(); + consumerProperties.setMaxAttempts(1); + when(consumerDestination.getName()).thenReturn(ENTITY_NAME); + + RetryTemplate customRetryTemplate = new RetryTemplate(); + binder.setRetryTemplate(customRetryTemplate); + + // Act + MessageProducer producer = binder.createConsumerEndpoint(consumerDestination, GROUP, consumerProperties); + + // Assert + assertThat(producer).isInstanceOf(ServiceBusInboundChannelAdapter.class); + assertThat(((ServiceBusInboundChannelAdapter) producer).getRetryTemplate()).isNull(); + } + + private void prepareConsumerProperties() { + serviceBusConsumerProperties.setEntityName(ENTITY_NAME); + serviceBusConsumerProperties.setSubscriptionName(GROUP); + serviceBusConsumerProperties.setEntityType(ServiceBusEntityType.TOPIC); + serviceBusConsumerProperties.setNamespace(NAMESPACE_NAME); + serviceBusConsumerProperties.getRetry().setTryTimeout(Duration.ofMinutes(5)); + serviceBusConsumerProperties.setAutoComplete(false); + ServiceBusBindingProperties bindingProperties = new ServiceBusBindingProperties(); + bindingProperties.setConsumer(serviceBusConsumerProperties); + + Map bindings = new HashMap<>(); + bindings.put(ENTITY_NAME, bindingProperties); + extendedBindingProperties.setBindings(bindings); + binder.setBindingProperties(extendedBindingProperties); + + consumerProperties = new ExtendedConsumerProperties<>(serviceBusConsumerProperties); + consumerProperties.setHeaderMode(HeaderMode.embeddedHeaders); + } +} diff --git a/sdk/spring/spring-cloud-azure-stream-binder-servicebus/src/test/java/com/azure/spring/cloud/stream/binder/servicebus/implementation/config/ServiceBusBinderConfigurationTests.java b/sdk/spring/spring-cloud-azure-stream-binder-servicebus/src/test/java/com/azure/spring/cloud/stream/binder/servicebus/implementation/config/ServiceBusBinderConfigurationTests.java index 1510974d625b..5c074d0dd9e6 100644 --- a/sdk/spring/spring-cloud-azure-stream-binder-servicebus/src/test/java/com/azure/spring/cloud/stream/binder/servicebus/implementation/config/ServiceBusBinderConfigurationTests.java +++ b/sdk/spring/spring-cloud-azure-stream-binder-servicebus/src/test/java/com/azure/spring/cloud/stream/binder/servicebus/implementation/config/ServiceBusBinderConfigurationTests.java @@ -30,6 +30,7 @@ import org.springframework.cloud.stream.binder.Binder; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import org.springframework.retry.support.RetryTemplate; import org.springframework.test.util.ReflectionTestUtils; import java.lang.reflect.Field; @@ -151,6 +152,19 @@ void testExtendedBindingPropertiesShouldBind() { }); } + @Test + void retryTemplateShouldBeWiredWhenBeanProvided() { + this.contextRunner + .withBean("serviceBusRetryTemplate", RetryTemplate.class, RetryTemplate::new) + .run(context -> { + assertThat(context).hasSingleBean(ServiceBusMessageChannelBinder.class); + assertThat(context).hasSingleBean(RetryTemplate.class); + RetryTemplate retryTemplate = context.getBean(RetryTemplate.class); + ServiceBusMessageChannelBinder binder = context.getBean(ServiceBusMessageChannelBinder.class); + assertThat(binder.getRetryTemplate()).isSameAs(retryTemplate); + }); + } + @Test void clientMessageConverterShouldBeConfigured() { this.contextRunner diff --git a/sdk/spring/spring-integration-azure-servicebus/pom.xml b/sdk/spring/spring-integration-azure-servicebus/pom.xml index 91b74bf51501..4f8aa93e8f02 100644 --- a/sdk/spring/spring-integration-azure-servicebus/pom.xml +++ b/sdk/spring/spring-integration-azure-servicebus/pom.xml @@ -48,6 +48,11 @@ spring-messaging-azure-servicebus 7.3.0-beta.1 + + org.springframework.retry + spring-retry + 2.0.12 + @@ -148,6 +153,20 @@ + + org.apache.maven.plugins + maven-enforcer-plugin + 3.6.2 + + + + + org.springframework.retry:spring-retry:[2.0.12] + + + + + diff --git a/sdk/spring/spring-integration-azure-servicebus/src/main/java/com/azure/spring/integration/servicebus/inbound/ServiceBusInboundChannelAdapter.java b/sdk/spring/spring-integration-azure-servicebus/src/main/java/com/azure/spring/integration/servicebus/inbound/ServiceBusInboundChannelAdapter.java index b803c45518f8..d2a71d0897b5 100644 --- a/sdk/spring/spring-integration-azure-servicebus/src/main/java/com/azure/spring/integration/servicebus/inbound/ServiceBusInboundChannelAdapter.java +++ b/sdk/spring/spring-integration-azure-servicebus/src/main/java/com/azure/spring/integration/servicebus/inbound/ServiceBusInboundChannelAdapter.java @@ -23,7 +23,9 @@ import org.slf4j.LoggerFactory; import org.springframework.integration.endpoint.MessageProducerSupport; import org.springframework.messaging.Message; +import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageHeaders; +import org.springframework.retry.support.RetryTemplate; import org.springframework.util.Assert; import reactor.core.publisher.Mono; @@ -81,6 +83,7 @@ public class ServiceBusInboundChannelAdapter extends MessageProducerSupport { private String instrumentationId; private final boolean isAutoComplete; private static final String MSG_FAIL_CHECKPOINT = "Failed to checkpoint %s"; + private volatile RetryTemplate retryTemplate; /** * Construct a {@link ServiceBusInboundChannelAdapter} with the specified {@link ServiceBusMessageListenerContainer}. @@ -153,7 +156,38 @@ public void setInstrumentationManager(InstrumentationManager instrumentationMana */ public void setInstrumentationId(String instrumentationId) { this.instrumentationId = instrumentationId; + } + /** + * Set retry template for message processing retries. + * + * @param retryTemplate the retry template + */ + public void setRetryTemplate(RetryTemplate retryTemplate) { + this.retryTemplate = retryTemplate; + } + + /** + * Get the retry template configured on this adapter, or {@code null} if retry is not enabled. + * + * @return the retry template, or {@code null} + */ + public RetryTemplate getRetryTemplate() { + return this.retryTemplate; + } + + /** + * Sends the message directly to the output channel without routing exceptions to the error channel. + * This is used inside the retry template so that exceptions propagate back to the retry logic. + * Uses the adapter's configured sendTimeout (via MessagingTemplate) to match the non-retry path. + * The caller is responsible for routing to the error channel after retries are exhausted. + * + * @param message the message to send + */ + private void sendMessageDirectly(Message message) { + MessageChannel outputCh = getOutputChannel(); + Assert.notNull(outputCh, "Output channel must not be null"); + getMessagingTemplate().send(outputCh, message); } private class IntegrationErrorHandler implements ServiceBusErrorHandler { @@ -199,7 +233,25 @@ public void onMessage(ServiceBusReceivedMessageContext messageContext) { Message message = getMessageConverter().toMessage(messageContext.getMessage(), new MessageHeaders(headers), payloadType); - sendMessage(message); + + RetryTemplate localRetryTemplate = retryTemplate; + if (localRetryTemplate != null) { + try { + localRetryTemplate.execute(context -> { + // Bypass sendMessage()'s error-channel routing so exceptions propagate + // back to the retry template for retry. After all retries are exhausted + // the catch block routes to the error channel. + sendMessageDirectly(message); + return null; + }); + } catch (RuntimeException e) { + if (!sendErrorMessageIfNecessary(message, e)) { + throw e; + } + } + } else { + sendMessage(message); + } } } diff --git a/sdk/spring/spring-integration-azure-servicebus/src/test/java/com/azure/spring/integration/servicebus/inbound/ServiceBusInboundChannelAdapterTests.java b/sdk/spring/spring-integration-azure-servicebus/src/test/java/com/azure/spring/integration/servicebus/inbound/ServiceBusInboundChannelAdapterTests.java index bb72a17d3a18..6b266af3788b 100644 --- a/sdk/spring/spring-integration-azure-servicebus/src/test/java/com/azure/spring/integration/servicebus/inbound/ServiceBusInboundChannelAdapterTests.java +++ b/sdk/spring/spring-integration-azure-servicebus/src/test/java/com/azure/spring/integration/servicebus/inbound/ServiceBusInboundChannelAdapterTests.java @@ -31,6 +31,9 @@ import org.springframework.integration.channel.DirectChannel; import org.springframework.integration.support.MessageBuilder; import org.springframework.messaging.Message; +import org.springframework.retry.backoff.NoBackOffPolicy; +import org.springframework.retry.policy.SimpleRetryPolicy; +import org.springframework.retry.support.RetryTemplate; import java.time.Duration; import java.util.Arrays; @@ -211,4 +214,191 @@ void instrumentationErrorHandler() { } + @Test + void retryTemplateRetriesMessageOnFailure() throws InterruptedException { + ServiceBusMessageListenerContainer listenerContainer = + new ServiceBusMessageListenerContainer(this.processorFactory, this.containerProperties); + ServiceBusInboundChannelAdapter channelAdapter = new ServiceBusInboundChannelAdapter(listenerContainer); + + // Configure retry: maxAttempts=3, no backoff (for test speed) + SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy(); + retryPolicy.setMaxAttempts(3); + RetryTemplate retryTemplate = new RetryTemplate(); + retryTemplate.setRetryPolicy(retryPolicy); + retryTemplate.setBackOffPolicy(new NoBackOffPolicy()); + channelAdapter.setRetryTemplate(retryTemplate); + + DirectChannel channel = new DirectChannel(); + channel.setBeanName("output"); + + final int[] attemptCount = {0}; + final CountDownLatch successLatch = new CountDownLatch(1); + channel.subscribe(message -> { + attemptCount[0]++; + if (attemptCount[0] < 3) { + throw new RuntimeException("Simulated failure on attempt " + attemptCount[0]); + } + successLatch.countDown(); + }); + + channelAdapter.setOutputChannel(channel); + channelAdapter.onInit(); + channelAdapter.doStart(); + + MessageListener messageListener = listenerContainer.getContainerProperties().getMessageListener(); + assertTrue(messageListener instanceof ServiceBusRecordMessageListener); + + ServiceBusReceivedMessageContext mockContext = mock(ServiceBusReceivedMessageContext.class); + ServiceBusReceivedMessage mockMessage = mock(ServiceBusReceivedMessage.class); + when(mockMessage.getBody()).thenReturn(BinaryData.fromString("test-payload")); + when(mockContext.getMessage()).thenReturn(mockMessage); + + ((ServiceBusRecordMessageListener) messageListener).onMessage(mockContext); + + assertTrue(successLatch.await(5L, TimeUnit.SECONDS), "Message should have been delivered after retries"); + assertEquals(3, attemptCount[0], "Message should have been attempted exactly 3 times"); + } + + @Test + void retryTemplateWorksWithErrorChannelConfigured() throws InterruptedException { + ServiceBusMessageListenerContainer listenerContainer = + new ServiceBusMessageListenerContainer(this.processorFactory, this.containerProperties); + ServiceBusInboundChannelAdapter channelAdapter = new ServiceBusInboundChannelAdapter(listenerContainer); + + // Configure retry: maxAttempts=3, no backoff (for test speed) + SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy(); + retryPolicy.setMaxAttempts(3); + RetryTemplate retryTemplate = new RetryTemplate(); + retryTemplate.setRetryPolicy(retryPolicy); + retryTemplate.setBackOffPolicy(new NoBackOffPolicy()); + channelAdapter.setRetryTemplate(retryTemplate); + + DirectChannel channel = new DirectChannel(); + channel.setBeanName("output"); + + // Handler fails first 2 attempts, succeeds on 3rd + final int[] attemptCount = {0}; + final CountDownLatch successLatch = new CountDownLatch(1); + channel.subscribe(message -> { + attemptCount[0]++; + if (attemptCount[0] < 3) { + throw new RuntimeException("Simulated failure on attempt " + attemptCount[0]); + } + successLatch.countDown(); + }); + + // Set an error channel — in the binder flow the adapter always has one configured + DirectChannel errorCh = new DirectChannel(); + List> errorMessages = new CopyOnWriteArrayList<>(); + errorCh.subscribe(msg -> errorMessages.add(msg)); + + channelAdapter.setOutputChannel(channel); + channelAdapter.setErrorChannel(errorCh); + channelAdapter.onInit(); + channelAdapter.doStart(); + + MessageListener messageListener = listenerContainer.getContainerProperties().getMessageListener(); + assertTrue(messageListener instanceof ServiceBusRecordMessageListener); + + ServiceBusReceivedMessageContext mockContext = mock(ServiceBusReceivedMessageContext.class); + ServiceBusReceivedMessage mockMessage = mock(ServiceBusReceivedMessage.class); + when(mockMessage.getBody()).thenReturn(BinaryData.fromString("test-payload")); + when(mockContext.getMessage()).thenReturn(mockMessage); + + ((ServiceBusRecordMessageListener) messageListener).onMessage(mockContext); + + assertTrue(successLatch.await(5L, TimeUnit.SECONDS), "Message should have been delivered after retries"); + assertEquals(3, attemptCount[0], "Message should have been attempted exactly 3 times"); + assertTrue(errorMessages.isEmpty(), "No error message should be sent to error channel when retries succeed"); + } + + @Test + void retryTemplateExhaustedWithErrorChannelRoutesToErrorChannel() throws InterruptedException { + ServiceBusMessageListenerContainer listenerContainer = + new ServiceBusMessageListenerContainer(this.processorFactory, this.containerProperties); + ServiceBusInboundChannelAdapter channelAdapter = new ServiceBusInboundChannelAdapter(listenerContainer); + + // Configure retry: maxAttempts=2, no backoff + SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy(); + retryPolicy.setMaxAttempts(2); + RetryTemplate retryTemplate = new RetryTemplate(); + retryTemplate.setRetryPolicy(retryPolicy); + retryTemplate.setBackOffPolicy(new NoBackOffPolicy()); + channelAdapter.setRetryTemplate(retryTemplate); + + DirectChannel channel = new DirectChannel(); + channel.setBeanName("output"); + // Handler always fails + channel.subscribe(message -> { + throw new RuntimeException("Always fails"); + }); + + // Wire error channel + DirectChannel errorCh = new DirectChannel(); + List> errorMessages = new CopyOnWriteArrayList<>(); + CountDownLatch errorLatch = new CountDownLatch(1); + errorCh.subscribe(msg -> { + errorMessages.add(msg); + errorLatch.countDown(); + }); + + channelAdapter.setOutputChannel(channel); + channelAdapter.setErrorChannel(errorCh); + channelAdapter.onInit(); + channelAdapter.doStart(); + + MessageListener messageListener = listenerContainer.getContainerProperties().getMessageListener(); + assertTrue(messageListener instanceof ServiceBusRecordMessageListener); + + ServiceBusReceivedMessageContext mockContext = mock(ServiceBusReceivedMessageContext.class); + ServiceBusReceivedMessage mockMessage = mock(ServiceBusReceivedMessage.class); + when(mockMessage.getBody()).thenReturn(BinaryData.fromString("test-payload")); + when(mockContext.getMessage()).thenReturn(mockMessage); + + ((ServiceBusRecordMessageListener) messageListener).onMessage(mockContext); + + assertTrue(errorLatch.await(5L, TimeUnit.SECONDS), + "One error message should be routed to the error channel after retries exhausted"); + assertEquals(1, errorMessages.size(), "Exactly one error message should reach the error channel"); + } + + @Test + void retryTemplateExhaustedWithoutErrorChannelRethrowsException() { + ServiceBusMessageListenerContainer listenerContainer = + new ServiceBusMessageListenerContainer(this.processorFactory, this.containerProperties); + ServiceBusInboundChannelAdapter channelAdapter = new ServiceBusInboundChannelAdapter(listenerContainer); + + // Configure retry: maxAttempts=2, no backoff + SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy(); + retryPolicy.setMaxAttempts(2); + RetryTemplate retryTemplate = new RetryTemplate(); + retryTemplate.setRetryPolicy(retryPolicy); + retryTemplate.setBackOffPolicy(new NoBackOffPolicy()); + channelAdapter.setRetryTemplate(retryTemplate); + + DirectChannel channel = new DirectChannel(); + channel.setBeanName("output"); + // Handler always fails + channel.subscribe(message -> { + throw new RuntimeException("Always fails"); + }); + + channelAdapter.setOutputChannel(channel); + // No error channel configured + channelAdapter.onInit(); + channelAdapter.doStart(); + + MessageListener messageListener = listenerContainer.getContainerProperties().getMessageListener(); + assertTrue(messageListener instanceof ServiceBusRecordMessageListener); + + ServiceBusReceivedMessageContext mockContext = mock(ServiceBusReceivedMessageContext.class); + ServiceBusReceivedMessage mockMessage = mock(ServiceBusReceivedMessage.class); + when(mockMessage.getBody()).thenReturn(BinaryData.fromString("test-payload")); + when(mockContext.getMessage()).thenReturn(mockMessage); + + // Without error channel the exception must propagate to the caller + assertThrows(RuntimeException.class, + () -> ((ServiceBusRecordMessageListener) messageListener).onMessage(mockContext)); + } + }