This version is still in development and is not considered stable yet. For the latest stable version, please use Spring Integration 6.5.2!
Provided Advice Classes
In addition to providing the general mechanism to apply AOP advice classes, Spring Integration provides these out-of-the-box advice implementations:
- RequestHandlerRetryAdvice (described in Retry Advice)
- RequestHandlerCircuitBreakerAdvice (described in Circuit Breaker Advice)
- ExpressionEvaluatingRequestHandlerAdvice (described in Expression Advice)
- RateLimiterRequestHandlerAdvice (described in Rate Limiter Advice)
- CacheRequestHandlerAdvice (described in Caching Advice)
- ReactiveRequestHandlerAdvice (described in Reactive Advice)
- ContextHolderRequestHandlerAdvice (described in Context Holder Advice)
- LockRequestHandlerAdvice (described in Lock Advice)
Retry Advice
The retry advice (o.s.i.handler.advice.RequestHandlerRetryAdvice) leverages the rich retry mechanisms provided by the Retry Support in Spring Framework.
The core component of this advice is the RetryTemplate, which allows configuration of sophisticated retry scenarios, including a RetryPolicy as well as a RecoveryCallback strategy to determine the action to take when retries are exhausted.
- Stateless Retry
  Stateless retry is the case where the retry activity is handled entirely within the advice. The thread pauses (if configured to do so) and retries the action.
- Stateful Retry
  Stateful retry is the case where the retry state is managed within the advice but where an exception is thrown and the caller resubmits the request. An example of stateful retry is when we want the message originator (for example, JMS) to be responsible for resubmitting, rather than performing it on the current thread. Stateful retry needs some mechanism to detect a retried submission. For this purpose, the RequestHandlerRetryAdvice exposes the stateKeyFunction, newMessagePredicate, and stateCacheSize properties. The latter two make sense only if the first is provided. Essentially, the stateKeyFunction is an indicator to switch the RequestHandlerRetryAdvice logic from stateless to stateful. The purpose of the newMessagePredicate is to refresh an existing retry state for the key, based on the message to handle. The stateCacheSize is 100 by default; the eldest entries are removed from the cache as new retry states arrive. Such old messages are presumably no longer redelivered from the upstream flow, e.g., because the message broker dead-lettered them according to its redelivery policy.
The default back off behavior is to not back off. Retries are attempted immediately. Using a back off policy that causes threads to pause between attempts may cause performance issues, including excessive memory use and thread starvation. In high-volume environments, back off policies should be used with caution.
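To make the cost of pausing threads concrete, the following minimal sketch computes how long a thread would be paused under an exponential back off. It is a plain-Java model, not Spring's back off implementation; the class name BackOffScheduleSketch, its methods, and the chosen values (one second initial delay, multiplier of 5.0, one-minute cap) are assumptions made for illustration.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: models an exponential back off schedule, not Spring's implementation.
class BackOffScheduleSketch {

    // Pause before each retry: initial, initial * multiplier, ... capped at maxDelay.
    static List<Duration> delays(Duration initial, double multiplier, Duration maxDelay, int retries) {
        List<Duration> result = new ArrayList<>();
        double currentMillis = initial.toMillis();
        for (int i = 0; i < retries; i++) {
            long capped = (long) Math.min(currentMillis, (double) maxDelay.toMillis());
            result.add(Duration.ofMillis(capped));
            currentMillis *= multiplier;
        }
        return result;
    }

    // Total time one thread stays paused if every retry is needed.
    static Duration totalPause(List<Duration> delays) {
        Duration total = Duration.ZERO;
        for (Duration delay : delays) {
            total = total.plus(delay);
        }
        return total;
    }

    public static void main(String[] args) {
        List<Duration> schedule = delays(Duration.ofSeconds(1), 5.0, Duration.ofMinutes(1), 3);
        System.out.println(schedule);             // [PT1S, PT5S, PT25S]
        System.out.println(totalPause(schedule)); // PT31S
    }
}
```

With these values, a single failing message keeps its thread paused for 31 seconds across three retries, which is why such policies deserve care under high volume.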
Configuring the Retry Advice
The examples in this section use the following @ServiceActivator that always throws an exception:
public class FailingService {
    @ServiceActivator(inputChannel = "input", adviceChain = "retryAdvice")
    public void service(String message) {
        throw new RuntimeException("error");
    }
}
- Simple Stateless Retry
  The default RetryPolicy is to try three times, plus the original call for the target MessageHandler. There is no backoff by default, so the retries are made back-to-back with no delay between attempts. There is no RecoveryCallback, so the result is to throw the exception to the caller after the final failed retry occurs. In a Spring Integration environment, this final exception might be handled by using an error-channel on the inbound endpoint. The following example uses the default configuration for the RequestHandlerRetryAdvice:
  @Bean
  RequestHandlerRetryAdvice retryAdvice() {
      return new RequestHandlerRetryAdvice();
  }
- Simple Stateless Retry with Recovery
  The following example adds a RecoveryCallback to the preceding example and uses an ErrorMessageSendingRecoverer to send an ErrorMessage to a channel:
  @Bean
  RequestHandlerRetryAdvice retryAdvice(MessageChannel recoveryChannel) {
      RequestHandlerRetryAdvice requestHandlerRetryAdvice = new RequestHandlerRetryAdvice();
      requestHandlerRetryAdvice.setRecoveryCallback(new ErrorMessageSendingRecoverer(recoveryChannel));
      return requestHandlerRetryAdvice;
  }
- Stateless Retry with Customized Policies, and Recovery
  For more sophistication, a customized RetryPolicy can be provided for the RequestHandlerRetryAdvice. This example continues to use the simple RetryPolicy but increases the attempts to four. It also adds an ExponentialBackOff where the first retry waits one second, the second waits five seconds, and the third waits 25 seconds (for four attempts in all). The following listing shows an example of such a configuration:
  @Bean
  RequestHandlerRetryAdvice retryAdvice() {
      RequestHandlerRetryAdvice requestHandlerRetryAdvice = new RequestHandlerRetryAdvice();
      requestHandlerRetryAdvice.setRecoveryCallback(new ErrorMessageSendingRecoverer(recoveryChannel()));
      RetryPolicy retryPolicy = RetryPolicy.builder()
              .maxAttempts(4)
              .delay(Duration.ofSeconds(1))
              .multiplier(5.0)
              .maxDelay(Duration.ofMinutes(1))
              .build();
      requestHandlerRetryAdvice.setRetryPolicy(retryPolicy);
      return requestHandlerRetryAdvice;
  }
- Namespace Support for Stateless Retry
  The following examples demonstrate how the RequestHandlerRetryAdvice can be configured using the Spring Integration XML namespace with its custom tags:
  <int:service-activator input-channel="input" ref="failer" method="service">
      <int:request-handler-advice-chain>
          <ref bean="retrier" />
      </int:request-handler-advice-chain>
  </int:service-activator>
  <int:handler-retry-advice id="retrier" max-attempts="4" recovery-channel="myErrorChannel">
      <int:exponential-back-off initial="1000" multiplier="5.0" maximum="60000" />
  </int:handler-retry-advice>
  In the preceding example, the advice is defined as a top-level bean so that it can be used in multiple request-handler-advice-chain instances. You can also define the advice directly within the chain, as the following example shows:
  <int:service-activator input-channel="input" ref="failer" method="service">
      <int:request-handler-advice-chain>
          <int:retry-advice id="retrier" max-attempts="4" recovery-channel="myErrorChannel">
              <int:exponential-back-off initial="1000" multiplier="5.0" maximum="60000" />
          </int:retry-advice>
      </int:request-handler-advice-chain>
  </int:service-activator>
  A <handler-retry-advice> can have a <fixed-back-off> or <exponential-back-off> child element or have no child element. A <handler-retry-advice> with no child element uses no back off. If there is no recovery-channel, the exception is thrown when retries are exhausted. The namespace can only be used with stateless retry. For more complex environments (custom policies etc.), use normal <bean> definitions.
- Simple Stateful Retry with Recovery
  To make retry stateful, a Function<Message<?>, Object> stateKeyFunction must be provided for the RequestHandlerRetryAdvice instance. This function is used to identify a message as being a resubmission so that the RequestHandlerRetryAdvice can determine the current state of retry for this message. The idea behind stateful retry is to not block the current thread, but rather to cache the retry state for this message and re-throw the MessageHandler failure back to the caller. Usually this works well with message originators that are able to resubmit (or redeliver) events, for example, message brokers like RabbitMQ with nack, Apache Kafka with seek functionality, or JMS after a rollback on consumption. If there is no cached state yet (or the Predicate<Message<?>> newMessagePredicate returns true for the message), the MessageHandler call is treated as the first one, and on its failure an internal RetryState, based on a BackOffExecution, is cached under the mentioned key. On the next message arrival, the cached state provides a backoff interval for Thread.sleep() before an attempt to call the MessageHandler. If this backoff interval is equal to BackOffExecution.STOP (e.g., maxAttempts has been reached), there are no more retries for this message: the whole retry cycle is treated as exhausted, and the respective RetryException is thrown back to the caller or used for the RecoveryCallback invocation, if provided. In general, the exception handling logic and backoff execution are similar to the stateless behavior, with the only difference being that the thread is not blocked for all the maxAttempts. It is up to the message originator to redeliver a message for the next retry call.
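The stateful flow described above can be modelled in plain Java. The following is a minimal single-threaded sketch under stated assumptions, not the RequestHandlerRetryAdvice source: the class StatefulRetrySketch and its members are hypothetical, the back off pause and the newMessagePredicate are omitted, and eviction of the eldest cache entry is assumed to follow insertion order.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical model of stateful retry: one handler call per delivery, state cached by key.
class StatefulRetrySketch<M> {

    private final Function<M, Object> stateKeyFunction; // identifies a redelivered message
    private final int maxRetries;                        // retries allowed after the first call
    private final Map<Object, Integer> retriesByKey;     // cached state: retries used so far

    StatefulRetrySketch(Function<M, Object> stateKeyFunction, int maxRetries, int stateCacheSize) {
        this.stateKeyFunction = stateKeyFunction;
        this.maxRetries = maxRetries;
        // Assumption: the eldest (first inserted) state is dropped once the cache is full.
        this.retriesByKey = new LinkedHashMap<Object, Integer>() {
            @Override
            protected boolean removeEldestEntry(Map.Entry<Object, Integer> eldest) {
                return size() > stateCacheSize;
            }
        };
    }

    void handle(M message, Consumer<M> handler) {
        Object key = stateKeyFunction.apply(message);
        Integer retriesUsed = retriesByKey.get(key);
        if (retriesUsed != null) {
            if (retriesUsed >= maxRetries) {
                // Retry cycle exhausted: the handler is not called again.
                retriesByKey.remove(key);
                throw new IllegalStateException("Retries exhausted for key " + key);
            }
            retriesByKey.put(key, retriesUsed + 1);
            // A real back off would pause here before the retry; omitted in this sketch.
        }
        try {
            handler.accept(message);
            retriesByKey.remove(key); // success: forget the state
        }
        catch (RuntimeException ex) {
            retriesByKey.putIfAbsent(key, 0); // first failure: start tracking this key
            throw ex;                         // the originator is expected to redeliver
        }
    }

    int cachedStates() {
        return retriesByKey.size();
    }
}
```

With maxRetries set to 2 and a handler that always fails, three deliveries each rethrow the handler's exception, and the fourth delivery reports exhaustion without calling the handler.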
Circuit Breaker Advice
The general idea of the circuit breaker pattern is that, if a service is not currently available, do not waste time (and resources) trying to use it.
The o.s.i.handler.advice.RequestHandlerCircuitBreakerAdvice implements this pattern.
When the circuit breaker is in the closed state, the endpoint attempts to invoke the service.
The circuit breaker goes to the open state if a certain number of consecutive attempts fail.
When it is in the open state, new requests “fail fast” and no attempt is made to invoke the service until some time has expired.
When that time has expired, the circuit breaker is set to the half-open state. When in this state, if even a single attempt fails, the breaker immediately goes to the open state. If the attempt succeeds, the breaker goes to the closed state, in which case it does not go to the open state again until the configured number of consecutive failures again occurs. Any successful attempt resets the state to zero failures for the purpose of determining when the breaker might go to the open state again.
Typically, this advice might be used for external services, where it might take some time to fail, such as a timeout attempting to make a network connection.
The RequestHandlerCircuitBreakerAdvice has two properties: threshold and halfOpenAfter.
The threshold property represents the number of consecutive failures that need to occur before the breaker goes open.
It defaults to 5.
The halfOpenAfter property represents the time after the last failure that the breaker waits before attempting another request.
The default is 1000 milliseconds.
The following example configures a circuit breaker and shows its DEBUG and ERROR output:
<int:service-activator input-channel="input" ref="failer" method="service">
    <int:request-handler-advice-chain>
        <bean class="org.springframework.integration.handler.advice.RequestHandlerCircuitBreakerAdvice">
            <property name="threshold" value="2" />
            <property name="halfOpenAfter" value="12000" />
        </bean>
    </int:request-handler-advice-chain>
</int:service-activator>
05.617 DEBUG [task-scheduler-1]preSend on channel 'input', message: [Payload=...]
05.638 ERROR [task-scheduler-1]org.springframework.messaging.MessageHandlingException: java.lang.RuntimeException: foo
...
10.598 DEBUG [task-scheduler-2]preSend on channel 'input', message: [Payload=...]
10.600 ERROR [task-scheduler-2]org.springframework.messaging.MessageHandlingException: java.lang.RuntimeException: foo
...
15.598 DEBUG [task-scheduler-3]preSend on channel 'input', message: [Payload=...]
15.599 ERROR [task-scheduler-3]org.springframework.messaging.MessagingException: Circuit Breaker is Open for ServiceActivator
...
20.598 DEBUG [task-scheduler-2]preSend on channel 'input', message: [Payload=...]
20.598 ERROR [task-scheduler-2]org.springframework.messaging.MessagingException: Circuit Breaker is Open for ServiceActivator
...
25.598 DEBUG [task-scheduler-5]preSend on channel 'input', message: [Payload=...]
25.601 ERROR [task-scheduler-5]org.springframework.messaging.MessageHandlingException: java.lang.RuntimeException: foo
...
30.598 DEBUG [task-scheduler-1]preSend on channel 'input', message: [Payload=foo...]
30.599 ERROR [task-scheduler-1]org.springframework.messaging.MessagingException: Circuit Breaker is Open for ServiceActivator
In the preceding example, the threshold is set to 2 and halfOpenAfter is set to 12 seconds.
A new request arrives every 5 seconds.
The first two attempts invoked the service.
The third and fourth failed with an exception indicating that the circuit breaker is open.
The fifth request was attempted because the request was 15 seconds after the last failure.
The sixth attempt fails immediately because the fifth attempt failed and the breaker immediately went back to the open state.
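The timeline above can be replayed with a small plain-Java model of the breaker. This is a simplified single-threaded sketch, not the advice's source: the class CircuitBreakerSketch, its invoke and replay methods, and the outcome labels are hypothetical, and the current time is passed in explicitly so that the log's timestamps can be reproduced.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical model of the closed / open / half-open behavior described in this section.
class CircuitBreakerSketch {

    private final int threshold;        // consecutive failures before the breaker opens
    private final long halfOpenAfterMs; // wait after the last failure before a trial call
    private int failures;
    private long lastFailureMs;

    CircuitBreakerSketch(int threshold, long halfOpenAfterMs) {
        this.threshold = threshold;
        this.halfOpenAfterMs = halfOpenAfterMs;
    }

    <T> T invoke(long nowMs, Supplier<T> service) {
        // Open state: fail fast until halfOpenAfterMs has elapsed since the last failure.
        if (failures >= threshold && nowMs - lastFailureMs < halfOpenAfterMs) {
            throw new IllegalStateException("Circuit Breaker is Open");
        }
        try {
            T result = service.get();
            failures = 0; // any success resets the failure count
            return result;
        }
        catch (RuntimeException ex) {
            failures++;
            lastFailureMs = nowMs;
            throw ex;
        }
    }

    // Replays requests at the given times against a service that always fails.
    static List<String> replay(CircuitBreakerSketch breaker, long... timesMs) {
        Supplier<String> failing = () -> {
            throw new UnsupportedOperationException("foo");
        };
        List<String> outcomes = new ArrayList<>();
        for (long time : timesMs) {
            try {
                breaker.invoke(time, failing);
                outcomes.add("ok");
            }
            catch (IllegalStateException ex) {
                outcomes.add("open");
            }
            catch (UnsupportedOperationException ex) {
                outcomes.add("invoked");
            }
        }
        return outcomes;
    }

    public static void main(String[] args) {
        CircuitBreakerSketch breaker = new CircuitBreakerSketch(2, 12_000);
        System.out.println(replay(breaker, 5_000, 10_000, 15_000, 20_000, 25_000, 30_000));
        // [invoked, invoked, open, open, invoked, open]
    }
}
```

The replay uses the same settings as the example (threshold of 2, halfOpenAfter of 12 seconds, one request every 5 seconds) and yields the same sequence as the log: two invocations, two fast failures, one trial invocation, and another fast failure.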
Expression Evaluating Advice
Another supplied advice class is the o.s.i.handler.advice.ExpressionEvaluatingRequestHandlerAdvice.
This advice is more general than the retry and circuit breaker advices.
It provides a mechanism to evaluate an expression on the original inbound message sent to the endpoint.
Separate expressions are available to be evaluated, after either success or failure.
Optionally, a message containing the evaluation result, together with the input message, can be sent to a message channel.
A typical use case for this advice might be with an <ftp:outbound-channel-adapter/>, perhaps to move the file to one directory if the transfer was successful or to another directory if it fails.
The advice has properties to set an expression when successful, an expression for failures, and corresponding channels for each.
For the successful case, the message sent to the successChannel is an AdviceMessage, with the payload being the result of the expression evaluation.
An additional property, called inputMessage, contains the original message sent to the handler.
A message sent to the failureChannel (when the handler throws an exception) is an ErrorMessage with a payload of MessageHandlingExpressionEvaluatingAdviceException.
Like all MessagingException instances, this payload has failedMessage and cause properties, as well as an additional property called evaluationResult, which contains the result of the expression evaluation.
Starting with version 5.1.3, if channels are configured but expressions are not provided, a default expression is used that evaluates to the payload of the message.
When an exception is thrown in the scope of the advice, by default, that exception is thrown to the caller after any failureExpression is evaluated.
If you wish to suppress throwing the exception, set the trapException property to true.
The following example shows how to configure the advice with the Java DSL:
@SpringBootApplication
public class EerhaApplication {
    public static void main(String[] args) {
        ConfigurableApplicationContext context = SpringApplication.run(EerhaApplication.class, args);
        MessageChannel in = context.getBean("advised.input", MessageChannel.class);
        in.send(new GenericMessage<>("good"));
        in.send(new GenericMessage<>("bad"));
        context.close();
    }
    @Bean
    public IntegrationFlow advised() {
        return f -> f.<String>handle((payload, headers) -> {
            if (payload.equals("good")) {
                return null;
            }
            else {
                throw new RuntimeException("some failure");
            }
        }, c -> c.advice(expressionAdvice()));
    }
    @Bean
    public Advice expressionAdvice() {
        ExpressionEvaluatingRequestHandlerAdvice advice = new ExpressionEvaluatingRequestHandlerAdvice();
        advice.setSuccessChannelName("success.input");
        advice.setOnSuccessExpressionString("payload + ' was successful'");
        advice.setFailureChannelName("failure.input");
        advice.setOnFailureExpressionString(
                "payload + ' was bad, with reason: ' + #exception.cause.message");
        advice.setTrapException(true);
        return advice;
    }
    @Bean
    public IntegrationFlow success() {
        return f -> f.handle(System.out::println);
    }
    @Bean
    public IntegrationFlow failure() {
        return f -> f.handle(System.out::println);
    }
}
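The control flow that this advice applies around the handler can be modelled without Spring. In the following minimal sketch, plain functions stand in for the SpEL expressions and lists stand in for the success and failure channels; the class ExpressionAdviceSketch and its members are hypothetical, and the handler's exception is not wrapped, so the failure text uses the exception's own message rather than that of its cause.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical model of the success / failure / trapException flow; no SpEL, no channels.
class ExpressionAdviceSketch {

    final List<String> success = new ArrayList<>(); // stands in for the success channel
    final List<String> failure = new ArrayList<>(); // stands in for the failure channel

    Function<String, String> onSuccess = payload -> payload + " was successful";
    BiFunction<String, Exception, String> onFailure =
            (payload, ex) -> payload + " was bad, with reason: " + ex.getMessage();
    boolean trapException = true;

    void handle(String payload, Consumer<String> handler) {
        try {
            handler.accept(payload);
        }
        catch (RuntimeException ex) {
            // The failure expression is evaluated first; only then is the exception rethrown.
            failure.add(onFailure.apply(payload, ex));
            if (!trapException) {
                throw ex;
            }
            return;
        }
        success.add(onSuccess.apply(payload));
    }

    public static void main(String[] args) {
        ExpressionAdviceSketch advice = new ExpressionAdviceSketch();
        Consumer<String> handler = payload -> {
            if (!payload.equals("good")) {
                throw new RuntimeException("some failure");
            }
        };
        advice.handle("good", handler);
        advice.handle("bad", handler);
        System.out.println(advice.success); // [good was successful]
        System.out.println(advice.failure); // [bad was bad, with reason: some failure]
    }
}
```

Setting trapException to false in this model lets the second call propagate its exception to the caller after the failure entry has been recorded.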
Rate Limiter Advice
The Rate Limiter advice (RateLimiterRequestHandlerAdvice) ensures that an endpoint does not get overloaded with requests.
When the rate limit is breached, the request goes into a blocked state.
A typical use case for this advice might be an external service provider that does not allow more than n requests per minute.
The RateLimiterRequestHandlerAdvice implementation is fully based on the Resilience4j project and requires either a RateLimiter or a RateLimiterConfig injection.
It can also be configured with defaults and/or a custom name.
The following example configures a rate limiter advice with one request per second:
@Bean
public RateLimiterRequestHandlerAdvice rateLimiterRequestHandlerAdvice() {
    return new RateLimiterRequestHandlerAdvice(RateLimiterConfig.custom()
            .limitRefreshPeriod(Duration.ofSeconds(1))
            .limitForPeriod(1)
            .build());
}
@ServiceActivator(inputChannel = "requestChannel", outputChannel = "resultChannel",
        adviceChain = "rateLimiterRequestHandlerAdvice")
public String handleRequest(String payload) {
    ...
}
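To illustrate what "one request per second" means for callers, the following sketch models how long each request would be held back when permits are granted per refresh period. It is a simplified fixed-window model written for illustration, not Resilience4j's algorithm; the class RateLimitSketch and its acquire method are hypothetical, and the method returns the wait time instead of actually blocking.

```java
// Hypothetical fixed-window model of limitForPeriod / limitRefreshPeriod.
class RateLimitSketch {

    private final int limitForPeriod;   // permits available in each period
    private final long refreshPeriodMs; // length of one period
    private long cycle;                 // index of the period permits are currently taken from
    private int used;                   // permits already taken from that period

    RateLimitSketch(int limitForPeriod, long refreshPeriodMs) {
        this.limitForPeriod = limitForPeriod;
        this.refreshPeriodMs = refreshPeriodMs;
    }

    // Returns how long a request arriving at nowMs must wait before it may proceed.
    long acquire(long nowMs) {
        long currentCycle = nowMs / refreshPeriodMs;
        if (currentCycle > cycle) {
            // Time has moved past all reserved periods: start from a fresh period.
            cycle = currentCycle;
            used = 0;
        }
        if (used >= limitForPeriod) {
            // Current period is full: reserve a permit from the next one.
            cycle++;
            used = 0;
        }
        used++;
        return Math.max(0, cycle * refreshPeriodMs - nowMs);
    }

    public static void main(String[] args) {
        RateLimitSketch limiter = new RateLimitSketch(1, 1000);
        System.out.println(limiter.acquire(0));    // 0
        System.out.println(limiter.acquire(100));  // 900
        System.out.println(limiter.acquire(200));  // 1800
        System.out.println(limiter.acquire(5000)); // 0
    }
}
```

In this model, three requests arriving within the first 200 milliseconds are spread over three consecutive seconds, while a request that arrives after an idle stretch proceeds immediately.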
Caching Advice
Starting with version 5.2, the CacheRequestHandlerAdvice has been introduced.
It is based on the caching abstraction in Spring Framework and aligned with the concepts and functionality provided by the @Caching annotation family.
The logic internally is based on the CacheAspectSupport extension, where proxying for caching operations is done around the AbstractReplyProducingMessageHandler.RequestHandler.handleRequestMessage method with the request Message<?> as the argument.
This advice can be configured with a SpEL expression or a Function to evaluate a cache key.
The request Message<?> is available as the root object for the SpEL evaluation context, or as the Function input argument.
By default, the payload of the request message is used for the cache key.
The CacheRequestHandlerAdvice must be configured with cacheNames, when a default cache operation is a CacheableOperation, or with a set of any arbitrary CacheOperation instances.
Every CacheOperation can be configured separately, or shared options, such as a CacheManager, CacheResolver, and CacheErrorHandler, can be reused from the CacheRequestHandlerAdvice configuration.
This configuration functionality is similar to Spring Framework's @CacheConfig and @Caching annotation combination.
If a CacheManager is not provided, a single bean is resolved by default from the BeanFactory in the CacheAspectSupport.
The following example configures two advices with different sets of caching operations:
@Bean
public CacheRequestHandlerAdvice cacheAdvice() {
    CacheRequestHandlerAdvice cacheRequestHandlerAdvice = new CacheRequestHandlerAdvice(TEST_CACHE);
    cacheRequestHandlerAdvice.setKeyExpressionString("payload");
    return cacheRequestHandlerAdvice;
}
@Transformer(inputChannel = "transformerChannel", outputChannel = "nullChannel", adviceChain = "cacheAdvice")
public Object transform(Message<?> message) {
    ...
}
@Bean
public CacheRequestHandlerAdvice cachePutAndEvictAdvice() {
    CacheRequestHandlerAdvice cacheRequestHandlerAdvice = new CacheRequestHandlerAdvice();
    cacheRequestHandlerAdvice.setKeyExpressionString("payload");
    CachePutOperation.Builder cachePutBuilder = new CachePutOperation.Builder();
    cachePutBuilder.setCacheName(TEST_PUT_CACHE);
    CacheEvictOperation.Builder cacheEvictBuilder = new CacheEvictOperation.Builder();
    cacheEvictBuilder.setCacheName(TEST_CACHE);
    cacheRequestHandlerAdvice.setCacheOperations(cachePutBuilder.build(), cacheEvictBuilder.build());
    return cacheRequestHandlerAdvice;
}
@ServiceActivator(inputChannel = "serviceChannel", outputChannel = "nullChannel",
        adviceChain = "cachePutAndEvictAdvice")
public Message<?> service(Message<?> message) {
    ...
}
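The effect of a cacheable operation around a handler can be modelled in plain Java. The following minimal sketch is not the CacheRequestHandlerAdvice implementation: the class CachingAdviceSketch and its members are hypothetical, a ConcurrentHashMap stands in for the Spring cache, and a Function stands in for the key expression.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

// Hypothetical model of "cacheable" and "evict" semantics around a reply-producing handler.
class CachingAdviceSketch<M, R> {

    private final Map<Object, R> cache = new ConcurrentHashMap<>();
    private final Function<M, Object> keyFunction; // stands in for the key expression
    private final Function<M, R> handler;          // the advised handler
    private final AtomicInteger handlerCalls = new AtomicInteger();

    CachingAdviceSketch(Function<M, Object> keyFunction, Function<M, R> handler) {
        this.keyFunction = keyFunction;
        this.handler = handler;
    }

    // Cacheable: the handler runs only when no reply is cached for the message's key.
    R handle(M message) {
        return cache.computeIfAbsent(keyFunction.apply(message), key -> {
            handlerCalls.incrementAndGet();
            return handler.apply(message);
        });
    }

    // Evict: drops the cached reply so that the next request reaches the handler again.
    void evict(M message) {
        cache.remove(keyFunction.apply(message));
    }

    int handlerCalls() {
        return handlerCalls.get();
    }

    public static void main(String[] args) {
        CachingAdviceSketch<String, String> advice =
                new CachingAdviceSketch<String, String>(payload -> payload, payload -> payload.toUpperCase());
        System.out.println(advice.handle("foo"));  // FOO
        System.out.println(advice.handle("foo"));  // FOO (served from the cache)
        System.out.println(advice.handlerCalls()); // 1
    }
}
```

Using the payload as the key mirrors the default described above; a different keyFunction would play the role of a custom key expression.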