23. Creating a Stream

The Spring Cloud Data Flow Server exposes a full RESTful API for managing the lifecycle of stream definitions, but the easiest way to use it is via the Spring Cloud Data Flow shell. Start the shell as described in the Getting Started section.

New streams are created with the help of stream definitions, which are built from a simple DSL. For example, let’s walk through what happens if we execute the following shell command:

dataflow:> stream create --definition "time | log" --name ticktock

This defines a stream named ticktock based on the DSL expression time | log. The DSL uses the "pipe" symbol (|) to connect a source to a sink.
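To make the role of the pipe symbol concrete, the following minimal Python sketch splits a definition into its apps in order. It is not the parser used by Data Flow: the function name parse_pipeline is hypothetical, and the sketch assumes that no | character appears inside a quoted property value.

```python
def parse_pipeline(definition: str) -> list:
    """Split a simple stream definition into its app segments, in order."""
    # Naive split: assumes '|' never appears inside a quoted property value.
    return [segment.strip() for segment in definition.split("|")]


apps = parse_pipeline("time | log")
# The first segment is the source and the last segment is the sink.
source, sink = apps[0], apps[-1]
print(source, "->", sink)
```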

Then, to deploy the stream, execute the following shell command (alternatively, add the --deploy flag when creating the stream so that this step is not needed):

dataflow:> stream deploy --name ticktock

The Data Flow Server resolves time and log to Maven coordinates and uses those to launch the time and log applications of the stream.

2016-06-01 09:41:21.728  INFO 79016 --- [nio-9393-exec-6] o.s.c.d.spi.local.LocalAppDeployer       : deploying app ticktock.log instance 0
   Logs will be in /var/folders/wn/8jxm_tbd1vj28c8vj37n900m0000gn/T/spring-cloud-dataflow-912434582726479179/ticktock-1464788481708/ticktock.log
2016-06-01 09:41:21.914  INFO 79016 --- [nio-9393-exec-6] o.s.c.d.spi.local.LocalAppDeployer       : deploying app ticktock.time instance 0
   Logs will be in /var/folders/wn/8jxm_tbd1vj28c8vj37n900m0000gn/T/spring-cloud-dataflow-912434582726479179/ticktock-1464788481910/ticktock.time

In this example, the time source simply sends the current time as a message each second, and the log sink outputs it using the logging framework. You can tail the stdout log (which has an "_<instance>" suffix). The log files are located within the directory displayed in the Data Flow Server’s log output, as shown above.

$ tail -f /var/folders/wn/8jxm_tbd1vj28c8vj37n900m0000gn/T/spring-cloud-dataflow-912434582726479179/ticktock-1464788481708/ticktock.log/stdout_0.log
2016-06-01 09:45:11.250  INFO 79194 --- [  kafka-binder-] log.sink    : 06/01/16 09:45:11
2016-06-01 09:45:12.250  INFO 79194 --- [  kafka-binder-] log.sink    : 06/01/16 09:45:12
2016-06-01 09:45:13.251  INFO 79194 --- [  kafka-binder-] log.sink    : 06/01/16 09:45:13

23.1 Application properties

Application properties are the properties associated with each application in the stream. When the application is deployed, the application properties are applied to the application via command line arguments or environment variables based on the underlying deployment implementation.

23.1.1 Passing application properties when creating a stream

The following stream

dataflow:> stream create --definition "time | log" --name ticktock

can have application properties defined at the time of stream creation.

The shell command app info <appType>:<appName> displays the white-listed application properties for the application. For more information on property whitelisting, refer to Section 21.1, “Whitelisting application properties”.

Below are the white-listed properties for the app time:

dataflow:> app info source:time
╔══════════════════════════════╤══════════════════════════════╤══════════════════════════════╤══════════════════════════════╗
║         Option Name          │         Description          │           Default            │             Type             ║
╠══════════════════════════════╪══════════════════════════════╪══════════════════════════════╪══════════════════════════════╣
║trigger.time-unit             │The TimeUnit to apply to delay│<none>                        │java.util.concurrent.TimeUnit ║
║                              │values.                       │                              │                              ║
║trigger.fixed-delay           │Fixed delay for periodic      │1                             │java.lang.Integer             ║
║                              │triggers.                     │                              │                              ║
║trigger.cron                  │Cron expression value for the │<none>                        │java.lang.String              ║
║                              │Cron Trigger.                 │                              │                              ║
║trigger.initial-delay         │Initial delay for periodic    │0                             │java.lang.Integer             ║
║                              │triggers.                     │                              │                              ║
║trigger.max-messages          │Maximum messages per poll, -1 │1                             │java.lang.Long                ║
║                              │means infinity.               │                              │                              ║
║trigger.date-format           │Format for the date value.    │<none>                        │java.lang.String              ║
╚══════════════════════════════╧══════════════════════════════╧══════════════════════════════╧══════════════════════════════╝

Below are the white-listed properties for the app log:

dataflow:> app info sink:log
╔══════════════════════════════╤══════════════════════════════╤══════════════════════════════╤══════════════════════════════╗
║         Option Name          │         Description          │           Default            │             Type             ║
╠══════════════════════════════╪══════════════════════════════╪══════════════════════════════╪══════════════════════════════╣
║log.name                      │The name of the logger to use.│<none>                        │java.lang.String              ║
║log.level                     │The level at which to log     │<none>                        │org.springframework.integratio║
║                              │messages.                     │                              │n.handler.LoggingHandler$Level║
║log.expression                │A SpEL expression (against the│payload                       │java.lang.String              ║
║                              │incoming message) to evaluate │                              │                              ║
║                              │as the logged message.        │                              │                              ║
╚══════════════════════════════╧══════════════════════════════╧══════════════════════════════╧══════════════════════════════╝

The application properties for the time and log apps can be specified at the time of stream creation as follows:

dataflow:> stream create --definition "time --fixed-delay=5 | log --level=WARN" --name ticktock

Note that the properties fixed-delay and level defined above for the apps time and log are the 'short-form' property names provided by the shell completion. These 'short-form' names apply only to white-listed properties; in all other cases, use fully qualified property names.
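The relationship between 'short-form' and fully qualified names can be illustrated with a small Python sketch. It only mimics the idea: the lookup table is copied from the app info output above, the function name expand_short_form is hypothetical, and the way the shell and server actually resolve short names is not described in this section.

```python
# Whitelisted property names copied from the `app info` tables above.
WHITELIST = {
    "time": [
        "trigger.time-unit", "trigger.fixed-delay", "trigger.cron",
        "trigger.initial-delay", "trigger.max-messages", "trigger.date-format",
    ],
    "log": ["log.name", "log.level", "log.expression"],
}


def expand_short_form(app: str, name: str) -> str:
    """Return the fully qualified name for a short-form property.

    Names that do not match exactly one whitelisted property are returned
    unchanged, i.e. they are treated as already fully qualified.
    """
    matches = [p for p in WHITELIST.get(app, []) if p.split(".")[-1] == name]
    return matches[0] if len(matches) == 1 else name


print(expand_short_form("time", "fixed-delay"))
print(expand_short_form("log", "level"))
```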

23.2 Deployment properties

When deploying the stream, properties that control the deployment of the apps into the target platform are known as deployment properties. For instance, one can specify how many instances need to be deployed for the specific application defined in the stream using the deployment property called count.

23.2.1 Passing instance count as deployment property

If you would like to have multiple instances of an application in the stream, you can include a deployer property with the deploy command:

dataflow:> stream deploy --name ticktock --properties "deployer.time.count=3"

Note that count is the reserved property name used by the underlying deployer. Hence, if the application also has a custom property named count, it cannot be specified in 'short-form' during stream deployment, because it could conflict with the instance count deployer property. Instead, specify the custom count property in its fully qualified form (for example, app.foo.bar.count) during stream deployment, or specify it in either 'short-form' or fully qualified form during stream creation, where it is treated as an app property.

23.2.2 Inline vs file reference properties

When using the Spring Cloud Data Flow Shell, there are two ways to provide deployment properties: either inline or via a file reference. The two ways are mutually exclusive and are documented below:

Inline properties
use the --properties shell option and list properties as a comma separated list of key=value pairs, like so:
stream deploy foo
    --properties "deployer.transform.count=2,app.transform.producer.partitionKeyExpression=payload"
Using a file reference
use the --propertiesFile option and point it to a local .properties, .yaml or .yml file (that is, a file on the filesystem of the machine running the shell). When the file is read as a .properties file, the normal rules apply (ISO 8859-1 encoding, =, <space> or : delimiter, etc.), although we recommend using = as the key-value pair delimiter for consistency:
stream deploy foo --propertiesFile myprops.properties

where myprops.properties contains:

deployer.transform.count=2
app.transform.producer.partitionKeyExpression=payload

Both of these properties are passed as deployment properties for the stream foo.

To use YAML as the format for the deployment properties, use the .yaml or .yml file extension when deploying the stream:

stream deploy foo --propertiesFile myprops.yaml

where myprops.yaml contains:

deployer:
  transform:
    count: 2
app:
  transform:
    producer:
      partitionKeyExpression: payload
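The nested YAML form and the flat key=value form carry the same information. The following Python sketch shows the correspondence by flattening a nested mapping into dotted keys. It is an illustration only: the flatten helper is hypothetical, and the YAML content is written as a Python literal so that no YAML library is needed.

```python
def flatten(tree: dict, prefix: str = "") -> dict:
    """Flatten a nested mapping into dotted property keys."""
    flat = {}
    for key, value in tree.items():
        dotted = f"{prefix}.{key}" if prefix else key
        if isinstance(value, dict):
            # Descend into nested sections, extending the dotted prefix.
            flat.update(flatten(value, dotted))
        else:
            flat[dotted] = str(value)
    return flat


# The content of myprops.yaml, written as a Python literal.
yaml_tree = {
    "deployer": {"transform": {"count": 2}},
    "app": {"transform": {"producer": {"partitionKeyExpression": "payload"}}},
}

flat = flatten(yaml_tree)
# Join the pairs into the comma-separated form used by --properties.
inline = ",".join(f"{key}={value}" for key, value in flat.items())
print(inline)
```

The printed string matches the value given to the --properties option in the inline example.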

23.2.3 Passing application properties when deploying a stream

The application properties can also be specified when deploying a stream. When specified during deployment, these application properties can either be specified as 'short-form' property names (applicable for white-listed properties) or fully qualified property names. The application properties should have the prefix "app.<appName/label>".

For example, the stream

dataflow:> stream create --definition "time | log" --name ticktock

can be deployed with application properties using the 'short-form' property names:

dataflow:>stream deploy ticktock --properties "app.time.fixed-delay=5,app.log.level=ERROR"

When using the app label,

stream create ticktock --definition "a: time | b: log"

the application properties can be defined as:

stream deploy ticktock --properties "app.a.fixed-delay=4,app.b.level=ERROR"
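As a rough illustration of how labels determine the app.<label> prefix, the Python sketch below extracts the label of each app from a definition. The helper parse_labels is hypothetical and assumes a well-formed definition with no | inside property values; it is not the Data Flow DSL parser.

```python
def parse_labels(definition: str) -> dict:
    """Map each label (or app name, when no label is given) to its app name."""
    labels = {}
    for segment in definition.split("|"):
        tokens = segment.split()
        if tokens[0].endswith(":"):
            # "a: time" -> label "a", app "time"
            label, app = tokens[0][:-1], tokens[1]
        else:
            # "time --fixed-delay=5" -> label and app are both "time"
            label = app = tokens[0]
        labels[label] = app
    return labels


labels = parse_labels("a: time | b: log")
# Deployment-time application properties are addressed by label.
prefixes = [f"app.{label}." for label in labels]
print(labels, prefixes)
```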

23.2.4 Passing Spring Cloud Stream properties for the application

Spring Cloud Data Flow sets the required Spring Cloud Stream properties for the applications inside the stream. Most importantly, the spring.cloud.stream.bindings.<input/output>.destination is set internally for the apps to bind.

To override any of the Spring Cloud Stream properties, set them via deployment properties.

For example, for the below stream

dataflow:> stream create --definition "http | transform --expression=payload.getValue('hello').toUpperCase() | log" --name ticktock

if multiple binders are available on the classpath of each application and the binder is chosen for each deployment, then the stream can be deployed with the specific Spring Cloud Stream properties as follows:

dataflow:>stream deploy ticktock --properties "app.http.spring.cloud.stream.bindings.output.binder=kafka,app.transform.spring.cloud.stream.bindings.input.binder=kafka,app.transform.spring.cloud.stream.bindings.output.binder=rabbit,app.log.spring.cloud.stream.bindings.input.binder=rabbit"

Note: Overriding the destination names is not recommended, as Spring Cloud Data Flow takes care of setting them internally.

23.2.5 Passing per-binding producer consumer properties

A Spring Cloud Stream application can have producer and consumer properties set on a per-binding basis. While Spring Cloud Data Flow supports a short-hand notation for per-binding producer properties such as partitionKeyExpression and partitionKeyExtractorClass, as described in Section 23.2.6, “Passing stream partition properties during stream deployment”, all the supported Spring Cloud Stream producer/consumer properties can also be set directly as Spring Cloud Stream properties for the app.

Consumer properties for an inbound channel take the prefix app.[app/label name].spring.cloud.stream.bindings.<channelName>.consumer. and producer properties for an outbound channel take the prefix app.[app/label name].spring.cloud.stream.bindings.<channelName>.producer. (both prefixes end with a dot). For example, the stream

dataflow:> stream create --definition "time | log" --name ticktock

can be deployed with producer/consumer properties as:

dataflow:>stream deploy ticktock --properties "app.time.spring.cloud.stream.bindings.output.producer.requiredGroups=myGroup,app.time.spring.cloud.stream.bindings.output.producer.headerMode=raw,app.log.spring.cloud.stream.bindings.input.consumer.concurrency=3,app.log.spring.cloud.stream.bindings.input.consumer.maxAttempts=5"

Binder-specific producer/consumer properties can be specified in a similar way.

For instance:

dataflow:>stream deploy ticktock --properties "app.time.spring.cloud.stream.rabbit.bindings.output.producer.autoBindDlq=true,app.log.spring.cloud.stream.rabbit.bindings.input.consumer.transacted=true"
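Because these property names are long and repetitive, it can help to generate them. The Python sketch below builds the per-binding names from the prefixes described above and prints a deploy command. The helper binding_properties is hypothetical and covers only the generic (not the binder-specific) prefix.

```python
def binding_properties(app: str, channel: str, role: str, settings: dict) -> dict:
    """Prefix Spring Cloud Stream producer/consumer settings for one binding."""
    if role not in ("producer", "consumer"):
        raise ValueError("role must be 'producer' or 'consumer'")
    prefix = f"app.{app}.spring.cloud.stream.bindings.{channel}.{role}."
    return {prefix + name: str(value) for name, value in settings.items()}


props = {}
props.update(binding_properties(
    "time", "output", "producer",
    {"requiredGroups": "myGroup", "headerMode": "raw"}))
props.update(binding_properties(
    "log", "input", "consumer",
    {"concurrency": 3, "maxAttempts": 5}))

# Join the pairs into the comma-separated form expected by --properties.
joined = ",".join(f"{key}={value}" for key, value in props.items())
print(f'stream deploy ticktock --properties "{joined}"')
```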

23.2.6 Passing stream partition properties during stream deployment

A common pattern in stream processing is to partition the data as it is streamed. This entails deploying multiple instances of a message consuming app and using content-based routing so that messages with a given key (as determined at runtime) are always routed to the same app instance. You can pass the partition properties during stream deployment to declaratively configure a partitioning strategy to route each message to a specific consumer instance.

The following properties configure partitioning when deploying a stream:

app.[app/label name].producer.partitionKeyExtractorClass
The class name of a PartitionKeyExtractorStrategy (default null)
app.[app/label name].producer.partitionKeyExpression
A SpEL expression, evaluated against the message, to determine the partition key; only applies if partitionKeyExtractorClass is null. If both are null, the app is not partitioned (default null)
app.[app/label name].producer.partitionSelectorClass
The class name of a PartitionSelectorStrategy (default null)
app.[app/label name].producer.partitionSelectorExpression
A SpEL expression, evaluated against the partition key, to determine the partition index to which the message will be routed. The final partition index will be the return value (an integer) modulo [nextModule].count. If both the class and expression are null, the underlying binder’s default PartitionSelectorStrategy will be applied to the key (default null)

In summary, an app is partitioned if its count is greater than 1 and the previous app has a partitionKeyExtractorClass or partitionKeyExpression (the class takes precedence). When a partition key is extracted, the partitioned app instance is determined by invoking the partitionSelectorClass, if present, or the partitionSelectorExpression % partitionCount, where partitionCount is the application count in the case of RabbitMQ and the underlying partition count of the topic in the case of Kafka.

If neither a partitionSelectorClass nor a partitionSelectorExpression is present, the result is key.hashCode() % partitionCount.
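The default selection rule can be sketched in Python for string keys. This is an illustration under stated assumptions rather than the binder's implementation: java_string_hash re-implements java.lang.String.hashCode(), both function names are hypothetical, and taking the absolute value to keep the index non-negative is an assumption that is not stated in this section.

```python
def java_string_hash(text: str) -> int:
    """Reproduce java.lang.String.hashCode() as a signed 32-bit integer.

    Matches Java for characters in the Basic Multilingual Plane.
    """
    h = 0
    for ch in text:
        h = (31 * h + ord(ch)) & 0xFFFFFFFF  # wrap to 32 bits like Java int
    return h - 0x100000000 if h >= 0x80000000 else h


def default_partition(key: str, partition_count: int) -> int:
    """Pick a partition index as key.hashCode() % partitionCount."""
    # Assumption: abs() keeps the index non-negative for negative hash codes.
    return abs(java_string_hash(key)) % partition_count


# The same key always maps to the same partition index.
print(default_partition("hello", 3), default_partition("hello", 3))
```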

23.2.7 Passing application content type properties

In a stream definition you can specify that the input or the output of an application needs to be converted to a different type. You can use the inputType and outputType properties to specify the content type for the incoming data and outgoing data, respectively.

For example, consider the following stream:

dataflow:>stream create tuple --definition "http | filter --inputType=application/x-spring-tuple
 --expression=payload.hasFieldName('hello') | transform --expression=payload.getValue('hello').toUpperCase()
 | log" --deploy

The http app is expected to send the data in JSON and the filter app receives the JSON data and processes it as a Spring Tuple. In order to do so, we use the inputType property on the filter app to convert the data into the expected Spring Tuple format. The transform application processes the Tuple data and sends the processed data to the downstream log application.

When sending some data to the http application:

dataflow:>http post --data {"hello":"world","foo":"bar"} --contentType application/json --target http://localhost:<http-port>

At the log application you see the content as follows:

INFO 18745 --- [transform.tuple-1] log.sink                                 : WORLD

Depending on how applications are chained, the content type conversion can be specified either via --outputType in the upstream app or via --inputType in the downstream app. For instance, in the above stream, instead of specifying --inputType on the 'filter' application, the option --outputType=application/x-spring-tuple can be specified on the 'http' application.

For the complete list of message conversions and message converters, please refer to the Spring Cloud Stream documentation.

23.2.8 Overriding application properties during stream deployment

Application properties that are defined during deployment override the same properties defined during stream creation.

For example, the following stream has application properties defined during stream creation:

dataflow:> stream create --definition "time --fixed-delay=5 | log --level=WARN" --name ticktock

To override these application properties, one can specify the new property values during deployment:

dataflow:>stream deploy ticktock --properties "app.time.fixed-delay=4,app.log.level=ERROR"
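The override rule amounts to a merge in which deployment-time values win. The Python sketch below models this with plain dictionaries. As a simplification, both sets of properties use the same app.<appName> key form, and the function name effective_properties is hypothetical.

```python
def effective_properties(creation: dict, deployment: dict) -> dict:
    """Merge app properties; values given at deployment override creation."""
    merged = dict(creation)    # start from the stream-creation values
    merged.update(deployment)  # deployment-time values take precedence
    return merged


creation = {"app.time.fixed-delay": "5", "app.log.level": "WARN"}
deployment = {"app.time.fixed-delay": "4", "app.log.level": "ERROR"}

effective = effective_properties(creation, deployment)
print(effective)
```

Properties that are set only at creation time are kept unchanged by the merge.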