Adding Behavior to Endpoints
Prior to Spring Integration 2.2, you could add behavior to an entire Integration flow by adding an AOP Advice to a poller’s <advice-chain/>
element.
However, suppose you want to retry, say, just a REST Web Service call, and not any downstream endpoints.
For example, consider the following flow:
inbound-adapter->poller->http-gateway1->http-gateway2->jdbc-outbound-adapter
If you configure retry logic in an advice chain on the poller and the call to http-gateway2 fails because of a network glitch, the retry causes both http-gateway1 and http-gateway2 to be called a second time.
Similarly, after a transient failure in the jdbc-outbound-adapter, both HTTP gateways are called a second time before again calling the jdbc-outbound-adapter
.
Spring Integration 2.2 adds the ability to add behavior to individual endpoints.
This is achieved by the addition of the <request-handler-advice-chain/>
element to many endpoints.
The following example shows how to use the <request-handler-advice-chain/> element within an outbound-gateway:
<int-http:outbound-gateway id="withAdvice"
    url-expression="'http://localhost/test1'"
    request-channel="requests"
    reply-channel="nextChannel">
    <int-http:request-handler-advice-chain>
        <ref bean="myRetryAdvice" />
    </int-http:request-handler-advice-chain>
</int-http:outbound-gateway>
In this case, myRetryAdvice
is applied only locally to this gateway and does not apply to further actions taken downstream after the reply is sent to nextChannel
.
The scope of the advice is limited to the endpoint itself.
At this time, you cannot advise an entire However, a |
Provided Advice Classes
In addition to providing the general mechanism to apply AOP advice classes, Spring Integration provides these out-of-the-box advice implementations:
- RequestHandlerRetryAdvice (described in Retry Advice)
- RequestHandlerCircuitBreakerAdvice (described in Circuit Breaker Advice)
- ExpressionEvaluatingRequestHandlerAdvice (described in Expression Evaluating Advice)
- RateLimiterRequestHandlerAdvice (described in Rate Limiter Advice)
- CacheRequestHandlerAdvice (described in Caching Advice)
- ReactiveRequestHandlerAdvice (described in Reactive Advice)
Retry Advice
The retry advice (o.s.i.handler.advice.RequestHandlerRetryAdvice
) leverages the rich retry mechanisms provided by the Spring Retry project.
The core component of spring-retry
is the RetryTemplate
, which allows configuration of sophisticated retry scenarios, including RetryPolicy
and BackOffPolicy
strategies (with a number of implementations) as well as a RecoveryCallback
strategy to determine the action to take when retries are exhausted.
- Stateless Retry
-
Stateless retry is the case where the retry activity is handled entirely within the advice. The thread pauses (if configured to do so) and retries the action.
- Stateful Retry
-
Stateful retry is the case where the retry state is managed within the advice but where an exception is thrown and the caller resubmits the request. An example of stateful retry is when we want the message originator (for example, JMS) to be responsible for resubmitting, rather than performing it on the current thread. Stateful retry needs some mechanism to detect a retried submission.
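The stateless case can be pictured as a loop that stays on the calling thread. The following is a minimal plain-Java sketch of that idea only; the class and method names are hypothetical, and it is not how Spring Retry's RetryTemplate is implemented.

```java
import java.util.function.Supplier;

// Hypothetical illustration of stateless retry; not part of Spring Retry.
final class StatelessRetrySketch {

    // Invokes the action on the current thread up to maxAttempts times.
    // The last exception is rethrown once the attempts are exhausted.
    static <T> T callWithRetry(Supplier<T> action, int maxAttempts) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.get();
            }
            catch (RuntimeException e) {
                // Remember the failure and try again immediately (no back off).
                last = e;
            }
        }
        throw last;
    }
}
```

In the stateful case, by contrast, the exception would leave this method after every failed attempt, and the attempt count would have to be stored under some key derived from the message so that a resubmission can be recognized.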
For more information on spring-retry
, see the project’s Javadoc and the reference documentation for Spring Batch, where spring-retry
originated.
The default back off behavior is to not back off. Retries are attempted immediately. Using a back off policy that causes threads to pause between attempts may cause performance issues, including excessive memory use and thread starvation. In high-volume environments, back off policies should be used with caution.
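To get a feel for how long a back off policy can hold a thread, the following plain-Java sketch computes the pauses that an exponential back off would take between attempts. The class name, method name, and numbers are illustrative assumptions; the sketch does not call Spring Retry.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for estimating back off pauses; not part of Spring Retry.
final class BackOffSketch {

    // Returns the pause (in milliseconds) taken before each retry.
    // With maxAttempts attempts there are maxAttempts - 1 pauses.
    static List<Long> pauses(long initialInterval, double multiplier, long maxInterval, int maxAttempts) {
        List<Long> result = new ArrayList<>();
        double next = initialInterval;
        for (int retry = 1; retry < maxAttempts; retry++) {
            // Each pause is capped at maxInterval.
            result.add((long) Math.min(next, maxInterval));
            next *= multiplier;
        }
        return result;
    }
}
```

With an initial interval of 1000 ms, a multiplier of 5.0, a cap of 60000 ms, and four attempts, the sketch yields pauses of 1000, 5000, and 25000 ms, so a single failing message would occupy its thread for about 31 seconds.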
Configuring the Retry Advice
The examples in this section use the following service, referenced from a <service-activator>, which always throws an exception:
public class FailingService {

    public void service(String message) {
        throw new RuntimeException("error");
    }
}
- Simple Stateless Retry
-
The default RetryTemplate has a SimpleRetryPolicy which tries three times. There is no BackOffPolicy, so the three attempts are made back-to-back-to-back with no delay between attempts. There is no RecoveryCallback, so the result is to throw the exception to the caller after the final failed retry occurs. In a Spring Integration environment, this final exception might be handled by using an error-channel on the inbound endpoint. The following example uses RetryTemplate and shows its DEBUG output:
<int:service-activator input-channel="input" ref="failer" method="service">
    <int:request-handler-advice-chain>
        <bean class="o.s.i.handler.advice.RequestHandlerRetryAdvice"/>
    </int:request-handler-advice-chain>
</int:service-activator>
DEBUG [task-scheduler-2]preSend on channel 'input', message: [Payload=...]
DEBUG [task-scheduler-2]Retry: count=0
DEBUG [task-scheduler-2]Checking for rethrow: count=1
DEBUG [task-scheduler-2]Retry: count=1
DEBUG [task-scheduler-2]Checking for rethrow: count=2
DEBUG [task-scheduler-2]Retry: count=2
DEBUG [task-scheduler-2]Checking for rethrow: count=3
DEBUG [task-scheduler-2]Retry failed last attempt: count=3
- Simple Stateless Retry with Recovery
-
The following example adds a RecoveryCallback to the preceding example and uses an ErrorMessageSendingRecoverer to send an ErrorMessage to a channel:
<int:service-activator input-channel="input" ref="failer" method="service">
    <int:request-handler-advice-chain>
        <bean class="o.s.i.handler.advice.RequestHandlerRetryAdvice">
            <property name="recoveryCallback">
                <bean class="o.s.i.handler.advice.ErrorMessageSendingRecoverer">
                    <constructor-arg ref="myErrorChannel" />
                </bean>
            </property>
        </bean>
    </int:request-handler-advice-chain>
</int:service-activator>
DEBUG [task-scheduler-2]preSend on channel 'input', message: [Payload=...]
DEBUG [task-scheduler-2]Retry: count=0
DEBUG [task-scheduler-2]Checking for rethrow: count=1
DEBUG [task-scheduler-2]Retry: count=1
DEBUG [task-scheduler-2]Checking for rethrow: count=2
DEBUG [task-scheduler-2]Retry: count=2
DEBUG [task-scheduler-2]Checking for rethrow: count=3
DEBUG [task-scheduler-2]Retry failed last attempt: count=3
DEBUG [task-scheduler-2]Sending ErrorMessage :failedMessage:[Payload=...]
- Stateless Retry with Customized Policies, and Recovery
-
For more sophistication, we can provide the advice with a customized RetryTemplate. This example continues to use the SimpleRetryPolicy but increases the attempts to four. It also adds an ExponentialBackOffPolicy where the first retry waits one second, the second waits five seconds, and the third waits 25 seconds (for four attempts in all). The following listing shows the example and its DEBUG output:
<int:service-activator input-channel="input" ref="failer" method="service">
    <int:request-handler-advice-chain>
        <bean class="o.s.i.handler.advice.RequestHandlerRetryAdvice">
            <property name="recoveryCallback">
                <bean class="o.s.i.handler.advice.ErrorMessageSendingRecoverer">
                    <constructor-arg ref="myErrorChannel" />
                </bean>
            </property>
            <property name="retryTemplate" ref="retryTemplate" />
        </bean>
    </int:request-handler-advice-chain>
</int:service-activator>
<bean id="retryTemplate" class="org.springframework.retry.support.RetryTemplate">
    <property name="retryPolicy">
        <bean class="org.springframework.retry.policy.SimpleRetryPolicy">
            <property name="maxAttempts" value="4" />
        </bean>
    </property>
    <property name="backOffPolicy">
        <bean class="org.springframework.retry.backoff.ExponentialBackOffPolicy">
            <property name="initialInterval" value="1000" />
            <property name="multiplier" value="5.0" />
            <property name="maxInterval" value="60000" />
        </bean>
    </property>
</bean>
27.058 DEBUG [task-scheduler-1]preSend on channel 'input', message: [Payload=...]
27.071 DEBUG [task-scheduler-1]Retry: count=0
27.080 DEBUG [task-scheduler-1]Sleeping for 1000
28.081 DEBUG [task-scheduler-1]Checking for rethrow: count=1
28.081 DEBUG [task-scheduler-1]Retry: count=1
28.081 DEBUG [task-scheduler-1]Sleeping for 5000
33.082 DEBUG [task-scheduler-1]Checking for rethrow: count=2
33.082 DEBUG [task-scheduler-1]Retry: count=2
33.083 DEBUG [task-scheduler-1]Sleeping for 25000
58.083 DEBUG [task-scheduler-1]Checking for rethrow: count=3
58.083 DEBUG [task-scheduler-1]Retry: count=3
58.084 DEBUG [task-scheduler-1]Checking for rethrow: count=4
58.084 DEBUG [task-scheduler-1]Retry failed last attempt: count=4
58.086 DEBUG [task-scheduler-1]Sending ErrorMessage :failedMessage:[Payload=...]
- Namespace Support for Stateless Retry
-
Starting with version 4.0, the preceding configuration can be greatly simplified, thanks to the namespace support for the retry advice, as the following example shows:
<int:service-activator input-channel="input" ref="failer" method="service">
    <int:request-handler-advice-chain>
        <ref bean="retrier" />
    </int:request-handler-advice-chain>
</int:service-activator>
<int:handler-retry-advice id="retrier" max-attempts="4" recovery-channel="myErrorChannel">
    <int:exponential-back-off initial="1000" multiplier="5.0" maximum="60000" />
</int:handler-retry-advice>
In the preceding example, the advice is defined as a top-level bean so that it can be used in multiple request-handler-advice-chain instances. You can also define the advice directly within the chain, as the following example shows:
<int:service-activator input-channel="input" ref="failer" method="service">
    <int:request-handler-advice-chain>
        <int:retry-advice id="retrier" max-attempts="4" recovery-channel="myErrorChannel">
            <int:exponential-back-off initial="1000" multiplier="5.0" maximum="60000" />
        </int:retry-advice>
    </int:request-handler-advice-chain>
</int:service-activator>
A <handler-retry-advice> can have a <fixed-back-off> or <exponential-back-off> child element or have no child element. A <handler-retry-advice> with no child element uses no back off. If there is no recovery-channel, the exception is thrown when retries are exhausted. The namespace can only be used with stateless retry.
For more complex environments (custom policies and so on), use normal <bean> definitions.
- Simple Stateful Retry with Recovery
-
To make retry stateful, we need to provide the advice with a RetryStateGenerator implementation. This class is used to identify a message as being a resubmission so that the RetryTemplate can determine the current state of retry for this message. The framework provides a SpelExpressionRetryStateGenerator, which determines the message identifier by using a SpEL expression. This example again uses the default policies (three attempts with no back off). As with stateless retry, these policies can be customized. The following listing shows the example and its DEBUG output:
<int:service-activator input-channel="input" ref="failer" method="service">
    <int:request-handler-advice-chain>
        <bean class="o.s.i.handler.advice.RequestHandlerRetryAdvice">
            <property name="retryStateGenerator">
                <bean class="o.s.i.handler.advice.SpelExpressionRetryStateGenerator">
                    <constructor-arg value="headers['jms_messageId']" />
                </bean>
            </property>
            <property name="recoveryCallback">
                <bean class="o.s.i.handler.advice.ErrorMessageSendingRecoverer">
                    <constructor-arg ref="myErrorChannel" />
                </bean>
            </property>
        </bean>
    </int:request-handler-advice-chain>
</int:service-activator>
24.351 DEBUG [Container#0-1]preSend on channel 'input', message: [Payload=...]
24.368 DEBUG [Container#0-1]Retry: count=0
24.387 DEBUG [Container#0-1]Checking for rethrow: count=1
24.387 DEBUG [Container#0-1]Rethrow in retry for policy: count=1
24.387 WARN [Container#0-1]failure occurred in gateway sendAndReceive
org.springframework.integration.MessagingException: Failed to invoke handler
...
Caused by: java.lang.RuntimeException: foo
...
24.391 DEBUG [Container#0-1]Initiating transaction rollback on application exception
...
25.412 DEBUG [Container#0-1]preSend on channel 'input', message: [Payload=...]
25.412 DEBUG [Container#0-1]Retry: count=1
25.413 DEBUG [Container#0-1]Checking for rethrow: count=2
25.413 DEBUG [Container#0-1]Rethrow in retry for policy: count=2
25.413 WARN [Container#0-1]failure occurred in gateway sendAndReceive
org.springframework.integration.MessagingException: Failed to invoke handler
...
Caused by: java.lang.RuntimeException: foo
...
25.414 DEBUG [Container#0-1]Initiating transaction rollback on application exception
...
26.418 DEBUG [Container#0-1]preSend on channel 'input', message: [Payload=...]
26.418 DEBUG [Container#0-1]Retry: count=2
26.419 DEBUG [Container#0-1]Checking for rethrow: count=3
26.419 DEBUG [Container#0-1]Rethrow in retry for policy: count=3
26.419 WARN [Container#0-1]failure occurred in gateway sendAndReceive
org.springframework.integration.MessagingException: Failed to invoke handler
...
Caused by: java.lang.RuntimeException: foo
...
26.420 DEBUG [Container#0-1]Initiating transaction rollback on application exception
...
27.425 DEBUG [Container#0-1]preSend on channel 'input', message: [Payload=...]
27.426 DEBUG [Container#0-1]Retry failed last attempt: count=3
27.426 DEBUG [Container#0-1]Sending ErrorMessage :failedMessage:[Payload=...]
If you compare the preceding example with the stateless examples, you can see that, with stateful retry, the exception is thrown to the caller on each failure.
- Exception Classification for Retry
-
Spring Retry has a great deal of flexibility for determining which exceptions can invoke retry. The default configuration retries for all exceptions and the exception classifier looks at the top-level exception. If you configure it to, say, retry only on MyException and your application throws a SomeOtherException where the cause is a MyException, retry does not occur.
Since Spring Retry 1.0.3, the BinaryExceptionClassifier has a property called traverseCauses (the default is false). When true, it traverses exception causes until it finds a match or runs out of causes to traverse.
To use this classifier for retry, use a SimpleRetryPolicy created with the constructor that takes the max attempts, the Map of Exception objects, and the traverseCauses boolean. Then you can inject this policy into the RetryTemplate.
traverseCauses is required in this case because user exceptions may be wrapped in a MessagingException.
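The effect of traversing causes can be shown without any framework code. The following plain-Java sketch is only an approximation of the idea; the class and method names are hypothetical, and it is not the BinaryExceptionClassifier implementation.

```java
import java.util.Set;

// Hypothetical illustration of cause traversal; not part of Spring Retry.
final class CauseTraversalSketch {

    // Returns true when the thrown exception is an instance of one of the
    // retryable types. When traverseCauses is true, the chain of causes is
    // searched as well until a match is found or the chain ends.
    static boolean isRetryable(Throwable thrown, Set<Class<? extends Throwable>> retryable,
            boolean traverseCauses) {

        Throwable current = thrown;
        while (current != null) {
            for (Class<? extends Throwable> type : retryable) {
                if (type.isInstance(current)) {
                    return true;
                }
            }
            // Without traversal, only the top-level exception is inspected.
            current = traverseCauses ? current.getCause() : null;
        }
        return false;
    }
}
```

For a wrapper exception whose cause is a retryable type, this sketch returns false when traverseCauses is false and true when it is true, which mirrors the wrapped-exception situation described above.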
Circuit Breaker Advice
The general idea of the circuit breaker pattern is that, if a service is not currently available, do not waste time (and resources) trying to use it.
The o.s.i.handler.advice.RequestHandlerCircuitBreakerAdvice
implements this pattern.
When the circuit breaker is in the closed state, the endpoint attempts to invoke the service.
The circuit breaker goes to the open state if a certain number of consecutive attempts fail.
When it is in the open state, new requests “fail fast” and no attempt is made to invoke the service until some time has expired.
When that time has expired, the circuit breaker is set to the half-open state. When in this state, if even a single attempt fails, the breaker immediately goes to the open state. If the attempt succeeds, the breaker goes to the closed state, in which case it does not go to the open state again until the configured number of consecutive failures again occur. Any successful attempt resets the state to zero failures for the purpose of determining when the breaker might go to the open state again.
Typically, this advice might be used for external services, where it might take some time to fail (such as a timeout attempting to make a network connection).
The RequestHandlerCircuitBreakerAdvice
has two properties: threshold
and halfOpenAfter
.
The threshold
property represents the number of consecutive failures that need to occur before the breaker goes open.
It defaults to 5
.
The halfOpenAfter
property represents the time after the last failure that the breaker waits before attempting another request.
The default is 1000 milliseconds.
The following example configures a circuit breaker and shows its DEBUG
and ERROR
output:
<int:service-activator input-channel="input" ref="failer" method="service">
    <int:request-handler-advice-chain>
        <bean class="o.s.i.handler.advice.RequestHandlerCircuitBreakerAdvice">
            <property name="threshold" value="2" />
            <property name="halfOpenAfter" value="12000" />
        </bean>
    </int:request-handler-advice-chain>
</int:service-activator>
05.617 DEBUG [task-scheduler-1]preSend on channel 'input', message: [Payload=...]
05.638 ERROR [task-scheduler-1]org.springframework.messaging.MessageHandlingException: java.lang.RuntimeException: foo
...
10.598 DEBUG [task-scheduler-2]preSend on channel 'input', message: [Payload=...]
10.600 ERROR [task-scheduler-2]org.springframework.messaging.MessageHandlingException: java.lang.RuntimeException: foo
...
15.598 DEBUG [task-scheduler-3]preSend on channel 'input', message: [Payload=...]
15.599 ERROR [task-scheduler-3]org.springframework.messaging.MessagingException: Circuit Breaker is Open for ServiceActivator
...
20.598 DEBUG [task-scheduler-2]preSend on channel 'input', message: [Payload=...]
20.598 ERROR [task-scheduler-2]org.springframework.messaging.MessagingException: Circuit Breaker is Open for ServiceActivator
...
25.598 DEBUG [task-scheduler-5]preSend on channel 'input', message: [Payload=...]
25.601 ERROR [task-scheduler-5]org.springframework.messaging.MessageHandlingException: java.lang.RuntimeException: foo
...
30.598 DEBUG [task-scheduler-1]preSend on channel 'input', message: [Payload=foo...]
30.599 ERROR [task-scheduler-1]org.springframework.messaging.MessagingException: Circuit Breaker is Open for ServiceActivator
In the preceding example, the threshold is set to 2
and halfOpenAfter
is set to 12
seconds.
A new request arrives every 5 seconds.
The first two attempts invoked the service.
The third and fourth failed with an exception indicating that the circuit breaker is open.
The fifth request was attempted because the request was 15 seconds after the last failure.
The sixth attempt fails immediately because the breaker immediately went to open.
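The state transitions described in this section can be sketched in a few lines of plain Java. This is a simplified, single-threaded illustration under assumed names (CircuitBreakerSketch, OpenException); it is not the source of RequestHandlerCircuitBreakerAdvice, and the current time is passed in explicitly so that a scenario can be replayed deterministically.

```java
import java.util.function.Supplier;

// Hypothetical, non-thread-safe illustration of the circuit breaker pattern.
final class CircuitBreakerSketch {

    static final class OpenException extends RuntimeException {
        OpenException() {
            super("circuit breaker is open");
        }
    }

    private final int threshold;
    private final long halfOpenAfterMillis;
    private int failures;
    private long lastFailure;

    CircuitBreakerSketch(int threshold, long halfOpenAfterMillis) {
        this.threshold = threshold;
        this.halfOpenAfterMillis = halfOpenAfterMillis;
    }

    // Invokes the service unless the breaker is open at time 'now' (milliseconds).
    <T> T call(Supplier<T> service, long now) {
        // Open: enough consecutive failures and the wait time has not yet expired.
        if (failures >= threshold && now - lastFailure < halfOpenAfterMillis) {
            throw new OpenException();
        }
        try {
            T result = service.get();
            failures = 0; // any success resets the failure count (closed)
            return result;
        }
        catch (RuntimeException e) {
            failures++; // a failure while half-open reopens the breaker at once
            lastFailure = now;
            throw e;
        }
    }
}
```

Replaying the scenario above against this sketch (threshold 2, halfOpenAfter 12000 ms, a failing request every 5 seconds) invokes the service for the first, second, and fifth requests and fails fast for the third, fourth, and sixth.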
Expression Evaluating Advice
Another supplied advice class is the o.s.i.handler.advice.ExpressionEvaluatingRequestHandlerAdvice.
This advice is more general than the retry and circuit breaker advices.
It provides a mechanism to evaluate an expression on the original inbound message sent to the endpoint.
Separate expressions are available to be evaluated, after either success or failure.
Optionally, a message containing the evaluation result, together with the input message, can be sent to a message channel.
A typical use case for this advice might be with an <ftp:outbound-channel-adapter/>, perhaps to move the file to one directory if the transfer was successful or to another directory if it fails.
The advice has properties to set an expression when successful, an expression for failures, and corresponding channels for each.
For the successful case, the message sent to the successChannel
is an AdviceMessage
, with the payload being the result of the expression evaluation.
An additional property, called inputMessage
, contains the original message sent to the handler.
A message sent to the failureChannel
(when the handler throws an exception) is an ErrorMessage
with a payload of MessageHandlingExpressionEvaluatingAdviceException
.
Like all MessagingException
instances, this payload has failedMessage
and cause
properties, as well as an additional property called evaluationResult
, which contains the result of the expression evaluation.
Starting with version 5.1.3, if channels are configured but expressions are not provided, a default expression is used, which evaluates to the payload of the message.
When an exception is thrown in the scope of the advice, by default, that exception is thrown to the caller after any failureExpression
is evaluated.
If you wish to suppress throwing the exception, set the trapException
property to true
.
The following example shows how to configure the advice with the Java DSL:
@SpringBootApplication
public class EerhaApplication {

    public static void main(String[] args) {
        ConfigurableApplicationContext context = SpringApplication.run(EerhaApplication.class, args);
        MessageChannel in = context.getBean("advised.input", MessageChannel.class);
        in.send(new GenericMessage<>("good"));
        in.send(new GenericMessage<>("bad"));
        context.close();
    }

    @Bean
    public IntegrationFlow advised() {
        return f -> f.handle((GenericHandler<String>) (payload, headers) -> {
            if (payload.equals("good")) {
                return null;
            }
            else {
                throw new RuntimeException("some failure");
            }
        }, c -> c.advice(expressionAdvice()));
    }

    @Bean
    public Advice expressionAdvice() {
        ExpressionEvaluatingRequestHandlerAdvice advice = new ExpressionEvaluatingRequestHandlerAdvice();
        advice.setSuccessChannelName("success.input");
        advice.setOnSuccessExpressionString("payload + ' was successful'");
        advice.setFailureChannelName("failure.input");
        advice.setOnFailureExpressionString(
                "payload + ' was bad, with reason: ' + #exception.cause.message");
        advice.setTrapException(true);
        return advice;
    }

    @Bean
    public IntegrationFlow success() {
        return f -> f.handle(System.out::println);
    }

    @Bean
    public IntegrationFlow failure() {
        return f -> f.handle(System.out::println);
    }
}
Rate Limiter Advice
The Rate Limiter advice (RateLimiterRequestHandlerAdvice) lets you ensure that an endpoint does not get overloaded with requests.
When the rate limit is breached, the request is blocked.
A typical use case for this advice might be an external service provider that does not allow more than n requests per minute.
The RateLimiterRequestHandlerAdvice implementation is fully based on the Resilience4j project and requires either RateLimiter or RateLimiterConfig injections.
It can also be configured with defaults and/or a custom name.
The following example configures a rate limiter advice with one request per 1 second:
@Bean
public RateLimiterRequestHandlerAdvice rateLimiterRequestHandlerAdvice() {
    return new RateLimiterRequestHandlerAdvice(RateLimiterConfig.custom()
            .limitRefreshPeriod(Duration.ofSeconds(1))
            .limitForPeriod(1)
            .build());
}
@ServiceActivator(inputChannel = "requestChannel", outputChannel = "resultChannel",
        adviceChain = "rateLimiterRequestHandlerAdvice")
public String handleRequest(String payload) {
    ...
}
Caching Advice
Starting with version 5.2, the CacheRequestHandlerAdvice
has been introduced.
It is based on the caching abstraction in Spring Framework and aligned with the concepts and functionality provided by the @Caching
annotation family.
The logic internally is based on the CacheAspectSupport
extension, where proxying for caching operations is done around the AbstractReplyProducingMessageHandler.RequestHandler.handleRequestMessage
method with the request Message<?>
as the argument.
This advice can be configured with a SpEL expression or a Function
to evaluate a cache key.
The request Message<?>
is available as the root object for the SpEL evaluation context, or as the Function
input argument.
By default, the payload
of the request message is used for the cache key.
The CacheRequestHandlerAdvice
must be configured with cacheNames
, when a default cache operation is a CacheableOperation
, or with a set of any arbitrary CacheOperation
s.
Every CacheOperation can be configured separately, or shared options (such as a CacheManager, CacheResolver, and CacheErrorHandler) can be reused from the CacheRequestHandlerAdvice configuration.
This configuration functionality is similar to Spring Framework’s @CacheConfig
and @Caching
annotation combination.
If a CacheManager
is not provided, a single bean is resolved by default from the BeanFactory
in the CacheAspectSupport
.
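The role of the cache key can be pictured with a small plain-Java sketch: a key function is evaluated against the request, and the handler runs only when no value is cached under that key. The class below is a hypothetical stand-in for the idea; it does not use Spring's cache abstraction or CacheRequestHandlerAdvice, and it does not accept null keys.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical illustration of keyed caching around a handler.
final class CachingHandlerSketch<M, R> implements Function<M, R> {

    private final Map<Object, R> cache = new ConcurrentHashMap<>();
    private final Function<M, ?> keyFunction;
    private final Function<M, R> handler;

    CachingHandlerSketch(Function<M, ?> keyFunction, Function<M, R> handler) {
        this.keyFunction = keyFunction;
        this.handler = handler;
    }

    @Override
    public R apply(M message) {
        // Evaluate the key from the request, then invoke the handler only on a cache miss.
        Object key = keyFunction.apply(message);
        return cache.computeIfAbsent(key, k -> handler.apply(message));
    }
}
```

Using the identity function as the key function corresponds to the default described above, where the payload itself serves as the key.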
The following example configures two advices with different sets of caching operations:
@Bean
public CacheRequestHandlerAdvice cacheAdvice() {
    CacheRequestHandlerAdvice cacheRequestHandlerAdvice = new CacheRequestHandlerAdvice(TEST_CACHE);
    cacheRequestHandlerAdvice.setKeyExpressionString("payload");
    return cacheRequestHandlerAdvice;
}
@Transformer(inputChannel = "transformerChannel", outputChannel = "nullChannel", adviceChain = "cacheAdvice")
public Object transform(Message<?> message) {
    ...
}
@Bean
public CacheRequestHandlerAdvice cachePutAndEvictAdvice() {
    CacheRequestHandlerAdvice cacheRequestHandlerAdvice = new CacheRequestHandlerAdvice();
    cacheRequestHandlerAdvice.setKeyExpressionString("payload");
    CachePutOperation.Builder cachePutBuilder = new CachePutOperation.Builder();
    cachePutBuilder.setCacheName(TEST_PUT_CACHE);
    CacheEvictOperation.Builder cacheEvictBuilder = new CacheEvictOperation.Builder();
    cacheEvictBuilder.setCacheName(TEST_CACHE);
    cacheRequestHandlerAdvice.setCacheOperations(cachePutBuilder.build(), cacheEvictBuilder.build());
    return cacheRequestHandlerAdvice;
}
@ServiceActivator(inputChannel = "serviceChannel", outputChannel = "nullChannel",
        adviceChain = "cachePutAndEvictAdvice")
public Message<?> service(Message<?> message) {
    ...
}
Reactive Advice
Starting with version 5.3, a ReactiveRequestHandlerAdvice can be used for request message handlers that produce Mono replies.
A BiFunction<Message<?>, Mono<?>, Publisher<?>>
has to be provided for this advice and it is called from the Mono.transform()
operator on a reply produced by the intercepted handleRequestMessage()
method implementation.
Typically such a Mono
customization is necessary when we would like to control network fluctuations via timeout()
, retry()
and similar support operators.
For example, when we make an HTTP request over a WebFlux client, we could use the following configuration to wait no more than 5 seconds for a response:
.handle(WebFlux.outboundGateway("https://somehost/"),
e -> e.customizeMonoReply((message, mono) -> mono.timeout(Duration.ofSeconds(5))));
The message
argument is the request message for the message handler and can be used to determine request-scope attributes.
The mono
argument is the result of this message handler’s handleRequestMessage()
method implementation.
A nested Mono.transform()
can also be called from this function to apply, for example, a Reactive Circuit Breaker.
Custom Advice Classes
In addition to the provided advice classes described earlier, you can implement your own advice classes.
While you can provide any implementation of org.aopalliance.aop.Advice
(usually org.aopalliance.intercept.MethodInterceptor
), we generally recommend that you subclass o.s.i.handler.advice.AbstractRequestHandlerAdvice
.
This has the benefit of avoiding the writing of low-level aspect-oriented programming code as well as providing a starting point that is specifically tailored for use in this environment.
Subclasses need to implement the doInvoke()
method, the definition of which follows:
/**
 * Subclasses implement this method to apply behavior to the {@link MessageHandler}.
 * callback.execute() invokes the handler method and returns its result (or null).
 * @param callback Subclasses invoke the execute() method on this interface to invoke the handler method.
 * @param target The target handler.
 * @param message The message that will be sent to the handler.
 * @return the result after invoking the {@link MessageHandler}.
 * @throws Exception
 */
protected abstract Object doInvoke(ExecutionCallback callback, Object target, Message<?> message) throws Exception;
The callback parameter is a convenience that spares subclasses from dealing with AOP directly.
Invoking the callback.execute()
method invokes the message handler.
The target
parameter is provided for those subclasses that need to maintain state for a specific handler, perhaps by maintaining that state in a Map
keyed by the target.
This feature allows the same advice to be applied to multiple handlers.
The RequestHandlerCircuitBreakerAdvice uses this to keep circuit breaker state for each handler.
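Keeping state per target might look like the following plain-Java sketch. It is deliberately independent of Spring Integration: the Supplier stands in for callback.execute(), and the class and method names are hypothetical. In a real advice, the same map would live in an AbstractRequestHandlerAdvice subclass and be consulted from doInvoke().

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical illustration of per-handler state inside a shared advice.
final class PerTargetStateSketch {

    // One counter per target handler, so a single advice instance can serve many handlers.
    private final Map<Object, AtomicInteger> failuresByTarget = new ConcurrentHashMap<>();

    // 'callback' stands in for ExecutionCallback.execute(); 'target' is the advised handler.
    <T> T invoke(Supplier<T> callback, Object target) {
        AtomicInteger failures = failuresByTarget.computeIfAbsent(target, t -> new AtomicInteger());
        try {
            return callback.get();
        }
        catch (RuntimeException e) {
            // Only the state of the failing handler is updated.
            failures.incrementAndGet();
            throw e;
        }
    }

    int failuresFor(Object target) {
        AtomicInteger failures = failuresByTarget.get(target);
        return failures == null ? 0 : failures.get();
    }
}
```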
The message
parameter is the message sent to the handler.
While the advice cannot modify the message before invoking the handler, it can modify the payload (if it has mutable properties).
Typically, an advice would use the message for logging or to send a copy of the message somewhere before or after invoking the handler.
The return value would normally be the value returned by callback.execute()
.
However, the advice does have the ability to modify the return value.
Note that only AbstractReplyProducingMessageHandler
instances return values.
The following example shows a custom advice class that extends AbstractRequestHandlerAdvice
:
public class MyAdvice extends AbstractRequestHandlerAdvice {

    @Override
    protected Object doInvoke(ExecutionCallback callback, Object target, Message<?> message) throws Exception {
        // add code before the invocation
        Object result = callback.execute();
        // add code after the invocation
        return result;
    }
}
In addition to the For more information, see the ReflectiveMethodInvocation Javadoc. |
Other Advice Chain Elements
While the abstract class mentioned above is a convenience, you can add any Advice
, including a transaction advice, to the chain.
Handling Message Advice
As discussed in the introduction to this section, advice objects in a request handler advice chain are applied to just the current endpoint, not the downstream flow (if any).
For MessageHandler
objects that produce a reply (such as those that extend AbstractReplyProducingMessageHandler
), the advice is applied to an internal method: handleRequestMessage()
(called from MessageHandler.handleMessage()
).
For other message handlers, the advice is applied to MessageHandler.handleMessage()
.
There are some circumstances where, even if a message handler is an AbstractReplyProducingMessageHandler
, the advice must be applied to the handleMessage
method.
For example, the idempotent receiver might return null
, which would cause an exception if the handler’s replyRequired
property is set to true
.
Another example is the BoundRabbitChannelAdvice
— see Strict Message Ordering.
Starting with version 4.3.1, a new HandleMessageAdvice
interface and its base implementation (AbstractHandleMessageAdvice
) have been introduced.
Advice
objects that implement HandleMessageAdvice
are always applied to the handleMessage()
method, regardless of the handler type.
It is important to understand that HandleMessageAdvice implementations (such as idempotent receiver), when applied to handlers that return responses, are dissociated from the adviceChain and properly applied to the MessageHandler.handleMessage() method.
Because of this dissociation, the advice chain order is not honored.
Consider the following configuration:
<some-reply-producing-endpoint ... >
    <int:request-handler-advice-chain>
        <tx:advice ... />
        <ref bean="myHandleMessageAdvice" />
    </int:request-handler-advice-chain>
</some-reply-producing-endpoint>
In the preceding example, the <tx:advice>
is applied to the AbstractReplyProducingMessageHandler.handleRequestMessage()
.
However, myHandleMessageAdvice is applied to MessageHandler.handleMessage().
Therefore, it is invoked before the <tx:advice>
.
To retain the order, you should follow the standard Spring AOP configuration approach and use an endpoint id
together with the .handler
suffix to obtain the target MessageHandler
bean.
Note that, in that case, the entire downstream flow is within the transaction scope.
In the case of a MessageHandler
that does not return a response, the advice chain order is retained.
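The reason the order changes is that handleMessage() is the outer call and handleRequestMessage() the inner one, so whatever wraps the outer call always runs first. The following plain-Java sketch models that nesting with Runnable wrappers; all names are illustrative, and no Spring AOP is involved.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of advice nesting around a reply-producing handler.
final class AdviceOrderSketch {

    // Wraps a call so that the advice name is recorded before and after it proceeds.
    static Runnable advise(String name, Runnable next, List<String> trace) {
        return () -> {
            trace.add(name + ":before");
            next.run();
            trace.add(name + ":after");
        };
    }

    static List<String> run() {
        List<String> trace = new ArrayList<>();
        // Inner method: the chain's regular advice (here "tx") wraps handleRequestMessage().
        Runnable handleRequestMessage = () -> trace.add("handleRequestMessage");
        Runnable txAdvised = advise("tx", handleRequestMessage, trace);
        // Outer method: handleMessage() delegates to the advised inner method.
        Runnable handleMessage = txAdvised::run;
        // A HandleMessageAdvice wraps the outer method, regardless of its position in the chain.
        Runnable advised = advise("myHandleMessageAdvice", handleMessage, trace);
        advised.run();
        return trace;
    }
}
```

Calling run() records myHandleMessageAdvice:before, tx:before, handleRequestMessage, tx:after, and myHandleMessageAdvice:after, in that order.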
Starting with version 5.3, the HandleMessageAdviceAdapter
is provided to apply any MethodInterceptor
for the MessageHandler.handleMessage()
method and, therefore, the whole sub-flow.
For example, a RetryOperationsInterceptor
could be applied to the whole sub-flow starting from some endpoint; this is not possible, by default, because the consumer endpoint applies advices only to the AbstractReplyProducingMessageHandler.RequestHandler.handleRequestMessage()
.
Transaction Support
Starting with version 5.0, a new TransactionHandleMessageAdvice
has been introduced to make the whole downstream flow transactional, thanks to the HandleMessageAdvice
implementation.
When a regular TransactionInterceptor
is used in the <request-handler-advice-chain>
element (for example, through configuring <tx:advice>
), a started transaction is applied only to the internal AbstractReplyProducingMessageHandler.handleRequestMessage()
and is not propagated to the downstream flow.
To simplify XML configuration, along with the <request-handler-advice-chain>
, a <transactional>
element has been added to all <outbound-gateway>
and <service-activator>
and related components.
The following example shows <transactional>
in use:
<int-rmi:outbound-gateway remote-channel="foo" host="localhost"
request-channel="good" reply-channel="reply" port="#{@port}">
<int-rmi:transactional/>
</int-rmi:outbound-gateway>
<bean id="transactionManager" class="org.mockito.Mockito" factory-method="mock">
<constructor-arg value="org.springframework.transaction.TransactionManager"/>
</bean>
If you are familiar with the JPA integration components, such a configuration is not new, but now you can start a transaction from any point in the flow, not only from the <poller>
or a message-driven channel adapter such as JMS.
Java configuration can be simplified by using the TransactionInterceptorBuilder
, and the result bean name can be used in the messaging annotations adviceChain
attribute, as the following example shows:
@Bean
public ConcurrentMetadataStore store() {
return new SimpleMetadataStore(hazelcastInstance()
.getMap("idempotentReceiverMetadataStore"));
}
@Bean
public IdempotentReceiverInterceptor idempotentReceiverInterceptor() {
return new IdempotentReceiverInterceptor(
new MetadataStoreSelector(
message -> message.getPayload().toString(),
message -> message.getPayload().toString().toUpperCase(), store()));
}
@Bean
public TransactionInterceptor transactionInterceptor() {
return new TransactionInterceptorBuilder(true)
.transactionManager(this.transactionManager)
.isolation(Isolation.READ_COMMITTED)
.propagation(Propagation.REQUIRES_NEW)
.build();
}
@Bean
@org.springframework.integration.annotation.Transformer(inputChannel = "input",
outputChannel = "output",
adviceChain = { "idempotentReceiverInterceptor",
"transactionInterceptor" })
public Transformer transformer() {
return message -> message;
}
Note the true
parameter on the TransactionInterceptorBuilder
constructor.
It causes the creation of a TransactionHandleMessageAdvice
, not a regular TransactionInterceptor
.
The Java DSL supports such an Advice
through the .transactional()
option on the endpoint configuration, as the following example shows:
@Bean
public IntegrationFlow updatingGatewayFlow() {
return f -> f
.handle(Jpa.updatingGateway(this.entityManagerFactory),
e -> e.transactional(true))
.channel(c -> c.queue("persistResults"));
}
Advising Filters
There is an additional consideration when advising Filter
endpoints.
By default, any discard actions (when the filter returns false
) are performed within the scope of the advice chain.
This could include all the flow downstream of the discard channel.
So, for example, if an element downstream of the discard channel throws an exception and there is a retry advice, the process is retried.
Also, if throwExceptionOnRejection
is set to true
, the exception is thrown within the scope of the advice.
Setting discard-within-advice
to false
modifies this behavior and the discard (or exception) occurs after the advice chain is called.
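The difference between the two modes can be illustrated with a small plain-Java model. This is only a sketch: the class DiscardScopeSketch, the retry helper, and the always-rejecting selector are hypothetical stand-ins, not Spring Integration API. It counts how often the filter's selector runs when the discard flow fails and a retry advice is present.

```java
import java.util.function.Predicate;

/** Plain-Java model of discard scoping; none of these names are Spring Integration API. */
final class DiscardScopeSketch {

    /** Returns how many times the selector ran for one rejected message whose discard flow fails. */
    static int selectorCalls(boolean discardWithinAdvice, int maxAttempts) {
        int[] calls = {0};
        // Hypothetical selector that rejects every message.
        Predicate<String> selector = payload -> {
            calls[0]++;
            return false;
        };
        // Hypothetical discard flow that always fails downstream.
        Runnable discard = () -> {
            throw new IllegalStateException("discard flow failed");
        };
        try {
            if (discardWithinAdvice) {
                // The discard runs inside the advised call, so its failure triggers retries.
                retry(maxAttempts, () -> {
                    if (!selector.test("bad")) {
                        discard.run();
                    }
                });
            } else {
                // Only the selector is advised; the discard happens after the advice returns.
                boolean[] accepted = {true};
                retry(maxAttempts, () -> accepted[0] = selector.test("bad"));
                if (!accepted[0]) {
                    discard.run();
                }
            }
        } catch (IllegalStateException expected) {
            // The failure reaches the caller in both modes.
        }
        return calls[0];
    }

    /** Hypothetical stand-in for a retry advice: re-invokes the call until it succeeds or attempts run out. */
    static void retry(int maxAttempts, Runnable call) {
        for (int attempt = 1; ; attempt++) {
            try {
                call.run();
                return;
            } catch (RuntimeException e) {
                if (attempt >= maxAttempts) {
                    throw e;
                }
            }
        }
    }
}
```

In this model, the selector runs once per retry attempt when the discard happens within the advice, and only once when the discard happens after it.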
Advising Endpoints Using Annotations
When configuring certain endpoints by using annotations (@Filter
, @ServiceActivator
, @Splitter
, and @Transformer
), you can supply a bean name for the advice chain in the adviceChain
attribute.
In addition, the @Filter
annotation also has the discardWithinAdvice
attribute, which can be used to configure the discard behavior, as discussed in Advising Filters.
The following example causes the discard to be performed after the advice:
@MessageEndpoint
public class MyAdvisedFilter {
@Filter(inputChannel="input", outputChannel="output",
adviceChain="adviceChain", discardWithinAdvice="false")
public boolean filter(String s) {
return s.contains("good");
}
}
Ordering Advices within an Advice Chain
Advice classes are “around” advices and are applied in a nested fashion. The first advice is the outermost, while the last advice is the innermost (that is, closest to the handler being advised). It is important to put the advice classes in the correct order to achieve the functionality you desire.
For example, suppose you want to add a retry advice and a transaction advice.
You may want to place the retry advice first, followed by the transaction advice.
Consequently, each retry is performed in a new transaction.
On the other hand, if you want all the attempts and any recovery operations (in the retry RecoveryCallback
) to be scoped within the transaction, you could put the transaction advice first.
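The effect of the ordering can be seen in a small plain-Java model of nested "around" advices. This is a sketch under stated assumptions: the Around interface, the wrap helper, and the log entries are hypothetical and do not use Spring AOP or Spring Integration classes. The handler fails on its first call and succeeds on the second.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

/** Plain-Java model of nested "around" advices; not Spring Integration API. */
final class AdviceOrderSketch {

    /** Hypothetical around advice: receives the next link in the chain and decides how to call it. */
    interface Around {
        String invoke(Supplier<String> next);
    }

    /** Wraps the handler so that the first advice in the list is the outermost one. */
    static Supplier<String> wrap(List<Around> chain, Supplier<String> handler) {
        Supplier<String> current = handler;
        for (int i = chain.size() - 1; i >= 0; i--) {
            Around advice = chain.get(i);
            Supplier<String> next = current;
            current = () -> advice.invoke(next);
        }
        return current;
    }

    /** Runs a handler that fails once and then succeeds; returns the recorded events. */
    static List<String> run(boolean retryFirst) {
        List<String> log = new ArrayList<>();
        int[] calls = {0};
        Supplier<String> handler = () -> {
            if (++calls[0] == 1) {
                throw new IllegalStateException("transient failure");
            }
            return "ok";
        };
        // Stand-in for a retry advice with at most two attempts.
        Around retry = next -> {
            for (int attempt = 1; ; attempt++) {
                log.add("attempt " + attempt);
                try {
                    return next.get();
                } catch (RuntimeException e) {
                    if (attempt == 2) {
                        throw e;
                    }
                }
            }
        };
        // Stand-in for a transaction advice that only records begin/commit/rollback.
        Around tx = next -> {
            log.add("begin");
            try {
                String result = next.get();
                log.add("commit");
                return result;
            } catch (RuntimeException e) {
                log.add("rollback");
                throw e;
            }
        };
        List<Around> chain = retryFirst ? List.of(retry, tx) : List.of(tx, retry);
        wrap(chain, handler).get();
        return log;
    }
}
```

With the retry advice first, the model records a rollback followed by a second, separate transaction. With the transaction advice first, both attempts run inside a single transaction.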
Advised Handler Properties
Sometimes, it is useful to access handler properties from within the advice.
For example, most handlers implement NamedComponent
to let you access the component name.
The target object can be accessed through the target
argument (when subclassing AbstractRequestHandlerAdvice
) or invocation.getThis()
(when implementing org.aopalliance.intercept.MethodInterceptor
).
When the entire handler is advised (such as when the handler does not produce replies or the advice implements HandleMessageAdvice
), you can cast the target object to an interface, such as NamedComponent
, as shown in the following example:
String componentName = ((NamedComponent) target).getComponentName();
When you implement MethodInterceptor
directly, you could cast the target object as follows:
String componentName = ((NamedComponent) invocation.getThis()).getComponentName();
When only the handleRequestMessage()
method is advised (in a reply-producing handler), you need to access the full handler, which is an AbstractReplyProducingMessageHandler
.
The following example shows how to do so:
AbstractReplyProducingMessageHandler handler =
((AbstractReplyProducingMessageHandler.RequestHandler) target).getAdvisedHandler();
String componentName = handler.getComponentName();
Idempotent Receiver Enterprise Integration Pattern
Starting with version 4.1, Spring Integration provides an implementation of the Idempotent Receiver Enterprise Integration Pattern.
It is a functional pattern and the whole idempotency logic should be implemented in the application.
However, to simplify the decision-making, the IdempotentReceiverInterceptor
component is provided.
This is an AOP Advice
that is applied to the MessageHandler.handleMessage()
method and that can filter
a request message or mark it as a duplicate
, according to its configuration.
Previously, you could have implemented this pattern by using a custom MessageSelector
in a <filter/>
(see Filter), for example.
However, since this pattern really defines the behavior of an endpoint rather than being an endpoint itself, the idempotent receiver implementation does not provide an endpoint component.
Rather, it is applied to endpoints declared in the application.
The logic of the IdempotentReceiverInterceptor
is based on the provided MessageSelector
and, if the message is not accepted by that selector, it is enriched with the duplicateMessage
header set to true
.
The target MessageHandler
(or downstream flow) can consult this header to implement the correct idempotency logic.
If the IdempotentReceiverInterceptor
is configured with a discardChannel
or throwExceptionOnRejection = true
, the duplicate message is not sent to the target MessageHandler.handleMessage()
.
Rather, it is discarded.
If you want to discard (do nothing with) the duplicate message, the discardChannel
should be configured with a NullChannel
, such as the default nullChannel
bean.
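The decision described above can be summarized in a plain-Java model. It is only a sketch: DuplicateHandlingSketch, its Outcome enum, and the in-memory key set are hypothetical and stand in for the interceptor and its selector. The throwExceptionOnRejection case is left out for brevity.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Plain-Java model of the duplicate-handling decision; not the IdempotentReceiverInterceptor itself. */
final class DuplicateHandlingSketch {

    enum Outcome { HANDLED, HANDLED_AS_DUPLICATE, DISCARDED }

    private final Set<String> seenKeys = new HashSet<>();
    private final boolean hasDiscardChannel;

    DuplicateHandlingSketch(boolean hasDiscardChannel) {
        this.hasDiscardChannel = hasDiscardChannel;
    }

    /** Decides what happens to a message; adds the header when the message is marked as a duplicate. */
    Outcome process(String key, Map<String, Object> headers) {
        if (seenKeys.add(key)) {
            // First delivery for this key: the selector accepts the message.
            return Outcome.HANDLED;
        }
        if (hasDiscardChannel) {
            // The duplicate goes to the discard channel and never reaches the handler.
            return Outcome.DISCARDED;
        }
        // No discard channel: the handler still receives the message and can inspect the header.
        headers.put("duplicateMessage", true);
        return Outcome.HANDLED_AS_DUPLICATE;
    }
}
```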
To maintain state between messages and provide the ability to compare messages for the idempotency, we provide the MetadataStoreSelector
.
It accepts a MessageProcessor
implementation (which creates a lookup key based on the Message
) and an optional ConcurrentMetadataStore
(Metadata Store).
See the MetadataStoreSelector
Javadoc for more information.
You can also customize the value
for ConcurrentMetadataStore
by using an additional MessageProcessor
.
By default, MetadataStoreSelector
uses the timestamp
message header.
Normally, the selector selects a message for acceptance if there is no existing value for the key.
In some cases, it is useful to compare the current and new values for a key, to determine whether the message should be accepted.
Starting with version 5.3, the compareValues
property is provided which references a BiPredicate<String, String>
; the first parameter is the old value and the second is the new value; return true
to accept the message and replace the old value with the new value in the MetadataStore
.
This can be useful to reduce the number of keys; for example, when processing lines in a file, you can store the file name in the key and the current line number in the value.
Then, after a restart, you can skip lines that have already been processed.
See Idempotent Downstream Processing a Split File for an example.
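The file-name and line-number case can be modeled in plain Java as follows. This is a minimal sketch, not the MetadataStoreSelector implementation: CompareValuesSketch, its in-memory map, and the NEWER_LINE predicate are hypothetical names, and the map stands in for a ConcurrentMetadataStore.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiPredicate;

/** Plain-Java model of key/value acceptance; the real logic lives in MetadataStoreSelector. */
final class CompareValuesSketch {

    /** Accepts a line only if its number is greater than the last one stored for the file. */
    static final BiPredicate<String, String> NEWER_LINE =
            (oldValue, newValue) -> Integer.parseInt(oldValue) < Integer.parseInt(newValue);

    // Stand-in for a metadata store: key is the file name, value is the last line number.
    private final Map<String, String> store = new ConcurrentHashMap<>();
    private final BiPredicate<String, String> compareValues; // may be null

    CompareValuesSketch(BiPredicate<String, String> compareValues) {
        this.compareValues = compareValues;
    }

    /** Returns true when the message identified by key and value should be accepted. */
    boolean accept(String key, String newValue) {
        String oldValue = store.get(key);
        if (oldValue == null || compareValues == null) {
            // Default behavior: accept only if no value exists yet for the key.
            return store.putIfAbsent(key, newValue) == null;
        }
        if (compareValues.test(oldValue, newValue)) {
            // Accepted: replace the old value with the new one.
            return store.replace(key, oldValue, newValue);
        }
        return false;
    }
}
```

With NEWER_LINE, a single key per file is enough: lines 1, 2, and 3 of the same file are accepted in turn, while a redelivered line 2 is rejected. Without a predicate, only the first message for a key is accepted.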
For convenience, the MetadataStoreSelector
options are configurable directly on the <idempotent-receiver>
component.
The following listing shows all the possible attributes:
<idempotent-receiver
id="" (1)
endpoint="" (2)
selector="" (3)
discard-channel="" (4)
metadata-store="" (5)
key-strategy="" (6)
key-expression="" (7)
value-strategy="" (8)
value-expression="" (9)
compare-values="" (10)
throw-exception-on-rejection="" /> (11)
1 | The ID of the IdempotentReceiverInterceptor bean.
Optional. |
2 | Consumer endpoint name(s) or pattern(s) to which this interceptor is applied.
Separate names (patterns) with commas (, ), such as endpoint="aaa, bbb*, ccc, *ddd, eee*fff" .
Endpoint bean names matching these patterns are then used to retrieve the target endpoint’s MessageHandler bean (using its .handler suffix), and the IdempotentReceiverInterceptor is applied to those beans.
Required. |
3 | A MessageSelector bean reference.
Mutually exclusive with metadata-store and key-strategy (key-expression) .
When selector is not provided, one of key-strategy or key-expression is required. |
4 | Identifies the channel to which to send a message when the IdempotentReceiverInterceptor does not accept it.
When omitted, duplicate messages are forwarded to the handler with a duplicateMessage header.
Optional. |
5 | A ConcurrentMetadataStore reference.
Used by the underlying MetadataStoreSelector .
Mutually exclusive with selector .
Optional.
The default MetadataStoreSelector uses an internal SimpleMetadataStore that does not maintain state across application executions. |
6 | A MessageProcessor reference.
Used by the underlying MetadataStoreSelector .
Evaluates an idempotentKey from the request message.
Mutually exclusive with selector and key-expression .
When a selector is not provided, one of key-strategy or key-expression is required. |
7 | A SpEL expression to populate an ExpressionEvaluatingMessageProcessor .
Used by the underlying MetadataStoreSelector .
Evaluates an idempotentKey by using the request message as the evaluation context root object.
Mutually exclusive with selector and key-strategy .
When a selector is not provided, one of key-strategy or key-expression is required. |
8 | A MessageProcessor reference.
Used by the underlying MetadataStoreSelector .
Evaluates a value for the idempotentKey from the request message.
Mutually exclusive with selector and value-expression .
By default, the 'MetadataStoreSelector' uses the 'timestamp' message header as the metadata 'value'. |
9 | A SpEL expression to populate an ExpressionEvaluatingMessageProcessor .
Used by the underlying MetadataStoreSelector .
Evaluates a value for the idempotentKey by using the request message as the evaluation context root object.
Mutually exclusive with selector and value-strategy .
By default, the 'MetadataStoreSelector' uses the 'timestamp' message header as the metadata 'value'. |
10 | A reference to a BiPredicate<String, String> bean which allows you to optionally select a message by comparing the old and new values for the key; null by default. |
11 | Whether to throw an exception if the IdempotentReceiverInterceptor rejects the message.
Defaults to false .
It is applied regardless of whether or not a discard-channel is provided. |
For Java configuration, Spring Integration provides the method-level @IdempotentReceiver
annotation.
It is used to mark a method
that has a messaging annotation (@ServiceActivator
, @Router, and others) to specify which IdempotentReceiverInterceptor
objects are applied to this endpoint.
The following example shows how to use the @IdempotentReceiver
annotation:
@Bean
public IdempotentReceiverInterceptor idempotentReceiverInterceptor() {
return new IdempotentReceiverInterceptor(new MetadataStoreSelector(m ->
m.getHeaders().get(INVOICE_NBR_HEADER)));
}
@Bean
@ServiceActivator(inputChannel = "input", outputChannel = "output")
@IdempotentReceiver("idempotentReceiverInterceptor")
public MessageHandler myService() {
....
}
When you use the Java DSL, you can add the interceptor to the endpoint’s advice chain, as the following example shows:
@Bean
public IntegrationFlow flow() {
...
.handle("someBean", "someMethod",
e -> e.advice(idempotentReceiverInterceptor()))
...
}
The IdempotentReceiverInterceptor is designed only for the MessageHandler.handleMessage(Message<?>) method.
Starting with version 4.3.1, it implements HandleMessageAdvice , with the AbstractHandleMessageAdvice as a base class, for better dissociation.
See Handling Message Advice for more information.
|