Use the aggregator
application to combine multiple messages into one, based on some correlation mechanism.
This processor is fully based on the Aggregator component from Spring Integration. So, please, consult there for use-cases and functionality.
If the aggregation and correlation logic is based on the default strategies, the correlationId
, sequenceNumber
and sequenceSize
headers must be presented in the incoming message.
Aggregator Processor is fully based on the Spring Integration’s AggregatingMessageHandler
and since correlation and aggregation logic don’t require particular types, the input payload can be anything able to be transferred over the network and Spring Cloud Stream Binder.
If payload is JSON, the JsonPropertyAccessor
helps to build straightforward SpEL expressions for correlation, release and aggregation functions.
Returns all headers of the incoming messages that have no conflicts among the group. An absent header on one or more messages within the group is not considered a conflict.
The aggregator processor has the following options:
<none>
)<none>
)<none>
)<none>
)<none>
)<none>
)<none>
)<none>
)<none>
)<none>
)<none>
)<none>
)<none>
)mongodb://localhost/test
)<none>
)false
)<none>
)<none>
)<none>
)<none>
)false
)embedded
, possible values: ALWAYS
,EMBEDDED
,NEVER
)<none>
)<none>
)<none>
)all
)<none>
)<none>
)<none>
);
)<none>
)<none>
)<none>
)<none>
)[sync_delay]
)3.2.2
)0
)localhost
)<none>
)6379
)false
)<none>
)<none>
)By default the aggregator
processor uses:
- HeaderAttributeCorrelationStrategy(IntegrationMessageHeaderAccessor.CORRELATION_ID)
- for correlation
;
- SequenceSizeReleaseStrategy
- for release
;
- DefaultAggregatingMessageGroupProcessor
- for aggregation
;
- SimpleMessageStore
- for messageStoreType
.
The aggregator
application can be configured for persistent MessageGroupStore
implementations.
The configuration for target technology is fully based on the Spring Boot auto-configuration.
But default JDBC, MongoDb and Redis auto-configurations are excluded.
They are @Import
ed basing on the aggregator.messageStoreType
configuration property.
Consult Spring Boot Reference Manual for auto-configuration for particular technology you use for aggregator
.
The JDBC JdbcMessageStore
requires particular tables in the target data base.
You can find schema scripts for appropriate RDBMS vendors in the org.springframework.integration.jdbc
package of the spring-integration-jdbc
jar.
Those scripts can be used for automatic data base initialization via Spring Boot.
For example:
java -jar aggregator-rabbit-1.0.0.RELEASE.jar --aggregator.message-store-type=jdbc --spring.datasource.url=jdbc:h2:mem:test --spring.datasource.schema=org/springframework/integration/jdbc/schema-h2.sql
$ ./mvnw clean install -PgenerateApps $ cd apps
You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:
$ ./mvnw clean package
java -jar aggregator_processor.jar --aggregator.message-store-type=jdbc --spring.datasource.url=jdbc:h2:mem:test --spring.datasource.schema=org/springframework/integration/jdbc/schema-h2.sql java -jar aggregator_processor.jar --spring.data.mongodb.port=0 --aggregator.correlation=T(Thread).currentThread().id --aggregator.release="!#this.?[payload == 'bar'].empty" --aggregator.aggregation="#this.?[payload == 'foo'].![payload]" --aggregator.message-store-type=mongodb --aggregator.message-store-entity=aggregatorTest
This project adheres to the Contributor Covenant code of conduct. By participating, you are expected to uphold this code. Please report unacceptable behavior to [email protected].
A Processor module that returns messages that is passed by connecting just the input and output channels.
$ ./mvnw clean install -PgenerateApps $ cd apps
You can find the corresponding binder based projects here. You can then cd into one one of the folders and build it:
$ ./mvnw clean package
Use the filter module in a stream to determine whether a Message should be passed to the output channel.
The filter processor has the following options:
true
)$ ./mvnw clean install -PgenerateApps $ cd apps
You can find the corresponding binder based projects here. You can then cd into one one of the folders and build it:
$ ./mvnw clean package
A Processor application that retains or discards messages according to a predicate, expressed as a Groovy script.
The groovy-filter processor has the following options:
<none>
)<none>
)<none>
)$ ./mvnw clean install -PgenerateApps $ cd apps
You can find the corresponding binder based projects here. You can then cd into one one of the folders and build it:
$ ./mvnw clean package
A Processor module that transforms messages using a Groovy script.
The groovy-transform processor has the following options:
<none>
)<none>
)<none>
)$ ./mvnw clean install -PgenerateApps $ cd apps
You can find the corresponding binder based projects here. You can then cd into one one of the folders and build it:
$ ./mvnw clean package
Use the header-enricher app to add message headers.
The headers are provided in the form of new line delimited key value pairs, where the keys are the header names and the values are SpEL expressions.
For example --headers='foo=payload.someProperty \n bar=payload.otherProperty'
The header-enricher processor has the following options:
<none>
)false
)$ ./mvnw clean install -PgenerateApps $ cd apps
You can find the corresponding binder based projects here. You can then cd into one one of the folders and build it:
$ ./mvnw clean package
java -jar header-enricher-processor.jar --headers='foo=payload.someProperty \n bar=payload.otherProperty'
This project adheres to the Contributor Covenant code of conduct. By participating, you are expected to uphold this code. Please report unacceptable behavior to [email protected].
A processor app that makes requests to an HTTP resource and emits the response body as a message payload. This processor can be combined, e.g., with a time source app to periodically poll results from a HTTP resource.
Any Required HTTP headers must be explicitly set via the headers-expression
property. See examples below.
Header values may also be used to construct the request body when referenced in the body-expression
property.
The Message payload may be any Java type.
Generally, standard Java types such as String(e.g., JSON, XML) or byte array payloads are recommended.
A Map should work without too much effort.
By default, the payload will become HTTP request body (if needed).
You may also set the body-expression
property to construct a value derived from the Message, or body
to use a static (literal) value.
Internally, the processor uses RestTemplate.exchange(…).
The RestTemplate supports Jackson JSON serialization to support any request and response types if necessary.
The expected-response-type
property, String.class
by default, may be set to any class in your application class path.
(Note user defined payload types will require adding required dependencies to your pom file)
The raw output object is ResponseEntity<?> any of its fields (e.g., body
, headers
) or accessor methods (statusCode
) may be referenced as part of the reply-expression
.
By default the outbound Message payload is the response body.
The httpclient processor has the following options:
<none>
)<none>
)<none>
)<none>
)<none>
, possible values: GET
,HEAD
,POST
,PUT
,PATCH
,DELETE
,OPTIONS
,TRACE
)body
)<none>
)<none>
)$ ./mvnw clean install -PgenerateApps $ cd apps
You can find the corresponding binder based projects here. You can then cd into one one of the folders and build it:
$ ./mvnw clean package
$ java -jar httpclient-processor.jar --httpclient.url=http://someurl --httpclient.http-method=POST --httpclient.headers-expression="{'Content-Type':'application/json'}" $ java -jar httpclient-processor.jar --httpclient.url=http://someurl --httpclient.reply-expression="statusCode.name()"
A processor that evaluates a machine learning model stored in PMML format.
The pmml processor has the following options:
<none>
)<none>
)<none>
)<none>
)<none>
)$ ./mvnw clean install -PgenerateApps $ cd apps
You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:
$ ./mvnw clean package
Spring Cloud Stream App Starters for integrating with python
This application invokes a REST service, similar to the standard httpclient processor. In fact, this application embeds the httpclient processor. As a convenience for Python developers, this allows you to provide a Jython wrapper script that may execute a function before and after REST call in order to perform any necessary data transformation. If you don’t require any custom transformations, just use the httpclient processor.
The diagram shows input and output adapters as conceptual components. These are actually implemented as functions defined in a single script that must conform to a simple convention:
def input(): return "Pre" + payload; def output(): return payload + "Post"; result = locals()[channel]()
The function names input
and output
map to the conventional channel names used by Spring Cloud Stream processors.
The last line is a bit of Python reflection magic to invoke a function by its name, given by the bound variable
channel
. Implemented with Spring Integration Scripting, headers
and payload
are always bound to the Message
headers and payload respectively. The payload on the input
side is the object you use to build the REST request.
The output
side transforms the response. If you don’t need any additional processing on one side, implement the
function with pass
as the body:
def output(): pass
Note | |
---|---|
The last line in the script must be an assignment statement. The variable name doesn’t matter. This is required to bind the return value correctly. |
Note | |
---|---|
The script is evaluated for every message. This tends to create a a lot of classes for each execution which puts
stress on the JRE |
Headers may be set by the Jython wrapper script if the output()
script function returns a Message.
Whatever the `output()`wrapper script function returns.
Note | |
---|---|
The wrapper script is intended to perform some required transformations prior to sending an HTTP request and/or after
the response is received. The return value of the input adapter will be the inbound payload of the
httpclient processor and shoud conform to its requirements. Likewise
the HTTP |
The python-http processor has the following options:
<none>
)true
)master
)<none>
)<none>
)5
)<none>
)<none>
)<none>
)<none>
)<none>
)<none>
)<none>
, possible values: GET
,HEAD
,POST
,PUT
,PATCH
,DELETE
,OPTIONS
,TRACE
)body
)<none>
)<none>
)<none>
)<none>
, possible values: COMMA
,SPACE
,TAB
,NEWLINE
)<none>
)<none>
)$ ./mvnw clean install -PgenerateApps $ cd apps
You can find the corresponding binder based projects here. You can then cd into one one of the folders and build it:
$ ./mvnw clean package
See httpclient processor for more examples on
httpclient
properties.
$java -jar python-http-processor.jar --wrapper.script=/local/directory/build-json.py --httpclient.url=http://someurl --httpclient.http-method=POST --httpclient.headers-expression="{'Content-Type':'application/json'}" $java -jar python-http-processor.jar --git.uri=https://github.com/some-repo --wrapper.script=some-script.py --wrapper .variables=foo=0.45,bar=0.55 --httpclient.url=http://someurl
This application executes a Jython script that binds payload
and headers
variables to the Message payload
and headers respectively. In addition you may provide a jython.variables
property containing a (comma delimited by
default) delimited string, e.g., var1=val1,var2=val2,…
.
This processor uses a JSR-223 compliant embedded ScriptEngine provided by www.jython.org/.
Note | |
---|---|
The last line in the script must be an assignment statement. The variable name doesn’t matter. This is required to bind the return value correctly. |
Note | |
---|---|
The script is evaluated for every message which may limit your performance with high message loads. This also tends
to create a a lot of classes for each execution which puts stress on the JRE |
The jython processor has the following options:
<none>
)true
)master
)<none>
)<none>
)5
)<none>
)<none>
)<none>
)<none>
, possible values: COMMA
,SPACE
,TAB
,NEWLINE
)<none>
)<none>
)$ ./mvnw clean install -PgenerateApps $ cd apps
You can find the corresponding binder based projects here. You can then cd into one one of the folders and build it:
$ ./mvnw clean package
A Spring Cloud Stream module that transforms messages using a script. The script body is supplied directly as a property value. The language of the script can be specified (groovy/javascript/ruby/python).
The scriptable-transform processor has the following options:
<none>
)<none>
)<none>
)<none>
)$ ./mvnw clean install -PgenerateApps $ cd apps
You can find the corresponding binder based projects here. You can then cd into one one of the folders and build it:
$ ./mvnw clean package
The splitter app builds upon the concept of the same name in Spring Integration and allows the splitting of a single message into several distinct messages.
true
)<none>
)<none>
)<none>
)<none>
)true
)When no expression
, fileMarkers
, or charset
is provided, a DefaultMessageSplitter
is configured with (optional) delimiters
.
When fileMarkers
or charset
is provided, a FileSplitter
is configured (you must provide either a fileMarkers
or charset
to split files, which must be text-based - they are split into lines).
Otherwise, an ExpressionEvaluatingMessageSplitter
is configured.
When splitting File
payloads, the sequenceSize
header is zero because the size cannot be determined at the beginning.
Caution | |
---|---|
Ambiguous properties are not allowed. |
As part of the SpEL expression you can make use of the pre-registered JSON Path function. The syntax is
#jsonPath(payload, '<json path expression>')
.
For example, consider the following JSON:
{ "store": { "book": [ { "category": "reference", "author": "Nigel Rees", "title": "Sayings of the Century", "price": 8.95 }, { "category": "fiction", "author": "Evelyn Waugh", "title": "Sword of Honour", "price": 12.99 }, { "category": "fiction", "author": "Herman Melville", "title": "Moby Dick", "isbn": "0-553-21311-3", "price": 8.99 }, { "category": "fiction", "author": "J. R. R. Tolkien", "title": "The Lord of the Rings", "isbn": "0-395-19395-8", "price": 22.99 } ], "bicycle": { "color": "red", "price": 19.95 } }}
and an expression #jsonPath(payload, '$.store.book')
; the result will be 4 messages, each with a Map
payload
containing the properties of a single book.
$ ./mvnw clean install -PgenerateApps $ cd apps
You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:
$ ./mvnw clean package
The tcp-client processor has the following options:
2048
)UTF-8
)<none>
, possible values: CRLF
,LF
,NULL
,STXETX
,RAW
,L1
,L2
,L4
)<none>
, possible values: CRLF
,LF
,NULL
,STXETX
,RAW
,L1
,L2
,L4
)localhost
)false
)1234
)60000
)false
)120000
)false
)$ ./mvnw clean install -PgenerateApps $ cd apps
You can find the corresponding binder based projects here. You can then cd into one one of the folders and build it:
$ ./mvnw clean package
Use the transform app in a stream to convert a Message’s content or structure.
The transform processor is used by passing a SpEL expression. The expression should return the modified message or payload.
As part of the SpEL expression you can make use of the pre-registered JSON Path function. The syntax is #jsonPath(payload,'<json path expression>')
The transform processor has the following options:
payload
)$ ./mvnw clean install -PgenerateApps $ cd apps
You can find the corresponding binder based projects here. You can then cd into one one of the folders and build it:
$ ./mvnw clean package
A processor that evaluates a machine learning model stored in TensorFlow Protobuf format.
Following snippet shows how to export a TensorFlow
model into ProtocolBuffer
binary format as required by the Processor.
from tensorflow.python.framework.graph_util import convert_variables_to_constants ... SAVE_DIR = os.path.abspath(os.path.curdir) minimal_graph = convert_variables_to_constants(sess, sess.graph_def, ['<model output>']) tf.train.write_graph(minimal_graph, SAVE_DIR, 'my_graph.proto', as_text=False) tf.train.write_graph(minimal_graph, SAVE_DIR, 'my.txt', as_text=True)
The --tensorflow.model
property configures the Processor with the location of the serialized Tensorflow model.
The TensorflowInputConverter
converts the input data into the format, specific for the given model.
The TensorflowOutputConverter
converts the computed Tensors
result into a pipeline Message
.
The --tensorflow.modelFetch
property defines the list of TensorFlow graph outputs to fetch the output Tensors from.
The --tensorflow.mode
property defines whether the computed results are passed in the message payload or in the message header.
The TensorFlow Processor uses a TensorflowInputConverter
to convert the input data into data format compliant with the
TensorFlow Model used. The input converter converts the input Messages
into key/value Map
, where
the Key corresponds to a model input placeholder and the content is org.tensorflow.DataType
compliant value.
The default converter implementation expects either Map payload or flat json message that can be converted into a Map.
The TensorflowInputConverter
can be extended and customized.
See TwitterSentimentTensorflowInputConverter.java for example.
Processor’s output uses TensorflowOutputConverter
to convert the computed Tensor
result into a serializable
message. The default implementation uses Tuple
triple.
Custom TensorflowOutputConverter
can provide more convenient data representations.
See TwitterSentimentTensorflowOutputConverter.java.
The tensorflow processor has the following options:
<none>
)<none>
, possible values: payload
,tuple
,header
)<none>
)<none>
)result
)$ ./mvnw clean install -PgenerateApps $ cd apps
You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:
$ ./mvnw clean package
A processor that evaluates a machine learning model stored in TensorFlow Protobuf format. It operationalizes the github.com/danielegrattarola/twitter-sentiment-cnn
Real-time Twitter Sentiment Analytics with TensorFlow and Spring Cloud Dataflow
Decodes the evaluated result into POSITIVE, NEGATIVE and NEUTRAL values. Then creates and returns a simple JSON message with this structure:
N/A
Processor’s output uses TensorflowOutputConverter
to convert the computed Tensor
result into a serializable
message. The default implementation uses Tuple
triple.
Custom TensorflowOutputConverter
can provide more convenient data representations.
See TwitterSentimentTensorflowOutputConverter.java.
The twitter-sentiment processor has the following options:
<none>
)<none>
, possible values: payload
,tuple
,header
)<none>
)<none>
)result
)<none>
)$ ./mvnw clean install -PgenerateApps $ cd apps
You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:
$ ./mvnw clean package
java -jar twitter-sentiment-processor.jar --tensorflow.twitter.vocabulary= --tensorflow.model= \ --tensorflow.modelFetch= --tensorflow.mode="
And here is a sample pipeline that computes sentiments for json tweets coming from the twitterstream
source and
using the pre-build minimal_graph.proto
and vocab.csv
:
tweets=twitterstream --access-token-secret=xxx --access-token=xxx --consumer-secret=xxx --consumer-key=xxx \ | filter --expression=#jsonPath(payload,'$.lang')=='en' \ | twitter-sentimet --vocabulary='http://dl.bintray.com/big-data/generic/vocab.csv' \ --output-name=output/Softmax --model='http://dl.bintray.com/big-data/generic/minimal_graph.proto' \ --model-fetch=output/Softmax \ | log