Register a Stream App with the App Registry using the Spring Cloud Data Flow Shell
app register
command. You must provide a unique name, application type, and a URI that can be
resolved to the app artifact. For the type, specify "source", "processor", or "sink".
Here are a few examples:
dataflow:>app register --name mysource --type source --uri maven://com.example:mysource:0.0.1-SNAPSHOT
dataflow:>app register --name myprocessor --type processor --uri file:///Users/example/myprocessor-1.2.3.jar
dataflow:>app register --name mysink --type sink --uri http://example.com/mysink-2.0.1.jar
When providing a URI with the maven
scheme, the format should conform to the following:
maven://<groupId>:<artifactId>[:<extension>[:<classifier>]]:<version>
For example, if you would like to register the snapshot versions of the http
and log
applications built with the RabbitMQ binder, you could do the following:
dataflow:>app register --name http --type source --uri maven://org.springframework.cloud.stream.app:http-source-rabbit:1.2.1.BUILD-SNAPSHOT
dataflow:>app register --name log --type sink --uri maven://org.springframework.cloud.stream.app:log-sink-rabbit:1.2.1.BUILD-SNAPSHOT
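The coordinate format above can be illustrated with a small parser. This is only a sketch of how the documented pattern breaks down into parts, not the resolver that Data Flow itself uses; the names `MavenCoordinates` and `parse_maven_uri` are hypothetical.

```python
from typing import NamedTuple, Optional


class MavenCoordinates(NamedTuple):
    group_id: str
    artifact_id: str
    extension: Optional[str]
    classifier: Optional[str]
    version: str


def parse_maven_uri(uri: str) -> MavenCoordinates:
    """Split maven://<groupId>:<artifactId>[:<extension>[:<classifier>]]:<version>."""
    prefix = "maven://"
    if not uri.startswith(prefix):
        raise ValueError(f"not a maven URI: {uri!r}")
    parts = uri[len(prefix):].split(":")
    # 3 parts: no extension/classifier; 4: extension only; 5: extension and classifier.
    if not 3 <= len(parts) <= 5 or any(part == "" for part in parts):
        raise ValueError(f"malformed maven URI: {uri!r}")
    extension = parts[2] if len(parts) >= 4 else None
    classifier = parts[3] if len(parts) == 5 else None
    return MavenCoordinates(parts[0], parts[1], extension, classifier, parts[-1])


if __name__ == "__main__":
    print(parse_maven_uri(
        "maven://org.springframework.cloud.stream.app:http-source-rabbit:1.2.1.BUILD-SNAPSHOT"))
```

Because the version is always the last segment, the optional extension and classifier can be read positionally from the middle of the list.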
If you would like to register multiple apps at one time, you can store them in a properties file
where the keys are formatted as <type>.<name>
and the values are the URIs.
For example, if you would like to register the snapshot versions of the http
and log
applications built with the RabbitMQ binder, you could have the following in a properties file (e.g. stream-apps.properties):
source.http=maven://org.springframework.cloud.stream.app:http-source-rabbit:1.2.1.BUILD-SNAPSHOT
sink.log=maven://org.springframework.cloud.stream.app:log-sink-rabbit:1.2.1.BUILD-SNAPSHOT
Then to import the apps in bulk, use the app import
command and provide the location of the properties file via --uri
:
dataflow:>app import --uri file:///<YOUR_FILE_LOCATION>/stream-apps.properties
For convenience, static files with application URIs (for both Maven and Docker) are available for all the out-of-the-box stream and task/batch app starters. You can point to one of these files and import all of its application URIs in bulk. Otherwise, as explained in the previous paragraphs, you can register the apps individually or maintain your own custom properties file containing only the application URIs you need. It is recommended, however, to have a "focused" list of desired application URIs in a custom properties file.
List of available Stream Application Starters:
Artifact Type | Stable Release | SNAPSHOT Release |
---|---|---|
RabbitMQ + Maven | bit.ly/Bacon-BUILD-SNAPSHOT-stream-applications-rabbit-maven | |
RabbitMQ + Docker | N/A | |
Kafka 0.9 + Maven | bit.ly/Bacon-BUILD-SNAPSHOT-stream-applications-kafka-09-maven | |
Kafka 0.9 + Docker | N/A | |
Kafka 0.10 + Maven | bit.ly/Bacon-BUILD-SNAPSHOT-stream-applications-kafka-10-maven | |
Kafka 0.10 + Docker | N/A |
List of available Task Application Starters:
Artifact Type | Stable Release | SNAPSHOT Release |
---|---|---|
Maven | ||
Docker | N/A |
You can find more information about the available task starters in the Task App Starters Project Page and related reference documentation. For more information about the available stream starters look at the Stream App Starters Project Page and related reference documentation.
As an example, if you would like to register all out-of-the-box stream applications built with the RabbitMQ binder in bulk, you can use the following command:
dataflow:>app import --uri http://bit.ly/Bacon-RELEASE-stream-applications-rabbit-maven
You can also pass the --local
option (which is true
by default) to indicate whether the
properties file location should be resolved within the shell process itself. If the location should
be resolved from the Data Flow Server process, specify --local false
.
Warning | |
---|---|
Note that once downloaded, applications may be cached locally on the Data Flow server, based on the resource location. If the resource location doesn’t change (even though the actual resource bytes may be different), then it won’t be re-downloaded. Moreover, if a stream is already deployed and using some version of a registered app, then (forcibly) re-registering a different app will have no effect until the stream is deployed anew. |
Note | |
---|---|
In some cases the Resource is resolved on the server side, whereas in others the URI will be passed to a runtime container instance where it is resolved. Consult the specific documentation of each Data Flow Server for more detail. |
Stream and Task applications are Spring Boot applications that are aware of many common application properties (see Section 26.3.3, “Common application properties”), e.g. server.port, but also families of properties such as those with the prefix spring.jmx and logging. When creating your own application, it is desirable to whitelist properties so that the shell and the UI can display them first as primary properties when presenting options via TAB completion or in drop-down boxes.
To whitelist application properties, create a file named spring-configuration-metadata-whitelist.properties in the META-INF resource directory. Two property keys can be used inside this file. The first key is configuration-properties.classes; its value is a comma-separated list of fully qualified @ConfigurationProperties class names. The second key is configuration-properties.names; its value is a comma-separated list of property names. An entry can be the full name of a property, such as server.port, or a partial name that whitelists a category of property names, e.g. spring.jmx.
The Spring Cloud Stream application starters are a good place to look for examples of usage. Here is a simple example of the file sink’s spring-configuration-metadata-whitelist.properties file:
configuration-properties.classes=org.springframework.cloud.stream.app.file.sink.FileSinkProperties
If we also wanted to whitelist server.port, the file would look like this:
configuration-properties.classes=org.springframework.cloud.stream.app.file.sink.FileSinkProperties
configuration-properties.names=server.port
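To make the "full name or partial name" rule for configuration-properties.names concrete, here is a minimal sketch. It assumes that a partial name matches on dot boundaries (so spring.jmx covers spring.jmx.enabled but not spring.jmxfoo); that interpretation, and the function names, are assumptions for illustration rather than the actual Data Flow implementation.

```python
def parse_names(value: str) -> list:
    """Split the comma-separated configuration-properties.names value."""
    return [entry.strip() for entry in value.split(",") if entry.strip()]


def is_whitelisted(property_name: str, names: list) -> bool:
    """True if the property equals an entry or falls under an entry used as a prefix."""
    return any(
        property_name == entry or property_name.startswith(entry + ".")
        for entry in names
    )


if __name__ == "__main__":
    names = parse_names("server.port, spring.jmx")
    for candidate in ("server.port", "spring.jmx.enabled", "server.address"):
        print(candidate, is_whitelisted(candidate, names))
```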
Important | |
---|---|
Make sure to add 'spring-boot-configuration-processor' as an optional dependency to generate configuration metadata file for the properties. <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-configuration-processor</artifactId> <optional>true</optional> </dependency> |
You can go a step further in describing the main properties that your stream or task app supports by creating a so-called metadata companion artifact. This simple jar file contains only the Spring Boot JSON file about configuration properties metadata, as well as the whitelisting file described in the previous section.
Here are the contents of such an artifact, for the canonical log sink:
$ jar tvf log-sink-rabbit-1.2.1.BUILD-SNAPSHOT-metadata.jar
373848 META-INF/spring-configuration-metadata.json
   174 META-INF/spring-configuration-metadata-whitelist.properties
Note that the spring-configuration-metadata.json
file is quite large. This is because it contains the concatenation of all the properties that
are available at runtime to the log
sink (some of them come from spring-boot-actuator.jar
, some of them come from
spring-boot-autoconfigure.jar
, even some more from spring-cloud-starter-stream-sink-log.jar
, etc.) Data Flow
always relies on all those properties, even when a companion artifact is not available, but here all have been merged
into a single file.
To help with that (as a matter of fact, you don’t want to try to craft this giant JSON file by hand), you can use the following plugin in your build:
<plugin>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-app-starter-metadata-maven-plugin</artifactId>
    <executions>
        <execution>
            <id>aggregate-metadata</id>
            <phase>compile</phase>
            <goals>
                <goal>aggregate-metadata</goal>
            </goals>
        </execution>
    </executions>
</plugin>
Note | |
---|---|
This plugin comes in addition to the |
The benefits of a companion artifact are manifold:
app info
or the Dashboard UI
Remember, though, that this is entirely optional when dealing with uberjars. The uberjar itself already includes the metadata.
Once you have a companion artifact at hand, you need to make the system aware of it so that it can be used.
When registering a single app via app register
, you can use the optional --metadata-uri
option in the shell, like so:
dataflow:>app register --name log --type sink --uri maven://org.springframework.cloud.stream.app:log-sink-kafka-10:1.2.1.BUILD-SNAPSHOT --metadata-uri=maven://org.springframework.cloud.stream.app:log-sink-kafka-10:jar:metadata:1.2.1.BUILD-SNAPSHOT
When registering several files using the app import
command, the file should contain a <type>.<name>.metadata
line
in addition to each <type>.<name>
line. This is optional (i.e. if some apps have it but some others don’t, that’s fine).
Here is an example for a Dockerized app, where the metadata artifact is being hosted in a Maven repository (but retrieving
it via http://
or file://
would be equally possible).
...
source.http=docker:springcloudstream/http-source-rabbit:latest
source.http.metadata=maven://org.springframework.cloud.stream.app:http-source-rabbit:jar:metadata:1.2.1.BUILD-SNAPSHOT
...
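The key layout of such an import file can be sketched as follows. The parser below groups each `<type>.<name>` line with its optional `<type>.<name>.metadata` companion. It is an illustration only: it assumes app names contain no dots and uses a simplified reading of the properties format (one key=value per line, # comments); the function name is hypothetical.

```python
def parse_app_import(text: str) -> dict:
    """Map (type, name) to {"uri": ..., "metadata_uri": ...} from an import file."""
    apps = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#"):
            continue
        key, sep, value = line.partition("=")
        if not sep:
            raise ValueError(f"expected key=value, got {line!r}")
        parts = key.strip().split(".")
        if len(parts) == 2:
            field = "uri"
        elif len(parts) == 3 and parts[2] == "metadata":
            field = "metadata_uri"
        else:
            raise ValueError(f"unexpected key {key!r}")
        entry = apps.setdefault((parts[0], parts[1]), {"uri": None, "metadata_uri": None})
        entry[field] = value.strip()
    return apps


if __name__ == "__main__":
    sample = """
    source.http=docker:springcloudstream/http-source-rabbit:latest
    source.http.metadata=maven://org.springframework.cloud.stream.app:http-source-rabbit:jar:metadata:1.2.1.BUILD-SNAPSHOT
    sink.log=maven://org.springframework.cloud.stream.app:log-sink-rabbit:1.2.1.BUILD-SNAPSHOT
    """
    for key, entry in parse_app_import(sample).items():
        print(key, entry)
```

An app without a metadata line simply keeps `metadata_uri` as `None`, which mirrors the fact that the metadata entry is optional.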
While there are out-of-the-box source, processor, and sink applications available, one can extend these applications or write a custom Spring Cloud Stream application.
The process of creating Spring Cloud Stream applications via Spring Initializr is detailed in the Spring Cloud Stream documentation. It is possible to include multiple binders in an application. If doing so, refer to the instructions in the section called “Passing Spring Cloud Stream properties for the application” on how to configure them.
To support property whitelisting, Spring Cloud Stream applications running in Spring Cloud Data Flow may include the Spring Boot configuration-processor
as an optional dependency, as in the following example.
<dependencies>
    <!-- other dependencies -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-configuration-processor</artifactId>
        <optional>true</optional>
    </dependency>
</dependencies>
Note | |
---|---|
Make sure that the |
Once a custom application has been created, it can be registered as described in Section 26.1, “Register a Stream App”.
The Spring Cloud Data Flow Server exposes a full RESTful API for managing the lifecycle of stream definitions, but the easiest way to use it is via the Spring Cloud Data Flow shell. Start the shell as described in the Getting Started section.
New streams are created with the help of stream definitions. The definitions are built from a simple DSL. For example, let’s walk through what happens if we execute the following shell command:
dataflow:> stream create --definition "time | log" --name ticktock
This defines a stream named ticktock
based off the DSL expression time | log
. The DSL uses the "pipe" symbol |
, to connect a source to a sink.
Then to deploy the stream execute the following shell command (or alternatively add the --deploy
flag when creating the stream so that this step is not needed):
dataflow:> stream deploy --name ticktock
The Data Flow Server resolves time
and log
to maven coordinates and uses those to launch the time
and log
applications of the stream.
2016-06-01 09:41:21.728 INFO 79016 --- [nio-9393-exec-6] o.s.c.d.spi.local.LocalAppDeployer : deploying app ticktock.log instance 0
   Logs will be in /var/folders/wn/8jxm_tbd1vj28c8vj37n900m0000gn/T/spring-cloud-dataflow-912434582726479179/ticktock-1464788481708/ticktock.log
2016-06-01 09:41:21.914 INFO 79016 --- [nio-9393-exec-6] o.s.c.d.spi.local.LocalAppDeployer : deploying app ticktock.time instance 0
   Logs will be in /var/folders/wn/8jxm_tbd1vj28c8vj37n900m0000gn/T/spring-cloud-dataflow-912434582726479179/ticktock-1464788481910/ticktock.time
In this example, the time source simply sends the current time as a message each second, and the log sink outputs it using the logging framework.
You can tail the stdout
log (which has an "_<instance>" suffix). The log files are located within the directory displayed in the Data Flow Server’s log output, as shown above.
$ tail -f /var/folders/wn/8jxm_tbd1vj28c8vj37n900m0000gn/T/spring-cloud-dataflow-912434582726479179/ticktock-1464788481708/ticktock.log/stdout_0.log
2016-06-01 09:45:11.250 INFO 79194 --- [ kafka-binder-] log.sink : 06/01/16 09:45:11
2016-06-01 09:45:12.250 INFO 79194 --- [ kafka-binder-] log.sink : 06/01/16 09:45:12
2016-06-01 09:45:13.251 INFO 79194 --- [ kafka-binder-] log.sink : 06/01/16 09:45:13
Application properties are the properties associated with each application in the stream. When the application is deployed, the application properties are applied to the application via command line arguments or environment variables based on the underlying deployment implementation.
The following stream
dataflow:> stream create --definition "time | log" --name ticktock
can have application properties defined at the time of stream creation.
The shell command app info <appType>:<appName>
displays the white-listed application properties for the application.
For more info on the property white listing refer to Section 26.1.1, “Whitelisting application properties”
Below are the white listed properties for the app time
:
dataflow:> app info source:time
╔══════════════════════════════╤══════════════════════════════╤══════════════════════════════╤══════════════════════════════╗
║         Option Name          │         Description          │           Default            │             Type             ║
╠══════════════════════════════╪══════════════════════════════╪══════════════════════════════╪══════════════════════════════╣
║trigger.time-unit             │The TimeUnit to apply to delay│<none>                        │java.util.concurrent.TimeUnit ║
║                              │values.                       │                              │                              ║
║trigger.fixed-delay           │Fixed delay for periodic      │1                             │java.lang.Integer             ║
║                              │triggers.                     │                              │                              ║
║trigger.cron                  │Cron expression value for the │<none>                        │java.lang.String              ║
║                              │Cron Trigger.                 │                              │                              ║
║trigger.initial-delay         │Initial delay for periodic    │0                             │java.lang.Integer             ║
║                              │triggers.                     │                              │                              ║
║trigger.max-messages          │Maximum messages per poll, -1 │1                             │java.lang.Long                ║
║                              │means infinity.               │                              │                              ║
║trigger.date-format           │Format for the date value.    │<none>                        │java.lang.String              ║
╚══════════════════════════════╧══════════════════════════════╧══════════════════════════════╧══════════════════════════════╝
Below are the white listed properties for the app log
:
dataflow:> app info sink:log
╔══════════════════════════════╤══════════════════════════════╤══════════════════════════════╤══════════════════════════════╗
║         Option Name          │         Description          │           Default            │             Type             ║
╠══════════════════════════════╪══════════════════════════════╪══════════════════════════════╪══════════════════════════════╣
║log.name                      │The name of the logger to use.│<none>                        │java.lang.String              ║
║log.level                     │The level at which to log     │<none>                        │org.springframework.integratio║
║                              │messages.                     │                              │n.handler.LoggingHandler$Level║
║log.expression                │A SpEL expression (against the│payload                       │java.lang.String              ║
║                              │incoming message) to evaluate │                              │                              ║
║                              │as the logged message.        │                              │                              ║
╚══════════════════════════════╧══════════════════════════════╧══════════════════════════════╧══════════════════════════════╝
The application properties for the time
and log
apps can be specified at the time of stream
creation as follows:
dataflow:> stream create --definition "time --fixed-delay=5 | log --level=WARN" --name ticktock
Note that the properties fixed-delay
and level
defined above for the apps time
and log
are the 'short-form' property names provided by the shell completion.
These 'short-form' property names are applicable only for the white-listed properties and in all other cases, only fully qualified property names should be used.
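One way to picture the relationship between 'short-form' and fully qualified names is the sketch below. It assumes that a short-form name is the last dot-separated segment of exactly one white-listed property and that anything else is passed through unchanged as a fully qualified name. This matching rule is an assumption made for illustration, not a description of the shell's actual completion logic, and `resolve_property` is a hypothetical name.

```python
def resolve_property(name: str, whitelisted: list) -> str:
    """Expand a short-form name to its white-listed fully qualified name, if any."""
    if name in whitelisted:
        return name  # already fully qualified
    matches = [full for full in whitelisted if full.rsplit(".", 1)[-1] == name]
    if len(matches) > 1:
        raise ValueError(f"ambiguous short-form name {name!r}: {matches}")
    # No match: treat the name as fully qualified and leave it untouched.
    return matches[0] if matches else name


if __name__ == "__main__":
    time_props = ["trigger.time-unit", "trigger.fixed-delay", "trigger.cron",
                  "trigger.initial-delay", "trigger.max-messages", "trigger.date-format"]
    print(resolve_property("fixed-delay", time_props))
```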
When deploying the stream, properties that control the deployment of the apps into the target platform are known as deployment
properties.
For instance, one can specify how many instances need to be deployed for the specific application defined in the stream using the deployment property called count
.
Starting with version 1.2, the distinction between properties that are meant for the deployed app and properties that govern how this app is deployed (thanks to some implementation of a Spring Cloud Deployer) is more explicit. The former should be passed using the syntax app.<app-name>.<property-name>=<value>, while the latter use the syntax deployer.<app-name>.<short-property-name>=<value>.
The following table recaps the difference in behavior between the two.
Application Properties | Deployer Properties | |
---|---|---|
Example Syntax |
|
|
What the application "sees" |
| Nothing |
What the deployer "sees" | Nothing |
|
Typical usage | Passing/Overriding application properties, passing Spring Cloud Stream binder or partitioning properties | Setting the number of instances, memory, disk, etc. |
If you would like to have multiple instances of an application in the stream, you can include a deployer property with the deploy command:
dataflow:> stream deploy --name ticktock --properties "deployer.time.count=3"
Note that count is the reserved property name used by the underlying deployer. Hence, if the application also has a custom property named count, it cannot be specified in 'short-form' during stream deployment, as it could conflict with the instance count deployer property. Instead, count as a custom application property can be specified in its fully qualified form (example: app.foo.bar.count) during stream deployment, or it can be specified using the 'short-form' or the fully qualified form during stream creation, where it will be considered an app property.
When using the Spring Cloud Data Flow Shell, there are two ways to provide deployment properties: either inline or via a file reference. The two ways are mutually exclusive and are documented below:
Inline: use the --properties shell option and list the properties as a comma-separated list of key=value pairs, like so:
stream deploy foo --properties "deployer.transform.count=2,app.transform.producer.partitionKeyExpression=payload"
File reference: use the --propertiesFile option and point it to a local .properties, .yaml or .yml file (i.e. one that lives in the filesystem of the machine running the shell). When the file is read as a .properties file, the normal rules apply (ISO 8859-1 encoding; =, <space> or : as delimiter; etc.), although we recommend using = as the key-value pair delimiter for consistency:
stream deploy foo --propertiesFile myprops.properties
where myprops.properties contains:
deployer.transform.count=2
app.transform.producer.partitionKeyExpression=payload
Both the above properties will be passed as deployment properties for the stream foo
above.
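The split between app.* and deployer.* keys in such a property list can be sketched like this. The helper below is a deliberately naive illustration (it splits on every comma, so values that themselves contain commas are not handled), and the function name is hypothetical.

```python
def split_deployment_properties(inline: str):
    """Group 'scope.app.property=value' pairs into per-app app and deployer maps."""
    app_props, deployer_props = {}, {}
    for pair in inline.split(","):
        key, sep, value = pair.partition("=")
        if not sep:
            raise ValueError(f"expected key=value, got {pair!r}")
        scope, _, rest = key.strip().partition(".")
        app_name, _, prop = rest.partition(".")
        if scope == "app":
            target = app_props
        elif scope == "deployer":
            target = deployer_props
        else:
            raise ValueError(f"key must start with app. or deployer.: {key!r}")
        if not app_name or not prop:
            raise ValueError(f"missing app name or property in {key!r}")
        target.setdefault(app_name, {})[prop] = value.strip()
    return app_props, deployer_props


if __name__ == "__main__":
    apps, deployers = split_deployment_properties(
        "deployer.transform.count=2,app.transform.producer.partitionKeyExpression=payload")
    print("application sees:", apps)
    print("deployer sees:", deployers)
```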
When using YAML as the format for the deployment properties, use the .yaml or .yml file extension when deploying the stream:
stream deploy foo --propertiesFile myprops.yaml
where myprops.yaml contains:
deployer:
  transform:
    count: 2
app:
  transform:
    producer:
      partitionKeyExpression: payload
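The YAML form and the .properties form describe the same keys: nested YAML mappings correspond to dotted property names. The sketch below shows that correspondence using a plain Python dictionary that mirrors the YAML document (no YAML library is involved; `flatten` is a hypothetical helper).

```python
def flatten(nested: dict, prefix: str = "") -> dict:
    """Turn nested mappings into dotted keys, e.g. {"a": {"b": 1}} -> {"a.b": "1"}."""
    flat = {}
    for key, value in nested.items():
        dotted = f"{prefix}.{key}" if prefix else key
        if isinstance(value, dict):
            flat.update(flatten(value, dotted))
        else:
            flat[dotted] = str(value)
    return flat


if __name__ == "__main__":
    # Mirrors the structure of myprops.yaml shown above.
    yaml_like = {
        "deployer": {"transform": {"count": 2}},
        "app": {"transform": {"producer": {"partitionKeyExpression": "payload"}}},
    }
    for key, value in flatten(yaml_like).items():
        print(f"{key}={value}")
```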
The application properties can also be specified when deploying a stream. When specified during deployment, these application properties can either be specified as 'short-form' property names (applicable for white-listed properties) or fully qualified property names. The application properties should have the prefix "app.<appName/label>".
For example, the stream
dataflow:> stream create --definition "time | log" --name ticktock
can be deployed with application properties using the 'short-form' property names:
dataflow:>stream deploy ticktock --properties "app.time.fixed-delay=5,app.log.level=ERROR"
When using the app label,
stream create ticktock --definition "a: time | b: log"
the application properties can be defined as:
stream deploy ticktock --properties "app.a.fixed-delay=4,app.b.level=ERROR"
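The stream definitions used so far follow a small, regular shape: apps separated by the pipe symbol, an optional `label:` in front of an app, and `--key=value` options after it. The sketch below parses just that subset to show which name the app.<appName/label> prefix refers to. It is not the real DSL parser: it splits on whitespace and on every pipe, so quoted values or expressions containing spaces or pipes are out of scope, and the function name is hypothetical.

```python
def parse_definition(definition: str) -> list:
    """Parse 'label: app --key=value | ...' into a list of app descriptions."""
    apps = []
    for segment in definition.split("|"):
        tokens = segment.split()
        label = None
        if tokens and tokens[0].endswith(":"):
            label = tokens[0][:-1]
            tokens = tokens[1:]
        if not tokens:
            raise ValueError(f"missing app name in segment {segment!r}")
        name, options = tokens[0], {}
        for token in tokens[1:]:
            if not token.startswith("--") or "=" not in token:
                raise ValueError(f"expected --key=value, got {token!r}")
            key, _, value = token[2:].partition("=")
            options[key] = value
        # Deployment properties address the app by its label, or by its name if unlabeled.
        apps.append({"label": label or name, "name": name, "options": options})
    return apps


if __name__ == "__main__":
    for app in parse_definition("a: time --fixed-delay=5 | b: log --level=WARN"):
        print(app)
```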
Spring Cloud Data Flow sets the required
Spring Cloud Stream properties for the applications inside the stream. Most importantly, the spring.cloud.stream.bindings.<input/output>.destination
is set internally for the apps to bind.
If someone wants to override any of the Spring Cloud Stream properties, they can be set via deployment properties.
For example, for the stream below
dataflow:> stream create --definition "http | transform --expression=payload.getValue('hello').toUpperCase() | log" --name ticktock
if there are multiple binders available in the classpath of each application and the binder is to be chosen for each deployment, then the stream can be deployed with the specific Spring Cloud Stream properties as:
dataflow:>stream deploy ticktock --properties "app.http.spring.cloud.stream.bindings.output.binder=kafka,app.transform.spring.cloud.stream.bindings.input.binder=kafka,app.transform.spring.cloud.stream.bindings.output.binder=rabbit,app.log.spring.cloud.stream.bindings.input.binder=rabbit"
Note | |
---|---|
Overriding the destination names is not recommended as Spring Cloud Data Flow takes care of setting this internally. |
A Spring Cloud Stream application can have producer and consumer properties set on a per-binding basis.
While Spring Cloud Data Flow supports a short-hand notation for per-binding producer properties such as partitionKeyExpression and partitionKeyExtractorClass, as described in the section called “Passing stream partition properties during stream deployment”, all the supported Spring Cloud Stream producer/consumer properties can also be set directly as Spring Cloud Stream properties for the app.
The consumer properties can be set for the inbound channel name with the prefix
app.[app/label name].spring.cloud.stream.bindings.<channelName>.consumer.
and the producer properties can be set for the outbound channel name with the prefix
app.[app/label name].spring.cloud.stream.bindings.<channelName>.producer.
For example, the stream
dataflow:> stream create --definition "time | log" --name ticktock
can be deployed with producer/consumer properties as:
dataflow:>stream deploy ticktock --properties "app.time.spring.cloud.stream.bindings.output.producer.requiredGroups=myGroup,app.time.spring.cloud.stream.bindings.output.producer.headerMode=raw,app.log.spring.cloud.stream.bindings.input.consumer.concurrency=3,app.log.spring.cloud.stream.bindings.input.consumer.maxAttempts=5"
The binder
specific producer/consumer properties can also be specified in a similar way.
For instance
dataflow:>stream deploy ticktock --properties "app.time.spring.cloud.stream.rabbit.bindings.output.producer.autoBindDlq=true,app.log.spring.cloud.stream.rabbit.bindings.input.consumer.transacted=true"
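The long keys in these commands all follow the same pattern, which a small helper can make explicit. This is a sketch for assembling keys of the shape shown above; the function name and its parameters are hypothetical, and it does not check that a given property actually exists for a binder.

```python
from typing import Optional


def binding_property(app: str, channel: str, role: str, name: str,
                     binder: Optional[str] = None) -> str:
    """Build app.<app>.spring.cloud.stream[.<binder>].bindings.<channel>.<role>.<name>."""
    if role not in ("producer", "consumer"):
        raise ValueError("role must be 'producer' or 'consumer'")
    # Binder-specific properties insert the binder name before 'bindings'.
    bindings = f"{binder}.bindings" if binder else "bindings"
    return f"app.{app}.spring.cloud.stream.{bindings}.{channel}.{role}.{name}"


if __name__ == "__main__":
    pairs = {
        binding_property("log", "input", "consumer", "concurrency"): "3",
        binding_property("time", "output", "producer", "autoBindDlq", binder="rabbit"): "true",
    }
    print(",".join(f"{key}={value}" for key, value in pairs.items()))
```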
A common pattern in stream processing is to partition the data as it is streamed. This entails deploying multiple instances of a message consuming app and using content-based routing so that messages with a given key (as determined at runtime) are always routed to the same app instance. You can pass the partition properties during stream deployment to declaratively configure a partitioning strategy to route each message to a specific consumer instance.
See below for examples of deploying partitioned streams:
null)
partitionKeyExtractorClass is null. If both are null, the app is not partitioned (default null)
null)
[nextModule].count. If both the class and expression are null, the underlying binder’s default PartitionSelectorStrategy will be applied to the key (default null)
In summary, an app is partitioned if its count is > 1 and the previous app has a partitionKeyExtractorClass or partitionKeyExpression (class takes precedence).
When a partition key is extracted, the partitioned app instance is determined by
invoking the partitionSelectorClass
, if present, or the partitionSelectorExpression % partitionCount
,
where partitionCount
is application count in the case of RabbitMQ, and the underlying
partition count of the topic in the case of Kafka.
If neither a partitionSelectorClass
nor a partitionSelectorExpression
is
present the result is key.hashCode() % partitionCount
.
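The default selection rule, key.hashCode() % partitionCount, can be illustrated as follows. The sketch emulates Java's String.hashCode() for string keys (assuming characters in the Basic Multilingual Plane) and relies on Python's % operator, which returns a non-negative result for a positive modulus; Java's % can return a negative value for a negative hash, so the real binder has to normalise the result. Both function names are hypothetical.

```python
def java_string_hash(key: str) -> int:
    """Emulate Java's String.hashCode() as a signed 32-bit integer."""
    h = 0
    for ch in key:
        h = (31 * h + ord(ch)) & 0xFFFFFFFF  # wrap to 32 bits like Java int arithmetic
    return h - 0x100000000 if h >= 0x80000000 else h


def select_partition(key: str, partition_count: int) -> int:
    """Pick a partition index in [0, partition_count) for the given key."""
    if partition_count < 1:
        raise ValueError("partition_count must be at least 1")
    return java_string_hash(key) % partition_count


if __name__ == "__main__":
    for key in ("hello", "world", "hello"):
        print(key, "->", select_partition(key, 3))
```

Because the index depends only on the key and the partition count, messages with the same key always land on the same instance as long as the count does not change.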
In a stream definition, you can specify that the input or the output of an application needs to be converted to a different type.
You can use the inputType
and outputType
properties to specify the content type for the incoming data and outgoing data, respectively.
For example, consider the following stream:
dataflow:>stream create tuple --definition "http | filter --inputType=application/x-spring-tuple --expression=payload.hasFieldName('hello') | transform --expression=payload.getValue('hello').toUpperCase() | log" --deploy
The http
app is expected to send the data in JSON and the filter
app receives the JSON data
and processes it as a Spring Tuple.
In order to do so, we use the inputType
property on the filter app to convert the data into the expected Spring Tuple format.
The transform
application processes the Tuple data and sends the processed data to the downstream log
application.
When sending some data to the http
application:
dataflow:>http post --data {"hello":"world","foo":"bar"} --contentType application/json --target http://localhost:<http-port>
At the log application you see the content as follows:
INFO 18745 --- [transform.tuple-1] log.sink : WORLD
Depending on how applications are chained, the content type conversion can be specified either via --outputType in the upstream app or via --inputType in the downstream app.
For instance, in the above stream, instead of specifying --inputType on the 'filter' application, the option --outputType=application/x-spring-tuple can be specified on the 'http' application.
For the complete list of message conversion and message converters, please refer to Spring Cloud Stream documentation.
Application properties that are defined during deployment override the same properties defined during the stream creation.
For example, the following stream has application properties defined during stream creation:
dataflow:> stream create --definition "time --fixed-delay=5 | log --level=WARN" --name ticktock
To override these application properties, one can specify the new property values during deployment:
dataflow:>stream deploy ticktock --properties "app.time.fixed-delay=4,app.log.level=ERROR"
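The override rule amounts to a per-app merge in which deployment-time values win. A minimal sketch, assuming both sets of properties have already been grouped by app name and normalised to the same property names (the function name is hypothetical):

```python
def effective_app_properties(definition_props: dict, deployment_props: dict) -> dict:
    """Merge per-app properties; deployment values override definition values."""
    merged = {app: dict(props) for app, props in definition_props.items()}
    for app, props in deployment_props.items():
        merged.setdefault(app, {}).update(props)
    return merged


if __name__ == "__main__":
    from_definition = {"time": {"fixed-delay": "5"}, "log": {"level": "WARN"}}
    from_deployment = {"time": {"fixed-delay": "4"}, "log": {"level": "ERROR"}}
    print(effective_app_properties(from_definition, from_deployment))
```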
In addition to configuration via DSL, Spring Cloud Data Flow provides a mechanism for setting common properties to all
the streaming applications that are launched by it.
This can be done by adding properties prefixed with spring.cloud.dataflow.applicationProperties.stream
when starting
the server.
When doing so, the server will pass all the properties, without the prefix, to the instances it launches.
For example, all the launched applications can be configured to use a specific Kafka broker by launching the Data Flow server with the following options:
--spring.cloud.dataflow.applicationProperties.stream.spring.cloud.stream.kafka.binder.brokers=192.168.1.100:9092 --spring.cloud.dataflow.applicationProperties.stream.spring.cloud.stream.kafka.binder.zkNodes=192.168.1.100:2181
This will cause the properties spring.cloud.stream.kafka.binder.brokers
and spring.cloud.stream.kafka.binder.zkNodes
to be passed to all the launched applications.
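The prefix handling described above can be sketched as a simple filter-and-strip over the server's properties. This only illustrates the naming convention, not the server's implementation; the constant and function names are hypothetical.

```python
COMMON_PREFIX = "spring.cloud.dataflow.applicationProperties.stream."


def common_stream_properties(server_properties: dict) -> dict:
    """Keep only prefixed server properties and strip the prefix from their keys."""
    return {
        key[len(COMMON_PREFIX):]: value
        for key, value in server_properties.items()
        if key.startswith(COMMON_PREFIX)
    }


if __name__ == "__main__":
    server = {
        COMMON_PREFIX + "spring.cloud.stream.kafka.binder.brokers": "192.168.1.100:9092",
        COMMON_PREFIX + "spring.cloud.stream.kafka.binder.zkNodes": "192.168.1.100:2181",
        "server.port": "9393",  # unprefixed: stays with the server, not passed to apps
    }
    for key, value in common_stream_properties(server).items():
        print(f"{key}={value}")
```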
Note | |
---|---|
Properties configured using this mechanism have lower precedence than stream deployment properties.
They will be overridden if a property with the same key is specified at stream deployment time (e.g.
|
You can delete a stream by issuing the stream destroy
command from the shell:
dataflow:> stream destroy --name ticktock
If the stream was deployed, it will be undeployed before the stream definition is deleted.
Often you will want to stop a stream but retain the name and definition for future use. In that case, you can undeploy the stream by name and issue the deploy command at a later time to restart it.
dataflow:> stream undeploy --name ticktock
dataflow:> stream deploy --name ticktock