Splitter
The splitter is a component whose role is to partition a message into several parts and send the resulting messages to be processed independently. Very often, they are upstream producers in a pipeline that includes an aggregator.
Programming Model
The API for performing splitting consists of one base class, AbstractMessageSplitter
.
It is a MessageHandler
implementation that encapsulates features common to splitters, such as filling in the appropriate message headers (CORRELATION_ID
, SEQUENCE_SIZE
, and SEQUENCE_NUMBER
) on the messages that are produced.
This filling enables tracking down the messages and the results of their processing (in a typical scenario, these headers get copied to the messages that are produced by the various transforming endpoints).
The values can then be used, for example, by a composed message processor.
The following example shows an excerpt from AbstractMessageSplitter
:
public abstract class AbstractMessageSplitter
extends AbstractReplyProducingMessageConsumer {
...
protected abstract Object splitMessage(Message<?> message);
}
To implement a specific splitter in an application, you can extend AbstractMessageSplitter
and implement the splitMessage
method, which contains logic for splitting the messages.
The return value can be one of the following:
-
A
Collection
or an array of messages or anIterable
(orIterator
) that iterates over messages. In this case, the messages are sent as messages (after theCORRELATION_ID
,SEQUENCE_SIZE
andSEQUENCE_NUMBER
are populated). Using this approach gives you more control — for example, to populate custom message headers as part of the splitting process. -
A
Collection
or an array of non-message objects or anIterable
(orIterator
) that iterates over non-message objects. It works like the prior case, except that each collection element is used as a message payload. Using this approach lets you focus on the domain objects without having to consider the messaging system and produces code that is easier to test. -
a
Message
or non-message object (but not a collection or an array). It works like the previous cases, except that a single message is sent out.
In Spring Integration, any POJO can implement the splitting algorithm, provided that it defines a method that accepts a single argument and has a return value.
In this case, the return value of the method is interpreted as described earlier.
The input argument might either be a Message
or a simple POJO.
In the latter case, the splitter receives the payload of the incoming message.
We recommend this approach, because it decouples the code from the Spring Integration API and is typically easier to test.
Iterators
Starting with version 4.1, the AbstractMessageSplitter
supports the Iterator
type for the value
to split.
Note, in the case of an Iterator
(or Iterable
), we don’t have access to the number of underlying items and the SEQUENCE_SIZE
header is set to 0
.
This means that the default SequenceSizeReleaseStrategy
of an <aggregator>
won’t work and the group for the CORRELATION_ID
from the splitter
won’t be released; it will remain as incomplete
.
In this case you should use an appropriate custom ReleaseStrategy
or rely on send-partial-result-on-expiry
together with group-timeout
or a MessageGroupStoreReaper
.
Starting with version 5.0, the AbstractMessageSplitter
provides protected obtainSizeIfPossible()
methods to allow the determination of the size of the Iterable
and Iterator
objects if that is possible.
For example XPathMessageSplitter
can determine the size of the underlying NodeList
object.
And starting with version 5.0.9, this method also properly returns a size of the com.fasterxml.jackson.core.TreeNode
.
An Iterator
object is useful to avoid the need for building an entire collection in the memory before splitting.
For example, when underlying items are populated from some external system (e.g. DataBase or FTP MGET
) using iterations or streams.
Stream and Flux
Starting with version 5.0, the AbstractMessageSplitter
supports the Java Stream
and Reactive Streams Publisher
types for the value
to split.
In this case, the target Iterator
is built on their iteration functionality.
In addition, if the splitter’s output channel is an instance of a ReactiveStreamsSubscribableChannel
, the AbstractMessageSplitter
produces a Flux
result instead of an Iterator
, and the output channel is subscribed to this Flux
for back-pressure-based splitting on downstream flow demand.
Starting with version 5.2, the splitter supports a discardChannel
option for sending those request messages for which a split function has returned an empty container (collection, array, stream, Flux
etc.).
In this case there is just no item to iterate for sending to the outputChannel
.
The null
splitting result remains as an end of flow indicator.
Configuring a Splitter with Java, Groovy and Kotlin DSLs
An example of simple splitter based on a Message
and its iterable payload with DSL configuration:
@Bean
public IntegrationFlow someFlow() {
return f -> f.split(Message.class, Message::getPayload);
}
@Bean
fun someFlow() =
integrationFlow {
split<Message<*>> { it.payload }
}
@Bean
someFlow() {
integrationFlow {
splitWith {
expectedType Message<?>
function { it.payload }
}
}
}
See more information about the DSLs in the respective chapters:
Configuring a Splitter with XML
A splitter can be configured through XML as follows:
<int:channel id="inputChannel"/>
<int:splitter id="splitter" (1)
ref="splitterBean" (2)
method="split" (3)
input-channel="inputChannel" (4)
output-channel="outputChannel" (5)
discard-channel="discardChannel" /> (6)
<int:channel id="outputChannel"/>
<beans:bean id="splitterBean" class="sample.PojoSplitter"/>
1 | The ID of the splitter is optional. |
2 | A reference to a bean defined in the application context.
The bean must implement the splitting logic, as described in the earlier section.
Optional.
If a reference to a bean is not provided, it is assumed that the payload of the message that arrived on the input-channel is an implementation of java.util.Collection and the default splitting logic is applied to the collection, incorporating each individual element into a message and sending it to the output-channel . |
3 | The method (defined on the bean) that implements the splitting logic. Optional. |
4 | The input channel of the splitter. Required. |
5 | The channel to which the splitter sends the results of splitting the incoming message. Optional (because incoming messages can specify a reply channel themselves). |
6 | The channel to which the request message is sent in case of empty splitting result.
Optional (they will stop as in case of null result). |
We recommend using a ref
attribute if the custom splitter implementation can be referenced in other <splitter>
definitions.
However, if the custom splitter handler implementation should be scoped to a single definition of the <splitter>
, you can configure an inner bean definition, as the following example follows:
<int:splitter id="testSplitter" input-channel="inChannel" method="split"
output-channel="outChannel">
<beans:bean class="org.foo.TestSplitter"/>
</int:splitter>
Using both a ref attribute and an inner handler definition in the same <int:splitter> configuration is not allowed, as it creates an ambiguous condition and results in an exception being thrown.
|
If the ref attribute references a bean that extends AbstractMessageProducingHandler (such as splitters provided by the framework itself), the configuration is optimized by injecting the output channel into the handler directly.
In this case, each ref must be a separate bean instance (or a prototype -scoped bean) or use the inner <bean/> configuration type.
However, this optimization applies only if you do not provide any splitter-specific attributes in the splitter XML definition.
If you inadvertently reference the same message handler from multiple beans, you get a configuration exception.
|
Configuring a Splitter with Annotations
The @Splitter
annotation is applicable to methods that expect either the Message
type or the message payload type, and the return values of the method should be a Collection
of any type.
If the returned values are not actual Message
objects, each item is wrapped in a Message
as the payload of the Message
.
Each resulting Message
is sent to the designated output channel for the endpoint on which the @Splitter
is defined.
The following example shows how to configure a splitter by using the @Splitter
annotation:
@Splitter
List<LineItem> extractItems(Order order) {
return order.getItems()
}
See also Advising Endpoints Using Annotations and File Splitter.