3. Processors

3.1 Aggregator Processor

Use the aggregator application to combine multiple messages into one, based on some correlation mechanism.

This processor is fully based on the Aggregator component from Spring Integration. Consult the Spring Integration reference documentation for its use cases and functionality.

3.1.1 Input

Headers

If the aggregation and correlation logic is based on the default strategies, the correlationId, sequenceNumber and sequenceSize headers must be present in the incoming message.

Payload

The Aggregator Processor is based on Spring Integration's AggregatingMessageHandler. Since the correlation and aggregation logic doesn't require particular types, the input payload can be anything that can be transferred over the network by the Spring Cloud Stream binder. If the payload is JSON, the JsonPropertyAccessor helps to build straightforward SpEL expressions for the correlation, release and aggregation functions.

3.1.2 Output

Headers

Returns all headers of the incoming messages that have no conflicts among the group. An absent header on one or more messages within the group is not considered a conflict.
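The header rule above can be modelled with the short Python sketch below. It is a simplified illustration, not Spring Integration's code: messages are plain dicts with "headers" and "payload" keys, and the helper name merge_group_headers is hypothetical.

```python
def merge_group_headers(messages):
    """Return the headers that agree across a group of messages.

    Hypothetical helper: a header missing from some messages is not a
    conflict, but a header seen with two different values is dropped.
    """
    merged = {}
    conflicting = set()
    for message in messages:
        for name, value in message["headers"].items():
            if name in conflicting:
                continue                  # already known to conflict
            if name in merged and merged[name] != value:
                del merged[name]          # conflict: drop the header entirely
                conflicting.add(name)
            else:
                merged[name] = value
    return merged


group = [
    {"headers": {"correlationId": "A", "source": "s1", "region": "eu"}, "payload": 1},
    {"headers": {"correlationId": "A", "source": "s2"}, "payload": 2},
]
print(merge_group_headers(group))  # {'correlationId': 'A', 'region': 'eu'}
```

Here "source" is dropped because its values differ, while "region" survives even though the second message lacks it.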

Payload

By default, the DefaultAggregatingMessageGroupProcessor is used as the aggregation function; it returns a java.util.List of the payloads of the incoming messages. A custom aggregation SpEL expression may produce any object to be sent to the output of the processor.

3.1.3 Options

The aggregator processor has the following options:

aggregator.aggregation
SpEL expression for the aggregation strategy. Defaults to a collection of payloads. (Expression, default: <none>)
aggregator.correlation
SpEL expression for the correlation key. Defaults to the correlationId header. (Expression, default: <none>)
aggregator.group-timeout
SpEL expression for the timeout after which uncompleted groups expire. (Expression, default: <none>)
aggregator.message-store-entity
Persistence message store entity: table prefix in RDBMS, collection name in MongoDB, etc. (String, default: <none>)
aggregator.message-store-type
Message store type (String, default: <none>)
aggregator.release
SpEL expression for the release strategy. Defaults to a strategy based on the sequenceSize header. (Expression, default: <none>)
spring.data.mongodb.authentication-database
Authentication database name. (String, default: <none>)
spring.data.mongodb.database
Database name. (String, default: <none>)
spring.data.mongodb.field-naming-strategy
Fully qualified name of the FieldNamingStrategy to use. (Class<?>, default: <none>)
spring.data.mongodb.grid-fs-database
GridFS database name. (String, default: <none>)
spring.data.mongodb.host
Mongo server host. Cannot be set with URI. (String, default: <none>)
spring.data.mongodb.password
Login password of the mongo server. Cannot be set with URI. (Character[], default: <none>)
spring.data.mongodb.port
Mongo server port. Cannot be set with URI. (Integer, default: <none>)
spring.data.mongodb.uri
Mongo database URI. Cannot be set with host, port and credentials. (String, default: mongodb://localhost/test)
spring.data.mongodb.username
Login user of the mongo server. Cannot be set with URI. (String, default: <none>)
spring.datasource.continue-on-error
Whether to continue if an error occurs while initializing the database. (Boolean, default: false)
spring.datasource.data
Data (DML) script resource references. (List<String>, default: <none>)
spring.datasource.data-password
Password of the database to execute DML scripts (if different). (String, default: <none>)
spring.datasource.data-username
Username of the database to execute DML scripts (if different). (String, default: <none>)
spring.datasource.driver-class-name
Fully qualified name of the JDBC driver. Auto-detected based on the URL by default. (String, default: <none>)
spring.datasource.generate-unique-name
Whether to generate a random datasource name. (Boolean, default: false)
spring.datasource.initialization-mode
Initialize the datasource with available DDL and DML scripts. (DataSourceInitializationMode, default: embedded, possible values: ALWAYS,EMBEDDED,NEVER)
spring.datasource.jndi-name
JNDI location of the datasource. Class, url, username & password are ignored when set. (String, default: <none>)
spring.datasource.name
Name of the datasource. Defaults to "testdb" when using an embedded database. (String, default: <none>)
spring.datasource.password
Login password of the database. (String, default: <none>)
spring.datasource.platform
Platform to use in the DDL or DML scripts (such as schema-${platform}.sql or data-${platform}.sql). (String, default: all)
spring.datasource.schema
Schema (DDL) script resource references. (List<String>, default: <none>)
spring.datasource.schema-password
Password of the database to execute DDL scripts (if different). (String, default: <none>)
spring.datasource.schema-username
Username of the database to execute DDL scripts (if different). (String, default: <none>)
spring.datasource.separator
Statement separator in SQL initialization scripts. (String, default: ;)
spring.datasource.sql-script-encoding
SQL scripts encoding. (Charset, default: <none>)
spring.datasource.type
Fully qualified name of the connection pool implementation to use. By default, it is auto-detected from the classpath. (Class<DataSource>, default: <none>)
spring.datasource.url
JDBC URL of the database. (String, default: <none>)
spring.datasource.username
Login username of the database. (String, default: <none>)
spring.mongodb.embedded.features
Comma-separated list of features to enable. Uses the defaults of the configured version by default. (Set<Feature>, default: [sync_delay])
spring.mongodb.embedded.version
Version of Mongo to use. (String, default: 3.5.5)
spring.redis.database
Database index used by the connection factory. (Integer, default: 0)
spring.redis.host
Redis server host. (String, default: localhost)
spring.redis.password
Login password of the redis server. (String, default: <none>)
spring.redis.port
Redis server port. (Integer, default: 6379)
spring.redis.ssl
Whether to enable SSL support. (Boolean, default: false)
spring.redis.timeout
Connection timeout. (Duration, default: <none>)
spring.redis.url
Connection URL. Overrides host, port, and password. User is ignored. Example: redis://user:password@example.com:6379 (String, default: <none>)

By default, the aggregator processor uses:

  • HeaderAttributeCorrelationStrategy(IntegrationMessageHeaderAccessor.CORRELATION_ID) for correlation;
  • SequenceSizeReleaseStrategy for release;
  • DefaultAggregatingMessageGroupProcessor for aggregation;
  • SimpleMessageStore for messageStoreType.
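The default strategies can be pictured with the in-memory Python sketch below. It only models the documented behaviour (correlate on correlationId, release once the group holds sequenceSize messages, aggregate the payloads into a list, keep groups in a plain dict standing in for SimpleMessageStore); the class DefaultAggregatorSketch and the helper msg are hypothetical, and payloads are collected in arrival order in this sketch.

```python
class DefaultAggregatorSketch:
    """Hypothetical, simplified model of the default aggregator strategies."""

    def __init__(self):
        self.groups = {}  # correlation key -> buffered messages (like SimpleMessageStore)

    def handle(self, message):
        headers = message["headers"]
        key = headers["correlationId"]                # correlation strategy
        group = self.groups.setdefault(key, [])
        group.append(message)
        if len(group) < headers["sequenceSize"]:      # release strategy
            return None                               # group incomplete, keep waiting
        del self.groups[key]
        return [m["payload"] for m in group]          # aggregation: list of payloads


def msg(corr, number, size, payload):
    """Build a message dict carrying the three default-strategy headers."""
    return {"headers": {"correlationId": corr, "sequenceNumber": number,
                        "sequenceSize": size}, "payload": payload}


aggregator = DefaultAggregatorSketch()
print(aggregator.handle(msg("order-1", 1, 2, "a")))  # None
print(aggregator.handle(msg("order-2", 1, 1, "x")))  # ['x']
print(aggregator.handle(msg("order-1", 2, 2, "b")))  # ['a', 'b']
```

Groups for different correlation keys fill independently, which is why "order-2" is released before "order-1" completes.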

The aggregator application can be configured for persistent MessageGroupStore implementations. The configuration for the target technology is fully based on Spring Boot auto-configuration. However, the default JDBC, MongoDB and Redis auto-configurations are excluded; they are @Import-ed based on the aggregator.messageStoreType configuration property. Consult the Spring Boot Reference Manual for the auto-configuration of the particular technology you use for the aggregator.

The JdbcMessageStore requires particular tables in the target database. You can find schema scripts for the appropriate RDBMS vendors in the org.springframework.integration.jdbc package of the spring-integration-jdbc jar. Those scripts can be used for automatic database initialization via Spring Boot.

For example:

java -jar aggregator-rabbit-1.0.0.RELEASE.jar \
               --aggregator.message-store-type=jdbc \
               --spring.datasource.url=jdbc:h2:mem:test \
               --spring.datasource.schema=org/springframework/integration/jdbc/schema-h2.sql

3.1.4 Build

$ ./mvnw clean install -PgenerateApps
$ cd apps

You can find the corresponding binder-based projects here. You can then cd into one of the folders and build it:

$ ./mvnw clean package

3.1.5 Examples

java -jar aggregator_processor.jar \
               --aggregator.message-store-type=jdbc \
               --spring.datasource.url=jdbc:h2:mem:test \
               --spring.datasource.schema=org/springframework/integration/jdbc/schema-h2.sql

java -jar aggregator_processor.jar \
               --spring.data.mongodb.port=0 \
               --aggregator.correlation="T(Thread).currentThread().id" \
               --aggregator.release="!#this.?[payload == 'bar'].empty" \
               --aggregator.aggregation="#this.?[payload == 'foo'].![payload]" \
               --aggregator.message-store-type=mongodb \
               --aggregator.message-store-entity=aggregatorTest
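The SpEL in the second example relies on collection selection (.?[...]) and projection (.![...]) over the group's messages (#this). The Python sketch below restates the same three expressions with list comprehensions, as an illustration only; the function names are hypothetical, messages are plain dicts, and threading.get_ident() is used as a rough Python analogue of the Java thread id.

```python
import threading


def correlation_key(message):
    # T(Thread).currentThread().id: messages handled on the same thread
    # share one group; the message itself is not inspected.
    return threading.get_ident()


def should_release(group):
    # !#this.?[payload == 'bar'].empty
    # -> release once at least one message in the group has payload 'bar'.
    return len([m for m in group if m["payload"] == "bar"]) > 0


def aggregate(group):
    # #this.?[payload == 'foo'].![payload]
    # -> select messages whose payload is 'foo', then project their payloads.
    return [m["payload"] for m in group if m["payload"] == "foo"]


group = [{"payload": "foo"}, {"payload": "baz"}, {"payload": "foo"}]
print(should_release(group))  # False
group.append({"payload": "bar"})
print(should_release(group))  # True
print(aggregate(group))       # ['foo', 'foo']
```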

3.1.6 Code of Conduct

This project adheres to the Contributor Covenant code of conduct. By participating, you are expected to uphold this code. Please report unacceptable behavior to spring-code-of-conduct@pivotal.io.

3.2 Bridge Processor

A Processor module that passes messages through unchanged by simply connecting its input and output channels.

3.2.1 Input

Headers

Payload

Any

3.2.2 Output

Headers

Payload

Any

3.2.3 Options

3.2.4 Build

$ ./mvnw clean install -PgenerateApps
$ cd apps

You can find the corresponding binder-based projects here. You can then cd into one of the folders and build it:

$ ./mvnw clean package

3.2.5 Examples

java -jar bridge-processor.jar

3.3 Counter Processor

Pass-through processor that computes multiple counters from the messages that pass through. The input messages are re-sent unchanged. Using the Micrometer library, the Counter Processor integrates with popular TSDBs for persisting and processing the counter values.

By default, the Counter Processor increments the message.name counter on every received message. The counter.message-counter-enabled property controls the behavior of the message counter.

If tag expressions are provided (via the counter.tag.expression.<tagKey>=<tagValue SpEL expression> property), then the counter named by counter.name is incremented. Every SpEL expression may evaluate to multiple values, causing multiple counter increments for the same message (one for every value resolved).

If fixed tags are provided, they are included in all message and expression counters.
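The counting rules above can be summarized with the Python sketch below, which uses collections.Counter as a stand-in for a Micrometer registry. It models a single tag expression only (how several tag expressions are combined is not covered here), represents the SpEL expression as a Python callable, and the helpers count_message and freeze are hypothetical.

```python
from collections import Counter


def freeze(tags):
    """Turn a tag dict into a hashable, order-independent key."""
    return tuple(sorted(tags.items()))


def count_message(registry, message, name, tag_name, tag_expression,
                  fixed_tags=None, message_counter_enabled=True):
    """Hypothetical model of one tag expression, not the processor's code."""
    fixed_tags = fixed_tags or {}
    if message_counter_enabled:
        # The 'message.' prefix keeps the per-message counter distinct;
        # it carries the fixed tags only.
        registry[("message." + name, freeze(fixed_tags))] += 1
    values = tag_expression(message)
    if not isinstance(values, (list, tuple, set)):
        values = [values]
    for value in values:  # one increment per resolved value
        tags = dict(fixed_tags, **{tag_name: str(value)})
        registry[(name, freeze(tags))] += 1


registry = Counter()
tweet = {"payload": {"hashtags": ["spring", "java"]}}
count_message(registry, tweet, "hashtags", "tag",
              lambda m: m["payload"]["hashtags"], fixed_tags={"app": "demo"})
for key, count in sorted(registry.items()):
    print(key, count)
```

One message with two hashtags yields one message.hashtags increment and two hashtags increments, one per tag value.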

Counter's implementation is based on the Micrometer library, a vendor-neutral application metrics facade that supports the most popular monitoring systems. See the Micrometer documentation for the list of supported monitoring systems. Starting with Spring Boot 2.0, Micrometer is the instrumentation library powering the delivery of application metrics from Spring Boot.

All Spring Cloud Stream App Starters are configured to support two of the most popular monitoring systems, Prometheus and InfluxDB. You can declaratively select which monitoring system to use. If you are not using Prometheus or InfluxDB, you can customise the App starters to use a different monitoring system, as well as include your preferred Micrometer monitoring system library in your own custom applications.

Grafana is a popular platform for building visualization dashboards.

To enable Micrometer’s Prometheus meter registry for Spring Cloud Stream application starters, set the following properties.

management.metrics.export.prometheus.enabled=true
management.endpoints.web.exposure.include=prometheus

Also disable the application's security, which allows a simple Prometheus configuration to scrape the counter information, by setting the following property.

spring.cloud.streamapp.security.enabled=false

To enable Micrometer's Influx meter registry for Spring Cloud Stream application starters, set the following properties.

management.metrics.export.influx.enabled=true
management.metrics.export.influx.uri={influxdb-server-url}
Note

If Data Flow Server metrics are enabled, the Counter reuses the existing configuration.

The following diagram illustrates the Counter's information collection and processing flow.

Counter Architecture

3.3.1 Options

counter.message-counter-enabled
Enables counting the number of messages processed. Uses the 'message.' counter name prefix to distinguish it from the expression-based counter. The message counter includes the fixed tags when provided. (Boolean, default: true)
counter.name
The name of the counter to increment. (String, default: <none>)
counter.name-expression
A SpEL expression (against the incoming Message) to derive the name of the counter to increment. (Expression, default: <none>)
counter.tag.expression
Computes tags from a SpEL expression. A single SpEL expression can produce an array of values, which in turn means distinct name/value tags. Every name/value tag produces a separate counter increment. Tag expression format is: counter.tag.expression.[tag-name]=[SpEL expression] (Map<String, Expression>, default: <none>)
counter.tag.fixed
Custom tags assigned to every counter increment measurement. This is a map, so the property convention for fixed tags is: counter.tag.fixed.[tag-name]=[tag-value] (Map<String, String>, default: <none>)

3.4 Filter Processor

Use the filter module in a stream to determine whether a Message should be passed to the output channel.

3.4.1 Input

Headers

N/A

Payload

Any

3.4.2 Output

Headers

N/A

Payload

Any

3.4.3 Options

The filter processor has the following options:

filter.expression
A SpEL expression to be evaluated against each message, to decide whether or not to accept it. (Expression, default: true)
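The accept/discard decision can be sketched in Python as below, with a callable standing in for filter.expression; the function apply_filter is hypothetical. Its default predicate mirrors the documented default of true, and the second call behaves roughly like a filter.expression of payload.length() > 0 for String payloads.

```python
def apply_filter(messages, expression=lambda message: True):
    """Yield only the messages for which the predicate returns True.

    `expression` is a Python stand-in for the filter.expression SpEL;
    the default accepts every message, like the documented default.
    """
    for message in messages:
        if expression(message):
            yield message  # accepted: passed on to the output channel


messages = [{"payload": "hello"}, {"payload": ""}, {"payload": "stream"}]
print(len(list(apply_filter(messages))))  # 3
print([m["payload"] for m in apply_filter(messages, lambda m: len(m["payload"]) > 0)])
# ['hello', 'stream']
```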

3.4.4 Build

$ ./mvnw clean install -PgenerateApps
$ cd apps

You can find the corresponding binder-based projects here. You can then cd into one of the folders and build it:

$ ./mvnw clean package

3.4.5 Examples

java -jar filter-processor.jar --filter.expression="payload"

3.5 Groovy Filter Processor

A Processor application that retains or discards messages according to a predicate, expressed as a Groovy script.

3.5.1 Input

Headers

N/A

Payload

  • Any

3.5.2 Output

Headers

N/A

Payload

  • Any

3.5.3 Options

The groovy-filter processor has the following options:

groovy-filter.script
The resource location of the groovy script (Resource, default: <none>)
groovy-filter.variables
Variable bindings as a new line delimited string of name-value pairs, e.g. 'foo=bar\n baz=car'. (Properties, default: <none>)
groovy-filter.variables-location
The location of a properties file containing custom script variable bindings. (Resource, default: <none>)

3.5.4 Build

$ ./mvnw clean install -PgenerateApps
$ cd apps

You can find the corresponding binder-based projects here. You can then cd into one of the folders and build it:

$ ./mvnw clean package

3.5.5 Examples

java -jar groovy-filter-processor.jar --groovy-filter.script=script.groovy

3.6 Groovy Transform Processor

A Processor module that transforms messages using a Groovy script.

3.6.1 Input

Headers

N/A

Payload

  • Any

3.6.2 Output

Headers

N/A

Payload

  • Any

3.6.3 Options

The groovy-transform processor has the following options:

groovy-transformer.script
Reference to a script used to process messages. (Resource, default: <none>)
groovy-transformer.variables
Variable bindings as a new line delimited string of name-value pairs, e.g. 'foo=bar\n baz=car'. (Properties, default: <none>)
groovy-transformer.variables-location
The location of a properties file containing custom script variable bindings. (Resource, default: <none>)

3.6.4 Build

$ ./mvnw clean install -PgenerateApps
$ cd apps

You can find the corresponding binder-based projects here. You can then cd into one of the folders and build it:

$ ./mvnw clean package

3.6.5 Examples

java -jar groovy-transform-processor.jar --groovy-transformer.script=script.groovy

3.7 gRPC Processor

This processor uses gRPC to process Messages via a remote process written in any language that supports gRPC. This pattern allows the Java app to handle the stream processing while the gRPC service handles the business logic. The service must implement the gRPC service defined by this protobuf schema: ../grpc-app-protos/src/main/proto/processor.proto.

Note

The gRPC client stub is blocking by default. Asynchronous and streaming stubs are also provided. The asynchronous stub performs better if the server is multi-threaded; however, message ordering is not guaranteed. If the server supports bidirectional streaming, use the streaming stub.

Note

A riff stub is available for interoperability with riff function containers. It does not interact with the riff FaaS platform but supports running an existing function container standalone, for example, docker run -it -p10382:10382 some/riff-function:latest.

3.7.1 Input

Headers

Headers are available to the sidecar application via the process schema if grpc.include-headers is true. Each header value contains one or more string values to support multi-valued headers, e.g., the HTTP Accept header.

Payload

The payload is a byte array as defined by the schema.

3.7.2 Output

Headers

In most cases, the return message should simply contain the original headers provided. The sidecar application may modify or add headers; however, it is recommended to add headers only if necessary.

Payload

The payload is normally expected to be a string or byte array. However, common primitive types are also supported, as defined by the schema.

3.7.3 Options

The grpc processor has the following options:

grpc.host
The gRPC host name. (String, default: <none>)
grpc.idle-timeout
The idle timeout in seconds. (Long, default: 0)
grpc.include-headers
Flag to include headers in Messages to the remote process. (Boolean, default: false)
grpc.max-message-size
The maximum message size (bytes). (Integer, default: 0)
grpc.plain-text
Flag to send messages in plain text. SSL configuration required otherwise. (Boolean, default: true)
grpc.port
The gRPC server port. (Integer, default: 0)
grpc.stub
RPC communications style (default 'blocking'). (Stub, default: <none>, possible values: async,blocking,streaming,riff)

3.8 Header Enricher Processor

Use the header-enricher app to add message headers.

The headers are provided in the form of newline-delimited key-value pairs, where the keys are the header names and the values are SpEL expressions. For example, --header.enricher.headers='foo=payload.someProperty \n bar=payload.otherProperty'

3.8.1 Input

Headers

N/A

Payload

  • Any

3.8.2 Output

Headers

N/A

Payload

  • Any

3.8.3 Options

The header-enricher processor has the following options:

header.enricher.headers
\n separated properties representing headers in which values are SpEL expressions, e.g., foo='bar' \n baz=payload.baz (Properties, default: <none>)
header.enricher.overwrite
Set to true to overwrite any existing message headers. (Boolean, default: false)
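The interplay of header.enricher.headers and header.enricher.overwrite can be sketched in Python as follows. This is an illustration under stated assumptions: the function enrich_headers is hypothetical, messages are plain dicts, and each SpEL expression is replaced by a Python callable evaluated against the message.

```python
def enrich_headers(message, header_expressions, overwrite=False):
    """Hypothetical model of header.enricher.headers / header.enricher.overwrite.

    header_expressions maps header names to callables that stand in for
    the SpEL expressions evaluated against the message.
    """
    headers = dict(message["headers"])  # never mutate the incoming message
    for name, expression in header_expressions.items():
        if name in headers and not overwrite:
            continue  # keep the existing value unless overwrite is enabled
        headers[name] = expression(message)
    return {"headers": headers, "payload": message["payload"]}


message = {"headers": {"foo": "original"},
           "payload": {"someProperty": 1, "otherProperty": 2}}
expressions = {
    "foo": lambda m: m["payload"]["someProperty"],   # foo=payload.someProperty
    "bar": lambda m: m["payload"]["otherProperty"],  # bar=payload.otherProperty
}
print(enrich_headers(message, expressions)["headers"])                  # {'foo': 'original', 'bar': 2}
print(enrich_headers(message, expressions, overwrite=True)["headers"])  # {'foo': 1, 'bar': 2}
```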

3.8.4 Build

$ ./mvnw clean install -PgenerateApps
$ cd apps

You can find the corresponding binder-based projects here. You can then cd into one of the folders and build it:

$ ./mvnw clean package

3.8.5 Examples

java -jar header-enricher-processor.jar --header.enricher.headers='foo=payload.someProperty \n bar=payload.otherProperty'

3.8.6 Code of Conduct

This project adheres to the Contributor Covenant code of conduct. By participating, you are expected to uphold this code. Please report unacceptable behavior to spring-code-of-conduct@pivotal.io.

3.9 Http Client Processor

A processor app that makes requests to an HTTP resource and emits the response body as a message payload. This processor can be combined with, e.g., a time source app to periodically poll results from an HTTP resource.

3.9.1 Input

Headers

Any required HTTP headers must be explicitly set via the headers-expression property. See the examples below. Header values may also be used to construct the request body when referenced in the body-expression property.

Payload

You can set the http-method-expression property to derive the HTTP method from the inbound Message, or http-method to set it statically (defaults to the GET method). The Message payload may be any Java type. Generally, standard Java types such as String (e.g., JSON, XML) or byte array payloads are recommended. A Map should work without too much effort. By default, the payload becomes the HTTP request body (if needed). You may also set the body-expression property to construct a value derived from the Message, or body to use a static (literal) value.

Internally, the processor uses RestTemplate.exchange(…).

The RestTemplate supports Jackson JSON serialization to support any request and response types if necessary. The expected-response-type property, String.class by default, may be set to any class in your application class path. (Note that user-defined payload types require adding the corresponding dependencies to your pom file.)

3.9.2 Output

Headers

No HTTP message headers are mapped to the outbound Message.

Payload

The raw output object is a ResponseEntity<?>; any of its fields (e.g., body, headers) or accessor methods (statusCode) may be referenced as part of the reply-expression. By default, the outbound Message payload is the response body.

3.9.3 Options

The httpclient processor has the following options:

httpclient.body
The (static) request body; if neither this nor bodyExpression is provided, the payload will be used. (Object, default: <none>)
httpclient.body-expression
A SpEL expression to derive the request body from the incoming message. (Expression, default: <none>)
httpclient.expected-response-type
The type used to interpret the response. (Class<?>, default: <none>)
httpclient.headers-expression
A SpEL expression used to derive the http headers map to use. (Expression, default: <none>)
httpclient.http-method
The kind of http method to use. (HttpMethod, default: <none>, possible values: GET,HEAD,POST,PUT,PATCH,DELETE,OPTIONS,TRACE)
httpclient.http-method-expression
A SpEL expression to derive the request method from the incoming message. (Expression, default: <none>)
httpclient.reply-expression
A SpEL expression used to compute the final result, applied against the whole http response. (Expression, default: body)
httpclient.url
The URL to issue an http request to, as a static value. (String, default: <none>)
httpclient.url-expression
A SpEL expression against incoming message to determine the URL to use. (Expression, default: <none>)

3.9.4 Build

$ ./mvnw clean install -PgenerateApps
$ cd apps

You can find the corresponding binder-based projects here. You can then cd into one of the folders and build it:

$ ./mvnw clean package

3.9.5 Examples

$ java -jar httpclient-processor.jar --httpclient.url=http://someurl  --httpclient.http-method=POST --httpclient.headers-expression="{'Content-Type':'application/json'}"

$ java -jar httpclient-processor.jar --httpclient.url=http://someurl  --httpclient.reply-expression="statusCode.name()"

3.10 PMML Processor

A processor that evaluates a machine learning model stored in PMML format.

3.10.1 Input

Headers

N/A

Payload

  • PMML model data

3.10.2 Output

Headers

N/A

Payload

  • Tuple carrying information about the evaluated data

3.10.3 Options

The pmml processor has the following options:

pmml.inputs
How to compute model active fields from input message properties as modelField->SpEL. (Map<String, Expression>, default: <none>)
pmml.model-location
The location of the PMML model file. (Resource, default: <none>)
pmml.model-name
If the model file contains multiple models, the name of the one to use. (String, default: <none>)
pmml.model-name-expression
If the model file contains multiple models, the name of the one to use, as a SpEL expression. (Expression, default: <none>)
pmml.outputs
How to emit evaluation results in the output message as msgProperty->SpEL. (Map<String, Expression>, default: <none>)

3.10.4 Build

$ ./mvnw clean install -PgenerateApps
$ cd apps

You can find the corresponding binder-based projects here. You can then cd into one of the folders and build it:

$ ./mvnw clean package

3.10.5 Examples

java -jar pmml-processor.jar --pmml.modelLocation= --pmml.modelName="
java -jar pmml-processor.jar --pmml.modelLocation= --pmml.modelNameExpression="

3.11 Python Http Processor

Spring Cloud Stream App Starters for integrating with python

This application invokes a REST service using the httpclient processor. As a convenience for Python developers, this processor allows you to provide a Jython wrapper script that may execute a function before and after the REST call to perform any necessary data transformation. If you don't require any custom transformations, just use the httpclient processor.

The diagram shows input and output adapters as conceptual components. These are actually implemented as functions defined in a single script that must conform to a simple convention:

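# 'payload', 'headers' and 'channel' are bound by the processor before the script runs.

def input():
    # Called before the REST call; the return value becomes the request payload.
    return "Pre" + payload

def output():
    # Called after the REST call; transforms the response.
    return payload + "Post"

# Invoke the function named by 'channel'. This must stay the last line, and it must be an assignment.
result = locals()[channel]()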

The function names input and output map to the conventional channel names used by Spring Cloud Stream processors. The last line is a bit of Python reflection magic to invoke a function by its name, given by the bound variable channel. Because the processor is implemented with Spring Integration Scripting, headers and payload are always bound to the Message headers and payload, respectively. The payload on the input side is the object you use to build the REST request. The output side transforms the response. If you don't need any additional processing on one side, implement the function with pass as the body:

def output():
    pass

Note

The last line in the script must be an assignment statement. The variable name doesn't matter. This is required to bind the return value correctly.
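The note above can be illustrated with the Python sketch below, which plays the processor's role: it binds channel, payload and headers, runs the wrapper script, and reads back whatever variable the last line assigns. This is an assumption about the mechanism made for illustration; the real processor runs the script in the Jython ScriptEngine rather than CPython's exec, and the helpers last_assigned_name and run_wrapper are hypothetical.

```python
WRAPPER_SCRIPT = """
def input():
    return "Pre" + payload

def output():
    return payload + "Post"

result = locals()[channel]()
"""


def last_assigned_name(script):
    """Name of the variable assigned on the script's last non-blank line."""
    last_line = [line for line in script.splitlines() if line.strip()][-1]
    if "=" not in last_line:
        raise ValueError("the last line must be an assignment statement")
    return last_line.split("=", 1)[0].strip()


def run_wrapper(script, channel, payload, headers=None):
    """Bind the variables, run the script, return the final assignment's value."""
    bindings = {"channel": channel, "payload": payload, "headers": headers or {}}
    exec(script, bindings)  # one dict: used as both globals and locals
    return bindings[last_assigned_name(script)]


print(run_wrapper(WRAPPER_SCRIPT, "input", "-request-"))    # Pre-request-
print(run_wrapper(WRAPPER_SCRIPT, "output", "-response-"))  # -response-Post
```

Because the value is looked up by whatever name the last line assigns, renaming result to any other variable gives the same outcome, matching the note that the variable name doesn't matter.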

Note

The script is evaluated for every message. This tends to create a lot of classes for each execution, which puts stress on the JRE Metaspace memory region (or PermGen if using a JRE prior to version 8). In Java 8, Metaspace is unlimited by default, allocated from native memory, and therefore limited only by the native OS. If deploying to Cloud Foundry, the Java Buildpack Memory Calculator sets -XX:MaxMetaspaceSize (see github.com/cloudfoundry/java-buildpack-memory-calculator for details). If using JBP v4.x, you may override the calculated value (and others) by specifying -XX:MaxMetaspaceSize explicitly in JAVA_OPTS. You also need to increase the container memory accordingly. Similar tuning is advised in any containerized environment.


PythonHttpProcessor

3.11.1 Input

Headers

Headers will be bound automatically to the wrapper script variable headers.

Payload

Any type. Payload will be automatically bound to the wrapper script variable payload. Jython scripts can effectively access any Java type on the app’s classpath.

3.11.2 Output

Headers

Headers may be set by the Jython wrapper script if the output() script function returns a Message.

Payload

Whatever the output() wrapper script function returns.

Note

The wrapper script is intended to perform any required transformations before sending an HTTP request and/or after the response is received. The return value of the input adapter becomes the inbound payload of the httpclient processor and should conform to its requirements. Likewise, the result of the HTTP reply-expression is bound to payload when the output() function is invoked.

3.11.3 Options

The python-http processor has the following options:

git.basedir
The base directory where the repository should be cloned. If not specified, a temporary directory will be created. (File, default: <none>)
git.clone-on-start
Flag to indicate that the repository should be cloned on startup (not on demand). Generally leads to slower startup but faster first query. (Boolean, default: true)
git.label
The label or branch to clone. (String, default: master)
git.passphrase
The passphrase for the remote repository. (String, default: <none>)
git.password
The password for the remote repository. (String, default: <none>)
git.timeout
Timeout (in seconds) for obtaining HTTP or SSH connection (if applicable). Default 5 seconds. (Integer, default: 5)
git.uri
The URI of the remote repository. (String, default: <none>)
git.username
The username for the remote repository. (String, default: <none>)
httpclient.body
The (static) request body; if neither this nor bodyExpression is provided, the payload will be used. (Object, default: <none>)
httpclient.body-expression
A SpEL expression to derive the request body from the incoming message. (Expression, default: <none>)
httpclient.expected-response-type
The type used to interpret the response. (Class<?>, default: <none>)
httpclient.headers-expression
A SpEL expression used to derive the http headers map to use. (Expression, default: <none>)
httpclient.http-method
The kind of http method to use. (HttpMethod, default: <none>, possible values: GET,HEAD,POST,PUT,PATCH,DELETE,OPTIONS,TRACE)
httpclient.http-method-expression
A SpEL expression to derive the request method from the incoming message. (Expression, default: <none>)
httpclient.reply-expression
A SpEL expression used to compute the final result, applied against the whole http response. (Expression, default: body)
httpclient.url
The URL to issue an http request to, as a static value. (String, default: <none>)
httpclient.url-expression
A SpEL expression against incoming message to determine the URL to use. (Expression, default: <none>)
wrapper.content-type
Sets the Content type header for the outgoing Message. (MediaType, default: <none>)
wrapper.delimiter
The variable delimiter. (Delimiter, default: <none>, possible values: COMMA,SPACE,TAB,NEWLINE)
wrapper.script
The Python script file name. (String, default: <none>)
wrapper.variables
Variable bindings as a delimited string of name-value pairs, e.g. 'foo=bar,baz=car'. (String, default: <none>)

3.11.4 Build

$ ./mvnw clean install -PgenerateApps
$ cd apps

You can find the corresponding binder-based projects here. You can then cd into one of the folders and build it:

$ ./mvnw clean package

3.11.5 Examples

See httpclient processor for more examples on httpclient properties.

$ java -jar python-http-processor.jar --wrapper.script=/local/directory/build-json.py --httpclient.url=http://someurl \
    --httpclient.http-method=POST --httpclient.headers-expression="{'Content-Type':'application/json'}"

$ java -jar python-http-processor.jar --git.uri=https://github.com/some-repo --wrapper.script=some-script.py \
    --wrapper.variables=foo=0.45,bar=0.55 --httpclient.url=http://someurl

3.12 Jython Processor

This application executes a Jython script that binds payload and headers variables to the Message payload and headers, respectively. In addition, you may provide a jython.variables property containing a delimited string (comma-delimited by default), e.g., var1=val1,var2=val2,….

This processor uses a JSR-223 compliant embedded ScriptEngine provided by www.jython.org/.
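The jython.variables string described above can be pictured with the hypothetical parser below. It splits the string on the delimiter selected by the documented COMMA, SPACE, TAB and NEWLINE values and returns name/value bindings; keeping the values as strings is an assumption of this sketch, not documented behaviour of the processor.

```python
DELIMITERS = {"COMMA": ",", "SPACE": " ", "TAB": "\t", "NEWLINE": "\n"}


def parse_variables(spec, delimiter="COMMA"):
    """Hypothetical parser for a jython.variables-style string.

    Splits 'var1=val1,var2=val2' on the chosen delimiter (COMMA by default)
    and returns the name/value pairs as script bindings (values stay strings).
    """
    bindings = {}
    for pair in spec.split(DELIMITERS[delimiter]):
        if not pair.strip():
            continue  # tolerate empty entries, e.g. a trailing delimiter
        name, _, value = pair.partition("=")
        bindings[name.strip()] = value.strip()
    return bindings


print(parse_variables("neutral=0.45,positive=0.55"))
# {'neutral': '0.45', 'positive': '0.55'}
print(parse_variables("var1=val1\tvar2=val2", delimiter="TAB"))
# {'var1': 'val1', 'var2': 'val2'}
```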

Note

The last line in the script must be an assignment statement. The variable name doesn't matter. This is required to bind the return value correctly.

Note

The script is evaluated for every message, which may limit your performance with high message loads. This also tends to create a lot of classes for each execution, which puts stress on the JRE Metaspace memory region (or PermGen if using a JRE prior to version 8). In Java 8, Metaspace is unlimited by default, allocated from native memory, and therefore limited only by the native OS. If deploying to Cloud Foundry, the Java Buildpack Memory Calculator sets -XX:MaxMetaspaceSize (see github.com/cloudfoundry/java-buildpack-memory-calculator for details). If using JBP v4.x, you may override the calculated value (and others) by specifying -XX:MaxMetaspaceSize explicitly in JAVA_OPTS. You also need to increase the container memory accordingly. Similar tuning is advised in any containerized environment.

3.12.1 Input

Headers

Headers will be bound automatically to the script variable headers.

Payload

Any type. Payload will be automatically bound to the script variable payload.

3.12.2 Output

Headers

Headers may be set by the Jython script if the script returns a Message.

Payload

Whatever the script returns.


JythonProcessor

3.12.3 Options

The jython processor has the following options:

git.basedir
The base directory where the repository should be cloned. If not specified, a temporary directory will be created. (File, default: <none>)
git.clone-on-start
Flag to indicate that the repository should be cloned on startup (not on demand). Generally leads to slower startup but faster first query. (Boolean, default: true)
git.label
The label or branch to clone. (String, default: master)
git.passphrase
The passphrase for the remote repository. (String, default: <none>)
git.password
The password for the remote repository. (String, default: <none>)
git.timeout
Timeout (in seconds) for obtaining HTTP or SSH connection (if applicable). Default 5 seconds. (Integer, default: 5)
git.uri
The URI of the remote repository. (String, default: <none>)
git.username
The username for the remote repository. (String, default: <none>)
jython.content-type
Sets the Content type header for the outgoing Message. (MediaType, default: <none>)
jython.delimiter
The variable delimiter. (Delimiter, default: <none>, possible values: COMMA,SPACE,TAB,NEWLINE)
jython.script
The Python script file name. (String, default: <none>)
jython.variables
Variable bindings as a delimited string of name-value pairs, e.g. 'foo=bar,baz=car'. (String, default: <none>)
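A sketch of how a jython.variables string could map to script variables, assuming that pairs are separated by the character selected with jython.delimiter, that each pair is split on its first =, and that values are bound as strings (so a script would call float(neutral) itself). DELIMITERS and parse_variables are illustrative names, not part of the processor.

```python
# Hypothetical sketch of jython.variables parsing; not the processor's code.
# Assumed mapping from jython.delimiter values to separator characters.
DELIMITERS = {"COMMA": ",", "SPACE": " ", "TAB": "\t", "NEWLINE": "\n"}


def parse_variables(text, delimiter="COMMA"):
    """Split 'name=value' pairs on the chosen delimiter into a dict of strings."""
    separator = DELIMITERS[delimiter]
    bindings = {}
    for pair in text.split(separator):
        if not pair:
            continue  # tolerate a trailing delimiter
        # Split on the first '=' only, so a value may itself contain '='.
        name, value = pair.split("=", 1)
        bindings[name] = value
    return bindings


print(parse_variables("neutral=0.45,positive=0.55"))
print(parse_variables("foo=bar baz=car", delimiter="SPACE"))
```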

3.12.4 Build

$ ./mvnw clean install -PgenerateApps
$ cd apps

You can find the corresponding binder-based projects here. You can then cd into one of the folders and build it:

$ ./mvnw clean package

3.12.5 Examples

$ java -jar python-jython-processor.jar --jython.script=/local/directory/to_uppercase.py

$ java -jar python-jython-processor.jar --git.uri=https://github.com/some-repo \
    --jython.script=map-tweet-sentiments.py --jython.variables=neutral=0.45,positive=0.55

3.13 Scriptable Transform Processor

A Spring Cloud Stream module that transforms messages using a script. The script body is supplied directly as a property value. The language of the script can be specified (groovy/javascript/ruby/python).

3.13.1 Input

Headers

N/A

Payload

  • Any

3.13.2 Output

Headers

N/A

Payload

  • Any

3.13.3 Options

The scriptable-transform processor has the following options:

scriptable-transformer.language
Language of the text in the script property. Supported: groovy, javascript, ruby, python. (String, default: <none>)
scriptable-transformer.script
Text of the script. (String, default: <none>)
scriptable-transformer.variables
Variable bindings as a new line delimited string of name-value pairs, e.g. 'foo=bar\n baz=car'. (Properties, default: <none>)
scriptable-transformer.variables-location
The location of a properties file containing custom script variable bindings. (Resource, default: <none>)
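Because scriptable-transformer.variables is typed as Properties, the example 'foo=bar\n baz=car' yields two bindings even though the second line starts with a space. The function below is a hypothetical, simplified approximation of java.util.Properties line parsing (no escapes, no line continuations, and only = or : treated as separators) that shows this effect.

```python
# Simplified approximation of java.util.Properties line parsing (hypothetical helper).
def parse_properties(text):
    """Parse 'key=value' or 'key:value' lines into a dict."""
    bindings = {}
    for raw_line in text.splitlines():
        line = raw_line.lstrip()  # leading whitespace on a line is ignored
        if not line or line[0] in "#!":
            continue  # skip blank lines and comment lines
        # The key ends at the first '=' or ':' (whitespace-only separators are not handled).
        positions = [i for i in (line.find("="), line.find(":")) if i >= 0]
        cut = min(positions) if positions else len(line)
        key = line[:cut].rstrip()
        value = line[cut + 1:].lstrip()  # whitespace after the separator is skipped
        bindings[key] = value
    return bindings


print(parse_properties("foo=bar\n baz=car"))
```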

3.13.4 Build

$ ./mvnw clean install -PgenerateApps
$ cd apps

You can find the corresponding binder-based projects here. You can then cd into one of the folders and build it:

$ ./mvnw clean package

The target folder will then contain the generated jars (the original jar, the repackaged jar also known as the executable jar, the javadoc jar, etc.); the executable jar can be run directly with java.

The executable jar name follows the pattern: scriptable-transform-processor-BINDER-VERSION.jar

3.13.5 Examples

Starting from the top-level folder, and assuming you target RabbitMQ and your pom.xml sets the project.version property to 2.1.0.RELEASE, you can build and run the standalone application with:

$ ./mvnw clean install -PgenerateApps
$ cd apps/scriptable-transform-processor-rabbit
$ ./mvnw clean package
$ java -jar target/scriptable-transform-processor-rabbit-2.1.0.RELEASE.jar --scriptable-transformer.language=ruby --scriptable-transformer.script='return "#{payload.upcase}"'

3.14 Splitter Processor

The splitter app builds upon the concept of the same name in Spring Integration and allows the splitting of a single message into several distinct messages.

3.14.1 Input

Headers

N/A

Payload

  • Any

3.14.2 Output

Headers

N/A

Payload

  • A collection of split messages based on a given expression, delimiter or file marker.

3.14.3 Options

splitter.apply-sequence
Add correlation/sequence information in headers to facilitate later aggregation. (Boolean, default: true)
splitter.charset
The charset to use when converting bytes in text-based files to String. (String, default: <none>)
splitter.delimiters
When expression is null, delimiters to use when tokenizing String payloads. (String, default: <none>)
splitter.expression
A SpEL expression for splitting payloads. (Expression, default: <none>)
splitter.file-markers
Set to true or false to use a FileSplitter (to split text-based files by line) that includes (or not) beginning/end of file markers. (Boolean, default: <none>)
splitter.markers-json
When 'fileMarkers == true', specify if they should be produced as FileSplitter.FileMarker objects or JSON. (Boolean, default: true)

When no expression, fileMarkers, or charset is provided, a DefaultMessageSplitter is configured with the (optional) delimiters. When fileMarkers or charset is provided, a FileSplitter is configured (you must provide either fileMarkers or charset to split files, which must be text-based; they are split into lines). Otherwise, an ExpressionEvaluatingMessageSplitter is configured.
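The selection rules above can be restated as a small decision function. This is a hypothetical sketch: it only returns the splitter class name, it treats an expression combined with fileMarkers or charset as one of the ambiguous configurations the caution below forbids (an assumption), and it models DefaultMessageSplitter delimiters as a set of single characters with empty tokens dropped, in the style of java.util.StringTokenizer.

```python
import re


def choose_splitter(expression=None, file_markers=None, charset=None):
    """Return the name of the splitter the documented rules would configure."""
    uses_file_options = file_markers is not None or charset is not None
    if expression is not None and uses_file_options:
        # Assumed to be one of the disallowed ambiguous combinations.
        raise ValueError("expression cannot be combined with fileMarkers/charset")
    if uses_file_options:
        return "FileSplitter"
    if expression is not None:
        return "ExpressionEvaluatingMessageSplitter"
    return "DefaultMessageSplitter"


def tokenize(payload, delimiters):
    """Split a String payload on any single delimiter character, dropping empty tokens."""
    if not delimiters:
        return [payload]
    pattern = "[" + re.escape(delimiters) + "]"
    return [token for token in re.split(pattern, payload) if token]


print(choose_splitter(charset="UTF-8"))  # FileSplitter
print(tokenize("a,b;;c", ",;"))          # ['a', 'b', 'c']
```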

When splitting File payloads, the sequenceSize header is zero because the size cannot be determined at the beginning.

Caution

Ambiguous properties are not allowed.

3.14.4 JSON Example

As part of the SpEL expression you can make use of the pre-registered JSON Path function. The syntax is #jsonPath(payload, '<json path expression>').

For example, consider the following JSON:

{ "store": {
    "book": [
        {
            "category": "reference",
            "author": "Nigel Rees",
            "title": "Sayings of the Century",
            "price": 8.95
        },
        {
            "category": "fiction",
            "author": "Evelyn Waugh",
            "title": "Sword of Honour",
            "price": 12.99
        },
        {
            "category": "fiction",
            "author": "Herman Melville",
            "title": "Moby Dick",
            "isbn": "0-553-21311-3",
            "price": 8.99
        },
        {
            "category": "fiction",
            "author": "J. R. R. Tolkien",
            "title": "The Lord of the Rings",
            "isbn": "0-395-19395-8",
            "price": 22.99
        }
    ],
    "bicycle": {
        "color": "red",
        "price": 19.95
    }
}}

and an expression #jsonPath(payload, '$.store.book'); the result will be 4 messages, each with a Map payload containing the properties of a single book.
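To make the result concrete, the following Python sketch emulates that split outside the processor: a plain dictionary lookup stands in for the JSON Path expression, and each element of the resulting list becomes one message payload. The document is abbreviated (the bicycle entry is omitted) and split_books is an illustrative name.

```python
import json

# Abbreviated copy of the store document above.
DOCUMENT = json.loads("""
{"store": {"book": [
  {"category": "reference", "author": "Nigel Rees", "title": "Sayings of the Century", "price": 8.95},
  {"category": "fiction", "author": "Evelyn Waugh", "title": "Sword of Honour", "price": 12.99},
  {"category": "fiction", "author": "Herman Melville", "title": "Moby Dick", "isbn": "0-553-21311-3", "price": 8.99},
  {"category": "fiction", "author": "J. R. R. Tolkien", "title": "The Lord of the Rings", "isbn": "0-395-19395-8", "price": 22.99}
]}}
""")


def split_books(document):
    """Emulate #jsonPath(payload, '$.store.book') plus splitting:
    the expression yields a list, and each element becomes one message payload."""
    return [dict(book) for book in document["store"]["book"]]


for message in split_books(DOCUMENT):
    print(message["title"])
```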

3.14.5 Build

$ ./mvnw clean install -PgenerateApps
$ cd apps

You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:

$ ./mvnw clean package

3.14.6 Examples

java -jar splitter-processor.jar --splitter.expression=expression
java -jar splitter-processor.jar --splitter.delimiters=delimiters

3.15 Task Launch Request Transform

Use the task launch request transform in a stream to create a TaskLaunchRequest to be passed to the output channel. The TaskLaunchRequest is used by a TaskLauncher to launch tasks on the platform.

3.15.1 Input

Any input type. (The payload and headers are discarded.)

3.15.2 Output

Headers:

  • Content-Type: application/octet-stream

Payload:

A byte array containing the TaskLaunchRequest

3.15.3 Options

The tasklaunchrequest processor has the following options:

task.launch.request.application-name
The name to be applied to the launched task. (String, default: <empty string>)
task.launch.request.command-line-arguments
Space delimited list of commandLineArguments to be applied to the TaskLaunchRequest. (String, default: <none>)
task.launch.request.data-source-driver-class-name
The datasource driver class name to be applied to the TaskLaunchRequest. (String, default: <none>)
task.launch.request.data-source-password
The datasource password to be applied to the TaskLaunchRequest. (String, default: <none>)
task.launch.request.data-source-url
The datasource url to be applied to the TaskLaunchRequest. (String, default: <none>)
task.launch.request.data-source-user-name
The datasource user name to be applied to the TaskLaunchRequest. (String, default: <none>)
task.launch.request.deployment-properties
Comma delimited list of deployment properties to be applied to the TaskLaunchRequest. (String, default: <none>)
task.launch.request.environment-properties
Comma delimited list of environment properties to be applied to the TaskLaunchRequest. (String, default: <none>)
task.launch.request.uri
The uri of the artifact to be applied to the TaskLaunchRequest. (String, default: <none>)
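Note that command-line-arguments is space-delimited, whereas deployment-properties and environment-properties are comma-delimited lists of key=value pairs. The sketch below shows how such strings might be split; the function names are hypothetical and the parsing rules (no quoting, split on the first =, surrounding whitespace trimmed) are assumptions rather than the processor's documented behavior.

```python
# Hypothetical helpers illustrating the two list formats used by the options above.
def parse_command_line_arguments(text):
    """Space-delimited arguments -> list (no shell-style quoting is handled)."""
    return text.split() if text else []


def parse_comma_properties(text):
    """Comma-delimited key=value pairs -> dict, splitting each pair on its first '='."""
    properties = {}
    for item in (text or "").split(","):
        if item.strip():
            key, value = item.split("=", 1)
            properties[key.strip()] = value.strip()
    return properties


print(parse_command_line_arguments("--format=yyyy --verbose"))
print(parse_comma_properties("key1=value1, key2=a=b"))
```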

3.15.4 Building with Maven

$ ./mvnw clean install -PgenerateApps
$ cd apps

You can find the corresponding binder-based projects here. You can then cd into one of the folders and build it:

$ ./mvnw clean package

3.15.5 Examples

java -jar tasklaunchrequest_transform_processor.jar --task.launch.request.uri=maven://org.springframework.cloud.task.app:timestamp-task:1.2.0.RELEASE

3.16 TCP Client Processor

A TCP client as a processor: it connects to a TCP server, sends data to it, and also receives data.

3.16.1 Input

Headers:

  • Content-Type: application/octet-stream

Payload:

  • byte[]

Headers:

  • Content-Type: text/plain

Payload:

  • String

3.16.2 Output

Headers:

  • Content-Type: application/octet-stream

Payload:

  • byte[]

3.16.3 Options

The tcp-client processor has the following options:

tcp.buffer-size
The buffer size used when decoding messages; larger messages will be rejected. (Integer, default: 2048)
tcp.charset
The charset used when converting from bytes to String. (String, default: UTF-8)
tcp.decoder
The decoder to use when receiving messages. (Encoding, default: <none>, possible values: CRLF,LF,NULL,STXETX,RAW,L1,L2,L4)
tcp.encoder
The encoder to use when sending messages. (Encoding, default: <none>, possible values: CRLF,LF,NULL,STXETX,RAW,L1,L2,L4)
tcp.host
The host to which this processor will connect. (String, default: localhost)
tcp.nio
Whether or not to use NIO. (Boolean, default: false)
tcp.port
The port to which this processor will connect. (Integer, default: 1234)
tcp.retry-interval
Retry interval (in milliseconds) to check the connection and reconnect. (Long, default: 60000)
tcp.reverse-lookup
Perform a reverse DNS lookup on the remote IP Address; if false, just the IP address is included in the message headers. (Boolean, default: false)
tcp.socket-timeout
The timeout (ms) before closing the socket when no data is received. (Integer, default: 120000)
tcp.use-direct-buffers
Whether or not to use direct buffers. (Boolean, default: false)
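The tcp.encoder and tcp.decoder values name wire framings. The sketch below shows, as we understand the corresponding Spring Integration serializers, what each encoding puts on the wire for one message, plus terminator-based decoding that enforces a tcp.buffer-size style limit. It is an illustration, not the processor's code; treating the L1/L2/L4 prefix as a big-endian length of the payload alone is our assumption.

```python
import struct

TERMINATORS = {"CRLF": b"\r\n", "LF": b"\n", "NULL": b"\x00"}
LENGTH_FORMATS = {"L1": ">B", "L2": ">H", "L4": ">I"}  # 1-, 2- and 4-byte big-endian lengths


def encode(payload, encoding):
    """Frame a single message (bytes) according to the named encoding."""
    if encoding in TERMINATORS:
        return payload + TERMINATORS[encoding]
    if encoding == "STXETX":
        return b"\x02" + payload + b"\x03"
    if encoding in LENGTH_FORMATS:
        return struct.pack(LENGTH_FORMATS[encoding], len(payload)) + payload
    if encoding == "RAW":
        return payload  # no framing: the message ends when the socket is closed
    raise ValueError("unsupported encoding: " + encoding)


def decode_terminated(stream, encoding, buffer_size=2048):
    """Split buffered bytes into complete frames plus any unterminated remainder."""
    *frames, remainder = stream.split(TERMINATORS[encoding])
    for frame in frames:
        if len(frame) > buffer_size:
            raise ValueError("frame larger than buffer size")
    return frames, remainder


print(encode(b"hi", "L2"))  # b'\x00\x02hi'
```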

3.16.4 Build

$ ./mvnw clean install -PgenerateApps
$ cd apps

You can find the corresponding binder-based projects here. You can then cd into one of the folders and build it:

$ ./mvnw clean package

3.16.5 Examples

java -jar tcp-client-processor.jar --tcp.decoder=LF --tcp.encoder=LF

3.17 Transform Processor

Use the transform app in a stream to convert a Message’s content or structure.

The transform processor is used by passing a SpEL expression. The expression should return the modified message or payload.

As part of the SpEL expression you can make use of the pre-registered JSON Path function. The syntax is #jsonPath(payload,'<json path expression>')

3.17.1 Input

Headers

N/A

Payload

  • Any

3.17.2 Output

Headers

N/A

Payload

  • Any

3.17.3 Options

The transform processor has the following options:

transformer.expression
A SpEL expression evaluated against each incoming message; it should return the modified message or payload. (Expression, default: payload)

3.17.4 Build

$ ./mvnw clean install -PgenerateApps
$ cd apps

You can find the corresponding binder-based projects here. You can then cd into one of the folders and build it:

$ ./mvnw clean package

3.17.5 Examples

java -jar transform-processor.jar --transformer.expression='payload.toUpperCase()'

This transform will convert all message payloads to upper case.

3.18 TensorFlow Processor

A processor that evaluates a machine learning model stored in TensorFlow Protobuf format.

The following snippet shows how to export a TensorFlow model into the Protocol Buffers binary format required by the processor.

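import os
import tensorflow as tf  # TensorFlow 1.x API
from tensorflow.python.framework.graph_util import convert_variables_to_constants
...  # build and train the model in a tf.Session named `sess`
SAVE_DIR = os.path.abspath(os.path.curdir)
# Freeze the graph: replace variables with constants that hold their trained values.
minimal_graph = convert_variables_to_constants(sess, sess.graph_def, ['<model output>'])
# Binary protobuf: the format the processor loads through tensorflow.model.
tf.train.write_graph(minimal_graph, SAVE_DIR, 'my_graph.proto', as_text=False)
# Optional human-readable copy of the same graph.
tf.train.write_graph(minimal_graph, SAVE_DIR, 'my.txt', as_text=True)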

The --tensorflow.model property configures the processor with the location of the serialized TensorFlow model.
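The tensorflow.model option also accepts archives: the URI fragment selects the model file, otherwise the first '.pb' entry is used. The following sketch restates that rule; the function name, the list of archive entry names, and matching the fragment against the entry's file name (ignoring directories) are assumptions for illustration.

```python
from urllib.parse import urldefrag


def select_model_entry(location, archive_entries):
    """Choose which file inside a model archive to load (hypothetical helper)."""
    _, fragment = urldefrag(location)
    if fragment:
        # An explicit model name was given as the URI fragment.
        for entry in archive_entries:
            if entry.rsplit("/", 1)[-1] == fragment:
                return entry
        raise ValueError("model %r not found in archive" % fragment)
    # No fragment: fall back to the first '.pb' file in the archive.
    for entry in archive_entries:
        if entry.endswith(".pb"):
            return entry
    raise ValueError("no '.pb' file found in archive")


entries = ["model/saved_model.pbtxt", "model/other_graph.pb", "model/frozen_inference_graph.pb"]
print(select_model_entry("https://foo/bar/model.tar.gz#frozen_inference_graph.pb", entries))
print(select_model_entry("https://foo/bar/model.tar.gz", entries))
```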

The TensorflowInputConverter converts the input data into the format specific to the given model.

The TensorflowOutputConverter converts the computed Tensors result into a pipeline Message.

The --tensorflow.modelFetch property defines the list of TensorFlow graph outputs to fetch the output Tensors from.

The --tensorflow.mode property defines whether the computed results are passed in the message payload or in the message header.

3.18.1 Input

Headers

N/A

Payload

The TensorFlow Processor uses a TensorflowInputConverter to convert the input data into a data format compatible with the TensorFlow model used. The input converter converts the input messages into a key/value Map, where each key corresponds to a model input placeholder and each value is an org.tensorflow.DataType-compliant value. The default converter implementation expects either a Map payload or a flat JSON message that can be converted into a Map.

The TensorflowInputConverter can be extended and customized. See TwitterSentimentTensorflowInputConverter.java for an example.

3.18.2 Output

Headers

N/A

Payload

The processor's output uses a TensorflowOutputConverter to convert the computed Tensor result into a serializable message. The default implementation uses JSON.

A custom TensorflowOutputConverter can provide more convenient data representations. See TwitterSentimentTensorflowOutputConverter.java.

3.18.3 Options

The tensorflow processor has the following options:

tensorflow.expression
How to obtain the input data from the input message. If empty, it defaults to the input message payload. Use an expression such as headers[myHeaderName] to get the input data from the message header with the key myHeaderName. (Expression, default: <none>)
tensorflow.mode
The outbound message can store the inference result either in the payload or in a header named by outputName. The payload mode (default) stores the inference result in the outbound message payload; the inbound payload is discarded. The header mode stores the inference result in the outbound message header named by the outputName property; the inbound message payload is passed through to the outbound message as is. (OutputMode, default: <none>, possible values: payload,header)
tensorflow.model
The location of the pre-trained TensorFlow model file. The file, http, and classpath schemes are supported. For archive locations, the first file with a '.pb' extension is used. Use the URI fragment to specify an exact model name (e.g. https://foo/bar/model.tar.gz#frozen_inference_graph.pb) (Resource, default: <none>)
tensorflow.model-fetch
The TensorFlow graph model outputs: a comma-separated list of TensorFlow operation names to fetch the output Tensors from. (List<String>, default: <none>)
tensorflow.output-name
The output data key used in header mode. (String, default: result)
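Together, tensorflow.expression and tensorflow.mode decide where the input comes from and where the inference result goes. Here is a small sketch of those documented rules, with a message modeled as a payload plus a headers dict; the function names are hypothetical, and header_name=None stands for an empty expression.

```python
# Hypothetical sketch of the documented expression/mode semantics.
def select_input(payload, headers, header_name=None):
    """Empty tensorflow.expression -> payload; headers[name] -> that header's value."""
    return payload if header_name is None else headers[header_name]


def build_output(payload, headers, result, mode="payload", output_name="result"):
    """Return the (payload, headers) of the outbound message for the given mode."""
    out_headers = dict(headers)
    if mode == "payload":
        return result, out_headers  # the inbound payload is discarded
    if mode == "header":
        out_headers[output_name] = result  # the inbound payload passes through
        return payload, out_headers
    raise ValueError("mode must be 'payload' or 'header'")


print(build_output("image-bytes", {"id": 7}, {"label": "cat"}, mode="header"))
```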

3.18.4 Build

$ ./mvnw clean install -PgenerateApps
$ cd apps

You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:

$ ./mvnw clean package

3.18.5 Examples

java -jar tensorflow-processor.jar --tensorflow.model= --tensorflow.modelFetch= --tensorflow.mode=

3.19 Twitter Sentiment Analysis Processor

A processor that evaluates a machine learning model stored in TensorFlow Protobuf format. It operationalizes the twitter-sentiment-cnn model (github.com/danielegrattarola/twitter-sentiment-cnn).

Real-time Twitter Sentiment Analytics with TensorFlow and Spring Cloud Dataflow

3.19.1 Input

Headers

  • content-type: application/json

Payload

  • JSON tweet message

3.19.2 Output

Headers

  • content-type: application/json

Payload

Decodes the evaluated result into POSITIVE, NEGATIVE and NEUTRAL values, then creates and returns a simple JSON message.

The processor's output uses a TensorflowOutputConverter to convert the computed Tensor result into a serializable message. The default implementation uses JSON.

A custom TensorflowOutputConverter can provide more convenient data representations. See TwitterSentimentTensorflowOutputConverter.java.

3.19.3 Options

The twitter-sentiment processor has the following options:

tensorflow.expression
How to obtain the input data from the input message. If empty, it defaults to the input message payload. Use an expression such as headers[myHeaderName] to get the input data from the message header with the key myHeaderName. (Expression, default: <none>)
tensorflow.mode
The outbound message can store the inference result either in the payload or in a header named by outputName. The payload mode (default) stores the inference result in the outbound message payload; the inbound payload is discarded. The header mode stores the inference result in the outbound message header named by the outputName property; the inbound message payload is passed through to the outbound message as is. (OutputMode, default: <none>, possible values: payload,header)
tensorflow.model
The location of the pre-trained TensorFlow model file. The file, http, and classpath schemes are supported. For archive locations, the first file with a '.pb' extension is used. Use the URI fragment to specify an exact model name (e.g. https://foo/bar/model.tar.gz#frozen_inference_graph.pb) (Resource, default: <none>)
tensorflow.model-fetch
The TensorFlow graph model outputs: a comma-separated list of TensorFlow operation names to fetch the output Tensors from. (List<String>, default: <none>)
tensorflow.output-name
The output data key used in header mode. (String, default: result)
tensorflow.twitter.vocabulary
The location of the word vocabulary file used for training the model. (Resource, default: <none>)

3.19.4 Build

$ ./mvnw clean install -PgenerateApps
$ cd apps

You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:

$ ./mvnw clean package

3.19.5 Examples

java -jar twitter-sentiment-processor.jar --tensorflow.twitter.vocabulary= --tensorflow.model= \
    --tensorflow.modelFetch= --tensorflow.mode=

And here is a sample pipeline that computes sentiments for JSON tweets coming from the twitterstream source, using the pre-built minimal_graph.proto and vocab.csv:

tweets=twitterstream --access-token-secret=xxx --access-token=xxx --consumer-secret=xxx --consumer-key=xxx \
| filter --expression=#jsonPath(payload,'$.lang')=='en' \
| twitter-sentiment --vocabulary='http://dl.bintray.com/big-data/generic/vocab.csv' \
   --output-name=output/Softmax --model='http://dl.bintray.com/big-data/generic/minimal_graph.proto' \
   --model-fetch=output/Softmax \
| log

3.20 Image Recognition Processor

A processor that uses an Inception model to classify images in real time into different categories (i.e. labels).

The model implements a deep convolutional neural network that can achieve reasonable performance on hard visual recognition tasks, matching or exceeding human performance in some domains such as image recognition.

The input of the model is an image as a binary array.

The output is a JSON message in this format:

{
  "labels" : [
     {"giant panda":0.98649305}
  ]
}

The result contains the name of the recognized category (i.e. the label) along with the confidence that the image represents this category.

If response-size is set to a value higher than 1, the result includes the response-size most probable labels. For example, response-size=3 would return:

{
  "labels": [
    {"giant panda":0.98649305},
    {"badger":0.010562794},
    {"ice bear":0.001130851}
  ]
}
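The response-size behavior can be sketched as a sort over the model's per-label probabilities. This is a hypothetical illustration of the JSON shape above, not the processor's implementation; the 'tabby cat' entry and its score are made up so that the top-3 cut has something to drop.

```python
import json


def top_labels(probabilities, labels, response_size=1):
    """Return the response_size most probable labels in the documented JSON shape."""
    ranked = sorted(zip(labels, probabilities), key=lambda pair: pair[1], reverse=True)
    return {"labels": [{name: prob} for name, prob in ranked[:response_size]]}


labels = ["badger", "giant panda", "ice bear", "tabby cat"]
probabilities = [0.010562794, 0.98649305, 0.001130851, 0.0008]  # last value is made up
print(json.dumps(top_labels(probabilities, labels, response_size=3)))
```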

3.20.1 Options

The image-recognition processor has the following options:

tensorflow.expression
How to obtain the input data from the input message. If empty, it defaults to the input message payload. Use an expression such as headers[myHeaderName] to get the input data from the message header with the key myHeaderName. (Expression, default: <none>)
tensorflow.image.recognition.draw-labels
When set to true, it augments the input image with the predicted labels. (Boolean, default: true)
tensorflow.image.recognition.labels
The text file containing the category names (i.e. labels) of all categories that this model is trained to recognize. Every category is on a separate line. (Resource, default: <none>)
tensorflow.image.recognition.response-size
Number of top K alternatives to add to the result. Only used when responseSize > 0. (Integer, default: 1)
tensorflow.mode
The outbound message can store the inference result either in the payload or in a header named by outputName. The payload mode (default) stores the inference result in the outbound message payload; the inbound payload is discarded. The header mode stores the inference result in the outbound message header named by the outputName property; the inbound message payload is passed through to the outbound message as is. (OutputMode, default: <none>, possible values: payload,header)
tensorflow.model
The location of the pre-trained TensorFlow model file. The file, http, and classpath schemes are supported. For archive locations, the first file with a '.pb' extension is used. Use the URI fragment to specify an exact model name (e.g. https://foo/bar/model.tar.gz#frozen_inference_graph.pb) (Resource, default: <none>)
tensorflow.model-fetch
The TensorFlow graph model outputs: a comma-separated list of TensorFlow operation names to fetch the output Tensors from. (List<String>, default: <none>)
tensorflow.output-name
The output data key used in header mode. (String, default: result)

3.21 Object Detection Processor

The new Object Detection processor provides out-of-the-box support for the TensorFlow Object Detection API. It allows for real-time localization and identification of multiple objects in a single image or image stream. The Object Detection processor uses one of the pre-trained object detection models and corresponding object labels.

If the pre-trained model is not set explicitly, the following defaults are used:

The following diagram illustrates a Spring Cloud Data Flow streaming pipeline that predicts object types from the images in real-time.

scdf tensorflow object detection arch

The processor's input is an image byte array and the output is a JSON message in this format:

{
  "labels" : [
     {"name":"person", "confidence":0.9996774,"x1":0.0,"y1":0.3940161,"x2":0.9465165,"y2":0.5592592,"cid":1},
     {"name":"person", "confidence":0.9996604,"x1":0.047891676,"y1":0.03169123,"x2":0.941098,"y2":0.2085562,"cid":1},
     {"name":"backpack", "confidence":0.96534747,"x1":0.15588468,"y1":0.85957795,"x2":0.5091308,"y2":0.9908878,"cid":23},
     {"name":"backpack", "confidence":0.963343,"x1":0.1273736,"y1":0.57658505,"x2":0.47765,"y2":0.6986431,"cid":23}
  ]
}

The output format is:

  • name, confidence - human-readable name of the detected object (i.e. label) with its confidence as a float in the range [0, 1]
  • x1, y1, x2, y2 - the bounding box of the detected object, represented as (x1, y1, x2, y2). The coordinates are relative to the image size.
  • cid - Classification identifier as defined in the provided labels configuration file.

3.21.1 Options

The object-detection processor has the following options:

tensorflow.expression
How to obtain the input data from the input message. If empty, it defaults to the input message payload. Use an expression such as headers[myHeaderName] to get the input data from the message header with the key myHeaderName. (Expression, default: <none>)
tensorflow.mode
The outbound message can store the inference result either in the payload or in a header named by outputName. The payload mode (default) stores the inference result in the outbound message payload; the inbound payload is discarded. The header mode stores the inference result in the outbound message header named by the outputName property; the inbound message payload is passed through to the outbound message as is. (OutputMode, default: <none>, possible values: payload,header)
tensorflow.model
The location of the pre-trained TensorFlow model file. The file, http, and classpath schemes are supported. For archive locations, the first file with a '.pb' extension is used. Use the URI fragment to specify an exact model name (e.g. https://foo/bar/model.tar.gz#frozen_inference_graph.pb) (Resource, default: <none>)
tensorflow.model-fetch
The TensorFlow graph model outputs: a comma-separated list of TensorFlow operation names to fetch the output Tensors from. (List<String>, default: <none>)
tensorflow.object.detection.color-agnostic
If disabled (default) the bounding box colors are selected as a function of the object class id. If enabled all bounding boxes are visualized with a single color. (Boolean, default: false)
tensorflow.object.detection.confidence
Probability threshold. Only objects detected with a probability higher than the confidence threshold are accepted. The value is between 0 and 1. (Float, default: 0.4)
tensorflow.object.detection.draw-bounding-box
When set to true, the output image is annotated with the detected object boxes. (Boolean, default: true)
tensorflow.object.detection.draw-mask
For models with mask support, enables drawing the masks of the detected objects. (Boolean, default: true)
tensorflow.object.detection.labels
The text file containing the category names (i.e. labels) of all categories that this model is trained to recognize. Every category is on a separate line. (Resource, default: <none>)
tensorflow.output-name
The output data key used in header mode. (String, default: result)
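The processor applies tensorflow.object.detection.confidence itself, but a downstream consumer can still re-filter the JSON output with a stricter threshold. The sketch below does that for the example output shown earlier (trimmed to the name, confidence and cid fields) and counts the detections per object name; filter_and_count and the 0.965 threshold are illustrative.

```python
import json

# Example output from this section, trimmed to the fields used here.
OUTPUT = json.loads("""
{"labels": [
  {"name": "person", "confidence": 0.9996774, "cid": 1},
  {"name": "person", "confidence": 0.9996604, "cid": 1},
  {"name": "backpack", "confidence": 0.96534747, "cid": 23},
  {"name": "backpack", "confidence": 0.963343, "cid": 23}
]}
""")


def filter_and_count(output, confidence=0.4):
    """Keep detections strictly above the threshold and count them per object name."""
    counts = {}
    for label in output["labels"]:
        if label["confidence"] > confidence:
            counts[label["name"]] = counts.get(label["name"], 0) + 1
    return counts


print(filter_and_count(OUTPUT, confidence=0.965))
```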

3.22 Pose Estimation Processor

A real-time, multi-person Pose Estimation processor for detecting human figures in images and video. It determines where different body parts are located in an image and how they relate to each other spatially.

The processor is based on Realtime Multi-Person 2D Pose Estimation using Part Affinity Fields, OpenPose, and tf-pose-estimation.

The following diagram illustrates a Spring Cloud Data Flow streaming pipeline that predicts body postures from input images in real-time.

scdf tensorflow pose estimation arch

The Pose Estimation processor is configured with a pre-trained TensorFlow model (built with the tf-pose-estimation project). The inference of this model produces auxiliary data structures, such as heatmaps, with predictions about the part locations in the image. The post-processing required for selecting the right body parts and grouping them into poses is implemented by the processor using greedy algorithms.

Use the tensorflow.model property to set the pre-trained TensorFlow model. Here are some options available out of the box:

The processor's input is an image byte array and the output is a JSON message and, optionally, an image with annotated body poses. The output JSON format looks like:

[
    {
        "limbs": [ {"score": 8.4396105, "from": { "type": "lShoulder", "y": 56, "x": 160 }, "to": { "type": "lEar", "y": 24, "x": 152 } },
                   { "score": 10.145516, "from": { "type": "neck", "y": 56, "x": 144 }, "to": { "type": "rShoulder", "y": 56, "x": 128 } },
                   { "score": 9.970467, "from": { "type": "neck", "y": 56, "x": 144 }, "to": { "type": "lShoulder", "y": 56, "x": 160 } } ]
    },
    {
        "limbs": [ {"score": 7.85779, "from": { "type": "neck", "y": 48, "x": 328 }, "to": { "type": "rHip", "y": 128, "x": 328 } },
                   {"score": 6.8949876, "from": { "type": "neck", "y": 48, "x": 328 }, "to": { "type": "lHip", "y": 128, "x": 304 } } ]
   }
]

Every entry in the array represents a single body posture found in the image. Bodies are composed of Parts connected by Limbs, represented by the limbs collection. Every Limb instance has a PAF confidence score and the from and to Parts it connects. Part instances have a type and coordinates.
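A downstream consumer can recover the distinct body parts of each detected posture from the limbs collection, since every limb names the two parts it connects. The sketch below does this for the example output above; body_parts is an illustrative name, and keying parts by type assumes a part type occurs at most once per body.

```python
import json

# The example output above.
POSES = json.loads("""
[
  {"limbs": [
    {"score": 8.4396105, "from": {"type": "lShoulder", "y": 56, "x": 160}, "to": {"type": "lEar", "y": 24, "x": 152}},
    {"score": 10.145516, "from": {"type": "neck", "y": 56, "x": 144}, "to": {"type": "rShoulder", "y": 56, "x": 128}},
    {"score": 9.970467, "from": {"type": "neck", "y": 56, "x": 144}, "to": {"type": "lShoulder", "y": 56, "x": 160}}
  ]},
  {"limbs": [
    {"score": 7.85779, "from": {"type": "neck", "y": 48, "x": 328}, "to": {"type": "rHip", "y": 128, "x": 328}},
    {"score": 6.8949876, "from": {"type": "neck", "y": 48, "x": 328}, "to": {"type": "lHip", "y": 128, "x": 304}}
  ]}
]
""")


def body_parts(body):
    """Map each part type in a body to its (x, y) coordinates."""
    parts = {}
    for limb in body["limbs"]:
        for end in (limb["from"], limb["to"]):
            parts[end["type"]] = (end["x"], end["y"])
    return parts


for index, body in enumerate(POSES):
    print(index, sorted(body_parts(body)))
```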

Note

To get an output image annotated with body pose skeletons, set the tensorflow.mode=header property: the JSON metadata is then passed inside the output message header, while the payload contains a copy of the input image. If tensorflow.pose.estimation.drawPoses=true is also set, the copied input image is augmented with the poses described in the JSON metadata.

3.22.1 Options

The pose-estimation processor has the following options:

tensorflow.expression
How to obtain the input data from the input message. If empty, it defaults to the input message payload. Use an expression such as headers[myHeaderName] to get the input data from the message header with the key myHeaderName. (Expression, default: <none>)
tensorflow.mode
The outbound message can store the inference result either in the payload or in a header named by outputName. The payload mode (default) stores the inference result in the outbound message payload; the inbound payload is discarded. The header mode stores the inference result in the outbound message header named by the outputName property; the inbound message payload is passed through to the outbound message as is. (OutputMode, default: <none>, possible values: payload,header)
tensorflow.model
The location of the pre-trained TensorFlow model file. The file, http, and classpath schemes are supported. For archive locations, the first file with a '.pb' extension is used. Use the URI fragment to specify an exact model name (e.g. https://foo/bar/model.tar.gz#frozen_inference_graph.pb) (Resource, default: <none>)
tensorflow.model-fetch
The TensorFlow graph model outputs: a comma-separated list of TensorFlow operation names to fetch the output Tensors from. (List<String>, default: <none>)
tensorflow.output-name
The output data key used in header mode. (String, default: result)
tensorflow.pose.estimation.body-drawing-color-schema
When drawPoses is enabled, you can draw all body poses in one color (monochrome), draw every body pose in a unique color (bodyInstance), or use a common color schema for drawing the different limb types (limbType). (BodyDrawingColorSchema, default: <none>, possible values: monochrome,bodyInstance,limbType)
tensorflow.pose.estimation.debug-visualisation-enabled
If enabled, the inference operation produces 4 additional debug visualizations of the intermediate processing stages: PartHeatMap (the part heat map as computed by DL), PafField (the PAF limb field as computed by DL), PartCandidates (the final part candidates as computed by the post-processor), and LimbCandidates (the final limb candidates as computed by the post-processor). Note: do NOT enable this feature in production or in streaming mode! (Boolean, default: false)
tensorflow.pose.estimation.debug-visualization-output-path
Parent directory in which to save the debug images produced for the intermediate processing stages. (String, default: ./target)
tensorflow.pose.estimation.draw-line-width
When drawPoses is enabled, defines the line width for drawing the limbs. (Integer, default: 2)
tensorflow.pose.estimation.draw-part-labels
If drawPoses is enabled, drawPartLabels shows the part type IDs and descriptions. (Boolean, default: false)
tensorflow.pose.estimation.draw-part-radius
When drawPoses is enabled, defines the radius of the oval drawn for each part instance. (Integer, default: 4)
tensorflow.pose.estimation.draw-poses
When set to true, the output image is augmented with the computed person skeletons. (Boolean, default: true)
tensorflow.pose.estimation.min-body-part-count
Minimum number of parts a body should contain. Body instances with fewer parts are discarded. (Integer, default: 5)
tensorflow.pose.estimation.nms-threshold
Only return instance detections that have part score greater or equal to this value. (Float, default: 0.15)
tensorflow.pose.estimation.nms-window-size
Non-maximum suppression (NMS) distance for Part instances. Two parts suppress each other if they are less than `nmsWindowSize` pixels away. (Integer, default: 4)
tensorflow.pose.estimation.paf-count-threshold
Minimum number of integration intervals with paf score above the stepPafScoreThreshold, to consider the parts connected. (Integer, default: 2)
tensorflow.pose.estimation.step-paf-score-threshold
Minimal paf score between two Parts at individual integration step, to consider the parts connected (Float, default: 0.1)
tensorflow.pose.estimation.total-paf-score-threshold
Minimal paf score between two parts to consider them being connected and part of the same limb (Float, default: 4.4)