3. Processors

3.1 Aggregator Processor

Use the aggregator application to combine multiple messages into one, based on some correlation mechanism.

This processor is fully based on the Aggregator component from Spring Integration. Consult the Spring Integration documentation for its use cases and functionality.

3.1.1 Input


If the aggregation and correlation logic is based on the default strategies, the correlationId, sequenceNumber and sequenceSize headers must be present in the incoming message.


The Aggregator Processor is fully based on Spring Integration’s AggregatingMessageHandler. Because the correlation and aggregation logic does not require particular types, the input payload can be anything that can be transferred over the network and through the Spring Cloud Stream binder. If the payload is JSON, the JsonPropertyAccessor helps to build straightforward SpEL expressions for the correlation, release and aggregation functions.

3.1.2 Output


Returns all headers of the incoming messages that have no conflicts among the group. An absent header on one or more messages within the group is not considered a conflict.


By default, the DefaultAggregatingMessageGroupProcessor is used as the aggregation function; it returns a java.util.List of the payloads of the incoming messages. A custom aggregation SpEL expression may produce any object to be sent to the output of the processor.
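The default output for one completed group can be pictured with the following Python sketch. It is not the Spring Integration implementation: the function name aggregate_group and the dict-based message shape are assumptions made for illustration, and only the rules stated in this section are modelled (conflicting headers are dropped, absent headers are not conflicts, the payload is a list).

```python
def aggregate_group(messages):
    """Combine one completed group into a single message (illustrative only).

    Each message is modelled as {"headers": dict, "payload": object}.
    """
    merged = {}
    conflicts = set()
    for message in messages:
        for name, value in message["headers"].items():
            if name in conflicts:
                continue
            if name in merged and merged[name] != value:
                # Two messages disagree on this header: drop it entirely.
                del merged[name]
                conflicts.add(name)
            else:
                # First sighting, or the same value as before. A message that
                # lacks the header is simply never compared for it, so an
                # absent header is not treated as a conflict.
                merged[name] = value
    # Default aggregation: a list of the incoming payloads, in arrival order.
    return {"headers": merged, "payload": [m["payload"] for m in messages]}


group = [
    {"headers": {"correlationId": "a", "source": "x"}, "payload": 1},
    {"headers": {"correlationId": "a", "source": "y"}, "payload": 2},
    {"headers": {"correlationId": "a", "region": "eu"}, "payload": 3},
]
result = aggregate_group(group)
```

Here the source header is dropped because two messages disagree on it, while region survives even though only one message carries it.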

3.1.3 Options

The aggregator processor has the following options:

SpEL expression for aggregation strategy. Default is collection of payloads (Expression, default: <none>)
SpEL expression for correlation key. Default to correlationId header (Expression, default: <none>)
SpEL expression for timeout to expiring uncompleted groups (Expression, default: <none>)
Persistence message store entity: table prefix in RDBMS, collection name in MongoDb, etc (String, default: <none>)
Message store type (String, default: <none>)
SpEL expression for release strategy. Default is based on the sequenceSize header (Expression, default: <none>)
Authentication database name. (String, default: <none>)
Database name. (String, default: <none>)
Fully qualified name of the FieldNamingStrategy to use. (java.lang.Class<?>, default: <none>)
GridFS database name. (String, default: <none>)
Mongo server host. Cannot be set with uri. (String, default: <none>)
Login password of the mongo server. Cannot be set with uri. (char[], default: <none>)
Mongo server port. Cannot be set with uri. (Integer, default: <none>)
Mongo database URI. Cannot be set with host, port and credentials. (String, default: <none>)
Login user of the mongo server. Cannot be set with uri. (String, default: <none>)
Do not stop if an error occurs while initializing the database. (Boolean, default: false)
Data (DML) script resource references. (java.util.List<java.lang.String>, default: <none>)
Password of the database to execute DML scripts. (String, default: <none>)
User of the database to execute DML scripts. (String, default: <none>)
Fully qualified name of the JDBC driver. Auto-detected based on the URL by default. (String, default: <none>)
Generate a random datasource name. (Boolean, default: false)
Populate the database using 'data.sql'. (Boolean, default: true)
JNDI location of the datasource. Class, url, username & password are ignored when set. (String, default: <none>)
Name of the datasource. (String, default: testdb)
Login password of the database. (String, default: <none>)
Platform to use in the schema resource (schema-${platform}.sql). (String, default: all)
Schema (DDL) script resource references. (java.util.List<java.lang.String>, default: <none>)
Password of the database to execute DDL scripts (if different). (String, default: <none>)
User of the database to execute DDL scripts (if different). (String, default: <none>)
Statement separator in SQL initialization scripts. (String, default: ;)
SQL scripts encoding. (Charset, default: <none>)
Fully qualified name of the connection pool implementation to use. By default, it is auto-detected from the classpath. (java.lang.Class<? extends javax.sql.DataSource>, default: <none>)
JDBC url of the database. (String, default: <none>)
Login user of the database. (String, default: <none>)
Comma-separated list of features to enable. (java.util.Set<de.flapdoodle.embed.mongo.distribution.Feature>, default: <none>)
Version of Mongo to use. (String, default: 3.2.2)
Database index used by the connection factory. (Integer, default: 0)
Redis server host. (String, default: localhost)
Login password of the redis server. (String, default: <none>)
Redis server port. (Integer, default: 6379)
Enable SSL. (Boolean, default: false)
Connection timeout in milliseconds. (Integer, default: 0)
Redis url, which will overrule host, port and password if set. (String, default: <none>)

By default the aggregator processor uses:

  • HeaderAttributeCorrelationStrategy(IntegrationMessageHeaderAccessor.CORRELATION_ID) for correlation;
  • SequenceSizeReleaseStrategy for release;
  • DefaultAggregatingMessageGroupProcessor for aggregation;
  • SimpleMessageStore for messageStoreType.
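To make the default correlation and release behaviour concrete, here is a toy in-memory model in Python. The class name SimpleAggregator and the dict-based messages are hypothetical; the sketch only assumes what is stated here, namely that messages are grouped by the correlationId header and a group is released once it holds sequenceSize messages.

```python
from collections import defaultdict


class SimpleAggregator:
    """Toy aggregator; messages are dicts with "headers" and "payload" keys."""

    def __init__(self):
        # Stand-in for an in-memory message store: correlation key -> messages.
        self.groups = defaultdict(list)

    def handle(self, message):
        headers = message["headers"]
        key = headers["correlationId"]  # correlation by header value
        group = self.groups[key]
        group.append(message)
        # Release once the group holds as many messages as sequenceSize says.
        if len(group) >= headers["sequenceSize"]:
            del self.groups[key]
            return [m["payload"] for m in group]
        return None  # group is not complete yet


aggregator = SimpleAggregator()
first = aggregator.handle(
    {"headers": {"correlationId": "order-1", "sequenceNumber": 1, "sequenceSize": 2},
     "payload": "a"})
second = aggregator.handle(
    {"headers": {"correlationId": "order-1", "sequenceNumber": 2, "sequenceSize": 2},
     "payload": "b"})
```

The first call returns nothing because the group is incomplete; the second call completes the group and returns both payloads.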

The aggregator application can be configured to use a persistent MessageGroupStore implementation. The configuration for the target technology is fully based on Spring Boot auto-configuration. However, the default JDBC, MongoDb and Redis auto-configurations are excluded; they are imported with @Import based on the aggregator.messageStoreType configuration property. Consult the Spring Boot Reference Manual for the auto-configuration of the particular technology you use with the aggregator.

The JDBC JdbcMessageStore requires particular tables in the target database. You can find schema scripts for the appropriate RDBMS vendors in the org.springframework.integration.jdbc package of the spring-integration-jdbc jar. Those scripts can be used for automatic database initialization via Spring Boot.

For example:

java -jar aggregator-rabbit-1.0.0.RELEASE.jar

3.1.4 Build

$ ./mvnw clean install -PgenerateApps
$ cd apps

You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:

$ ./mvnw clean package

3.1.5 Examples

java -jar aggregator_processor.jar

java -jar aggregator_processor.jar \
               --aggregator.release="!#this.?[payload == 'bar'].empty" \
               --aggregator.aggregation="#this.?[payload == 'foo'].![payload]"
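For readers less familiar with SpEL selection (.?[...]) and projection (.![...]), the two expressions in the last example can be read roughly as the Python functions below. This is only a paraphrase under the assumption that #this is the list of messages in the group; the function names release and aggregate are made up for the sketch.

```python
def release(group):
    # Mirrors !#this.?[payload == 'bar'].empty:
    # select the messages whose payload is 'bar' and release the group
    # as soon as that selection is not empty.
    selected = [m for m in group if m["payload"] == "bar"]
    return len(selected) > 0


def aggregate(group):
    # Mirrors #this.?[payload == 'foo'].![payload]:
    # select the messages whose payload is 'foo', then project each
    # selected message onto its payload.
    return [m["payload"] for m in group if m["payload"] == "foo"]


group = [{"payload": "foo"}, {"payload": "baz"}, {"payload": "foo"}]
released_before = release(group)  # no 'bar' has arrived yet
group.append({"payload": "bar"})
released_after = release(group)
output = aggregate(group)
```

In other words, the group is held until a 'bar' message arrives, and the released output contains only the 'foo' payloads.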

3.1.6 Code of Conduct

This project adheres to the Contributor Covenant code of conduct. By participating, you are expected to uphold this code. Please report unacceptable behavior to [email protected].

3.2 Bridge Processor

A Processor module that passes messages through unchanged by directly connecting the input and output channels.

3.2.1 Input




3.2.2 Output




3.2.3 Options

3.2.4 Build

$ ./mvnw clean install -PgenerateApps
$ cd apps

You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:

$ ./mvnw clean package

3.2.5 Examples

java -jar bridge-processor.jar

3.3 Filter Processor

Use the filter module in a stream to determine whether a Message should be passed to the output channel.

3.3.1 Input





3.3.2 Output





3.3.3 Options

The filter processor has the following options:

A SpEL expression to be evaluated against each message, to decide whether or not to accept it. (Expression, default: true)

3.3.4 Build

$ ./mvnw clean install -PgenerateApps
$ cd apps

You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:

$ ./mvnw clean package

3.3.5 Examples

java -jar filter-processor.jar --expression="payload"

3.4 Groovy Filter Processor

A Processor application that retains or discards messages according to a predicate, expressed as a Groovy script.

3.4.1 Input




  • Any

3.4.2 Output




  • Any

3.4.3 Options

The groovy-filter processor has the following options:

The resource location of the groovy script (Resource, default: <none>)
Variable bindings as a new line delimited string of name-value pairs, e.g. 'foo=bar\n baz=car'. (Properties, default: <none>)
The location of a properties file containing custom script variable bindings. (Resource, default: <none>)

3.4.4 Build

$ ./mvnw clean install -PgenerateApps
$ cd apps

You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:

$ ./mvnw clean package

3.4.5 Examples

java -jar groovy-filter-processor.jar --script=script.groovy

3.5 Groovy Transform Processor

A Processor module that transforms messages using a Groovy script.

3.5.1 Input




  • Any

3.5.2 Output




  • Any

3.5.3 Options

The groovy-transform processor has the following options:

Reference to a script used to process messages. (Resource, default: <none>)
Variable bindings as a new line delimited string of name-value pairs, e.g. 'foo=bar\n baz=car'. (Properties, default: <none>)
The location of a properties file containing custom script variable bindings. (Resource, default: <none>)

3.5.4 Build

$ ./mvnw clean install -PgenerateApps
$ cd apps

You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:

$ ./mvnw clean package

3.5.5 Examples

java -jar groovy-transform-processor.jar --script=script.groovy

3.6 Header Enricher Processor

Use the header-enricher app to add message headers.

The headers are provided in the form of new line delimited key value pairs, where the keys are the header names and the values are SpEL expressions. For example --headers='foo=payload.someProperty \n bar=payload.otherProperty'

3.6.1 Input




  • Any

3.6.2 Output




  • Any

3.6.3 Options

The header-enricher processor has the following options:

\n separated properties representing headers in which values are SpEL expressions, e.g. foo='bar' \n baz=payload.baz (Properties, default: <none>)
Set to true to overwrite any existing message headers. (Boolean, default: false)
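The interaction between the headers and overwrite options can be sketched as follows. This is a hedged illustration, not the app's code: plain Python callables stand in for SpEL expressions, messages are modelled as dicts, and the function name enrich_headers is hypothetical. It assumes that, with overwrite left at false, headers already present on the message are kept.

```python
def enrich_headers(message, header_expressions, overwrite=False):
    """Return a copy of the message with computed headers added."""
    headers = dict(message["headers"])
    for name, expression in header_expressions.items():
        # Existing headers are only replaced when overwrite is requested.
        if overwrite or name not in headers:
            headers[name] = expression(message)
    return {"headers": headers, "payload": message["payload"]}


message = {
    "headers": {"foo": "existing"},
    "payload": {"someProperty": 1, "otherProperty": 2},
}
# Callables standing in for foo=payload.someProperty and bar=payload.otherProperty.
expressions = {
    "foo": lambda m: m["payload"]["someProperty"],
    "bar": lambda m: m["payload"]["otherProperty"],
}
kept = enrich_headers(message, expressions)
replaced = enrich_headers(message, expressions, overwrite=True)
```

With the default setting the existing foo header is preserved and only bar is added; with overwrite enabled both headers take the computed values.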

3.6.4 Build

$ ./mvnw clean install -PgenerateApps
$ cd apps

You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:

$ ./mvnw clean package

3.6.5 Examples

java -jar header-enricher-processor.jar --headers='foo=payload.someProperty \n bar=payload.otherProperty'

3.7 Http Client Processor

A processor app that makes requests to an HTTP resource and emits the response body as a message payload. This processor can be combined, e.g., with a time source app to periodically poll results from an HTTP resource.

3.7.1 Input


Any required HTTP headers must be explicitly set via the headers-expression property. See the examples below. Header values may also be used to construct the request body when referenced in the body-expression property.


The Message payload may be any Java type. Generally, standard Java types such as String (e.g., JSON, XML) or byte array payloads are recommended. A Map should work without too much effort. By default, the payload becomes the HTTP request body (if needed). You may also set the body-expression property to construct a value derived from the Message, or body to use a static (literal) value.

Internally, the processor uses RestTemplate.exchange(…​).

The RestTemplate supports Jackson JSON serialization to support any request and response types if necessary. The expected-response-type property, String.class by default, may be set to any class in your application class path. (Note that user-defined payload types require adding the necessary dependencies to your pom file.)

3.7.2 Output


No HTTP message headers are mapped to the outbound Message.


The raw output object is ResponseEntity<?>; any of its fields (e.g., body, headers) or accessor methods (statusCode) may be referenced as part of the reply-expression. By default, the outbound Message payload is the response body.
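The role of the reply-expression can be illustrated with a small Python analogy. The Response class below is a hypothetical stand-in for ResponseEntity, and a Python callable plays the part of the SpEL expression; neither is part of the processor.

```python
from dataclasses import dataclass, field
from http import HTTPStatus


@dataclass
class Response:
    """Hypothetical stand-in for ResponseEntity: status, body and headers."""
    status_code: HTTPStatus
    body: object = None
    headers: dict = field(default_factory=dict)


def to_payload(response, reply=lambda r: r.body):
    # The default reply returns the body, matching the documented default.
    return reply(response)


response = Response(
    HTTPStatus.OK,
    body='{"id": 7}',
    headers={"Content-Type": "application/json"},
)
default_payload = to_payload(response)
# Analogous to a reply expression that refers to the status code name.
status_payload = to_payload(response, reply=lambda r: r.status_code.name)
```

The expression is applied to the whole response object, so it can return the body, a header value, the status, or any combination of them.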

3.7.3 Options

The httpclient processor has the following options:

The (static) request body; if neither this nor bodyExpression is provided, the payload will be used. (Object, default: <none>)
A SpEL expression to derive the request body from the incoming message. (Expression, default: <none>)
The type used to interpret the response. (java.lang.Class<?>, default: <none>)
A SpEL expression used to derive the http headers map to use. (Expression, default: <none>)
The kind of http method to use. (HttpMethod, default: <none>, possible values: GET,HEAD,POST,PUT,PATCH,DELETE,OPTIONS,TRACE)
A SpEL expression used to compute the final result, applied against the whole http response. (Expression, default: body)
The URL to issue an http request to, as a static value. (String, default: <none>)
A SpEL expression against incoming message to determine the URL to use. (Expression, default: <none>)

3.7.4 Build

$ ./mvnw clean install -PgenerateApps
$ cd apps

You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:

$ ./mvnw clean package

3.7.5 Examples

$ java -jar httpclient-processor.jar --httpclient.url=http://someurl  --httpclient.http-method=POST --httpclient.headers-expression="{'Content-Type':'application/json'}"

$ java -jar httpclient-processor.jar --httpclient.url=http://someurl  --httpclient.reply-expression="statusCode.name()"

3.8 PMML Processor

A processor that evaluates a machine learning model stored in PMML format.

3.8.1 Input




  • PMML model data

3.8.2 Output




  • Tuple carrying information about the evaluated data

3.8.3 Options

The pmml processor has the following options:

How to compute model active fields from input message properties as modelField->SpEL. (java.util.Map<java.lang.String,org.springframework.expression.Expression>, default: <none>)
The location of the PMML model file. (Resource, default: <none>)
If the model file contains multiple models, the name of the one to use. (String, default: <none>)
If the model file contains multiple models, the name of the one to use, as a SpEL expression. (Expression, default: <none>)
How to emit evaluation results in the output message as msgProperty->SpEL. (java.util.Map<java.lang.String,org.springframework.expression.Expression>, default: <none>)

3.8.4 Build

$ ./mvnw clean install -PgenerateApps
$ cd apps

You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:

$ ./mvnw clean package

3.8.5 Examples

java -jar pmml-processor.jar --pmml.modelLocation= --pmml.modelName="
java -jar pmml-processor.jar --pmml.modelLocation= --pmml.modelNameExpression="

3.9 Python Http Processor

Spring Cloud Stream App Starters for integrating with Python.

This application invokes a REST service, similar to the standard httpclient processor. In fact, this application embeds the httpclient processor. As a convenience for Python developers, this allows you to provide a Jython wrapper script that may execute a function before and after the REST call in order to perform any necessary data transformation. If you don’t require any custom transformations, just use the httpclient processor.

The diagram shows input and output adapters as conceptual components. These are actually implemented as functions defined in a single script that must conform to a simple convention:

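def input():
    # Runs before the REST call; the return value becomes the request payload.
    return "Pre" + payload

def output():
    # Runs after the REST call; transforms the response.
    return payload + "Post"

# Invoke the function named by the bound variable `channel`.
result = locals()[channel]()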

The function names input and output map to the conventional channel names used by Spring Cloud Stream processors. The last line is a bit of Python reflection magic to invoke a function by its name, given by the bound variable channel. Implemented with Spring Integration Scripting, headers and payload are always bound to the Message headers and payload respectively. The payload on the input side is the object you use to build the REST request. The output side transforms the response. If you don’t need any additional processing on one side, implement the function with pass as the body:

def output():
    pass

The last line in the script must be an assignment statement. The variable name doesn’t matter. This is required to bind the return value correctly.
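To try the convention outside the processor, the following plain Python harness mimics how such a script might be evaluated: it binds channel, payload and headers, runs the script, and reads back the assigned variable. The harness (run_wrapper) is hypothetical and simplified; in particular it assumes the final assignment targets a variable named result, whereas the processor accepts any variable name.

```python
WRAPPER_SCRIPT = '''
def input():
    return "Pre" + payload

def output():
    return payload + "Post"

result = locals()[channel]()
'''


def run_wrapper(script, channel, payload, headers=None):
    """Evaluate the script with the bound variables and return `result`."""
    bindings = {"channel": channel, "payload": payload, "headers": headers or {}}
    # The script sees the bindings as its global variables.
    exec(script, bindings)
    return bindings["result"]


# Input side: prepares the request payload before the REST call.
request_body = run_wrapper(WRAPPER_SCRIPT, "input", "data")
# Output side: transforms the response after the REST call.
reply = run_wrapper(WRAPPER_SCRIPT, "output", "data")
```

Each evaluation invokes exactly one of the two functions, selected by the value bound to channel.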


The script is evaluated for every message. This tends to create a lot of classes for each execution, which puts stress on the JRE Metaspace memory region (or PermGen if using a JRE prior to version 8). In Java 8, Metaspace is unlimited by default, allocated from native memory, and therefore limited by the native OS. If deploying to Cloud Foundry, the Java Buildpack Memory Calculator sets -XX:MaxMetaspaceSize (see github.com/cloudfoundry/java-buildpack-memory-calculator for details). If using JBP v4.x, you may override the calculated value (and others) by specifying -XX:MaxMetaspaceSize explicitly in JAVA_OPTS. You also need to increase the container memory accordingly. Similar tuning is advised in any containerized environment.



3.9.1 Input


Headers will be bound automatically to the wrapper script variable headers.


Any type. Payload will be automatically bound to the wrapper script variable payload. Jython scripts can effectively access any Java type on the app’s classpath.

3.9.2 Output


Headers may be set by the Jython wrapper script if the output() script function returns a Message.


Whatever the output() wrapper script function returns.


The wrapper script is intended to perform some required transformations prior to sending an HTTP request and/or after the response is received. The return value of the input adapter will be the inbound payload of the httpclient processor and should conform to its requirements. Likewise, the result of the HTTP reply-expression will be bound as the payload when the output() function is invoked.

3.9.3 Options

The python-http processor has the following options:

The base directory where the repository should be cloned. If not specified, a temporary directory will be created. (File, default: <none>)
Flag to indicate that the repository should be cloned on startup (not on demand). Generally leads to slower startup but faster first query. (Boolean, default: true)
The label or branch to clone. (String, default: master)
The passphrase for the remote repository. (String, default: <none>)
The password for the remote repository. (String, default: <none>)
Timeout (in seconds) for obtaining HTTP or SSH connection (if applicable). Default 5 seconds. (Integer, default: 5)
The URI of the remote repository. (String, default: <none>)
The username for the remote repository. (String, default: <none>)
The (static) request body; if neither this nor bodyExpression is provided, the payload will be used. (Object, default: <none>)
A SpEL expression to derive the request body from the incoming message. (Expression, default: <none>)
The type used to interpret the response. (java.lang.Class<?>, default: <none>)
A SpEL expression used to derive the http headers map to use. (Expression, default: <none>)
The kind of http method to use. (HttpMethod, default: <none>, possible values: GET,HEAD,POST,PUT,PATCH,DELETE,OPTIONS,TRACE)
A SpEL expression used to compute the final result, applied against the whole http response. (Expression, default: body)
The URL to issue an http request to, as a static value. (String, default: <none>)
A SpEL expression against incoming message to determine the URL to use. (Expression, default: <none>)
The variable delimiter. (Delimiter, default: <none>, possible values: COMMA,SPACE,TAB,NEWLINE)
The Python script file name. (String, default: <none>)
Variable bindings as a delimited string of name-value pairs, e.g. 'foo=bar,baz=car'. (String, default: <none>)

3.9.4 Build

$ ./mvnw clean install -PgenerateApps
$ cd apps

You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:

$ ./mvnw clean package

3.9.5 Examples

See httpclient processor for more examples on httpclient properties.

$ java -jar python-http-processor.jar --wrapper.script=/local/directory/build-json.py --httpclient.url=http://someurl \
    --httpclient.http-method=POST --httpclient.headers-expression="{'Content-Type':'application/json'}"

$ java -jar python-http-processor.jar --git.uri=https://github.com/some-repo --wrapper.script=some-script.py \
    --wrapper.variables=foo=0.45,bar=0.55 --httpclient.url=http://someurl

3.10 Jython Processor

This application executes a Jython script that binds payload and headers variables to the Message payload and headers respectively. In addition, you may provide a jython.variables property containing a delimited string (comma-delimited by default), e.g., var1=val1,var2=val2,….
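The shape of the variables string can be illustrated with a short parser. This is only a sketch of the format, not the app's own parsing code: the function parse_variables is hypothetical, the delimiter names follow the documented COMMA, SPACE, TAB and NEWLINE values, and values are assumed to stay plain strings.

```python
# Documented delimiter names mapped to the characters they stand for.
DELIMITERS = {"COMMA": ",", "SPACE": " ", "TAB": "\t", "NEWLINE": "\n"}


def parse_variables(text, delimiter="COMMA"):
    """Split 'name=value' pairs on the chosen delimiter into a dict of strings."""
    bindings = {}
    for pair in text.split(DELIMITERS[delimiter]):
        pair = pair.strip()
        if not pair:
            continue  # ignore empty fragments such as a trailing delimiter
        name, _, value = pair.partition("=")
        bindings[name.strip()] = value.strip()
    return bindings


by_comma = parse_variables("var1=val1,var2=val2")
by_space = parse_variables("neutral=0.45 positive=0.55", delimiter="SPACE")
```

Each resulting name would then be available to the script as a variable alongside payload and headers.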

This processor uses a JSR-223 compliant embedded ScriptEngine provided by www.jython.org/.


The last line in the script must be an assignment statement. The variable name doesn’t matter. This is required to bind the return value correctly.


The script is evaluated for every message, which may limit your performance with high message loads. This also tends to create a lot of classes for each execution, which puts stress on the JRE Metaspace memory region (or PermGen if using a JRE prior to version 8). In Java 8, Metaspace is unlimited by default, allocated from native memory, and therefore limited by the native OS. If deploying to Cloud Foundry, the Java Buildpack Memory Calculator sets -XX:MaxMetaspaceSize (see github.com/cloudfoundry/java-buildpack-memory-calculator for details). If using JBP v4.x, you may override the calculated value (and others) by specifying -XX:MaxMetaspaceSize explicitly in JAVA_OPTS. You also need to increase the container memory accordingly. Similar tuning is advised in any containerized environment.

3.10.1 Input


Headers will be bound automatically to the script variable headers.


Any type. Payload will be automatically bound to the script variable payload.

3.10.2 Output


Headers may be set by the Jython script if the script returns a Message.


Whatever the script returns.



3.10.3 Options

The jython processor has the following options:

The base directory where the repository should be cloned. If not specified, a temporary directory will be created. (File, default: <none>)
Flag to indicate that the repository should be cloned on startup (not on demand). Generally leads to slower startup but faster first query. (Boolean, default: true)
The label or branch to clone. (String, default: master)
The passphrase for the remote repository. (String, default: <none>)
The password for the remote repository. (String, default: <none>)
Timeout (in seconds) for obtaining HTTP or SSH connection (if applicable). Default 5 seconds. (Integer, default: 5)
The URI of the remote repository. (String, default: <none>)
The username for the remote repository. (String, default: <none>)
The variable delimiter. (Delimiter, default: <none>, possible values: COMMA,SPACE,TAB,NEWLINE)
The Python script file name. (String, default: <none>)
Variable bindings as a delimited string of name-value pairs, e.g. 'foo=bar,baz=car'. (String, default: <none>)

3.10.4 Build

$ ./mvnw clean install -PgenerateApps
$ cd apps

You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:

$ ./mvnw clean package

3.10.5 Examples

$ java -jar python-jython-processor.jar --jython.script=/local/directory/to_uppercase.py

$ java -jar python-jython-processor.jar --git.uri=https://github.com/some-repo \
    --jython.script=map-tweet-sentiments.py --jython.variables=neutral=0.45,positive=0.55

3.11 Scriptable Transform Processor

A Spring Cloud Stream module that transforms messages using a script. The script body is supplied directly as a property value. The language of the script can be specified (groovy/javascript/ruby/python).

3.11.1 Input




  • Any

3.11.2 Output




  • Any

3.11.3 Options

The scriptable-transform processor has the following options:

Language of the text in the script property. Supported: groovy, javascript, ruby, python. (String, default: <none>)
Text of the script. (String, default: <none>)
Variable bindings as a new line delimited string of name-value pairs, e.g. 'foo=bar\n baz=car'. (Properties, default: <none>)
The location of a properties file containing custom script variable bindings. (Resource, default: <none>)

3.11.4 Build

$ ./mvnw clean install -PgenerateApps
$ cd apps

You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:

$ ./mvnw clean package

3.11.5 Examples

java -jar scriptable-transform-processor.jar --language=ruby --script="return ""#{payload.upcase}"""

3.12 Splitter Processor

The splitter app builds upon the concept of the same name in Spring Integration and allows the splitting of a single message into several distinct messages.

3.12.1 Input




  • Any

3.12.2 Output




  • A collection of split messages based on a given expression, delimiter or file marker.

3.12.3 Options

Add correlation/sequence information in headers to facilitate later aggregation. (Boolean, default: true)
The charset to use when converting bytes in text-based files to String. (String, default: <none>)
When expression is null, delimiters to use when tokenizing String payloads. (String, default: <none>)
A SpEL expression for splitting payloads. (Expression, default: <none>)
Set to true or false to use a FileSplitter (to split text-based files by line) that includes (or not) beginning/end of file markers. (Boolean, default: <none>)
When 'fileMarkers == true', specify if they should be produced as FileSplitter.FileMarker objects or JSON. (Boolean, default: true)

When no expression, fileMarkers, or charset is provided, a DefaultMessageSplitter is configured with (optional) delimiters. When fileMarkers or charset is provided, a FileSplitter is configured (you must provide either a fileMarkers or charset to split files, which must be text-based - they are split into lines). Otherwise, an ExpressionEvaluatingMessageSplitter is configured.

When splitting File payloads, the sequenceSize header is zero because the size cannot be determined at the beginning.


Ambiguous properties are not allowed.
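The selection rules above can be summarized as a small decision function. The Python below is only a restatement of those rules; the function name choose_splitter and its parameters are hypothetical, and the check for ambiguous property combinations is deliberately not modelled because the rules for it are not spelled out here.

```python
def choose_splitter(expression=None, delimiters=None, file_markers=None, charset=None):
    """Return the name of the splitter the rules above would select."""
    # Setting fileMarkers (to true or false) or charset selects the FileSplitter.
    if file_markers is not None or charset is not None:
        return "FileSplitter"
    # Without an expression, the default splitter is used; delimiters are optional.
    if expression is None:
        return "DefaultMessageSplitter"
    return "ExpressionEvaluatingMessageSplitter"


nothing_set = choose_splitter()
with_delimiters = choose_splitter(delimiters=",")
with_charset = choose_splitter(charset="UTF-8")
markers_off = choose_splitter(file_markers=False)
with_expression = choose_splitter(expression="payload.split(',')")
```

Note that setting fileMarkers to false still selects the FileSplitter; only leaving it unset keeps the other splitters in play.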

3.12.4 JSON Example

As part of the SpEL expression you can make use of the pre-registered JSON Path function. The syntax is #jsonPath(payload, '<json path expression>').

For example, consider the following JSON:

{ "store": {
    "book": [
        {
            "category": "reference",
            "author": "Nigel Rees",
            "title": "Sayings of the Century",
            "price": 8.95
        },
        {
            "category": "fiction",
            "author": "Evelyn Waugh",
            "title": "Sword of Honour",
            "price": 12.99
        },
        {
            "category": "fiction",
            "author": "Herman Melville",
            "title": "Moby Dick",
            "isbn": "0-553-21311-3",
            "price": 8.99
        },
        {
            "category": "fiction",
            "author": "J. R. R. Tolkien",
            "title": "The Lord of the Rings",
            "isbn": "0-395-19395-8",
            "price": 22.99
        }
    ],
    "bicycle": {
        "color": "red",
        "price": 19.95
    }
}}

and an expression #jsonPath(payload, '$.store.book'); the result will be 4 messages, each with a Map payload containing the properties of a single book.
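The effect of that split can be reproduced in plain Python. In this sketch ordinary dict navigation stands in for the JSON Path '$.store.book', and the function name split_books is hypothetical; it only demonstrates that one message per array element is produced.

```python
import json

DOCUMENT = """
{"store": {
  "book": [
    {"category": "reference", "author": "Nigel Rees",
     "title": "Sayings of the Century", "price": 8.95},
    {"category": "fiction", "author": "Evelyn Waugh",
     "title": "Sword of Honour", "price": 12.99},
    {"category": "fiction", "author": "Herman Melville",
     "title": "Moby Dick", "isbn": "0-553-21311-3", "price": 8.99},
    {"category": "fiction", "author": "J. R. R. Tolkien",
     "title": "The Lord of the Rings", "isbn": "0-395-19395-8", "price": 22.99}
  ],
  "bicycle": {"color": "red", "price": 19.95}
}}
"""


def split_books(payload):
    # Plain dict navigation stands in for the JSON Path '$.store.book'.
    books = json.loads(payload)["store"]["book"]
    # One outgoing payload (a map of properties) per array element.
    return [dict(book) for book in books]


messages = split_books(DOCUMENT)
titles = [message["title"] for message in messages]
```

The bicycle entry is not part of the selected array, so it does not appear in any of the resulting messages.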

3.12.5 Build

$ ./mvnw clean install -PgenerateApps
$ cd apps

You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:

$ ./mvnw clean package

3.12.6 Examples

java -jar splitter-processor.jar --splitter.expression=expression
java -jar splitter-processor.jar --splitter.delimiters=delimiters

3.13 TCP Client Processor

A TCP client that acts as a processor: it connects to a TCP server, sends data to it and also receives data.

3.13.1 Input


  • Content-Type: application/octet-stream


  • byte[]


  • Content-Type: text/plain


  • String

3.13.2 Output


  • Content-Type: application/octet-stream


  • byte[]

3.13.3 Options

The tcp-client processor has the following options:

The buffer size used when decoding messages; larger messages will be rejected. (Integer, default: 2048)
The charset used when converting from bytes to String. (String, default: UTF-8)
The decoder to use when receiving messages. (Encoding, default: <none>, possible values: CRLF,LF,NULL,STXETX,RAW,L1,L2,L4)
The encoder to use when sending messages. (Encoding, default: <none>, possible values: CRLF,LF,NULL,STXETX,RAW,L1,L2,L4)
The host to which this sink will connect. (String, default: localhost)
<documentation missing> (Boolean, default: <none>)
<documentation missing> (Integer, default: <none>)
Retry interval (in milliseconds) to check the connection and reconnect. (Long, default: 60000)
<documentation missing> (Boolean, default: <none>)
<documentation missing> (Integer, default: <none>)
<documentation missing> (Boolean, default: <none>)

3.13.4 Build

$ ./mvnw clean install -PgenerateApps
$ cd apps

You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:

$ ./mvnw clean package

3.13.5 Examples

java -jar tcp-client-processor.jar --tcp.decoder=LF --tcp.encoder=LF

3.14 Transform Processor

Use the transform app in a stream to convert a Message’s content or structure.

The transform processor is used by passing a SpEL expression. The expression should return the modified message or payload.

As part of the SpEL expression you can make use of the pre-registered JSON Path function. The syntax is #jsonPath(payload,'<json path expression>')

3.14.1 Input




  • Any

3.14.2 Output




  • Any

3.14.3 Options

The transform processor has the following options:

<documentation missing> (Expression, default: payload)

3.14.4 Build

$ ./mvnw clean install -PgenerateApps
$ cd apps

You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:

$ ./mvnw clean package

3.14.5 Examples

java -jar transform-processor.jar --expression=payload.toUpperCase()

This transform will convert all message payloads to upper case.

3.15 TensorFlow Processor

A processor that evaluates a machine learning model stored in TensorFlow Protobuf format.


3.15.1 Input




The TensorFlow Processor uses a TensorflowInputConverter to convert the input data into a data format compliant with the TensorFlow model used. The input converter converts the input Messages into a key/value Map, where the key corresponds to a model input placeholder and the value is an org.tensorflow.DataType compliant value. The default converter implementation expects either a Map payload or a flat JSON message that can be converted into a Map.

The TensorflowInputConverter can be extended and customized. See TwitterSentimentTensorflowInputConverter.java for example.

The following snippet shows how to export any TensorFlow model (trained as well) into the Protocol Buffers binary format required by the Processor.

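import os
import tensorflow as tf
from tensorflow.python.framework.graph_util import convert_variables_to_constants

# `sess` is assumed to be an existing session that holds the trained graph.
SAVE_DIR = os.path.abspath(os.path.curdir)
# Freeze the variables into constants, keeping what '<model output>' needs.
minimal_graph = convert_variables_to_constants(sess, sess.graph_def, ['<model output>'])
tf.train.write_graph(minimal_graph, SAVE_DIR, 'my_graph.proto', as_text=False)
tf.train.write_graph(minimal_graph, SAVE_DIR, 'my.txt', as_text=True)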

3.15.2 Output




Processor’s output uses TensorflowOutputConverter to convert the computed Tensor result into a serializable message. The default implementation uses Tuple triple.

Custom TensorflowOutputConverter can provide more convenient data representations. See TwitterSentimentTensorflowOutputConverter.java.

3.15.3 Options

The tensorflow processor has the following options:

How to obtain the input data from the input message. If empty, it defaults to the input message payload. The payload.myInTupleName expression treats the input payload as a Tuple, where myInTupleName stands for a Tuple key. The headers[myHeaderName] expression gets the input data from the message headers, using myHeaderName as the key. (Expression, default: <none>)
Defines how to store the output data and whether the input payload is passed through or discarded. Payload (Default) stores the output data in the outbound message payload. The input payload is discarded. Header stores the output data in the outputName message header. The input payload is passed through. Tuple stores the output data in a Tuple payload, using the outputName key. The input payload is passed through in the same Tuple under the 'original.input.data' key. If the input payload is already a Tuple that contains an 'original.input.data' key, the input Tuple is copied into the new Tuple to be returned. (OutputMode, default: <none>, possible values: payload,tuple,header)
The location of the TensorFlow model file. (Resource, default: <none>)
The TensorFlow graph model output. Name of TensorFlow operation to fetch the output Tensors from. (String, default: <none>)
The modelFetch returns a list of Tensors. The modelFetchIndex specifies the index in the list to use as an output. (Integer, default: 0)
The output data key used in the Header or Tuple modes. Empty name defaults to the modelFetch property value. (String, default: <none>)
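
The interaction between the output mode, the output name, and the fetch index can be sketched in a few lines of Python. This is only an illustration of the option descriptions above, not the processor's code: Tuples are modelled as plain dicts, the function names are hypothetical, and passing the inbound headers through unchanged in every mode is an assumption.

```python
ORIGINAL_INPUT_KEY = "original.input.data"


def resolve_output_name(output_name, model_fetch):
    # An empty output name falls back to the modelFetch value.
    return output_name or model_fetch


def select_output(fetched_tensors, model_fetch_index=0):
    # modelFetch yields a list of tensors; the index picks one of them.
    return fetched_tensors[model_fetch_index]


def build_output_message(mode, payload, headers, output, output_name):
    """Return (payload, headers) of the outbound message for a given mode."""
    if mode == "payload":
        # The input payload is discarded.
        return output, dict(headers)
    if mode == "header":
        # The input payload is passed through; the result goes into a header.
        new_headers = dict(headers)
        new_headers[output_name] = output
        return payload, new_headers
    if mode == "tuple":
        if isinstance(payload, dict) and ORIGINAL_INPUT_KEY in payload:
            # Input is already a Tuple carrying the original data: copy it.
            result = dict(payload)
        else:
            result = {ORIGINAL_INPUT_KEY: payload}
        result[output_name] = output
        return result, dict(headers)
    raise ValueError("unknown mode: %r" % mode)


name = resolve_output_name("", "output/Softmax")
output = select_output([[0.1, 0.9]], 0)
print(build_output_message("tuple", "some tweet", {}, output, name))
```

In the tuple case the original input stays reachable under 'original.input.data', which keeps it available when several such processors are chained.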

3.15.4 Build

$ ./mvnw clean install -PgenerateApps
$ cd apps

You can find the corresponding binder-based projects here. You can then cd into one of the folders and build it:

$ ./mvnw clean package

3.15.5 Examples

java -jar tensorflow-processor.jar --model= --modelFetch= --mode="

3.16 Twitter Sentiment Analysis Processor

A processor that evaluates a machine learning model stored in TensorFlow Protobuf format. It operationalizes the github.com/danielegrattarola/twitter-sentiment-cnn model.

Real-time Twitter Sentiment Analytics with TensorFlow and Spring Cloud Dataflow

3.16.1 Input


  • content-type: application/json


  • JSON tweet message

3.16.2 Output


  • content-type: application/json


Decodes the evaluated result into POSITIVE, NEGATIVE and NEUTRAL values. Then creates and returns a simple JSON message with this structure:



The processor's output uses a TensorflowOutputConverter to convert the computed Tensor result into a serializable message. The default implementation uses a Tuple triple.

A custom TensorflowOutputConverter can provide a more convenient data representation. See TwitterSentimentTensorflowOutputConverter.java.
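
To make the decoding step mentioned at the start of this section more concrete, here is a minimal Python sketch that maps a score to one of the three sentiment labels. The source does not state how the processor draws the boundaries, so everything specific here is an assumption: the function name decode_sentiment, the idea that the model yields a single positive-class probability, and the two threshold values, which are purely illustrative.

```python
def decode_sentiment(positive_score, neutral_from=0.4, positive_from=0.8):
    """Map a positive-class probability to a sentiment label.

    The thresholds are illustrative defaults, not values taken from
    the processor.
    """
    if not 0.0 <= positive_score <= 1.0:
        raise ValueError("score must be a probability in [0, 1]")
    if positive_score >= positive_from:
        return "POSITIVE"
    if positive_score >= neutral_from:
        return "NEUTRAL"
    return "NEGATIVE"


for score in (0.95, 0.5, 0.1):
    print(score, decode_sentiment(score))
```

A custom TensorflowOutputConverter would be the natural place for this kind of mapping in the actual Java processor.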

3.16.3 Options

The twitter-sentiment processor has the following options:

How to obtain the input data from the input message. If empty, it defaults to the input message payload. The payload.myInTupleName expression treats the input payload as a Tuple, where myInTupleName stands for a Tuple key. The headers[myHeaderName] expression gets the input data from the message headers, using myHeaderName as the key. (Expression, default: <none>)
Defines how to store the output data and whether the input payload is passed through or discarded. Payload (Default) stores the output data in the outbound message payload. The input payload is discarded. Header stores the output data in the outputName message header. The input payload is passed through. Tuple stores the output data in a Tuple payload, using the outputName key. The input payload is passed through in the same Tuple under the 'original.input.data' key. If the input payload is already a Tuple that contains an 'original.input.data' key, the input Tuple is copied into the new Tuple to be returned. (OutputMode, default: <none>, possible values: payload,tuple,header)
The location of the TensorFlow model file. (Resource, default: <none>)
The TensorFlow graph model output. Name of TensorFlow operation to fetch the output Tensors from. (String, default: <none>)
The modelFetch returns a list of Tensors. The modelFetchIndex specifies the index in the list to use as an output. (Integer, default: 0)
The output data key used in the Header or Tuple modes. Empty name defaults to the modelFetch property value. (String, default: <none>)
The location of the word vocabulary file that was used for training the model. (Resource, default: <none>)

3.16.4 Build

$ ./mvnw clean install -PgenerateApps
$ cd apps

You can find the corresponding binder-based projects here. You can then cd into one of the folders and build it:

$ ./mvnw clean package

3.16.5 Examples

java -jar twitter-sentiment-processor.jar --tensorflow.twitter.vocabulary= --tensorflow.model= \
    --tensorflow.modelFetch= --tensorflow.mode="

Here is a sample pipeline that computes sentiments for JSON tweets coming from the twitterstream source, using the pre-built minimal_graph.proto and vocab.csv:

tweets=twitterstream --access-token-secret=xxx --access-token=xxx --consumer-secret=xxx --consumer-key=xxx \
| filter --expression=#jsonPath(payload,'$.lang')=='en' \
| twitter-sentiment --vocabulary='http://dl.bintray.com/big-data/generic/vocab.csv' \
   --output-name=output/Softmax --model='http://dl.bintray.com/big-data/generic/minimal_graph.proto' \
   --model-fetch=output/Softmax \
| log