Provided Advice Classes
In addition to providing the general mechanism to apply AOP advice classes, Spring Integration provides these out-of-the-box advice implementations:
- RequestHandlerRetryAdvice (described in Retry Advice)
- RequestHandlerCircuitBreakerAdvice (described in Circuit Breaker Advice)
- ExpressionEvaluatingRequestHandlerAdvice (described in Expression Advice)
- RateLimiterRequestHandlerAdvice (described in Rate Limiter Advice)
- CacheRequestHandlerAdvice (described in Caching Advice)
- ReactiveRequestHandlerAdvice (described in Reactive Advice)
- ContextHolderRequestHandlerAdvice (described in Context Holder Advice)
Retry Advice
The retry advice (o.s.i.handler.advice.RequestHandlerRetryAdvice) leverages the rich retry mechanisms provided by the Spring Retry project.
The core component of spring-retry is the RetryTemplate, which allows configuration of sophisticated retry scenarios, including RetryPolicy and BackOffPolicy strategies (with a number of implementations) as well as a RecoveryCallback strategy to determine the action to take when retries are exhausted.
- Stateless Retry: The retry activity is handled entirely within the advice. The thread pauses (if configured to do so) and retries the action.
- Stateful Retry: The retry state is managed within the advice, but an exception is thrown and the caller resubmits the request. An example of stateful retry is when we want the message originator (for example, JMS) to be responsible for resubmitting, rather than performing the retry on the current thread. Stateful retry needs some mechanism to detect a retried submission.
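The difference between the two styles can be sketched in plain Java. This is a minimal illustration only, not how Spring Retry is implemented: the RetryStyles class, its method names, and the fixed limit of three attempts are assumptions made for the example. The stateless method loops on the calling thread, while the stateful method remembers failures per key between calls and rethrows so that the caller can resubmit.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Illustrative only: a hypothetical class, not part of Spring Retry or Spring Integration.
class RetryStyles {

    static final int MAX_ATTEMPTS = 3;

    // Stateless: every attempt happens inside this one call, on the calling thread.
    static <T, R> R stateless(Function<T, R> action, T input, Function<T, R> recovery) {
        for (int attempt = 1; attempt <= MAX_ATTEMPTS; attempt++) {
            try {
                return action.apply(input);
            }
            catch (RuntimeException ex) {
                // Try again immediately; a back off policy would pause the thread here.
            }
        }
        return recovery.apply(input);
    }

    // Stateful: the failure count for each key survives between calls.
    private final Map<String, Integer> failures = new ConcurrentHashMap<>();

    <T, R> R stateful(String key, Function<T, R> action, T input, Function<T, R> recovery) {
        if (failures.getOrDefault(key, 0) >= MAX_ATTEMPTS) {
            failures.remove(key);
            return recovery.apply(input); // retries exhausted on an earlier delivery
        }
        try {
            R result = action.apply(input);
            failures.remove(key);
            return result;
        }
        catch (RuntimeException ex) {
            failures.merge(key, 1, Integer::sum);
            throw ex; // the caller (for example, a JMS container) resubmits the request
        }
    }

    public static void main(String[] args) {
        Function<String, String> failing = s -> {
            throw new RuntimeException("error");
        };
        System.out.println(stateless(failing, "message", s -> "recovered"));
    }
}
```

The key passed to the stateful method plays the role of the mechanism that detects a retried submission, such as a message identifier.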
For more information on spring-retry, see the project’s Javadoc and the reference documentation for Spring Batch, where spring-retry originated.
The default back off behavior is to not back off. Retries are attempted immediately. Using a back off policy that causes threads to pause between attempts may cause performance issues, including excessive memory use and thread starvation. In high-volume environments, back off policies should be used with caution.
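To make the cost of a back off policy concrete, the following plain Java sketch computes how long a single thread would pause under an exponential policy. It is an illustration under stated assumptions, not Spring Retry's ExponentialBackOffPolicy: the BackOffSketch class and its sleepIntervals method are hypothetical, and the values (1000 ms initial interval, multiplier 5.0, 60000 ms cap, four attempts) are chosen only for the example.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative only: a hypothetical helper, not Spring Retry's ExponentialBackOffPolicy.
class BackOffSketch {

    // Returns the pause (in milliseconds) taken before each retry.
    // With maxAttempts attempts there are maxAttempts - 1 pauses.
    static List<Long> sleepIntervals(long initial, double multiplier, long max, int maxAttempts) {
        List<Long> sleeps = new ArrayList<>();
        long interval = Math.min(initial, max);
        for (int retry = 1; retry < maxAttempts; retry++) {
            sleeps.add(interval);
            // Grow the interval, but never beyond the configured maximum.
            interval = Math.min((long) (interval * multiplier), max);
        }
        return sleeps;
    }

    public static void main(String[] args) {
        List<Long> sleeps = sleepIntervals(1000, 5.0, 60000, 4);
        long total = sleeps.stream().mapToLong(Long::longValue).sum();
        System.out.println(sleeps + " total blocked ms: " + total);
    }
}
```

With these values, one failing message keeps its thread paused for 31 seconds in total, which is why many concurrent failures can starve a small thread pool.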
Configuring the Retry Advice
The examples in this section use the following service, invoked by a <service-activator>, that always throws an exception:
public class FailingService {
    public void service(String message) {
        throw new RuntimeException("error");
    }
}
- Simple Stateless Retry
The default RetryTemplate has a SimpleRetryPolicy which tries three times. There is no BackOffPolicy, so the three attempts are made back-to-back-to-back with no delay between attempts. There is no RecoveryCallback, so the result is to throw the exception to the caller after the final failed retry occurs. In a Spring Integration environment, this final exception might be handled by using an error-channel on the inbound endpoint. The following example uses RetryTemplate and shows its DEBUG output:
<int:service-activator input-channel="input" ref="failer" method="service">
    <int:request-handler-advice-chain>
        <bean class="o.s.i.handler.advice.RequestHandlerRetryAdvice"/>
    </int:request-handler-advice-chain>
</int:service-activator>
DEBUG [task-scheduler-2]preSend on channel 'input', message: [Payload=...]
DEBUG [task-scheduler-2]Retry: count=0
DEBUG [task-scheduler-2]Checking for rethrow: count=1
DEBUG [task-scheduler-2]Retry: count=1
DEBUG [task-scheduler-2]Checking for rethrow: count=2
DEBUG [task-scheduler-2]Retry: count=2
DEBUG [task-scheduler-2]Checking for rethrow: count=3
DEBUG [task-scheduler-2]Retry failed last attempt: count=3
- Simple Stateless Retry with Recovery
The following example adds a RecoveryCallback to the preceding example and uses an ErrorMessageSendingRecoverer to send an ErrorMessage to a channel:
<int:service-activator input-channel="input" ref="failer" method="service">
    <int:request-handler-advice-chain>
        <bean class="o.s.i.handler.advice.RequestHandlerRetryAdvice">
            <property name="recoveryCallback">
                <bean class="o.s.i.handler.advice.ErrorMessageSendingRecoverer">
                    <constructor-arg ref="myErrorChannel" />
                </bean>
            </property>
        </bean>
    </int:request-handler-advice-chain>
</int:service-activator>
DEBUG [task-scheduler-2]preSend on channel 'input', message: [Payload=...]
DEBUG [task-scheduler-2]Retry: count=0
DEBUG [task-scheduler-2]Checking for rethrow: count=1
DEBUG [task-scheduler-2]Retry: count=1
DEBUG [task-scheduler-2]Checking for rethrow: count=2
DEBUG [task-scheduler-2]Retry: count=2
DEBUG [task-scheduler-2]Checking for rethrow: count=3
DEBUG [task-scheduler-2]Retry failed last attempt: count=3
DEBUG [task-scheduler-2]Sending ErrorMessage :failedMessage:[Payload=...]
- Stateless Retry with Customized Policies, and Recovery
For more sophistication, we can provide the advice with a customized RetryTemplate. This example continues to use the SimpleRetryPolicy but increases the attempts to four. It also adds an ExponentialBackOffPolicy where the first retry waits one second, the second waits five seconds, and the third waits 25 seconds (for four attempts in all). The following listing shows the example and its DEBUG output:
<int:service-activator input-channel="input" ref="failer" method="service">
    <int:request-handler-advice-chain>
        <bean class="o.s.i.handler.advice.RequestHandlerRetryAdvice">
            <property name="recoveryCallback">
                <bean class="o.s.i.handler.advice.ErrorMessageSendingRecoverer">
                    <constructor-arg ref="myErrorChannel" />
                </bean>
            </property>
            <property name="retryTemplate" ref="retryTemplate" />
        </bean>
    </int:request-handler-advice-chain>
</int:service-activator>
<bean id="retryTemplate" class="org.springframework.retry.support.RetryTemplate">
    <property name="retryPolicy">
        <bean class="org.springframework.retry.policy.SimpleRetryPolicy">
            <property name="maxAttempts" value="4" />
        </bean>
    </property>
    <property name="backOffPolicy">
        <bean class="org.springframework.retry.backoff.ExponentialBackOffPolicy">
            <property name="initialInterval" value="1000" />
            <property name="multiplier" value="5.0" />
            <property name="maxInterval" value="60000" />
        </bean>
    </property>
</bean>
27.058 DEBUG [task-scheduler-1]preSend on channel 'input', message: [Payload=...]
27.071 DEBUG [task-scheduler-1]Retry: count=0
27.080 DEBUG [task-scheduler-1]Sleeping for 1000
28.081 DEBUG [task-scheduler-1]Checking for rethrow: count=1
28.081 DEBUG [task-scheduler-1]Retry: count=1
28.081 DEBUG [task-scheduler-1]Sleeping for 5000
33.082 DEBUG [task-scheduler-1]Checking for rethrow: count=2
33.082 DEBUG [task-scheduler-1]Retry: count=2
33.083 DEBUG [task-scheduler-1]Sleeping for 25000
58.083 DEBUG [task-scheduler-1]Checking for rethrow: count=3
58.083 DEBUG [task-scheduler-1]Retry: count=3
58.084 DEBUG [task-scheduler-1]Checking for rethrow: count=4
58.084 DEBUG [task-scheduler-1]Retry failed last attempt: count=4
58.086 DEBUG [task-scheduler-1]Sending ErrorMessage :failedMessage:[Payload=...]
- Namespace Support for Stateless Retry
Starting with version 4.0, the preceding configuration can be greatly simplified, thanks to the namespace support for the retry advice, as the following example shows:
<int:service-activator input-channel="input" ref="failer" method="service">
    <int:request-handler-advice-chain>
        <ref bean="retrier" />
    </int:request-handler-advice-chain>
</int:service-activator>
<int:handler-retry-advice id="retrier" max-attempts="4" recovery-channel="myErrorChannel">
    <int:exponential-back-off initial="1000" multiplier="5.0" maximum="60000" />
</int:handler-retry-advice>
In the preceding example, the advice is defined as a top-level bean so that it can be used in multiple request-handler-advice-chain instances. You can also define the advice directly within the chain, as the following example shows:
<int:service-activator input-channel="input" ref="failer" method="service">
    <int:request-handler-advice-chain>
        <int:retry-advice id="retrier" max-attempts="4" recovery-channel="myErrorChannel">
            <int:exponential-back-off initial="1000" multiplier="5.0" maximum="60000" />
        </int:retry-advice>
    </int:request-handler-advice-chain>
</int:service-activator>
A <handler-retry-advice> can have a <fixed-back-off> or <exponential-back-off> child element or have no child element. A <handler-retry-advice> with no child element uses no back off. If there is no recovery-channel, the exception is thrown when retries are exhausted. The namespace can only be used with stateless retry.
For more complex environments (custom policies etc.), use normal <bean> definitions.
- Simple Stateful Retry with Recovery
To make retry stateful, we need to provide the advice with a RetryStateGenerator implementation. This class is used to identify a message as being a resubmission so that the RetryTemplate can determine the current state of retry for this message. The framework provides a SpelExpressionRetryStateGenerator, which determines the message identifier by using a SpEL expression. This example again uses the default policies (three attempts with no back off). As with stateless retry, these policies can be customized. The following listing shows the example and its DEBUG output:
<int:service-activator input-channel="input" ref="failer" method="service">
    <int:request-handler-advice-chain>
        <bean class="o.s.i.handler.advice.RequestHandlerRetryAdvice">
            <property name="retryStateGenerator">
                <bean class="o.s.i.handler.advice.SpelExpressionRetryStateGenerator">
                    <constructor-arg value="headers['jms_messageId']" />
                </bean>
            </property>
            <property name="recoveryCallback">
                <bean class="o.s.i.handler.advice.ErrorMessageSendingRecoverer">
                    <constructor-arg ref="myErrorChannel" />
                </bean>
            </property>
        </bean>
    </int:request-handler-advice-chain>
</int:service-activator>
24.351 DEBUG [Container#0-1]preSend on channel 'input', message: [Payload=...]
24.368 DEBUG [Container#0-1]Retry: count=0
24.387 DEBUG [Container#0-1]Checking for rethrow: count=1
24.387 DEBUG [Container#0-1]Rethrow in retry for policy: count=1
24.387 WARN [Container#0-1]failure occurred in gateway sendAndReceive
org.springframework.integration.MessagingException: Failed to invoke handler
...
Caused by: java.lang.RuntimeException: foo
...
24.391 DEBUG [Container#0-1]Initiating transaction rollback on application exception
...
25.412 DEBUG [Container#0-1]preSend on channel 'input', message: [Payload=...]
25.412 DEBUG [Container#0-1]Retry: count=1
25.413 DEBUG [Container#0-1]Checking for rethrow: count=2
25.413 DEBUG [Container#0-1]Rethrow in retry for policy: count=2
25.413 WARN [Container#0-1]failure occurred in gateway sendAndReceive
org.springframework.integration.MessagingException: Failed to invoke handler
...
Caused by: java.lang.RuntimeException: foo
...
25.414 DEBUG [Container#0-1]Initiating transaction rollback on application exception
...
26.418 DEBUG [Container#0-1]preSend on channel 'input', message: [Payload=...]
26.418 DEBUG [Container#0-1]Retry: count=2
26.419 DEBUG [Container#0-1]Checking for rethrow: count=3
26.419 DEBUG [Container#0-1]Rethrow in retry for policy: count=3
26.419 WARN [Container#0-1]failure occurred in gateway sendAndReceive
org.springframework.integration.MessagingException: Failed to invoke handler
...
Caused by: java.lang.RuntimeException: foo
...
26.420 DEBUG [Container#0-1]Initiating transaction rollback on application exception
...
27.425 DEBUG [Container#0-1]preSend on channel 'input', message: [Payload=...]
27.426 DEBUG [Container#0-1]Retry failed last attempt: count=3
27.426 DEBUG [Container#0-1]Sending ErrorMessage :failedMessage:[Payload=...]
If you compare the preceding example with the stateless examples, you can see that, with stateful retry, the exception is thrown to the caller on each failure.
- Exception Classification for Retry
Spring Retry has a great deal of flexibility for determining which exceptions can invoke retry. The default configuration retries for all exceptions, and the exception classifier looks at the top-level exception. If you configure it to, say, retry only on MyException and your application throws a SomeOtherException where the cause is a MyException, retry does not occur.
Since Spring Retry 1.0.3, the BinaryExceptionClassifier has a property called traverseCauses (the default is false). When true, it traverses exception causes until it finds a match or runs out of causes to traverse.
To use this classifier for retry, use a SimpleRetryPolicy created with the constructor that takes the max attempts, the Map of exception classes (each mapped to a Boolean that indicates whether it is retryable), and the traverseCauses boolean. Then you can inject this policy into the RetryTemplate.
traverseCauses is required in this case because user exceptions may be wrapped in a MessagingException.
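The effect of cause traversal can be sketched in plain Java. This is a minimal illustration, not the BinaryExceptionClassifier source: the CauseTraversalSketch class, its isRetryable method, and the nested MyException type are hypothetical names introduced for the example, and an IllegalStateException stands in for the wrapping exception.

```java
import java.util.Set;

// Illustrative only: a hypothetical classifier, not Spring Retry's BinaryExceptionClassifier.
class CauseTraversalSketch {

    static class MyException extends RuntimeException {
        MyException(String message) {
            super(message);
        }
    }

    static boolean isRetryable(Throwable thrown, Set<Class<? extends Throwable>> retryable,
            boolean traverseCauses) {
        Throwable current = thrown;
        while (current != null) {
            for (Class<? extends Throwable> type : retryable) {
                if (type.isInstance(current)) {
                    return true;
                }
            }
            if (!traverseCauses) {
                return false; // only the top-level exception is examined
            }
            current = current.getCause(); // null once the causes run out
        }
        return false;
    }

    public static void main(String[] args) {
        // The user exception is wrapped, much as a handler failure is wrapped in a MessagingException.
        Throwable wrapped = new IllegalStateException("wrapper", new MyException("root cause"));
        Set<Class<? extends Throwable>> retryable = Set.of(MyException.class);
        System.out.println(isRetryable(wrapped, retryable, false)); // false
        System.out.println(isRetryable(wrapped, retryable, true));  // true
    }
}
```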
Circuit Breaker Advice
The general idea of the circuit breaker pattern is that, if a service is not currently available, do not waste time (and resources) trying to use it.
The o.s.i.handler.advice.RequestHandlerCircuitBreakerAdvice implements this pattern.
When the circuit breaker is in the closed state, the endpoint attempts to invoke the service.
The circuit breaker goes to the open state if a certain number of consecutive attempts fail.
When it is in the open state, new requests “fail fast” and no attempt is made to invoke the service until some time has expired.
When that time has expired, the circuit breaker is set to the half-open state. When in this state, if even a single attempt fails, the breaker immediately goes to the open state. If the attempt succeeds, the breaker goes to the closed state, in which case it does not go to the open state again until the configured number of consecutive failures again occur. Any successful attempt resets the state to zero failures for the purpose of determining when the breaker might go to the open state again.
Typically, this advice might be used for external services, where it might take some time to fail (such as a timeout attempting to make a network connection).
The RequestHandlerCircuitBreakerAdvice has two properties: threshold and halfOpenAfter.
The threshold property represents the number of consecutive failures that need to occur before the breaker goes open.
It defaults to 5.
The halfOpenAfter property represents the time after the last failure that the breaker waits before attempting another request.
The default is 1000 milliseconds.
The following example configures a circuit breaker and shows its DEBUG and ERROR output:
<int:service-activator input-channel="input" ref="failer" method="service">
    <int:request-handler-advice-chain>
        <bean class="o.s.i.handler.advice.RequestHandlerCircuitBreakerAdvice">
            <property name="threshold" value="2" />
            <property name="halfOpenAfter" value="12000" />
        </bean>
    </int:request-handler-advice-chain>
</int:service-activator>
05.617 DEBUG [task-scheduler-1]preSend on channel 'input', message: [Payload=...]
05.638 ERROR [task-scheduler-1]org.springframework.messaging.MessageHandlingException: java.lang.RuntimeException: foo
...
10.598 DEBUG [task-scheduler-2]preSend on channel 'input', message: [Payload=...]
10.600 ERROR [task-scheduler-2]org.springframework.messaging.MessageHandlingException: java.lang.RuntimeException: foo
...
15.598 DEBUG [task-scheduler-3]preSend on channel 'input', message: [Payload=...]
15.599 ERROR [task-scheduler-3]org.springframework.messaging.MessagingException: Circuit Breaker is Open for ServiceActivator
...
20.598 DEBUG [task-scheduler-2]preSend on channel 'input', message: [Payload=...]
20.598 ERROR [task-scheduler-2]org.springframework.messaging.MessagingException: Circuit Breaker is Open for ServiceActivator
...
25.598 DEBUG [task-scheduler-5]preSend on channel 'input', message: [Payload=...]
25.601 ERROR [task-scheduler-5]org.springframework.messaging.MessageHandlingException: java.lang.RuntimeException: foo
...
30.598 DEBUG [task-scheduler-1]preSend on channel 'input', message: [Payload=foo...]
30.599 ERROR [task-scheduler-1]org.springframework.messaging.MessagingException: Circuit Breaker is Open for ServiceActivator
In the preceding example, the threshold is set to 2 and halfOpenAfter is set to 12 seconds.
A new request arrives every 5 seconds.
The first two attempts invoked the service and failed.
The third and fourth failed with an exception indicating that the circuit breaker is open.
The fifth request was attempted because it arrived 15 seconds after the last failure, when the breaker was half-open.
The sixth attempt failed immediately because the failed fifth attempt sent the breaker straight back to the open state.
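The state transitions described in this section can be sketched in plain Java. This is a minimal illustration under stated assumptions, not the RequestHandlerCircuitBreakerAdvice source: the CircuitBreakerSketch class is hypothetical, the current time is passed in explicitly so that the timeline can be simulated without sleeping, and an IllegalStateException stands in for the "Circuit Breaker is Open" exception.

```java
import java.util.function.Supplier;

// Illustrative only: a hypothetical class, not the RequestHandlerCircuitBreakerAdvice source.
class CircuitBreakerSketch {

    private final int threshold;
    private final long halfOpenAfterMillis;
    private int failures;     // consecutive failures so far
    private long lastFailure; // time of the most recent failure

    CircuitBreakerSketch(int threshold, long halfOpenAfterMillis) {
        this.threshold = threshold;
        this.halfOpenAfterMillis = halfOpenAfterMillis;
    }

    // 'now' is supplied by the caller so that a timeline can be replayed deterministically.
    <R> R invoke(Supplier<R> service, long now) {
        if (failures >= threshold && now - lastFailure < halfOpenAfterMillis) {
            // Open: fail fast without invoking the service.
            throw new IllegalStateException("Circuit Breaker is Open");
        }
        try {
            // Closed, or half-open once halfOpenAfterMillis has elapsed.
            R result = service.get();
            failures = 0; // any success resets the failure count
            return result;
        }
        catch (RuntimeException ex) {
            failures++;
            lastFailure = now;
            throw ex;
        }
    }

    public static void main(String[] args) {
        CircuitBreakerSketch breaker = new CircuitBreakerSketch(2, 12_000);
        Supplier<String> failing = () -> {
            throw new RuntimeException("foo");
        };
        // One request every 5 seconds, as in the log output shown earlier.
        for (long now = 5_000; now <= 30_000; now += 5_000) {
            try {
                breaker.invoke(failing, now);
            }
            catch (RuntimeException ex) {
                System.out.println(now / 1000 + "s: " + ex.getMessage());
            }
        }
    }
}
```

Replaying six requests at 5-second intervals with a threshold of 2 and a 12-second half-open delay gives the same pattern as the log: two service failures, two fail-fast rejections, one half-open attempt that fails, and another rejection.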
Expression Evaluating Advice
Another supplied advice class is the o.s.i.handler.advice.ExpressionEvaluatingRequestHandlerAdvice.
This advice is more general than the retry and circuit breaker advices.
It provides a mechanism to evaluate an expression on the original inbound message sent to the endpoint.
Separate expressions are available to be evaluated, after either success or failure.
Optionally, a message containing the evaluation result, together with the input message, can be sent to a message channel.
A typical use case for this advice might be with an <ftp:outbound-channel-adapter/>, perhaps to move the file to one directory if the transfer was successful or to another directory if it fails.
The advice has properties to set an expression when successful, an expression for failures, and corresponding channels for each.
For the successful case, the message sent to the successChannel is an AdviceMessage, with the payload being the result of the expression evaluation.
An additional property, called inputMessage, contains the original message sent to the handler.
A message sent to the failureChannel (when the handler throws an exception) is an ErrorMessage with a payload of MessageHandlingExpressionEvaluatingAdviceException.
Like all MessagingException instances, this payload has failedMessage and cause properties, as well as an additional property called evaluationResult, which contains the result of the expression evaluation.
Starting with version 5.1.3, if channels are configured but expressions are not provided, a default expression that evaluates to the payload of the message is used.
When an exception is thrown in the scope of the advice, by default, that exception is thrown to the caller after any failureExpression is evaluated.
If you wish to suppress throwing the exception, set the trapException property to true.
The following example shows how to configure the advice with the Java DSL:
@SpringBootApplication
public class EerhaApplication {

    public static void main(String[] args) {
        ConfigurableApplicationContext context = SpringApplication.run(EerhaApplication.class, args);
        MessageChannel in = context.getBean("advised.input", MessageChannel.class);
        in.send(new GenericMessage<>("good"));
        in.send(new GenericMessage<>("bad"));
        context.close();
    }

    @Bean
    public IntegrationFlow advised() {
        return f -> f.<String>handle((payload, headers) -> {
            if (payload.equals("good")) {
                return null;
            }
            else {
                throw new RuntimeException("some failure");
            }
        }, c -> c.advice(expressionAdvice()));
    }

    @Bean
    public Advice expressionAdvice() {
        ExpressionEvaluatingRequestHandlerAdvice advice = new ExpressionEvaluatingRequestHandlerAdvice();
        advice.setSuccessChannelName("success.input");
        advice.setOnSuccessExpressionString("payload + ' was successful'");
        advice.setFailureChannelName("failure.input");
        advice.setOnFailureExpressionString(
                "payload + ' was bad, with reason: ' + #exception.cause.message");
        advice.setTrapException(true);
        return advice;
    }

    @Bean
    public IntegrationFlow success() {
        return f -> f.handle(System.out::println);
    }

    @Bean
    public IntegrationFlow failure() {
        return f -> f.handle(System.out::println);
    }

}
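The control flow of this advice (evaluate one expression on success, another on failure, and optionally trap the exception) can be sketched in plain Java without Spring. This is a rough illustration only, not the ExpressionEvaluatingRequestHandlerAdvice source: the SuccessFailureAdviceSketch class is hypothetical, plain lists stand in for the success and failure channels, and string concatenation stands in for the SpEL expressions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Illustrative only: a hypothetical class, not the ExpressionEvaluatingRequestHandlerAdvice source.
class SuccessFailureAdviceSketch {

    // Lists stand in for the success and failure message channels.
    final List<String> successChannel = new ArrayList<>();
    final List<String> failureChannel = new ArrayList<>();

    private final boolean trapException;

    SuccessFailureAdviceSketch(boolean trapException) {
        this.trapException = trapException;
    }

    String handle(String payload, Function<String, String> handler) {
        try {
            String result = handler.apply(payload);
            // Stand-in for the success expression: payload + ' was successful'
            successChannel.add(payload + " was successful");
            return result;
        }
        catch (RuntimeException ex) {
            // Stand-in for the failure expression, evaluated before any rethrow.
            failureChannel.add(payload + " was bad, with reason: " + ex.getMessage());
            if (!trapException) {
                throw ex; // default behavior: the caller still sees the exception
            }
            return null; // trapped: the exception is suppressed
        }
    }

    public static void main(String[] args) {
        SuccessFailureAdviceSketch advice = new SuccessFailureAdviceSketch(true);
        Function<String, String> handler = payload -> {
            if (payload.equals("good")) {
                return null;
            }
            throw new RuntimeException("some failure");
        };
        advice.handle("good", handler);
        advice.handle("bad", handler);
        System.out.println(advice.successChannel);
        System.out.println(advice.failureChannel);
    }
}
```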
Rate Limiter Advice
The Rate Limiter advice (RateLimiterRequestHandlerAdvice) lets you ensure that an endpoint does not get overloaded with requests.
When the rate limit is breached, the request goes into a blocked state.
A typical use case for this advice might be an external service provider that does not allow more than n requests per minute.
The RateLimiterRequestHandlerAdvice implementation is fully based on the Resilience4j project and requires either a RateLimiter or a RateLimiterConfig to be injected.
It can also be configured with defaults, a custom name, or both.
The following example configures a rate limiter advice with one request per second:
@Bean
public RateLimiterRequestHandlerAdvice rateLimiterRequestHandlerAdvice() {
    return new RateLimiterRequestHandlerAdvice(RateLimiterConfig.custom()
            .limitRefreshPeriod(Duration.ofSeconds(1))
            .limitForPeriod(1)
            .build());
}

@ServiceActivator(inputChannel = "requestChannel", outputChannel = "resultChannel",
        adviceChain = "rateLimiterRequestHandlerAdvice")
public String handleRequest(String payload) {
    ...
}
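The meaning of the two settings used above (a number of permits per refresh period) can be sketched in plain Java. This is a simplified illustration, not Resilience4j's algorithm: the RateLimiterSketch class and its tryAcquire method are hypothetical, time is passed in explicitly so that the behavior is deterministic, and the sketch reports a refusal where a real caller would have to wait for the next period.

```java
// Illustrative only: a hypothetical fixed-period limiter, not Resilience4j's RateLimiter.
class RateLimiterSketch {

    private final int limitForPeriod;
    private final long refreshPeriodMillis;
    private long periodStart;
    private int permitsLeft;

    RateLimiterSketch(int limitForPeriod, long refreshPeriodMillis) {
        this.limitForPeriod = limitForPeriod;
        this.refreshPeriodMillis = refreshPeriodMillis;
        this.permitsLeft = limitForPeriod;
    }

    // Returns true if a request arriving at 'now' (in milliseconds) gets a permit.
    boolean tryAcquire(long now) {
        long elapsed = now - periodStart;
        if (elapsed >= refreshPeriodMillis) {
            // Move to the period that contains 'now' and refill the permits.
            periodStart = now - elapsed % refreshPeriodMillis;
            permitsLeft = limitForPeriod;
        }
        if (permitsLeft > 0) {
            permitsLeft--;
            return true;
        }
        return false; // limit reached: a caller would have to wait for the next period
    }

    public static void main(String[] args) {
        RateLimiterSketch limiter = new RateLimiterSketch(1, 1000);
        for (long now : new long[] { 0, 500, 1000, 1999, 3500 }) {
            System.out.println(now + " ms: " + limiter.tryAcquire(now));
        }
    }
}
```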
Caching Advice
Starting with version 5.2, the CacheRequestHandlerAdvice has been introduced.
It is based on the caching abstraction in Spring Framework and aligned with the concepts and functionality provided by the @Caching annotation family.
The logic internally is based on the CacheAspectSupport extension, where proxying for caching operations is done around the AbstractReplyProducingMessageHandler.RequestHandler.handleRequestMessage method with the request Message<?> as the argument.
This advice can be configured with a SpEL expression or a Function to evaluate a cache key.
The request Message<?> is available as the root object for the SpEL evaluation context, or as the Function input argument.
By default, the payload of the request message is used for the cache key.
The CacheRequestHandlerAdvice must be configured either with cacheNames, in which case the default cache operation is a CacheableOperation, or with a set of arbitrary CacheOperation instances.
Every CacheOperation can be configured separately, or shared options, such as a CacheManager, CacheResolver, and CacheErrorHandler, can be reused from the CacheRequestHandlerAdvice configuration.
This configuration functionality is similar to Spring Framework’s @CacheConfig and @Caching annotation combination.
If a CacheManager is not provided, a single bean is resolved by default from the BeanFactory in the CacheAspectSupport.
The following example configures two advices with different sets of caching operations:
@Bean
public CacheRequestHandlerAdvice cacheAdvice() {
    CacheRequestHandlerAdvice cacheRequestHandlerAdvice = new CacheRequestHandlerAdvice(TEST_CACHE);
    cacheRequestHandlerAdvice.setKeyExpressionString("payload");
    return cacheRequestHandlerAdvice;
}

@Transformer(inputChannel = "transformerChannel", outputChannel = "nullChannel", adviceChain = "cacheAdvice")
public Object transform(Message<?> message) {
    ...
}

@Bean
public CacheRequestHandlerAdvice cachePutAndEvictAdvice() {
    CacheRequestHandlerAdvice cacheRequestHandlerAdvice = new CacheRequestHandlerAdvice();
    cacheRequestHandlerAdvice.setKeyExpressionString("payload");
    CachePutOperation.Builder cachePutBuilder = new CachePutOperation.Builder();
    cachePutBuilder.setCacheName(TEST_PUT_CACHE);
    CacheEvictOperation.Builder cacheEvictBuilder = new CacheEvictOperation.Builder();
    cacheEvictBuilder.setCacheName(TEST_CACHE);
    cacheRequestHandlerAdvice.setCacheOperations(cachePutBuilder.build(), cacheEvictBuilder.build());
    return cacheRequestHandlerAdvice;
}

@ServiceActivator(inputChannel = "serviceChannel", outputChannel = "nullChannel",
        adviceChain = "cachePutAndEvictAdvice")
public Message<?> service(Message<?> message) {
    ...
}
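The basic idea behind a cacheable operation keyed by the payload (invoke the handler only on a cache miss) can be sketched in plain Java. This is a minimal illustration, not the CacheRequestHandlerAdvice source or Spring's cache abstraction: the CachingHandlerSketch class is hypothetical, a ConcurrentHashMap stands in for the cache, and a Function stands in for the key expression.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

// Illustrative only: a hypothetical class, not CacheRequestHandlerAdvice or Spring's cache abstraction.
class CachingHandlerSketch<P, R> {

    private final Map<Object, R> cache = new ConcurrentHashMap<>();
    private final Function<P, Object> keyFunction; // stand-in for the cache key expression
    private final Function<P, R> handler;          // stand-in for the advised message handler

    CachingHandlerSketch(Function<P, Object> keyFunction, Function<P, R> handler) {
        this.keyFunction = keyFunction;
        this.handler = handler;
    }

    R handle(P payload) {
        // The handler runs only when no value is cached for the computed key.
        return cache.computeIfAbsent(keyFunction.apply(payload), key -> handler.apply(payload));
    }

    public static void main(String[] args) {
        AtomicInteger invocations = new AtomicInteger();
        CachingHandlerSketch<String, String> cached = new CachingHandlerSketch<>(
                payload -> payload, // by default, the payload itself is the key
                payload -> {
                    invocations.incrementAndGet();
                    return payload.toUpperCase();
                });
        cached.handle("foo");
        cached.handle("foo");
        System.out.println("handler invocations: " + invocations.get());
    }
}
```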