3. Processors

3.1 Aggregator Processor

Use the aggregator application to combine multiple messages into one, based on some correlation mechanism.

This processor is fully based on the Aggregator component from Spring Integration. So, please, consult there for use-cases and functionality.

3.1.1 Input

Headers

If the aggregation and correlation logic is based on the default strategies, the correlationId, sequenceNumber and sequenceSize headers must be presented in the incoming message.

Payload

Aggregator Processor is fully based on the Spring Integration’s AggregatingMessageHandler and since correlation and aggregation logic don’t require particular types, the input payload can be anything able to be transferred over the network and Spring Cloud Stream Binder. If payload is JSON, the JsonPropertyAccessor helps to build straightforward SpEL expressions for correlation, release and aggregation functions.

3.1.2 Output

Headers

Returns all headers of the incoming messages that have no conflicts among the group. An absent header on one or more messages within the group is not considered a conflict.

Payload

By default the DefaultAggregatingMessageGroupProcessor is used for aggregation function with meaning return the java.util.List of payloads of incoming messages. The custom aggregation SpEL expression may produce any required object to be sent to the output of the processor.

3.1.3 Options

The aggregator processor has the following options:

aggregator.aggregation
SpEL expression for aggregation strategy. Default is collection of payloads (Expression, default: <none>)
aggregator.correlation
SpEL expression for correlation key. Default to correlationId header (Expression, default: <none>)
aggregator.group-timeout
SpEL expression for timeout to expiring uncompleted groups (Expression, default: <none>)
aggregator.message-store-entity
Persistence message store entity: table prefix in RDBMS, collection name in MongoDb, etc (String, default: <none>)
aggregator.message-store-type
Message store type (String, default: <none>)
aggregator.release
SpEL expression for release strategy. Default is based on the sequenceSize header (Expression, default: <none>)
spring.data.mongodb.authentication-database
Authentication database name. (String, default: <none>)
spring.data.mongodb.database
Database name. (String, default: <none>)
spring.data.mongodb.field-naming-strategy
Fully qualified name of the FieldNamingStrategy to use. (java.lang.Class<?>, default: <none>)
spring.data.mongodb.grid-fs-database
GridFS database name. (String, default: <none>)
spring.data.mongodb.host
Mongo server host. Cannot be set with uri. (String, default: <none>)
spring.data.mongodb.password
Login password of the mongo server. Cannot be set with uri. (char[], default: <none>)
spring.data.mongodb.port
Mongo server port. Cannot be set with uri. (Integer, default: <none>)
spring.data.mongodb.uri
Mongo database URI. Cannot be set with host, port and credentials. (String, default: <none>)
spring.data.mongodb.username
Login user of the mongo server. Cannot be set with uri. (String, default: <none>)
spring.datasource.continue-on-error
Do not stop if an error occurs while initializing the database. (Boolean, default: false)
spring.datasource.data
Data (DML) script resource references. (java.util.List<java.lang.String>, default: <none>)
spring.datasource.data-password
Password of the database to execute DML scripts. (String, default: <none>)
spring.datasource.data-username
User of the database to execute DML scripts. (String, default: <none>)
spring.datasource.driver-class-name
Fully qualified name of the JDBC driver. Auto-detected based on the URL by default. (String, default: <none>)
spring.datasource.generate-unique-name
Generate a random datasource name. (Boolean, default: false)
spring.datasource.initialize
Populate the database using 'data.sql'. (Boolean, default: true)
spring.datasource.jndi-name
JNDI location of the datasource. Class, url, username & password are ignored when set. (String, default: <none>)
spring.datasource.name
Name of the datasource. (String, default: testdb)
spring.datasource.password
Login password of the database. (String, default: <none>)
spring.datasource.platform
Platform to use in the schema resource (schema-${platform}.sql). (String, default: all)
spring.datasource.schema
Schema (DDL) script resource references. (java.util.List<java.lang.String>, default: <none>)
spring.datasource.schema-password
Password of the database to execute DDL scripts (if different). (String, default: <none>)
spring.datasource.schema-username
User of the database to execute DDL scripts (if different). (String, default: <none>)
spring.datasource.separator
Statement separator in SQL initialization scripts. (String, default: ;)
spring.datasource.sql-script-encoding
SQL scripts encoding. (Charset, default: <none>)
spring.datasource.type
Fully qualified name of the connection pool implementation to use. By default, it is auto-detected from the classpath. (java.lang.Class<? extends javax.sql.DataSource>, default: <none>)
spring.datasource.url
JDBC url of the database. (String, default: <none>)
spring.datasource.username
Login user of the database. (String, default: <none>)
spring.mongodb.embedded.features
Comma-separated list of features to enable. (java.util.Set<de.flapdoodle.embed.mongo.distribution.Feature>, default: <none>)
spring.mongodb.embedded.version
Version of Mongo to use. (String, default: 3.2.2)
spring.redis.database
Database index used by the connection factory. (Integer, default: 0)
spring.redis.host
Redis server host. (String, default: localhost)
spring.redis.password
Login password of the redis server. (String, default: <none>)
spring.redis.port
Redis server port. (Integer, default: 6379)
spring.redis.ssl
Enable SSL. (Boolean, default: false)
spring.redis.timeout
Connection timeout in milliseconds. (Integer, default: 0)
spring.redis.url
Redis url, which will overrule host, port and password if set. (String, default: <none>)

By default the aggregator processor uses: - HeaderAttributeCorrelationStrategy(IntegrationMessageHeaderAccessor.CORRELATION_ID) - for correlation; - SequenceSizeReleaseStrategy - for release; - DefaultAggregatingMessageGroupProcessor - for aggregation; - SimpleMessageStore - for messageStoreType.

The aggregator application can be configured for persistent MessageGroupStore implementations. The configuration for target technology is fully based on the Spring Boot auto-configuration. But default JDBC, MongoDb and Redis auto-configurations are excluded. They are @Import ed basing on the aggregator.messageStoreType configuration property. Consult Spring Boot Reference Manual for auto-configuration for particular technology you use for aggregator.

The JDBC JdbcMessageStore requires particular tables in the target data base. You can find schema scripts for appropriate RDBMS vendors in the org.springframework.integration.jdbc package of the spring-integration-jdbc jar. Those scripts can be used for automatic data base initialization via Spring Boot.

For example:

java -jar aggregator-rabbit-1.0.0.RELEASE.jar
               --aggregator.message-store-type=jdbc
               --spring.datasource.url=jdbc:h2:mem:test
               --spring.datasource.schema=org/springframework/integration/jdbc/schema-h2.sql

3.1.4 Build

$ ./mvnw clean install -PgenerateApps
$ cd apps

You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:

$ ./mvnw clean package

3.1.5 Examples

java -jar aggregator_processor.jar
               --aggregator.message-store-type=jdbc
               --spring.datasource.url=jdbc:h2:mem:test
               --spring.datasource.schema=org/springframework/integration/jdbc/schema-h2.sql

java -jar aggregator_processor.jar
               --spring.data.mongodb.port=0
               --aggregator.correlation=T(Thread).currentThread().id
               --aggregator.release="!#this.?[payload == 'bar'].empty"
               --aggregator.aggregation="#this.?[payload == 'foo'].![payload]"
               --aggregator.message-store-type=mongodb
               --aggregator.message-store-entity=aggregatorTest

3.1.6 Code of Conduct

This project adheres to the Contributor Covenant code of conduct. By participating, you are expected to uphold this code. Please report unacceptable behavior to [email protected].

3.2 Bridge Processor

A Processor module that returns messages that is passed by connecting just the input and output channels.

3.2.1 Input

Headers

Payload

Any

3.2.2 Output

Headers

Payload

Any

3.2.3 Options

3.2.4 Build

$ ./mvnw clean install -PgenerateApps
$ cd apps

You can find the corresponding binder based projects here. You can then cd into one one of the folders and build it:

$ ./mvnw clean package

3.2.5 Examples

java -jar bridge-processor.jar

3.3 Filter Processor

Use the filter module in a stream to determine whether a Message should be passed to the output channel.

3.3.1 Input

Headers

N/A

Payload

Any

3.3.2 Output

Headers

N/A

Payload

Any

3.3.3 Options

The filter processor has the following options:

filter.expression
A SpEL expression to be evaluated against each message, to decide whether or not to accept it. (Expression, default: true)

3.3.4 Build

$ ./mvnw clean install -PgenerateApps
$ cd apps

You can find the corresponding binder based projects here. You can then cd into one one of the folders and build it:

$ ./mvnw clean package

3.3.5 Examples

java -jar filter-processor.jar --expression="payload"

3.4 Groovy Filter Processor

A Processor application that retains or discards messages according to a predicate, expressed as a Groovy script.

3.4.1 Input

Headers

N/A

Payload

  • Any

3.4.2 Output

Headers

N/A

Payload

  • Any

3.4.3 Options

The groovy-filter processor has the following options:

groovy-filter.script
The resource location of the groovy script (Resource, default: <none>)
groovy-filter.variables
Variable bindings as a new line delimited string of name-value pairs, e.g. 'foo=bar\n baz=car'. (Properties, default: <none>)
groovy-filter.variables-location
The location of a properties file containing custom script variable bindings. (Resource, default: <none>)

3.4.4 Build

$ ./mvnw clean install -PgenerateApps
$ cd apps

You can find the corresponding binder based projects here. You can then cd into one one of the folders and build it:

$ ./mvnw clean package

3.4.5 Examples

java -jar groovy-filter-processor.jar --script=script.groovy

3.5 Groovy Transform Processor

A Processor module that transforms messages using a Groovy script.

3.5.1 Input

Headers

N/A

Payload

  • Any

3.5.2 Output

Headers

N/A

Payload

  • Any

3.5.3 Options

The groovy-transform processor has the following options:

groovy-transformer.script
Reference to a script used to process messages. (Resource, default: <none>)
groovy-transformer.variables
Variable bindings as a new line delimited string of name-value pairs, e.g. 'foo=bar\n baz=car'. (Properties, default: <none>)
groovy-transformer.variables-location
The location of a properties file containing custom script variable bindings. (Resource, default: <none>)

3.5.4 Build

$ ./mvnw clean install -PgenerateApps
$ cd apps

You can find the corresponding binder based projects here. You can then cd into one one of the folders and build it:

$ ./mvnw clean package

3.5.5 Examples

java -jar groovy-transform-processor.jar --script=script.groovy

3.6 Header Enricher Processor

Use the header-enricher app to add message headers.

The headers are provided in the form of new line delimited key value pairs, where the keys are the header names and the values are SpEL expressions. For example --headers='foo=payload.someProperty \n bar=payload.otherProperty'

3.6.1 Input

Headers

N/A

Payload

  • Any

3.6.2 Output

Headers

N/A

Payload

  • Any

3.6.3 Options

The header-enricher processor has the following options:

header.enricher.headers
\n separated properties representing headers in which values are SpEL expressions, e.g foo='bar' \n baz=payload.baz (Properties, default: <none>)
header.enricher.overwrite
set to true to overwrite any existing message headers (Boolean, default: false)

3.6.4 Build

$ ./mvnw clean install -PgenerateApps
$ cd apps

You can find the corresponding binder based projects here. You can then cd into one one of the folders and build it:

$ ./mvnw clean package

3.6.5 Examples

java -jar header-enricher-processor.jar --headers='foo=payload.someProperty \n bar=payload.otherProperty'

3.6.6 Code of Conduct

This project adheres to the Contributor Covenant code of conduct. By participating, you are expected to uphold this code. Please report unacceptable behavior to [email protected].

3.7 Http Client Processor

A processor app that makes requests to an HTTP resource and emits the response body as a message payload. This processor can be combined, e.g., with a time source app to periodically poll results from a HTTP resource.

3.7.1 Input

Headers

Any Required HTTP headers must be explicitly set via the headers-expression property. See examples below. Header values may also be used to construct the request body when referenced in the body-expression property.

Payload

The Message payload may be any Java type. Generally, standard Java types such as String(e.g., JSON, XML) or byte array payloads are recommended. A Map should work without too much effort. By default, the payload will become HTTP request body (if needed). You may also set the body-expression property to construct a value derived from the Message, or body to use a static (literal) value.

Internally, the processor uses RestTemplate.exchange(…​).

The RestTemplate supports Jackson JSON serialization to support any request and response types if necessary. The expected-response-type property, String.class by default, may be set to any class in your application class path. (Note user defined payload types will require adding required dependencies to your pom file)

3.7.2 Output

Headers

No HTTP message headers are mapped to the outbound Message.

Payload

The raw output object is ResponseEntity<?> any of its fields (e.g., body, headers) or accessor methods (statusCode) may be referenced as part of the reply-expression. By default the outbound Message payload is the response body.

3.7.3 Options

The httpclient processor has the following options:

httpclient.body
The (static) request body; if neither this nor bodyExpression is provided, the payload will be used. (Object, default: <none>)
httpclient.body-expression
A SpEL expression to derive the request body from the incoming message. (Expression, default: <none>)
httpclient.expected-response-type
The type used to interpret the response. (java.lang.Class<?>, default: <none>)
httpclient.headers-expression
A SpEL expression used to derive the http headers map to use. (Expression, default: <none>)
httpclient.http-method
The kind of http method to use. (HttpMethod, default: <none>, possible values: GET,HEAD,POST,PUT,PATCH,DELETE,OPTIONS,TRACE)
httpclient.reply-expression
A SpEL expression used to compute the final result, applied against the whole http response. (Expression, default: body)
httpclient.url
The URL to issue an http request to, as a static value. (String, default: <none>)
httpclient.url-expression
A SpEL expression against incoming message to determine the URL to use. (Expression, default: <none>)

3.7.4 Build

$ ./mvnw clean install -PgenerateApps
$ cd apps

