Use the aggregator application to combine multiple messages into one, based on some correlation mechanism.
This processor is fully based on the Aggregator component from Spring Integration, so consult its documentation for use cases and functionality.
If the aggregation and correlation logic is based on the default strategies, the correlationId, sequenceNumber and sequenceSize headers must be present in the incoming message.
The Aggregator Processor is built on Spring Integration’s AggregatingMessageHandler. Because the correlation and aggregation logic does not require particular types, the input payload can be anything that can be transferred over the network and the Spring Cloud Stream binder.
If the payload is JSON, the JsonPropertyAccessor helps to build straightforward SpEL expressions for the correlation, release and aggregation functions.
Returns all headers of the incoming messages that have no conflicts among the group. An absent header on one or more messages within the group is not considered a conflict.
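The header rule above can be pictured with a small Python sketch. It is only an illustration of the stated rule, not the Spring Integration implementation; the message shape (a dict with a `headers` key) and the function name `merge_headers` are assumptions made for the example.

```python
def merge_headers(messages):
    """Keep only the headers whose values agree across every message that carries them."""
    merged = {}
    conflicts = set()
    for message in messages:
        for name, value in message["headers"].items():
            if name in conflicts:
                continue  # already known to disagree somewhere in the group
            if name in merged and merged[name] != value:
                # Two messages disagree on this header: drop it from the result.
                del merged[name]
                conflicts.add(name)
            else:
                merged[name] = value
    return merged


group = [
    {"headers": {"correlationId": "42", "source": "a", "region": "eu"}},
    {"headers": {"correlationId": "42", "source": "b"}},  # no "region" header
]
print(merge_headers(group))
```

Here `source` is dropped because the two messages disagree, while `region` survives because its absence from the second message does not count as a conflict.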
The aggregator processor has the following options:
By default the aggregator processor uses:
- HeaderAttributeCorrelationStrategy(IntegrationMessageHeaderAccessor.CORRELATION_ID) for correlation;
- SequenceSizeReleaseStrategy for release;
- DefaultAggregatingMessageGroupProcessor for aggregation;
- SimpleMessageStore for messageStoreType.
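As a rough mental model of these defaults, the following Python sketch groups messages by their correlationId header, releases a group once it holds sequenceSize messages, and emits the collected payloads. It is a simplified simulation, not the Spring Integration code: the class name, the dict-based message shape and the in-memory store are assumptions, and the sketch keeps arrival order without claiming anything about ordering in the real component.

```python
from collections import defaultdict


class SketchAggregator:
    """Toy model: correlate by correlationId, release on sequenceSize, aggregate payloads."""

    def __init__(self):
        # Stands in for an in-memory message store (hypothetical simplification).
        self._groups = defaultdict(list)

    def handle(self, message):
        headers = message["headers"]
        key = headers["correlationId"]
        group = self._groups[key]
        group.append(message)
        if len(group) < headers["sequenceSize"]:
            return None  # group not complete yet, nothing is released
        del self._groups[key]
        return [m["payload"] for m in group]


def make_message(payload, number, size, correlation_id="order-1"):
    return {
        "headers": {
            "correlationId": correlation_id,
            "sequenceNumber": number,
            "sequenceSize": size,
        },
        "payload": payload,
    }


aggregator = SketchAggregator()
first = aggregator.handle(make_message("a", 1, 2))
second = aggregator.handle(make_message("b", 2, 2))
print(first, second)
```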
The aggregator application can be configured for persistent MessageGroupStore implementations.
The configuration for the target technology is fully based on Spring Boot auto-configuration.
However, the default JDBC, MongoDB and Redis auto-configurations are excluded.
They are imported (via @Import) based on the aggregator.messageStoreType configuration property.
Consult the Spring Boot Reference Manual for the auto-configuration of the particular technology you use for the aggregator.
The JDBC JdbcMessageStore requires particular tables in the target database.
You can find schema scripts for the appropriate RDBMS vendors in the org.springframework.integration.jdbc package of the spring-integration-jdbc jar.
Those scripts can be used for automatic database initialization via Spring Boot.
For example:
java -jar aggregator-rabbit-1.0.0.RELEASE.jar --aggregator.message-store-type=jdbc --spring.datasource.url=jdbc:h2:mem:test --spring.datasource.schema=org/springframework/integration/jdbc/schema-h2.sql
$ ./mvnw clean install -PgenerateApps
$ cd apps
You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:
$ ./mvnw clean package
java -jar aggregator_processor.jar --aggregator.message-store-type=jdbc --spring.datasource.url=jdbc:h2:mem:test --spring.datasource.schema=org/springframework/integration/jdbc/schema-h2.sql
java -jar aggregator_processor.jar --spring.data.mongodb.port=0 --aggregator.correlation=T(Thread).currentThread().id --aggregator.release="!#this.?[payload == 'bar'].empty" --aggregator.aggregation="#this.?[payload == 'foo'].![payload]" --aggregator.message-store-type=mongodb --aggregator.message-store-entity=aggregatorTest
This project adheres to the Contributor Covenant code of conduct. By participating, you are expected to uphold this code. Please report unacceptable behavior to [email protected].
A Processor module that returns the messages passed to it unchanged, by connecting just the input and output channels.
$ ./mvnw clean install -PgenerateApps
$ cd apps
You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:
$ ./mvnw clean package
A pass-through processor that computes multiple counters from the messages that pass through. The input messages are re-sent unchanged. Using the Micrometer library, the Counter Processor integrates with popular time-series databases (TSDBs) for persisting and processing the counter values.
By default the Counter Processor increments the message.name counter on every received message. The message-counter-enabled property controls the behavior of the message counter.
If tag expressions are provided (via the counter.tag.expression.<tagKey>=<tagValue SpEL expression> property), then the name counter is incremented. Every SpEL expression may evaluate to multiple values, causing multiple counter increments for the same message (one for every value resolved).
If fixed tags are provided, they are included in all message and expression counters.
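The effect of multi-valued tag expressions can be sketched in a few lines of Python. This models only the counting rule described above (one increment per resolved value, with fixed tags attached to every increment); it does not use Micrometer, and the function name `record`, the tuple-shaped counter keys and the sample tags are assumptions for illustration.

```python
from collections import Counter


def record(counters, name, tag_values, fixed_tags=None):
    """Increment one counter per resolved tag value; fixed tags ride along on each one."""
    fixed = tuple(sorted((fixed_tags or {}).items()))
    for tag_key, resolved in tag_values.items():
        # A tag expression may resolve to a single value or to several values.
        values = resolved if isinstance(resolved, (list, tuple)) else [resolved]
        for value in values:
            counters[(name, tag_key, value) + fixed] += 1


counters = Counter()
# First message: the "city" expression resolves to two values, "status" to one.
record(counters, "orders", {"city": ["Sofia", "Berlin"], "status": "paid"}, {"env": "dev"})
# Second message: only one city value.
record(counters, "orders", {"city": "Sofia"}, {"env": "dev"})

for key, count in sorted(counters.items()):
    print(key, count)
```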
The Counter’s implementation is based on the Micrometer library, a vendor-neutral application metrics facade that supports the most popular monitoring systems. See the Micrometer documentation for the list of supported monitoring systems. Starting with Spring Boot 2.0, Micrometer is the instrumentation library powering the delivery of application metrics from Spring Boot.
All Spring Cloud Stream App Starters are configured to support two of the most popular monitoring systems, Prometheus and InfluxDB. You can declaratively select which monitoring system to use. If you are not using Prometheus or InfluxDB, you can customise the App starters to use a different monitoring system as well as include your preferred micrometer monitoring system library in your own custom applications.
Grafana is a popular platform for building visualization dashboards.
To enable Micrometer’s Prometheus meter registry for Spring Cloud Stream application starters, set the following properties:
management.metrics.export.prometheus.enabled=true
management.endpoints.web.exposure.include=prometheus
Then disable the application’s security, which allows a simple Prometheus configuration to scrape counter information, by setting the following property:
spring.cloud.streamapp.security.enabled=false
To enable Micrometer’s Influx meter registry for Spring Cloud Stream application starters, set the following properties:
management.metrics.export.influx.enabled=true
management.metrics.export.influx.uri={influxdb-server-url}
![]() | Note |
---|---|
if the Data Flow Server metrics is enabled then the |
Following diagram illustrates Counter’s information collection and processing flow.
Use the filter module in a stream to determine whether a Message should be passed to the output channel.
The filter processor has the following options:
$ ./mvnw clean install -PgenerateApps
$ cd apps
You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:
$ ./mvnw clean package
A Processor application that retains or discards messages according to a predicate, expressed as a Groovy script.
The groovy-filter processor has the following options:
$ ./mvnw clean install -PgenerateApps
$ cd apps
You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:
$ ./mvnw clean package
A Processor module that transforms messages using a Groovy script.
The groovy-transform processor has the following options:
$ ./mvnw clean install -PgenerateApps
$ cd apps
You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:
$ ./mvnw clean package
This processor uses gRPC to process Messages via a remote process written in any language that supports gRPC. This pattern allows the Java app to handle the stream processing while the gRPC service handles the business logic. The service must implement the gRPC service defined by this protobuf schema: ../grpc-app-protos/src/main/proto/processor.proto.
![]() | Note |
---|---|
The gRPC client stub is blocking by default. Asynchronous and streaming stubs are also provided. The asynchronous stub performs better if the server is multi-threaded; however, message ordering is not guaranteed. If the server supports bidirectional streaming, use the streaming stub. |
![]() | Note |
---|---|
A |
Headers are available to the sidecar application via the process schema if grpc.include-headers is true. The header value contains one or more string values to support multiple values, e.g., the HTTP Accept header.
The payload is a byte array as defined by the schema.
In most cases the return message should simply contain the original headers provided. The sidecar application may modify or add headers; however, it is recommended to add headers only if necessary.
It is expected that the payload will normally be a string or byte array. However, common primitive types are supported as defined by the schema.
The grpc processor has the following options:
Use the header-enricher app to add message headers.
The headers are provided in the form of new line delimited key value pairs, where the keys are the header names and the values are SpEL expressions.
For example --headers='foo=payload.someProperty \n bar=payload.otherProperty'
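To make the key/value format concrete, here is a minimal Python sketch that splits such a string into header names and their SpEL expression strings. It only mirrors the textual format described above and does not evaluate SpEL; the function name `parse_headers` and the whitespace-trimming behavior are assumptions for illustration.

```python
def parse_headers(spec):
    """Split newline-delimited 'name=expression' pairs into a dict of expression strings."""
    headers = {}
    for line in spec.splitlines():
        line = line.strip()
        if not line:
            continue  # ignore blank lines
        # Split on the first '=' only, so expressions may themselves contain '='.
        name, separator, expression = line.partition("=")
        if not separator:
            raise ValueError(f"missing '=' in {line!r}")
        headers[name.strip()] = expression.strip()
    return headers


parsed = parse_headers("foo=payload.someProperty \n bar=payload.otherProperty")
print(parsed)
```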
The header-enricher processor has the following options:
$ ./mvnw clean install -PgenerateApps
$ cd apps
You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:
$ ./mvnw clean package
java -jar header-enricher-processor.jar --headers='foo=payload.someProperty \n bar=payload.otherProperty'
A processor app that makes requests to an HTTP resource and emits the response body as a message payload. This processor can be combined, for example, with a time source app to periodically poll results from an HTTP resource.
Any required HTTP headers must be explicitly set via the headers-expression property. See the examples below.
Header values may also be used to construct the request body when referenced in the body-expression property.
You can set the http-method-expression property to derive the HTTP method from the inbound Message, or http-method to set it statically (defaults to the GET method).
The Message payload may be any Java type.
Generally, standard Java types such as String (e.g., JSON, XML) or byte array payloads are recommended.
A Map should work without too much effort.
By default, the payload becomes the HTTP request body (if needed).
You may also set the body-expression property to construct a value derived from the Message, or body to use a static (literal) value.
Internally, the processor uses RestTemplate.exchange(…).
The RestTemplate supports Jackson JSON serialization to support any request and response types if necessary.
The expected-response-type property, String.class by default, may be set to any class in your application class path.
(Note that user-defined payload types require adding the necessary dependencies to your pom file.)
The raw output object is ResponseEntity<?>; any of its fields (e.g., body, headers) or accessor methods (statusCode) may be referenced as part of the reply-expression.
By default, the outbound Message payload is the response body.
The httpclient processor has the following options:
$ ./mvnw clean install -PgenerateApps
$ cd apps
You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:
$ ./mvnw clean package
$ java -jar httpclient-processor.jar --httpclient.url=http://someurl --httpclient.http-method=POST --httpclient.headers-expression="{'Content-Type':'application/json'}"
$ java -jar httpclient-processor.jar --httpclient.url=http://someurl --httpclient.reply-expression="statusCode.name()"
A processor that evaluates a machine learning model stored in PMML format.
The pmml processor has the following options:
$ ./mvnw clean install -PgenerateApps
$ cd apps
You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:
$ ./mvnw clean package
Spring Cloud Stream App Starters for integrating with Python
This application invokes a REST service, using the httpclient processor. As a convenience for Python developers, this processor allows you to provide a Jython wrapper script that may execute a function before and after the REST call in order to perform any necessary data transformation. If you don’t require any custom transformations, just use the httpclient processor.
The diagram shows input and output adapters as conceptual components. These are actually implemented as functions defined in a single script that must conform to a simple convention:
def input():
    return "Pre" + payload

def output():
    return payload + "Post"

result = locals()[channel]()
The function names input and output map to the conventional channel names used by Spring Cloud Stream processors.
The last line is a bit of Python reflection magic to invoke a function by its name, given by the bound variable channel. Implemented with Spring Integration Scripting, headers and payload are always bound to the Message headers and payload respectively. The payload on the input side is the object you use to build the REST request.
The output side transforms the response. If you don’t need any additional processing on one side, implement the function with pass as the body:
def output():
    pass
![]() | Note |
---|---|
The last line in the script must be an assignment statement. The variable name doesn’t matter. This is required to bind the return value correctly. |
![]() | Note |
---|---|
The script is evaluated for every message. This tends to create a lot of classes for each execution, which puts stress on the JRE. |
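The wrapper convention can be tried outside the processor with plain Python. The sketch below mimics the binding of channel, payload and headers by running the script text with exec in a prepared namespace; this is only an approximation for experimentation, since the real processor evaluates the script through Jython and Spring Integration Scripting. The helper name `run_wrapper` and the sample payloads are assumptions.

```python
WRAPPER_SCRIPT = '''
def input():
    return "Pre" + payload

def output():
    return payload + "Post"

result = locals()[channel]()
'''


def run_wrapper(script, channel, payload, headers=None):
    """Evaluate the wrapper script with the bound variables and return its last assignment."""
    # Hypothetical stand-in for the variable bindings the processor provides.
    scope = {"channel": channel, "payload": payload, "headers": headers or {}}
    exec(script, scope)
    return scope["result"]


print(run_wrapper(WRAPPER_SCRIPT, "input", "request"))
print(run_wrapper(WRAPPER_SCRIPT, "output", "response"))
```

Because the value is read back from the variable assigned on the last line, the sketch also shows why the script has to end with an assignment statement.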
Headers may be set by the Jython wrapper script if the output() script function returns a Message.
The payload is whatever the output() wrapper script function returns.
![]() | Note |
---|---|
The wrapper script is intended to perform some required transformations prior to sending an HTTP request and/or after the response is received. The return value of the input adapter will be the inbound payload of the httpclient processor and should conform to its requirements. Likewise the HTTP |
The python-http processor has the following options:
$ ./mvnw clean install -PgenerateApps
$ cd apps
You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:
$ ./mvnw clean package
See the httpclient processor for more examples of httpclient properties.
$ java -jar python-http-processor.jar --wrapper.script=/local/directory/build-json.py --httpclient.url=http://someurl --httpclient.http-method=POST --httpclient.headers-expression="{'Content-Type':'application/json'}"
$ java -jar python-http-processor.jar --git.uri=https://github.com/some-repo --wrapper.script=some-script.py --wrapper.variables=foo=0.45,bar=0.55 --httpclient.url=http://someurl
This application executes a Jython script that binds the payload and headers variables to the Message payload and headers respectively. In addition, you may provide a jython.variables property containing a delimited string (comma-delimited by default), e.g., var1=val1,var2=val2,….
This processor uses a JSR-223 compliant embedded ScriptEngine provided by www.jython.org/.
![]() | Note |
---|---|
The last line in the script must be an assignment statement. The variable name doesn’t matter. This is required to bind the return value correctly. |
![]() | Note |
---|---|
The script is evaluated for every message, which may limit your performance with high message loads. This also tends to create a lot of classes for each execution, which puts stress on the JRE. |
The jython processor has the following options:
$ ./mvnw clean install -PgenerateApps
$ cd apps
You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:
$ ./mvnw clean package
A Spring Cloud Stream module that transforms messages using a script. The script body is supplied directly as a property value. The language of the script can be specified (groovy/javascript/ruby/python).
The scriptable-transform processor has the following options:
$ ./mvnw clean install -PgenerateApps
$ cd apps
You can find the corresponding binder based projects here.
You can then cd into one of the folders and build it:
$ ./mvnw clean package
The target folder will then contain the generated jars (original, repackaged (also known as the executable jar), with javadoc, etc.), of which the executable jar can be used directly with java to run the application.
The name of the executable jar follows the pattern: scriptable-transform-processor-BINDER-VERSION.jar
Starting from the top-level folder, assuming you target rabbitmq and your pom.xml has 2.1.0.RELEASE for the project.version property, you can build and run the standalone application with:
$ ./mvnw clean install -PgenerateApps
$ cd apps/scriptable-transform-processor-rabbit
$ ./mvnw clean package
$ java -jar target/scriptable-transform-processor-rabbit-2.1.0.RELEASE.jar --scriptable-transformer.language=ruby --scriptable-transformer.script="return ""#{payload.upcase}"""
The splitter app builds upon the concept of the same name in Spring Integration and allows the splitting of a single message into several distinct messages.
When no expression, fileMarkers, or charset is provided, a DefaultMessageSplitter is configured with (optional) delimiters.
When fileMarkers or charset is provided, a FileSplitter is configured (you must provide either fileMarkers or charset to split files, which must be text-based; they are split into lines).
Otherwise, an ExpressionEvaluatingMessageSplitter is configured.
When splitting File payloads, the sequenceSize header is zero because the size cannot be determined at the beginning.
![]() | Caution |
---|---|
Ambiguous properties are not allowed. |
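The selection rules above can be summarized as a small decision function. This Python sketch returns the name of the splitter that would be configured for a given combination of properties; it is a reading aid rather than the application's configuration code. The function name and the interpretation of "ambiguous" as an expression combined with fileMarkers or charset are assumptions.

```python
def choose_splitter(expression=None, file_markers=None, charset=None):
    """Return the name of the splitter implied by the supplied properties."""
    file_properties = file_markers is not None or charset is not None
    if expression is not None and file_properties:
        # Assumption: mixing an expression with file properties counts as ambiguous.
        raise ValueError("ambiguous properties are not allowed")
    if file_properties:
        return "FileSplitter"
    if expression is not None:
        return "ExpressionEvaluatingMessageSplitter"
    return "DefaultMessageSplitter"


print(choose_splitter())
print(choose_splitter(charset="UTF-8"))
print(choose_splitter(expression="payload.split(',')"))
```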
As part of the SpEL expression you can make use of the pre-registered JSON Path function. The syntax is #jsonPath(payload, '<json path expression>').
For example, consider the following JSON:
{ "store": {
    "book": [
      { "category": "reference", "author": "Nigel Rees", "title": "Sayings of the Century", "price": 8.95 },
      { "category": "fiction", "author": "Evelyn Waugh", "title": "Sword of Honour", "price": 12.99 },
      { "category": "fiction", "author": "Herman Melville", "title": "Moby Dick", "isbn": "0-553-21311-3", "price": 8.99 },
      { "category": "fiction", "author": "J. R. R. Tolkien", "title": "The Lord of the Rings", "isbn": "0-395-19395-8", "price": 22.99 }
    ],
    "bicycle": { "color": "red", "price": 19.95 }
  }
}
and an expression #jsonPath(payload, '$.store.book'); the result will be 4 messages, each with a Map payload containing the properties of a single book.
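The outcome of that split can be reproduced with a short Python sketch. It walks a plain dotted path instead of using a JSON Path library, so it covers only the simple '$.store.book' case; the function name `split_by_path`, the message shape and the trimmed sample document are assumptions for illustration.

```python
import json

# Trimmed version of the sample document (fewer fields per book).
DOCUMENT = json.dumps({
    "store": {
        "book": [
            {"title": "Sayings of the Century", "price": 8.95},
            {"title": "Sword of Honour", "price": 12.99},
            {"title": "Moby Dick", "price": 8.99},
            {"title": "The Lord of the Rings", "price": 22.99},
        ],
        "bicycle": {"color": "red", "price": 19.95},
    }
})


def split_by_path(payload, dotted_path):
    """Walk a dotted path such as 'store.book' and emit one message per list element."""
    node = json.loads(payload)
    for key in dotted_path.split("."):
        node = node[key]
    items = node if isinstance(node, list) else [node]
    return [{"payload": item} for item in items]


messages = split_by_path(DOCUMENT, "store.book")
print(len(messages), messages[0]["payload"]["title"])
```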
$ ./mvnw clean install -PgenerateApps
$ cd apps
You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:
$ ./mvnw clean package
Use the task launch request transform in a stream to create a TaskLaunchRequest to be passed to the output channel. The TaskLaunchRequest is used by a TaskLauncher to launch tasks on the platform.
The tasklaunchrequest processor has the following options:
$ ./mvnw clean install -PgenerateApps
$ cd apps
You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:
$ ./mvnw clean package
The tcp-client processor has the following options:
$ ./mvnw clean install -PgenerateApps
$ cd apps
You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:
$ ./mvnw clean package
Use the transform app in a stream to convert a Message’s content or structure.
The transform processor is used by passing a SpEL expression. The expression should return the modified message or payload.
As part of the SpEL expression you can make use of the pre-registered JSON Path function. The syntax is #jsonPath(payload,'<json path expression>')
The transform processor has the following options:
$ ./mvnw clean install -PgenerateApps
$ cd apps
You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:
$ ./mvnw clean package
A processor that evaluates a machine learning model stored in TensorFlow Protobuf format.
The following snippet shows how to export a TensorFlow model into the ProtocolBuffer binary format required by the Processor.
import os
import tensorflow as tf
from tensorflow.python.framework.graph_util import convert_variables_to_constants
...
SAVE_DIR = os.path.abspath(os.path.curdir)
minimal_graph = convert_variables_to_constants(sess, sess.graph_def, ['<model output>'])
# Binary protobuf, the format consumed by the Processor
tf.train.write_graph(minimal_graph, SAVE_DIR, 'my_graph.proto', as_text=False)
# Human-readable text copy of the same graph
tf.train.write_graph(minimal_graph, SAVE_DIR, 'my.txt', as_text=True)
The --tensorflow.model property configures the Processor with the location of the serialized TensorFlow model.
The TensorflowInputConverter converts the input data into the format specific to the given model.
The TensorflowOutputConverter converts the computed Tensors result into a pipeline Message.
The --tensorflow.modelFetch property defines the list of TensorFlow graph outputs to fetch the output Tensors from.
The --tensorflow.mode property defines whether the computed results are passed in the message payload or in the message header.
The TensorFlow Processor uses a TensorflowInputConverter to convert the input data into a data format compliant with the TensorFlow model used. The input converter converts the input Messages into a key/value Map, where the key corresponds to a model input placeholder and the content is an org.tensorflow.DataType compliant value.
The default converter implementation expects either a Map payload or a flat JSON message that can be converted into a Map.
The TensorflowInputConverter can be extended and customized.
See TwitterSentimentTensorflowInputConverter.java for an example.
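The input contract of the default converter (a Map payload, or flat JSON that can become a Map) can be sketched in Python as follows. This illustrates the described contract only and is not the Java converter; the function name `to_input_map`, the placeholder names in the sample and the reading of "flat" as "no nested JSON objects" are assumptions.

```python
import json


def to_input_map(payload):
    """Return a dict keyed by (hypothetical) model input placeholder names."""
    if isinstance(payload, dict):
        return dict(payload)  # already a Map-like payload
    if isinstance(payload, (bytes, bytearray)):
        payload = payload.decode("utf-8")
    parsed = json.loads(payload)
    if not isinstance(parsed, dict):
        raise ValueError("expected a JSON object at the top level")
    if any(isinstance(value, dict) for value in parsed.values()):
        # Assumption: "flat" means no nested JSON objects.
        raise ValueError("expected a flat JSON object")
    return parsed


print(to_input_map('{"x": 1.5, "keep_prob": 1.0}'))
```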
The Processor’s output uses a TensorflowOutputConverter to convert the computed Tensor result into a serializable message. The default implementation uses JSON.
A custom TensorflowOutputConverter can provide more convenient data representations.
See TwitterSentimentTensorflowOutputConverter.java.
The tensorflow processor has the following options:
$ ./mvnw clean install -PgenerateApps
$ cd apps
You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:
$ ./mvnw clean package
A processor that evaluates a machine learning model stored in TensorFlow Protobuf format. It operationalizes the github.com/danielegrattarola/twitter-sentiment-cnn
Real-time Twitter Sentiment Analytics with TensorFlow and Spring Cloud Dataflow
Decodes the evaluated result into POSITIVE, NEGATIVE and NEUTRAL values. Then creates and returns a simple JSON message with this structure:
N/A
The Processor’s output uses a TensorflowOutputConverter to convert the computed Tensor result into a serializable message. The default implementation uses JSON.
A custom TensorflowOutputConverter can provide more convenient data representations.
See TwitterSentimentTensorflowOutputConverter.java.
The twitter-sentiment processor has the following options:
$ ./mvnw clean install -PgenerateApps
$ cd apps
You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:
$ ./mvnw clean package
java -jar twitter-sentiment-processor.jar --tensorflow.twitter.vocabulary= --tensorflow.model= \
    --tensorflow.modelFetch= --tensorflow.mode="
And here is a sample pipeline that computes sentiments for JSON tweets coming from the twitterstream source, using the pre-built minimal_graph.proto and vocab.csv:
tweets=twitterstream --access-token-secret=xxx --access-token=xxx --consumer-secret=xxx --consumer-key=xxx \
  | filter --expression=#jsonPath(payload,'$.lang')=='en' \
  | twitter-sentiment --vocabulary='http://dl.bintray.com/big-data/generic/vocab.csv' \
    --output-name=output/Softmax --model='http://dl.bintray.com/big-data/generic/minimal_graph.proto' \
    --model-fetch=output/Softmax \
  | log
A processor that uses an Inception model to classify images into different categories (i.e., labels) in real time.
The model implements a deep Convolutional Neural Network that can achieve reasonable performance on hard visual recognition tasks, matching or exceeding human performance in some domains such as image recognition.
The input of the model is an image as a binary array.
The output is a JSON message in this format:
{ "labels" : [ {"giant panda":0.98649305} ] }
The result contains the name of the recognized category (the label) along with the confidence that the image represents this category.
If response-size is set to a value higher than 1, the result includes the top response-size most probable labels. For example, response-size=3 would return:
{ "labels": [ {"giant panda":0.98649305}, {"badger":0.010562794}, {"ice bear":0.001130851} ] }
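The relation between the response size and the output can be illustrated with a short Python sketch that ranks label probabilities and keeps the top entries. It shows only the selection step under the assumption that the model yields one probability per label; the function name `top_labels` and the extra "teapot" entry are made up for the example.

```python
def top_labels(probabilities, response_size=1):
    """Build the output structure with the response_size most probable labels."""
    ranked = sorted(probabilities.items(), key=lambda item: item[1], reverse=True)
    return {"labels": [{name: score} for name, score in ranked[:response_size]]}


# Hypothetical per-label probabilities; the first three values come from the example above.
scores = {
    "badger": 0.010562794,
    "giant panda": 0.98649305,
    "ice bear": 0.001130851,
    "teapot": 0.0002,
}
print(top_labels(scores))
print(top_labels(scores, response_size=3))
```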
The image-recognition processor has the following options:
The new Object Detection processor provides out-of-the-box support for the TensorFlow Object Detection API. It allows for real-time localization and identification of multiple objects in a single image or image stream. The Object Detection processor uses one of the pre-trained object detection models and corresponding object labels.
If the pre-trained model is not set explicitly, the following defaults are used:
- tensorflow.modelFetch: detection_scores,detection_classes,detection_boxes,num_detections
- tensorflow.model: dl.bintray.com/big-data/generic/faster_rcnn_resnet101_coco_2018_01_28_frozen_inference_graph.pb
- tensorflow.object.detection.labels: dl.bintray.com/big-data/generic/mscoco_label_map.pbtxt
The following diagram illustrates a Spring Cloud Data Flow streaming pipeline that predicts object types from the images in real-time.
The Processor’s input is an image byte array and the output is a JSON message in this format:
{ "labels" : [
  {"name":"person", "confidence":0.9996774,"x1":0.0,"y1":0.3940161,"x2":0.9465165,"y2":0.5592592,"cid":1},
  {"name":"person", "confidence":0.9996604,"x1":0.047891676,"y1":0.03169123,"x2":0.941098,"y2":0.2085562,"cid":1},
  {"name":"backpack", "confidence":0.96534747,"x1":0.15588468,"y1":0.85957795,"x2":0.5091308,"y2":0.9908878,"cid":23},
  {"name":"backpack", "confidence":0.963343,"x1":0.1273736,"y1":0.57658505,"x2":0.47765,"y2":0.6986431,"cid":23}
] }
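A downstream consumer might condense such a message into per-class statistics. The Python sketch below counts detections per object name and keeps the highest confidence for each; it relies only on the name and confidence fields visible in the sample, and the function name `summarize` and the shortened sample are assumptions.

```python
import json
from collections import defaultdict

# Shortened sample in the same shape as the output above.
SAMPLE = json.dumps({
    "labels": [
        {"name": "person", "confidence": 0.9996774, "cid": 1},
        {"name": "person", "confidence": 0.9996604, "cid": 1},
        {"name": "backpack", "confidence": 0.96534747, "cid": 23},
    ]
})


def summarize(output_json):
    """Count detections per object name and remember the best confidence seen."""
    summary = defaultdict(lambda: {"count": 0, "max_confidence": 0.0})
    for label in json.loads(output_json)["labels"]:
        entry = summary[label["name"]]
        entry["count"] += 1
        entry["max_confidence"] = max(entry["max_confidence"], label["confidence"])
    return dict(summary)


print(summarize(SAMPLE))
```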
The output format is:
The object-detection processor has the following options:
Real-time, multi-person Pose Estimation processor for detecting human figures in images and video. It is used for determining where different body parts are located in an image and how they spatially relate to each other. The processor is based on Realtime Multi-Person 2D Pose Estimation using Part Affinity Fields, OpenPose and tf-pose-estimation.
The following diagram illustrates a Spring Cloud Data Flow streaming pipeline that predicts body postures from input images in real-time.
The Pose Estimation processor is configured with a pre-trained TensorFlow model (built with the tf-pose-estimation project). The inference of this model produces auxiliary data structures, such as heatmaps with predictions about the part locations in the image. The post-processing required for selecting the right body parts and grouping them into poses is implemented by the processor using greedy algorithms.
Use the tensorflow.model property to set the pre-trained TensorFlow model. Here are some options available out of the box:
- dl.bintray.com/big-data/generic/2018-30-05-mobilenet_thin_graph_opt.pb (default) - fast but less accurate
- dl.bintray.com/big-data/generic/2018-05-14-cmu-graph_opt.pb - accurate but slower
The Processor’s input is an image byte array and the output is a JSON message and optionally an image with annotated body poses. The output JSON format looks like:
[
  { "limbs": [
      { "score": 8.4396105, "from": { "type": "lShoulder", "y": 56, "x": 160 }, "to": { "type": "lEar", "y": 24, "x": 152 } },
      { "score": 10.145516, "from": { "type": "neck", "y": 56, "x": 144 }, "to": { "type": "rShoulder", "y": 56, "x": 128 } },
      { "score": 9.970467, "from": { "type": "neck", "y": 56, "x": 144 }, "to": { "type": "lShoulder", "y": 56, "x": 160 } }
  ] },
  { "limbs": [
      { "score": 7.85779, "from": { "type": "neck", "y": 48, "x": 328 }, "to": { "type": "rHip", "y": 128, "x": 328 } },
      { "score": 6.8949876, "from": { "type": "neck", "y": 48, "x": 328 }, "to": { "type": "lHip", "y": 128, "x": 304 } }
  ] }
]
Every entry in the array represents a single body posture found in the image. Bodies are composed of Parts connected by Limbs, represented by the limbs collection.
Every Limb instance has a PAF confidence score and the from and to parts it connects. The Part instances have a type and coordinates.
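To show how this structure can be consumed, the following Python sketch collects the distinct parts of each body from its limbs. It relies only on the fields shown in the sample (limbs, from, to, type, x, y); the function name `body_parts` and the shortened sample are assumptions for illustration.

```python
import json

# Shortened sample in the same shape as the output above.
SAMPLE = json.dumps([
    {"limbs": [
        {"score": 10.145516,
         "from": {"type": "neck", "y": 56, "x": 144},
         "to": {"type": "rShoulder", "y": 56, "x": 128}},
        {"score": 9.970467,
         "from": {"type": "neck", "y": 56, "x": 144},
         "to": {"type": "lShoulder", "y": 56, "x": 160}},
    ]},
    {"limbs": [
        {"score": 7.85779,
         "from": {"type": "neck", "y": 48, "x": 328},
         "to": {"type": "rHip", "y": 128, "x": 328}},
    ]},
])


def body_parts(output_json):
    """Return, for each body, a dict mapping part type to its (x, y) coordinates."""
    bodies = []
    for body in json.loads(output_json):
        parts = {}
        for limb in body["limbs"]:
            for end in ("from", "to"):
                part = limb[end]
                # A part shared by several limbs (e.g. the neck) is stored once.
                parts[part["type"]] = (part["x"], part["y"])
        bodies.append(parts)
    return bodies


for index, parts in enumerate(body_parts(SAMPLE)):
    print(index, sorted(parts))
```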
![]() | Note |
---|---|
Output image annotated with body pose skeletons
When the |
The pose-estimation processor has the following options: