Use the aggregator application to combine multiple messages into one, based on some correlation mechanism.
This processor is fully based on the Aggregator component from Spring Integration; consult its documentation for use cases and functionality.
If the aggregation and correlation logic is based on the default strategies, the correlationId, sequenceNumber, and sequenceSize headers must be present in the incoming message.
The Aggregator Processor is built on Spring Integration’s AggregatingMessageHandler. Because the correlation and aggregation logic does not require particular types, the input payload can be anything that can be transferred over the network and through the Spring Cloud Stream Binder.
If the payload is JSON, the JsonPropertyAccessor helps to build straightforward SpEL expressions for the correlation, release, and aggregation functions.
The output message carries all headers of the incoming messages that have no conflicts within the group.
An absent header on one or more messages within the group is not considered a conflict.
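The header rule above can be illustrated with a small Python sketch. It is a conceptual model only, not the Spring Integration implementation; the function name `merge_headers` and the representation of a group as a list of header dictionaries are assumptions made for this example.

```python
def merge_headers(group):
    """Return the headers that have no conflicting values within the group.

    `group` is a list of header dicts, one per message (a hypothetical shape
    chosen for this sketch). A header missing from some messages is not a
    conflict; a header seen with two different values is dropped.
    """
    merged = {}
    conflicts = set()
    for headers in group:
        for name, value in headers.items():
            if name in conflicts:
                continue
            if name in merged and merged[name] != value:
                # Two messages disagree on this header: drop it entirely.
                del merged[name]
                conflicts.add(name)
            else:
                merged[name] = value
    return merged


group = [
    {"correlationId": "order-1", "source": "web"},
    {"correlationId": "order-1", "source": "mobile", "priority": 5},
]
print(merge_headers(group))  # {'correlationId': 'order-1', 'priority': 5}
```

Here `source` is dropped because the two messages disagree, while `priority` survives even though only one message carries it.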
By default, the DefaultAggregatingMessageGroupProcessor is used as the aggregation function; it returns a java.util.List of the payloads of the incoming messages.
A custom aggregation SpEL expression may produce any required object to be sent to the output of the processor.
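To make the default behavior concrete, here is a minimal Python sketch of the three default strategies working together: correlate on the correlationId header, release once the group holds sequenceSize messages, and aggregate into a list of payloads. The class name `DefaultAggregatorSketch` and its `handle` method are hypothetical, and the arrival-order result is an assumption of the sketch rather than a statement about Spring Integration.

```python
from collections import defaultdict


class DefaultAggregatorSketch:
    """Toy model of the default strategies; not Spring Integration code."""

    def __init__(self):
        # In-memory group store keyed by the correlation key.
        self.groups = defaultdict(list)

    def handle(self, headers, payload):
        """Store one message; return the aggregate once the group is complete."""
        key = headers["correlationId"]  # default correlation
        self.groups[key].append(payload)
        if len(self.groups[key]) >= headers["sequenceSize"]:  # default release
            # Default aggregation: a list of payloads (arrival order here).
            return self.groups.pop(key)
        return None


aggregator = DefaultAggregatorSketch()
for number, payload in enumerate(["foo", "bar", "baz"], start=1):
    headers = {"correlationId": "order-1", "sequenceNumber": number, "sequenceSize": 3}
    result = aggregator.handle(headers, payload)
print(result)  # ['foo', 'bar', 'baz']
```

The first two calls return `None` because the group is still incomplete; only the third call releases the group.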
The aggregator processor has the following options:
- aggregator.aggregation: SpEL expression for aggregation strategy. Default is collection of payloads (Expression, default: <none>)
- aggregator.correlation: SpEL expression for correlation key. Default to correlationId header (Expression, default: <none>)
- aggregator.group-timeout: SpEL expression for timeout to expiring uncompleted groups (Expression, default: <none>)
- aggregator.message-store-entity: Persistence message store entity: table prefix in RDBMS, collection name in MongoDb, etc (String, default: <none>)
- aggregator.message-store-type: Message store type (String, default: <none>)
- aggregator.release: SpEL expression for release strategy. Default is based on the sequenceSize header (Expression, default: <none>)
- spring.data.mongodb.authentication-database: Authentication database name. (String, default: <none>)
- spring.data.mongodb.database: Database name. (String, default: <none>)
- spring.data.mongodb.field-naming-strategy: Fully qualified name of the FieldNamingStrategy to use. (java.lang.Class<?>, default: <none>)
- spring.data.mongodb.grid-fs-database: GridFS database name. (String, default: <none>)
- spring.data.mongodb.host: Mongo server host. Cannot be set with uri. (String, default: <none>)
- spring.data.mongodb.password: Login password of the mongo server. Cannot be set with uri. (char[], default: <none>)
- spring.data.mongodb.port: Mongo server port. Cannot be set with uri. (Integer, default: <none>)
- spring.data.mongodb.uri: Mongo database URI. Cannot be set with host, port and credentials. (String, default: <none>)
- spring.data.mongodb.username: Login user of the mongo server. Cannot be set with uri. (String, default: <none>)
- spring.datasource.continue-on-error: Do not stop if an error occurs while initializing the database. (Boolean, default: false)
- spring.datasource.data: Data (DML) script resource references. (java.util.List<java.lang.String>, default: <none>)
- spring.datasource.data-password: Password of the database to execute DML scripts. (String, default: <none>)
- spring.datasource.data-username: User of the database to execute DML scripts. (String, default: <none>)
- spring.datasource.driver-class-name: Fully qualified name of the JDBC driver. Auto-detected based on the URL by default. (String, default: <none>)
- spring.datasource.generate-unique-name: Generate a random datasource name. (Boolean, default: false)
- spring.datasource.initialize: Populate the database using 'data.sql'. (Boolean, default: true)
- spring.datasource.jndi-name: JNDI location of the datasource. Class, url, username & password are ignored when set. (String, default: <none>)
- spring.datasource.name: Name of the datasource. (String, default: testdb)
- spring.datasource.password: Login password of the database. (String, default: <none>)
- spring.datasource.platform: Platform to use in the schema resource (schema-${platform}.sql). (String, default: all)
- spring.datasource.schema: Schema (DDL) script resource references. (java.util.List<java.lang.String>, default: <none>)
- spring.datasource.schema-password: Password of the database to execute DDL scripts (if different). (String, default: <none>)
- spring.datasource.schema-username: User of the database to execute DDL scripts (if different). (String, default: <none>)
- spring.datasource.separator: Statement separator in SQL initialization scripts. (String, default: ;)
- spring.datasource.sql-script-encoding: SQL scripts encoding. (Charset, default: <none>)
- spring.datasource.type: Fully qualified name of the connection pool implementation to use. By default, it is auto-detected from the classpath. (java.lang.Class<? extends javax.sql.DataSource>, default: <none>)
- spring.datasource.url: JDBC url of the database. (String, default: <none>)
- spring.datasource.username: Login user of the database. (String, default: <none>)
- spring.mongodb.embedded.features: Comma-separated list of features to enable. (java.util.Set<de.flapdoodle.embed.mongo.distribution.Feature>, default: <none>)
- spring.mongodb.embedded.version: Version of Mongo to use. (String, default: 3.2.2)
- spring.redis.database: Database index used by the connection factory. (Integer, default: 0)
- spring.redis.host: Redis server host. (String, default: localhost)
- spring.redis.password: Login password of the redis server. (String, default: <none>)
- spring.redis.port: Redis server port. (Integer, default: 6379)
- spring.redis.ssl: Enable SSL. (Boolean, default: false)
- spring.redis.timeout: Connection timeout in milliseconds. (Integer, default: 0)
- spring.redis.url: Redis url, which will overrule host, port and password if set. (String, default: <none>)
By default the aggregator processor uses:
- HeaderAttributeCorrelationStrategy(IntegrationMessageHeaderAccessor.CORRELATION_ID) for correlation;
- SequenceSizeReleaseStrategy for release;
- DefaultAggregatingMessageGroupProcessor for aggregation;
- SimpleMessageStore for messageStoreType.
The aggregator application can be configured for persistent MessageGroupStore implementations.
The configuration for the target technology is fully based on Spring Boot auto-configuration.
However, the default JDBC, MongoDb, and Redis auto-configurations are excluded; they are imported (through @Import) based on the aggregator.messageStoreType configuration property.
Consult the Spring Boot Reference Manual for the auto-configuration of the particular technology you use for the aggregator.
The JDBC JdbcMessageStore requires particular tables in the target database.
You can find schema scripts for the appropriate RDBMS vendors in the org.springframework.integration.jdbc package of the spring-integration-jdbc jar.
Those scripts can be used for automatic database initialization via Spring Boot.
For example:
java -jar aggregator-rabbit-1.0.0.RELEASE.jar \
    --aggregator.message-store-type=jdbc \
    --spring.datasource.url=jdbc:h2:mem:test \
    --spring.datasource.schema=org/springframework/integration/jdbc/schema-h2.sql
$ ./mvnw clean install -PgenerateApps
$ cd apps
You can find the corresponding binder based projects here.
You can then cd into one of the folders and build it:
$ ./mvnw clean package
java -jar aggregator_processor.jar \
    --aggregator.message-store-type=jdbc \
    --spring.datasource.url=jdbc:h2:mem:test \
    --spring.datasource.schema=org/springframework/integration/jdbc/schema-h2.sql

java -jar aggregator_processor.jar \
    --spring.data.mongodb.port=0 \
    --aggregator.correlation="T(Thread).currentThread().id" \
    --aggregator.release="!#this.?[payload == 'bar'].empty" \
    --aggregator.aggregation="#this.?[payload == 'foo'].![payload]" \
    --aggregator.message-store-type=mongodb \
    --aggregator.message-store-entity=aggregatorTest
This project adheres to the Contributor Covenant code of conduct.
By participating, you are expected to uphold this code.
Please report unacceptable behavior to [email protected].
3.2 Bridge Processor
A Processor module that passes messages through unchanged by connecting the input channel directly to the output channel.
$ ./mvnw clean install -PgenerateApps
$ cd apps
You can find the corresponding binder based projects here.
You can then cd into one of the folders and build it:
$ ./mvnw clean package
java -jar bridge-processor.jar
3.3 Filter Processor
Use the filter module in a stream to determine whether a Message should be passed to the output channel.
The filter processor has the following options:
- filter.expression: A SpEL expression to be evaluated against each message, to decide whether or not to accept it. (Expression, default: true)
$ ./mvnw clean install -PgenerateApps
$ cd apps
You can find the corresponding binder based projects here.
You can then cd into one of the folders and build it:
$ ./mvnw clean package
java -jar filter-processor.jar --expression="payload"
3.4 Groovy Filter Processor
A Processor application that retains or discards messages according to a predicate, expressed as a Groovy script.
The groovy-filter processor has the following options:
- groovy-filter.script: The resource location of the groovy script (Resource, default: <none>)
- groovy-filter.variables: Variable bindings as a new line delimited string of name-value pairs, e.g. 'foo=bar\n baz=car'. (Properties, default: <none>)
- groovy-filter.variables-location: The location of a properties file containing custom script variable bindings. (Resource, default: <none>)
$ ./mvnw clean install -PgenerateApps
$ cd apps
You can find the corresponding binder based projects here.
You can then cd into one of the folders and build it:
$ ./mvnw clean package
java -jar groovy-filter-processor.jar --script=script.groovy
3.5 Groovy Transform Processor
A Processor module that transforms messages using a Groovy script.
The groovy-transform processor has the following options:
- groovy-transformer.script: Reference to a script used to process messages. (Resource, default: <none>)
- groovy-transformer.variables: Variable bindings as a new line delimited string of name-value pairs, e.g. 'foo=bar\n baz=car'. (Properties, default: <none>)
- groovy-transformer.variables-location: The location of a properties file containing custom script variable bindings. (Resource, default: <none>)
$ ./mvnw clean install -PgenerateApps
$ cd apps
You can find the corresponding binder based projects here.
You can then cd into one of the folders and build it:
$ ./mvnw clean package
java -jar groovy-transform-processor.jar --script=script.groovy
3.6 Header Enricher Processor
Use the header-enricher app to add message headers.
The headers are provided in the form of new line delimited key value pairs, where the keys are the header names and the values are SpEL expressions.
For example --headers='foo=payload.someProperty \n bar=payload.otherProperty'
The header-enricher processor has the following options:
- header.enricher.headers: \n separated properties representing headers in which values are SpEL expressions, e.g. foo='bar' \n baz=payload.baz (Properties, default: <none>)
- header.enricher.overwrite: Set to true to overwrite any existing message headers (Boolean, default: false)
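The two options above can be pictured with a short Python sketch: one helper splits a newline-delimited headers specification into name/expression pairs, and another applies already-computed values to a message's headers while honoring the overwrite flag. Both function names are hypothetical, and SpEL evaluation itself is not reproduced; the sketch assumes the expressions have already been evaluated to plain values.

```python
def parse_header_spec(spec):
    """Split a newline-delimited 'name=expression' string into a dict."""
    pairs = {}
    for line in spec.splitlines():
        line = line.strip()
        if not line:
            continue
        name, _, expression = line.partition("=")
        pairs[name.strip()] = expression.strip()
    return pairs


def enrich(headers, computed, overwrite=False):
    """Add computed header values; replace existing ones only if overwrite is set."""
    result = dict(headers)
    for name, value in computed.items():
        if overwrite or name not in result:
            result[name] = value
    return result


spec = "foo=payload.someProperty\nbar=payload.otherProperty"
print(parse_header_spec(spec))
# {'foo': 'payload.someProperty', 'bar': 'payload.otherProperty'}

existing = {"foo": "old"}
print(enrich(existing, {"foo": "new", "bar": 1}))                  # {'foo': 'old', 'bar': 1}
print(enrich(existing, {"foo": "new", "bar": 1}, overwrite=True))  # {'foo': 'new', 'bar': 1}
```

With the default `overwrite=False`, a header that already exists on the message keeps its original value.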
$ ./mvnw clean install -PgenerateApps
$ cd apps
You can find the corresponding binder based projects here.
You can then cd into one of the folders and build it:
$ ./mvnw clean package
java -jar header-enricher-processor.jar --headers='foo=payload.someProperty \n bar=payload.otherProperty'
3.7 Http Client Processor
A processor app that makes requests to an HTTP resource and emits the response body as a message payload. This processor can be combined, for example, with a time source app to periodically poll results from an HTTP resource.
Any required HTTP headers must be explicitly set via the headers-expression property. See the examples below.
Header values may also be used to construct the request body when referenced in the body-expression property.
The Message payload may be any Java type.
Generally, standard Java types such as String (e.g., JSON, XML) or byte array payloads are recommended.
A Map should work without too much effort.
By default, the payload becomes the HTTP request body (if needed).
You may also set the body-expression property to construct a value derived from the Message, or body to use a static (literal) value.
Internally, the processor uses RestTemplate.exchange(…).
The RestTemplate supports Jackson JSON serialization to support any request and response types if necessary.
The expected-response-type property, String.class by default, may be set to any class in your application class path.
(Note that user-defined payload types require adding the necessary dependencies to your pom file.)
No HTTP message headers are mapped to the outbound Message.
The raw output object is ResponseEntity<?>; any of its fields (e.g., body, headers) or accessor methods (statusCode) may be referenced as part of the reply-expression.
By default, the outbound Message payload is the response body.
The httpclient processor has the following options:
- httpclient.body: The (static) request body; if neither this nor bodyExpression is provided, the payload will be used. (Object, default: <none>)
- httpclient.body-expression: A SpEL expression to derive the request body from the incoming message. (Expression, default: <none>)
- httpclient.expected-response-type: The type used to interpret the response. (java.lang.Class<?>, default: <none>)
- httpclient.headers-expression: A SpEL expression used to derive the http headers map to use. (Expression, default: <none>)
- httpclient.http-method: The kind of http method to use. (HttpMethod, default: <none>, possible values: GET, HEAD, POST, PUT, PATCH, DELETE, OPTIONS, TRACE)
- httpclient.reply-expression: A SpEL expression used to compute the final result, applied against the whole http response. (Expression, default: body)
- httpclient.url: The URL to issue an http request to, as a static value. (String, default: <none>)
- httpclient.url-expression: A SpEL expression against incoming message to determine the URL to use. (Expression, default: <none>)
$ ./mvnw clean install -PgenerateApps
$ cd apps
You can find the corresponding binder based projects here.
You can then cd into one of the folders and build it:
$ ./mvnw clean package
$ java -jar httpclient-processor.jar --httpclient.url=http://someurl --httpclient.http-method=POST --httpclient.headers-expression="{'Content-Type':'application/json'}"
$ java -jar httpclient-processor.jar --httpclient.url=http://someurl --httpclient.reply-expression="statusCode.name()"
3.8 PMML Processor
A processor that evaluates a machine learning model stored in PMML format.
The output is a Tuple carrying information about the evaluated data.
The pmml processor has the following options:
- pmml.inputs: How to compute model active fields from input message properties as modelField->SpEL. (java.util.Map<java.lang.String,org.springframework.expression.Expression>, default: <none>)
- pmml.model-location: The location of the PMML model file. (Resource, default: <none>)
- pmml.model-name: If the model file contains multiple models, the name of the one to use. (String, default: <none>)
- pmml.model-name-expression: If the model file contains multiple models, the name of the one to use, as a SpEL expression. (Expression, default: <none>)
- pmml.outputs: How to emit evaluation results in the output message as msgProperty->SpEL. (java.util.Map<java.lang.String,org.springframework.expression.Expression>, default: <none>)
$ ./mvnw clean install -PgenerateApps
$ cd apps
You can find the corresponding binder based projects here.
You can then cd into one of the folders and build it:
$ ./mvnw clean package
java -jar pmml-processor.jar --pmml.modelLocation= --pmml.modelName="
java -jar pmml-processor.jar --pmml.modelLocation= --pmml.modelNameExpression="
3.9 Python Http Processor
Spring Cloud Stream App Starters for integrating with Python.
This application invokes a REST service, similar to the standard httpclient processor. In fact, this application embeds the httpclient processor. As a convenience for Python developers, it lets you provide a Jython wrapper script that may execute a function before and after the REST call in order to perform any necessary data transformation. If you don’t require any custom transformations, just use the httpclient processor.
The input and output adapters are conceptual components. They are actually implemented as functions defined in a single script that must conform to a simple convention:
def input():
    return "Pre" + payload

def output():
    return payload + "Post"

result = locals()[channel]()
The function names input and output map to the conventional channel names used by Spring Cloud Stream processors.
The last line is a bit of Python reflection magic to invoke a function by its name, given by the bound variable channel. Because the script is implemented with Spring Integration Scripting, headers and payload are always bound to the Message headers and payload respectively. The payload on the input side is the object you use to build the REST request.
The output side transforms the response. If you don’t need any additional processing on one side, implement the function with pass as the body:
def output():
    pass
Note: The last line in the script must be an assignment statement. The variable name doesn’t matter. This is required to bind the return value correctly.
Note: The script is evaluated for every message. This tends to create a lot of classes for each execution, which puts stress on the JRE Metaspace memory region (or PermGen if using a JRE prior to version 8). In Java 8, Metaspace is unlimited by default, allocated from native memory, and therefore limited by the native OS. If deploying to CloudFoundry, the Java Buildpack Memory Calculator sets -XX:MaxMetaspaceSize (see github.com/cloudfoundry/java-buildpack-memory-calculator for details). If using JBP v4.x, you may override the calculated value (and others) by specifying -XX:MaxMetaspaceSize explicitly in JAVA_OPTS. You also need to increase the container memory accordingly. Similar tuning is advised in any containerized environment.
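The wrapper script convention described above can be tried out in plain CPython with a small harness. This is only a sketch: the real processor evaluates the script with Jython through Spring Integration Scripting, whereas the hypothetical `run_wrapper` helper below simply binds channel, payload, and headers as globals, executes the script, and reads back a variable named result (the processor itself does not care about the variable name).

```python
WRAPPER_SCRIPT = '''
def input():
    return "Pre" + payload

def output():
    return payload + "Post"

result = locals()[channel]()
'''


def run_wrapper(script, channel, payload, headers=None):
    """Evaluate the script with the bound variables and return `result`."""
    namespace = {"channel": channel, "payload": payload, "headers": headers or {}}
    # With a single namespace dict, locals() at script level is that same dict,
    # so the function named by `channel` can be looked up and invoked.
    exec(script, namespace)
    return namespace["result"]


print(run_wrapper(WRAPPER_SCRIPT, "input", "body"))    # Prebody
print(run_wrapper(WRAPPER_SCRIPT, "output", "reply"))  # replyPost
```

Each call evaluates the whole script again, which mirrors the per-message evaluation mentioned in the note above.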
Input headers: bound automatically to the wrapper script variable headers.
Input payload: any type. The payload is bound automatically to the wrapper script variable payload. Jython scripts can effectively access any Java type on the app’s classpath.
Output headers: may be set by the Jython wrapper script if the output() script function returns a Message.
Output payload: whatever the output() wrapper script function returns.
Note: The wrapper script is intended to perform some required transformations prior to sending an HTTP request and/or after the response is received. The return value of the input adapter becomes the inbound payload of the httpclient processor and should conform to its requirements. Likewise, the result of the HTTP reply-expression is bound to payload when the output() function is invoked.
The python-http processor has the following options:
- git.basedir: The base directory where the repository should be cloned. If not specified, a temporary directory will be created. (File, default: <none>)
- git.clone-on-start: Flag to indicate that the repository should be cloned on startup (not on demand). Generally leads to slower startup but faster first query. (Boolean, default: true)
- git.label: The label or branch to clone. (String, default: master)
- git.passphrase: The passphrase for the remote repository. (String, default: <none>)
- git.password: The password for the remote repository. (String, default: <none>)
- git.timeout: Timeout (in seconds) for obtaining HTTP or SSH connection (if applicable). Default 5 seconds. (Integer, default: 5)
- git.uri: The URI of the remote repository. (String, default: <none>)
- git.username: The username for the remote repository. (String, default: <none>)
- httpclient.body: The (static) request body; if neither this nor bodyExpression is provided, the payload will be used. (Object, default: <none>)
- httpclient.body-expression: A SpEL expression to derive the request body from the incoming message. (Expression, default: <none>)
- httpclient.expected-response-type: The type used to interpret the response. (java.lang.Class<?>, default: <none>)
- httpclient.headers-expression: A SpEL expression used to derive the http headers map to use. (Expression, default: <none>)
- httpclient.http-method: The kind of http method to use. (HttpMethod, default: <none>, possible values: GET, HEAD, POST, PUT, PATCH, DELETE, OPTIONS, TRACE)
- httpclient.reply-expression: A SpEL expression used to compute the final result, applied against the whole http response. (Expression, default: body)
- httpclient.url: The URL to issue an http request to, as a static value. (String, default: <none>)
- httpclient.url-expression: A SpEL expression against incoming message to determine the URL to use. (Expression, default: <none>)
- wrapper.delimiter: The variable delimiter. (Delimiter, default: <none>, possible values: COMMA, SPACE, TAB, NEWLINE)
- wrapper.script: The Python script file name. (String, default: <none>)
- wrapper.variables: Variable bindings as a delimited string of name-value pairs, e.g. 'foo=bar,baz=car'. (String, default: <none>)
$ ./mvnw clean install -PgenerateApps
$ cd apps
You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:
$ ./mvnw clean package
See httpclient processor for more examples on httpclient properties.
$ java -jar python-http-processor.jar --wrapper.script=/local/directory/build-json.py --httpclient.url=http://someurl --httpclient.http-method=POST --httpclient.headers-expression="{'Content-Type':'application/json'}"
$ java -jar python-http-processor.jar --git.uri=https://github.com/some-repo --wrapper.script=some-script.py --wrapper.variables=foo=0.45,bar=0.55 --httpclient.url=http://someurl
3.10 Jython Processor
This application executes a Jython script that binds the payload and headers variables to the Message payload and headers respectively. In addition, you may provide a jython.variables property containing a delimited string (comma-delimited by default), e.g., var1=val1,var2=val2,….
This processor uses a JSR-223 compliant embedded ScriptEngine provided by www.jython.org/.
Note: The last line in the script must be an assignment statement. The variable name doesn’t matter. This is required to bind the return value correctly.
Note: The script is evaluated for every message, which may limit your performance with high message loads. This also tends to create a lot of classes for each execution, which puts stress on the JRE Metaspace memory region (or PermGen if using a JRE prior to version 8). In Java 8, Metaspace is unlimited by default, allocated from native memory, and therefore limited by the native OS. If deploying to CloudFoundry, the Java Buildpack Memory Calculator sets -XX:MaxMetaspaceSize (see github.com/cloudfoundry/java-buildpack-memory-calculator for details). If using JBP v4.x, you may override the calculated value (and others) by specifying -XX:MaxMetaspaceSize explicitly in JAVA_OPTS. You also need to increase the container memory accordingly. Similar tuning is advised in any containerized environment.
Input headers: bound automatically to the script variable headers.
Input payload: any type. The payload is bound automatically to the script variable payload.
Output headers: may be set by the Jython script if the script returns a Message.
Output payload: whatever the script returns.
The jython processor has the following options:
- git.basedir: The base directory where the repository should be cloned. If not specified, a temporary directory will be created. (File, default: <none>)
- git.clone-on-start: Flag to indicate that the repository should be cloned on startup (not on demand). Generally leads to slower startup but faster first query. (Boolean, default: true)
- git.label: The label or branch to clone. (String, default: master)
- git.passphrase: The passphrase for the remote repository. (String, default: <none>)
- git.password: The password for the remote repository. (String, default: <none>)
- git.timeout: Timeout (in seconds) for obtaining HTTP or SSH connection (if applicable). Default 5 seconds. (Integer, default: 5)
- git.uri: The URI of the remote repository. (String, default: <none>)
- git.username: The username for the remote repository. (String, default: <none>)
- jython.delimiter: The variable delimiter. (Delimiter, default: <none>, possible values: COMMA, SPACE, TAB, NEWLINE)
- jython.script: The Python script file name. (String, default: <none>)
- jython.variables: Variable bindings as a delimited string of name-value pairs, e.g. 'foo=bar,baz=car'. (String, default: <none>)
$ ./mvnw clean install -PgenerateApps
$ cd apps
You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:
$ ./mvnw clean package
$ java -jar python-jython-processor.jar --jython.script=/local/directory/to_uppercase.py
$ java -jar python-jython-processor.jar --git.uri=https://github.com/some-repo --jython.script=map-tweet-sentiments.py --jython.variables=neutral=0.45,positive=0.55
3.11 Scriptable Transform Processor
A Spring Cloud Stream module that transforms messages using a script. The script body is supplied directly
as a property value. The language of the script can be specified (groovy/javascript/ruby/python).
The scriptable-transform processor has the following options:
- scriptable-transformer.language: Language of the text in the script property. Supported: groovy, javascript, ruby, python. (String, default: <none>)
- scriptable-transformer.script: Text of the script. (String, default: <none>)
- scriptable-transformer.variables: Variable bindings as a new line delimited string of name-value pairs, e.g. 'foo=bar\n baz=car'. (Properties, default: <none>)
- scriptable-transformer.variables-location: The location of a properties file containing custom script variable bindings. (Resource, default: <none>)
$ ./mvnw clean install -PgenerateApps
$ cd apps
You can find the corresponding binder based projects here.
You can then cd into one of the folders and build it:
$ ./mvnw clean package
java -jar scriptable-transform-processor.jar --language=ruby --script="return ""#{payload.upcase}"""
3.12 Splitter Processor
The splitter app builds upon the concept of the same name in Spring Integration and allows the splitting of a single message into several distinct messages.
The output is a collection of split messages based on a given expression, delimiter, or file marker.
The splitter processor has the following options:
- splitter.apply-sequence: Add correlation/sequence information in headers to facilitate later aggregation. (Boolean, default: true)
- splitter.charset: The charset to use when converting bytes in text-based files to String. (String, default: <none>)
- splitter.delimiters: When expression is null, delimiters to use when tokenizing String payloads. (String, default: <none>)
- splitter.expression: A SpEL expression for splitting payloads. (Expression, default: <none>)
- splitter.file-markers: Set to true or false to use a FileSplitter (to split text-based files by line) that includes (or not) beginning/end of file markers. (Boolean, default: <none>)
- splitter.markers-json: When 'fileMarkers == true', specify if they should be produced as FileSplitter.FileMarker objects or JSON. (Boolean, default: true)
When no expression, fileMarkers, or charset is provided, a DefaultMessageSplitter is configured with (optional) delimiters.
When fileMarkers or charset is provided, a FileSplitter is configured (you must provide either fileMarkers or charset to split files, which must be text-based; they are split into lines).
Otherwise, an ExpressionEvaluatingMessageSplitter is configured.
When splitting File payloads, the sequenceSize header is zero because the size cannot be determined at the beginning.
Caution: Ambiguous properties are not allowed.
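The selection rules above can be summarized as a small decision function. The Python sketch below is only a restatement of those rules in the order they are given; the function name `choose_splitter` is hypothetical, and the sketch does not model the ambiguity check, because the text does not spell out which property combinations are rejected.

```python
def choose_splitter(expression=None, file_markers=None, charset=None):
    """Return the name of the splitter that the rules above would select."""
    if expression is None and file_markers is None and charset is None:
        # Nothing configured: split with the (optional) delimiters.
        return "DefaultMessageSplitter"
    if file_markers is not None or charset is not None:
        # Either property signals that text-based files are split into lines.
        return "FileSplitter"
    # Only an expression was provided.
    return "ExpressionEvaluatingMessageSplitter"


print(choose_splitter())                                   # DefaultMessageSplitter
print(choose_splitter(charset="UTF-8"))                    # FileSplitter
print(choose_splitter(file_markers=False))                 # FileSplitter
print(choose_splitter(expression="payload.split(',')"))    # ExpressionEvaluatingMessageSplitter
```

Note that `file_markers=False` still selects the FileSplitter: the property being set at all is what matters, and its value only controls whether markers are emitted.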
As part of the SpEL expression you can make use of the pre-registered JSON Path function. The syntax is #jsonPath(payload, '<json path expression>').
For example, consider the following JSON:
{
  "store": {
    "book": [
      {
        "category": "reference",
        "author": "Nigel Rees",
        "title": "Sayings of the Century",
        "price": 8.95
      },
      {
        "category": "fiction",
        "author": "Evelyn Waugh",
        "title": "Sword of Honour",
        "price": 12.99
      },
      {
        "category": "fiction",
        "author": "Herman Melville",
        "title": "Moby Dick",
        "isbn": "0-553-21311-3",
        "price": 8.99
      },
      {
        "category": "fiction",
        "author": "J. R. R. Tolkien",
        "title": "The Lord of the Rings",
        "isbn": "0-395-19395-8",
        "price": 22.99
      }
    ],
    "bicycle": {
      "color": "red",
      "price": 19.95
    }
  }
}
and the expression #jsonPath(payload, '$.store.book'); the result will be 4 messages, each with a Map payload containing the properties of a single book.
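The effect of that expression can be mimicked in a few lines of Python. This is a rough sketch, not the JSON Path library the processor uses: the hypothetical `split_by_path` helper only understands simple dotted paths such as `$.store.book`, and the data is an abbreviated copy of the store document (title and price only).

```python
# Abbreviated copy of the store document above (title and price only).
payload = {
    "store": {
        "book": [
            {"title": "Sayings of the Century", "price": 8.95},
            {"title": "Sword of Honour", "price": 12.99},
            {"title": "Moby Dick", "price": 8.99},
            {"title": "The Lord of the Rings", "price": 22.99},
        ],
        "bicycle": {"color": "red", "price": 19.95},
    }
}


def split_by_path(document, path):
    """Resolve a simple dotted path like '$.store.book' and split the result.

    Only plain property steps are handled; real JSON Path syntax (filters,
    wildcards, indexes) is out of scope for this sketch.
    """
    node = document
    for key in path.lstrip("$").strip(".").split("."):
        node = node[key]
    # A list yields one message per element; any other value yields one message.
    return list(node) if isinstance(node, list) else [node]


messages = split_by_path(payload, "$.store.book")
print(len(messages))         # 4
print(messages[2]["title"])  # Moby Dick
```

Each element of `messages` is a dictionary, which corresponds to the Map payload of one split message.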
$ ./mvnw clean install -PgenerateApps
$ cd apps
You can find the corresponding binder based projects here.
You can then cd into one of the folders and build it:
$ ./mvnw clean package
java -jar splitter-processor.jar --splitter.expression=expression
java -jar splitter-processor.jar --splitter.delimiters=delimiters
3.13 TCP Client as a processor which connects to a TCP server, sends data to it and also receives data.
Input headers: Content-Type: application/octet-stream
Output headers: Content-Type: application/octet-stream
The tcp-client processor has the following options:
- tcp.buffer-size: The buffer size used when decoding messages; larger messages will be rejected. (Integer, default: 2048)
- tcp.charset: The charset used when converting from bytes to String. (String, default: UTF-8)
- tcp.decoder: The decoder to use when receiving messages. (Encoding, default: <none>, possible values: CRLF, LF, NULL, STXETX, RAW, L1, L2, L4)
- tcp.encoder: The encoder to use when sending messages. (Encoding, default: <none>, possible values: CRLF, LF, NULL, STXETX, RAW, L1, L2, L4)
- tcp.host: The host to which this sink will connect. (String, default: localhost)
- tcp.nio: <documentation missing> (Boolean, default: <none>)
- tcp.port: <documentation missing> (Integer, default: <none>)
- tcp.retry-interval: Retry interval (in milliseconds) to check the connection and reconnect. (Long, default: 60000)
- tcp.reverse-lookup: <documentation missing> (Boolean, default: <none>)
- tcp.socket-timeout: <documentation missing> (Integer, default: <none>)
- tcp.use-direct-buffers: <documentation missing> (Boolean, default: <none>)
$ ./mvnw clean install -PgenerateApps
$ cd apps
You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:
$ ./mvnw clean package
java -jar tcp-client-processor.jar --tcp.decoder=LF --tcp.encoder=LF
3.14 Transform Processor
Use the transform app in a stream to convert a Message’s content or structure.
The transform processor is used by passing a SpEL expression. The expression should return the modified message or payload.
As part of the SpEL expression you can make use of the pre-registered JSON Path function. The syntax is #jsonPath(payload, '<json path expression>').
The transform processor has the following options:
- transformer.expression: <documentation missing> (Expression, default: payload)
$ ./mvnw clean install -PgenerateApps
$ cd apps
You can find the corresponding binder based projects here.
You can then cd into one of the folders and build it:
$ ./mvnw clean package
java -jar transform-processor.jar --expression='payload.toUpperCase()'
This transform will convert all message payloads to upper case.
3.15 TensorFlow Processor
A processor that evaluates a machine learning model stored in TensorFlow Protobuf format.
The TensorFlow Processor uses a TensorflowInputConverter to convert the input data into a format compliant with the TensorFlow model in use. The input converter turns each input Message into a key/value Map, where each key corresponds to a model input placeholder and each value is compliant with org.tensorflow.DataType.
The default converter implementation expects either a Map payload or a flat JSON message that can be converted into a Map.
The TensorflowInputConverter can be extended and customized. See TwitterSentimentTensorflowInputConverter.java for an example.
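The contract of the default input converter can be pictured with a small Python sketch. This is not the Java converter: to_input_map is a hypothetical name, the placeholder names x and keep_prob in the usage note are invented, and reading "flat" as "a JSON object without nested objects" is an assumption.

```python
import json


def to_input_map(payload):
    """Return a placeholder-name -> value map from a dict or a flat JSON message."""
    if isinstance(payload, dict):
        return dict(payload)  # already a map: copy it as-is
    if isinstance(payload, (str, bytes)):
        parsed = json.loads(payload)
        if not isinstance(parsed, dict):
            raise ValueError("expected a JSON object at the top level")
        # Assumption: "flat" means no nested JSON objects as values.
        nested = [key for key, value in parsed.items() if isinstance(value, dict)]
        if nested:
            raise ValueError(f"nested objects are not flat: {nested}")
        return parsed
    raise TypeError(f"unsupported payload type: {type(payload).__name__}")
```

For example, to_input_map('{"x": [1.0, 2.0], "keep_prob": 1.0}') yields a map with one entry per model input placeholder. A custom converter would take over when the incoming messages do not already have that shape.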
The following snippet shows how to export a TensorFlow model (including a trained one) into the ProtocolBuffer binary format required by the Processor.
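import os
import tensorflow as tf
from tensorflow.python.framework.graph_util import convert_variables_to_constants
...
# `sess` is the TensorFlow session holding the model, created in the elided code above.
SAVE_DIR = os.path.abspath(os.path.curdir)
# Freeze the graph: turn variables into constants, keeping what '<model output>' depends on.
minimal_graph = convert_variables_to_constants(sess, sess.graph_def, ['<model output>'])
# Binary ProtocolBuffer file, the format the processor loads.
tf.train.write_graph(minimal_graph, SAVE_DIR, 'my_graph.proto', as_text=False)
# Text copy of the same graph, useful for inspection.
tf.train.write_graph(minimal_graph, SAVE_DIR, 'my.txt', as_text=True)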
The processor's output uses a TensorflowOutputConverter to convert the computed Tensor result into a serializable message. The default implementation uses a Tuple triple.
A custom TensorflowOutputConverter can provide more convenient data representations.
See TwitterSentimentTensorflowOutputConverter.java.
The tensorflow processor has the following options:
- tensorflow.expression
- How to obtain the input data from the input message. If empty, it defaults to the input message payload.
The payload.myInTupleName expression treats the input payload as a Tuple, and myInTupleName stands for
a Tuple key. The headers[myHeaderName] expression gets the input data from the message's header, using
myHeaderName as a key. (Expression, default:
<none>
) - tensorflow.mode
- Defines how to store the output data and whether the input payload is passed through or discarded.
Payload (Default) stores the output data in the outbound message payload. The input payload is discarded.
Header stores the output data in the outputName message header. The input payload is passed through.
Tuple stores the output data in a Tuple payload, using the outputName key. The input payload is passed through
in the same Tuple under the 'original.input.data' key. If the input payload is already a Tuple that contains
an 'original.input.data' key, then the input Tuple is copied into the new Tuple to be returned. (OutputMode, default:
<none>
, possible values: payload
,tuple
,header
) - tensorflow.model
- The location of the TensorFlow model file. (Resource, default:
<none>
) - tensorflow.model-fetch
- The TensorFlow graph model output. Name of TensorFlow operation to fetch the output Tensors from. (String, default:
<none>
) - tensorflow.model-fetch-index
- The modelFetch returns a list of Tensors. The modelFetchIndex specifies the index in the list to use as an output. (Integer, default:
0
) - tensorflow.output-name
- The output data key used in the Header or Tuple modes. Empty name defaults to the modelFetch property value. (String, default:
<none>
)
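The three tensorflow.mode values described above differ only in where the computed result ends up and in what happens to the input payload. The following Python sketch models that behavior under stated assumptions: a message is represented as a (payload, headers) pair, a Tuple is modeled as a plain dict, and build_output is a hypothetical helper rather than part of the processor.

```python
ORIGINAL_INPUT_KEY = "original.input.data"


def build_output(mode, payload, headers, result, output_name):
    """Return the (payload, headers) of the outbound message for the given mode."""
    if mode == "payload":
        # The result replaces the payload; the input payload is discarded.
        return result, dict(headers)
    if mode == "header":
        # The result travels in a header; the input payload is passed through.
        out_headers = dict(headers)
        out_headers[output_name] = result
        return payload, out_headers
    if mode == "tuple":
        if isinstance(payload, dict) and ORIGINAL_INPUT_KEY in payload:
            # The input is already a Tuple carrying the original data: copy it.
            out = dict(payload)
        else:
            out = {ORIGINAL_INPUT_KEY: payload}
        out[output_name] = result
        return out, dict(headers)
    raise ValueError(f"unknown mode: {mode}")
```

Header and tuple modes keep the original input next to the result, which matters when a downstream application needs both.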
$ ./mvnw clean install -PgenerateApps
$ cd apps
You can find the corresponding binder based projects here.
You can then cd into one of the folders and build it:
$ ./mvnw clean package
java -jar tensorflow-processor.jar --tensorflow.model= --tensorflow.modelFetch= --tensorflow.mode=
3.16 Twitter Sentiment Analysis Processor
A processor that evaluates a machine learning model stored in TensorFlow Protobuf format.
It operationalizes the model from github.com/danielegrattarola/twitter-sentiment-cnn.
Real-time Twitter Sentiment Analytics with TensorFlow and Spring Cloud Dataflow
content-type: application/json
content-type: application/json
Decodes the evaluated result into POSITIVE, NEGATIVE and NEUTRAL values.
Then creates and returns a simple JSON message with this structure:
N/A
The processor's output uses a TensorflowOutputConverter to convert the computed Tensor result into a serializable message. The default implementation uses a Tuple triple.
A custom TensorflowOutputConverter can provide more convenient data representations.
See TwitterSentimentTensorflowOutputConverter.java.
The twitter-sentiment processor has the following options:
- tensorflow.expression
- How to obtain the input data from the input message. If empty, it defaults to the input message payload.
The payload.myInTupleName expression treats the input payload as a Tuple, and myInTupleName stands for
a Tuple key. The headers[myHeaderName] expression gets the input data from the message's header, using
myHeaderName as a key. (Expression, default:
<none>
) - tensorflow.mode
- Defines how to store the output data and whether the input payload is passed through or discarded.
Payload (Default) stores the output data in the outbound message payload. The input payload is discarded.
Header stores the output data in the outputName message header. The input payload is passed through.
Tuple stores the output data in a Tuple payload, using the outputName key. The input payload is passed through
in the same Tuple under the 'original.input.data' key. If the input payload is already a Tuple that contains
an 'original.input.data' key, then the input Tuple is copied into the new Tuple to be returned. (OutputMode, default:
<none>
, possible values: payload
,tuple
,header
) - tensorflow.model
- The location of the TensorFlow model file. (Resource, default:
<none>
) - tensorflow.model-fetch
- The TensorFlow graph model output. Name of TensorFlow operation to fetch the output Tensors from. (String, default:
<none>
) - tensorflow.model-fetch-index
- The modelFetch returns a list of Tensors. The modelFetchIndex specifies the index in the list to use as an output. (Integer, default:
0
) - tensorflow.output-name
- The output data key used in the Header or Tuple modes. Empty name defaults to the modelFetch property value. (String, default:
<none>
) - tensorflow.twitter.vocabulary
- The location of the word vocabulary file, used for training the model (Resource, default:
<none>
)
$ ./mvnw clean install -PgenerateApps
$ cd apps
You can find the corresponding binder based projects here.
You can then cd into one of the folders and build it:
$ ./mvnw clean package
java -jar twitter-sentiment-processor.jar --tensorflow.twitter.vocabulary= --tensorflow.model= \
--tensorflow.modelFetch= --tensorflow.mode=