You can find the corresponding binder based projects here. You can then cd into one one of the folders and build it:

$ ./mvnw clean package

3.7.5 Examples

$ java -jar httpclient-processor.jar --httpclient.url=http://someurl  --httpclient.http-method=POST --httpclient.headers-expression="{'Content-Type':'application/json'}"

$ java -jar httpclient-processor.jar --httpclient.url=http://someurl  --httpclient.reply-expression="statusCode.name()"

3.8 PMML Processor

A processor that evaluates a machine learning model stored in PMML format.

3.8.1 Input

Headers

N/A

Payload

  • PMML model data

3.8.2 Output

Headers

N/A

Payload

  • Tuple carrying information about the evaluated data

3.8.3 Options

The pmml processor has the following options:

pmml.inputs
How to compute model active fields from input message properties as modelField->SpEL. (java.util.Map<java.lang.String,org.springframework.expression.Expression>, default: <none>)
pmml.model-location
The location of the PMML model file. (Resource, default: <none>)
pmml.model-name
If the model file contains multiple models, the name of the one to use. (String, default: <none>)
pmml.model-name-expression
If the model file contains multiple models, the name of the one to use, as a SpEL expression. (Expression, default: <none>)
pmml.outputs
How to emit evaluation results in the output message as msgProperty->SpEL. (java.util.Map<java.lang.String,org.springframework.expression.Expression>, default: <none>)

3.8.4 Build

$ ./mvnw clean install -PgenerateApps
$ cd apps

You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:

$ ./mvnw clean package

3.8.5 Examples

java -jar pmml-processor.jar --pmml.modelLocation= --pmml.modelName="
java -jar pmml-processor.jar --pmml.modelLocation= --pmml.modelNameExpression="

3.9 Python Http Processor

Spring Cloud Stream App Starters for integrating with python

This application invokes a REST service, similar to the standard httpclient processor. In fact, this application embeds the httpclient processor. As a convenience for Python developers, this allows you to provide a Jython wrapper script that may execute a function before and after REST call in order to perform any necessary data transformation. If you don’t require any custom transformations, just use the httpclient processor.

The diagram shows input and output adapters as conceptual components. These are actually implemented as functions defined in a single script that must conform to a simple convention:

def input():
    return "Pre" + payload;

def output():
    return payload + "Post";

result = locals()[channel]()

The function names input and output map to the conventional channel names used by Spring Cloud Stream processors. The last line is a bit of Python reflection magic to invoke a function by its name, given by the bound variable channel. Implemented with Spring Integration Scripting, headers and payload are always bound to the Message headers and payload respectively. The payload on the input side is the object you use to build the REST request. The output side transforms the response. If you don’t need any additional processing on one side, implement the function with pass as the body:

def output():
   pass
[Note]Note

The last line in the script must be an assignment statement. The variable name doesn’t matter. This is required to bind the return value correctly.

[Note]Note

The script is evaluated for every message. This tends to create a a lot of classes for each execution which puts stress on the JRE Metaspace memory region (or Permgen if using a JRE prior to version 8). In Java 8, Metaspace is unlimited by default, allocated from native memory, and therefore limited by the native OS. If deploying to CloudFoundry, the Java Buildpack Memory Calculator sets -XXMaxMetaspaceSize. (see github.com/cloudfoundry/java-buildpack-memory-calculator for details). If using JBP v4.x, you may override the calculated value (and others) by specifying -XXMaxMetaspaceSize explicitly in JAVA_OPTS. You also need to increase the container memory accordingly. Similar tuning is advised in any containerized environment.

 

PythonHttpProcessor

3.9.1 Input

Headers

Headers will be bound automatically to the wrapper script variable headers.

Payload

Any type. Payload will be automatically bound to the wrapper script variable payload. Jython scripts can effectively access any Java type on the app’s classpath.

3.9.2 Output

Headers

Headers may be set by the Jython wrapper script if the output() script function returns a Message.

Payload

Whatever the `output()`wrapper script function returns.

[Note]Note

The wrapper script is intended to perform some required transformations prior to sending an HTTP request and/or after the response is received. The return value of the input adapter will be the inbound payload of the httpclient processor and shoud conform to its requirements. Likewise the HTTP reply-expression will bound to be the payload when the output() function is invoked.

3.9.3 Options

The python-http processor has the following options:

