Darwin.SR2
Copyright © 2013-2016 Pivotal Software, Inc.
Table of Contents
This section will provide you with a detailed overview of Spring Cloud Stream Application Starters, their purpose, and how to use them. It assumes familiarity with general Spring Cloud Stream concepts, which can be found in the Spring Cloud Stream reference documentation.
Spring Cloud Stream Application Starters provide you with out-of-the-box Spring Cloud Stream utility applications that you can run independently or with Spring Cloud Data Flow. You can also use the starters as a basis for creating your own applications.
They include:
You can find a detailed listing of all the starters and their options in the corresponding section of this guide.
You can find all available app starter repositories in this GitHub Organization.
As a user of Spring Cloud Stream Application Starters you have access to two types of artifacts.
Starters are libraries that contain the complete configuration of a Spring Cloud Stream application with a specific role (e.g. an HTTP source that receives HTTP POST requests and forwards the data on its output channel to downstream Spring Cloud Stream applications). Starters are not executable applications, and are intended to be included in other Spring Boot applications, along with an available Binder implementation of your choice.
Out-of-the-box applications are Spring Boot applications that include the starters and a Binder implementation - a fully functional uber-jar. These uber-jars include the minimal code required to execute standalone. For each starter application, the project provides a prebuilt version for the Apache Kafka and Rabbit MQ Binders.
Note: Only starters are present in the source code of the project. Prebuilt applications are generated according to the stream apps generator maven plugin.
Based on their target application type, starters can be either:
You can easily identify the type and functionality of a starter based on its name.
All starters are named following the convention spring-cloud-starter-stream-<type>-<functionality>
.
For example spring-cloud-starter-stream-source-file
is a starter for a file source that polls a directory and sends file data on the output channel (read the reference documentation of the source for details).
Conversely, spring-cloud-starter-stream-sink-cassandra
is a starter for a Cassandra sink that writes the data that it receives on the input channel to Cassandra (read the reference documentation of the sink for details).
The prebuilt applications follow a naming convention too: <functionality>-<type>-<binder>
. For example, cassandra-sink-kafka-10
is a Cassandra sink using the Kafka binder that is running with Kafka version 0.10.
You can access the artifacts produced by Spring Cloud Stream Application Starters via Maven or Docker, or by building the artifacts yourself.
Starters are available as Maven artifacts in the Spring repositories. You can add them as dependencies to your application, as follows:
<dependency>
  <groupId>org.springframework.cloud.stream.app</groupId>
  <artifactId>spring-cloud-starter-stream-sink-cassandra</artifactId>
  <version>1.0.0.BUILD-SNAPSHOT</version>
</dependency>
From this, you can infer the coordinates for other starters found in this guide.
While the version may vary, the group will always remain org.springframework.cloud.stream.app
and the artifact id follows the naming convention spring-cloud-starter-stream-<type>-<functionality>
described previously.
Prebuilt applications are available as Maven artifacts too.
Using them directly as dependencies is not encouraged; use the starters instead.
Following the typical Maven <group>:<artifactId>:<version>
convention, they can be referenced for example as:
org.springframework.cloud.stream.app:cassandra-sink-rabbit:1.0.0.BUILD-SNAPSHOT
You can download the executable jar artifacts from the Spring Maven repositories. The root directory of the Maven repository that hosts release versions is repo.spring.io/release/org/springframework/cloud/stream/app/. From there you can navigate to the latest release version of a specific app, for example log-sink-rabbit-1.1.1.RELEASE.jar. Use the Milestone and Snapshot repository locations for Milestone and Snapshot executable jar artifacts.
The Docker versions of the applications are available in Docker Hub, at hub.docker.com/r/springcloudstream/
. Naming and versioning follows the same general conventions as Maven, e.g.
docker pull springcloudstream/cassandra-sink-kafka-10
will pull the latest Docker image of the Cassandra sink with the Kafka binder that is running with Kafka version 0.10.
You can build the project and generate the artifacts (including the prebuilt applications) on your own. This is useful if you want to deploy the artifacts locally or add additional features.
First, you need to generate the prebuilt applications. This is done by running the application generation Maven plugin. You can do so by simply invoking the maven build with the generateApps profile and install lifecycle.
mvn clean install -PgenerateApps
Each of the prebuilt applications will contain:
- a pom.xml file with the required dependencies (starter and binder)
- a class that contains the main method of the application and imports the predefined configuration

For example, spring-cloud-starter-stream-sink-cassandra
will generate cassandra-sink-rabbit
and cassandra-sink-kafka-10
as completely functional applications.
Apart from accessing the sources, sinks and processors already provided by the project, in this section we will describe how to:
Prebuilt applications are provided for both the Kafka and Rabbit binders. If you want to connect to a different middleware system, and you have a binder for it, you will need to create new artifacts.
<dependencies>
  <!-- other dependencies -->
  <dependency>
    <groupId>org.springframework.cloud.stream.app</groupId>
    <artifactId>spring-cloud-starter-stream-sink-cassandra</artifactId>
    <version>1.0.0.BUILD-SNAPSHOT</version>
  </dependency>
  <dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-gemfire</artifactId>
    <version>1.0.0.BUILD-SNAPSHOT</version>
  </dependency>
</dependencies>
The next step is to create the project’s main class and import the configuration provided by the starter.
package org.springframework.cloud.stream.app.cassandra.sink.gemfire;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.app.cassandra.sink.CassandraSinkConfiguration;
import org.springframework.context.annotation.Import;

@SpringBootApplication
@Import(CassandraSinkConfiguration.class)
public class CassandraSinkGemfireApplication {

    public static void main(String[] args) {
        SpringApplication.run(CassandraSinkGemfireApplication.class, args);
    }
}
Spring Cloud Stream Application Starters consist of regular Spring Boot applications with some additional conventions that facilitate generating prebuilt applications with the preconfigured binders. Sometimes, your solution may require additional applications that are not in the scope of out-of-the-box Spring Cloud Stream Application Starters, or require additional tweaks and enhancements. In this section we will show you how to create custom applications that can be part of your solution, along with Spring Cloud Stream application starters. You have the following options:
If you want to add your own custom applications to your solution, you can simply create a new Spring Cloud Stream app project with the binder of your choice and run it the same way as the applications provided by Spring Cloud Stream Application Starters, independently or via Spring Cloud Data Flow. The process is described in the Getting Started Guide of Spring Cloud Stream.
An alternative way to bootstrap your application is to go to the Spring Initializr and choose a Spring Cloud Stream Binder of your choice. This way you already have the necessary infrastructure ready to go and mainly focus on the specifics of the application.
The following requirements need to be followed when you go with this option:
- a single outbound channel named output for sources - the simplest way to do so is by using the predefined interface org.springframework.cloud.stream.messaging.Source;
- a single inbound channel named input for sinks - the simplest way to do so is by using the predefined interface org.springframework.cloud.stream.messaging.Sink;
- both an inbound channel named input and an outbound channel named output for processors - the simplest way to do so is by using the predefined interface org.springframework.cloud.stream.messaging.Processor.
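For illustration only, here is a minimal sketch of a custom source that follows these conventions by using the predefined Source interface; the class name, package, and emitted values are made up for this example and are not part of the project:

package com.example.counter;

import java.util.concurrent.atomic.AtomicLong;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.integration.annotation.InboundChannelAdapter;
import org.springframework.integration.annotation.Poller;

// A minimal custom source: emits an incrementing counter value on the
// "output" channel once per second, using the predefined Source binding.
@SpringBootApplication
@EnableBinding(Source.class)
public class CounterSourceApplication {

    private final AtomicLong counter = new AtomicLong();

    @InboundChannelAdapter(value = Source.OUTPUT, poller = @Poller(fixedDelay = "1000"))
    public Long nextValue() {
        return counter.incrementAndGet();
    }

    public static void main(String[] args) {
        SpringApplication.run(CounterSourceApplication.class, args);
    }
}

Such an application can then be run independently or deployed via Spring Cloud Data Flow in the same way as the provided starters.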
You can also reuse the starters provided by Spring Cloud Stream Application Starters to create custom components, enriching the behavior of the application.
For example, you can add a Spring Security layer to your HTTP source, add additional configurations to the ObjectMapper
used for JSON transformation wherever that happens, or change the JDBC driver or Hadoop distribution that the application is using.
In order to do this, you should set up your project following a process similar to customizing a binder.
In fact, customizing the binder is the simplest form of creating a custom component.
As a reminder, this involves adding the starter and the binder of your choice as dependencies and creating a main class that imports the starter configuration.
After doing so, you can simply add the additional configuration for the extra features of your application.
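For example, a custom application that wraps the Cassandra sink starter and tweaks the ObjectMapper could look roughly like the following sketch; the customizer shown is a plain Spring Boot hook, and the specific setting is only an illustration, not the starter's actual configuration:

package com.example.custom;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.jackson.Jackson2ObjectMapperBuilderCustomizer;
import org.springframework.cloud.stream.app.cassandra.sink.CassandraSinkConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Import;

// Wraps the Cassandra sink starter and layers an ObjectMapper customization
// on top of the imported starter configuration.
@SpringBootApplication
@Import(CassandraSinkConfiguration.class)
public class CustomCassandraSinkApplication {

    @Bean
    public Jackson2ObjectMapperBuilderCustomizer objectMapperCustomizer() {
        // Illustrative setting: tolerate unknown JSON properties on deserialization.
        return builder -> builder.failOnUnknownProperties(false);
    }

    public static void main(String[] args) {
        SpringApplication.run(CustomCassandraSinkApplication.class, args);
    }
}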
If you’re looking to patch the prebuilt applications to accommodate the addition of new dependencies, you can use the following example as a reference. Let’s review the steps to add the mysql driver to the jdbc-sink application.

Add the mysql java-driver dependency:

<dependencies>
  <dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.37</version>
  </dependency>
  <dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
  </dependency>
  <dependency>
    <groupId>org.springframework.cloud.stream.app</groupId>
    <artifactId>spring-cloud-starter-stream-sink-jdbc</artifactId>
  </dependency>
  <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
    <scope>test</scope>
  </dependency>
</dependencies>

Import the configuration class of the starter. For the jdbc sink, it is @Import(org.springframework.cloud.stream.app.jdbc.sink.JdbcSinkConfiguration.class). You can find the configuration class for other applications in their respective repositories.

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Import;

@SpringBootApplication
@Import(org.springframework.cloud.stream.app.jdbc.sink.JdbcSinkConfiguration.class)
public class DemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }
}

The jdbc-sink application now includes the mysql driver.

In this section, we will explain how to develop a custom source/sink/processor application and then generate maven and docker artifacts for it with the necessary middleware bindings using the existing tooling provided by the spring cloud stream app starter infrastructure. For explanation purposes, we will assume that we are creating a new source application for a technology named foobar.
Use app-starters-build as the parent for your new starter project.
Please follow the instructions above for designing a proper Spring Cloud Stream Source. You may also look into the existing
starters for how to structure a new one. The default naming for the main @Configuration
class is
FoobarSourceConfiguration
and the default package for this @Configuration
is org.springframework.cloud.stream.app.foobar.source
. If you have a different class/package name, see below for
overriding that in the app generator. The technology/functionality name for which you create
a starter can be a hyphenated string such as scriptable-transform,
which is a processor type in the
module spring-cloud-starter-stream-processor-scriptable-transform
.
The starters in spring-cloud-stream-app-starters
are slightly different from the other starters in spring-boot and
spring-cloud in that here we don’t provide a way to auto-configure any configuration through the Spring factories mechanism.
Rather, we delegate this responsibility to the maven plugin that is generating the binder based apps. Therefore, you don’t
have to provide a spring.factories file that lists all your configuration classes.
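As an illustration (not an actual starter), a FoobarSourceConfiguration following these conventions might look like the following sketch; the polling logic is a placeholder for a real foobar client:

package org.springframework.cloud.stream.app.foobar.source;

import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.InboundChannelAdapter;
import org.springframework.integration.annotation.Poller;

// Default class/package naming expected by the app generator plugin.
@Configuration
@EnableBinding(Source.class)
public class FoobarSourceConfiguration {

    // Polls the imaginary foobar system and emits the result on the "output" channel.
    @InboundChannelAdapter(value = Source.OUTPUT, poller = @Poller(fixedDelay = "5000"))
    public String foobarPayload() {
        return "data-from-foobar"; // placeholder for a real foobar client call
    }
}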
In the root pom of the new repository (for example, foobar-app-starters-build), add the following to the build section. This will add the necessary plugin configuration for app generation as well as generate proper documentation metadata. Please ensure that your root pom inherits app-starters-build, as the base configuration for the plugins is specified there.

<build>
  <plugins>
    <plugin>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-app-starter-doc-maven-plugin</artifactId>
    </plugin>
    <plugin>
      <groupId>org.springframework.cloud.stream.app.plugin</groupId>
      <artifactId>spring-cloud-stream-app-maven-plugin</artifactId>
      <configuration>
        <generatedProjectHome>${session.executionRootDirectory}/apps</generatedProjectHome>
        <generatedProjectVersion>${project.version}</generatedProjectVersion>
        <bom>
          <name>scs-bom</name>
          <groupId>org.springframework.cloud.stream.app</groupId>
          <artifactId>foobar-app-dependencies</artifactId>
          <version>${project.version}</version>
        </bom>
        <generatedApps>
          <foobar-source/>
        </generatedApps>
      </configuration>
    </plugin>
  </plugins>
</build>
More information about the maven plugin used above to generate the apps can be found here: github.com/spring-cloud/spring-cloud-stream-app-maven-plugin
If you did not follow the default convention expected by the plugin for where it is looking for the main configuration
class, which is org.springframework.cloud.stream.app.foobar.source.FoobarSourceConfiguration
, you can override that in
the configuration for the plugin. For example, if your main configuration class is foo.bar.SpecialFooBarConfiguration.class
,
this is how you can tell the plugin to override the default.
<foobar-source> <autoConfigClass>foo.bar.SpecialFooBarConfiguration.class</autoConfigClass> </foobar-source>
Add the new starter to the dependency management of the repository's BOM module (foobar-app-dependencies). This is the BOM (bill of materials) for this project. It is advised that this BOM is inherited from spring-cloud-dependencies-parent. Please see other starter repositories for guidelines.

<dependencyManagement>
  ...
  ...
  <dependency>
    <groupId>org.springframework.cloud.stream.app</groupId>
    <artifactId>spring-cloud-starter-stream-source-foobar</artifactId>
    <version>1.0.0.BUILD-SNAPSHOT</version>
  </dependency>
  ...
  ...
./mvnw clean install -PgenerateApps
This will generate the binder based foobar source apps in a directory named apps
at the root of the repository.
If you want to change the location where the apps are generated, for instance /tmp/scs-apps
, you can do it in the
configuration section of the plugin.
<configuration>
  ...
  <generatedProjectHome>/tmp/scs-apps</generatedProjectHome>
  ...
</configuration>
By default, we generate apps for both Kafka 09/10 and RabbitMQ binders - spring-cloud-stream-binder-kafka
and
spring-cloud-stream-binder-rabbit
. If you have created a custom binder for some middleware (say JMS) and you need to generate foobar source apps for it, you can add that binder to the binders list in the configuration section, as follows:
<binders> <jms /> </binders>
Please note that this would only work, as long as there is a binder with the maven coordinates of
org.springframework.cloud.stream
as group id and spring-cloud-stream-binder-jms
as artifact id.
This artifact needs to be specified in the BOM above and available through a maven repository as well.
If you have an artifact that is only available through a private internal maven repository (maybe an enterprise-wide Nexus repo that you use globally across teams), and you need that for your app, you can define that as part of the maven plugin configuration.
For example,
<configuration> ... <extraRepositories> <repository> <id>private-internal-nexus</id> <url>.../</url> <name>...</name> <snapshotEnabled>...</snapshotEnabled> </repository> </extraRepositories> </configuration>
Then you can define this as part of your app tag:
<foobar-source> <extraRepositories> <private-internal-nexus /> </extraRepositories> </foobar-source>
(The generated apps are in the directory named apps at the root of the repository by default, unless you changed it as described above.) Here you will see foobar-source-kafka-09
, foobar-source-kafka-10
and foobar-source-rabbit
.
If you added more binders as described above, you would see that app as well here - for example foobar-source-jms.
You can import these apps directly into your IDE of choice if you further want to do any customizations on them. Each of them is a self contained spring boot application project.
For the generated apps, the parent is spring-boot-starter-parent
as required by the underlying Spring Initializr library.
You can cd into these custom foobar-source directories and do the following to build the apps:
cd foobar-source-kafka-10
mvn clean install
This would install foobar-source-kafka-10 into your local maven cache (~/.m2 by default).
The app generation phase adds an integration test to the app project that makes sure that all the Spring components and contexts are loaded properly. However, these tests are not run by default when you do a mvn install
.
You can force the running of these tests by doing the following:
mvn clean install -DskipTests=false
One important note about running these tests in generated apps:
If your application’s spring beans need to interact with
some real services out there or expect some properties to be present in the context, these tests will fail unless you make
those things available. An example would be a Twitter Source, where the underlying spring beans are trying to create a
twitter template and will fail if it can’t find the credentials available through properties. One way to solve this and
still run the generated context load tests would be to create a mock class that provides these properties or mock beans
(for example, a mock twitter template) and tell the maven plugin about its existence. You can use the existing module
app-starters-test-support
for this purpose and add the mock class there.
See the class org.springframework.cloud.stream.app.test.twitter.TwitterTestConfiguration
for reference.
You can create a similar class for your foobar source - FoobarTestConfiguration
and add that to the plugin configuration.
You only need to do this if you run into the issue of Spring beans not being created properly in the integration test in the generated apps.
<foobar-source> <extraTestConfigClass>org.springframework.cloud.stream.app.test.foobar.FoobarTestConfiguration.class</extraTestConfigClass> </foobar-source>
When you do the above, this test configuration will be automatically imported into the context of your test class.
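For illustration, such a test configuration for the imagined foobar source could be sketched as follows; the FoobarService interface here is hypothetical and stands in for whatever collaborator the starter actually needs:

package org.springframework.cloud.stream.app.test.foobar;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

// Provides a stub bean so the generated app's context-load test can start
// without a real foobar service or its credentials being available.
@Configuration
public class FoobarTestConfiguration {

    // Hypothetical client interface that the foobar starter is assumed to use.
    public interface FoobarService {
        String fetchData();
    }

    @Bean
    public FoobarService foobarService() {
        return () -> "test-data"; // stub instead of a real connection
    }
}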
Also note that you need to regenerate the apps each time you make a configuration change in the plugin.

The generated artifacts are available in the target directories of the respective apps and also as maven artifacts in your local maven repository. Go to the target directory and run the following:

java -jar foobar-source-kafka-10.jar

(Ensure that you have Kafka running locally when you do this.)
It should start the application up.
mvn clean package docker:build
This creates the docker image under the target/docker/springcloudstream
directory. Please ensure that the Docker daemon is up and running and the DOCKER_HOST environment variable is properly set before you try docker:build.
All the generated apps from the various app repositories are uploaded to Docker Hub.
However, a custom app that you build won’t be uploaded to Docker Hub under the springcloudstream
repository.
If you think that there is a general need for this app, you should try contributing this starter as a new repository to Spring Cloud Stream App Starters.
Upon review, this app can then eventually become available through the above location in Docker Hub.
If you still need to push this to Docker Hub under a different repository (maybe an enterprise repo that you manage for your organization), you can take the following steps.
Go to the pom.xml of the generated app [ example - foobar-source-kafka-10/pom.xml
]
Search for springcloudstream
. Replace with your repository name.
Then do this:
mvn clean package docker:build docker:push -Ddocker.username=[provide your username] -Ddocker.password=[provide password]
This would upload the docker image to the docker hub in your custom repository.
In the following sections, you can find a brief FAQ on various things that we discussed above and a few other infrastructure related topics.
What is the parent for stream app starters?
The parent for all app starters is app-starters-build
which comes from the core project: github.com/spring-cloud-stream-app-starters/core
For example:
<parent>
  <groupId>org.springframework.cloud.stream.app</groupId>
  <artifactId>app-starters-build</artifactId>
  <version>2.0.2.RELEASE</version>
  <relativePath/>
</parent>
The core BOM is app-starters-core-dependencies. We need this BOM during app generation to pull down all the core dependencies. It is referenced from the app-starters-build artifact, and the same BOM is referenced through the maven plugin configuration for the app generation. The generated apps thus will include this BOM in their pom.xml files as well.

What spring cloud stream artifacts does the parent artifact (app-starters-build) include?

What other artifacts are available through the parent app-starters-build, and where are they coming from?

In addition to the above artifacts, the artifacts below are also included in app-starters-build by default.
Can you summarize all the BOMs that SCSt app starters depend on? All SCSt app starters have access to the dependencies defined in the following BOMs, as well as other dependencies from any BOMs that these three BOMs import transitively, as in the case of Spring Integration:
Each app starter has app-starters-build as the parent, which in turn has spring-cloud-build as its parent. The above documentation states that the
generated apps have spring-boot-starter-parent
as the parent. Why the mismatch?
There is no mismatch per se, but a slight subtlety. As the question frames, each app starter has access to artifacts managed all the way through spring-cloud-build
at compile time.
However, this is not the case for the generated apps at runtime. Generated apps are managed by Boot. Their parent is spring-boot-starter-parent, which imports the spring-boot-dependencies BOM that includes a majority of the components that these apps need.
The additional dependencies that the generated application needs are managed by including a BOM specific to each application starter, for example time-app-dependencies.
This is an important BOM. At runtime, the generated apps get the versions used in their dependencies through a BOM that is managing the dependencies. Since all the boms
that we specified above are only for the helper artifacts, we need a place to manage the starters themselves. This is where the app specific BOM comes into play.
In addition to this need, as it becomes clear below, there are other uses for this BOM such as dependency overrides etc. But in a nutshell, all the starter dependencies go to this BOM.
For instance, take TCP repo as an example. It has a starter for source, sink, client processor etc. All these dependencies are managed through the app specific tcp-app-dependencies
bom.
This bom is provided to the app generator maven plugin in addition to the core bom. This app specific bom has spring-cloud-dependencies-parent
as parent.

New starters can be contributed to the spring-cloud-stream-app-starters organization, where you can start contributing the starters and other components.

How do I override the Spring Integration version that is coming from spring-boot-dependencies by default? The following solution only works if the versions you want to override are available through a new Spring Integration BOM. Go to your app starter specific BOM and override the property as follows:
<spring-integration.version>VERSION GOES HERE</spring-integration.version>
Then add the following in the dependency management section of the BOM.
<dependency>
  <groupId>org.springframework.integration</groupId>
  <artifactId>spring-integration-bom</artifactId>
  <version>${spring-integration.version}</version>
  <scope>import</scope>
  <type>pom</type>
</dependency>
How do I override spring-cloud-stream artifacts coming by default from spring-cloud-dependencies defined in the core BOM? The following solution only works if the versions you want to override are available through a new spring-cloud-dependencies BOM. Go to your app starter specific BOM and override the property as follows:
<spring-cloud-dependencies.version>VERSION GOES HERE</spring-cloud-dependencies.version>
Then add the following in the dependency management section of the BOM.
<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-dependencies</artifactId>
  <version>${spring-cloud-dependencies.version}</version>
  <scope>import</scope>
  <type>pom</type>
</dependency>
What if there is no spring-cloud-dependencies BOM available that contains my versions of spring-cloud-stream, but there is a spring-cloud-stream BOM available? Go to your app starter specific BOM. Override the property as below.
<spring-cloud-stream.version>VERSION GOES HERE</spring-cloud-stream.version>
Then add the following in the dependency management section of the BOM.
<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-stream-dependencies</artifactId>
  <version>${spring-cloud-stream.version}</version>
  <scope>import</scope>
  <type>pom</type>
</dependency>
What if I want to override a single artifact that is provided through a bom? For example spring-integration-java-dsl? Go to your app starter BOM and add the following property with the version you want to override:
<spring-integration-java-dsl.version>VERSION GOES HERE</spring-integration-java-dsl.version>
Then in the dependency management section add the following:
<dependency>
  <groupId>org.springframework.integration</groupId>
  <artifactId>spring-integration-java-dsl</artifactId>
  <version>${spring-integration-java-dsl.version}</version>
</dependency>
How do I override the boot version used in a particular app? When you generate the app, override the boot version as follows.
./mvnw clean install -PgenerateApps -DbootVersion=<boot version to override>
For example: ./mvnw clean install -PgenerateApps -DbootVersion=2.0.0.BUILD-SNAPSHOT
You can also override the boot version more permanently by overriding the following property in your starter pom.
<bootVersion>2.0.0.BUILD-SNAPSHOT</bootVersion>
This application polls a directory and sends new files or their contents to the output channel.
The file source provides the contents of a File as a byte array by default.
However, this can be customized using the --mode
option:
- ref — provides a java.io.File reference
- lines — splits the file line by line and emits a new message for each line
- contents — (the default) provides the contents of the file as a byte array

When using --mode=lines
, you can also provide the additional option --withMarkers=true
.
If set to true
, the underlying FileSplitter
will emit additional start-of-file and end-of-file marker messages before and after the actual data.
The payload of these 2 additional marker messages is of type FileSplitter.FileMarker
. The option withMarkers
defaults to false
if not explicitly set.
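To illustrate what the markers look like downstream, the following sketch (not part of the starter) shows a simple consumer that distinguishes FileSplitter marker messages from regular line payloads; note that, depending on content-type conversion settings, markers may also arrive as JSON strings rather than FileMarker instances:

package com.example.markers;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.integration.file.splitter.FileSplitter;
import org.springframework.messaging.Message;

// Logs START/END markers separately from the actual line payloads.
@SpringBootApplication
@EnableBinding(Sink.class)
public class MarkerAwareConsumerApplication {

    @StreamListener(Sink.INPUT)
    public void handle(Message<?> message) {
        Object payload = message.getPayload();
        if (payload instanceof FileSplitter.FileMarker) {
            FileSplitter.FileMarker marker = (FileSplitter.FileMarker) payload;
            System.out.println(marker.getMark() + " marker for " + marker.getFilePath());
        }
        else {
            System.out.println("line: " + payload);
        }
    }

    public static void main(String[] args) {
        SpringApplication.run(MarkerAwareConsumerApplication.class, args);
    }
}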
Content-Type: application/octet-stream
file_originalFile: <java.io.File>
file_name: <file name>
Content-Type: text/plain
file_originalFile: <java.io.File>
file_name: <file name>
correlationId: <UUID>
(same for each line)sequenceNumber: <n>
sequenceSize: 0
(number of lines is not known until the file is read)

The file source has the following options:
true
)<none>
, possible values: ref
,lines
,contents
)<none>
)<none>
)<none>
)<none>
)true
)<none>
)<none>
)1
)0
)-1
)SECONDS
, possible values: NANOSECONDS
,MICROSECONDS
,MILLISECONDS
,SECONDS
,MINUTES
,HOURS
,DAYS
)The ref
option is useful in some cases in which the file contents are large and it would be more efficient to send the file path.
$ ./mvnw clean install -PgenerateApps
$ cd apps

You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:

$ ./mvnw clean package
This source application supports transfer of files using the FTP protocol.
Files are transferred from the remote
directory to the local
directory where the app is deployed.
Messages emitted by the source are provided as a byte array by default. However, this can be
customized using the --mode
option:
- ref — provides a java.io.File reference
- lines — splits the file line by line and emits a new message for each line
- contents — (the default) provides the contents of the file as a byte array

When using --mode=lines
, you can also provide the additional option --withMarkers=true
.
If set to true
, the underlying FileSplitter
will emit additional start-of-file and end-of-file marker messages before and after the actual data.
The payload of these 2 additional marker messages is of type FileSplitter.FileMarker
. The option withMarkers
defaults to false
if not explicitly set.
See also MetadataStore
options for possible shared persistent store configuration for the FtpPersistentAcceptOnceFileListFilter
used in the FTP Source.
Content-Type: application/octet-stream
file_originalFile: <java.io.File>
file_name: <file name>
Content-Type: text/plain
file_originalFile: <java.io.File>
file_name: <file name>
correlationId: <UUID>
(same for each line)sequenceNumber: <n>
sequenceSize: 0
(number of lines is not known until the file is read)

The ftp source has the following options:
true
)<none>
, possible values: ref
,lines
,contents
)<none>
)true
)false
)<none>
)<none>
, possible values: ACTIVE
,PASSIVE
)<none>
)<none>
)21
)<none>
)<none>
)<none>
)<none>
)true
)/
)/
).tmp
)<none>
)<none>
)1
)0
)-1
)SECONDS
, possible values: NANOSECONDS
,MICROSECONDS
,MILLISECONDS
,SECONDS
,MINUTES
,HOURS
,DAYS
)$ ./mvnw clean install -PgenerateApps $ cd apps
You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:
$ ./mvnw clean package
This source allows you to subscribe to any creates or updates to a Gemfire region. The application configures a client cache and client region, along with the necessary subscriptions enabled. By default the payload contains the updated entry value, but may be controlled by passing in a SpEL expression that uses the EntryEvent as the evaluation context.
To enable SSL communication between Geode Source and the Geode cluster you need to provide the URIs of the
Keystore and Truststore files using the gemfire.security.ssl.keystore-uri
and gemfire.security.ssl.truststore-uri
properties.
(If a single file is used for both stores then point both URIs to it).
The gemfire source supports the following configuration properties:
<none>
)<none>
, possible values: locator
,server
)<none>
)false
)<none>
)<none>
)any
)JKS
)<none>
)<none>
)<none>
)JKS
)<none>
)user.home
)<none>
)$ ./mvnw clean install -PgenerateApps $ cd apps
You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:
$ ./mvnw clean package
Continuous query allows client applications to create a GemFire query using Object Query Language (OQL) and to register a CQ listener which subscribes to the query and is notified every time the query’s result set changes. The gemfire-cq source registers a CQ which will post CQEvent messages to the stream.
To enable SSL communication between Geode CQ and the Geode cluster you need to provide the URIs of the
Keystore and Truststore files using the gemfire.security.ssl.keystore-uri
and gemfire.security.ssl.truststore-uri
properties.
(If a single file is used for both stores then point both URIs to it).
The gemfire-cq source supports the following configuration properties:
<none>
)<none>
, possible values: locator
,server
)<none>
)false
)<none>
)<none>
)any
)JKS
)<none>
)<none>
)<none>
)JKS
)<none>
)user.home
)<none>
)$ ./mvnw clean install -PgenerateApps $ cd apps
You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:
$ ./mvnw clean package
A source application that listens for HTTP requests and emits the body as a message payload.
If the Content-Type matches text/*
or application/json
, the payload will be a String,
otherwise the payload will be a byte array.
The http source supports the following configuration properties:
<none>
)<none>
)<none>
)false
)false
)<none>
)/
)8080
Note: Security is disabled for this application by default. To enable it, you should use the properties mentioned above.
$ ./mvnw clean install -PgenerateApps $ cd apps
You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:
$ ./mvnw clean package
This source polls data from an RDBMS.
This source is fully based on the DataSourceAutoConfiguration
, so refer to the Spring Boot JDBC Support for more information.
The jdbc source has the following options:
0
)<none>
)true
)<none>
)<none>
)<none>
)embedded
, possible values: ALWAYS
,EMBEDDED
,NEVER
)<none>
)<none>
)<none>
)<none>
)<none>
)<none>
)1
)0
)1
)<none>
, possible values: NANOSECONDS
,MICROSECONDS
,MILLISECONDS
,SECONDS
,MINUTES
,HOURS
,DAYS
)Also see the Spring Boot Documentation
for additional DataSource
properties and TriggerProperties
and MaxMessagesProperties
for polling options.
$ ./mvnw clean install -PgenerateApps $ cd apps
You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:
$ ./mvnw clean package
The "jms" source enables receiving messages from JMS.
The jms source has the following options:
<none>
)<none>
)<none>
)true
)<none>
)<none>
)<none>
)<none>
)<none>
, possible values: AUTO
,CLIENT
,DUPS_OK
)true
)<none>
)<none>
)false
Note: Spring Boot broker configuration is used; refer to the Spring Boot Documentation for more information.
$ ./mvnw clean install -PgenerateApps $ cd apps
You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:
$ ./mvnw clean package
A source that sends generated data and dispatches it to the stream. This is to provide a method for users to identify the performance of Spring Cloud Data Flow in different environments and deployment types.
The load-generator source has the following options:
false
)1000
)1000
)1
)$ ./mvnw clean install -PgenerateApps $ cd apps
You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:
$ ./mvnw clean package
A source module that listens for emails and emits the message body as a message payload.
The mail source supports the following configuration properties:
UTF-8
)false
)true
)false
)<none>
)false
)<none>
)<none>
)<none>
)<none>
)1
)0
)1
)<none>
, possible values: NANOSECONDS
,MICROSECONDS
,MILLISECONDS
,SECONDS
,MINUTES
,HOURS
,DAYS
)$ ./mvnw clean install -PgenerateApps $ cd apps
You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:
$ ./mvnw clean package
This source polls data from MongoDB.
This source is fully based on the MongoDataAutoConfiguration
, so refer to the
Spring Boot MongoDB Support
for more information.
The mongodb source has the following options:
<none>
){ }
)<none>
)true
)<none>
)<none>
)<none>
)<none>
)<none>
)<none>
)<none>
)mongodb://localhost/test
)<none>
)<none>
)<none>
)1
)0
)-1
)SECONDS
, possible values: NANOSECONDS
,MICROSECONDS
,MILLISECONDS
,SECONDS
,MINUTES
,HOURS
,DAYS
)Also see the Spring Boot Documentation for additional MongoProperties
properties.
See TriggerProperties
for polling options.
$ ./mvnw clean install -PgenerateApps $ cd apps
You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:
$ ./mvnw clean package
The "mqtt" source enables receiving messages from MQTT.
The mqtt source has the following options:
false
)UTF-8
)true
)stream.client.id.source
)30
)60
)guest
)memory
)/tmp/paho
)[0]
)[stream.mqtt]
)[tcp://localhost:1883]
)guest
)$ ./mvnw clean install -PgenerateApps $ cd apps
You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:
$ ./mvnw clean package
The "rabbit" source enables receiving messages from RabbitMQ.
The queue(s) must exist before the stream is deployed; they are not created automatically. You can easily create a Queue using the RabbitMQ web UI.
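If you prefer to create the queue programmatically rather than through the web UI, a small Spring AMQP snippet like the following sketch can be run beforehand; the queue name and broker address are placeholders:

package com.example.rabbit;

import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;

// Declares a durable queue on a local broker before the stream is deployed.
public class DeclareQueueExample {

    public static void main(String[] args) {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory("localhost");
        try {
            RabbitAdmin admin = new RabbitAdmin(connectionFactory);
            admin.declareQueue(new Queue("myqueue", true)); // durable queue named "myqueue"
        }
        finally {
            connectionFactory.destroy();
        }
    }
}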
The rabbit source has the following options:
false
)1000
)[STANDARD_REQUEST_HEADERS]
)3
)30000
)false
)<none>
)true
)2
)false
)<none>
)<none>
)localhost
)guest
)5672
)false
)false
)<none>
)guest
)<none>
Also see the Spring Boot Documentation for additional properties for the broker connections and listener properties.
Note: With the default ackMode (AUTO) and requeue (true) options, failed message deliveries will be retried indefinitely. Since there is not much processing in the rabbit source, the risk of failure in the source itself is small, unless the downstream binder is unavailable for some reason.
$ ./mvnw clean install -PgenerateApps $ cd apps
You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:
$ ./mvnw clean package
This source app supports transfer of files using the Amazon S3 protocol.
Files are transferred from the remote
directory (S3 bucket) to the local
directory where the application is deployed.
Messages emitted by the source are provided as a byte array by default. However, this can be
customized using the --mode
option:
- ref — provides a java.io.File reference
- lines — splits the file line by line and emits a new message for each line
- contents — (the default) provides the contents of the file as a byte array

When using --mode=lines
, you can also provide the additional option --withMarkers=true
.
If set to true
, the underlying FileSplitter
will emit additional start-of-file and end-of-file marker messages before and after the actual data.
The payload of these 2 additional marker messages is of type FileSplitter.FileMarker
. The option withMarkers
defaults to false
if not explicitly set.
Content-Type: application/octet-stream
file_originalFile: <java.io.File>
file_name: <file name>
Content-Type: text/plain
file_originalFile: <java.io.File>
file_name: <file name>
correlationId: <UUID>
(same for each line)sequenceNumber: <n>
sequenceSize: 0
(number of lines is not known until the file is read)

The s3 source has the following options:
true
)<none>
, possible values: ref
,lines
,contents
)<none>
)true
)false
)<none>
)<none>
)<none>
)true
)bucket
)/
).tmp
)<none>
)<none>
)1
)0
)-1
)SECONDS
, possible values: NANOSECONDS
,MICROSECONDS
,MILLISECONDS
,SECONDS
,MINUTES
,HOURS
,DAYS
The Amazon S3 Source (like all other Amazon AWS applications) is based on the Spring Cloud AWS project as a foundation, and its auto-configuration classes are used automatically by Spring Boot. Consult their documentation regarding required and useful auto-configuration properties.
Some of them are about AWS credentials:
Others are for the AWS Region
definition:
And for AWS Stack
:
$ ./mvnw clean install -PgenerateApps $ cd apps
You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:
$ ./mvnw clean package
This source app supports transfer of files using the SFTP protocol.
Files are transferred from the remote
directory to the local
directory where the application is deployed.
Messages emitted by the source are provided as a byte array by default. However, this can be customized using the --mode
option:
- ref — provides a java.io.File reference
- lines — splits the file line by line and emits a new message for each line
- contents — (the default) provides the contents of the file as a byte array

When using --mode=lines
, you can also provide the additional option --withMarkers=true
.
If set to true
, the underlying FileSplitter
will emit additional start-of-file and end-of-file marker messages before and after the actual data.
The payload of these 2 additional marker messages is of type FileSplitter.FileMarker
.
The option withMarkers
defaults to false
if not explicitly set.
When configuring the sftp.factory.known-hosts-expression
option, the root object of the evaluation is the application context, an example might be sftp.factory.known-hosts-expression = @systemProperties['user.home'] + '/.ssh/known_hosts'
.
See also MetadataStore
options for possible shared persistent store configuration for the SftpPersistentAcceptOnceFileListFilter
and IdempotentReceiverInterceptor
used in the SFTP Source.
This source supports polling multiple sftp servers. This requires configuring multiple session factories. The following configuration will poll two sftp servers, consuming files in a round-robin fashion:
sftp.factories.one.host=host1
sftp.factories.one.port=1234
sftp.factories.one.username=user1
sftp.factories.one.password=pass1
...
sftp.factories.two.host=host2
sftp.factories.two.port=2345
sftp.factories.two.username=user2
sftp.factories.two.password=pass2
sftp.directories=one.sftpSource,two.sftpSecondSource
sftp.max-fetch=1
sftp.fair=true
Note: The TaskLaunchRequest output functionality is currently supported here for legacy reasons. If you are interested in this feature, we recommend using the sftp-dataflow-source, which is intended specifically for this use case. A task launch request posted to the Data Flow Server API is much simpler to use than the TaskLaunchRequest output produced by this source.
Content-Type: application/octet-stream
file_originalFile: <java.io.File>
file_name: <file name>
Content-Type: text/plain
file_originalFile: <java.io.File>
file_name: <file name>
correlationId: <UUID>
(same for each line)sequenceNumber: <n>
sequenceSize: 0
(number of lines is not known until the file is read)

A TaskLaunchRequest object with the following set as command line arguments (also bound to job parameters for Spring Batch):
<task.local-file-path-parameter-name>
=<task.local-file-path-parameter-value>
<task.remote-file-path-parameter-name>
=<task.remote-file-path-parameter-value>
task.resource-uri
is required.
task.deployment-properties
and task.environment-properties
are optional.
The sftp source has the following options:
true
)<none>
, possible values: ref
,lines
,contents
)<none>
)true
)false
)<none>
)<none>
)false
)localhost
)<none>
)<empty string>
)<none>
)22
)<empty string>
)<none>
)false
)<none>
)<none>
)false
)<none>
)<none>
)true
)/
)/
)false
)false
)<none>
)<none>
)jdbc:h2:tcp://localhost:19092/mem:dataflow
)sa
)<none>
)<none>
)localFilePath
)<none>
)<none>
)remoteFilePath
)<empty string>
).tmp
)<none>
)<none>
)1
)0
)-1
)SECONDS
, possible values: NANOSECONDS
,MICROSECONDS
,MILLISECONDS
,SECONDS
,MINUTES
,HOURS
,DAYS
)$ ./mvnw clean install -PgenerateApps $ cd apps
You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:
$ ./mvnw clean package
The syslog source receives SYSLOG packets over UDP, TCP, or both. RFC3164 (BSD) and RFC5424 formats are supported.
The syslog source has the following options:
2048
)false
)1514
)tcp
)false
)3164
)0
)$ ./mvnw clean install -PgenerateApps $ cd apps
You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:
$ ./mvnw clean package
The tcp
source acts as a server and allows a remote party to connect to it and submit data over a raw tcp socket.
TCP is a streaming protocol and some mechanism is needed to frame messages on the wire. A number of decoders are available, the default being 'CRLF' which is compatible with Telnet.
Messages produced by the TCP source application have a byte[]
payload.
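As an illustration of the default CRLF framing, the following standalone sketch sends one message to a tcp source listening on the default port 1234; the host and text are placeholders:

package com.example.tcp;

import java.io.OutputStream;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

// Each CRLF-terminated line becomes one message with a byte[] payload
// when the source uses the default CRLF decoder.
public class TcpSourceClientExample {

    public static void main(String[] args) throws Exception {
        try (Socket socket = new Socket("localhost", 1234)) {
            OutputStream out = socket.getOutputStream();
            out.write("hello from a tcp client\r\n".getBytes(StandardCharsets.UTF_8));
            out.flush();
        }
    }
}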
2048
)<none>
, possible values: CRLF
,LF
,NULL
,STXETX
,RAW
,L1
,L2
,L4
)false
)1234
)false
)120000
)false
)Text Data
Text and Binary Data
$ ./mvnw clean install -PgenerateApps $ cd apps
You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:
$ ./mvnw clean package
The tcp-client source has the following options:
2048
)UTF-8
)<none>
, possible values: CRLF
,LF
,NULL
,STXETX
,RAW
,L1
,L2
,L4
)localhost
)false
)1234
)60000
)false
)120000
)false
)$ ./mvnw clean install -PgenerateApps $ cd apps
You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:
$ ./mvnw clean package
The time source will simply emit a String with the current time every so often.
The time source has the following options:
<none>
)<none>
)1
)0
)1
)<none>
, possible values: NANOSECONDS
,MICROSECONDS
,MILLISECONDS
,SECONDS
,MINUTES
,HOURS
,DAYS
)$ ./mvnw clean install -PgenerateApps $ cd apps
You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:
$ ./mvnw clean package
This app sends a trigger based on a fixed delay, date or cron expression. A payload, which is evaluated using SpEL, can also be sent each time the trigger fires.
The trigger source has the following options:
<none>
)<none>
)1
)0
)1
)<none>
)<none>
, possible values: NANOSECONDS
,MICROSECONDS
,MILLISECONDS
,SECONDS
,MINUTES
,HOURS
,DAYS
)$ ./mvnw clean install -PgenerateApps $ cd apps
You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:
$ ./mvnw clean package
The TriggerTask app sends a TaskLaunchRequest
based on a fixed delay, date or
cron expression. The TaskLaunchRequest
is used by a tasklauncher-* sink that
will deploy and launch a task. The only required property for the triggertask
is the --uri which specifies the artifact that will be launched by the
tasklauncher-* that you have selected. The user is also allowed to set the
command line arguments as well as the
Spring Boot properties
that are used by the task.
The triggertask source has the following options:
<none>
)<none>
)1
)0
)1
)<none>
)<none>
, possible values: NANOSECONDS
,MICROSECONDS
,MILLISECONDS
,SECONDS
,MINUTES
,HOURS
,DAYS
)<empty string>
)<empty string>
)<empty string>
)<empty string>
)<empty string>
)$ ./mvnw clean install -PgenerateApps $ cd apps
You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:
$ ./mvnw clean package
This source ingests data from Twitter’s streaming API. It uses the sample and filter stream endpoints rather than the full "firehose" which needs special access. The endpoint used will depend on the parameters you supply in the stream definition (some are specific to the filter endpoint).
You need to supply all keys and secrets (both consumer and accessToken) to authenticate for this source, so it is easiest if you just add these as the following environment variables: CONSUMER_KEY, CONSUMER_SECRET, ACCESS_TOKEN and ACCESS_TOKEN_SECRET.
The twitterstream source has the following options:
<none>
)<none>
)<none>
)<none>
)<none>
)<none>
)<none>
)<none>
, possible values: SAMPLE
,FIREHOSE
,FILTER
)<none>
$ ./mvnw clean install -PgenerateApps $ cd apps
You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:
$ ./mvnw clean package
Use the aggregator
application to combine multiple messages into one, based on some correlation mechanism.
This processor is fully based on the Aggregator component from Spring Integration, so please consult its documentation for use cases and functionality.
If the aggregation and correlation logic is based on the default strategies, the correlationId
, sequenceNumber
and sequenceSize
headers must be present in the incoming message.
Aggregator Processor is fully based on the Spring Integration’s AggregatingMessageHandler
and since correlation and aggregation logic don’t require particular types, the input payload can be anything able to be transferred over the network and Spring Cloud Stream Binder.
If the payload is JSON, the JsonPropertyAccessor
helps to build straightforward SpEL expressions for correlation, release and aggregation functions.
Returns all headers of the incoming messages that have no conflicts among the group. An absent header on one or more messages within the group is not considered a conflict.
The aggregator processor has the following options:
<none>
)<none>
)<none>
)<none>
)<none>
)<none>
)<none>
)<none>
)<none>
)<none>
)<none>
)<none>
)<none>
)mongodb://localhost/test
)<none>
)false
)<none>
)<none>
)<none>
)<none>
)false
)embedded
, possible values: ALWAYS
,EMBEDDED
,NEVER
)<none>
)<none>
)<none>
)all
)<none>
)<none>
)<none>
);
)<none>
)<none>
)<none>
)<none>
)[sync_delay]
)3.2.2
)0
)localhost
)<none>
)6379
)false
)<none>
)<none>
By default the aggregator processor uses:
- HeaderAttributeCorrelationStrategy(IntegrationMessageHeaderAccessor.CORRELATION_ID) for correlation;
- SequenceSizeReleaseStrategy for release;
- DefaultAggregatingMessageGroupProcessor for aggregation;
- SimpleMessageStore for messageStoreType.
The aggregator
application can be configured for persistent MessageGroupStore
implementations.
The configuration for the target technology is fully based on Spring Boot auto-configuration, but the default JDBC, MongoDB and Redis auto-configurations are excluded. They are @Imported based on the aggregator.messageStoreType configuration property. Consult the Spring Boot Reference Manual for the auto-configuration of the particular technology you use for the aggregator.
The JDBC JdbcMessageStore
requires particular tables in the target database.
You can find schema scripts for appropriate RDBMS vendors in the org.springframework.integration.jdbc
package of the spring-integration-jdbc
jar.
Those scripts can be used for automatic database initialization via Spring Boot.
For example:
java -jar aggregator-rabbit-1.0.0.RELEASE.jar --aggregator.message-store-type=jdbc --spring.datasource.url=jdbc:h2:mem:test --spring.datasource.schema=org/springframework/integration/jdbc/schema-h2.sql
$ ./mvnw clean install -PgenerateApps $ cd apps
You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:
$ ./mvnw clean package
java -jar aggregator_processor.jar --aggregator.message-store-type=jdbc --spring.datasource.url=jdbc:h2:mem:test --spring.datasource.schema=org/springframework/integration/jdbc/schema-h2.sql java -jar aggregator_processor.jar --spring.data.mongodb.port=0 --aggregator.correlation=T(Thread).currentThread().id --aggregator.release="!#this.?[payload == 'bar'].empty" --aggregator.aggregation="#this.?[payload == 'foo'].![payload]" --aggregator.message-store-type=mongodb --aggregator.message-store-entity=aggregatorTest
This project adheres to the Contributor Covenant code of conduct. By participating, you are expected to uphold this code. Please report unacceptable behavior to [email protected].
A Processor module that returns the messages that are passed to it, by connecting just the input and output channels.
$ ./mvnw clean install -PgenerateApps $ cd apps
You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:
$ ./mvnw clean package
Use the filter module in a stream to determine whether a Message should be passed to the output channel.
The filter processor has the following options:
true
)$ ./mvnw clean install -PgenerateApps $ cd apps
You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:
$ ./mvnw clean package
A Processor application that retains or discards messages according to a predicate, expressed as a Groovy script.
The groovy-filter processor has the following options:
<none>
)<none>
)<none>
)$ ./mvnw clean install -PgenerateApps $ cd apps
You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:
$ ./mvnw clean package
A Processor module that transforms messages using a Groovy script.
The groovy-transform processor has the following options:
<none>
)<none>
)<none>
)$ ./mvnw clean install -PgenerateApps $ cd apps
You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:
$ ./mvnw clean package
This processor uses gRPC to process messages via a remote process written in any language that supports gRPC. This pattern allows the Java app to handle the stream processing while the gRPC service handles the business logic. The service must implement the gRPC service using the protobuf schema in grpc-app-protos/src/main/proto/processor.proto.
Note: The gRPC client stub is blocking by default. Asynchronous and streaming stubs are provided. The asynchronous stub will perform better if the server is multi-threaded; however, message ordering will not be guaranteed. If the server supports bidirectional streaming, use the streaming stub.
Headers are available to the sidecar application via the process
schema if grpc.include-headers
is true
. The header value contains one or more string values to support multiple
values, e.g., the HTTP Accepts
header.
The payload is a byte array as defined by the schema.
In most cases the return message should simply contain the original headers provided. The sidecar application may modify or add headers however it is recommended to only add headers if necessary.
It is expected that the payload will normally be a string or byte array. However common primitive types are supported as defined by the schema.
The grpc processor has the following options:
<none>
)0
)false
)0
)true
)0
)<none>
, possible values: async
,blocking
,streaming
,riff
)Use the header-enricher app to add message headers.
The headers are provided in the form of new line delimited key value pairs, where the keys are the header names and the values are SpEL expressions.
For example --headers='foo=payload.someProperty \n bar=payload.otherProperty'
The header-enricher processor has the following options:
<none>
)false
)$ ./mvnw clean install -PgenerateApps $ cd apps
You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:
$ ./mvnw clean package
java -jar header-enricher-processor.jar --headers='foo=payload.someProperty \n bar=payload.otherProperty'
This project adheres to the Contributor Covenant code of conduct. By participating, you are expected to uphold this code. Please report unacceptable behavior to [email protected].
A processor app that makes requests to an HTTP resource and emits the response body as a message payload. This processor can be combined, e.g., with a time source app to periodically poll results from an HTTP resource.
Any required HTTP headers must be explicitly set via the headers-expression
property. See examples below.
Header values may also be used to construct the request body when referenced in the body-expression
property.
You can set the http-method-expression
property to derive the HTTP method from the inbound Message, or http-method
to set it statically (defaults to GET method).
The Message payload may be any Java type.
Generally, standard Java types such as String (e.g., JSON, XML) or byte array payloads are recommended.
A Map should work without too much effort.
By default, the payload will become HTTP request body (if needed).
You may also set the body-expression
property to construct a value derived from the Message, or body
to use a static (literal) value.
Internally, the processor uses RestTemplate.exchange(…).
The RestTemplate supports Jackson JSON serialization to support any request and response types if necessary.
The expected-response-type
property, String.class
by default, may be set to any class in your application class path.
(Note user defined payload types will require adding required dependencies to your pom file)
The raw output object is ResponseEntity<?>; any of its fields (e.g., body
, headers
) or accessor methods (statusCode
) may be referenced as part of the reply-expression
.
By default the outbound Message payload is the response body.
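Conceptually, the processor's behavior resembles the following plain RestTemplate sketch; this is not the starter's actual code, and the URL, body and header values are placeholders corresponding to the url, body, headers-expression and expected-response-type options:

package com.example.httpclient;

import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.web.client.RestTemplate;

// Issues one request the way the processor does internally and prints the
// parts that reply-expression could reference (statusCode, body).
public class HttpclientConceptExample {

    public static void main(String[] args) {
        RestTemplate restTemplate = new RestTemplate();

        HttpHeaders headers = new HttpHeaders();
        headers.setContentType(MediaType.APPLICATION_JSON); // headers-expression analogue

        HttpEntity<String> request = new HttpEntity<>("{\"name\":\"test\"}", headers); // body analogue

        ResponseEntity<String> response = restTemplate.exchange(
                "http://someurl", HttpMethod.POST, request, String.class); // expected-response-type analogue

        System.out.println(response.getStatusCode() + ": " + response.getBody());
    }
}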
The httpclient processor has the following options:
<none>
)<none>
)<none>
)<none>
)<none>
, possible values: GET
,HEAD
,POST
,PUT
,PATCH
,DELETE
,OPTIONS
,TRACE
)<none>
)body
)<none>
)<none>
)$ ./mvnw clean install -PgenerateApps $ cd apps
You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:
$ ./mvnw clean package
$ java -jar httpclient-processor.jar --httpclient.url=http://someurl --httpclient.http-method=POST --httpclient.headers-expression="{'Content-Type':'application/json'}" $ java -jar httpclient-processor.jar --httpclient.url=http://someurl --httpclient.reply-expression="statusCode.name()"
A processor that evaluates a machine learning model stored in PMML format.
The pmml processor has the following options:
<none>
)<none>
)<none>
)<none>
)<none>
)$ ./mvnw clean install -PgenerateApps $ cd apps
You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:
$ ./mvnw clean package
Spring Cloud Stream App Starters for integrating with python
This application invokes a REST service, similar to the standard httpclient processor. In fact, this application embeds the httpclient processor. As a convenience for Python developers, this allows you to provide a Jython wrapper script that may execute a function before and after REST call in order to perform any necessary data transformation. If you don’t require any custom transformations, just use the httpclient processor.
The diagram shows input and output adapters as conceptual components. These are actually implemented as functions defined in a single script that must conform to a simple convention:
def input():
    return "Pre" + payload

def output():
    return payload + "Post"

result = locals()[channel]()
The function names input
and output
map to the conventional channel names used by Spring Cloud Stream processors.
The last line is a bit of Python reflection magic to invoke a function by its name, given by the bound variable
channel
. Implemented with Spring Integration Scripting, headers
and payload
are always bound to the Message
headers and payload respectively. The payload on the input
side is the object you use to build the REST request.
The output
side transforms the response. If you don’t need any additional processing on one side, implement the
function with pass
as the body:
def output(): pass
Note | |
---|---|
The last line in the script must be an assignment statement. The variable name doesn’t matter. This is required to bind the return value correctly. |
Note | |
---|---|
The script is evaluated for every message. This tends to create a lot of classes for each execution, which puts
stress on the JRE. |
Headers may be set by the Jython wrapper script if the output()
script function returns a Message.
The outbound message payload is whatever the output() wrapper script function returns.
Note | |
---|---|
The wrapper script is intended to perform any required transformations prior to sending the HTTP request and/or after
the response is received. The return value of the input adapter will be the inbound payload of the
httpclient processor and should conform to its requirements. Likewise,
the HTTP response body is the input to the output adapter. |
The python-http processor has the following options:
(Options table omitted; the httpclient.http-method property accepts GET, HEAD, POST, PUT, PATCH, DELETE, OPTIONS and TRACE, and the variables delimiter may be COMMA, SPACE, TAB or NEWLINE.)
$ ./mvnw clean install -PgenerateApps
$ cd apps
You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:
$ ./mvnw clean package
See httpclient processor for more examples on
httpclient
properties.
$ java -jar python-http-processor.jar --wrapper.script=/local/directory/build-json.py --httpclient.url=http://someurl \
  --httpclient.http-method=POST --httpclient.headers-expression="{'Content-Type':'application/json'}"
$ java -jar python-http-processor.jar --git.uri=https://github.com/some-repo --wrapper.script=some-script.py \
  --wrapper.variables=foo=0.45,bar=0.55 --httpclient.url=http://someurl
This application executes a Jython script that binds payload
and headers
variables to the Message payload
and headers, respectively. In addition, you may provide a jython.variables
property containing a delimited string (comma-delimited by default), e.g., var1=val1,var2=val2,….
This processor uses a JSR-223 compliant embedded ScriptEngine provided by www.jython.org/.
Note | |
---|---|
The last line in the script must be an assignment statement. The variable name doesn’t matter. This is required to bind the return value correctly. |
Note | |
---|---|
The script is evaluated for every message, which may limit your performance with high message loads. This also tends
to create a lot of classes for each execution, which puts stress on the JRE. |
The jython processor has the following options:
(Options table omitted; the variables delimiter may be COMMA, SPACE, TAB or NEWLINE.)
$ ./mvnw clean install -PgenerateApps
$ cd apps
You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:
$ ./mvnw clean package
A Spring Cloud Stream module that transforms messages using a script. The script body is supplied directly as a property value. The language of the script can be specified (groovy/javascript/ruby/python).
The scriptable-transform processor has the following options:
(Options table omitted.)
$ ./mvnw clean install -PgenerateApps
$ cd apps
You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:
$ ./mvnw clean package
The splitter app builds upon the concept of the same name in Spring Integration and allows the splitting of a single message into several distinct messages.
(Options table omitted.)
When no expression
, fileMarkers
, or charset
is provided, a DefaultMessageSplitter
is configured with (optional) delimiters
.
When fileMarkers
or charset
is provided, a FileSplitter
is configured (you must provide either a fileMarkers
or charset
to split files, which must be text-based - they are split into lines).
Otherwise, an ExpressionEvaluatingMessageSplitter
is configured.
When splitting File
payloads, the sequenceSize
header is zero because the size cannot be determined at the beginning.
Caution | |
---|---|
Ambiguous properties are not allowed. |
As part of the SpEL expression you can make use of the pre-registered JSON Path function. The syntax is
#jsonPath(payload, '<json path expression>')
.
For example, consider the following JSON:
{ "store": { "book": [ { "category": "reference", "author": "Nigel Rees", "title": "Sayings of the Century", "price": 8.95 }, { "category": "fiction", "author": "Evelyn Waugh", "title": "Sword of Honour", "price": 12.99 }, { "category": "fiction", "author": "Herman Melville", "title": "Moby Dick", "isbn": "0-553-21311-3", "price": 8.99 }, { "category": "fiction", "author": "J. R. R. Tolkien", "title": "The Lord of the Rings", "isbn": "0-395-19395-8", "price": 22.99 } ], "bicycle": { "color": "red", "price": 19.95 } }}
and an expression #jsonPath(payload, '$.store.book')
; the result will be 4 messages, each with a Map
payload
containing the properties of a single book.
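A standalone run of the splitter with this expression might look like the sketch below; the splitter property prefix and the jar name are assumptions based on the project’s naming conventions.
java -jar splitter-processor.jar --splitter.expression="#jsonPath(payload, '$.store.book')"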
$ ./mvnw clean install -PgenerateApps
$ cd apps
You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:
$ ./mvnw clean package
The tcp-client processor has the following options:
(Options table omitted; the framing encoder and decoder accept CRLF, LF, NULL, STXETX, RAW, L1, L2 and L4.)
$ ./mvnw clean install -PgenerateApps
$ cd apps
You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:
$ ./mvnw clean package
Use the transform app in a stream to convert a Message’s content or structure.
The transform processor is used by passing a SpEL expression. The expression should return the modified message or payload.
As part of the SpEL expression you can make use of the pre-registered JSON Path function. The syntax is #jsonPath(payload,'<json path expression>')
The transform processor has the following options:
(Options table omitted.)
$ ./mvnw clean install -PgenerateApps
$ cd apps
You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:
$ ./mvnw clean package
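As a sketch of a standalone run (the transformer property prefix is an assumption; check the starter’s property names), the following would upper-case each incoming payload:
java -jar transform-processor.jar --transformer.expression="payload.toUpperCase()"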
A processor that evaluates a machine learning model stored in TensorFlow Protobuf format.
The following snippet shows how to export a TensorFlow
model into ProtocolBuffer
binary format, as required by the processor.
from tensorflow.python.framework.graph_util import convert_variables_to_constants
...
SAVE_DIR = os.path.abspath(os.path.curdir)
minimal_graph = convert_variables_to_constants(sess, sess.graph_def, ['<model output>'])
tf.train.write_graph(minimal_graph, SAVE_DIR, 'my_graph.proto', as_text=False)
tf.train.write_graph(minimal_graph, SAVE_DIR, 'my.txt', as_text=True)
The --tensorflow.model
property configures the Processor with the location of the serialized Tensorflow model.
The TensorflowInputConverter
converts the input data into the format specific to the given model.
The TensorflowOutputConverter
converts the computed Tensors
result into a pipeline Message
.
The --tensorflow.modelFetch
property defines the list of TensorFlow graph outputs to fetch the output Tensors from.
The --tensorflow.mode
property defines whether the computed results are passed in the message payload or in the message header.
The TensorFlow Processor uses a TensorflowInputConverter
to convert the input data into a data format compliant with the
TensorFlow model used. The input converter converts the input Messages
into a key/value Map, where
the key corresponds to a model input placeholder and the content is an org.tensorflow.DataType
compliant value.
The default converter implementation expects either a Map payload or a flat JSON message that can be converted into a Map.
The TensorflowInputConverter
can be extended and customized.
See TwitterSentimentTensorflowInputConverter.java for example.
The processor’s output uses a TensorflowOutputConverter
to convert the computed Tensor
result into a serializable
message. The default implementation uses a Tuple
triple.
Custom TensorflowOutputConverter
can provide more convenient data representations.
See TwitterSentimentTensorflowOutputConverter.java.
The tensorflow processor has the following options:
(Options table omitted; the tensorflow.mode property accepts payload, tuple or header.)
$ ./mvnw clean install -PgenerateApps
$ cd apps
You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:
$ ./mvnw clean package
A processor that evaluates a machine learning model stored in TensorFlow Protobuf format. It operationalizes the github.com/danielegrattarola/twitter-sentiment-cnn model.
See also: Real-time Twitter Sentiment Analytics with TensorFlow and Spring Cloud Dataflow.
The processor decodes the evaluated result into POSITIVE, NEGATIVE and NEUTRAL values, then creates and returns a simple JSON message with this structure:
N/A
The processor’s output uses a TensorflowOutputConverter
to convert the computed Tensor
result into a serializable
message. The default implementation uses a Tuple
triple.
Custom TensorflowOutputConverter
can provide more convenient data representations.
See TwitterSentimentTensorflowOutputConverter.java.
The twitter-sentiment processor has the following options:
(Options table omitted; the tensorflow.mode property accepts payload, tuple or header.)
$ ./mvnw clean install -PgenerateApps
$ cd apps
You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:
$ ./mvnw clean package
java -jar twitter-sentiment-processor.jar --tensorflow.twitter.vocabulary= --tensorflow.model= \
  --tensorflow.modelFetch= --tensorflow.mode=
And here is a sample pipeline that computes sentiments for json tweets coming from the twitterstream
source and
using the pre-built minimal_graph.proto
and vocab.csv
:
tweets=twitterstream --access-token-secret=xxx --access-token=xxx --consumer-secret=xxx --consumer-key=xxx \
| filter --expression=#jsonPath(payload,'$.lang')=='en' \
| twitter-sentiment --vocabulary='http://dl.bintray.com/big-data/generic/vocab.csv' \
--output-name=output/Softmax --model='http://dl.bintray.com/big-data/generic/minimal_graph.proto' \
--model-fetch=output/Softmax \
| log
A processor that uses an Inception model to classify images in real time into different categories (e.g., labels).
The model implements a deep Convolutional Neural Network that can achieve reasonable performance on hard visual recognition tasks, matching or exceeding human performance in some domains like image recognition.
The input of the model is an image as a binary array.
The output is a Tuple (or JSON) message in this format:
{ "labels" : [ {"giant panda":0.98649305} ] }
The result contains the name of the recognized category (i.e., the label) along with the confidence that the image represents this category.
If the response-size
property is set to a value higher than 1, then the result will include the top response-size
most probable labels. For example, response-size=3
would return:
{ "labels": [ {"giant panda":0.98649305}, {"badger":0.010562794}, {"ice bear":0.001130851} ] }
The image-recognition processor has the following options:
(Options table omitted; the tensorflow.mode property accepts payload, tuple or header.)
The new Object Detection processor provides out-of-the-box support for the TensorFlow Object Detection API. It allows for real-time localization and identification of multiple objects in a single image or image stream. The Object Detection processor uses one of the pre-trained object detection models and corresponding object labels.
If the pre-trained model is not explicitly set, then the following defaults are used:
tensorflow.modelFetch
: detection_scores,detection_classes,detection_boxes,num_detections
tensorflow.model
: dl.bintray.com/big-data/generic/faster_rcnn_resnet101_coco_2018_01_28_frozen_inference_graph.pb
tensorflow.object.detection.labels
: dl.bintray.com/big-data/generic/mscoco_label_map.pbtxt
The following diagram illustrates a Spring Cloud Data Flow streaming pipeline that predicts object types from the images in real-time.
Processor’s input is an image byte array and the output is a Tuple (or JSON) message in this format:
{ "labels" : [ {"name":"person", "confidence":0.9996774,"x1":0.0,"y1":0.3940161,"x2":0.9465165,"y2":0.5592592,"cid":1}, {"name":"person", "confidence":0.9996604,"x1":0.047891676,"y1":0.03169123,"x2":0.941098,"y2":0.2085562,"cid":1}, {"name":"backpack", "confidence":0.96534747,"x1":0.15588468,"y1":0.85957795,"x2":0.5091308,"y2":0.9908878,"cid":23}, {"name":"backpack", "confidence":0.963343,"x1":0.1273736,"y1":0.57658505,"x2":0.47765,"y2":0.6986431,"cid":23} ] }
Each detected object is described by its label (name), the prediction confidence, the normalized bounding box coordinates (x1, y1, x2, y2) and the category id (cid).
The object-detection processor has the following options:
(Options table omitted.)
The aggregate counter differs from a simple counter in that it not only keeps a total value for the count, but also retains the total count values for each minute, hour, day and month of the period for which it is run. The data can then be queried by supplying a start and end date and the resolution at which the data should be returned.
The aggregate-counter sink has the following options:
(Options table omitted.)
$ ./mvnw clean install -PgenerateApps
$ cd apps
You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:
$ ./mvnw clean package
This sink application writes the content of each message it receives into Cassandra.
It expects a payload of a JSON String and uses its properties to map to table columns.
Headers:
Content-Type: application/json
Payload:
A JSON String or byte array representing the entity (or a list of entities) to be persisted
The cassandra sink has the following options:
(Options table omitted; among other settings it covers the compression type, consistency level, query type, retry policy and schema action.)
$ ./mvnw clean install -PgenerateApps
$ cd apps
You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:
$ ./mvnw clean package
The following example assumes a JSON payload is sent to a default destination called input
, the sink parses some of its properties (id,time,customer_id,value) and persists them into a table called orders
.
java -jar cassandra_sink.jar --cassandra.cluster.keyspace=test --cassandra.ingest-query="insert into orders(id,time,customer_id, value) values (?,?,?,?)"
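The same sink can also be wired behind an http source in a Spring Cloud Data Flow stream; the following definition is only a sketch, with a placeholder port and keyspace:
dataflow:>stream create --name orders-ingest --definition "http --server.port=8081 | cassandra --cassandra.cluster.keyspace=test --cassandra.ingest-query='insert into orders(id,time,customer_id,value) values (?,?,?,?)'" --deploy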
The counter sink simply counts the number of messages it receives, optionally storing counts in a separate store such as redis.
The counter sink has the following options:
(Options table omitted.)
$ ./mvnw clean install -PgenerateApps
$ cd apps
You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:
$ ./mvnw clean package
A field value counter is a Metric used for counting occurrences of unique values for a named field in a message payload.
For example, suppose a message source produces a payload with a field named user:
class Foo {
    String user;
    public Foo(String user) {
        this.user = user;
    }
}
If the stream source produces messages with the following objects:
new Foo("fred")
new Foo("sue")
new Foo("dave")
new Foo("sue")
The field value counter on the field user will contain:
fred:1, sue:2, dave:1
Multi-value fields are also supported. For example, if a field contains a list, each value will be counted once:
users:["dave","fred","sue"]
users:["sue","jon"]
The field value counter on the field users will contain:
dave:1, fred:1, sue:2, jon:1
The field-value-counter sink has the following options:
(Options table omitted.)
$ ./mvnw clean install -PgenerateApps
$ cd apps
You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:
$ ./mvnw clean package
This module writes each message it receives to a file.
The file sink has the following options:
(Options table omitted.)
FTP sink is a simple option to push files to an FTP server from incoming messages.
It uses an ftp-outbound-adapter
, therefore incoming messages can be either a java.io.File
object, a String
(content of the file)
or an array of bytes
(file content as well).
To use this sink, you need a username and a password to log in.
Note | |
---|---|
By default Spring Integration will use |
The ftp sink has the following options:
(Options table omitted.)
$ ./mvnw clean install -PgenerateApps
$ cd apps
You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:
$ ./mvnw clean package
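A standalone run might look like the sketch below; the host, credentials and remote directory are placeholders, and the property names assume the ftp.factory and ftp prefixes used by the FTP starters.
java -jar ftp-sink.jar --ftp.factory.host=ftp.example.com --ftp.factory.username=user --ftp.factory.password=secret \
  --ftp.remote-dir=/upload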
The Gemfire sink allows one to write message payloads to a Gemfire server.
To enable SSL communication between Geode Sink and the Geode cluster you need to provide the URIs of the
Keystore and Truststore files using the gemfire.security.ssl.keystore-uri
and gemfire.security.ssl.truststore-uri
properties.
(If a single file is used for both stores then point both URIs to it).
The gemfire sink has the following options:
(Options table omitted.)
$ ./mvnw clean install -PgenerateApps
$ cd apps
You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:
$ ./mvnw clean package
A sink module that routes messages into GPDB/HAWQ
segments via the
gpfdist protocol. Internally, this sink creates a custom http listener that supports
the gpfdist
protocol and schedules a task that orchestrates a gpload
session in the
same way it is done natively in Greenplum.
No data is written into temporary files and all data is kept in stream buffers waiting to get inserted into Greenplum DB or HAWQ. If there are no existing load sessions from Greenplum, the sink will block until such sessions are established.
The gpfdist sink has the following options:
(Options table omitted.)
Within a gpfdist
sink we have a Reactor based stream where data is published from the incoming SI channel.
This channel receives data from the Message Bus. The Reactor stream is then connected to Netty
based
http channel adapters so that when a new http connection is established, the Reactor stream is flushed and balanced among
existing http clients. When Greenplum
does a load from an external table, each segment will initiate
a http connection and start loading data. The net effect is that incoming data is automatically spread
among the Greenplum segments.
The gpfdist sink supports the following configuration properties:
Database table to work with. (String, default: ``, required)
This option denotes a table where data will be inserted or updated. The external table structure will also be derived from the structure of this table.
Currently, table
is the only way to define the structure of an external
table. Effectively it will replace other_table
in the below clause
segment.
CREATE READABLE EXTERNAL TABLE table_name LIKE other_table
Gpfdist mode, either `insert` or `update`. (String, default: insert
)
Currently only the insert
and update
gpfdist modes are supported. The mode
merge
familiar from a native gpfdist loader is not yet supported.
For mode update
the options matchColumns
and updateColumns
are
required.
Data record column delimiter. (Character, default: ``)
Defines the delimiter
character used in the below clause segment, which would
be part of the FORMAT 'TEXT'
or FORMAT 'CSV'
sections.
[DELIMITER AS 'delimiter']
Error reject limit. (String, default: ``)
Defines the count
value in the below clause segment.
[ [LOG ERRORS] SEGMENT REJECT LIMIT count [ROWS | PERCENT] ]
As a convenience, this reject limit also recognizes a percentage format such as
2%
, and if used, segmentRejectType
is automatically set to
percent
.
Error reject type, either `rows` or `percent`. (String, default: ``)
Defines ROWS
or PERCENT
in the below clause segment.
[ [LOG ERRORS] SEGMENT REJECT LIMIT count [ROWS | PERCENT] ]
Enable or disable log errors. (Boolean, default: false
)
As error logging is optional with SEGMENT REJECT LIMIT
, it’s only used
if both segmentRejectLimit
and segmentRejectType
are set. It enables
the error log in the below clause segment.
[ [LOG ERRORS] SEGMENT REJECT LIMIT count [ROWS | PERCENT] ]
Null string definition. (String, default: ``)
Defines the null string
used in the below clause segment, which would
be part of the FORMAT 'TEXT'
or FORMAT 'CSV'
sections.
[NULL AS 'null string']
Data record delimiter for incoming messages. (String, default: \n
)
By default, the delimiter in this option will be added as a postfix to every message sent into this sink. Currently NEWLINE is not a supported configuration option and line termination for data comes from the default functionality.
If not specified, a Greenplum Database segment will detect the newline type by looking at the first row of data it receives and using the first newline type encountered. | ||
-- External Table Docs |
Comma delimited list of columns to match. (String, default: ``)
Note | |
---|---|
See more from examples below. |
Comma delimited list of columns to update. (String, default: ``)
Note | |
---|---|
See more from examples below. |
Debug rate of data transfer. (Integer, default: 0
)
If set to a non-zero value, the sink will log the rate of messages passing through
the sink after the number of messages denoted by this setting has been
processed. A value of 0
means that this rate calculation and logging is
disabled.
Max collected size per windowed data. (Integer, default: 100
)
Note | |
---|---|
For more info on flush and batch settings, see above. |
There are a few important concepts involving how data passes into the sink, through it, and finally lands in a database.
Incoming data is first windowed, based on a size (flushCount) or on timeouts (flushTime) if a window doesn’t get full. One window is then ready to get sent into a segment.
The sink keeps track of batchCount and completes a stream if it got enough batches or if batchTimeout occurred due to inactivity. batchPeriod is then used as a sleep time in between these load sessions.
Let’s take a closer look at how the options flushCount, flushTime, batchCount, batchTimeout and batchPeriod work.
At the highest level, where incoming data into the sink is windowed,
flushCount
and flushTime
control when a batch of messages is
sent downstream. If there are a lot of simultaneous segment
connections, flushing less will keep more segments inactive, as there
is more demand for batches than what flushing will produce.
When an existing segment connection is active and has subscribed
itself to a stream of batches, data will keep flowing until either
batchCount
is met or batchTimeout
occurs due to inactivity of data
from upstream. The higher the batchCount
, the more data each segment
will read. The higher the batchTimeout
, the more time a segment will wait in
case there is more data to come.
As gpfdist load operations are done in a loop, batchPeriod
simply
controls not to run things in a busy loop. A busy loop would be ok if
there is a constant stream of data coming in, but if incoming data is
more like bursts, then a busy loop would be unnecessary.
Note | |
---|---|
Data loaded via gpfdist will not become visible in a database until the whole distributed loading session has finished successfully. |
Reactor also handles backpressure, meaning that if existing load operations do not produce enough demand for data, eventually message passing into the sink will block. This happens when Reactor’s internal ring buffer (size of 32 items) gets full. The flow of data through the sink really happens when data is pulled from it by segments.
In this first example we’re just creating a simple stream which
inserts data from a time
source. Let’s create a table with two
text columns.
gpadmin=# create table ticktock (date text, time text);
Create a simple stream gpstream
.
dataflow:>stream create --name gpstream1 --definition "time | gpfdist --dbHost=mdw --table=ticktock --batchTime=1 --batchPeriod=1 --flushCount=2 --flushTime=2 --columnDelimiter=' '" --deploy
Let it run and see results from a database.
gpadmin=# select count(*) from ticktock;
 count
-------
    14
(1 row)
In previous example we did a simple inserts into a table. Let’s see how we can update data in a table. Create a simple table httpdata with three text columns and insert some data.
gpadmin=# create table httpdata (col1 text, col2 text, col3 text);
gpadmin=# insert into httpdata values ('DATA1', 'DATA', 'DATA');
gpadmin=# insert into httpdata values ('DATA2', 'DATA', 'DATA');
gpadmin=# insert into httpdata values ('DATA3', 'DATA', 'DATA');
Now table looks like this.
gpadmin=# select * from httpdata;
 col1  | col2 | col3
-------+------+------
 DATA3 | DATA | DATA
 DATA2 | DATA | DATA
 DATA1 | DATA | DATA
(3 rows)
Let’s create a stream which will update table httpdata by matching a column col1 and updates columns col2 and col3.
dataflow:>stream create --name gpfdiststream2 --definition "http --server.port=8081|gpfdist --mode=update --table=httpdata --dbHost=mdw --columnDelimiter=',' --matchColumns=col1 --updateColumns=col2,col3" --deploy
Post some data into a stream which will be passed into a gpfdist sink via http source.
curl --data "DATA1,DATA1,DATA1" -H "Content-Type:text/plain" http://localhost:8081/
If you query table again, you’ll see that row for DATA1 has been updated.
gpadmin=# select * from httpdata;
 col1  | col2  | col3
-------+-------+-------
 DATA3 | DATA  | DATA
 DATA2 | DATA  | DATA
 DATA1 | DATA1 | DATA1
(3 rows)
Default values for options flushCount
, flushTime
, batchCount
,
batchTimeout
and batchPeriod
are relatively conservative and need
to be tuned for every use case for optimal performance. In order to make
a decision on how to tune the sink behaviour to suit your needs, a few things
need to be considered.
Everything that flows through the sink is kept in memory, and because the sink handles backpressure, memory consumption is relatively low. However, because the sink cannot predict the average size of the incoming data, and this data is windowed later downstream anyway, you should not allow the window size to become too large if the average data size is large, as every batch of data is kept in memory.
Generally speaking, if you have a lot of segments in a load operation, it is advisable to keep the flushed window size relatively small, which allows more segments to stay active. This, however, also depends on how much data is flowing into the sink itself.
The longer a load session for each segment is active, the higher the overall
transfer rate is going to be. The batchCount
option naturally controls
this. However, the batchTimeout
option then really controls how fast each
segment will complete a stream due to inactivity from upstream and
step away from a loading session to allow the distributed session to
finish and the data to become visible in the database.
$ ./mvnw clean install -PgenerateApps
$ cd apps
You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:
$ ./mvnw clean package
This module writes each message it receives to HDFS.
The hdfs sink has the following options:
(Options table omitted.)
Note | |
---|---|
This module can have its runtime dependencies provided during startup if you would like to use a Hadoop distribution other than the default one. |
$ ./mvnw clean install -PgenerateApps
$ cd apps
You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:
$ ./mvnw clean package
A module that writes its incoming payload to an RDBMS using JDBC.
The jdbc sink has the following options:
(Options table omitted.)
The jdbc.columns
property represents pairs of COLUMN_NAME[:EXPRESSION_FOR_VALUE]
where EXPRESSION_FOR_VALUE
(together with the colon) is optional.
In this case the value is evaluated via a generated expression like payload.COLUMN_NAME
, so this way we have a direct mapping from object properties to the table columns.
For example we have a JSON payload like:
{ "name": "My Name" "address": { "city": "Big City", "street": "Narrow Alley" } }
So, we can insert it into the table with name
, city
and street
structure using the configuration:
--jdbc.columns=name,city:address.city,street:address.street
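Putting this together, a standalone run against a hypothetical PostgreSQL database could look like the sketch below; the jdbc.table-name property name is an assumption, and the JDBC URL and credentials are placeholders resolved through Spring Boot’s standard spring.datasource properties.
java -jar jdbc-sink.jar --jdbc.table-name=names --jdbc.columns=name,city:address.city,street:address.street \
  --spring.datasource.url=jdbc:postgresql://localhost:5432/test \
  --spring.datasource.username=user --spring.datasource.password=secret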
Note | |
---|---|
The module also uses Spring Boot’s DataSource support for configuring the database connection, so properties like |
$ ./mvnw clean install -PgenerateApps
$ cd apps
You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:
$ ./mvnw clean package
The log
sink uses the application logger to output the data for inspection.
Please understand that the log
sink uses a type-less handler, which affects how the actual logging will be performed.
This means that if the content-type is textual, then raw payload bytes will be converted to String, otherwise raw bytes will be logged.
Please see more info in the user-guide.
The log sink has the following options:
(Options table omitted; the logging level may be FATAL, ERROR, WARN, INFO, DEBUG or TRACE.)
$ ./mvnw clean install -PgenerateApps
$ cd apps
You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:
$ ./mvnw clean package
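As a sketch, assuming the expression, level and name properties are exposed under the log prefix, a standalone run could look like:
java -jar log-sink.jar --log.expression=payload --log.level=WARN --log.name=mylogger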
This module sends messages to RabbitMQ.
By default, the payload may be any java.io.Serializable
object.
Note: With converterBeanName = jsonConverter
, the payload may be any object that can be converted to JSON by Jackson (the content type sent to Rabbit will be application/json
, with type information in other headers).
With converterBeanName
set to something else, the payload will be any object that the converter can handle.
The rabbit sink has the following options:
(See the Spring Boot documentation for RabbitMQ connection properties)
(Options table omitted.)
Note | |
---|---|
By default, the message converter is a |
$ ./mvnw clean install -PgenerateApps
$ cd apps
You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:
$ ./mvnw clean package
This sink application ingests incoming data into MongoDB.
This application is fully based on the MongoDataAutoConfiguration
, so refer to the Spring Boot MongoDB Support for more information.
The mongodb sink has the following options:
(Options table omitted.)
Also see the Spring Boot Documentation for additional MongoProperties
properties.
$ ./mvnw clean install -PgenerateApps
$ cd apps
You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:
$ ./mvnw clean package
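A minimal standalone sketch, assuming the collection property is exposed under the mongodb prefix and using Spring Boot’s spring.data.mongodb.uri for the connection:
java -jar mongodb-sink.jar --mongodb.collection=orders --spring.data.mongodb.uri=mongodb://localhost/test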
This module sends messages to MQTT.
The mqtt sink has the following options:
(Options table omitted.)
$ ./mvnw clean install -PgenerateApps
$ cd apps
You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:
$ ./mvnw clean package
A module that writes its incoming payload to an RDBMS using the PostgreSQL COPY command.
The pgcopy sink has the following options:
(Options table omitted; the copy format may be TEXT or CSV.)
Note | |
---|---|
The module also uses Spring Boot’s DataSource support for configuring the database connection, so properties like |
$ ./mvnw clean install -PgenerateApps
$ cd apps
You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:
$ ./mvnw clean package
For integration tests to run, start a PostgreSQL database on localhost:
docker run -e POSTGRES_PASSWORD=spring -e POSTGRES_DB=test -p 5432:5432 -d postgres:latest
This module sends messages to a Redis store.
The redis sink has the following options:
(Options table omitted.)
$ ./mvnw clean install -PgenerateApps
$ cd apps
You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:
$ ./mvnw clean package
java -jar redis-pubsub-sink.jar --redis.queue=
java -jar redis-pubsub-sink.jar --redis.queueExpression=
java -jar redis-pubsub-sink.jar --redis.key=
java -jar redis-pubsub-sink.jar --redis.keyExpression=
java -jar redis-pubsub-sink.jar --redis.topic=
java -jar redis-pubsub-sink.jar --redis.topicExpression=
This application routes messages to named channels.
The router sink has the following options:
(Options table omitted.)
Note | |
---|---|
Since this is a dynamic router, destinations are created as needed; therefore, by default the |
You can restrict the creation of dynamic bindings using the spring.cloud.stream.dynamicDestinations
property.
By default, all resolved destinations will be bound dynamically; if this property has a comma-delimited list of
destination names, only those will be bound.
Messages that resolve to a destination that is not in this list will be routed to the defaultOutputChannel
, which
must also appear in the list.
destinationMappings
are used to map the evaluation results to an actual destination name.
The expression evaluates against the message and returns either a channel name, or the key to a map of channel names.
For more information, please see the "Routers and the Spring Expression Language (SpEL)" subsection in the Spring Integration Reference manual Configuring (Generic) Router section.
Note | |
---|---|
Starting with Spring Cloud Stream 2.0 onwards the message wire format for |
For example for text
content type one should use:
new String(payload).contains('a')
and for json
content type SpEL expressions like this:
#jsonPath(payload, '$.person.name')
Instead of SpEL expressions, Groovy scripts can also be used. Let’s create a Groovy script in the file system at "file:/my/path/router.groovy", or "classpath:/my/path/router.groovy" :
println("Groovy processing payload '" + payload + "'"); if (payload.contains('a')) { return "foo" } else { return "bar" }
If you want to pass variable values to your script, you can statically bind values using the variables option or optionally pass the path to a properties file containing the bindings using the propertiesLocation option. All properties in the file will be made available to the script as variables. You may specify both variables and propertiesLocation, in which case any duplicate values provided as variables override values provided in propertiesLocation. Note that payload and headers are implicitly bound to give you access to the data contained in a message.
For more information, see the Spring Integration Reference manual Groovy Support.
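As a sketch, assuming the script and variables options are bound under the router prefix, the Groovy-based router above could be launched like this:
java -jar router-sink.jar --router.script=file:/my/path/router.groovy --router.variables=foo=bar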
$ ./mvnw clean install -PgenerateApps
$ cd apps
You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:
$ ./mvnw clean package
This sink app supports transferring files to an Amazon S3 bucket.
File payloads (and directories, recursively) are transferred from the local
directory where the application is deployed to the remote
directory (S3 bucket).
Messages accepted by this sink must contain payload
as:
File
, including directories for recursive upload;InputStream
;byte[]
When using --mode=lines
, you can also provide the additional option --withMarkers=true
.
If set to true
, the underlying FileSplitter
will emit additional start-of-file and end-of-file marker messages before and after the actual data.
The payload of these 2 additional marker messages is of type FileSplitter.FileMarker
. The option withMarkers
defaults to false
if not explicitly set.
The s3 sink has the following options:
(Options table omitted; the S3 ACL may be private, public-read, public-read-write, authenticated-read, log-delivery-write, bucket-owner-read, bucket-owner-full-control or aws-exec-read.)
The target generated application based on the AmazonS3SinkConfiguration
can be enhanced with the S3MessageHandler.UploadMetadataProvider
and/or S3ProgressListener
, which are injected into S3MessageHandler
bean.
The Amazon S3 Sink (as all other Amazon AWS applications) is based on the Spring Cloud AWS project as a foundation, and its auto-configuration classes are used automatically by Spring Boot. Consult their documentation regarding required and useful auto-configuration properties.
Some of them are about AWS credentials:
Others are for the AWS Region
definition:
And for AWS Stack
:
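A standalone sketch follows; the bucket, region and credential values are placeholders, the s3.bucket property name is an assumption, and the cloud.aws.* properties come from Spring Cloud AWS.
java -jar s3-sink.jar --s3.bucket=my-bucket \
  --cloud.aws.credentials.accessKey=my-access-key --cloud.aws.credentials.secretKey=my-secret-key \
  --cloud.aws.region.static=us-east-1 --cloud.aws.stack.auto=false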
SFTP sink is a simple option to push files to an SFTP server from incoming messages.
It uses an sftp-outbound-adapter
, therefore incoming messages can be either a java.io.File
object, a String
(content of the file)
or an array of bytes
(file content as well).
To use this sink, you need a username and a password to log in.
Note | |
---|---|
By default Spring Integration will use |
When configuring the sftp.factory.known-hosts-expression
option, the root object of the evaluation is the application context, an example might be sftp.factory.known-hosts-expression = @systemProperties['user.home'] + '/.ssh/known_hosts'
.
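For example, combining that expression with placeholder connection settings (the sftp.factory and sftp property prefixes are assumptions mirroring the FTP sink), a run could look like:
java -jar sftp-sink.jar --sftp.factory.host=sftp.example.com --sftp.factory.username=user --sftp.factory.password=secret \
  --sftp.factory.known-hosts-expression="@systemProperties['user.home'] + '/.ssh/known_hosts'" \
  --sftp.remote-dir=/upload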
The sftp sink has the following options:
(Options table omitted.)
$ ./mvnw clean install -PgenerateApps
$ cd apps
You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:
$ ./mvnw clean package
This module writes messages to TCP using an Encoder.
TCP is a streaming protocol and some mechanism is needed to frame messages on the wire. A number of encoders are available, the default being 'CRLF'.
The tcp sink has the following options:
(Options table omitted; the available encoders, covering text and binary data, are CRLF, LF, NULL, STXETX, RAW, L1, L2 and L4.)
$ ./mvnw clean install -PgenerateApps
$ cd apps
You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:
$ ./mvnw clean package
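As a sketch, assuming the host, port and encoder properties sit under the tcp prefix, a standalone run that frames each message with a line feed could look like:
java -jar tcp-sink.jar --tcp.host=localhost --tcp.port=1234 --tcp.encoder=LF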
A simple handler that will count messages and log witnessed throughput at a selected interval.
$ ./mvnw clean install -PgenerateApps
$ cd apps
You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:
$ ./mvnw clean package
A simple Websocket Sink implementation.
The following command line arguments are supported:
(Options table omitted.)
$ ./mvnw clean install -PgenerateApps
$ cd apps
You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:
$ ./mvnw clean package
To verify that the websocket-sink receives messages from other spring-cloud-stream apps, you can use the following simple end-to-end setup.
Finally start a websocket-sink in trace
mode so that you see the messages produced by the time-source
in the log:
java -jar <spring boot application for websocket-sink> --spring.cloud.stream.bindings.input=ticktock --server.port=9393 \
  --logging.level.org.springframework.cloud.stream.module.websocket=TRACE
You should start seeing log messages in the console where you started the WebsocketSink like this:
Handling message: GenericMessage [payload=2015-10-21 12:52:53, headers={id=09ae31e0-a04e-b811-d211-b4d4e75b6f29, timestamp=1445424778065}]
Handling message: GenericMessage [payload=2015-10-21 12:52:54, headers={id=75eaaf30-e5c6-494f-b007-9d5b5b920001, timestamp=1445424778065}]
Handling message: GenericMessage [payload=2015-10-21 12:52:55, headers={id=18b887db-81fc-c634-7a9a-16b1c72de291, timestamp=1445424778066}]
There is an Endpoint
that you can use to access the last n
messages sent and received. You have to
enable it by providing --endpoints.websocketsinktrace.enabled=true
. By default it shows the last 100 messages via the
host:port/websocketsinktrace
. Here is a sample output:
[ { "timestamp": 1445453703508, "info": { "type": "text", "direction": "out", "id": "2ff9be50-c9b2-724b-5404-1a6305c033e4", "payload": "2015-10-21 20:54:33" } }, ... { "timestamp": 1445453703506, "info": { "type": "text", "direction": "out", "id": "2b9dbcaf-c808-084d-a51b-50f617ae6a75", "payload": "2015-10-21 20:54:32" } } ]
There is also a simple HTML page where you see forwarded messages in a text area. You can access
it directly via host:port
in your browser
Note | |
---|---|
For SSL mode ( |
To build the source you will need to install JDK 1.7.
The build uses the Maven wrapper so you don’t have to install a specific version of Maven. To enable the tests for Redis you should run the server before building. See below for more information on how to run Redis.
The main build command is
$ ./mvnw clean install
You can also add '-DskipTests' if you like, to avoid running the tests.
Note | |
---|---|
You can also install Maven (>=3.3.3) yourself and run the |
Note | |
---|---|
Be aware that you might need to increase the amount of memory
available to Maven by setting a |
The projects that require middleware generally include a
docker-compose.yml
, so consider using
Docker Compose to run the middleware servers
in Docker containers. See the README in the
scripts demo
repository for specific instructions about the common cases of mongo,
rabbit and redis.
There is a "full" profile that will generate documentation. You can build just the documentation by executing
$ ./mvnw package -DskipTests=true -P full -pl spring-cloud-stream-app-starters-docs -am
If you don’t have an IDE preference we would recommend that you use Spring Tool Suite or Eclipse when working with the code. We use the m2eclipse eclipse plugin for maven support. Other IDEs and tools should also work without issue.
We recommend the m2eclipe eclipse plugin when working with eclipse. If you don’t already have m2eclipse installed it is available from the "eclipse marketplace".
Unfortunately m2e does not yet support Maven 3.3, so once the projects
are imported into Eclipse you will also need to tell m2eclipse to use
the .settings.xml
file for the projects. If you do not do this you
may see many different errors related to the POMs in the
projects. Open your Eclipse preferences, expand the Maven
preferences, and select User Settings. In the User Settings field
click Browse and navigate to the Spring Cloud project you imported
selecting the .settings.xml
file in that project. Click Apply and
then OK to save the preference changes.
Note | |
---|---|
Alternatively you can copy the repository settings from |
The following diagram highlights some of the important Stream App
and Stream App Starter
POM dependencies.
The dependencies are grouped in three categories:
Spring Boot
, Spring Integration
,
Spring Cloud
. The "Bill Of Materials" (BOM) patterns is used throughout the stack to decouple the dependency
management from the lifecycle configurations.
The app-starters-build parent POM and the app-starters-core-dependencies BOM are inherited by all app starters. The Pre-built Apps include the starters, along with a Binder implementation.
The App Starter root pom ([my-app-name]-app-starters-build) inherits all compile-time configuration from its parent, the core app-starters-build. The starter’s BOM ([my-app-name]-app-dependencies) is used to manage the starter’s own dependencies.
The spring-cloud-stream-app-maven-plugin (used to generate the Pre-built Apps) asserts a naming convention over certain starter resources.
The following diagram describes which resources are involved and how the convention is applied to them.
The [type]
placeholder represents the application type and must be either Source
, Processor
or Sink
values.
The [my-app-name]
placeholder represents the name of your app starter project.
For multi-word app names, the hyphen (-
) is used as the word delimiter (e.g. my-app-name
). Mind that for package names
the hyphen delimiter is replaced by the (.
) character (e.g. o.s.c.s.a.my.app.name
). The class name convention expects
CamelCase names in place of any delimiters (e.g. MyAppNameSourceConfiguration
).
The capital letters in the placeholders are relevant. For example the [type]
refers to lower case names such as
source
, processor
or sink
types, while capitalized placeholder [Type]
refers to names like Source
,
Processor
and Sink
.
The Configuration
and Properties
class suffixes are expected as well.
With the help of the maven plugin configuration all default conventions can be customized or replaced. More information about the maven plugin can be found here: github.com/spring-cloud/spring-cloud-stream-app-maven-plugin
Spring Cloud is released under the non-restrictive Apache 2.0 license, and follows a very standard Github development process, using Github tracker for issues and merging pull requests into master. If you want to contribute even something trivial please do not hesitate, but follow the guidelines below.
Before we accept a non-trivial patch or pull request we will need you to sign the contributor’s agreement. Signing the contributor’s agreement does not grant anyone commit rights to the main repository, but it does mean that we can accept your contributions, and you will get an author credit if we do. Active contributors might be asked to join the core team, and given the ability to merge pull requests.
None of these is essential for a pull request, but they will all help. They can also be added after the original pull request but before a merge.
Use the Spring Framework code format conventions: import the eclipse-code-formatter.xml file from the Spring Cloud Build project. If using IntelliJ, you can use the Eclipse Code Formatter Plugin to import the same file.
Make sure all new .java files have a simple Javadoc class comment with at least an @author tag identifying you, and preferably at least a paragraph on what the class is for.
Add the license header comment to all new .java files (copy from existing files in the project).
Add yourself as an @author to the .java files that you modify substantially (more than cosmetic changes).
If you are fixing an existing issue, please add Fixes gh-XXXX at the end of the commit message (where XXXX is the issue number).