3. Processors

3.1 Aggregator Processor

Use the aggregator application to combine multiple messages into one, based on some correlation mechanism.

This processor is fully based on the Aggregator component from Spring Integration. So, please, consult there for use-cases and functionality.

3.1.1 Options

The aggregator processor has the following options:

aggregator.aggregation
SpEL expression for aggregation strategy. Default is collection of payloads (Expression, default: <none>)
aggregator.correlation
SpEL expression for correlation key. Default to correlationId header (Expression, default: <none>)
aggregator.group-timeout
SpEL expression for timeout to expiring uncompleted groups (Expression, default: <none>)
aggregator.message-store-entity
Persistence message store entity: table prefix in RDBMS, collection name in MongoDb, etc (String, default: <none>)
aggregator.message-store-type
Message store type (String, default: <none>)
aggregator.release
SpEL expression for release strategy. Default is based on the sequenceSize header (Expression, default: <none>)
spring.data.mongodb.authentication-database
Authentication database name. (String, default: <none>)
spring.data.mongodb.database
Database name. (String, default: <none>)
spring.data.mongodb.field-naming-strategy
Fully qualified name of the FieldNamingStrategy to use. (java.lang.Class<?>, default: <none>)
spring.data.mongodb.grid-fs-database
GridFS database name. (String, default: <none>)
spring.data.mongodb.host
Mongo server host. Cannot be set with uri. (String, default: <none>)
spring.data.mongodb.password
Login password of the mongo server. Cannot be set with uri. (char[], default: <none>)
spring.data.mongodb.port
Mongo server port. Cannot be set with uri. (Integer, default: <none>)
spring.data.mongodb.uri
Mongo database URI. Cannot be set with host, port and credentials. (String, default: <none>)
spring.data.mongodb.username
Login user of the mongo server. Cannot be set with uri. (String, default: <none>)
spring.datasource.continue-on-error
Do not stop if an error occurs while initializing the database. (Boolean, default: false)
spring.datasource.data
Data (DML) script resource references. (java.util.List<java.lang.String>, default: <none>)
spring.datasource.data-password
Password of the database to execute DML scripts. (String, default: <none>)
spring.datasource.data-username
User of the database to execute DML scripts. (String, default: <none>)
spring.datasource.driver-class-name
Fully qualified name of the JDBC driver. Auto-detected based on the URL by default. (String, default: <none>)
spring.datasource.generate-unique-name
Generate a random datasource name. (Boolean, default: false)
spring.datasource.initialize
Populate the database using 'data.sql'. (Boolean, default: true)
spring.datasource.jndi-name
JNDI location of the datasource. Class, url, username & password are ignored when set. (String, default: <none>)
spring.datasource.name
Name of the datasource. (String, default: testdb)
spring.datasource.password
Login password of the database. (String, default: <none>)
spring.datasource.platform
Platform to use in the schema resource (schema-${platform}.sql). (String, default: all)
spring.datasource.schema
Schema (DDL) script resource references. (java.util.List<java.lang.String>, default: <none>)
spring.datasource.schema-password
Password of the database to execute DDL scripts (if different). (String, default: <none>)
spring.datasource.schema-username
User of the database to execute DDL scripts (if different). (String, default: <none>)
spring.datasource.separator
Statement separator in SQL initialization scripts. (String, default: ;)
spring.datasource.sql-script-encoding
SQL scripts encoding. (Charset, default: <none>)
spring.datasource.type
Fully qualified name of the connection pool implementation to use. By default, it is auto-detected from the classpath. (java.lang.Class<? extends javax.sql.DataSource>, default: <none>)
spring.datasource.url
JDBC url of the database. (String, default: <none>)
spring.datasource.username
Login user of the database. (String, default: <none>)
spring.mongodb.embedded.features
Comma-separated list of features to enable. (java.util.Set<de.flapdoodle.embed.mongo.distribution.Feature>, default: <none>)
spring.mongodb.embedded.version
Version of Mongo to use. (String, default: 3.2.2)
spring.redis.database
Database index used by the connection factory. (Integer, default: 0)
spring.redis.host
Redis server host. (String, default: localhost)
spring.redis.password
Login password of the redis server. (String, default: <none>)
spring.redis.port
Redis server port. (Integer, default: 6379)
spring.redis.ssl
Enable SSL. (Boolean, default: false)
spring.redis.timeout
Connection timeout in milliseconds. (Integer, default: 0)
spring.redis.url
Redis url, which will overrule host, port and password if set. (String, default: <none>)

By default the aggregator processor uses: - HeaderAttributeCorrelationStrategy(IntegrationMessageHeaderAccessor.CORRELATION_ID) - for correlation; - SequenceSizeReleaseStrategy - for release; - DefaultAggregatingMessageGroupProcessor - for aggregation; - SimpleMessageStore - for messageStoreType.