git.basedir
The base directory where the repository should be cloned. If not specified, a temporary directory will be created. (File, default: <none>)
git.clone-on-start
Flag to indicate that the repository should be cloned on startup (not on demand). Generally leads to slower startup but faster first query. (Boolean, default: true)
git.label
The label or branch to clone. (String, default: master)
git.passphrase
The passphrase for the remote repository. (String, default: <none>)
git.password
The password for the remote repository. (String, default: <none>)
git.timeout
Timeout (in seconds) for obtaining HTTP or SSH connection (if applicable). Default 5 seconds. (Integer, default: 5)
git.uri
The URI of the remote repository. (String, default: <none>)
git.username
The username for the remote repository. (String, default: <none>)
httpclient.body
The (static) request body; if neither this nor bodyExpression is provided, the payload will be used. (Object, default: <none>)
httpclient.body-expression
A SpEL expression to derive the request body from the incoming message. (Expression, default: <none>)
httpclient.expected-response-type
The type used to interpret the response. (java.lang.Class<?>, default: <none>)
httpclient.headers-expression
A SpEL expression used to derive the http headers map to use. (Expression, default: <none>)
httpclient.http-method
The kind of http method to use. (HttpMethod, default: <none>, possible values: GET,HEAD,POST,PUT,PATCH,DELETE,OPTIONS,TRACE)
httpclient.reply-expression
A SpEL expression used to compute the final result, applied against the whole http response. (Expression, default: body)
httpclient.url
The URL to issue an http request to, as a static value. (String, default: <none>)
httpclient.url-expression
A SpEL expression against incoming message to determine the URL to use. (Expression, default: <none>)
wrapper.delimiter
The variable delimiter. (Delimiter, default: <none>, possible values: COMMA,SPACE,TAB,NEWLINE)
wrapper.script
The Python script file name. (String, default: <none>)
wrapper.variables
Variable bindings as a delimited string of name-value pairs, e.g. 'foo=bar,baz=car'. (String, default: <none>)

3.9.4 Build

$ ./mvnw clean install -PgenerateApps
$ cd apps

You can find the corresponding binder based projects here. You can then cd into one one of the folders and build it:

$ ./mvnw clean package

3.9.5 Examples

See httpclient processor for more examples on httpclient properties.

$java -jar python-http-processor.jar --wrapper.script=/local/directory/build-json.py --httpclient.url=http://someurl
--httpclient.http-method=POST --httpclient.headers-expression="{'Content-Type':'application/json'}"

$java -jar python-http-processor.jar --git.uri=https://github.com/some-repo --wrapper.script=some-script.py --wrapper
.variables=foo=0.45,bar=0.55 --httpclient.url=http://someurl

3.10 Jython Processor

This application executes a Jython script that binds payload and headers variables to the Message payload and headers respectively. In addition you may provide a jython.variables property containing a (comma delimited by default) delimited string, e.g., var1=val1,var2=val2,…​.

This processor uses a JSR-223 compliant embedded ScriptEngine provided by www.jython.org/.

[Note]Note

The last line in the script must be an assignment statement. The variable name doesn’t matter. This is required to bind the return value correctly.

[Note]Note

The script is evaluated for every message which may limit your performance with high message loads. This also tends to create a a lot of classes for each execution which puts stress on the JRE Metaspace memory region (or Permgen if using a JRE prior to version 8). In Java 8, Metaspace is unlimited by default, allocated from native memory, and therefore limited by the native OS. If deploying to CloudFoundry, the Java Buildpack Memory Calculator sets -XXMaxMetaspaceSize. (see github.com/cloudfoundry/java-buildpack-memory-calculator for details). If using JBP v4.x, you may override the calculated value (and others) by specifying -XXMaxMetaspaceSize explicitly in JAVA_OPTS. You also need to increase the container memory accordingly. Similar tuning is advised in any containerized environment.

3.10.1 Input

Headers

Headers will be bound automatically to the script variable headers.

Payload

Any type. Payload will be automatically bound to the script variable payload.

3.10.2 Output

Headers

Headers may be set by the Jython script if the script returns a Message.

Payload

Whatever the script returns.

 

JythonProcessor

3.10.3 Options

The jython processor has the following options:

git.basedir
The base directory where the repository should be cloned. If not specified, a temporary directory will be created. (File, default: <none>)
git.clone-on-start
Flag to indicate that the repository should be cloned on startup (not on demand). Generally leads to slower startup but faster first query. (Boolean, default: true)
git.label
The label or branch to clone. (String, default: master)
git.passphrase
The passphrase for the remote repository. (String, default: <none>)
git.password
The password for the remote repository. (String, default: <none>)
git.timeout
Timeout (in seconds) for obtaining HTTP or SSH connection (if applicable). Default 5 seconds. (Integer, default: 5)
git.uri
The URI of the remote repository. (String, default: <none>)
git.username
The username for the remote repository. (String, default: <none>)
jython.delimiter
The variable delimiter. (Delimiter, default: <none>, possible values: COMMA,SPACE,TAB,NEWLINE)
jython.script
The Python script file name. (String, default: <none>)
jython.variables
Variable bindings as a delimited string of name-value pairs, e.g. 'foo=bar,baz=car'. (String, default: <none>)

