Use the aggregator application to combine multiple messages into one, based on some correlation mechanism.
This processor is based entirely on the Aggregator component from Spring Integration. Consult the Spring Integration documentation for use cases and functionality.
The aggregator processor has the following options:
[Option table lost in extraction; only unlabeled default values remain, including testdb, localhost, and 6379.]
By default the aggregator processor uses:
- HeaderAttributeCorrelationStrategy(IntegrationMessageHeaderAccessor.CORRELATION_ID) - for correlation;
- SequenceSizeReleaseStrategy - for release;
- DefaultAggregatingMessageGroupProcessor - for aggregation;
- SimpleMessageStore - for messageStoreType.
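
The defaults listed above can be illustrated with a small, language-neutral sketch. This is not Spring Integration code: the message layout (plain dictionaries with `correlationId`, `sequenceSize`, and `payload` keys) and the `aggregate` function are assumptions made for illustration only. It groups messages by correlation ID, releases a group once it holds as many messages as its sequence size, and emits the payloads of the group as a list.

```python
from collections import defaultdict


def aggregate(messages):
    """Group messages by correlation id and release each complete group.

    Each message is a dict with hypothetical keys 'correlationId',
    'sequenceSize' and 'payload' (an assumed layout, not a Spring type).
    """
    groups = defaultdict(list)  # stand-in for an in-memory message store
    released = []
    for msg in messages:
        key = msg["correlationId"]
        groups[key].append(msg)
        # Release once the group holds as many messages as sequenceSize says.
        if len(groups[key]) == msg["sequenceSize"]:
            group = groups.pop(key)
            # Aggregate by collecting the payloads of the group into a list.
            released.append([m["payload"] for m in group])
    return released


messages = [
    {"correlationId": "a", "sequenceSize": 2, "payload": "a1"},
    {"correlationId": "b", "sequenceSize": 1, "payload": "b1"},
    {"correlationId": "a", "sequenceSize": 2, "payload": "a2"},
]
result = aggregate(messages)
print(result)
```

Group `b` is complete as soon as its only message arrives, so it is released before group `a`, which has to wait for its second message.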
The aggregator application can be configured for persistent MessageGroupStore implementations.
The configuration for the target technology relies on Spring Boot auto-configuration.
However, the default JDBC, MongoDB, and Redis auto-configurations are excluded.
They are imported (via @Import) based on the aggregator.messageStoreType configuration property.
Consult the Spring Boot Reference Manual for the auto-configuration of the particular technology you use for the aggregator.
The JDBC JdbcMessageStore requires particular tables in the target database.
You can find schema scripts for the supported RDBMS vendors in the org.springframework.integration.jdbc package of the spring-integration-jdbc jar.
Those scripts can be used for automatic database initialization via Spring Boot.
For example:
java -jar aggregator-rabbit-1.0.0.RELEASE.jar \
--aggregator.message-store-type=jdbc \
--spring.datasource.url=jdbc:h2:mem:test \
--spring.datasource.schema=org/springframework/integration/jdbc/schema-h2.sql
A Processor module that passes messages through unchanged by connecting the input channel directly to the output channel.
Use the filter module in a stream to determine whether a Message should be passed to the output channel.
A Processor application that retains or discards messages according to a predicate, expressed as a Groovy script.
The groovy-filter processor has the following options:
[Option table lost in extraction; only unlabeled default values remain.]
A Processor module that transforms messages using a Groovy script.
The groovy-transform processor has the following options:
[Option table lost in extraction; only unlabeled default values remain.]
Use the header-enricher app to add message headers.
The headers are provided in the form of a JSON map document, where the keys are the header names and the values are SpEL expressions.
For example --headers='foo=payload.someProperty \n bar=payload.otherProperty'
The header-enricher processor has the following options:
[Option table lost in extraction; only the unlabeled default value false remains.]
A processor app that makes requests to an HTTP resource and emits the response body as a message payload. This processor can be combined, for example, with a time source app to periodically poll results from an HTTP resource.
The httpclient processor has the following options:
[Option table lost in extraction; the surviving fragments list the possible values GET, HEAD, POST, PUT, PATCH, DELETE, OPTIONS, TRACE for one option and the default value body for another.]
A processor that evaluates a machine learning model stored in PMML format.
The pmml processor has the following options:
[Option table lost in extraction; only unlabeled default values remain.]
A Spring Cloud Stream module that transforms messages using a script. The script body is supplied directly as a property value. The language of the script can be specified (groovy/javascript/ruby/python).
The scriptable-transform processor has the following options:
[Option table lost in extraction; only unlabeled default values remain.]
The splitter app builds upon the concept of the same name in Spring Integration and allows the splitting of a single message into several distinct messages.
[Splitter option table lost in extraction; only unlabeled default values remain.]
When no expression, fileMarkers, or charset is provided, a DefaultMessageSplitter is configured with (optional) delimiters.
When fileMarkers or charset is provided, a FileSplitter is configured. To split files you must provide either fileMarkers or charset; the files must be text-based, and they are split into lines.
Otherwise, an ExpressionEvaluatingMessageSplitter is configured.
When splitting File payloads, the sequenceSize header is zero because the size cannot be determined at the beginning.
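
The selection rule described above can be summarized as a small decision function. This is only a sketch of the rule as stated in the text, not the app's actual configuration code: the `choose_splitter` function and its parameter names are hypothetical, and it does not model the validation that rejects ambiguous property combinations.

```python
def choose_splitter(expression=None, file_markers=None, charset=None):
    """Return the name of the splitter the text says would be configured.

    Hypothetical helper: parameter names mirror the properties mentioned
    in the text, and rejection of ambiguous combinations is not modelled.
    """
    if file_markers is not None or charset is not None:
        # Either property selects file splitting (text files, line by line).
        return "FileSplitter"
    if expression is None:
        # Nothing provided: default splitter with optional delimiters.
        return "DefaultMessageSplitter"
    # Only an expression was provided.
    return "ExpressionEvaluatingMessageSplitter"


print(choose_splitter())
print(choose_splitter(charset="UTF-8"))
print(choose_splitter(expression="payload.split(',')"))
```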
Caution: Ambiguous properties are not allowed.
As part of the SpEL expression you can make use of the pre-registered JSON Path function. The syntax is
#jsonPath(payload, '<json path expression>').
For example, consider the following JSON:
{
  "store": {
    "book": [
      { "category": "reference", "author": "Nigel Rees", "title": "Sayings of the Century", "price": 8.95 },
      { "category": "fiction", "author": "Evelyn Waugh", "title": "Sword of Honour", "price": 12.99 },
      { "category": "fiction", "author": "Herman Melville", "title": "Moby Dick", "isbn": "0-553-21311-3", "price": 8.99 },
      { "category": "fiction", "author": "J. R. R. Tolkien", "title": "The Lord of the Rings", "isbn": "0-395-19395-8", "price": 22.99 }
    ],
    "bicycle": { "color": "red", "price": 19.95 }
  }
}
and an expression #jsonPath(payload, '$.store.book'); the result will be 4 messages, each with a Map payload
containing the properties of a single book.
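
To make the expected result concrete, the following sketch reproduces the outcome of splitting on `$.store.book` using plain Python rather than SpEL or a JSON Path library. The dictionary lookup stands in for the JSON Path evaluation, and wrapping each book in a `{"payload": ...}` dictionary is an assumed stand-in for a message.

```python
import json

# The same JSON document as in the example above.
document = json.loads("""
{ "store": {
    "book": [
      { "category": "reference", "author": "Nigel Rees",
        "title": "Sayings of the Century", "price": 8.95 },
      { "category": "fiction", "author": "Evelyn Waugh",
        "title": "Sword of Honour", "price": 12.99 },
      { "category": "fiction", "author": "Herman Melville",
        "title": "Moby Dick", "isbn": "0-553-21311-3", "price": 8.99 },
      { "category": "fiction", "author": "J. R. R. Tolkien",
        "title": "The Lord of the Rings", "isbn": "0-395-19395-8",
        "price": 22.99 }
    ],
    "bicycle": { "color": "red", "price": 19.95 }
} }
""")

# Plain-Python equivalent of selecting '$.store.book': a list of four maps.
books = document["store"]["book"]

# One message per list element, each carrying a single book as its payload.
messages = [{"payload": book} for book in books]

for message in messages:
    print(message["payload"]["title"])
```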
The tcp-client processor has the following options:
[Option table lost in extraction; the surviving fragments show that two options accept the values CRLF, LF, NULL, STXETX, RAW, L1, L2, L4, and list the unlabeled defaults 2048, UTF-8, localhost, and 60000.]
Use the transform app in a stream to convert a Message's content or structure.
The transform processor is used by passing a SpEL expression. The expression should return the modified message or payload. For example, --expression=payload.toUpperCase().
This transform will convert all message payloads to upper case.
As part of the SpEL expression you can make use of the pre-registered JSON Path function. The syntax is #jsonPath(payload, '<json path expression>').