The aggregator application can be configured for persistent MessageGroupStore implementations. The configuration for target technology is fully based on the Spring Boot auto-configuration. But default JDBC, MongoDb and Redis auto-configurations are excluded. They are @Import ed basing on the aggregator.messageStoreType configuration property. Consult Spring Boot Reference Manual for auto-configuration for particular technology you use for aggregator.

The JDBC JdbcMessageStore requires particular tables in the target data base. You can find schema scripts for appropriate RDBMS vendors in the org.springframework.integration.jdbc package of the spring-integration-jdbc jar. Those scripts can be used for automatic data base initialization via Spring Boot.

For example:

java -jar aggregator-rabbit-1.0.0.RELEASE
               --aggregator.message-store-type=jdbc
               --spring.datasource.url=jdbc:h2:mem:test
               --spring.datasource.schema=org/springframework/integration/jdbc/schema-h2.sql

3.2 Bridge Processor

A Processor module that returns messages that is passed by connecting just the input and output channels.

3.3 Filter Processor

Use the filter module in a stream to determine whether a Message should be passed to the output channel.

3.3.1 Options

The filter processor has the following options:

filter.expression
A SpEL expression to be evaluated against each message, to decide whether or not to accept it. (Expression, default: true)

3.4 Groovy Filter Processor

A Processor application that retains or discards messages according to a predicate, expressed as a Groovy script.

3.4.1 Options

The groovy-filter processor has the following options:

groovy-filter.script
The resource location of the groovy script (Resource, default: <none>)
groovy-filter.variables
Variable bindings as a new line delimited string of name-value pairs, e.g. 'foo=bar\n baz=car'. (Properties, default: <none>)
groovy-filter.variables-location
The location of a properties file containing custom script variable bindings. (Resource, default: <none>)

3.5 Groovy Transform Processor

A Processor module that transforms messages using a Groovy script.

3.5.1 Options

The groovy-transform processor has the following options:

groovy-transformer.script
Reference to a script used to process messages. (Resource, default: <none>)
groovy-transformer.variables
Variable bindings as a new line delimited string of name-value pairs, e.g. 'foo=bar\n baz=car'. (Properties, default: <none>)
groovy-transformer.variables-location
The location of a properties file containing custom script variable bindings. (Resource, default: <none>)

3.6 Header Enricher Processor

Use the header-enricher app to add message headers.

The headers are provided in the form of a JSON map document, where the keys are the header names and the values are SpEL expressions. For example --headers='foo=payload.someProperty \n bar=payload.otherProperty'

3.6.1 Options

The header-enricher processor has the following options:

headerenricher.headers
\n separated properties representing headers in which values are SpEL expressions, e.g. foo='bar'\nbaz=payload.baz (String, default: none)
headerenricher.overwrite
set to true to overwrite any existing message headers (boolean, default: false)

3.7 Http Client Processor

A processor app that makes requests to an HTTP resource and emits the response body as a message payload. This processor can be combined, e.g., with a time source app to periodically poll results from a HTTP resource.

3.7.1 Options

The httpclient processor has the following options:

httpclient.body
The (static) request body; if neither this nor bodyExpression is provided, the payload will be used. (Object, default: <none>)
httpclient.body-expression
A SpEL expression to derive the request body from the incoming message. (Expression, default: <none>)
httpclient.expected-response-type
The type used to interpret the response. (java.lang.Class<?>, default: <none>)
httpclient.headers-expression
A SpEL expression used to derive the http headers map to use. (Expression, default: <none>)
httpclient.http-method
The kind of http method to use. (HttpMethod, default: <none>, possible values: GET,HEAD,POST,PUT,PATCH,DELETE,OPTIONS,TRACE)
httpclient.reply-expression
A SpEL expression used to compute the final result, applied against the whole http response. (Expression, default: body)
httpclient.url-expression
A SpEL expression against incoming message to determine the URL to use. (Expression, default: <none>)

3.8 PMML Processor

A processor that evaluates a machine learning model stored in PMML format.

3.8.1 Options

The pmml processor has the following options:

pmml.inputs
How to compute model active fields from input message properties as modelField->SpEL. (java.util.Map<java.lang.String,org.springframework.expression.Expression>, default: <none>)
pmml.model-location
The location of the PMML model file. (Resource, default: <none>)
pmml.model-name
If the model file contains multiple models, the name of the one to use. (String, default: <none>)
pmml.model-name-expression
If the model file contains multiple models, the name of the one to use, as a SpEL expression. (Expression, default: <none>)
pmml.outputs
How to emit evaluation results in the output message as msgProperty->SpEL. (java.util.Map<java.lang.String,org.springframework.expression.Expression>, default: <none>)

3.9 Scripable Transform Processor