3.10.4 Build

$ ./mvnw clean install -PgenerateApps
$ cd apps

You can find the corresponding binder based projects here. You can then cd into one one of the folders and build it:

$ ./mvnw clean package

3.10.5 Examples

$java -jar python-jython-processor.jar --jython.script=/local/directory/to_uppercase.py

$java -jar python-jython-processor.jar --git.uri=https://github.com/some-repo --jython
.script=map-tweet-sentiments.py --jython.variables=neutral=0.45,positive=0.55

3.11 Scripable Transform Processor

A Spring Cloud Stream module that transforms messages using a script. The script body is supplied directly as a property value. The language of the script can be specified (groovy/javascript/ruby/python).

3.11.1 Input

Headers

N/A

Payload

  • Any

3.11.2 Output

Headers

N/A

Payload

  • Any

3.11.3 Options

The scriptable-transform processor has the following options:

scriptable-transformer.language
Language of the text in the script property. Supported: groovy, javascript, ruby, python. (String, default: <none>)
scriptable-transformer.script
Text of the script. (String, default: <none>)
scriptable-transformer.variables
Variable bindings as a new line delimited string of name-value pairs, e.g. 'foo=bar\n baz=car'. (Properties, default: <none>)
scriptable-transformer.variables-location
The location of a properties file containing custom script variable bindings. (Resource, default: <none>)

3.11.4 Build

$ ./mvnw clean install -PgenerateApps
$ cd apps

You can find the corresponding binder based projects here. You can then cd into one one of the folders and build it:

$ ./mvnw clean package

3.11.5 Examples

java -jar scriptable-transform-processor.jar --language=ruby --script="return ""#{payload.upcase}"""

3.12 Splitter Processor

The splitter app builds upon the concept of the same name in Spring Integration and allows the splitting of a single message into several distinct messages.

3.12.1 Input

Headers

N/A

Payload

  • Any

3.12.2 Output

Headers

N/A

Payload

  • A collection of split messages based on a given expression, delimiter or file marker.

3.12.3 Options

splitter.apply-sequence
Add correlation/sequence information in headers to facilitate later aggregation. (Boolean, default: true)
splitter.charset
The charset to use when converting bytes in text-based files to String. (String, default: <none>)
splitter.delimiters
When expression is null, delimiters to use when tokenizing {@link String} payloads. (String, default: <none>)
splitter.expression
A SpEL expression for splitting payloads. (Expression, default: <none>)
splitter.file-markers
Set to true or false to use a {@code FileSplitter} (to split text-based files by line) that includes (or not) beginning/end of file markers. (Boolean, default: <none>)
splitter.markers-json
When 'fileMarkers == true', specify if they should be produced as FileSplitter.FileMarker objects or JSON. (Boolean, default: true)

When no expression, fileMarkers, or charset is provided, a DefaultMessageSplitter is configured with (optional) delimiters. When fileMarkers or charset is provided, a FileSplitter is configured (you must provide either a fileMarkers or charset to split files, which must be text-based - they are split into lines). Otherwise, an ExpressionEvaluatingMessageSplitter is configured.

When splitting File payloads, the sequenceSize header is zero because the size cannot be determined at the beginning.

[Caution]Caution

Ambiguous properties are not allowed.

3.12.4 JSON Example

As part of the SpEL expression you can make use of the pre-registered JSON Path function. The syntax is #jsonPath(payload, '<json path expression>').

For example, consider the following JSON:

{ "store": {
    "book": [
        {
            "category": "reference",
            "author": "Nigel Rees",
            "title": "Sayings of the Century",
            "price": 8.95
        },
        {
            "category": "fiction",
            "author": "Evelyn Waugh",
            "title": "Sword of Honour",
            "price": 12.99
        },
        {
            "category": "fiction",
            "author": "Herman Melville",
            "title": "Moby Dick",
            "isbn": "0-553-21311-3",
            "price": 8.99
        },
        {
            "category": "fiction",
            "author": "J. R. R. Tolkien",
            "title": "The Lord of the Rings",
            "isbn": "0-395-19395-8",
            "price": 22.99
        }
    ],
    "bicycle": {
        "color": "red",
        "price": 19.95
    }
}}

and an expression #jsonPath(payload, '$.store.book'); the result will be 4 messages, each with a Map payload containing the properties of a single book.

3.12.5 Build

$ ./mvnw clean install -PgenerateApps
$ cd apps

You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:

$ ./mvnw clean package

3.12.6 Examples

