For the latest stable version, please use Spring Integration 6.4.1! |
Idempotent Receiver Enterprise Integration Pattern
Starting with version 4.1, Spring Integration provides an implementation of the Idempotent Receiver Enterprise Integration Pattern.
It is a functional pattern and the whole idempotency logic should be implemented in the application.
However, to simplify the decision-making, the IdempotentReceiverInterceptor
component is provided.
This is an AOP Advice
that is applied to the MessageHandler.handleMessage()
method and that can filter
a request message or mark it as a duplicate
, according to its configuration.
Previously, you could have implemented this pattern by using a custom MessageSelector
in a <filter/>
(see Filter), for example.
However, since this pattern really defines the behavior of an endpoint rather than being an endpoint itself, the idempotent receiver implementation does not provide an endpoint component.
Rather, it is applied to endpoints declared in the application.
The logic of the IdempotentReceiverInterceptor
is based on the provided MessageSelector
and, if the message is not accepted by that selector, it is enriched with the duplicateMessage
header set to true
.
The target MessageHandler
(or downstream flow) can consult this header to implement the correct idempotency logic.
If the IdempotentReceiverInterceptor
is configured with a discardChannel
or throwExceptionOnRejection = true
, the duplicate message is not sent to the target MessageHandler.handleMessage()
.
Rather, it is discarded.
If you want to discard (do nothing with) the duplicate message, the discardChannel
should be configured with a NullChannel
, such as the default nullChannel
bean.
To maintain state between messages and provide the ability to compare messages for the idempotency, we provide the MetadataStoreSelector
.
It accepts a MessageProcessor
implementation (which creates a lookup key based on the Message
) and an optional ConcurrentMetadataStore
(Metadata Store).
See the MetadataStoreSelector
Javadoc for more information.
You can also customize the value
for ConcurrentMetadataStore
by using an additional MessageProcessor
.
By default, MetadataStoreSelector
uses the timestamp
message header.
Normally, the selector selects a message for acceptance if there is no existing value for the key.
In some cases, it is useful to compare the current and new values for a key, to determine whether the message should be accepted.
Starting with version 5.3, the compareValues
property is provided which references a BiPredicate<String, String>
; the first parameter is the old value; return true
to accept the message and replace the old value with the new value in the MetadataStore
.
This can be useful to reduce the number of keys; for example, when processing lines in a file, you can store the file name in the key and the current line number in the value.
Then, after a restart, you can skip lines that have already been processed.
See Idempotent Downstream Processing a Split File for an example.
For convenience, the MetadataStoreSelector
options are configurable directly on the <idempotent-receiver>
component.
The following listing shows all the possible attributes:
<idempotent-receiver
id="" (1)
endpoint="" (2)
selector="" (3)
discard-channel="" (4)
metadata-store="" (5)
key-strategy="" (6)
key-expression="" (7)
value-strategy="" (8)
value-expression="" (9)
compare-values="" (10)
throw-exception-on-rejection="" /> (11)
1 | The ID of the IdempotentReceiverInterceptor bean.
Optional. |
2 | Consumer endpoint name(s) or pattern(s) to which this interceptor is applied.
Separate names (patterns) with commas (, ), such as endpoint="aaa, bbb*, ccc, *ddd, eee*fff" .
Endpoint bean names matching these patterns are then used to retrieve the target endpoint’s MessageHandler bean (using its .handler suffix), and the IdempotentReceiverInterceptor is applied to those beans.
Required. |
3 | A MessageSelector bean reference.
Mutually exclusive with metadata-store and key-strategy (key-expression) .
When selector is not provided, one of key-strategy or key-strategy-expression is required. |
4 | Identifies the channel to which to send a message when the IdempotentReceiverInterceptor does not accept it.
When omitted, duplicate messages are forwarded to the handler with a duplicateMessage header.
Optional. |
5 | A ConcurrentMetadataStore reference.
Used by the underlying MetadataStoreSelector .
Mutually exclusive with selector .
Optional.
The default MetadataStoreSelector uses an internal SimpleMetadataStore that does not maintain state across application executions. |
6 | A MessageProcessor reference.
Used by the underlying MetadataStoreSelector .
Evaluates an idempotentKey from the request message.
Mutually exclusive with selector and key-expression .
When a selector is not provided, one of key-strategy or key-strategy-expression is required. |
7 | A SpEL expression to populate an ExpressionEvaluatingMessageProcessor .
Used by the underlying MetadataStoreSelector .
Evaluates an idempotentKey by using the request message as the evaluation context root object.
Mutually exclusive with selector and key-strategy .
When a selector is not provided, one of key-strategy or key-strategy-expression is required. |
8 | A MessageProcessor reference.
Used by the underlying MetadataStoreSelector .
Evaluates a value for the idempotentKey from the request message.
Mutually exclusive with selector and value-expression .
By default, the 'MetadataStoreSelector' uses the 'timestamp' message header as the Metadata 'value'. |
9 | A SpEL expression to populate an ExpressionEvaluatingMessageProcessor .
Used by the underlying MetadataStoreSelector .
Evaluates a value for the idempotentKey by using the request message as the evaluation context root object.
Mutually exclusive with selector and value-strategy .
By default, the 'MetadataStoreSelector' uses the 'timestamp' message header as the metadata 'value'. |
10 | A reference to a BiPredicate<String, String> bean which allows you to optionally select a message by comparing the old and new values for the key; null by default. |
11 | Whether to throw an exception if the IdempotentReceiverInterceptor rejects the message.
Defaults to false .
It is applied regardless of whether or not a discard-channel is provided. |
For Java configuration, Spring Integration provides the method-level @IdempotentReceiver
annotation.
It is used to mark a method
that has a messaging annotation (@ServiceActivator
, @Router, and others) to specify which `IdempotentReceiverInterceptor
objects are applied to this endpoint.
The following example shows how to use the @IdempotentReceiver
annotation:
@Bean
public IdempotentReceiverInterceptor idempotentReceiverInterceptor() {
return new IdempotentReceiverInterceptor(new MetadataStoreSelector(m ->
m.getHeaders().get(INVOICE_NBR_HEADER)));
}
@Bean
@ServiceActivator(inputChannel = "input", outputChannel = "output")
@IdempotentReceiver("idempotentReceiverInterceptor")
public MessageHandler myService() {
....
}
When you use the Java DSL, you can add the interceptor to the endpoint’s advice chain, as the following example shows:
@Bean
public IntegrationFlow flow() {
...
.handle("someBean", "someMethod",
e -> e.advice(idempotentReceiverInterceptor()))
...
}
The IdempotentReceiverInterceptor is designed only for the MessageHandler.handleMessage(Message<?>) method.
Starting with version 4.3.1, it implements HandleMessageAdvice , with the AbstractHandleMessageAdvice as a base class, for better dissociation.
See Handling Message Advice for more information.
|