1.0.2.RELEASE
Copyright © 2013-2016 Pivotal Software, Inc.
This section will provide you with a detailed overview of Spring Cloud Stream Application Starters, their purpose, and how to use them. It assumes familiarity with general Spring Cloud Stream concepts, which can be found in the Spring Cloud Stream reference documentation.
Spring Cloud Stream Application Starters provide you with predefined Spring Cloud Stream applications that you can run independently or with Spring Cloud Data Flow. You can also use the starters as a basis for creating your own applications. They include:
You can find a detailed listing of all the starters and their options in the corresponding section of this guide.
As a user of Spring Cloud Stream Application Starters you have access to two types of artifacts.
Starters are libraries that contain the complete configuration of a Spring Cloud Stream application with a specific role (e.g. an HTTP source that receives HTTP POST requests and forwards the data on its output channel to downstream Spring Cloud Stream applications). Starters are not executable applications, and are intended to be included in other Spring Boot applications, along with a Binder implementation.
Prebuilt applications are Spring Boot applications that include the starters and a Binder implementation. Prebuilt applications are uberjars and include minimal code required to execute standalone. For each starter, the project provides a prebuilt version including the Kafka Binder and a prebuilt version including the Rabbit MQ Binder.
Note: Only starters are present in the source code of the project. Prebuilt applications are generated according to the Maven plugin configuration.
Based on their target application type, starters can be either:
You can easily identify the type and functionality of a starter based on its name.
All starters are named following the convention spring-cloud-starter-stream-<type>-<functionality>.
For example, spring-cloud-starter-stream-source-file is a starter for a file source that polls a directory and sends file data on the output channel (read the reference documentation of the source for details).
Conversely, spring-cloud-starter-stream-sink-cassandra is a starter for a Cassandra sink that writes the data that it receives on the input channel to Cassandra (read the reference documentation of the sink for details).
The prebuilt applications follow a naming convention too: <functionality>-<type>-<binder>. For example, cassandra-sink-kafka is a Cassandra sink using the Kafka binder.
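The two naming conventions can be expressed as simple string templates. The following is a minimal sketch; the class StarterNames and its methods are hypothetical helpers written for illustration and are not part of the project.

```java
// Hypothetical helper illustrating the naming conventions; not part of the project.
final class StarterNames {

    private StarterNames() {
    }

    // Starter artifact ids follow spring-cloud-starter-stream-<type>-<functionality>.
    static String starterArtifactId(String type, String functionality) {
        return "spring-cloud-starter-stream-" + type + "-" + functionality;
    }

    // Prebuilt application names follow <functionality>-<type>-<binder>.
    static String prebuiltAppName(String functionality, String type, String binder) {
        return functionality + "-" + type + "-" + binder;
    }

    public static void main(String[] args) {
        System.out.println(starterArtifactId("source", "file"));
        System.out.println(prebuiltAppName("cassandra", "sink", "kafka"));
    }
}
```

Note that the order of type and functionality is reversed between the two conventions.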
You can get access to the artifacts produced by Spring Cloud Stream Application Starters through Maven, through Docker, or by building the artifacts yourself.
Starters are available as Maven artifacts in the Spring repositories. You can add them as dependencies to your application, as follows:
<dependency>
    <groupId>org.springframework.cloud.stream.app</groupId>
    <artifactId>spring-cloud-starter-stream-sink-cassandra</artifactId>
    <version>1.0.0.BUILD-SNAPSHOT</version>
</dependency>
From this, you can infer the coordinates for other starters found in this guide.
While the version may vary, the group will always remain org.springframework.cloud.stream.app and the artifact id follows the naming convention spring-cloud-starter-stream-<type>-<functionality> described previously.
Prebuilt applications are available as Maven artifacts too.
Using them directly as dependencies is discouraged; use the starters instead.
Following the typical Maven <group>:<artifactId>:<version> convention, they can be referenced, for example, as:
org.springframework.cloud.stream.app:cassandra-sink-rabbit:1.0.0.BUILD-SNAPSHOT
Just as with the starters, you can infer the coordinates for other prebuilt applications found in the guide.
The group is always org.springframework.cloud.stream.app.
The version may vary.
The artifact id follows the format <functionality>-<type>-<binder> previously described.
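To illustrate how the coordinates of a prebuilt application relate to its starter, here is a rough sketch that parses such coordinates and derives the matching starter artifact id. The class PrebuiltCoordinates is hypothetical. It also assumes that neither the type nor the binder name contains a hyphen, so that a hyphenated functionality such as scriptable-transform can be recovered by reading the artifact id from the right.

```java
// Hypothetical value class for illustration; not part of the project.
final class PrebuiltCoordinates {

    static final String GROUP_ID = "org.springframework.cloud.stream.app";

    final String functionality;
    final String type;
    final String binder;
    final String version;

    private PrebuiltCoordinates(String functionality, String type, String binder, String version) {
        this.functionality = functionality;
        this.type = type;
        this.binder = binder;
        this.version = version;
    }

    // Parses <group>:<artifactId>:<version>, where artifactId is <functionality>-<type>-<binder>.
    static PrebuiltCoordinates parse(String coordinates) {
        String[] parts = coordinates.split(":");
        if (parts.length != 3 || !GROUP_ID.equals(parts[0])) {
            throw new IllegalArgumentException("Unexpected coordinates: " + coordinates);
        }
        String artifactId = parts[1];
        // Assumption: binder and type contain no hyphens, so split from the right.
        int binderSep = artifactId.lastIndexOf('-');
        int typeSep = artifactId.lastIndexOf('-', binderSep - 1);
        if (typeSep <= 0) {
            throw new IllegalArgumentException("Unexpected artifact id: " + artifactId);
        }
        return new PrebuiltCoordinates(
                artifactId.substring(0, typeSep),
                artifactId.substring(typeSep + 1, binderSep),
                artifactId.substring(binderSep + 1),
                parts[2]);
    }

    // The starter that the prebuilt application was generated from.
    String starterArtifactId() {
        return "spring-cloud-starter-stream-" + type + "-" + functionality;
    }
}
```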
The Docker versions of the applications are available in Docker Hub, at hub.docker.com/r/springcloudstream/. Naming and versioning follow the same general conventions as Maven, e.g.
docker pull springcloudstream/cassandra-sink-kafka
will pull the latest Docker image of the Cassandra sink with the Kafka binder.
You can also build the project and generate the artifacts (including the prebuilt applications) on your own. This is useful if you want to deploy the artifacts locally, for example for adding a new starter, or if you want to build the entire set of artifacts with a new binder.
First, you need to generate the prebuilt applications. This is done by running the application generation Maven plugin. You can do so by simply invoking the corresponding script in the root of the project.
./generate.sh
For each of the prebuilt applications, the script will generate the following items:
- a pom.xml file with the required dependencies (starter and binder)
- a class that contains the main method of the application and imports the predefined configuration
For example, spring-cloud-starter-stream-sink-cassandra will generate cassandra-sink-rabbit and cassandra-sink-kafka as completely functional applications.
Apart from accessing the sources, sinks and processors already provided by the project, in this section we will describe how to:
If you want to use one of the applications found in Spring Cloud Stream Application Starters and you want to use one of the predefined binders (i.e. Kafka or Rabbit), you can just use the prebuilt versions of the artifacts. But if you want to connect to a different middleware system, and you have a binder for it, you will create new artifacts.
<dependencies>
    <!-- other dependencies -->
    <dependency>
        <groupId>org.springframework.cloud.stream.app</groupId>
        <artifactId>spring-cloud-starter-stream-sink-cassandra</artifactId>
        <version>1.0.0.BUILD-SNAPSHOT</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-stream-binder-gemfire</artifactId>
        <version>1.0.0.BUILD-SNAPSHOT</version>
    </dependency>
</dependencies>
The next step is to create the project’s main class and import the configuration provided by the starter. For example, in the same case of the Cassandra sink it can look like the following:
package org.springframework.cloud.stream.app.cassandra.sink.rabbit;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.app.cassandra.sink.CassandraSinkConfiguration;
import org.springframework.context.annotation.Import;

@SpringBootApplication
@Import(CassandraSinkConfiguration.class)
public class CassandraSinkGemfireApplication {

    public static void main(String[] args) {
        SpringApplication.run(CassandraSinkGemfireApplication.class, args);
    }
}
Spring Cloud Stream Application Starters consists of regular Spring Cloud Stream applications with some additional conventions that facilitate generating prebuilt applications with the preconfigured binders. Sometimes, your solution may require additional applications that are not in the scope of Spring Cloud Stream Application Starters, or require additional tweaks and enhancements. In this section we will show you how to create custom applications that can be part of your solution, along with Spring Cloud Stream application starters. You have the following options:
If you want to add your own custom applications to your solution, you can simply create a new Spring Cloud Stream project with the binder of your choice and run it the same way as the applications provided by Spring Cloud Stream Application Starters, independently or via Spring Cloud Data Flow. The process is described in the Getting Started Guide of Spring Cloud Stream. One restriction is that the applications must have:
- a single outbound channel named output for sources; the simplest way to do so is by using the predefined interface org.springframework.cloud.stream.messaging.Source;
- a single inbound channel named input for sinks; the simplest way to do so is by using the predefined interface org.springframework.cloud.stream.messaging.Sink;
- both an inbound channel named input and an outbound channel named output for processors; the simplest way to do so is by using the predefined interface org.springframework.cloud.stream.messaging.Processor.
The other restriction is to use the same kind of binder as the rest of your solution.
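The channel-name restriction can be summarized as a small lookup table. The sketch below is plain Java with no Spring dependency; the enum AppType and its methods are hypothetical and exist only to make the rule explicit. It reflects the fact that the predefined Source interface declares a channel named output, Sink declares input, and Processor declares both.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical enum summarizing the channel names each application type must expose.
enum AppType {
    SOURCE(Collections.singletonList("output")),
    SINK(Collections.singletonList("input")),
    PROCESSOR(Arrays.asList("input", "output"));

    private final List<String> requiredChannels;

    AppType(List<String> requiredChannels) {
        this.requiredChannels = requiredChannels;
    }

    List<String> requiredChannels() {
        return requiredChannels;
    }

    // True when the declared channel names are exactly the required ones.
    boolean matches(Set<String> declaredChannels) {
        return declaredChannels.equals(new HashSet<>(requiredChannels));
    }
}
```

In a real application you would not check this by hand; binding the predefined Source, Sink, or Processor interface gives you the expected channel names.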
You can also reuse the starters provided by Spring Cloud Stream Application Starters to create custom components, enriching the behavior of the application.
For example, you can add a Spring Security layer to your HTTP source, add additional configurations to the ObjectMapper used for JSON transformation wherever that happens, or change the JDBC driver or Hadoop distribution that the application is using.
To do so, set up your project following a process similar to customizing a binder.
In fact, customizing the binder is the simplest form of creating a custom component.
As a reminder, this involves:
After doing so, you can simply add the additional configuration for the extra features of your application.
If you are looking to patch the prebuilt applications to accommodate new dependencies, you can use the following example as a reference. Let us review the steps to add the mysql driver to the jdbc-sink application.
- Add the mysql java-driver dependency:
<dependencies>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.37</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud.stream.app</groupId>
        <artifactId>spring-cloud-starter-stream-sink-jdbc</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>
- Import the respective configuration class into your Spring Boot application. In the case of the jdbc sink, it is: @Import(org.springframework.cloud.stream.app.jdbc.sink.JdbcSinkConfiguration.class). You can find the configuration class for other applications in their respective packages.
@SpringBootApplication
@Import(org.springframework.cloud.stream.app.jdbc.sink.JdbcSinkConfiguration.class)
public class DemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }
}
- The jdbc-sink application now includes the mysql driver.
In this section, we will explain how to develop a custom source/sink/processor application and then generate Maven and Docker artifacts for it with the necessary middleware bindings, using the existing tooling provided by the Spring Cloud Stream App Starters infrastructure. For explanation purposes, we will assume that we are creating a new source application for a technology named foobar.
Please follow the instructions above for designing a proper Spring Cloud Stream Source. You may also look into the existing starters for how to structure a new one. The default naming for the main @Configuration class is FoobarSourceConfiguration and the default package for this @Configuration is org.springframework.cloud.stream.app.foobar.source. If you have a different class/package name, see below for overriding that in the app generator. The technology/functionality name for which you create a starter can be a hyphenated string such as scriptable-transform, which is a processor type in the module spring-cloud-starter-stream-processor-scriptable-transform.
The starters in spring-cloud-stream-app-starters
are slightly different from the other starters in spring-boot and
spring-cloud in that here we don’t provide a way to auto configure any configuration through spring factories mechanism.
Rather, we delegate this responsibility to the maven plugin that is generating the binder based apps. Therefore, you don’t
have to provide a spring.factories file that lists all your configuration classes.
- Build and install the new starter:
./mvnw clean install -pl :spring-cloud-starter-stream-source-foobar
- Add the new starter to the spring-cloud-stream-app-dependencies bill of materials (BOM) in the dependency management section. For example,
<dependencyManagement>
    ...
    ...
    <dependency>
        <groupId>org.springframework.cloud.stream.app</groupId>
        <artifactId>spring-cloud-starter-stream-source-foobar</artifactId>
        <version>1.0.0.BUILD-SNAPSHOT</version>
    </dependency>
    ...
    ...
- Build and install the BOM:
./mvnw clean install -pl :spring-cloud-stream-app-dependencies
- Go to the spring-cloud-stream-app-generator module and start editing as below.
The minimal configuration needed to generate the app is an entry in the plugin configuration in spring-cloud-stream-app-generator/pom.xml. There are other plugin options that customize the generated applications, which are described in the plugin documentation (github.com/spring-cloud/spring-cloud-stream-app-maven-plugin). A few plugin features are described below.
<generatedApps> .... <foobar-source /> .... </generatedApps>
More information about the maven plugin used above can be found here: github.com/spring-cloud/spring-cloud-stream-app-maven-plugin
If you did not follow the default convention expected by the plugin for the location of the main configuration class, which is org.springframework.cloud.stream.app.foobar.source.FoobarSourceConfiguration, you can override that in the configuration for the plugin. For example, if your main configuration class is foo.bar.SpecialFooBarConfiguration.class, this is how you can tell the plugin to override the default.
<foobar-source> <autoConfigClass>foo.bar.SpecialFooBarConfiguration.class</autoConfigClass> </foobar-source>
./generateApps.sh
This will generate the binder based foobar source apps in a directory named apps
at the root of the repository.
If you want to change the location where the apps are generated, for instance /tmp/scs-apps, you can do it in the
configuration section of the plugin.
<configuration> ... <generatedProjectHome>/tmp/scs-apps</generatedProjectHome> ... </configuration>
By default, we generate apps for both the Kafka and RabbitMQ binders, spring-cloud-stream-binder-kafka and spring-cloud-stream-binder-rabbit. If you have created a custom binder for some other middleware (say JMS) and need to generate foobar source apps for it, you can add that binder to the binders list in the configuration section, as in the following.
<binders> <kafka /> <rabbit /> <jms /> </binders>
Please note that this only works as long as there is a binder with the Maven coordinates of org.springframework.cloud.stream as group id and spring-cloud-stream-binder-jms as artifact id.
This artifact needs to be specified in the BOM above and available through a maven repository as well.
If you have an artifact that is only available through a private internal Maven repository (perhaps an enterprise-wide Nexus repo that you use globally across teams), and you need it for your app, you can define that repository as part of the Maven plugin configuration.
For example,
<configuration> ... <extraRepositories> <repository> <id>private-internal-nexus</id> <url>.../</url> <name>...</name> <snapshotEnabled>...</snapshotEnabled> </repository> </extraRepositories> </configuration>
Then you can define this as part of your app tag:
<foobar-source> <extraRepositories> <private-internal-nexus /> </extraRepositories> </foobar-source>
Go to the directory where the apps were generated (apps at the root of the repository by default, unless you changed it as described above). Here you will see both foobar-source-kafka and foobar-source-rabbit along with all the other out-of-the-box apps that are generated. If you added more binders as described above, you will see those apps here as well, for example foobar-source-jms.
If you only care about the foobar-source apps and nothing else, you can cd into those particular foobar-source-[binder] directories and import them directly into your IDE of choice. Each of them is a self-contained Spring Boot application project.
For all the generated apps, the parent is spring-boot-starter-parent, as required by the underlying Spring Initializr library.
You can cd into these custom foobar-source directories and do the following to build the apps:
cd foobar-source-kafka
mvn clean install
This installs foobar-source-kafka into your local Maven cache (~/.m2 by default).
The app generation phase adds an integration test to the app project that makes sure that all the Spring components and contexts are loaded properly. However, these tests are not run by default when you do a mvn install.
You can force the running of these tests by doing the following:
mvn clean install -DskipTests=false
One important note about running these tests in generated apps: If your application’s spring beans need to interact with
some real services out there or expect some properties be present in the context, these tests would fail unless you make
those things available. An example would be a Twitter Source, where the underlying spring beans are trying to create a
twitter template and would fail if it can’t find the credentials available through properties. One way to solve this and
still run the generated context load tests would be to create a mock class that provides these properties or mock beans
(for example, a mock twitter template) and tell the maven plugin about its existence. You can use the existing module
app-starters-test-support
for this purpose and add the mock class there.
See the class org.springframework.cloud.stream.app.test.twitter.TwitterTestConfiguration
for reference.
You can create a similar class for your foobar source, FoobarTestConfiguration, and add that to the plugin configuration.
You only need to do this if you run into this particular issue of Spring beans not being created properly in the integration test in the generated apps.
<foobar-source> <extraTestConfigClass>org.springframework.cloud.stream.app.test.foobar.FoobarTestConfiguration.class</extraTestConfigClass> </foobar-source>
When you do the above, this test configuration will be automatically imported into the context of your test class.
Also note that, you need to rerun the script for generating the apps each time you make a configuration change in the plugin.
The built applications are available under the target directories of the respective apps and also as Maven artifacts in your local Maven repository. Go to the target directory and run the following:
java -jar foobar-source-kafka.jar
(Ensure that you have Kafka running locally when you do this.)
This should start the application.
mvn clean package docker:build
This creates the Docker image under the target/docker/springcloudstream directory. Please ensure that the Docker container is up and running and that the DOCKER_HOST environment variable is properly set before you try docker:build.
All the generated apps from the repository are uploaded to Docker Hub.
However, a custom app that you build will not be uploaded to Docker Hub under the springcloudstream repository.
If you think that there is a general need for this app, you should consider contributing this starter to the main repository; upon review, the app can then be uploaded to the above location in Docker Hub.
If you still need to push this to Docker Hub under a different repository, you can take the following steps.
Go to the pom.xml of the generated app (for example, foobar-source-kafka/pom.xml).
Search for springcloudstream and replace it with your repository name.
Then do this:
mvn clean package docker:build docker:push -Ddocker.username=[provide your username] -Ddocker.password=[provide password]
This uploads the Docker image to Docker Hub in your custom repository.
This application polls a directory and sends new files or their contents to the output channel.
The file source provides the contents of a File as a byte array by default.
However, this can be customized using the --mode option:
- ref: provides a java.io.File reference
- lines: splits files line by line and emits a new message for each line
- contents: the default; provides the contents of a file as a byte array
When using --mode=lines, you can also provide the additional option --withMarkers=true.
If set to true, the underlying FileSplitter will emit additional start-of-file and end-of-file marker messages before and after the actual data.
The payload of these two additional marker messages is of type FileSplitter.FileMarker. The option withMarkers defaults to false if not explicitly set.
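To make the effect of the three modes and of withMarkers concrete, the following plain-Java sketch computes the sequence of payloads that each mode would produce for a given file. It does not use Spring Integration: the class FileModeSketch is hypothetical, and the strings "START" and "END" are placeholders standing in for the FileSplitter.FileMarker payloads.

```java
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the payloads produced by each --mode value.
final class FileModeSketch {

    enum Mode { REF, LINES, CONTENTS }

    static List<Object> payloads(File file, Mode mode, boolean withMarkers) {
        List<Object> payloads = new ArrayList<>();
        try {
            switch (mode) {
                case REF:
                    // A single message carrying the File reference itself.
                    payloads.add(file);
                    break;
                case LINES:
                    // One message per line; markers are placeholders for FileMarker payloads.
                    if (withMarkers) {
                        payloads.add("START");
                    }
                    payloads.addAll(Files.readAllLines(file.toPath(), StandardCharsets.UTF_8));
                    if (withMarkers) {
                        payloads.add("END");
                    }
                    break;
                default:
                    // CONTENTS: a single message carrying the whole file as a byte array.
                    payloads.add(Files.readAllBytes(file.toPath()));
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return payloads;
    }

    // Helper for trying the sketch out: writes the text to a temporary file.
    static File tempFileWith(String text) {
        try {
            File file = File.createTempFile("file-mode-sketch", ".txt");
            file.deleteOnExit();
            Files.write(file.toPath(), text.getBytes(StandardCharsets.UTF_8));
            return file;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

For a two-line file, LINES mode with markers yields four payloads (start marker, two lines, end marker), while REF and CONTENTS each yield exactly one.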
The file source has the following options:
true
)<none>
, possible values: ref
,lines
,contents
)<none>
)<none>
)<none>
)<none>
)true
)<none>
)<none>
)1
)0
)-1
)SECONDS
, possible values: NANOSECONDS
,MICROSECONDS
,MILLISECONDS
,SECONDS
,MINUTES
,HOURS
,DAYS
)
The ref option is useful in some cases in which the file contents are large and it would be more efficient to send the file path.
This source application supports transfer of files using the FTP protocol.
Files are transferred from the remote directory to the local directory where the app is deployed.
Messages emitted by the source are provided as a byte array by default. However, this can be customized using the --mode option:
- ref: provides a java.io.File reference
- lines: splits files line by line and emits a new message for each line
- contents: the default; provides the contents of a file as a byte array
When using --mode=lines, you can also provide the additional option --withMarkers=true.
If set to true, the underlying FileSplitter will emit additional start-of-file and end-of-file marker messages before and after the actual data.
The payload of these two additional marker messages is of type FileSplitter.FileMarker. The option withMarkers defaults to false if not explicitly set.
The ftp source has the following options:
true
)<none>
, possible values: ref
,lines
,contents
)<none>
)<none>
)<none>
)<none>
)<none>
, possible values: ACTIVE
,PASSIVE
)<none>
)<none>
)21
)<none>
)<none>
)<none>
)<none>
)<none>
)<none>
)<none>
)<none>
)<none>
)<none>
)1
)0
)-1
)SECONDS
, possible values: NANOSECONDS
,MICROSECONDS
,MILLISECONDS
,SECONDS
,MINUTES
,HOURS
,DAYS
)
A source module that listens for HTTP requests and emits the body as a message payload.
If the Content-Type matches text/* or application/json, the payload will be a String; otherwise the payload will be a byte array.
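The Content-Type rule can be sketched as a small decision function. This is an illustration only, not the source's actual implementation: the class HttpPayloadSketch is hypothetical, and it assumes UTF-8 when converting the body to a String, whereas a real HTTP endpoint would normally honor the charset parameter of the header.

```java
import java.nio.charset.StandardCharsets;
import java.util.Locale;

// Hypothetical sketch of the payload-type rule of the http source.
final class HttpPayloadSketch {

    private HttpPayloadSketch() {
    }

    // True for text/* and application/json, ignoring parameters such as "; charset=UTF-8".
    static boolean isTextual(String contentType) {
        if (contentType == null) {
            return false;
        }
        String mediaType = contentType.split(";", 2)[0].trim().toLowerCase(Locale.ROOT);
        return mediaType.startsWith("text/") || mediaType.equals("application/json");
    }

    // Returns a String for textual content types and the raw byte array otherwise.
    static Object toPayload(byte[] body, String contentType) {
        // Assumption: UTF-8 is used for all textual bodies in this sketch.
        return isTextual(contentType) ? new String(body, StandardCharsets.UTF_8) : body;
    }
}
```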
This source polls data from an RDBMS.
This source is fully based on the DataSourceAutoConfiguration, so refer to the Spring Boot JDBC Support for more information.
The jdbc source has the following options:
0
)<none>
)true
)<none>
)<none>
)<none>
)true
)<none>
)<none>
)<none>
)<none>
)<none>
)1
)0
)1
)<none>
, possible values: NANOSECONDS
,MICROSECONDS
,MILLISECONDS
,SECONDS
,MINUTES
,HOURS
,DAYS
)
Also see the Spring Boot Documentation for additional DataSource properties and TriggerProperties and MaxMessagesProperties for polling options.
The "jms" source enables receiving messages from JMS.
The jms source has the following options:
<none>
)<none>
)<none>
)true
)<none>
)<none>
)<none>
)<none>
)<none>
, possible values: AUTO
,CLIENT
,DUPS_OK
)true
)<none>
)<none>
)false
)Note | |
---|---|
Spring boot broker configuration is used; refer to the
Spring Boot Documentation for more information.
The |
A source that sends generated data and dispatches it to the stream. This is to provide a method for users to identify the performance of Spring Cloud Data Flow in different environments and deployment types.
The load-generator source has the following options:
The "rabbit" source enables receiving messages from RabbitMQ.
The queue(s) must exist before the stream is deployed; they are not created automatically. You can easily create a Queue using the RabbitMQ web UI.
The rabbit source has the following options:
false
)1000
)[STANDARD_REQUEST_HEADERS]
)3
)30000
)<none>
)true
)2
)false
)<none>
)localhost
)<none>
)5672
)<none>
)<none>
)<none>
)
Also see the Spring Boot Documentation for additional properties for the broker connections and listener properties.
Note | |
---|---|
With the default ackMode (AUTO) and requeue (true) options, failed message deliveries will be retried
indefinitely.
Since there is not much processing in the rabbit source, the risk of failure in the source itself is small, unless
the downstream |
This source app supports transfer of files using the SFTP protocol.
Files are transferred from the remote directory to the local directory where the application is deployed.
Messages emitted by the source are provided as a byte array by default. However, this can be customized using the --mode option:
- ref: provides a java.io.File reference
- lines: splits files line by line and emits a new message for each line
- contents: the default; provides the contents of a file as a byte array
When using --mode=lines, you can also provide the additional option --withMarkers=true.
If set to true, the underlying FileSplitter will emit additional start-of-file and end-of-file marker messages before and after the actual data.
The payload of these two additional marker messages is of type FileSplitter.FileMarker. The option withMarkers defaults to false if not explicitly set.
The sftp source has the following options:
true
)<none>
, possible values: ref
,lines
,contents
)<none>
)<none>
)<none>
)false
)<none>
)<none>
)<none>
)<empty string>
)<none>
)22
)<empty string>
)<none>
)<none>
)<none>
)<none>
)<none>
)<none>
)<none>
)<none>
)<none>
)<none>
)1
)0
)-1
)SECONDS
, possible values: NANOSECONDS
,MICROSECONDS
,MILLISECONDS
,SECONDS
,MINUTES
,HOURS
,DAYS
)
The syslog source receives SYSLOG packets over UDP, TCP, or both. RFC3164 (BSD) and RFC5424 formats are supported.
The syslog source has the following options:
2048
)false
)1514
)tcp
)false
)3164
)0
)
The tcp source acts as a server and allows a remote party to connect to it and submit data over a raw TCP socket.
TCP is a streaming protocol and some mechanism is needed to frame messages on the wire. A number of decoders are available, the default being 'CRLF', which is compatible with Telnet.
Messages produced by the TCP source application have a byte[] payload.
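To show what framing means in practice, here is a minimal plain-Java sketch of CRLF framing: it cuts a raw byte stream into one byte[] message per CRLF-terminated frame. The class CrlfFramingSketch is hypothetical and illustrates the idea only; it is not the decoder used by the application.

```java
import java.io.ByteArrayOutputStream;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of CRLF framing over a raw byte stream.
final class CrlfFramingSketch {

    private CrlfFramingSketch() {
    }

    // Returns one byte[] per complete frame; the CRLF terminator is not part of the payload.
    static List<byte[]> decode(byte[] stream) {
        List<byte[]> messages = new ArrayList<>();
        ByteArrayOutputStream current = new ByteArrayOutputStream();
        for (int i = 0; i < stream.length; i++) {
            boolean crlf = stream[i] == '\r' && i + 1 < stream.length && stream[i + 1] == '\n';
            if (crlf) {
                messages.add(current.toByteArray());
                current.reset();
                i++; // skip the '\n' that completes the terminator
            } else {
                current.write(stream[i]);
            }
        }
        // Trailing bytes without a terminator form an incomplete frame and are not emitted.
        return messages;
    }
}
```

Because a frame is only emitted once its terminator arrives, a client that forgets the trailing CRLF will not see its last message delivered.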
2048
)<none>
, possible values: CRLF
,LF
,NULL
,STXETX
,RAW
,L1
,L2
,L4
)<none>
)<none>
)<none>
)<none>
)<none>
)Text Data
Text and Binary Data
The time source will simply emit a String with the current time every so often.
The time source has the following options:
<none>
)<none>
)1
)0
)1
)<none>
, possible values: NANOSECONDS
,MICROSECONDS
,MILLISECONDS
,SECONDS
,MINUTES
,HOURS
,DAYS
)
This app sends a trigger based on a fixed delay, date, or cron expression. A payload which is evaluated using SpEL can also be sent each time the trigger fires.
The trigger source has the following options:
<none>
)<none>
)1
)0
)1
)<none>
)<none>
, possible values: NANOSECONDS
,MICROSECONDS
,MILLISECONDS
,SECONDS
,MINUTES
,HOURS
,DAYS
)
This source ingests data from Twitter’s streaming API v1.1. It uses the sample and filter stream endpoints rather than the full "firehose" which needs special access. The endpoint used will depend on the parameters you supply in the stream definition (some are specific to the filter endpoint).
You need to supply all keys and secrets (both consumer and accessToken) to authenticate for this source, so it is easiest if you just add these as the following environment variables: CONSUMER_KEY, CONSUMER_SECRET, ACCESS_TOKEN and ACCESS_TOKEN_SECRET.
The twitterstream source has the following options:
<none>
)<none>
)<none>
)<none>
)<none>
)<none>
, possible values: SAMPLE
,FIREHOSE
)
A Processor module that passes messages through unchanged by connecting the input channel directly to the output channel.
Use the filter module in a stream to determine whether a Message should be passed to the output channel.
A Processor module that retains or discards messages according to a predicate, expressed as a Groovy script.
The groovy-filter processor has the following options:
<none>
)<none>
)<none>
)
A Processor module that transforms messages using a Groovy script.
The groovy-transform processor has the following options:
<none>
)<none>
)<none>
)
A processor app that makes requests to an HTTP resource and emits the response body as a message payload. This processor can be combined, e.g., with a time source app to periodically poll results from an HTTP resource.
The httpclient processor has the following options:
<none>
)<none>
)<none>
)<none>
)<none>
, possible values: GET
,HEAD
,POST
,PUT
,PATCH
,DELETE
,OPTIONS
,TRACE
)body
)<none>
)
A processor that evaluates a machine learning model stored in PMML format.
The pmml processor has the following options:
<none>
)<none>
)<none>
)<none>
)<none>
)
A Spring Cloud Stream module that transforms messages using a script. The script body is supplied directly as a property value. The language of the script can be specified (groovy/javascript/ruby/python).
The scriptable-transform processor has the following options:
<none>
)<none>
)<none>
)<none>
)
The splitter app builds upon the concept of the same name in Spring Integration and allows the splitting of a single message into several distinct messages.
true
)<none>
)<none>
)<none>
)<none>
)true
)
When no expression, fileMarkers, or charset is provided, a DefaultMessageSplitter is configured with (optional) delimiters.
When fileMarkers or charset is provided, a FileSplitter is configured (you must provide either a fileMarkers or charset to split files, which must be text-based - they are split into lines).
Otherwise, an ExpressionEvaluatingMessageSplitter is configured.
When splitting File payloads, the sequenceSize header is zero because the size cannot be determined at the beginning.
Caution: Ambiguous properties are not allowed.
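The selection rules above can be written as a short decision function. The sketch below is hypothetical (the class SplitterSelectionSketch is not part of the app) and returns only the name of the splitter that would be configured. It does not model the caution about ambiguous properties, because the text does not define which combinations count as ambiguous.

```java
// Hypothetical sketch of how the splitter implementation is chosen from the properties.
final class SplitterSelectionSketch {

    private SplitterSelectionSketch() {
    }

    // A null argument means that the corresponding property was not provided.
    static String select(String expression, Boolean fileMarkers, String charset) {
        if (expression == null && fileMarkers == null && charset == null) {
            // Nothing set: default splitter, optionally using delimiters.
            return "DefaultMessageSplitter";
        }
        if (fileMarkers != null || charset != null) {
            // Either file-related property selects the file splitter.
            return "FileSplitter";
        }
        // Only the expression is set.
        return "ExpressionEvaluatingMessageSplitter";
    }
}
```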
As part of the SpEL expression you can make use of the pre-registered JSON Path function. The syntax is #jsonPath(payload, '<json path expression>').
For example, consider the following JSON:
{ "store": { "book": [ { "category": "reference", "author": "Nigel Rees", "title": "Sayings of the Century", "price": 8.95 }, { "category": "fiction", "author": "Evelyn Waugh", "title": "Sword of Honour", "price": 12.99 }, { "category": "fiction", "author": "Herman Melville", "title": "Moby Dick", "isbn": "0-553-21311-3", "price": 8.99 }, { "category": "fiction", "author": "J. R. R. Tolkien", "title": "The Lord of the Rings", "isbn": "0-395-19395-8", "price": 22.99 } ], "bicycle": { "color": "red", "price": 19.95 } }}
and an expression #jsonPath(payload, '$.store.book'); the result will be 4 messages, each with a Map payload containing the properties of a single book.
Use the transform app in a stream to convert a Message’s content or structure.
The transform processor is used by passing a SpEL expression. The expression should return the modified message or payload. For example, --expression=payload.toUpperCase().
This transform will convert all message payloads to upper case.
As part of the SpEL expression you can make use of the pre-registered JSON Path function. The syntax is #jsonPath(payload,'<json path expression>')
This sink application writes the content of each message it receives into Cassandra.
The cassandra sink has the following options:
<none>
, possible values: NONE
,SNAPPY
)<none>
)false
)[]
)<none>
)<none>
)<none>
)<none>
)<none>
)<none>
, possible values: NONE
,CREATE
,RECREATE
,RECREATE_DROP_UNUSED
)<none>
)<none>
, possible values: ANY
,ONE
,TWO
,THREE
,QUOROM
,LOCAL_QUOROM
,EACH_QUOROM
,ALL
,LOCAL_ONE
,SERIAL
,LOCAL_SERIAL
)<none>
)<none>
)<none>
, possible values: DEFAULT
,DOWNGRADING_CONSISTENCY
,FALLTHROUGH
,LOGGING
)<none>
)0
)
The counter sink simply counts the number of messages it receives, optionally storing counts in a separate store such as Redis.
The counter sink has the following options:
<none>
)<none>
)0
)localhost
)<none>
)6379
)0
)
A field value counter is a Metric used for counting occurrences of unique values for a named field in a message payload. This sink supports the following payload types out of the box:
For example, suppose a message source produces a payload with a field named user:
class Foo { String user; public Foo(String user) { this.user = user; } }
If the stream source produces messages with the following objects:
new Foo("fred")
new Foo("sue")
new Foo("dave")
new Foo("sue")
The field value counter on the field user will contain:
fred:1, sue:2, dave:1
Multi-value fields are also supported. For example, if a field contains a list, each value will be counted once:
users:["dave","fred","sue"] users:["sue","jon"]
The field value counter on the field users will contain:
dave:1, fred:1, sue:2, jon:1
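The counting rule described above can be sketched in plain Java. This is only an illustration of the rule, not the sink's implementation: the class name FieldValueCounterSketch and its count method are hypothetical, the field values are assumed to have been extracted from the payloads already, and Java 8 or later is assumed.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration of the counting rule; not the sink's actual code.
final class FieldValueCounterSketch {

    // Counts occurrences of each field value. A collection-valued field
    // contributes one count per element it contains.
    static Map<String, Long> count(List<?> fieldValues) {
        Map<String, Long> counts = new LinkedHashMap<>();
        for (Object value : fieldValues) {
            if (value instanceof Collection) {
                for (Object element : (Collection<?>) value) {
                    counts.merge(String.valueOf(element), 1L, Long::sum);
                }
            } else {
                counts.merge(String.valueOf(value), 1L, Long::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        // Single-valued field "user", as in the four Foo objects above.
        System.out.println(count(Arrays.asList("fred", "sue", "dave", "sue")));
        // prints {fred=1, sue=2, dave=1}

        // Multi-valued field "users", as in the two payloads above.
        System.out.println(count(Arrays.asList(
                Arrays.asList("dave", "fred", "sue"),
                Arrays.asList("sue", "jon"))));
        // prints {dave=1, fred=1, sue=2, jon=1}
    }
}
```

A LinkedHashMap is used here only so that the printed order matches the order in which values were first seen.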
The field-value-counter sink has the following options:
(default: <none>)
(default: <none>)
(default: <none>)
(default: 0)
(default: localhost)
(default: <none>)
(default: 6379)
(default: 0)
This module writes each message it receives to a file.
The file sink has the following options:
(default: false)
(default: UTF-8)
(default: <none>)
(default: <none>)
(default: <none>, possible values: APPEND, FAIL, IGNORE, REPLACE)
(default: file-sink)
(default: <none>)
(default: <empty string>)
FTP sink is a simple option to push files to an FTP server from incoming messages.
It uses an ftp-outbound-adapter, therefore incoming messages can be either a java.io.File object, a String (content of the file) or an array of bytes (file content as well).
To use this sink, you need a username and a password to log in.
Note | |
---|---|
By default Spring Integration will use |
The ftp sink has the following options:
(default: <none>)
(default: <none>)
(default: <none>, possible values: APPEND, FAIL, IGNORE, REPLACE)
(default: <none>)
(default: <none>)
(default: <none>)
(default: <none>)
(default: <none>)
The Gemfire sink allows one to write message payloads to a Gemfire server.
The gemfire sink has the following options:
(default: false)
(default: <none>)
(default: <none>, possible values: locator, server)
(default: <none>)
(default: false)
(default: <none>)
A sink module that routes messages into GPDB/HAWQ segments via the gpfdist protocol. Internally, this sink creates a custom http listener that supports the gpfdist protocol and schedules a task that orchestrates a gpload session in the same way it is done natively in Greenplum.
No data is written into temporary files and all data is kept in stream buffers waiting to get inserted into Greenplum DB or HAWQ. If there are no existing load sessions from Greenplum, the sink will block until such sessions are established.
The gpfdist sink has the following options:
(default: 100)
(default: 10)
(default: 4)
(default: <none>)
(default: <none>)
(default: localhost)
(default: gpadmin)
(default: gpadmin)
(default: 5432)
(default: gpadmin)
(default: ``)
(default: <none>)
(default: 100)
(default: 2)
(default: 0)
(default: <none>)
(default: <none>)
(default: <none>)
(default: 0)
(default: <none>)
(default: <none>, possible values: ROWS, PERCENT)
(default: <none>)
(default: <none>)
(default: <none>)
(default: <none>)
(default: false)
(default: <none>)
(default: <none>)
(default: false)
(default: <none>)
Within a gpfdist sink we have a Reactor-based stream where data is published from the incoming SI channel. This channel receives data from the Message Bus. The Reactor stream is then connected to Netty-based http channel adapters so that when a new http connection is established, the Reactor stream is flushed and balanced among existing http clients. When Greenplum does a load from an external table, each segment will initiate an http connection and start loading data. The net effect is that incoming data is automatically spread among the Greenplum segments.
The gpfdist sink supports the following configuration properties:
Database table to work with. (String, default: ``, required)
This option denotes the table into which data will be inserted or updated. The structure of the external table is also derived from the structure of this table.
Currently table is the only way to define the structure of an external table. Effectively it replaces other_table in the clause segment below.
CREATE READABLE EXTERNAL TABLE table_name LIKE other_table
Gpfdist mode, either `insert` or `update`. (String, default: insert)
Currently only the insert and update gpfdist modes are supported. The merge mode, familiar from the native gpfdist loader, is not yet supported.
For the update mode, the options matchColumns and updateColumns are required.
Data record column delimiter. (Character, default: ``)
Defines the delimiter character used in the clause segment below, which would be part of a FORMAT 'TEXT' or FORMAT 'CSV' section.
[DELIMITER AS 'delimiter']
Error reject limit. (String, default: ``)
Defines the count value in the clause segment below.
[ [LOG ERRORS INTO error_table] SEGMENT REJECT LIMIT count [ROWS | PERCENT] ]
As a convenience, this reject limit also recognizes a percentage format such as 2%; if it is used, segmentRejectType is automatically set to percent.
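The percentage shorthand can be pictured with the following minimal sketch. It is not the sink's own parsing code: the class RejectLimitSketch, its parse and toClause methods, and the handling of whitespace are all hypothetical and only illustrate how a value such as 2% could end up as a count plus a PERCENT type in the clause shown above.

```java
import java.util.Locale;

// Hypothetical helper; names are illustrative and not part of the sink's API.
final class RejectLimitSketch {
    final int count;
    final String type; // "rows", "percent", or null when not specified

    RejectLimitSketch(int count, String type) {
        this.count = count;
        this.type = type;
    }

    // Parses a segmentRejectLimit value. A trailing '%' switches the type to
    // "percent"; otherwise the explicitly configured type (may be null) is kept.
    static RejectLimitSketch parse(String limit, String configuredType) {
        String trimmed = limit.trim();
        if (trimmed.endsWith("%")) {
            String digits = trimmed.substring(0, trimmed.length() - 1).trim();
            return new RejectLimitSketch(Integer.parseInt(digits), "percent");
        }
        return new RejectLimitSketch(Integer.parseInt(trimmed), configuredType);
    }

    // Renders the "SEGMENT REJECT LIMIT count [ROWS | PERCENT]" fragment.
    String toClause() {
        StringBuilder clause = new StringBuilder("SEGMENT REJECT LIMIT ").append(count);
        if (type != null) {
            clause.append(' ').append(type.toUpperCase(Locale.ROOT));
        }
        return clause.toString();
    }

    public static void main(String[] args) {
        System.out.println(parse("2%", null).toClause());   // SEGMENT REJECT LIMIT 2 PERCENT
        System.out.println(parse("10", "rows").toClause()); // SEGMENT REJECT LIMIT 10 ROWS
    }
}
```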
Error reject type, either `rows` or `percent`. (String, default: ``)
Defines ROWS or PERCENT in the clause segment below.
[ [LOG ERRORS INTO error_table] SEGMENT REJECT LIMIT count [ROWS | PERCENT] ]
Tablename to log errors. (String, default: ``)
As the error table is optional with SEGMENT REJECT LIMIT, it is only used if both segmentRejectLimit and segmentRejectType are set. Sets error_table in the clause segment below.
[ [LOG ERRORS INTO error_table] SEGMENT REJECT LIMIT count [ROWS | PERCENT] ]
Null string definition. (String, default: ``)
Defines the null string used in the clause segment below, which would be part of a FORMAT 'TEXT' or FORMAT 'CSV' section.
[NULL AS 'null string']
Data record delimiter for incoming messages. (String, default: \n)
By default, the delimiter in this option is appended to every message sent into this sink. Currently NEWLINE is not a supported config option, and line termination for data comes from default functionality.
If not specified, a Greenplum Database segment will detect the newline type by looking at the first row of data it receives and using the first newline type encountered.
-- External Table Docs
Comma delimited list of columns to match. (String, default: ``)
Note | |
---|---|
See more from examples below. |
Comma delimited list of columns to update. (String, default: ``)
Note | |
---|---|
See more from examples below. |
Debug rate of data transfer. (Integer, default: 0)
If set to a non-zero value, the sink will log the rate of messages passing through it after the number of messages denoted by this setting has been processed. The value 0 means that this rate calculation and logging is disabled.
Max collected size per windowed data. (Integer, default: 100)
Note | |
---|---|
For more info on flush and batch settings, see above. |
There are a few important concepts involving how data passes into a sink, through it and finally lands in a database.
(flushTime) or timeouts (flushTime) if window doesn’t get full. One window is then ready to get sent into a segment.
batchCount and completes a stream if it got enough batches or if batchTimeout occurred due to inactivity.
batchPeriod is then used as a sleep time in between these load sessions.
Let’s take a closer look at how the options flushCount, flushTime, batchCount, batchTimeout and batchPeriod work.
At the highest level, where incoming data into a sink is windowed, flushCount and flushTime control when a batch of messages is sent downstream. If there are a lot of simultaneous segment connections, flushing less often will keep more segments inactive, as there is more demand for batches than flushing will produce.
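The interplay of a count limit and a time limit on a window can be sketched as follows. This is a deliberately simplified, single-threaded illustration and not how the sink is built (the sink uses a Reactor stream): the class FlushWindowSketch, its offer and tick methods, the use of milliseconds, and the explicit timestamps passed in by the caller are all assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, single-threaded illustration of count-or-timeout windowing.
final class FlushWindowSketch {
    private final int flushCount;
    private final long flushTimeMillis;
    private final List<String> window = new ArrayList<>();
    private long windowStartedAt;

    FlushWindowSketch(int flushCount, long flushTimeMillis) {
        this.flushCount = flushCount;
        this.flushTimeMillis = flushTimeMillis;
    }

    // Adds a message; returns the released window once it is full, else null.
    List<String> offer(String message, long nowMillis) {
        if (window.isEmpty()) {
            windowStartedAt = nowMillis;
        }
        window.add(message);
        return window.size() >= flushCount ? release() : null;
    }

    // Called periodically; releases a partially filled window once the
    // flush time has elapsed since its first message, else returns null.
    List<String> tick(long nowMillis) {
        boolean timedOut = !window.isEmpty()
                && nowMillis - windowStartedAt >= flushTimeMillis;
        return timedOut ? release() : null;
    }

    private List<String> release() {
        List<String> batch = new ArrayList<>(window);
        window.clear();
        return batch;
    }

    public static void main(String[] args) {
        FlushWindowSketch sketch = new FlushWindowSketch(2, 1000L);
        System.out.println(sketch.offer("a", 0L));   // null, window not full yet
        System.out.println(sketch.offer("b", 10L));  // [a, b], released by count
        System.out.println(sketch.offer("c", 20L));  // null
        System.out.println(sketch.tick(1020L));      // [c], released by timeout
    }
}
```

With a small count limit, windows are released often and more subscribers can be served; with a large one, fewer but bigger batches are produced.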
When an existing segment connection is active and has subscribed itself to a stream of batches, data will keep flowing until either batchCount is met or batchTimeout occurs due to inactivity of data from upstream. The higher batchCount is, the more data each segment will read. The higher batchTimeout is, the longer a segment will wait in case there is more data to come.
As gpfdist load operations are done in a loop, batchPeriod simply keeps things from running in a busy loop. A busy loop would be fine if there were a constant stream of data coming in, but if incoming data arrives in bursts, a busy loop would be unnecessary.
Note | |
---|---|
Data loaded via gpfdist will not become visible in a database until the whole distributed loading session has finished successfully. |
Reactor also handles backpressure, meaning that if existing load operations do not produce enough demand for data, message passing into the sink will eventually block. This happens when Reactor’s internal ring buffer (size of 32 items) gets full. Data really flows through the sink only when it is pulled from it by segments.
In this first example we’re just creating a simple stream which inserts data from a time source. Let’s create a table with two text columns.
gpadmin=# create table ticktock (date text, time text);
Create a simple stream gpstream1.
dataflow:>stream create --name gpstream1 --definition "time | gpfdist --dbHost=mdw --table=ticktock --batchTime=1 --batchPeriod=1 --flushCount=2 --flushTime=2 --columnDelimiter=' '" --deploy
Let it run and see the results in the database.
gpadmin=# select count(*) from ticktock;
 count
-------
    14
(1 row)
In the previous example we did simple inserts into a table. Let’s see how we can update data in a table. Create a simple table httpdata with three text columns and insert some data.
gpadmin=# create table httpdata (col1 text, col2 text, col3 text);
gpadmin=# insert into httpdata values ('DATA1', 'DATA', 'DATA');
gpadmin=# insert into httpdata values ('DATA2', 'DATA', 'DATA');
gpadmin=# insert into httpdata values ('DATA3', 'DATA', 'DATA');
Now the table looks like this.
gpadmin=# select * from httpdata;
 col1  | col2 | col3
-------+------+------
 DATA3 | DATA | DATA
 DATA2 | DATA | DATA
 DATA1 | DATA | DATA
(3 rows)
Let’s create a stream which will update the table httpdata by matching on column col1 and updating columns col2 and col3.
dataflow:>stream create --name gpfdiststream2 --definition "http --server.port=8081|gpfdist --mode=update --table=httpdata --dbHost=mdw --columnDelimiter=',' --matchColumns=col1 --updateColumns=col2,col3" --deploy
Post some data into a stream which will be passed into a gpfdist sink via http source.
curl --data "DATA1,DATA1,DATA1" -H "Content-Type:text/plain" http://localhost:8081/
If you query the table again, you’ll see that the row for DATA1 has been updated.
gpadmin=# select * from httpdata;
 col1  | col2  | col3
-------+-------+-------
 DATA3 | DATA  | DATA
 DATA2 | DATA  | DATA
 DATA1 | DATA1 | DATA1
(3 rows)
Default values for the options flushCount, flushTime, batchCount, batchTimeout and batchPeriod are relatively conservative and need to be tuned for every use case for optimal performance. In order to decide how to tune the sink behaviour to suit your needs, a few things need to be considered.
Everything that flows through the sink is kept in memory, and because the sink handles backpressure, memory consumption is relatively low. However, the sink cannot predict the average size of incoming data, and this data is windowed later downstream, so you should not allow the window size to become too large if the average data size is large, as every batch of data is kept in memory.
Generally speaking, if you have a lot of segments in a load operation, it’s advised to keep the flushed window size relatively small, which allows more segments to stay active. This however also depends on how much data is flowing into the sink itself.
The longer a load session for each segment is active, the higher the overall transfer rate is going to be. The batchCount option naturally controls this. The batchTimeout option, however, controls how fast each segment will complete a stream due to inactivity from upstream and step away from a loading session, allowing the distributed session to finish and the data to become visible in the database.
This module writes each message it receives to HDFS.
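The hdfs sink has the following options:
(default: 0)
(default: <none>)
(default: <none>)
(default: false)
(default: txt)
(default: <none>)
(default: 10)
(default: false)
(default: 0)
(default: <none>)
(default: 0)
(default: <none>)
(default: <none>)
(default: false)
(default: <none>)
(default: 1000000000)
Note | |
---|---|
This module can have its runtime dependencies provided during startup if you would like to use a Hadoop distribution other than the default one. |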
A module that writes its incoming payload to an RDBMS using JDBC.
The jdbc sink has the following options:
(default: <none>)
(default: false)
(default: <none>)
(default: <none>)
(default: <none>)
(default: true)
(default: <none>)
(default: <none>)
(default: <none>)
Note | |
---|---|
The module also uses Spring Boot’s DataSource support for configuring the database connection, so properties like |
The log sink uses the application logger to output the data for inspection.
The log sink has the following options:
(default: payload)
(default: <none>, possible values: FATAL, ERROR, WARN, INFO, DEBUG, TRACE)
(default: <none>)
This module sends messages to RabbitMQ.
The rabbit sink has the following options:
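(See the Spring Boot documentation for RabbitMQ connection properties)
(default: <none>)
(default: <empty string>)
(default: <none>)
(default: [*])
(default: false)
(default: <none>)
(default: <none>)
(default: <none>)
(default: localhost)
(default: <none>)
(default: 5672)
(default: <none>)
(default: <none>)
(default: <none>)
Note | |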
---|---|
By default, the message converter is a |
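This module sends messages to a Redis store.
The redis sink has the following options:
(default: <none>)
(default: <none>)
(default: <none>)
(default: <none>)
(default: <none>)
(default: <none>)
(default: 0)
(default: localhost)
(default: <none>)
(default: 8)
(default: 8)
(default: -1)
(default: 0)
(default: 6379)
(default: <none>)
(default: <none>)
(default: 0)
This module routes messages to named channels.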
The router sink has the following options:
(default: nullChannel)
(default: <none>)
(default: <none>)
(default: 60000)
(default: false)
(default: <none>)
(default: <none>)
(default: <none>)
Note | |
---|---|
Since this is a dynamic router, destinations are created as needed; therefore, by default the |
You can restrict the creation of dynamic bindings using the spring.cloud.stream.dynamicDestinations property. By default, all resolved destinations will be bound dynamically; if this property has a comma-delimited list of destination names, only those will be bound. Messages that resolve to a destination that is not in this list will be routed to the defaultOutputChannel, which must also appear in the list.
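The rule for restricting destinations can be illustrated with a small stand-alone sketch. It only mirrors the behaviour described above and is not the router's or the binder's code: the class DynamicDestinationSketch and its resolve method are hypothetical, and the comma-delimited list is parsed by hand for the purpose of the example.

```java
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical illustration of the destination whitelist rule.
final class DynamicDestinationSketch {
    private final Set<String> allowed = new LinkedHashSet<>();
    private final String defaultOutputChannel;

    // dynamicDestinations is a comma-delimited list; empty or null means
    // that every resolved destination may be bound.
    DynamicDestinationSketch(String dynamicDestinations, String defaultOutputChannel) {
        if (dynamicDestinations != null) {
            for (String name : dynamicDestinations.split(",")) {
                if (!name.trim().isEmpty()) {
                    allowed.add(name.trim());
                }
            }
        }
        this.defaultOutputChannel = defaultOutputChannel;
    }

    // Returns the destination a message is actually routed to.
    String resolve(String destination) {
        if (allowed.isEmpty() || allowed.contains(destination)) {
            return destination;
        }
        return defaultOutputChannel;
    }

    public static void main(String[] args) {
        DynamicDestinationSketch restricted = new DynamicDestinationSketch("foo,bar", "bar");
        System.out.println(restricted.resolve("foo")); // foo
        System.out.println(restricted.resolve("baz")); // bar (not in the list)

        DynamicDestinationSketch open = new DynamicDestinationSketch("", "bar");
        System.out.println(open.resolve("baz"));       // baz (no restriction)
    }
}
```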
destinationMappings are used to map the evaluation results to an actual destination name.
The expression evaluates against the message and returns either a channel name, or the key to a map of channel names.
For more information, please see the "Routers and the Spring Expression Language (SpEL)" subsection in the Spring Integration Reference manual Configuring (Generic) Router section.
Instead of SpEL expressions, Groovy scripts can also be used. Let’s create a Groovy script in the file system at "file:/my/path/router.groovy", or "classpath:/my/path/router.groovy" :
println("Groovy processing payload '" + payload + "'");
if (payload.contains('a')) {
    return "foo"
}
else {
    return "bar"
}
If you want to pass variable values to your script, you can statically bind values using the variables option or optionally pass the path to a properties file containing the bindings using the propertiesLocation option. All properties in the file will be made available to the script as variables. You may specify both variables and propertiesLocation, in which case any duplicate values provided as variables override values provided in propertiesLocation. Note that payload and headers are implicitly bound to give you access to the data contained in a message.
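The precedence between the two sources of bindings can be sketched like this. The example is not the module's code: the class ScriptVariablesSketch is hypothetical, the properties file is passed as a string instead of being read from propertiesLocation, and the comma-separated name=value form of the inline variables is an assumption made for the illustration.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical illustration of "variables override propertiesLocation".
final class ScriptVariablesSketch {

    // Loads file-based bindings first, then applies inline variables on top,
    // so a name defined in both places ends up with the inline value.
    static Properties merge(String propertiesFileContent, String inlineVariables) {
        Properties merged = new Properties();
        try {
            if (propertiesFileContent != null) {
                merged.load(new StringReader(propertiesFileContent));
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        if (inlineVariables != null) {
            // Assumed format: comma-separated name=value pairs.
            for (String pair : inlineVariables.split(",")) {
                int eq = pair.indexOf('=');
                if (eq > 0) {
                    merged.setProperty(pair.substring(0, eq).trim(),
                            pair.substring(eq + 1).trim());
                }
            }
        }
        return merged;
    }

    public static void main(String[] args) {
        Properties bindings = merge("a=1\nb=2\n", "b=3,c=4");
        System.out.println(bindings.getProperty("a")); // 1 (from the file)
        System.out.println(bindings.getProperty("b")); // 3 (inline value wins)
        System.out.println(bindings.getProperty("c")); // 4 (inline only)
    }
}
```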
For more information, see the Spring Integration Reference manual Groovy Support.
This module writes messages to TCP using an Encoder.
TCP is a streaming protocol and some mechanism is needed to frame messages on the wire. A number of encoders are available, the default being 'CRLF'.
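To make the idea of framing concrete, here is a minimal sketch of two framing styles: terminating a payload with carriage return and line feed, and prefixing it with its length. It is an illustration only and does not use the sink's encoders; the class TcpFramingSketch and its methods are hypothetical, and the four-byte big-endian length header is an assumption about what a length-prefixed encoder such as L4 would write.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical illustration of message framing on a byte stream.
final class TcpFramingSketch {

    // Terminates the payload with carriage return (0x0d) and line feed (0x0a).
    static byte[] crlf(byte[] payload) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        out.write(payload, 0, payload.length);
        out.write('\r');
        out.write('\n');
        return out.toByteArray();
    }

    // Prefixes the payload with its length as a 4-byte big-endian integer
    // (an assumed layout for a length-header style of framing).
    static byte[] lengthHeader4(byte[] payload) {
        return ByteBuffer.allocate(4 + payload.length)
                .putInt(payload.length)
                .put(payload)
                .array();
    }

    public static void main(String[] args) {
        byte[] payload = "abc".getBytes(StandardCharsets.US_ASCII);
        System.out.println(Arrays.toString(crlf(payload)));
        // [97, 98, 99, 13, 10]
        System.out.println(Arrays.toString(lengthHeader4(payload)));
        // [0, 0, 0, 3, 97, 98, 99]
    }
}
```

Terminator-based framing suits text that cannot contain the terminator itself, while a length header also works for arbitrary binary payloads.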
The tcp sink has the following options:
(default: UTF-8)
(default: false)
(default: <none>, possible values: CRLF, LF, NULL, STXETX, RAW, L1, L2, L4)
(default: <none>)
(default: <none>)
(default: <none>)
(default: <none>)
(default: <none>)
(default: <none>)
Text Data
Text and Binary Data
A simple handler that will count messages and log witnessed throughput at a selected interval.
A simple Websocket Sink implementation.
The following command line arguments are supported:
(default: <none>)
(default: /websocket)
(default: 9292)
(default: false)
(default: 1)
To verify that the websocket-sink receives messages from other spring-cloud-stream apps, you can use the following simple end-to-end setup.
The default broker that is used is Redis. Normally you can start Redis via redis-server.
Finally start a websocket-sink in trace mode so that you see the messages produced by the time-source in the log:
java -jar <spring boot application for websocket-sink> --spring.cloud.stream.bindings.input=ticktock --server.port=9393 \
    --logging.level.org.springframework.cloud.stream.module.websocket=TRACE
You should start seeing log messages in the console where you started the WebsocketSink like this:
Handling message: GenericMessage [payload=2015-10-21 12:52:53, headers={id=09ae31e0-a04e-b811-d211-b4d4e75b6f29, timestamp=1445424778065}]
Handling message: GenericMessage [payload=2015-10-21 12:52:54, headers={id=75eaaf30-e5c6-494f-b007-9d5b5b920001, timestamp=1445424778065}]
Handling message: GenericMessage [payload=2015-10-21 12:52:55, headers={id=18b887db-81fc-c634-7a9a-16b1c72de291, timestamp=1445424778066}]
There is an Endpoint that you can use to access the last n messages sent and received. You have to enable it by providing --endpoints.websocketsinktrace.enabled=true. By default it shows the last 100 messages at host:port/websocketsinktrace. Here is a sample output:
[
  {
    "timestamp": 1445453703508,
    "info": {
      "type": "text",
      "direction": "out",
      "id": "2ff9be50-c9b2-724b-5404-1a6305c033e4",
      "payload": "2015-10-21 20:54:33"
    }
  },
  ...
  {
    "timestamp": 1445453703506,
    "info": {
      "type": "text",
      "direction": "out",
      "id": "2b9dbcaf-c808-084d-a51b-50f617ae6a75",
      "payload": "2015-10-21 20:54:32"
    }
  }
]
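Keeping only the last n messages, newest first as in the sample output, can be sketched with a bounded deque. This is an illustration of the idea rather than the endpoint's implementation: the class TraceBufferSketch and its methods are hypothetical, and only the payload string is stored to keep the example short.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical illustration of a "last n messages" trace buffer.
final class TraceBufferSketch {
    private final int capacity;
    private final Deque<String> entries = new ArrayDeque<>();

    TraceBufferSketch(int capacity) {
        if (capacity < 1) {
            throw new IllegalArgumentException("capacity must be positive");
        }
        this.capacity = capacity;
    }

    // Records a payload, dropping the oldest entry when the buffer is full.
    synchronized void add(String payload) {
        if (entries.size() == capacity) {
            entries.removeLast();
        }
        entries.addFirst(payload);
    }

    // Returns a copy of the retained entries, newest first.
    synchronized List<String> snapshot() {
        return new ArrayList<>(entries);
    }

    public static void main(String[] args) {
        TraceBufferSketch trace = new TraceBufferSketch(2);
        trace.add("2015-10-21 20:54:31");
        trace.add("2015-10-21 20:54:32");
        trace.add("2015-10-21 20:54:33");
        System.out.println(trace.snapshot());
        // [2015-10-21 20:54:33, 2015-10-21 20:54:32]
    }
}
```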
There is also a simple HTML page where you can see forwarded messages in a text area. You can access it directly via host:port in your browser.
Note | |
---|---|
For SSL mode ( |
To build the source you will need to install JDK 1.7.
The build uses the Maven wrapper so you don’t have to install a specific version of Maven. To enable the tests for Redis you should run the server before building. See below for more information on how to run Redis.
The main build command is
$ ./mvnw clean install
You can also add '-DskipTests' if you like, to avoid running the tests.
Note | |
---|---|
You can also install Maven (>=3.3.3) yourself and run the |
Note | |
---|---|
Be aware that you might need to increase the amount of memory
available to Maven by setting a |
The projects that require middleware generally include a docker-compose.yml, so consider using Docker Compose to run the middleware servers in Docker containers. See the README in the scripts demo repository for specific instructions about the common cases of mongo, rabbit and redis.
There is a "full" profile that will generate documentation. You can build just the documentation by executing
$ ./mvnw package -DskipTests=true -P full -pl spring-cloud-stream-app-starters-docs -am
If you don’t have an IDE preference we would recommend that you use Spring Tool Suite or Eclipse when working with the code. We use the m2eclipse Eclipse plugin for Maven support. Other IDEs and tools should also work without issue.
We recommend the m2eclipse Eclipse plugin when working with Eclipse. If you don’t already have m2eclipse installed it is available from the "eclipse marketplace".
Unfortunately m2e does not yet support Maven 3.3, so once the projects are imported into Eclipse you will also need to tell m2eclipse to use the .settings.xml file for the projects. If you do not do this you may see many different errors related to the POMs in the projects. Open your Eclipse preferences, expand the Maven preferences, and select User Settings. In the User Settings field click Browse and navigate to the Spring Cloud project you imported, selecting the .settings.xml file in that project. Click Apply and then OK to save the preference changes.
Note | |
---|---|
Alternatively you can copy the repository settings from |
Spring Cloud is released under the non-restrictive Apache 2.0 license, and follows a very standard GitHub development process, using the GitHub tracker for issues and merging pull requests into master. If you want to contribute even something trivial please do not hesitate, but follow the guidelines below.
Before we accept a non-trivial patch or pull request we will need you to sign the contributor’s agreement. Signing the contributor’s agreement does not grant anyone commit rights to the main repository, but it does mean that we can accept your contributions, and you will get an author credit if we do. Active contributors might be asked to join the core team, and given the ability to merge pull requests.
None of these is essential for a pull request, but they will all help. They can also be added after the original pull request but before a merge.
eclipse-code-formatter.xml file from the Spring Cloud Build project. If using IntelliJ, you can use the Eclipse Code Formatter Plugin to import the same file.
.java files to have a simple Javadoc class comment with at least an @author tag identifying you, and preferably at least a paragraph on what the class is for.
.java files (copy from existing files in the project)
@author to the .java files that you modify substantially (more than cosmetic changes).
Fixes gh-XXXX at the end of the commit message (where XXXX is the issue number).