java -jar splitter-processor.jar --splitter.expression=expression
java -jar splitter-processor.jar --splitter.delimiters=delimiters

3.13 TCP Client as a processor which connects to a TCP server, sends data to it and also receives data.

3.13.1 Input

Headers:

  • Content-Type: application/octet-stream

Payload:

  • byte[]

Headers:

  • Content-Type: text/plain

Payload:

  • String

3.13.2 Output

Headers:

  • Content-Type: application/octet-stream

Payload:

  • byte[]

3.13.3 Options

The tcp-client processor has the following options:

tcp.buffer-size
The buffer size used when decoding messages; larger messages will be rejected. (Integer, default: 2048)
tcp.charset
The charset used when converting from bytes to String. (String, default: UTF-8)
tcp.decoder
The decoder to use when receiving messages. (Encoding, default: <none>, possible values: CRLF,LF,NULL,STXETX,RAW,L1,L2,L4)
tcp.encoder
The encoder to use when sending messages. (Encoding, default: <none>, possible values: CRLF,LF,NULL,STXETX,RAW,L1,L2,L4)
tcp.host
The host to which this sink will connect. (String, default: localhost)
tcp.nio
<documentation missing> (Boolean, default: <none>)
tcp.port
<documentation missing> (Integer, default: <none>)
tcp.retry-interval
Retry interval (in milliseconds) to check the connection and reconnect. (Long, default: 60000)
tcp.reverse-lookup
<documentation missing> (Boolean, default: <none>)
tcp.socket-timeout
<documentation missing> (Integer, default: <none>)
tcp.use-direct-buffers
<documentation missing> (Boolean, default: <none>)

3.13.4 Build

$ ./mvnw clean install -PgenerateApps
$ cd apps

You can find the corresponding binder based projects here. You can then cd into one one of the folders and build it:

$ ./mvnw clean package

3.13.5 Examples

java -jar tcp-client-processor.jar --tcp.decoder=LF --tcp.encoder=LF

3.14 Transform Processor

Use the transform app in a stream to convert a Message’s content or structure.

The transform processor is used by passing a SpEL expression. The expression should return the modified message or payload.

As part of the SpEL expression you can make use of the pre-registered JSON Path function. The syntax is #jsonPath(payload,'<json path expression>')

3.14.1 Input

Headers

N/A

Payload

  • Any

3.14.2 Output

Headers

N/A

Payload

  • Any

3.14.3 Options

The transform processor has the following options:

transformer.expression
<documentation missing> (Expression, default: payload)

3.14.4 Build

$ ./mvnw clean install -PgenerateApps
$ cd apps

You can find the corresponding binder based projects here. You can then cd into one one of the folders and build it:

$ ./mvnw clean package

3.14.5 Examples

java -jar transform-processor.jar --expression=payload.toUpperCase()

This transform will convert all message payloads to upper case.

3.15 TensorFlow Processor

A processor that evaluates a machine learning model stored in TensorFlow Protobuf format.

TensorFlowProcessorArcutectureOverview

3.15.1 Input

Headers

N/A

Payload

The TensorFlow Processor uses a TensorflowInputConverter to convert the input data into data format compliant with the TensorFlow Model used. The input converter converts the input Messages into key/value Map, where the Key corresponds to a model input placeholder and the content is org.tensorflow.DataType compliant value. The default converter implementation expects either Map payload or flat json message that can be converted into a Map.

The TensorflowInputConverter can be extended and customized. See TwitterSentimentTensorflowInputConverter.java for example.

Following snippet shows how to export any TensorFlow model (trained as well) into ProtocolBuffer binary format as required by the Processor.

from tensorflow.python.framework.graph_util import convert_variables_to_constants
...
SAVE_DIR = os.path.abspath(os.path.curdir)
minimal_graph = convert_variables_to_constants(sess, sess.graph_def, ['<model output>'])
tf.train.write_graph(minimal_graph, SAVE_DIR, 'my_graph.proto', as_text=False)
tf.train.write_graph(minimal_graph, SAVE_DIR, 'my.txt', as_text=True)

3.15.2 Output

Headers

N/A

Payload

Processor’s output uses TensorflowOutputConverter to convert the computed Tensor result into a serializable message. The default implementation uses Tuple triple.

Custom TensorflowOutputConverter can provide more convenient data representations. See TwitterSentimentTensorflowOutputConverter.java.

3.15.3 Options

The tensorflow processor has the following options:

