diff --git a/sdk/spring/CHANGELOG.md b/sdk/spring/CHANGELOG.md
index 19b5398d7261..13c2b3c2b539 100644
--- a/sdk/spring/CHANGELOG.md
+++ b/sdk/spring/CHANGELOG.md
@@ -9,6 +9,17 @@ This section includes changes in `spring-cloud-azure-autoconfigure` module.
- Fixed JDBC/Azure Database and Redis passwordless connection scope defaulting using the wrong `azure.scopes` value for Azure China and Azure US Government when `spring.cloud.azure.profile.cloud-type` is set to `azure_china` or `azure_us_government`. The scopes are now correctly derived from the merged cloud type. ([#47096](https://github.com/Azure/azure-sdk-for-java/issues/47096))
+### Spring Cloud Azure Stream Binder Service Bus
+This section includes changes in `spring-cloud-azure-stream-binder-servicebus` module.
+
+#### Features Added
+
+- Add support for Spring Cloud Stream consumer retry properties (`maxAttempts`, `backOffInitialInterval`,
+ `backOffMaxInterval`, `backOffMultiplier`) to enable retry with exponential backoff for message processing
+ failures. [#47135](https://github.com/Azure/azure-sdk-for-java/issues/47135).
+- Add support for injecting a custom `RetryTemplate` from Spring context for advanced retry scenarios.
+ [#47135](https://github.com/Azure/azure-sdk-for-java/issues/47135).
+
## 7.2.0 (2026-04-17)
- This release is compatible with Spring Boot 4.0.0-4.0.5. (Note: 4.0.x (x>5) should be supported, but they aren't tested with this release.)
- This release is compatible with Spring Cloud 2025.1.0-2025.1.1. (Note: 2025.1.x (x>1) should be supported, but they aren't tested with this release.)
diff --git a/sdk/spring/spring-cloud-azure-stream-binder-servicebus/pom.xml b/sdk/spring/spring-cloud-azure-stream-binder-servicebus/pom.xml
index 15a232e6631c..cd90f2a4611f 100644
--- a/sdk/spring/spring-cloud-azure-stream-binder-servicebus/pom.xml
+++ b/sdk/spring/spring-cloud-azure-stream-binder-servicebus/pom.xml
@@ -59,6 +59,11 @@
4.0.5
true
+
+ org.springframework.retry
+ spring-retry
+ 2.0.12
+
@@ -156,6 +161,7 @@
org.springframework.boot:spring-boot-starter-actuator:[4.0.5]
+ org.springframework.retry:spring-retry:[2.0.12]
diff --git a/sdk/spring/spring-cloud-azure-stream-binder-servicebus/src/main/java/com/azure/spring/cloud/stream/binder/servicebus/implementation/ServiceBusMessageChannelBinder.java b/sdk/spring/spring-cloud-azure-stream-binder-servicebus/src/main/java/com/azure/spring/cloud/stream/binder/servicebus/implementation/ServiceBusMessageChannelBinder.java
index b2e84d4e8ecb..e25751160363 100644
--- a/sdk/spring/spring-cloud-azure-stream-binder-servicebus/src/main/java/com/azure/spring/cloud/stream/binder/servicebus/implementation/ServiceBusMessageChannelBinder.java
+++ b/sdk/spring/spring-cloud-azure-stream-binder-servicebus/src/main/java/com/azure/spring/cloud/stream/binder/servicebus/implementation/ServiceBusMessageChannelBinder.java
@@ -53,6 +53,9 @@
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.support.ErrorMessage;
+import org.springframework.retry.backoff.ExponentialBackOffPolicy;
+import org.springframework.retry.policy.SimpleRetryPolicy;
+import org.springframework.retry.support.RetryTemplate;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;
@@ -78,7 +81,6 @@ public class ServiceBusMessageChannelBinder extends
private static final Logger LOGGER = LoggerFactory.getLogger(ServiceBusMessageChannelBinder.class);
private static final DefaultErrorMessageStrategy DEFAULT_ERROR_MESSAGE_STRATEGY = new DefaultErrorMessageStrategy();
private static final String EXCEPTION_MESSAGE = "exception-message";
-
private ServiceBusExtendedBindingProperties bindingProperties = new ServiceBusExtendedBindingProperties();
private NamespaceProperties namespaceProperties;
private ServiceBusTemplate serviceBusTemplate;
@@ -91,6 +93,7 @@ public class ServiceBusMessageChannelBinder extends
private final List producerFactoryCustomizers = new ArrayList<>();
private final List processorFactoryCustomizers = new ArrayList<>();
+ private RetryTemplate retryTemplate;
/**
* Construct a {@link ServiceBusMessageChannelBinder} with the specified headersToEmbed and {@link ServiceBusChannelProvisioner}.
@@ -148,6 +151,16 @@ protected MessageProducer createConsumerEndpoint(ConsumerDestination destination
inboundAdapter.setInstrumentationId(instrumentationId);
inboundAdapter.setErrorChannel(errorInfrastructure.getErrorChannel());
inboundAdapter.setMessageConverter(messageConverter);
+
+ // Configure retry only when retry is enabled by the consumer properties.
+ if (shouldConfigureRetry(properties)) {
+ // Once retry is enabled, use the injected RetryTemplate if available; otherwise create one from properties.
+ RetryTemplate retryTemplateToUse = this.retryTemplate != null
+ ? this.retryTemplate
+ : createRetryTemplate(properties);
+ inboundAdapter.setRetryTemplate(retryTemplateToUse);
+ }
+
return inboundAdapter;
}
@@ -377,4 +390,63 @@ public void addProcessorFactoryCustomizer(ServiceBusProcessorFactoryCustomizer p
}
}
+ /**
+ * Set a custom retry template for message processing retries.
+ * If not set, a retry template will be created automatically based on consumer properties when maxAttempts > 1.
+ *
+ * @param retryTemplate the retry template to use
+ */
+ public void setRetryTemplate(RetryTemplate retryTemplate) {
+ this.retryTemplate = retryTemplate;
+ }
+
+ /**
+ * Get the retry template configured for this binder.
+ *
+ * @return the retry template, or {@code null} if none has been set
+ */
+ public RetryTemplate getRetryTemplate() {
+ return this.retryTemplate;
+ }
+
+ private boolean shouldConfigureRetry(ExtendedConsumerProperties properties) {
+ return properties.getMaxAttempts() > 1;
+ }
+
+ /**
+ * Create a RetryTemplate based on the consumer properties.
+ *
+ * @param properties the extended consumer properties
+ * @return the configured RetryTemplate
+ */
+ private RetryTemplate createRetryTemplate(ExtendedConsumerProperties properties) {
+ RetryTemplate retryTemplate = new RetryTemplate();
+
+ // Configure retry policy
+ SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
+ retryPolicy.setMaxAttempts(properties.getMaxAttempts());
+ retryTemplate.setRetryPolicy(retryPolicy);
+
+ // Configure backoff policy
+ retryTemplate.setBackOffPolicy(createExponentialBackOffPolicy(properties));
+
+ return retryTemplate;
+ }
+
+ /**
+ * Create an {@link ExponentialBackOffPolicy} from the consumer properties.
+ * Package-private to allow direct verification in tests without reflective access to RetryTemplate internals.
+ *
+ * @param properties the extended consumer properties
+ * @return the configured ExponentialBackOffPolicy
+ */
+ ExponentialBackOffPolicy createExponentialBackOffPolicy(
+ ExtendedConsumerProperties properties) {
+ ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
+ backOffPolicy.setInitialInterval(properties.getBackOffInitialInterval());
+ backOffPolicy.setMultiplier(properties.getBackOffMultiplier());
+ backOffPolicy.setMaxInterval(properties.getBackOffMaxInterval());
+ return backOffPolicy;
+ }
+
}
diff --git a/sdk/spring/spring-cloud-azure-stream-binder-servicebus/src/main/java/com/azure/spring/cloud/stream/binder/servicebus/implementation/config/ServiceBusBinderConfiguration.java b/sdk/spring/spring-cloud-azure-stream-binder-servicebus/src/main/java/com/azure/spring/cloud/stream/binder/servicebus/implementation/config/ServiceBusBinderConfiguration.java
index 015d30383c1d..da543fb76938 100644
--- a/sdk/spring/spring-cloud-azure-stream-binder-servicebus/src/main/java/com/azure/spring/cloud/stream/binder/servicebus/implementation/config/ServiceBusBinderConfiguration.java
+++ b/sdk/spring/spring-cloud-azure-stream-binder-servicebus/src/main/java/com/azure/spring/cloud/stream/binder/servicebus/implementation/config/ServiceBusBinderConfiguration.java
@@ -37,6 +37,7 @@
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
+import org.springframework.retry.support.RetryTemplate;
import static com.azure.spring.cloud.autoconfigure.implementation.context.AzureContextUtils.DEFAULT_TOKEN_CREDENTIAL_BEAN_NAME;
@@ -96,6 +97,7 @@ ServiceBusChannelProvisioner serviceBusChannelProvisioner() {
* @param messageConverter the message converter.
* @param producerFactoryCustomizers customizers to customize producer factories.
* @param processorFactoryCustomizers customizers to customize processor factories.
+ * @param retryTemplate optional custom retry template for message processing retries; must be a bean named {@code serviceBusRetryTemplate}.
*
* @return the {@link ServiceBusMessageChannelBinder} bean.
*/
@@ -106,7 +108,8 @@ ServiceBusMessageChannelBinder serviceBusBinder(ServiceBusChannelProvisioner cha
ObjectProvider namespaceProperties,
ObjectProvider messageConverter,
ObjectProvider producerFactoryCustomizers,
- ObjectProvider processorFactoryCustomizers) {
+ ObjectProvider processorFactoryCustomizers,
+ @Qualifier("serviceBusRetryTemplate") ObjectProvider retryTemplate) {
ServiceBusMessageChannelBinder binder = new ServiceBusMessageChannelBinder(null, channelProvisioner);
binder.setBindingProperties(bindingProperties);
@@ -114,6 +117,7 @@ ServiceBusMessageChannelBinder serviceBusBinder(ServiceBusChannelProvisioner cha
binder.setMessageConverter(messageConverter.getIfAvailable());
producerFactoryCustomizers.orderedStream().forEach(binder::addProducerFactoryCustomizer);
processorFactoryCustomizers.orderedStream().forEach(binder::addProcessorFactoryCustomizer);
+ retryTemplate.ifAvailable(binder::setRetryTemplate);
return binder;
}
diff --git a/sdk/spring/spring-cloud-azure-stream-binder-servicebus/src/test/java/com/azure/spring/cloud/stream/binder/servicebus/implementation/ServiceBusRetryTest.java b/sdk/spring/spring-cloud-azure-stream-binder-servicebus/src/test/java/com/azure/spring/cloud/stream/binder/servicebus/implementation/ServiceBusRetryTest.java
new file mode 100644
index 000000000000..7de33a40cda8
--- /dev/null
+++ b/sdk/spring/spring-cloud-azure-stream-binder-servicebus/src/test/java/com/azure/spring/cloud/stream/binder/servicebus/implementation/ServiceBusRetryTest.java
@@ -0,0 +1,207 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+package com.azure.spring.cloud.stream.binder.servicebus.implementation;
+
+import com.azure.spring.cloud.service.servicebus.properties.ServiceBusEntityType;
+import com.azure.spring.cloud.stream.binder.servicebus.core.implementation.provisioning.ServiceBusChannelProvisioner;
+import com.azure.spring.cloud.stream.binder.servicebus.core.properties.ServiceBusBindingProperties;
+import com.azure.spring.cloud.stream.binder.servicebus.core.properties.ServiceBusConsumerProperties;
+import com.azure.spring.cloud.stream.binder.servicebus.core.properties.ServiceBusExtendedBindingProperties;
+import com.azure.spring.integration.servicebus.inbound.ServiceBusInboundChannelAdapter;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.springframework.cloud.stream.binder.BinderHeaders;
+import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
+import org.springframework.cloud.stream.binder.HeaderMode;
+import org.springframework.cloud.stream.provisioning.ConsumerDestination;
+import org.springframework.context.support.GenericApplicationContext;
+import org.springframework.integration.core.MessageProducer;
+import org.springframework.retry.backoff.ExponentialBackOffPolicy;
+import org.springframework.retry.support.RetryTemplate;
+
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for retry functionality in ServiceBusMessageChannelBinder.
+ */
+@ExtendWith(MockitoExtension.class)
+class ServiceBusRetryTest {
+
+ @Mock
+ private ConsumerDestination consumerDestination;
+
+ private final ServiceBusExtendedBindingProperties extendedBindingProperties =
+ new ServiceBusExtendedBindingProperties();
+
+ private ExtendedConsumerProperties consumerProperties;
+
+ private final ServiceBusConsumerProperties serviceBusConsumerProperties = new ServiceBusConsumerProperties();
+
+ private ServiceBusMessageChannelTestBinder binder;
+
+ private GenericApplicationContext applicationContext;
+
+ private static final String ENTITY_NAME = "test-entity";
+ private static final String GROUP = "test";
+ private static final String NAMESPACE_NAME = "test-namespace";
+
+ @BeforeEach
+ void init() {
+ binder = new ServiceBusMessageChannelTestBinder(
+ BinderHeaders.STANDARD_HEADERS, new ServiceBusChannelProvisioner());
+ applicationContext = new GenericApplicationContext();
+ applicationContext.refresh();
+ binder.setApplicationContext(applicationContext);
+ }
+
+ @AfterEach
+ void tearDown() {
+ if (applicationContext != null) {
+ applicationContext.close();
+ }
+ }
+
+ @Test
+ void testRetryTemplateConfiguredWhenMaxAttemptsGreaterThanOne() {
+ // Arrange
+ prepareConsumerProperties();
+ consumerProperties.setMaxAttempts(3);
+ consumerProperties.setBackOffInitialInterval(1000);
+ consumerProperties.setBackOffMultiplier(2.0);
+ consumerProperties.setBackOffMaxInterval(5000);
+ when(consumerDestination.getName()).thenReturn(ENTITY_NAME);
+
+ // Act
+ MessageProducer producer = binder.createConsumerEndpoint(consumerDestination, GROUP, consumerProperties);
+
+ // Assert
+ assertThat(producer).isInstanceOf(ServiceBusInboundChannelAdapter.class);
+ ServiceBusInboundChannelAdapter adapter = (ServiceBusInboundChannelAdapter) producer;
+ RetryTemplate retryTemplate = adapter.getRetryTemplate();
+ assertThat(retryTemplate).isNotNull();
+
+ // Verify maxAttempts=3 by executing the template and counting actual attempts
+ AtomicInteger callCount = new AtomicInteger(0);
+ assertThatThrownBy(() -> retryTemplate.execute(ctx -> {
+ callCount.incrementAndGet();
+ throw new RuntimeException("test");
+ })).isInstanceOf(RuntimeException.class);
+ assertThat(callCount.get()).isEqualTo(3);
+
+ // Verify backoff policy configuration via the binder's factory method (no reflection needed)
+ ExponentialBackOffPolicy backOffPolicy = binder.createExponentialBackOffPolicy(consumerProperties);
+ assertThat(backOffPolicy.getInitialInterval()).isEqualTo(1000L);
+ assertThat(backOffPolicy.getMultiplier()).isEqualTo(2.0);
+ assertThat(backOffPolicy.getMaxInterval()).isEqualTo(5000L);
+ }
+
+ @Test
+ void testRetryTemplateNotConfiguredWhenMaxAttemptsIsOne() {
+ // Arrange
+ prepareConsumerProperties();
+ consumerProperties.setMaxAttempts(1);
+ when(consumerDestination.getName()).thenReturn(ENTITY_NAME);
+
+ // Act
+ MessageProducer producer = binder.createConsumerEndpoint(consumerDestination, GROUP, consumerProperties);
+
+ // Assert
+ assertThat(producer).isInstanceOf(ServiceBusInboundChannelAdapter.class);
+ assertThat(((ServiceBusInboundChannelAdapter) producer).getRetryTemplate()).isNull();
+ }
+
+ @Test
+ void testRetryTemplateConfiguredWithDefaultSettings() {
+ // Arrange
+ prepareConsumerProperties();
+ // Spring Cloud Stream default maxAttempts is 3 (> 1), so a RetryTemplate should be created.
+ when(consumerDestination.getName()).thenReturn(ENTITY_NAME);
+
+ // Act
+ MessageProducer producer = binder.createConsumerEndpoint(consumerDestination, GROUP, consumerProperties);
+
+ // Assert
+ assertThat(producer).isInstanceOf(ServiceBusInboundChannelAdapter.class);
+ ServiceBusInboundChannelAdapter adapter = (ServiceBusInboundChannelAdapter) producer;
+ RetryTemplate retryTemplate = adapter.getRetryTemplate();
+ assertThat(retryTemplate).isNotNull();
+
+ // Verify maxAttempts matches Spring Cloud Stream's default via observable behavior
+ int expectedMaxAttempts = new ExtendedConsumerProperties<>(new ServiceBusConsumerProperties()).getMaxAttempts();
+ AtomicInteger callCount = new AtomicInteger(0);
+ assertThatThrownBy(() -> retryTemplate.execute(ctx -> {
+ callCount.incrementAndGet();
+ throw new RuntimeException("test");
+ })).isInstanceOf(RuntimeException.class);
+ assertThat(callCount.get()).isEqualTo(expectedMaxAttempts);
+ }
+
+ @Test
+ void testCustomRetryTemplateIsUsed() {
+ // Arrange
+ prepareConsumerProperties();
+ consumerProperties.setMaxAttempts(3);
+ when(consumerDestination.getName()).thenReturn(ENTITY_NAME);
+
+ // Create a custom RetryTemplate
+ RetryTemplate customRetryTemplate = new RetryTemplate();
+ binder.setRetryTemplate(customRetryTemplate);
+
+ // Act
+ MessageProducer producer = binder.createConsumerEndpoint(consumerDestination, GROUP, consumerProperties);
+
+ // Assert
+ assertThat(producer).isInstanceOf(ServiceBusInboundChannelAdapter.class);
+ ServiceBusInboundChannelAdapter adapter = (ServiceBusInboundChannelAdapter) producer;
+ assertThat(adapter.getRetryTemplate()).isNotNull();
+ assertThat(adapter.getRetryTemplate()).isSameAs(customRetryTemplate);
+ }
+
+ @Test
+ void testCustomRetryTemplateNotAppliedWhenMaxAttemptsIsOne() {
+ // Arrange: maxAttempts=1 disables retry even when a custom RetryTemplate bean is injected
+ prepareConsumerProperties();
+ consumerProperties.setMaxAttempts(1);
+ when(consumerDestination.getName()).thenReturn(ENTITY_NAME);
+
+ RetryTemplate customRetryTemplate = new RetryTemplate();
+ binder.setRetryTemplate(customRetryTemplate);
+
+ // Act
+ MessageProducer producer = binder.createConsumerEndpoint(consumerDestination, GROUP, consumerProperties);
+
+ // Assert
+ assertThat(producer).isInstanceOf(ServiceBusInboundChannelAdapter.class);
+ assertThat(((ServiceBusInboundChannelAdapter) producer).getRetryTemplate()).isNull();
+ }
+
+ private void prepareConsumerProperties() {
+ serviceBusConsumerProperties.setEntityName(ENTITY_NAME);
+ serviceBusConsumerProperties.setSubscriptionName(GROUP);
+ serviceBusConsumerProperties.setEntityType(ServiceBusEntityType.TOPIC);
+ serviceBusConsumerProperties.setNamespace(NAMESPACE_NAME);
+ serviceBusConsumerProperties.getRetry().setTryTimeout(Duration.ofMinutes(5));
+ serviceBusConsumerProperties.setAutoComplete(false);
+ ServiceBusBindingProperties bindingProperties = new ServiceBusBindingProperties();
+ bindingProperties.setConsumer(serviceBusConsumerProperties);
+
+ Map bindings = new HashMap<>();
+ bindings.put(ENTITY_NAME, bindingProperties);
+ extendedBindingProperties.setBindings(bindings);
+ binder.setBindingProperties(extendedBindingProperties);
+
+ consumerProperties = new ExtendedConsumerProperties<>(serviceBusConsumerProperties);
+ consumerProperties.setHeaderMode(HeaderMode.embeddedHeaders);
+ }
+}
diff --git a/sdk/spring/spring-cloud-azure-stream-binder-servicebus/src/test/java/com/azure/spring/cloud/stream/binder/servicebus/implementation/config/ServiceBusBinderConfigurationTests.java b/sdk/spring/spring-cloud-azure-stream-binder-servicebus/src/test/java/com/azure/spring/cloud/stream/binder/servicebus/implementation/config/ServiceBusBinderConfigurationTests.java
index 1510974d625b..5c074d0dd9e6 100644
--- a/sdk/spring/spring-cloud-azure-stream-binder-servicebus/src/test/java/com/azure/spring/cloud/stream/binder/servicebus/implementation/config/ServiceBusBinderConfigurationTests.java
+++ b/sdk/spring/spring-cloud-azure-stream-binder-servicebus/src/test/java/com/azure/spring/cloud/stream/binder/servicebus/implementation/config/ServiceBusBinderConfigurationTests.java
@@ -30,6 +30,7 @@
import org.springframework.cloud.stream.binder.Binder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
+import org.springframework.retry.support.RetryTemplate;
import org.springframework.test.util.ReflectionTestUtils;
import java.lang.reflect.Field;
@@ -151,6 +152,19 @@ void testExtendedBindingPropertiesShouldBind() {
});
}
+ @Test
+ void retryTemplateShouldBeWiredWhenBeanProvided() {
+ this.contextRunner
+ .withBean("serviceBusRetryTemplate", RetryTemplate.class, RetryTemplate::new)
+ .run(context -> {
+ assertThat(context).hasSingleBean(ServiceBusMessageChannelBinder.class);
+ assertThat(context).hasSingleBean(RetryTemplate.class);
+ RetryTemplate retryTemplate = context.getBean(RetryTemplate.class);
+ ServiceBusMessageChannelBinder binder = context.getBean(ServiceBusMessageChannelBinder.class);
+ assertThat(binder.getRetryTemplate()).isSameAs(retryTemplate);
+ });
+ }
+
@Test
void clientMessageConverterShouldBeConfigured() {
this.contextRunner
diff --git a/sdk/spring/spring-integration-azure-servicebus/pom.xml b/sdk/spring/spring-integration-azure-servicebus/pom.xml
index 91b74bf51501..4f8aa93e8f02 100644
--- a/sdk/spring/spring-integration-azure-servicebus/pom.xml
+++ b/sdk/spring/spring-integration-azure-servicebus/pom.xml
@@ -48,6 +48,11 @@
spring-messaging-azure-servicebus
7.3.0-beta.1
+
+ org.springframework.retry
+ spring-retry
+ 2.0.12
+
@@ -148,6 +153,20 @@
+
+ org.apache.maven.plugins
+ maven-enforcer-plugin
+ 3.6.2
+
+
+
+
+ org.springframework.retry:spring-retry:[2.0.12]
+
+
+
+
+
diff --git a/sdk/spring/spring-integration-azure-servicebus/src/main/java/com/azure/spring/integration/servicebus/inbound/ServiceBusInboundChannelAdapter.java b/sdk/spring/spring-integration-azure-servicebus/src/main/java/com/azure/spring/integration/servicebus/inbound/ServiceBusInboundChannelAdapter.java
index b803c45518f8..d2a71d0897b5 100644
--- a/sdk/spring/spring-integration-azure-servicebus/src/main/java/com/azure/spring/integration/servicebus/inbound/ServiceBusInboundChannelAdapter.java
+++ b/sdk/spring/spring-integration-azure-servicebus/src/main/java/com/azure/spring/integration/servicebus/inbound/ServiceBusInboundChannelAdapter.java
@@ -23,7 +23,9 @@
import org.slf4j.LoggerFactory;
import org.springframework.integration.endpoint.MessageProducerSupport;
import org.springframework.messaging.Message;
+import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHeaders;
+import org.springframework.retry.support.RetryTemplate;
import org.springframework.util.Assert;
import reactor.core.publisher.Mono;
@@ -81,6 +83,7 @@ public class ServiceBusInboundChannelAdapter extends MessageProducerSupport {
private String instrumentationId;
private final boolean isAutoComplete;
private static final String MSG_FAIL_CHECKPOINT = "Failed to checkpoint %s";
+ private volatile RetryTemplate retryTemplate;
/**
* Construct a {@link ServiceBusInboundChannelAdapter} with the specified {@link ServiceBusMessageListenerContainer}.
@@ -153,7 +156,38 @@ public void setInstrumentationManager(InstrumentationManager instrumentationMana
*/
public void setInstrumentationId(String instrumentationId) {
this.instrumentationId = instrumentationId;
+ }
+ /**
+ * Set retry template for message processing retries.
+ *
+ * @param retryTemplate the retry template
+ */
+ public void setRetryTemplate(RetryTemplate retryTemplate) {
+ this.retryTemplate = retryTemplate;
+ }
+
+ /**
+ * Get the retry template configured on this adapter, or {@code null} if retry is not enabled.
+ *
+ * @return the retry template, or {@code null}
+ */
+ public RetryTemplate getRetryTemplate() {
+ return this.retryTemplate;
+ }
+
+ /**
+ * Sends the message directly to the output channel without routing exceptions to the error channel.
+ * This is used inside the retry template so that exceptions propagate back to the retry logic.
+ * Uses the adapter's configured sendTimeout (via MessagingTemplate) to match the non-retry path.
+ * The caller is responsible for routing to the error channel after retries are exhausted.
+ *
+ * @param message the message to send
+ */
+ private void sendMessageDirectly(Message> message) {
+ MessageChannel outputCh = getOutputChannel();
+ Assert.notNull(outputCh, "Output channel must not be null");
+ getMessagingTemplate().send(outputCh, message);
}
private class IntegrationErrorHandler implements ServiceBusErrorHandler {
@@ -199,7 +233,25 @@ public void onMessage(ServiceBusReceivedMessageContext messageContext) {
Message> message = getMessageConverter().toMessage(messageContext.getMessage(), new MessageHeaders(headers),
payloadType);
- sendMessage(message);
+
+ RetryTemplate localRetryTemplate = retryTemplate;
+ if (localRetryTemplate != null) {
+ try {
+ localRetryTemplate.execute(context -> {
+ // Bypass sendMessage()'s error-channel routing so exceptions propagate
+ // back to the retry template for retry. After all retries are exhausted
+ // the catch block routes to the error channel.
+ sendMessageDirectly(message);
+ return null;
+ });
+ } catch (RuntimeException e) {
+ if (!sendErrorMessageIfNecessary(message, e)) {
+ throw e;
+ }
+ }
+ } else {
+ sendMessage(message);
+ }
}
}
diff --git a/sdk/spring/spring-integration-azure-servicebus/src/test/java/com/azure/spring/integration/servicebus/inbound/ServiceBusInboundChannelAdapterTests.java b/sdk/spring/spring-integration-azure-servicebus/src/test/java/com/azure/spring/integration/servicebus/inbound/ServiceBusInboundChannelAdapterTests.java
index bb72a17d3a18..6b266af3788b 100644
--- a/sdk/spring/spring-integration-azure-servicebus/src/test/java/com/azure/spring/integration/servicebus/inbound/ServiceBusInboundChannelAdapterTests.java
+++ b/sdk/spring/spring-integration-azure-servicebus/src/test/java/com/azure/spring/integration/servicebus/inbound/ServiceBusInboundChannelAdapterTests.java
@@ -31,6 +31,9 @@
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
+import org.springframework.retry.backoff.NoBackOffPolicy;
+import org.springframework.retry.policy.SimpleRetryPolicy;
+import org.springframework.retry.support.RetryTemplate;
import java.time.Duration;
import java.util.Arrays;
@@ -211,4 +214,191 @@ void instrumentationErrorHandler() {
}
+ @Test
+ void retryTemplateRetriesMessageOnFailure() throws InterruptedException {
+ ServiceBusMessageListenerContainer listenerContainer =
+ new ServiceBusMessageListenerContainer(this.processorFactory, this.containerProperties);
+ ServiceBusInboundChannelAdapter channelAdapter = new ServiceBusInboundChannelAdapter(listenerContainer);
+
+ // Configure retry: maxAttempts=3, no backoff (for test speed)
+ SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
+ retryPolicy.setMaxAttempts(3);
+ RetryTemplate retryTemplate = new RetryTemplate();
+ retryTemplate.setRetryPolicy(retryPolicy);
+ retryTemplate.setBackOffPolicy(new NoBackOffPolicy());
+ channelAdapter.setRetryTemplate(retryTemplate);
+
+ DirectChannel channel = new DirectChannel();
+ channel.setBeanName("output");
+
+ final int[] attemptCount = {0};
+ final CountDownLatch successLatch = new CountDownLatch(1);
+ channel.subscribe(message -> {
+ attemptCount[0]++;
+ if (attemptCount[0] < 3) {
+ throw new RuntimeException("Simulated failure on attempt " + attemptCount[0]);
+ }
+ successLatch.countDown();
+ });
+
+ channelAdapter.setOutputChannel(channel);
+ channelAdapter.onInit();
+ channelAdapter.doStart();
+
+ MessageListener> messageListener = listenerContainer.getContainerProperties().getMessageListener();
+ assertTrue(messageListener instanceof ServiceBusRecordMessageListener);
+
+ ServiceBusReceivedMessageContext mockContext = mock(ServiceBusReceivedMessageContext.class);
+ ServiceBusReceivedMessage mockMessage = mock(ServiceBusReceivedMessage.class);
+ when(mockMessage.getBody()).thenReturn(BinaryData.fromString("test-payload"));
+ when(mockContext.getMessage()).thenReturn(mockMessage);
+
+ ((ServiceBusRecordMessageListener) messageListener).onMessage(mockContext);
+
+ assertTrue(successLatch.await(5L, TimeUnit.SECONDS), "Message should have been delivered after retries");
+ assertEquals(3, attemptCount[0], "Message should have been attempted exactly 3 times");
+ }
+
+ @Test
+ void retryTemplateWorksWithErrorChannelConfigured() throws InterruptedException {
+ ServiceBusMessageListenerContainer listenerContainer =
+ new ServiceBusMessageListenerContainer(this.processorFactory, this.containerProperties);
+ ServiceBusInboundChannelAdapter channelAdapter = new ServiceBusInboundChannelAdapter(listenerContainer);
+
+ // Configure retry: maxAttempts=3, no backoff (for test speed)
+ SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
+ retryPolicy.setMaxAttempts(3);
+ RetryTemplate retryTemplate = new RetryTemplate();
+ retryTemplate.setRetryPolicy(retryPolicy);
+ retryTemplate.setBackOffPolicy(new NoBackOffPolicy());
+ channelAdapter.setRetryTemplate(retryTemplate);
+
+ DirectChannel channel = new DirectChannel();
+ channel.setBeanName("output");
+
+ // Handler fails first 2 attempts, succeeds on 3rd
+ final int[] attemptCount = {0};
+ final CountDownLatch successLatch = new CountDownLatch(1);
+ channel.subscribe(message -> {
+ attemptCount[0]++;
+ if (attemptCount[0] < 3) {
+ throw new RuntimeException("Simulated failure on attempt " + attemptCount[0]);
+ }
+ successLatch.countDown();
+ });
+
+ // Set an error channel — in the binder flow the adapter always has one configured
+ DirectChannel errorCh = new DirectChannel();
+ List> errorMessages = new CopyOnWriteArrayList<>();
+ errorCh.subscribe(msg -> errorMessages.add(msg));
+
+ channelAdapter.setOutputChannel(channel);
+ channelAdapter.setErrorChannel(errorCh);
+ channelAdapter.onInit();
+ channelAdapter.doStart();
+
+ MessageListener> messageListener = listenerContainer.getContainerProperties().getMessageListener();
+ assertTrue(messageListener instanceof ServiceBusRecordMessageListener);
+
+ ServiceBusReceivedMessageContext mockContext = mock(ServiceBusReceivedMessageContext.class);
+ ServiceBusReceivedMessage mockMessage = mock(ServiceBusReceivedMessage.class);
+ when(mockMessage.getBody()).thenReturn(BinaryData.fromString("test-payload"));
+ when(mockContext.getMessage()).thenReturn(mockMessage);
+
+ ((ServiceBusRecordMessageListener) messageListener).onMessage(mockContext);
+
+ assertTrue(successLatch.await(5L, TimeUnit.SECONDS), "Message should have been delivered after retries");
+ assertEquals(3, attemptCount[0], "Message should have been attempted exactly 3 times");
+ assertTrue(errorMessages.isEmpty(), "No error message should be sent to error channel when retries succeed");
+ }
+
+ @Test
+ void retryTemplateExhaustedWithErrorChannelRoutesToErrorChannel() throws InterruptedException {
+ ServiceBusMessageListenerContainer listenerContainer =
+ new ServiceBusMessageListenerContainer(this.processorFactory, this.containerProperties);
+ ServiceBusInboundChannelAdapter channelAdapter = new ServiceBusInboundChannelAdapter(listenerContainer);
+
+ // Configure retry: maxAttempts=2, no backoff
+ SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
+ retryPolicy.setMaxAttempts(2);
+ RetryTemplate retryTemplate = new RetryTemplate();
+ retryTemplate.setRetryPolicy(retryPolicy);
+ retryTemplate.setBackOffPolicy(new NoBackOffPolicy());
+ channelAdapter.setRetryTemplate(retryTemplate);
+
+ DirectChannel channel = new DirectChannel();
+ channel.setBeanName("output");
+ // Handler always fails
+ channel.subscribe(message -> {
+ throw new RuntimeException("Always fails");
+ });
+
+ // Wire error channel
+ DirectChannel errorCh = new DirectChannel();
+ List> errorMessages = new CopyOnWriteArrayList<>();
+ CountDownLatch errorLatch = new CountDownLatch(1);
+ errorCh.subscribe(msg -> {
+ errorMessages.add(msg);
+ errorLatch.countDown();
+ });
+
+ channelAdapter.setOutputChannel(channel);
+ channelAdapter.setErrorChannel(errorCh);
+ channelAdapter.onInit();
+ channelAdapter.doStart();
+
+ MessageListener> messageListener = listenerContainer.getContainerProperties().getMessageListener();
+ assertTrue(messageListener instanceof ServiceBusRecordMessageListener);
+
+ ServiceBusReceivedMessageContext mockContext = mock(ServiceBusReceivedMessageContext.class);
+ ServiceBusReceivedMessage mockMessage = mock(ServiceBusReceivedMessage.class);
+ when(mockMessage.getBody()).thenReturn(BinaryData.fromString("test-payload"));
+ when(mockContext.getMessage()).thenReturn(mockMessage);
+
+ ((ServiceBusRecordMessageListener) messageListener).onMessage(mockContext);
+
+ assertTrue(errorLatch.await(5L, TimeUnit.SECONDS),
+ "One error message should be routed to the error channel after retries exhausted");
+ assertEquals(1, errorMessages.size(), "Exactly one error message should reach the error channel");
+ }
+
+ @Test
+ void retryTemplateExhaustedWithoutErrorChannelRethrowsException() {
+ ServiceBusMessageListenerContainer listenerContainer =
+ new ServiceBusMessageListenerContainer(this.processorFactory, this.containerProperties);
+ ServiceBusInboundChannelAdapter channelAdapter = new ServiceBusInboundChannelAdapter(listenerContainer);
+
+ // Configure retry: maxAttempts=2, no backoff
+ SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
+ retryPolicy.setMaxAttempts(2);
+ RetryTemplate retryTemplate = new RetryTemplate();
+ retryTemplate.setRetryPolicy(retryPolicy);
+ retryTemplate.setBackOffPolicy(new NoBackOffPolicy());
+ channelAdapter.setRetryTemplate(retryTemplate);
+
+ DirectChannel channel = new DirectChannel();
+ channel.setBeanName("output");
+ // Handler always fails
+ channel.subscribe(message -> {
+ throw new RuntimeException("Always fails");
+ });
+
+ channelAdapter.setOutputChannel(channel);
+ // No error channel configured
+ channelAdapter.onInit();
+ channelAdapter.doStart();
+
+ MessageListener> messageListener = listenerContainer.getContainerProperties().getMessageListener();
+ assertTrue(messageListener instanceof ServiceBusRecordMessageListener);
+
+ ServiceBusReceivedMessageContext mockContext = mock(ServiceBusReceivedMessageContext.class);
+ ServiceBusReceivedMessage mockMessage = mock(ServiceBusReceivedMessage.class);
+ when(mockMessage.getBody()).thenReturn(BinaryData.fromString("test-payload"));
+ when(mockContext.getMessage()).thenReturn(mockMessage);
+
+ // Without error channel the exception must propagate to the caller
+ assertThrows(RuntimeException.class,
+ () -> ((ServiceBusRecordMessageListener) messageListener).onMessage(mockContext));
+ }
+
}