A Spring Cloud Stream module that transforms messages using a script. The script body is supplied directly as a property value. The language of the script can be specified (groovy/javascript/ruby/python).

3.9.1 Options

The scriptable-transform processor has the following options:

scriptable-transformer.language
Language of the text in the script property. Supported: groovy, javascript, ruby, python. (String, default: <none>)
scriptable-transformer.script
Text of the script. (String, default: <none>)
scriptable-transformer.variables
Variable bindings as a new line delimited string of name-value pairs, e.g. 'foo=bar\n baz=car'. (Properties, default: <none>)
scriptable-transformer.variables-location
The location of a properties file containing custom script variable bindings. (Resource, default: <none>)

3.10 Splitter Processor

The splitter app builds upon the concept of the same name in Spring Integration and allows the splitting of a single message into several distinct messages.

3.10.1 Options

splitter.apply-sequence
Add correlation/sequence information in headers to facilitate later aggregation. (Boolean, default: true)
splitter.charset
The charset to use when converting bytes in text-based files to String. (String, default: <none>)
splitter.delimiters
When expression is null, delimiters to use when tokenizing {@link String} payloads. (String, default: <none>)
splitter.expression
A SpEL expression for splitting payloads. (Expression, default: <none>)
splitter.file-markers
Set to true or false to use a {@code FileSplitter} (to split text-based files by line) that includes (or not) beginning/end of file markers. (Boolean, default: <none>)
splitter.markers-json
When 'fileMarkers == true', specify if they should be produced as FileSplitter.FileMarker objects or JSON. (Boolean, default: true)

When no expression, fileMarkers, or charset is provided, a DefaultMessageSplitter is configured with (optional) delimiters. When fileMarkers or charset is provided, a FileSplitter is configured (you must provide either a fileMarkers or charset to split files, which must be text-based - they are split into lines). Otherwise, an ExpressionEvaluatingMessageSplitter is configured.

When splitting File payloads, the sequenceSize header is zero because the size cannot be determined at the beginning.

[Caution]Caution

Ambiguous properties are not allowed.

3.10.2 JSON Example

As part of the SpEL expression you can make use of the pre-registered JSON Path function. The syntax is #jsonPath(payload, '<json path expression>').

For example, consider the following JSON:

{ "store": {
    "book": [
        {
            "category": "reference",
            "author": "Nigel Rees",
            "title": "Sayings of the Century",
            "price": 8.95
        },
        {
            "category": "fiction",
            "author": "Evelyn Waugh",
            "title": "Sword of Honour",
            "price": 12.99
        },
        {
            "category": "fiction",
            "author": "Herman Melville",
            "title": "Moby Dick",
            "isbn": "0-553-21311-3",
            "price": 8.99
        },
        {
            "category": "fiction",
            "author": "J. R. R. Tolkien",
            "title": "The Lord of the Rings",
            "isbn": "0-395-19395-8",
            "price": 22.99
        }
    ],
    "bicycle": {
        "color": "red",
        "price": 19.95
    }
}}

and an expression #jsonPath(payload, '$.store.book'); the result will be 4 messages, each with a Map payload containing the properties of a single book.

3.11 TCP Client as a processor which connects to a TCP server, sends data to it and also receives data.

3.11.1 Options

The tcp-client processor has the following options:

tcp.buffer-size
The buffer size used when decoding messages; larger messages will be rejected. (Integer, default: 2048)
tcp.charset
The charset used when converting from bytes to String. (String, default: UTF-8)
tcp.decoder
The decoder to use when receiving messages. (Encoding, default: <none>, possible values: CRLF,LF,NULL,STXETX,RAW,L1,L2,L4)
tcp.encoder
The encoder to use when sending messages. (Encoding, default: <none>, possible values: CRLF,LF,NULL,STXETX,RAW,L1,L2,L4)
tcp.host
The host to which this sink will connect. (String, default: localhost)
tcp.nio
<documentation missing> (Boolean, default: <none>)
tcp.port
<documentation missing> (Integer, default: <none>)
tcp.retry-interval
Retry interval (in milliseconds) to check the connection and reconnect. (Long, default: 60000)
tcp.reverse-lookup
<documentation missing> (Boolean, default: <none>)
tcp.socket-timeout
<documentation missing> (Integer, default: <none>)
tcp.use-direct-buffers
<documentation missing> (Boolean, default: <none>)

3.12 Transform Processor

Use the transform app in a stream to convert a Message’s content or structure.

The transform processor is used by passing a SpEL expression. The expression should return the modified message or payload. For example, --expression=payload.toUpperCase().

This transform will convert all message payloads to upper case.

As part of the SpEL expression you can make use of the pre-registered JSON Path function. The syntax is #jsonPath(payload,'<json path expression>')

3.12.1 Options

The transform processor has the following options:

transformer.expression
<documentation missing> (Expression, default: payload)