tensorflow.expression
How to obtain the input data from the input message. If empty it defaults to the input message payload. The payload.myInTupleName expression treats the input payload as a Tuple, and myInTupleName stands for a Tuple key. The headers[myHeaderName] expression to get input data from message's header using myHeaderName as a key. (Expression, default: <none>)
tensorflow.mode
Defines how to store the output data and if the input payload is passed through or discarded. Payload (Default) stores the output data in the outbound message payload. The input payload is discarded. Header stores the output data in outputName message's header. The the input payload is passed through. Tuple stores the output data in an Tuple payload, using the outputName key. The input payload is passed through in the same Tuple using the 'original.input.data'. If the input payload is already a Tuple that contains a 'original.input.data' key, then copy the input Tuple into the new Tuple to be returned. (OutputMode, default: <none>, possible values: payload,tuple,header)
tensorflow.model
The location of the TensorFlow model file. (Resource, default: <none>)
tensorflow.model-fetch
The TensorFlow graph model output. Name of TensorFlow operation to fetch the output Tensors from. (String, default: <none>)
tensorflow.model-fetch-index
The modelFetch returns a list of Tensors. The modelFetchIndex specifies the index in the list to use as an output. (Integer, default: 0)
tensorflow.output-name
The output data key used in the Header or Tuple modes. Empty name defaults to the modelFetch property value. (String, default: <none>)

3.15.4 Build

$ ./mvnw clean install -PgenerateApps
$ cd apps

You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:

$ ./mvnw clean package

3.15.5 Examples

java -jar tensorflow-processor.jar --model= --modelFetch= --mode="

3.16 Twitter Sentiment Analysis Processor

A processor that evaluates a machine learning model stored in TensorFlow Protobuf format. It operationalizes the github.com/danielegrattarola/twitter-sentiment-cnn

SCDF TF Sentiment

Real-time Twitter Sentiment Analytics with TensorFlow and Spring Cloud Dataflow

3.16.1 Input

Headers

  • content-type: application/json

Payload

  • JSON tweet message

3.16.2 Output

Headers

  • content-type: application/json

Payload

Decodes the evaluated result into POSITIVE, NEGATIVE and NEUTRAL values. Then creates and returns a simple JSON message with this structure:

N/A

Payload

Processor’s output uses TensorflowOutputConverter to convert the computed Tensor result into a serializable message. The default implementation uses Tuple triple.

Custom TensorflowOutputConverter can provide more convenient data representations. See TwitterSentimentTensorflowOutputConverter.java.

3.16.3 Options

The twitter-sentiment processor has the following options:

tensorflow.expression
How to obtain the input data from the input message. If empty it defaults to the input message payload. The payload.myInTupleName expression treats the input payload as a Tuple, and myInTupleName stands for a Tuple key. The headers[myHeaderName] expression to get input data from message's header using myHeaderName as a key. (Expression, default: <none>)
tensorflow.mode
Defines how to store the output data and if the input payload is passed through or discarded. Payload (Default) stores the output data in the outbound message payload. The input payload is discarded. Header stores the output data in outputName message's header. The the input payload is passed through. Tuple stores the output data in an Tuple payload, using the outputName key. The input payload is passed through in the same Tuple using the 'original.input.data'. If the input payload is already a Tuple that contains a 'original.input.data' key, then copy the input Tuple into the new Tuple to be returned. (OutputMode, default: <none>, possible values: payload,tuple,header)
tensorflow.model
The location of the TensorFlow model file. (Resource, default: <none>)
tensorflow.model-fetch
The TensorFlow graph model output. Name of TensorFlow operation to fetch the output Tensors from. (String, default: <none>)
tensorflow.model-fetch-index
The modelFetch returns a list of Tensors. The modelFetchIndex specifies the index in the list to use as an output. (Integer, default: 0)
tensorflow.output-name
The output data key used in the Header or Tuple modes. Empty name defaults to the modelFetch property value. (String, default: <none>)
tensorflow.twitter.vocabulary
The location of the word vocabulary file, used for training the model (Resource, default: <none>)

3.16.4 Build

$ ./mvnw clean install -PgenerateApps
$ cd apps

You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:

$ ./mvnw clean package

3.16.5 Examples

java -jar twitter-sentiment-processor.jar --tensorflow.twitter.vocabulary= --tensorflow.model= \
    --tensorflow.modelFetch= --tensorflow.mode="

And here is a sample pipeline that computes sentiments for json tweets coming from the twitterstream source and using the pre-build minimal_graph.proto and vocab.csv:

tweets=twitterstream --access-token-secret=xxx --access-token=xxx --consumer-secret=xxx --consumer-key=xxx \
| filter --expression=#jsonPath(payload,'$.lang')=='en' \
| twitter-sentimet --vocabulary='http://dl.bintray.com/big-data/generic/vocab.csv' \
   --output-name=output/Softmax --model='http://dl.bintray.com/big-data/generic/minimal_graph.proto' \
   --model-fetch=output/Softmax \
| log