Version 2020.0.0

© 2012-2020 Pivotal Software, Inc.

Copies of this document may be made for your own use and for distribution to others, provided that you do not charge any fee for such copies and further provided that each copy contains this Copyright Notice, whether distributed in print or electronically.

Reference Guide

This section provides you with a detailed overview of the out-of-the-box Spring Cloud Stream Applications. It assumes familiarity with general Spring Cloud Stream concepts, which you can find in the Spring Cloud Stream reference documentation.

The Spring Cloud Stream Applications project provides out-of-the-box utility applications that you can run independently or with Spring Cloud Data Flow. They include:

  • Connectors (sources, processors, and sinks) for a variety of middleware technologies, including message brokers and storage (relational, non-relational, filesystem).

  • Adapters for various network protocols.

  • Generic processors that you can customize with Spring Expression Language (SpEL) or by scripting.

You can find a detailed listing of all the applications and their options in the following sections of this guide.

Most of these applications are based on core elements that are exposed as a java.util.function component. You can learn more about these foundational components and how they are all connected to the applications by reading this README.

1. Pre-built Applications

Out-of-the-box applications are Spring Boot applications that add a binder implementation on top of the basic logic of the app (a function, for example), resulting in a fully functional uber-jar. These uber-jars include the minimal code required for standalone execution. For each function application, the project provides prebuilt versions for the Apache Kafka and RabbitMQ binders.

Prebuilt applications are generated by the stream apps generator Maven plugin.

2. Classification

Based on their target application type, the applications are one of the following:

  • A source that connects to an external resource to poll and receive data that is published to the default “output” channel;

  • A processor that receives data from an “input” channel and processes it, sending the result on the default “output” channel;

  • A sink that receives data from the default “input” channel and sends it to an external resource.
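The three application types line up with the java.util.function interfaces that the core components are exposed as. The following is a minimal sketch in plain Java, with no binder or Spring involved; the class name, the constant supplier and the uppercase function are illustrative assumptions, not code from the project.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical illustration; not part of the stream-applications code base.
final class AppTypesSketch {

    // Runs a single pass of a source -> processor -> sink pipeline in memory.
    static <A, B> void runOnce(Supplier<A> source, Function<A, B> processor, Consumer<B> sink) {
        sink.accept(processor.apply(source.get()));
    }

    public static void main(String[] args) {
        List<String> received = new ArrayList<>();

        Supplier<String> source = () -> "hello";                  // source: produces data
        Function<String, String> processor = String::toUpperCase; // processor: transforms data
        Consumer<String> sink = received::add;                     // sink: consumes data

        runOnce(source, processor, sink);
        System.out.println(received); // prints [HELLO]
    }
}
```

In a real application the binder, rather than a direct method call, moves the data between the three roles.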

The prebuilt applications follow a naming convention: <functionality>-<type>-<binder>. For example, rabbit-sink-kafka is a Rabbit sink that uses the Kafka binder, with Kafka as the middleware.
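As a rough illustration of the naming convention, the sketch below splits an application name into its three parts. The class and method names are hypothetical, and splitting from the right is an assumption made so that a functionality containing hyphens would still parse.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper; not part of the stream-applications code base.
final class AppNameSketch {

    // The three application types described in the classification above.
    private static final List<String> TYPES = Arrays.asList("source", "processor", "sink");

    // Splits "<functionality>-<type>-<binder>" into {functionality, type, binder}.
    // The name is split from the right, so the functionality may itself contain hyphens.
    static String[] parse(String appName) {
        int binderSep = appName.lastIndexOf('-');
        int typeSep = binderSep <= 0 ? -1 : appName.lastIndexOf('-', binderSep - 1);
        if (typeSep <= 0) {
            throw new IllegalArgumentException("Expected <functionality>-<type>-<binder>: " + appName);
        }
        String functionality = appName.substring(0, typeSep);
        String type = appName.substring(typeSep + 1, binderSep);
        String binder = appName.substring(binderSep + 1);
        if (!TYPES.contains(type) || binder.isEmpty()) {
            throw new IllegalArgumentException("Expected <functionality>-<type>-<binder>: " + appName);
        }
        return new String[] {functionality, type, binder};
    }

    public static void main(String[] args) {
        System.out.println(String.join(" | ", parse("rabbit-sink-kafka"))); // prints rabbit | sink | kafka
    }
}
```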

2.1. Maven Access

The core functionality of the applications is available as functions. See the Java Functions section in the stream-applications repository for more details. Prebuilt applications are available as Maven artifacts. You can download the executable jar artifacts from the Spring Maven repositories. The root directory of the Maven repository that hosts release versions is repo.spring.io/release/org/springframework/cloud/stream/app/. From there, you can navigate to the latest released version of a specific app. If you want to use functions directly in a custom application, those artifacts are available under the directory structure org/springframework/cloud/fn. You need to use the Release, Milestone and Snapshot repository locations for Release, Milestone and Snapshot executable jar artifacts respectively.

2.2. Docker Access

The Docker versions of the applications are available in Docker Hub, at hub.docker.com/r/springcloudstream/. Naming and versioning follow the same general conventions as Maven. For example:

docker pull springcloudstream/cassandra-sink-kafka

The preceding command pulls the latest Docker image of the Cassandra sink with the Kafka binder.

2.3. Build

You can build everything from the root of the repository.

./mvnw clean install

This is a long build and you may want to skip tests:

./mvnw clean install -DskipTests

However, you are probably interested in only a single application or a few of them. In that case, build the functions and applications that you need selectively, as shown below.

2.4. Building the root parent

First, we need to build the parent used in various components.

./mvnw clean install -f stream-applications-build

2.4.1. Building functions

./mvnw clean install -f functions -DskipTests

You can also build a single function or a group of functions. For example, if you are only interested in jdbc-supplier and log-consumer, do the following.

./mvnw clean install -pl :jdbc-supplier,:log-consumer

2.4.2. Building core for Stream Applications

./mvnw clean install -f applications/stream-applications-core -DskipTests

2.5. Building the applications

Let’s assume that you want to build the JDBC Source application based on the Kafka binder in Spring Cloud Stream and the Log Sink application based on the Rabbit binder. Assuming you have built both the functions and stream-applications-core as described above, here is what you need to do.

./mvnw clean package -pl :jdbc-source
cd applications/source/jdbc-source/apps/jdbc-source-kafka
./mvnw clean package

This will generate the Kafka binder based uber jar in the target folder.

Similarly for the log sink, do the following.

./mvnw clean package -pl :log-sink
cd applications/sink/log-sink/apps/log-sink-rabbit
./mvnw clean package

2.5.1. Building a Docker image

The apps use the Jib Maven Plugin to build and publish the Docker image. If you have made some changes to an app, you may want to build the image and test it locally.

If you plan to use the image with minikube, run the following command before building the image:

eval $(minikube docker-env)

To build the image in your local registry:

./mvnw clean package jib:dockerBuild

To publish the image to a remote registry:

./mvnw jib:build \
    -Djib.to.image=myregistry/myimage:latest \
    -Djib.to.auth.username=$USERNAME \
    -Djib.to.auth.password=$PASSWORD

3. Patching Pre-built Applications

3.1. Adding new dependencies

If you want to patch the pre-built applications to add new dependencies, you can use the following example as a reference. To add the MySQL driver to the jdbc-sink application:

  1. Clone the GitHub repository at github.com/spring-cloud/stream-applications

  2. Find the module that you want to patch and add the additional dependencies, jdbc-sink in this case. For example, you can add the following mysql dependency to the application generator plugin’s configuration in the pom.xml:

<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.37</version>
</dependency>

This is what the complete plugin configuration should look like.

<plugin>
    <groupId>org.springframework.cloud.stream.app.plugin</groupId>
    <artifactId>spring-cloud-stream-app-maven-plugin</artifactId>
    <configuration>
        <generatedApp>
            <name>jdbc</name>
            <type>sink</type>
            <version>${project.version}</version>
            <configClass>org.springframework.cloud.fn.consumer.jdbc.JdbcConsumerConfiguration.class</configClass>
        </generatedApp>
        <dependencies>
            <dependency>
                <groupId>mysql</groupId>
                <artifactId>mysql-connector-java</artifactId>
                <version>5.1.37</version>
            </dependency>
            <dependency>
                <groupId>org.springframework.cloud.fn</groupId>
                <artifactId>jdbc-consumer</artifactId>
                <version>${java-functions.version}</version>
            </dependency>
        </dependencies>
    </configuration>
</plugin>

Once the above changes are done, you can generate the binder based apps as below from the root of the repository.

./mvnw clean install -pl :jdbc-sink

This generates the binder based applications in the apps folder under jdbc-sink folder. In order to build the app with the binder flavor that you are interested in, you need to do the following step.

cd applications/sink/jdbc-sink
cd apps/jdbc-sink-kafka   # or apps/jdbc-sink-rabbit for the RabbitMQ binder
./mvnw clean package
cd target

There you will find the binder based uber jar with your changes.

3.2. Update existing dependencies or add new resources in the application

Modifying the plugin as above works when there are new dependencies to add to the application. However, when we need to update any existing dependencies, it is easier to make the Maven changes in the generated application itself. For example, if we have to update the binder dependencies to a new release of Spring Cloud Stream, then those versions need to be updated in the generated application.

Here are the steps (again, we are using jdbc-sink-kafka as an example).

./mvnw clean install -pl :jdbc-sink
cd applications/sink/jdbc-sink/apps/jdbc-sink-kafka

Open the generated application’s pom.xml and update the dependencies. If there is a new version of Spring Cloud Stream update available that contains the enhancements we are looking for, then it is easier to update the BOM itself. Find where the bom is declared in pom.xml and update the version.

For example, if we have to update Spring Cloud Stream to Horsham.SR10, this version must be specified in the BOM declaration as below:

<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-dependencies</artifactId>
            <version>Horsham.SR10</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>

We can also update any individual dependencies directly, but it is preferred to use the above dependencyManagement approach if there is a BOM available. This is because, when using a BOM, maven will properly use and align any transitive dependencies.

If you have to modify the application further, this method of modifying the generated application is again the recommended approach.

For instance, if you want to add security certificate files such as a key store, or a trust store to the application’s classpath, then generate the application first and add those resources to the classpath.

Make sure you are in the generated jdbc-sink-kafka folder, then do the following:

First, add the resources to the classpath by placing them under src/main/resources.

Then rebuild the application.

./mvnw clean package
cd target

Here you can find the modified application jar file.

4. Generating out of the box applications for other binders

By default, we only provide out of the box applications for the Apache Kafka and RabbitMQ binders. Other binder implementations exist, and we can generate these same out of the box applications for them. For example, to generate these applications for the Kinesis binder, the Solace binder, or the Google Cloud Pub/Sub binder, follow the instructions below.

As a first step, clone the stream applications repository.

cd applications/stream-applications-core

We need to edit the pom.xml in this module. Find the following configuration where it defines the Kafka and RabbitMQ binders for the maven plugin.

<kafka>
    <maven>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-stream-binder-kafka</artifactId>
            </dependency>
        </dependencies>
    </maven>
</kafka>
<rabbit>
    <maven>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
            </dependency>
        </dependencies>
    </maven>
</rabbit>

Add the binder for which you want to generate new apps. For example, if we want to generate applications for the Kinesis binder, then modify the configuration as below.

<binders>
    <kafka>
        <maven>
            <dependencies>
                <dependency>
                    <groupId>org.springframework.cloud</groupId>
                    <artifactId>spring-cloud-stream-binder-kafka</artifactId>
                </dependency>
            </dependencies>
        </maven>
    </kafka>
    <rabbit>
        <maven>
            <dependencies>
                <dependency>
                    <groupId>org.springframework.cloud</groupId>
                    <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
                </dependency>
            </dependencies>
        </maven>
    </rabbit>
    <kinesis>
        <maven>
            <dependencies>
                <dependency>
                    <groupId>org.springframework.cloud</groupId>
                    <artifactId>spring-cloud-stream-binder-kinesis</artifactId>
                    <version>2.0.3.RELEASE</version>
                </dependency>
            </dependencies>
        </maven>
    </kinesis>
</binders>

Note that we need to specify the Kinesis binder version explicitly here, while the Kafka and RabbitMQ binders do not need one. This is because their versions come from dependency management, while the Kinesis binder is not available through such a mechanism. If a BOM that defines the version is available, it can be used instead; just ensure that it is declared in the proper BOM section of the Maven plugin.

If the binder for which you are generating the applications relies on a different version of Spring Cloud Stream, make sure it is updated in the maven properties.

Now, we can build: ./mvnw clean install -DskipTests.

If we go to the applications folder and look at the generated applications, we should see the new binder variants there. For instance, if we follow the configuration above for adding the Kinesis binder, then we should see the Kinesis binder based app in the generated apps. Let’s take time-source as an example.

cd applications/source/time-source/apps

Here, we should see three different binder based app projects: time-source-kafka, time-source-rabbit and time-source-kinesis. The same should happen for all the out of the box application projects.

Keep in mind that these generated applications still need to be built individually. To do so, go to the generated application's folder and run a Maven build there.

Applications

5. Sources

5.1. CDC Source

Change Data Capture (CDC) source that captures and streams change events from various databases. Currently, it supports MySQL, PostgreSQL, MongoDB, Oracle and SQL Server databases.

Built upon the Debezium Embedded Connector, the CDC Source allows capturing and streaming database changes over different message binders such as Apache Kafka, RabbitMQ and all Spring Cloud Stream supported brokers.

It supports all Debezium configuration properties. Just add the cdc.config. prefix to the existing Debezium properties. For example, to set Debezium’s connector.class property, use the cdc.config.connector.class source property instead.

We provide convenient shortcuts for the most frequently used Debezium properties. For example instead of the long cdc.config.connector.class=io.debezium.connector.mysql.MySqlConnector Debezium property you can use our cdc.connector=mysql shortcut. The table below lists all available shortcuts along with the Debezium properties they represent. The Debezium properties (e.g. cdc.config.XXX) always have precedence over the shortcuts!
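The prefix handling and the precedence rule can be sketched in plain Java as follows. This is an illustration under stated assumptions rather than the CDC Source implementation: the class and method names are hypothetical, and only the mysql and postgres shortcut values are included for brevity.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration; not the actual CDC Source code.
final class CdcPropertiesSketch {

    private static final String PREFIX = "cdc.config.";

    // Two of the cdc.connector shortcut values and the connector classes they stand for.
    private static final Map<String, String> CONNECTORS = Map.of(
            "mysql", "io.debezium.connector.mysql.MySqlConnector",
            "postgres", "io.debezium.connector.postgresql.PostgresConnector");

    static Map<String, String> toDebeziumConfig(Map<String, String> appProperties) {
        Map<String, String> debezium = new LinkedHashMap<>();

        // 1. Apply the shortcut first, so an explicit cdc.config.* entry can override it.
        String shortcut = appProperties.get("cdc.connector");
        if (shortcut != null && CONNECTORS.containsKey(shortcut)) {
            debezium.put("connector.class", CONNECTORS.get(shortcut));
        }

        // 2. Copy the native properties with the prefix removed; they overwrite shortcut values.
        for (Map.Entry<String, String> entry : appProperties.entrySet()) {
            if (entry.getKey().startsWith(PREFIX)) {
                debezium.put(entry.getKey().substring(PREFIX.length()), entry.getValue());
            }
        }
        return debezium;
    }

    public static void main(String[] args) {
        Map<String, String> props = new LinkedHashMap<>();
        props.put("cdc.connector", "mysql");
        props.put("cdc.config.database.port", "3306");
        // prints {connector.class=io.debezium.connector.mysql.MySqlConnector, database.port=3306}
        System.out.println(toDebeziumConfig(props));
    }
}
```

If both cdc.connector and cdc.config.connector.class are set, the second step replaces the value written by the first, which mirrors the precedence described above.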

The CDC Source introduces a new default BackingOffsetStore configuration, based on the MetadataStore service. The latter provides various microservice-friendly ways of storing the offset metadata.

5.1.1. Options

Properties grouped by prefix:

cdc
config

Spring pass-through wrapper for Debezium configuration properties. All properties with a 'cdc.config.' prefix are native Debezium properties. The prefix is removed, converting them into Debezium io.debezium.config.Configuration. (Map<String, String>, default: <none>)

connector

Shortcut for the cdc.config.connector.class property. Either of those can be used as long as they do not contradict each other. (ConnectorType, default: <none>, possible values: mysql,postgres,mongodb,oracle,sqlserver)

name

Unique name for this sourceConnector instance. (String, default: <none>)

schema

Include the schema as part of the outbound message. (Boolean, default: false)

cdc.flattening
add-fields

Comma separated list of metadata fields to add to the flattened message. The fields will be prefixed with "__" or "__[struct]__", depending on the specification of the struct. (String, default: <none>)

add-headers

Comma separated list of metadata fields to add to the header of the flattened message. The fields will be prefixed with "__" or "__[struct]__". (String, default: <none>)

delete-handling-mode

Options for handling deleted records: (1) none - pass the records through, (2) drop - remove the records and (3) rewrite - add a '__deleted' field to the records. (DeleteHandlingMode, default: <none>, possible values: drop,rewrite,none)

drop-tombstones

By default, Debezium generates tombstone records to enable Kafka compaction on deleted records. The drop-tombstones option suppresses the tombstone records. (Boolean, default: true)

enabled

Enable flattening the source record events (https://debezium.io/docs/configuration/event-flattening). (Boolean, default: true)

cdc.offset
commit-timeout

Maximum number of milliseconds to wait for records to flush and partition offset data to be committed to offset storage before cancelling the process and restoring the offset data to be committed in a future attempt. (Duration, default: 5000ms)

flush-interval

Interval at which to try committing offsets. The default is 1 minute. (Duration, default: 60000ms)

policy

Offset storage commit policy. (OffsetPolicy, default: <none>)

storage

The Kafka connector tracks the number of processed records and regularly stores the count (as "offsets") in a preconfigured metadata storage. On restart, the connector resumes reading from the last recorded source offset. (OffsetStorageType, default: <none>, possible values: memory,file,kafka,metadata)

cdc.stream.header
convert-connect-headers

When true, the org.apache.kafka.connect.header.Header entries are converted into message headers, with Header#key() as the name and Header#value() as the value. (Boolean, default: true)

offset

Serializes the source record's offset metadata into the outbound message header under cdc.offset. (Boolean, default: false)

metadata.store.dynamo-db
create-delay

Delay between create table retries. (Integer, default: 1)

create-retries

Retry number for create table request. (Integer, default: 25)

read-capacity

Read capacity on the table. (Long, default: 1)

table

Table name for metadata. (String, default: <none>)

time-to-live

TTL for table entries. (Integer, default: <none>)

write-capacity

Write capacity on the table. (Long, default: 1)

metadata.store.gemfire
region

Gemfire region name for metadata. (String, default: <none>)

metadata.store.jdbc
region

Unique grouping identifier for messages persisted with this store. (String, default: DEFAULT)

table-prefix

Prefix for the custom table name. (String, default: <none>)

metadata.store.mongo-db
collection

MongoDB collection name for metadata. (String, default: metadataStore)

metadata.store.redis
key

Redis key for metadata. (String, default: <none>)

metadata.store
type

Indicates the type of metadata store to configure (default is 'memory'). You must include the corresponding Spring Integration dependency to use a persistent store. (StoreType, default: <none>, possible values: mongodb,gemfire,redis,dynamodb,jdbc,zookeeper,hazelcast,memory)

metadata.store.zookeeper
connect-string

Zookeeper connect string in form HOST:PORT. (String, default: 127.0.0.1:2181)

encoding

Encoding to use when storing data in Zookeeper. (Charset, default: UTF-8)

retry-interval

Retry interval for Zookeeper operations in milliseconds. (Integer, default: 1000)

root

Root node - store entries are children of this node. (String, default: /SpringIntegration-MetadataStore)

Debezium property Shortcut mapping

The table below lists all available shortcuts along with the Debezium properties they represent.

Table 1. Shortcut Properties Mapping

Shortcut | Original | Description
cdc.connector | cdc.config.connector.class | mysql : MySqlConnector, postgres : PostgresConnector, mongodb : MongodbSourceConnector, oracle : OracleConnector, sqlserver : SqlServerConnector
cdc.name | cdc.config.name |
cdc.offset.flush-interval | cdc.config.offset.flush.interval.ms |
cdc.offset.commit-timeout | cdc.config.offset.flush.timeout.ms |
cdc.offset.policy | cdc.config.offset.commit.policy | periodic : PeriodicCommitOffsetPolicy, always : AlwaysCommitOffsetPolicy
cdc.offset.storage | cdc.config.offset.storage | metadata : MetadataStoreOffsetBackingStore, file : FileOffsetBackingStore, kafka : KafkaOffsetBackingStore, memory : MemoryOffsetBackingStore
cdc.flattening.drop-tombstones | cdc.config.drop.tombstones |
cdc.flattening.delete-handling-mode | cdc.config.delete.handling.mode | none : none, drop : drop, rewrite : rewrite

5.1.2. Database Support

The CDC Source uses the Debezium utilities, and currently supports CDC for five datastores: MySQL, PostgreSQL, MongoDB, Oracle and SQL Server databases.

5.1.3. Examples and Testing

The CdcSourceIntegrationTest, CdcDeleteHandlingIntegrationTest and CdcFlatteningIntegrationTest integration tests use test database fixtures running on the local machine. We use pre-built Debezium Docker database images. The Maven builds create the test database fixtures with the help of the docker-maven-plugin.

To run and debug the tests from your IDE, you need to deploy the required database images from the command line. The instructions below explain how to run the pre-configured test databases from Docker images.

MySQL

Start the debezium/example-mysql image in a Docker container:

docker run -it --rm --name mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=debezium -e MYSQL_USER=mysqluser -e MYSQL_PASSWORD=mysqlpw debezium/example-mysql:1.0

(Optional) Use the mysql client to connect to the database and create a debezium user with the required credentials:

docker run -it --rm --name mysqlterm --link mysql --rm mysql:5.7 sh -c 'exec mysql -h"$MYSQL_PORT_3306_TCP_ADDR" -P"$MYSQL_PORT_3306_TCP_PORT" -uroot -p"$MYSQL_ENV_MYSQL_ROOT_PASSWORD"'
mysql> GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'debezium' IDENTIFIED BY 'dbz';

Use the following properties to connect the CDC Source to the MySQL DB:

cdc.connector=mysql (1)

cdc.name=my-sql-connector (2)
cdc.config.database.server.id=85744 (2)
cdc.config.database.server.name=my-app-connector (2)

cdc.config.database.user=debezium  (3)
cdc.config.database.password=dbz (3)
cdc.config.database.hostname=localhost (3)
cdc.config.database.port=3306 (3)

cdc.schema=true (4)
cdc.flattening.enabled=true (5)
1 Configures the CDC Source to use MySqlConnector (equivalent to setting cdc.config.connector.class=io.debezium.connector.mysql.MySqlConnector).
2 Metadata used to identify and dispatch the incoming events.
3 Connection to the MySQL server running on localhost:3306 as debezium user.
4 Includes the Change Event Value schema in the SourceRecord events.
5 Enables the CDC Event Flattening.

You can also run the CdcSourceIntegrationTests#CdcMysqlTests using this MySQL configuration.

PostgreSQL

Start a pre-configured postgres server from the debezium/example-postgres:1.0 Docker image:

docker run -it --rm --name postgres -p 5432:5432 -e POSTGRES_USER=postgres -e POSTGRES_PASSWORD=postgres debezium/example-postgres:1.0

You can connect to this server like this:

psql -U postgres -h localhost -p 5432

Use the following properties to connect the CDC Source to PostgreSQL:

cdc.connector=postgres (1)
cdc.offset.storage=memory (2)

cdc.name=my-sql-connector (3)
cdc.config.database.server.id=85744 (3)
cdc.config.database.server.name=my-app-connector (3)

cdc.config.database.user=postgres  (4)
cdc.config.database.password=postgres (4)
cdc.config.database.dbname=postgres (4)
cdc.config.database.hostname=localhost (4)
cdc.config.database.port=5432 (4)

cdc.schema=true (5)
cdc.flattening.enabled=true (6)
1 Configures the CDC Source to use PostgresConnector. Equivalent to setting cdc.config.connector.class=io.debezium.connector.postgresql.PostgresConnector.
2 Configures the Debezium engine to use the memory backing offset store (that is, cdc.config.offset.storage=org.apache.kafka.connect.storage.MemoryOffsetBackingStore).
3 Metadata used to identify and dispatch the incoming events.
4 Connection to the PostgreSQL server running on localhost:5432 as postgres user.
5 Includes the Change Event Value schema in the SourceRecord events.
6 Enables the CDC Event Flattening.

You can also run the CdcSourceIntegrationTests#CdcPostgresTests using this PostgreSQL configuration.

MongoDB

Start a pre-configured mongodb from the debezium/example-mongodb:0.10 Docker image:

docker run -it --rm --name mongodb -p 27017:27017 -e MONGODB_USER=debezium -e MONGODB_PASSWORD=dbz  debezium/example-mongodb:0.10

Initialize the inventory collections:

docker exec -it mongodb sh -c 'bash -c /usr/local/bin/init-inventory.sh'

In the mongodb terminal output, search for a log entry like host: "3f95a8a6516e:27017" :

2019-01-10T13:46:10.004+0000 I COMMAND  [conn1] command local.oplog.rs appName: "MongoDB Shell" command: replSetInitiate { replSetInitiate: { _id: "rs0", members: [ { _id: 0.0, host: "3f95a8a6516e:27017" } ] }, lsid: { id: UUID("5f477a16-d80d-41f2-9ab4-4ebecea46773") }, $db: "admin" } numYields:0 reslen:22 locks:{ Global: { acquireCount: { r: 36, w: 20, W: 2 }, acquireWaitCount: { W: 1 }, timeAcquiringMicros: { W: 312 } }, Database: { acquireCount: { r: 6, w: 4, W: 16 } }, Collection: { acquireCount: { r: 4, w: 2 } }, oplog: { acquireCount: { r: 2, w: 3 } } } protocol:op_msg 988ms

Add a 127.0.0.1 3f95a8a6516e entry to your /etc/hosts file, using the host name that appears in your own log output.

Use the following properties to connect the CDC Source to MongoDB:

cdc.connector=mongodb (1)
cdc.offset.storage=memory (2)

cdc.config.mongodb.hosts=rs0/localhost:27017 (3)
cdc.config.mongodb.name=dbserver1 (3)
cdc.config.mongodb.user=debezium (3)
cdc.config.mongodb.password=dbz (3)
cdc.config.database.whitelist=inventory (3)

cdc.config.tasks.max=1 (4)

cdc.schema=true (5)
cdc.flattening.enabled=true (6)
1 Configures the CDC Source to use the MongoDB Connector. This maps into cdc.config.connector.class=io.debezium.connector.mongodb.MongodbSourceConnector.
2 Configures the Debezium engine to use the memory backing offset store (that is, cdc.config.offset.storage=org.apache.kafka.connect.storage.MemoryOffsetBackingStore).
3 Connection to the MongoDB running on localhost:27017 as debezium user.
4 Maximum number of connector tasks. See debezium.io/docs/connectors/mongodb/#tasks.
5 Includes the Change Event Value schema in the SourceRecord events.
6 Enables the CDC Event Flattening.

You can also run the corresponding MongoDB tests in CdcSourceIntegrationTests using this MongoDB configuration.

SQL Server

Start a SQL Server instance from the microsoft/mssql-server-linux:2017-CU9-GDR2 Docker image:

docker run -it --rm --name sqlserver -p 1433:1433 -e ACCEPT_EULA=Y -e MSSQL_PID=Standard -e SA_PASSWORD=Password! -e MSSQL_AGENT_ENABLED=true microsoft/mssql-server-linux:2017-CU9-GDR2

Populate it with sample data from Debezium’s SQL Server tutorial:

wget https://raw.githubusercontent.com/debezium/debezium-examples/master/tutorial/debezium-sqlserver-init/inventory.sql
cat ./inventory.sql | docker exec -i sqlserver bash -c '/opt/mssql-tools/bin/sqlcmd -U sa -P $SA_PASSWORD'

Use the following properties to connect the CDC Source to SQL Server:

cdc.connector=sqlserver (1)
cdc.offset.storage=memory (2)

cdc.name=my-sql-connector (3)
cdc.config.database.server.id=85744 (3)
cdc.config.database.server.name=my-app-connector (3)

cdc.config.database.user=sa  (4)
cdc.config.database.password=Password! (4)
cdc.config.database.dbname=testDB (4)
cdc.config.database.hostname=localhost (4)
cdc.config.database.port=1433 (4)
1 Configures the CDC Source to use SqlServerConnector. Equivalent to setting cdc.config.connector.class=io.debezium.connector.sqlserver.SqlServerConnector.
2 Configures the Debezium engine to use the memory backing offset store (that is, cdc.config.offset.storage=org.apache.kafka.connect.storage.MemoryOffsetBackingStore).
3 Metadata used to identify and dispatch the incoming events.
4 Connection to the SQL Server running on localhost:1433 as sa user.

You can also run the CdcSourceIntegrationTests#CdcSqlServerTests using this SQL Server configuration.

Oracle

Start an Oracle instance that is reachable from localhost and set up with the configuration, users and grants described in the Debezium Vagrant set-up.

Populate it with sample data from Debezium’s Oracle tutorial:

wget https://raw.githubusercontent.com/debezium/debezium-examples/master/tutorial/debezium-with-oracle-jdbc/init/inventory.sql
cat ./inventory.sql | docker exec -i dbz_oracle sqlplus debezium/dbz@//localhost:1521/ORCLPDB1

5.2. File Source

This application polls a directory and sends new files or their contents to the output channel. The file source provides the contents of a File as a byte array by default. However, this can be customized using the --file.consumer.mode option:

  • ref Provides a java.io.File reference

  • lines Will split files line-by-line and emit a new message for each line

  • contents The default. Provides the contents of a file as a byte array

When using --file.consumer.mode=lines, you can also provide the additional option --file.consumer.with-markers=true. If set to true, the underlying FileSplitter emits additional start-of-file and end-of-file marker messages before and after the actual data. The payload of these two additional marker messages is of type FileSplitter.FileMarker. The with-markers option defaults to false if not explicitly set.
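To make the effect of the marker option concrete, the following plain Java sketch lists the payloads that a file would produce in lines mode. It is an approximation under stated assumptions: the class name is hypothetical, and the "START" and "END" strings stand in for the FileSplitter.FileMarker payloads that the real application emits.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration; the real application delegates to Spring Integration's FileSplitter.
final class FileModesSketch {

    // Returns one payload per line, optionally wrapped in start-of-file and end-of-file markers.
    static List<String> linesMode(String fileContents, boolean withMarkers) {
        List<String> payloads = new ArrayList<>();
        if (withMarkers) {
            payloads.add("START"); // stand-in for the start-of-file marker message
        }
        if (!fileContents.isEmpty()) {
            // \R matches any line break sequence (\n, \r\n, ...).
            for (String line : fileContents.split("\\R")) {
                payloads.add(line);
            }
        }
        if (withMarkers) {
            payloads.add("END"); // stand-in for the end-of-file marker message
        }
        return payloads;
    }

    public static void main(String[] args) {
        System.out.println(linesMode("a\nb", true));  // prints [START, a, b, END]
        System.out.println(linesMode("a\nb", false)); // prints [a, b]
    }
}
```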

5.2.1. Options

The file source has the following options:

Properties grouped by prefix:

file.consumer
markers-json

When 'fileMarkers == true', specify if they should be produced as FileSplitter.FileMarker objects or JSON. (Boolean, default: true)

mode

The FileReadingMode to use for file reading sources. Values are 'ref' - The File object, 'lines' - a message per line, or 'contents' - the contents as bytes. (FileReadingMode, default: <none>, possible values: ref,lines,contents)

with-markers

Set to true to emit start of file/end of file marker messages before/after the data. Only valid with FileReadingMode 'lines'. (Boolean, default: <none>)

file.supplier
delay-when-empty

Duration of delay when no new files are detected. (Duration, default: 1s)

directory

The directory to poll for new files. (File, default: <none>)

filename-pattern

A simple ant pattern to match files. (String, default: <none>)

filename-regex

A regex pattern to match files. (Pattern, default: <none>)

prevent-duplicates

Set to true to include an AcceptOnceFileListFilter which prevents duplicates. (Boolean, default: true)

5.3. FTP Source

This source application supports transfer of files using the FTP protocol. Files are transferred from the remote directory to the local directory where the app is deployed. Messages emitted by the source are provided as a byte array by default. However, this can be customized using the --mode option:

  • ref Provides a java.io.File reference

  • lines Will split files line-by-line and emit a new message for each line

  • contents The default. Provides the contents of a file as a byte array

When using --mode=lines, you can also provide the additional option --withMarkers=true. If set to true, the underlying FileSplitter will emit additional start-of-file and end-of-file marker messages before and after the actual data. The payload of these 2 additional marker messages is of type FileSplitter.FileMarker. The option withMarkers defaults to false if not explicitly set.

See also MetadataStore options for possible shared persistent store configuration used to prevent duplicate messages on restart.

5.3.1. Input

N/A (Fetches files from an FTP server).

5.3.2. Output

mode = contents
Headers:
  • Content-Type: application/octet-stream

  • file_originalFile: <java.io.File>

  • file_name: <file name>

Payload:

A byte[] filled with the file contents.

mode = lines
Headers:
  • Content-Type: text/plain

  • file_originalFile: <java.io.File>

  • file_name: <file name>

  • correlationId: <UUID> (same for each line)

  • sequenceNumber: <n>

  • sequenceSize: 0 (number of lines is not known until the file is read)

Payload:

A String for each line.

The first line is optionally preceded by a message with a START marker payload. The last line is optionally followed by a message with an END marker payload.

Marker presence and format are determined by the with-markers and markers-json properties.

mode = ref
Headers:

None.

Payload:

A java.io.File object.

5.3.3. Options

The ftp source has the following options:

Properties grouped by prefix:

file.consumer
markers-json

When 'fileMarkers == true', specify if they should be produced as FileSplitter.FileMarker objects or JSON. (Boolean, default: true)

mode

The FileReadingMode to use for file reading sources. Values are 'ref' - The File object, 'lines' - a message per line, or 'contents' - the contents as bytes. (FileReadingMode, default: <none>, possible values: ref,lines,contents)

with-markers

Set to true to emit start of file/end of file marker messages before/after the data. Only valid with FileReadingMode 'lines'. (Boolean, default: <none>)

ftp.factory
cache-sessions

Cache sessions. (Boolean, default: <none>)

client-mode

The client mode to use for the FTP session. (ClientMode, default: <none>, possible values: ACTIVE,PASSIVE)

host

The host name of the server. (String, default: localhost)

password

The password to use to connect to the server. (String, default: <none>)

port

The port of the server. (Integer, default: 21)

username

The username to use to connect to the server. (String, default: <none>)

ftp.supplier
auto-create-local-dir

Set to true to create the local directory if it does not exist. (Boolean, default: true)

delay-when-empty

Duration of delay when no new files are detected. (Duration, default: 1s)

delete-remote-files

Set to true to delete remote files after successful transfer. (Boolean, default: false)

filename-pattern

A filter pattern to match the names of files to transfer. (String, default: <none>)

filename-regex

A filter regex pattern to match the names of files to transfer. (Pattern, default: <none>)

local-dir

The local directory to use for file transfers. (File, default: <none>)

preserve-timestamp

Set to true to preserve the original timestamp. (Boolean, default: true)

remote-dir

The remote FTP directory. (String, default: /)

remote-file-separator

The remote file separator. (String, default: /)

tmp-file-suffix

The suffix to use while the transfer is in progress. (String, default: .tmp)

metadata.store.dynamo-db
create-delay

Delay between create table retries. (Integer, default: 1)

create-retries

Retry number for create table request. (Integer, default: 25)

read-capacity

Read capacity on the table. (Long, default: 1)

table

Table name for metadata. (String, default: <none>)

time-to-live

TTL for table entries. (Integer, default: <none>)

write-capacity

Write capacity on the table. (Long, default: 1)

metadata.store.gemfire
region

Gemfire region name for metadata. (String, default: <none>)

metadata.store.jdbc
region

Unique grouping identifier for messages persisted with this store. (String, default: DEFAULT)

table-prefix

Prefix for the custom table name. (String, default: <none>)

metadata.store.mongo-db
collection

MongoDB collection name for metadata. (String, default: metadataStore)

metadata.store.redis
key

Redis key for metadata. (String, default: <none>)

metadata.store
type

Indicates the type of metadata store to configure (default is 'memory'). You must include the corresponding Spring Integration dependency to use a persistent store. (StoreType, default: <none>, possible values: mongodb,gemfire,redis,dynamodb,jdbc,zookeeper,hazelcast,memory)

metadata.store.zookeeper
connect-string

Zookeeper connect string in form HOST:PORT. (String, default: 127.0.0.1:2181)

encoding

Encoding to use when storing data in Zookeeper. (Charset, default: UTF-8)

retry-interval

Retry interval for Zookeeper operations in milliseconds. (Integer, default: 1000)

root

Root node - store entries are children of this node. (String, default: /SpringIntegration-MetadataStore)

5.3.4. Examples

java -jar ftp_source.jar --ftp.supplier.remote-dir=foo --file.consumer.mode=lines --ftp.factory.host=ftpserver \
         --ftp.factory.username=user --ftp.factory.password=pw --ftp.supplier.local-dir=/foo
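
As a variation, the following sketch shows how the marker options described under Output might be combined with lines mode. The property names come from the options listed above; the jar file name, host, credentials, directories, and file pattern are placeholders, so verify the behavior against your own deployment.

```shell
# Hypothetical invocation: jar name, host, credentials, paths, and pattern are placeholders.
java -jar ftp_source.jar \
  --ftp.factory.host=ftpserver \
  --ftp.factory.username=user \
  --ftp.factory.password=pw \
  --ftp.supplier.remote-dir=foo \
  --ftp.supplier.local-dir=/foo \
  --ftp.supplier.filename-pattern='*.csv' \
  --file.consumer.mode=lines \
  --file.consumer.with-markers=true \
  --file.consumer.markers-json=false
```

With with-markers enabled, each file should produce a START marker message, one message per line, and an END marker message. Setting markers-json to false is intended to select FileSplitter.FileMarker payloads instead of JSON, per the option description.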

5.4. Geode Source

The Geode source will emit a stream of Objects extracted from Apache Geode EntryEvents or CqEvents.

5.4.1. Options

The geode source has the following options:

Properties grouped by prefix:

geode.client
pdx-read-serialized

Deserialize the Geode objects into PdxInstance instead of the domain class. (Boolean, default: false)

geode.pool
connect-type

Specifies connection type: 'server' or 'locator'. (ConnectType, default: <none>, possible values: locator,server)

host-addresses

Specifies one or more Gemfire locator or server addresses formatted as [host]:[port]. (InetSocketAddress[], default: <none>)

subscription-enabled

Set to true to enable subscriptions for the client pool. Required to sync updates to the client cache. (Boolean, default: false)

geode.region
region-name

The region name. (String, default: <none>)

geode.security
password

The cache password. (String, default: <none>)

username

The cache username. (String, default: <none>)

geode.security.ssl
ciphers

Configures the SSL ciphers used for secure Socket connections as an array of valid cipher names. (String, default: any)

keystore-type

Identifies the type of Keystore used for SSL communications (e.g. JKS, PKCS11, etc.). (String, default: JKS)

keystore-uri

Location of the pre-created Keystore URI to be used for connecting to the Geode cluster. (Resource, default: <none>)

ssl-keystore-password

Password for accessing the keystore. (String, default: <none>)

ssl-truststore-password

Password for accessing the trust store. (String, default: <none>)

truststore-type

Identifies the type of truststore used for SSL communications (e.g. JKS, PKCS11, etc.). (String, default: JKS)

truststore-uri

Location of the pre-created truststore URI to be used for connecting to the Geode cluster. (Resource, default: <none>)

user-home-directory

Local directory to cache the truststore and keystore files downloaded from the truststoreUri and keystoreUri locations. (String, default: user.home)

geode.supplier
event-expression

SpEL expression to extract data from an org.apache.geode.cache.EntryEvent or org.apache.geode.cache.query.CqEvent. (Expression, default: <none>)

query

An OQL query. This will enable continuous query if provided. (String, default: <none>)

5.5. Http Source

A source application that listens for HTTP requests and emits the body as a message payload. If the Content-Type matches text/* or application/json, the payload will be a String, otherwise the payload will be a byte array.

Payload:

If content type matches text/* or application/json

  • String

If content type does not match text/* or application/json

  • byte array
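
To try the payload rule above, you can post to the source with any HTTP client. The sketch below uses curl and assumes the source runs locally with the default path-pattern (/) and server port (8080) listed in the options; the request bodies are made up for illustration.

```shell
# Assumes an http source listening on localhost:8080 with path-pattern "/".

# text/* content type: the emitted payload should be a String.
curl -X POST -H "Content-Type: text/plain" -d "hello world" http://localhost:8080/

# application/json content type: the emitted payload should also be a String.
curl -X POST -H "Content-Type: application/json" -d '{"greeting":"hello"}' http://localhost:8080/

# Any other content type: the emitted payload should be a byte array.
curl -X POST -H "Content-Type: application/octet-stream" --data-binary "raw bytes" http://localhost:8080/
```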

5.5.2. Options

The http source supports the following configuration properties:

Properties grouped by prefix:

http.cors
allow-credentials

Whether the browser should include any cookies associated with the domain of the request being annotated. (Boolean, default: <none>)

allowed-headers

List of request headers that can be used during the actual request. (String[], default: <none>)

allowed-origins

List of allowed origins, e.g. https://domain1.com. (String[], default: <none>)

http
mapped-request-headers

Headers that will be mapped. (String[], default: <none>)

path-pattern

HTTP endpoint path mapping. (String, default: /)

server
port

Server HTTP port. (Integer, default: 8080)

5.6. JDBC Source

This source polls data from an RDBMS. This source is fully based on the DataSourceAutoConfiguration, so refer to the Spring Boot JDBC Support for more information.

Payload
  • Map<String, Object> when jdbc.supplier.split == true (default) and List<Map<String, Object>> otherwise

5.6.2. Options

The jdbc source has the following options:

Properties grouped by prefix:

jdbc.supplier
max-rows

Maximum number of rows to process per query. (Integer, default: 0)

query

The query to use to select data. (String, default: <none>)

split

Whether to split the SQL result as individual messages. (Boolean, default: true)

update

An SQL update statement to execute for marking polled messages as 'seen'. (String, default: <none>)

spring.cloud.stream.poller
cron

Cron expression value for the Cron Trigger. (String, default: <none>)

fixed-delay

Fixed delay for default poller. (Long, default: 1000)

initial-delay

Initial delay for periodic triggers. (Integer, default: 0)

max-messages-per-poll

Maximum messages per poll for the default poller. (Long, default: 1)

time-unit

The TimeUnit to apply to delay values. (TimeUnit, default: <none>, possible values: NANOSECONDS,MICROSECONDS,MILLISECONDS,SECONDS,MINUTES,HOURS,DAYS)

spring.datasource
data

Data (DML) script resource references. (List<String>, default: <none>)

driver-class-name

Fully qualified name of the JDBC driver. Auto-detected based on the URL by default. (String, default: <none>)

initialization-mode

Mode to apply when determining if DataSource initialization should be performed using the available DDL and DML scripts. (DataSourceInitializationMode, default: embedded, possible values: ALWAYS,EMBEDDED,NEVER)

password

Login password of the database. (String, default: <none>)

schema

Schema (DDL) script resource references. (List<String>, default: <none>)

url

JDBC URL of the database. (String, default: <none>)

username

Login username of the database. (String, default: <none>)

Also see the Spring Boot Documentation for additional DataSource properties, and TriggerProperties and MaxMessagesProperties for polling options.
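
As a sketch of how the query, update, and poller options fit together, the following invocation polls for unseen rows and marks them afterwards. The jar file name, database URL, credentials, and the orders table with its id, total, and seen columns are hypothetical, and a matching JDBC driver is assumed to be on the classpath. The :id placeholder assumes the named-parameter convention of the underlying Spring Integration JDBC polling adapter, where a parameter refers to a column of the selected rows.

```shell
# Hypothetical invocation: jar name, database, table, and columns are placeholders.
java -jar jdbc-source.jar \
  --spring.datasource.url=jdbc:postgresql://localhost:5432/shop \
  --spring.datasource.username=user \
  --spring.datasource.password=pw \
  --jdbc.supplier.query="SELECT id, total FROM orders WHERE seen = false" \
  --jdbc.supplier.update="UPDATE orders SET seen = true WHERE id IN (:id)" \
  --jdbc.supplier.split=true \
  --spring.cloud.stream.poller.fixed-delay=5000
```

With split set to true, each selected row is emitted as its own Map<String, Object> message. The fixed-delay value is interpreted according to the poller's time-unit setting.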

5.7. JMS Source

The JMS source enables receiving messages from JMS.

5.7.1. Options

The JMS source has the following options:

Properties grouped by prefix:

jms.supplier
client-id

Client id for durable subscriptions. (String, default: <none>)

destination

The destination from which to receive messages (queue or topic). (String, default: <none>)

message-selector

A selector for messages. (String, default: <none>)

session-transacted

True to enable transactions and select a DefaultMessageListenerContainer, false to select a SimpleMessageListenerContainer. (Boolean, default: true)

subscription-durable

True for a durable subscription. (Boolean, default: <none>)

subscription-name

The name of a durable or shared subscription. (String, default: <none>)

subscription-shared

True for a shared subscription. (Boolean, default: <none>)

spring.jms
jndi-name

Connection factory JNDI name. When set, takes precedence over other connection factory auto-configurations. (String, default: <none>)

pub-sub-domain

Whether the default destination type is topic. (Boolean, default: false)

spring.jms.listener
acknowledge-mode

Acknowledge mode of the container. By default, the listener is transacted with automatic acknowledgment. (AcknowledgeMode, default: <none>, possible values: AUTO,CLIENT,DUPS_OK)

auto-startup

Start the container automatically on startup. (Boolean, default: true)

concurrency

Minimum number of concurrent consumers. (Integer, default: <none>)

max-concurrency

Maximum number of concurrent consumers. (Integer, default: <none>)

receive-timeout

Timeout to use for receive calls. Use -1 for a no-wait receive or 0 for no timeout at all. The latter is only feasible if not running within a transaction manager and is generally discouraged since it prevents clean shutdown. (Duration, default: 1s)

5.8. Load Generator Source

A source that sends generated data and dispatches it to the stream.

5.8.1. Options

The load-generator source has the following options:

load-generator.generate-timestamp

Whether a timestamp is generated. (Boolean, default: false)

load-generator.message-count

Message count. (Integer, default: 1000)

load-generator.message-size

Message size. (Integer, default: 1000)

load-generator.producers

Number of producers. (Integer, default: 1)

5.9. Mail Source

A source application that listens for emails and emits the message body as a message payload.

5.9.1. Options

The mail source has the following options:

mail.supplier.charset

The charset for byte[] mail-to-string transformation. (String, default: UTF-8)

mail.supplier.delete

Set to true to delete email after download. (Boolean, default: false)

mail.supplier.expression

Configure a SpEL expression to select messages. (String, default: true)

mail.supplier.idle-imap

Set to true to use IdleImap Configuration. (Boolean, default: false)

mail.supplier.java-mail-properties

JavaMail properties as a newline-delimited string of name-value pairs, e.g. 'foo=bar\n baz=car'. (Properties, default: <none>)

mail.supplier.mark-as-read

Set to true to mark email as read. (Boolean, default: false)

mail.supplier.url

Mail connection URL for connection to Mail server e.g. 'imaps://<username>:<password>@<host>:993/Inbox'. (URLName, default: <none>)

mail.supplier.user-flag

The flag to mark messages when the server does not support \Recent. (String, default: <none>)

5.10. MongoDB Source

This source polls data from MongoDB. This source is fully based on the MongoDataAutoConfiguration, so refer to the Spring Boot MongoDB Support for more information.

5.10.1. Options

The mongodb source has the following options:

mongodb.supplier.collection

The MongoDB collection to query. (String, default: <none>)

mongodb.supplier.query

The MongoDB query. (String, default: { })

mongodb.supplier.query-expression

The SpEL expression in MongoDB query DSL style. (Expression, default: <none>)

mongodb.supplier.split

Whether to split the query result as individual messages. (Boolean, default: true)

Also see the Spring Boot Documentation for additional MongoProperties properties. See TriggerProperties for polling options.

5.11. MQTT Source

Source that enables receiving messages from MQTT.

Payload:
  • String if binary setting is false (default)

  • byte[] if binary setting is true

5.11.2. Options

The mqtt source has the following options:

Properties grouped by prefix:

mqtt
clean-session

Whether the client and server should remember state across restarts and reconnects. (Boolean, default: true)

connection-timeout

The connection timeout in seconds. (Integer, default: 30)

keep-alive-interval

The ping interval in seconds. (Integer, default: 60)

password

The password to use when connecting to the broker. (String, default: guest)

persistence

'memory' or 'file'. (String, default: memory)

persistence-directory

Persistence directory. (String, default: /tmp/paho)

url

Location of the MQTT broker(s) (comma-delimited list). (String[], default: [tcp://localhost:1883])

username

The username to use when connecting to the broker. (String, default: guest)

mqtt.supplier
binary

True to leave the payload as bytes. (Boolean, default: false)

charset

The charset used to convert bytes to String (when binary is false). (String, default: UTF-8)

client-id

Identifies the client. (String, default: stream.client.id.source)

qos

The QoS; a single value for all topics or a comma-delimited list to match the topics. (Integer[], default: [0])

topics

The topic(s) (comma-delimited) to which the source will subscribe. (String[], default: [stream.mqtt])
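
The qos option accepts either one value for all topics or a list aligned with the topics list. A minimal sketch, assuming a broker at the default URL; the jar file name and topic names are hypothetical:

```shell
# Hypothetical invocation: jar name and topic names are placeholders.
# The first qos value is intended for the first topic, the second for the second.
java -jar mqtt-source.jar \
  --mqtt.url=tcp://localhost:1883 \
  --mqtt.supplier.topics=sensors/temperature,sensors/humidity \
  --mqtt.supplier.qos=1,0 \
  --mqtt.supplier.binary=false
```

Passing a single value, such as --mqtt.supplier.qos=1, would apply that QoS to every topic, per the option description.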

5.12. RabbitMQ Source

The "rabbit" source enables receiving messages from RabbitMQ.

The queue(s) must exist before the stream is deployed; they are not created automatically. You can easily create a Queue using the RabbitMQ web UI.

5.12.1. Input

N/A

5.12.2. Output

Payload
  • byte[]

5.12.3. Options

The rabbit source has the following options:

Properties grouped by prefix:

rabbit.supplier
enable-retry

true to enable retry. (Boolean, default: false)

initial-retry-interval

Initial retry interval when retry is enabled. (Integer, default: 1000)

mapped-request-headers

Headers that will be mapped. (String[], default: [STANDARD_REQUEST_HEADERS])

max-attempts

The maximum delivery attempts when retry is enabled. (Integer, default: 3)

max-retry-interval

Max retry interval when retry is enabled. (Integer, default: 30000)

own-connection

When true, use a separate connection based on the boot properties. (Boolean, default: false)

queues

The queues to which the source will listen for messages. (String[], default: <none>)

requeue

Whether rejected messages should be requeued. (Boolean, default: true)

retry-multiplier

Retry backoff multiplier when retry is enabled. (Double, default: 2)

transacted

Whether the channel is transacted. (Boolean, default: false)

spring.rabbitmq
addresses

Comma-separated list of addresses to which the client should connect. When set, the host and port are ignored. (String, default: <none>)

connection-timeout

Connection timeout. Set it to zero to wait forever. (Duration, default: <none>)

host

RabbitMQ host. Ignored if an address is set. (String, default: localhost)

password

Login password to authenticate against the broker. (String, default: guest)

port

RabbitMQ port. Ignored if an address is set. Defaults to 5672, or 5671 if SSL is enabled. (Integer, default: <none>)

publisher-confirm-type

Type of publisher confirms to use. (ConfirmType, default: <none>, possible values: SIMPLE,CORRELATED,NONE)

publisher-returns

Whether to enable publisher returns. (Boolean, default: false)

requested-channel-max

Number of channels per connection requested by the client. Use 0 for unlimited. (Integer, default: 2047)

requested-heartbeat

Requested heartbeat timeout; zero for none. If a duration suffix is not specified, seconds will be used. (Duration, default: <none>)

username

Login user to authenticate to the broker. (String, default: guest)

virtual-host

Virtual host to use when connecting to the broker. (String, default: <none>)

Also see the Spring Boot Documentation for additional properties for the broker connections and listener properties.

A Note About Retry
With the default ackMode (AUTO) and requeue (true) options, failed message deliveries will be retried indefinitely. Since there is not much processing in the rabbit source, the risk of failure in the source itself is small, unless the downstream Binder is not connected for some reason. Setting requeue to false will cause messages to be rejected on the first attempt (and possibly sent to a Dead Letter Exchange/Queue if the broker is so configured).

The enableRetry option allows configuration of retry parameters such that a failed message delivery can be retried and eventually discarded (or dead-lettered) when retries are exhausted. The delivery thread is suspended during the retry interval(s). Retry options are enableRetry, maxAttempts, initialRetryInterval, retryMultiplier, and maxRetryInterval.

Message deliveries failing with a MessageConversionException are never retried; the assumption being that if a message could not be converted on the first attempt, subsequent attempts will also fail. Such messages are discarded (or dead-lettered).
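
To make the retry options concrete, the following sketch lists the wait before each redelivery. It assumes exponential backoff, where the interval starts at initialRetryInterval, is multiplied by retryMultiplier after each failure, and is capped at maxRetryInterval. It also assumes that maxAttempts counts the first delivery. The retry_waits function is illustrative only and not part of the application; it uses integer arithmetic, so it only handles whole-number multipliers.

```shell
# Illustrative helper, not part of the rabbit source.
# Prints the wait before each redelivery, space-separated,
# in the same unit as the configured intervals.
# Arguments: max_attempts initial_interval multiplier max_interval
retry_waits() {
  max_attempts=$1
  interval=$2
  multiplier=$3
  max_interval=$4
  attempt=1
  waits=""
  # The first delivery is attempt 1, so there are max_attempts - 1 waits.
  while [ "$attempt" -lt "$max_attempts" ]; do
    # Cap the interval at the configured maximum.
    if [ "$interval" -gt "$max_interval" ]; then
      interval=$max_interval
    fi
    waits="${waits:+$waits }$interval"
    interval=$((interval * multiplier))
    attempt=$((attempt + 1))
  done
  echo "$waits"
}

# Defaults from the options above: max-attempts=3, initial-retry-interval=1000,
# retry-multiplier=2, max-retry-interval=30000.
retry_waits 3 1000 2 30000

# A longer run that reaches the cap.
retry_waits 7 1000 2 30000
```

Under these assumptions, the defaults yield waits of 1000 and 2000 before the message is discarded or dead-lettered, and the seven-attempt run ends with a wait capped at 30000.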

5.12.4. Build

$ ./mvnw clean install -PgenerateApps
$ cd apps

You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:

$ ./mvnw clean package

5.12.5. Examples

java -jar rabbit-source.jar --rabbit.supplier.queues=

5.13. Amazon S3 Source

This source app supports transfer of files using the Amazon S3 protocol. Files are transferred from the remote directory (S3 bucket) to the local directory where the application is deployed.

Messages emitted by the source are provided as a byte array by default. However, this can be customized using the --file.consumer.mode option:

  • ref: Provides a java.io.File reference

  • lines: Splits files line by line and emits a new message for each line

  • contents: The default. Provides the contents of a file as a byte array

When using --file.consumer.mode=lines, you can also set --file.consumer.with-markers=true. The underlying FileSplitter then emits additional start-of-file and end-of-file marker messages before and after the actual data. The payload of these two marker messages is of type FileSplitter.FileMarker. The with-markers option defaults to false if not explicitly set.

See also MetadataStore options for possible shared persistent store configuration used to prevent duplicate messages on restart.

mode = lines
Headers:
  • Content-Type: text/plain

  • file_originalFile: <java.io.File>

  • file_name: <file name>

  • correlationId: <UUID> (same for each line)

  • sequenceNumber: <n>

  • sequenceSize: 0 (number of lines is not known until the file is read)

Payload:

A String for each line.

The first line is optionally preceded by a message with a START marker payload. The last line is optionally followed by a message with an END marker payload.

Marker presence and format are determined by the with-markers and markers-json properties.

mode = ref
Headers:

None.

Payload:

A java.io.File object.

5.13.3. Options

The s3 source has the following options:

Properties grouped by prefix:

file.consumer
markers-json

When 'fileMarkers == true', specify if they should be produced as FileSplitter.FileMarker objects or JSON. (Boolean, default: true)

mode

The FileReadingMode to use for file reading sources. Values are 'ref' - The File object, 'lines' - a message per line, or 'contents' - the contents as bytes. (FileReadingMode, default: <none>, possible values: ref,lines,contents)

with-markers

Set to true to emit start of file/end of file marker messages before/after the data. Only valid with FileReadingMode 'lines'. (Boolean, default: <none>)

metadata.store.dynamo-db
create-delay

Delay between create table retries. (Integer, default: 1)

create-retries

Retry number for create table request. (Integer, default: 25)

read-capacity

Read capacity on the table. (Long, default: 1)

table

Table name for metadata. (String, default: <none>)

time-to-live

TTL for table entries. (Integer, default: <none>)

write-capacity

Write capacity on the table. (Long, default: 1)

metadata.store.gemfire
region

Gemfire region name for metadata. (String, default: <none>)

metadata.store.jdbc
region

Unique grouping identifier for messages persisted with this store. (String, default: DEFAULT)

table-prefix

Prefix for the custom table name. (String, default: <none>)

metadata.store.mongo-db
collection

MongoDB collection name for metadata. (String, default: metadataStore)

metadata.store.redis
key

Redis key for metadata. (String, default: <none>)

metadata.store
type

Indicates the type of metadata store to configure (default is 'memory'). You must include the corresponding Spring Integration dependency to use a persistent store. (StoreType, default: <none>, possible values: mongodb,gemfire,redis,dynamodb,jdbc,zookeeper,hazelcast,memory)

metadata.store.zookeeper
connect-string

Zookeeper connect string in form HOST:PORT. (String, default: 127.0.0.1:2181)

encoding

Encoding to use when storing data in Zookeeper. (Charset, default: UTF-8)

retry-interval

Retry interval for Zookeeper operations in milliseconds. (Integer, default: 1000)

root

Root node - store entries are children of this node. (String, default: /SpringIntegration-MetadataStore)

s3.common
endpoint-url

Optional endpoint URL to connect to S3-compatible storage. (String, default: <none>)

path-style-access

Use path style access. (Boolean, default: false)

s3.supplier
auto-create-local-dir

Whether to create the local directory if it does not exist. (Boolean, default: true)

delete-remote-files

Whether to delete remote files after processing. (Boolean, default: false)

filename-pattern

The pattern to filter remote files. (String, default: <none>)

filename-regex

The regexp to filter remote files. (Pattern, default: <none>)

list-only

Set to true to return s3 object metadata without copying file to a local directory. (Boolean, default: false)

local-dir

The local directory to store files. (File, default: <none>)

preserve-timestamp

Whether to transfer the timestamp of the remote file to the local one. (Boolean, default: true)

remote-dir

AWS S3 bucket resource. (String, default: bucket)

remote-file-separator

Remote File separator. (String, default: /)

tmp-file-suffix

Temporary file suffix. (String, default: .tmp)

5.13.4. Amazon AWS common options

The Amazon S3 Source (like all other Amazon AWS applications) is based on the Spring Cloud AWS project as a foundation, and its auto-configuration classes are used automatically by Spring Boot. Consult their documentation regarding required and useful auto-configuration properties.

Some of them are about AWS credentials:

  • cloud.aws.credentials.accessKey

  • cloud.aws.credentials.secretKey

  • cloud.aws.credentials.instanceProfile

  • cloud.aws.credentials.profileName

  • cloud.aws.credentials.profilePath

Others are for AWS Region definition:

  • cloud.aws.region.auto

  • cloud.aws.region.static

And for AWS Stack:

  • cloud.aws.stack.auto

  • cloud.aws.stack.name

5.13.5. Examples

java -jar s3-source.jar --s3.supplier.remote-dir=/tmp/foo --file.consumer.mode=lines
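
For S3-compatible storage, the s3.common options can be combined with the AWS credential and region properties listed above. A hedged sketch, in which the jar file name, endpoint, bucket name, keys, region, and local directory are all placeholders:

```shell
# Hypothetical invocation: jar name, endpoint, bucket, region, and credentials are placeholders.
java -jar s3-source.jar \
  --s3.common.endpoint-url=http://localhost:9000 \
  --s3.common.path-style-access=true \
  --cloud.aws.credentials.accessKey=myAccessKey \
  --cloud.aws.credentials.secretKey=mySecretKey \
  --cloud.aws.region.static=us-east-1 \
  --cloud.aws.stack.auto=false \
  --s3.supplier.remote-dir=my-bucket \
  --s3.supplier.local-dir=/tmp/s3-files \
  --file.consumer.mode=lines
```

Setting a static region and disabling stack auto-detection is a common choice when running outside AWS, but whether it is required depends on your environment.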

5.14. SFTP Source

This source application supports transfer of files using the SFTP protocol. Files are transferred from the remote directory to the local directory where the app is deployed. Messages emitted by the source are provided as a byte array by default. However, this can be customized using the --file.consumer.mode option:

  • ref: Provides a java.io.File reference

  • lines: Splits files line by line and emits a new message for each line

  • contents: The default. Provides the contents of a file as a byte array

When using --file.consumer.mode=lines, you can also set --file.consumer.with-markers=true. The underlying FileSplitter then emits additional start-of-file and end-of-file marker messages before and after the actual data. The payload of these two marker messages is of type FileSplitter.FileMarker. The with-markers option defaults to false if not explicitly set.

See sftp-supplier for advanced configuration options.

See also MetadataStore options for possible shared persistent store configuration used to prevent duplicate messages on restart.

5.14.1. Input

N/A (Fetches files from an SFTP server).

5.14.2. Output

mode = contents
Headers:
  • Content-Type: application/octet-stream

  • file_name: <file name>

  • file_remoteFileInfo: <file metadata>

  • file_remoteHostPort: <host:port>

  • file_remoteDirectory: <relative-path>

  • file_remoteFile: <file-name>

  • sftp_selectedServer: <server-key> (if multi-source)

Payload:

A byte[] filled with the file contents.

mode = lines
Headers:
  • Content-Type: text/plain

  • file_name: <file name>

  • correlationId: <UUID> (same for each line)

  • sequenceNumber: <n>

  • sequenceSize: 0 (number of lines is not known until the file is read)

  • file_marker: <file marker> (if with-markers is enabled)

Payload:

A String for each line.

The first line is optionally preceded by a message with a START marker payload. The last line is optionally followed by a message with an END marker payload.

Marker presence and format are determined by the with-markers and markers-json properties.

mode = ref
Headers:
  • file_remoteHostPort: <host:port>

  • file_remoteDirectory: <relative-path>

  • file_remoteFile: <file-name>

  • file_originalFile: <absolute-path-of-local-file>

  • file_name: <local-file-name>

  • file_relativePath

  • file_remoteFile: <remote-file-name>

  • sftp_selectedServer: <server-key> (if multi-source)

Payload:

A java.io.File object.

5.14.3. Options

The sftp source has the following options:

Properties grouped by prefix:

file.consumer
markers-json

When 'fileMarkers == true', specify if they should be produced as FileSplitter.FileMarker objects or JSON. (Boolean, default: true)

mode

The FileReadingMode to use for file reading sources. Values are 'ref' - The File object, 'lines' - a message per line, or 'contents' - the contents as bytes. (FileReadingMode, default: <none>, possible values: ref,lines,contents)

with-markers

Set to true to emit start of file/end of file marker messages before/after the data. Only valid with FileReadingMode 'lines'. (Boolean, default: <none>)

metadata.store.dynamo-db
create-delay

Delay between create table retries. (Integer, default: 1)

create-retries

Retry number for create table request. (Integer, default: 25)

read-capacity

Read capacity on the table. (Long, default: 1)

table

Table name for metadata. (String, default: <none>)

time-to-live

TTL for table entries. (Integer, default: <none>)

write-capacity

Write capacity on the table. (Long, default: 1)

metadata.store.gemfire
region

Gemfire region name for metadata. (String, default: <none>)

metadata.store.jdbc
region

Unique grouping identifier for messages persisted with this store. (String, default: DEFAULT)

table-prefix

Prefix for the custom table name. (String, default: <none>)

metadata.store.mongo-db
collection

MongoDB collection name for metadata. (String, default: metadataStore)

metadata.store.redis
key

Redis key for metadata. (String, default: <none>)

metadata.store
type

Indicates the type of metadata store to configure (default is 'memory'). You must include the corresponding Spring Integration dependency to use a persistent store. (StoreType, default: <none>, possible values: mongodb,gemfire,redis,dynamodb,jdbc,zookeeper,hazelcast,memory)

metadata.store.zookeeper
connect-string

Zookeeper connect string in form HOST:PORT. (String, default: 127.0.0.1:2181)

encoding

Encoding to use when storing data in Zookeeper. (Charset, default: UTF-8)

retry-interval

Retry interval for Zookeeper operations in milliseconds. (Integer, default: 1000)

root

Root node - store entries are children of this node. (String, default: /SpringIntegration-MetadataStore)

sftp.supplier
auto-create-local-dir

Set to true to create the local directory if it does not exist. (Boolean, default: true)

delay-when-empty

Duration of delay when no new files are detected. (Duration, default: 1s)

delete-remote-files

Set to true to delete remote files after successful transfer. (Boolean, default: false)

directories

A list of factory "name.directory" pairs. (String[], default: <none>)

factories

A map of factory names to factories. (Map<String, Factory>, default: <none>)

fair

True for fair rotation of multiple servers/directories. This is false by default so if a source has more than one entry, these will be received before the other sources are visited. (Boolean, default: false)

filename-pattern

A filter pattern to match the names of files to transfer. (String, default: <none>)

filename-regex

A filter regex pattern to match the names of files to transfer. (Pattern, default: <none>)

list-only

Set to true to return file metadata without the entire payload. (Boolean, default: false)

local-dir

The local directory to use for file transfers. (File, default: <none>)

max-fetch

The maximum number of remote files to fetch per poll; default unlimited. Does not apply when listing files or building task launch requests. (Integer, default: <none>)

preserve-timestamp

Set to true to preserve the original timestamp. (Boolean, default: true)

remote-dir

The remote SFTP directory. (String, default: /)

remote-file-separator

The remote file separator. (String, default: /)

stream

Set to true to stream the file rather than copy to a local directory. (Boolean, default: false)

tmp-file-suffix

The suffix to use while the transfer is in progress. (String, default: .tmp)

sftp.supplier.factory
allow-unknown-keys

True to allow an unknown or changed key. (Boolean, default: false)

host

The host name of the server. (String, default: localhost)

known-hosts-expression

A SpEL expression resolving to the location of the known hosts file. (Expression, default: <none>)

pass-phrase

Passphrase for user's private key. (String, default: <empty string>)

password

The password to use to connect to the server. (String, default: <none>)

port

The port of the server. (Integer, default: 22)

private-key

Resource location of user's private key. (Resource, default: <none>)

username

The username to use to connect to the server. (String, default: <none>)

5.14.4. Examples

java -jar sftp_source.jar --sftp.supplier.remote-dir=foo --file.mode=lines --sftp.supplier.factory.host=sftpserver \
         --sftp.supplier.factory.username=user --sftp.supplier.factory.password=pw --sftp.supplier.local-dir=/foo

5.15. SYSLOG

The syslog source receives SYSLOG packets over UDP, TCP, or both. RFC3164 (BSD) and RFC5424 formats are supported.

5.15.1. Options

syslog.supplier.buffer-size

The buffer size used when decoding messages; larger messages will be rejected. (Integer, default: 2048)

syslog.supplier.nio

Whether or not to use NIO (when supporting a large number of connections). (Boolean, default: false)

syslog.supplier.port

The port to listen on. (Integer, default: 1514)

syslog.supplier.protocol

Protocol used for SYSLOG (tcp, udp, or both). (Protocol, default: <none>, possible values: tcp,udp,both)

syslog.supplier.reverse-lookup

Whether or not to perform a reverse lookup on the incoming socket. (Boolean, default: false)

syslog.supplier.rfc

'5424' or '3164' - the syslog format according to the RFC; 3164 is aka 'BSD' format. (String, default: 3164)

syslog.supplier.socket-timeout

The socket timeout. (Integer, default: 0)

5.16. TCP

The tcp source acts as a server and allows a remote party to connect to it and submit data over a raw tcp socket.

TCP is a streaming protocol and some mechanism is needed to frame messages on the wire. A number of decoders are available, the default being 'CRLF' which is compatible with Telnet.

Messages produced by the TCP source application have a byte[] payload.

5.16.1. Options

Properties grouped by prefix:

tcp
nio

Whether or not to use NIO. (Boolean, default: false)

port

The port on which to listen; 0 for the OS to choose a port. (Integer, default: 1234)

reverse-lookup

Perform a reverse DNS lookup on the remote IP Address; if false, just the IP address is included in the message headers. (Boolean, default: false)

socket-timeout

The timeout (ms) before closing the socket when no data is received. (Integer, default: 120000)

use-direct-buffers

Whether or not to use direct buffers. (Boolean, default: false)

tcp.supplier
buffer-size

The buffer size used when decoding messages; larger messages will be rejected. (Integer, default: 2048)

decoder

The decoder to use when receiving messages. (Encoding, default: <none>, possible values: CRLF,LF,NULL,STXETX,RAW,L1,L2,L4)

5.16.2. Available Decoders

Text Data
CRLF (default)

text terminated by carriage return (0x0d) followed by line feed (0x0a)

LF

text terminated by line feed (0x0a)

NULL

text terminated by a null byte (0x00)

STXETX

text preceded by an STX (0x02) and terminated by an ETX (0x03)

Text and Binary Data
RAW

no structure - the client indicates a complete message by closing the socket

L1

data preceded by a one byte (unsigned) length field (supports up to 255 bytes)

L2

data preceded by a two byte (unsigned) length field (up to 2^16-1 bytes)

L4

data preceded by a four byte (signed) length field (up to 2^31-1 bytes)
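
As an illustration of the framing rules above, the following Java sketch builds client-side frames for the CRLF and L2 decoders. The TcpFrames class and its method names are hypothetical and are not part of the tcp source. The sketch also assumes that the L2 length header is written in network (big-endian) byte order, which you should verify against the decoder you use.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical helper (not part of the tcp source): builds frames that a
// client could send to a tcp source configured with the matching decoder.
final class TcpFrames {

    // CRLF: the text followed by carriage return (0x0d) and line feed (0x0a).
    static byte[] crlf(String text) {
        return (text + "\r\n").getBytes(StandardCharsets.UTF_8);
    }

    // L2: a two byte unsigned length field, then the payload.
    // Assumption: the length is written big-endian (ByteBuffer's default order).
    static byte[] l2(byte[] payload) {
        if (payload.length > 0xFFFF) {
            throw new IllegalArgumentException("L2 frames hold at most 65535 bytes");
        }
        return ByteBuffer.allocate(2 + payload.length)
                .putShort((short) payload.length)
                .put(payload)
                .array();
    }

    public static void main(String[] args) {
        byte[] frame = l2("hello".getBytes(StandardCharsets.UTF_8));
        System.out.println("L2 frame length: " + frame.length);
        System.out.println("CRLF frame length: " + crlf("hello").length);
    }
}
```

A client would write the returned bytes to a socket connected to the port set by tcp.port, with tcp.supplier.decoder set to the matching value.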

5.17. Time Source

The time source will simply emit a String with the current time every so often.

5.17.1. Options

The time source has the following options:

spring.cloud.stream.poller.cron

Cron expression value for the Cron Trigger. (String, default: <none>)

spring.cloud.stream.poller.fixed-delay

Fixed delay for default poller. (Long, default: 1000)

spring.cloud.stream.poller.initial-delay

Initial delay for periodic triggers. (Integer, default: 0)

spring.cloud.stream.poller.max-messages-per-poll

Maximum messages per poll for the default poller. (Long, default: 1)

spring.cloud.stream.poller.time-unit

The TimeUnit to apply to delay values. (TimeUnit, default: <none>, possible values: NANOSECONDS,MICROSECONDS,MILLISECONDS,SECONDS,MINUTES,HOURS,DAYS)

time.date-format

Format for the date value. (String, default: MM/dd/yy HH:mm:ss)
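
To preview what a given time.date-format value produces, a small Java sketch such as the following can help. It assumes that the pattern follows java.text.SimpleDateFormat syntax, which the default value suggests but this guide does not state. The TimeFormatSketch class is hypothetical.

```java
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Locale;
import java.util.TimeZone;

// Hypothetical helper: formats a date with a time.date-format style pattern.
// Assumption: the pattern uses java.text.SimpleDateFormat syntax.
final class TimeFormatSketch {

    static String format(String pattern, Date date, TimeZone zone) {
        SimpleDateFormat formatter = new SimpleDateFormat(pattern, Locale.ROOT);
        formatter.setTimeZone(zone);
        return formatter.format(date);
    }

    public static void main(String[] args) {
        // Formats the current time with the default pattern from this section.
        System.out.println(format("MM/dd/yy HH:mm:ss", new Date(), TimeZone.getDefault()));
    }
}
```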

5.18. Twitter Message Source

Repeatedly retrieves the direct messages (both sent and received) within the last 30 days, sorted in reverse-chronological order. The retrieved messages are cached (in a MetadataStore cache) to prevent duplicates. By default, an in-memory SimpleMetadataStore is used.

The twitter.message.source.count property controls the number of returned messages.

The spring.cloud.stream.poller properties control the message poll interval, which must be aligned with the rate limit of the APIs used.

5.18.1. Options

Properties grouped by prefix:

spring.cloud.stream.poller
cron

Cron expression value for the Cron Trigger. (String, default: <none>)

fixed-delay

Fixed delay for default poller. (Long, default: 1000)

initial-delay

Initial delay for periodic triggers. (Integer, default: 0)

max-messages-per-poll

Maximum messages per poll for the default poller. (Long, default: 1)

time-unit

The TimeUnit to apply to delay values. (TimeUnit, default: <none>, possible values: NANOSECONDS,MICROSECONDS,MILLISECONDS,SECONDS,MINUTES,HOURS,DAYS)

twitter.connection
access-token

Your Twitter token. (String, default: <none>)

access-token-secret

Your Twitter token secret. (String, default: <none>)

consumer-key

Your Twitter key. (String, default: <none>)

consumer-secret

Your Twitter secret. (String, default: <none>)

debug-enabled

Enables Twitter4J debug mode. (Boolean, default: false)

raw-json

Enable caching the original (raw) JSON objects as returned by the Twitter APIs. When set to false, the result uses Twitter4J's JSON representations. When set to true, the result uses the original Twitter API's JSON representations. (Boolean, default: true)

twitter.message.source
count

Max number of events to be returned. 20 default. 50 max. (Integer, default: 20)

5.19. Twitter Search Source

Twitter's Standard search API (search/tweets) allows simple queries against the indices of recent or popular Tweets. This source provides continuous searches against a sampling of recent Tweets published in the past 7 days. It is part of the 'public' set of APIs.

Returns a collection of relevant Tweets matching a specified query.

Use the spring.cloud.stream.poller properties to control the interval between consecutive search requests. The rate limit is 180 requests per 30-minute window (about 6 requests per minute, or 1 request every 10 seconds).
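
The arithmetic behind that interval can be sketched in Java as follows. The PollInterval class is hypothetical, and the sketch assumes that spring.cloud.stream.poller.fixed-delay is interpreted in milliseconds when no time-unit is set.

```java
import java.time.Duration;

// Hypothetical helper: derives the shortest poll delay that stays within a
// rate limit of maxRequests per window.
final class PollInterval {

    static Duration minDelay(int maxRequests, Duration window) {
        return window.dividedBy(maxRequests);
    }

    public static void main(String[] args) {
        Duration delay = minDelay(180, Duration.ofMinutes(30));
        // Assumption: fixed-delay is given in milliseconds.
        System.out.println("--spring.cloud.stream.poller.fixed-delay=" + delay.toMillis());
    }
}
```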

The twitter.search query properties allow querying by keywords and filtering the result by time and geolocation.

The twitter.search.count and twitter.search.page properties control the result pagination in accordance with the Search API.

Note: Twitter’s search service and, by extension, the Search API is not meant to be an exhaustive source of Tweets. Not all Tweets will be indexed or made available via the search interface.

5.19.1. Options

Properties grouped by prefix:

spring.cloud.stream.poller
cron

Cron expression value for the Cron Trigger. (String, default: <none>)

fixed-delay

Fixed delay for default poller. (Long, default: 1000)

initial-delay

Initial delay for periodic triggers. (Integer, default: 0)

max-messages-per-poll

Maximum messages per poll for the default poller. (Long, default: 1)

time-unit

The TimeUnit to apply to delay values. (TimeUnit, default: <none>, possible values: NANOSECONDS,MICROSECONDS,MILLISECONDS,SECONDS,MINUTES,HOURS,DAYS)

twitter.connection
access-token

Your Twitter token. (String, default: <none>)

access-token-secret

Your Twitter token secret. (String, default: <none>)

consumer-key

Your Twitter key. (String, default: <none>)

consumer-secret

Your Twitter secret. (String, default: <none>)

debug-enabled

Enables Twitter4J debug mode. (Boolean, default: false)

raw-json

Enable caching the original (raw) JSON objects as returned by the Twitter APIs. When set to false, the result uses Twitter4J's JSON representations. When set to true, the result uses the original Twitter API's JSON representations. (Boolean, default: true)

twitter.search
count

Number of tweets to return per page (e.g. per single request), up to a max of 100. (Integer, default: 100)

lang

Restricts searched tweets to the given language, given by an ISO 639-1 code (http://en.wikipedia.org/wiki/ISO_639-1). (String, default: <none>)

page

Number of pages (e.g. requests) to search backwards (from the most recent to the oldest tweets) before restarting the search from the most recent tweets. The total number of tweets searched backwards is (page * count). (Integer, default: 3)

query

Search tweets by search query string. (String, default: <none>)

restart-from-most-recent-on-empty-response

Restart search from the most recent tweets on empty response. Applied only after the first restart (e.g. when since_id != UNBOUNDED) (Boolean, default: false)

result-type

Specifies what type of search results you would prefer to receive. The current default is "mixed". Valid values are: mixed (include both popular and real-time results in the response), recent (return only the most recent results in the response), and popular (return only the most popular results in the response). (ResultType, default: <none>, possible values: popular,mixed,recent)

since

If specified, returns tweets since the given date. The date should be formatted as YYYY-MM-DD. (String, default: <none>)

twitter.search.geocode
latitude

User's latitude. (Double, default: -1)

longitude

User's longitude. (Double, default: -1)

radius

Radius (in kilometers) around the (latitude, longitude) point. (Double, default: -1)

5.20. Twitter Stream Source

Supports the real-time Tweet streaming Filter and Sample APIs.

  • The Filter API returns public statuses that match one or more filter predicates. Multiple parameters allow using a single connection to the Streaming API. TIP: The track, follow, and locations fields are combined with an OR operator! A query with track=foo and follow=1234 returns Tweets matching foo OR created by user 1234.

  • The Sample API returns a small random sample of all public statuses. The Tweets returned by the default access level are the same, so if two different clients connect to this endpoint, they will see the same Tweets.

The default access level allows up to 400 track keywords, 5,000 follow user Ids and 25 0.1-360 degree location boxes.

5.20.1. Options

Properties grouped by prefix:

twitter.connection
access-token

Your Twitter token. (String, default: <none>)

access-token-secret

Your Twitter token secret. (String, default: <none>)

consumer-key

Your Twitter key. (String, default: <none>)

consumer-secret

Your Twitter secret. (String, default: <none>)

debug-enabled

Enables Twitter4J debug mode. (Boolean, default: false)

raw-json

Enable caching the original (raw) JSON objects as returned by the Twitter APIs. When set to false, the result uses Twitter4J's JSON representations. When set to true, the result uses the original Twitter API's JSON representations. (Boolean, default: true)

twitter.stream.filter
count

Indicates the number of previous statuses to stream before transitioning to the live stream. (Integer, default: 0)

filter-level

The filter level limits what tweets appear in the stream to those with a minimum filterLevel attribute value. One of either none, low, or medium. (FilterLevel, default: <none>)

follow

Specifies the users, by ID, to receive public tweets from. (List<Long>, default: <none>)

language

Specifies the tweets language of the stream. (List<String>, default: <none>)

locations

Locations to track. Internally represented as a 2D array. The first pair must be the SW corner of the box. For example, the bounding box 52.38, 4.90, 51.51, -0.12 is invalid. (List<BoundingBox>, default: <none>)

track

Specifies keywords to track. (List<String>, default: <none>)
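
The ordering rule for the locations option can be checked with a short Java sketch like the one below. The BoundingBoxCheck class is hypothetical and is not part of the application. It compares both components of the first pair against the second pair, so it does not depend on whether a pair is written as latitude, longitude or the reverse. It does not handle boxes that cross the antimeridian.

```java
// Hypothetical helper: checks that the first coordinate pair of a bounding
// box is its south-west corner, i.e. neither component exceeds the
// corresponding component of the second pair.
final class BoundingBoxCheck {

    static boolean firstPairIsSouthWest(double a1, double b1, double a2, double b2) {
        return a1 <= a2 && b1 <= b2;
    }

    public static void main(String[] args) {
        // The invalid example from the locations option description.
        System.out.println(firstPairIsSouthWest(52.38, 4.90, 51.51, -0.12));
        // The same two corners with the pairs swapped.
        System.out.println(firstPairIsSouthWest(51.51, -0.12, 52.38, 4.90));
    }
}
```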

twitter.stream
type

<documentation missing> (StreamType, default: <none>, possible values: sample,filter,firehose,link)

5.21. Websocket Source

The Websocket source produces messages from data received over a WebSocket.

5.21.1. Options

Properties grouped by prefix:

websocket.supplier
allowed-origins

The allowed origins. (String, default: *)

path

The path on which server WebSocket handler is exposed. (String, default: /websocket)

websocket.supplier.sock-js
enable

Enable SockJS service on the server. Default is 'false' (Boolean, default: false)

5.21.2. Examples

To verify that the websocket-source receives messages from Websocket clients, you can use the following simple end-to-end setup.

Step 1: Start kafka
Step 2: Deploy websocket-source on a specific port, say 8080
Step 3: Connect a websocket client on port 8080 path "/websocket", and send some messages.

You can start a kafka console consumer and see the messages there.

6. Processors

6.1. Aggregator Processor

The aggregator processor enables an application to aggregate incoming messages into groups and release them to an output destination.

java -jar aggregator-processor-kafka-<version>.jar --aggregator.message-store-type=jdbc

Change kafka to rabbit if you want to run it against RabbitMQ.
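
According to the options in this section, messages are correlated by the correlationId header by default, groups are released based on the sequenceSize header, and the aggregated result is a collection of payloads. The plain Java sketch below illustrates that default behavior only. The AggregatorSketch class is hypothetical, keeps its groups in memory, and does not use Spring Integration or a message store.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical in-memory illustration of correlation and release.
final class AggregatorSketch {

    private final Map<String, List<String>> groups = new HashMap<>();

    // Adds a payload to the group for correlationId. Returns the collected
    // payloads once the group holds sequenceSize entries, otherwise empty.
    Optional<List<String>> accept(String correlationId, int sequenceSize, String payload) {
        List<String> group = groups.computeIfAbsent(correlationId, key -> new ArrayList<>());
        group.add(payload);
        if (group.size() >= sequenceSize) {
            groups.remove(correlationId);
            return Optional.of(group);
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        AggregatorSketch aggregator = new AggregatorSketch();
        System.out.println(aggregator.accept("order-1", 2, "first"));
        System.out.println(aggregator.accept("order-1", 2, "second"));
    }
}
```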

Payload

6.1.2. Options

Properties grouped by prefix:

aggregator
aggregation

SpEL expression for aggregation strategy. Default is collection of payloads. (Expression, default: <none>)

correlation

SpEL expression for correlation key. Default to correlationId header. (Expression, default: <none>)

group-timeout

SpEL expression for the timeout after which uncompleted groups expire. (Expression, default: <none>)

message-store-entity

Persistence message store entity: table prefix in RDBMS, collection name in MongoDb, etc. (String, default: <none>)

message-store-type

Message store type. (String, default: <none>)

release

SpEL expression for release strategy. Default is based on the sequenceSize header. (Expression, default: <none>)

spring.data.mongodb
authentication-database

Authentication database name. (String, default: <none>)

auto-index-creation

Whether to enable auto-index creation. (Boolean, default: <none>)

database

Database name. (String, default: <none>)

field-naming-strategy

Fully qualified name of the FieldNamingStrategy to use. (Class<?>, default: <none>)

grid-fs-database

GridFS database name. (String, default: <none>)

host

Mongo server host. Cannot be set with URI. (String, default: <none>)

password

Login password of the mongo server. Cannot be set with URI. (Character[], default: <none>)

port

Mongo server port. Cannot be set with URI. (Integer, default: <none>)

replica-set-name

Required replica set name for the cluster. Cannot be set with URI. (String, default: <none>)

uri

Mongo database URI. Cannot be set with host, port, credentials and replica set name. (String, default: mongodb://localhost/test)

username

Login user of the mongo server. Cannot be set with URI. (String, default: <none>)

uuid-representation

Representation to use when converting a UUID to a BSON binary value. (UuidRepresentation, default: java-legacy, possible values: UNSPECIFIED,STANDARD,C_SHARP_LEGACY,JAVA_LEGACY,PYTHON_LEGACY)

spring.datasource
continue-on-error

Whether to stop if an error occurs while initializing the database. (Boolean, default: false)

data

Data (DML) script resource references. (List<String>, default: <none>)

data-password

Password of the database to execute DML scripts (if different). (String, default: <none>)

data-username

Username of the database to execute DML scripts (if different). (String, default: <none>)

driver-class-name

Fully qualified name of the JDBC driver. Auto-detected based on the URL by default. (String, default: <none>)

generate-unique-name

Whether to generate a random datasource name. (Boolean, default: true)

initialization-mode

Mode to apply when determining if DataSource initialization should be performed using the available DDL and DML scripts. (DataSourceInitializationMode, default: embedded, possible values: ALWAYS,EMBEDDED,NEVER)

jndi-name

JNDI location of the datasource. Class, url, username and password are ignored when set. (String, default: <none>)

name

Name of the datasource. Default to "testdb" when using an embedded database. (String, default: <none>)

password

Login password of the database. (String, default: <none>)

platform

Platform to use in the DDL or DML scripts (such as schema-${platform}.sql or data-${platform}.sql). (String, default: all)

schema

Schema (DDL) script resource references. (List<String>, default: <none>)

schema-password

Password of the database to execute DDL scripts (if different). (String, default: <none>)

schema-username

Username of the database to execute DDL scripts (if different). (String, default: <none>)

separator

Statement separator in SQL initialization scripts. (String, default: ;)

sql-script-encoding

SQL scripts encoding. (Charset, default: <none>)

type

Fully qualified name of the connection pool implementation to use. By default, it is auto-detected from the classpath. (Class<DataSource>, default: <none>)

url

JDBC URL of the database. (String, default: <none>)

username

Login username of the database. (String, default: <none>)

spring.mongodb.embedded
features

Comma-separated list of features to enable. Uses the defaults of the configured version by default. (Set<Feature>, default: [sync_delay])

version

Version of Mongo to use. (String, default: 3.5.5)

spring.redis
client-name

Client name to be set on connections with CLIENT SETNAME. (String, default: <none>)

database

Database index used by the connection factory. (Integer, default: 0)

host

Redis server host. (String, default: localhost)

password

Login password of the redis server. (String, default: <none>)

port

Redis server port. (Integer, default: 6379)

ssl

Whether to enable SSL support. (Boolean, default: false)

timeout

Connection timeout. (Duration, default: <none>)

url

Connection URL. Overrides host, port, and password. User is ignored. Example: redis://user:password@example.com:6379 (String, default: <none>)

6.2. Bridge Processor

A processor that bridges the input and output by simply passing the incoming payload to the outbound.

Payload

Any

6.3. Filter Processor

The filter processor enables an application to examine the incoming payload and apply a predicate against it, which decides whether the record continues downstream. For example, if the incoming payload is of type String and you want to filter out anything that has fewer than five characters, you can run the filter processor as shown below.

java -jar filter-processor-kafka-<version>.jar --filter.function.expression='payload.length() > 4'

Change kafka to rabbit if you want to run it against RabbitMQ.

Payload

You can pass any type as payload and then apply SpEL expressions against it to filter. If the incoming type is byte[] and the content type is set to text/plain or application/json, then the application converts the byte[] into String.

6.3.2. Options

filter.function.expression

Boolean SpEL expression to apply against request message to filter. (Expression, default: <none>)
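
For comparison, the predicate expressed by the SpEL expression payload.length() > 4 can be written as a plain java.util.function.Predicate. This is only a sketch of the filtering logic: the FilterSketch class is hypothetical and does not show how the application evaluates SpEL.

```java
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

// Hypothetical illustration of the filter predicate from the example above.
final class FilterSketch {

    // Keeps only payloads with more than four characters.
    static final Predicate<String> LONGER_THAN_FOUR = payload -> payload.length() > 4;

    static List<String> filter(List<String> payloads) {
        return payloads.stream().filter(LONGER_THAN_FOUR).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(filter(List.of("four", "fives", "sixsix")));
    }
}
```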

6.4. Groovy Processor

A Processor that applies a Groovy script against messages.

6.4.1. Options

The groovy-processor processor has the following options:

6.5. Header Enricher Processor

Use the header-enricher app to add message headers.

The headers are provided in the form of new line delimited key value pairs, where the keys are the header names and the values are SpEL expressions. For example --headers='foo=payload.someProperty \n bar=payload.otherProperty'.

6.5.1. Options

The header-enricher processor has the following options:

header.enricher.headers

\n separated properties representing headers in which values are SpEL expressions, e.g foo='bar' \n baz=payload.baz. (Properties, default: <none>)

header.enricher.overwrite

set to true to overwrite any existing message headers. (Boolean, default: false)
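
Because header.enricher.headers has the type Properties, it can help to see how a newline-delimited string breaks down into header names and expression strings. The sketch below uses java.util.Properties for this and assumes a real newline character between the pairs. The HeaderSpecSketch class is hypothetical, and whether the application parses and trims the value in exactly this way is an assumption. The expressions are left as plain strings and are not evaluated.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;

// Hypothetical helper: splits a newline-delimited header specification into
// header names and (unevaluated) SpEL expression strings.
final class HeaderSpecSketch {

    static Map<String, String> parse(String spec) {
        Properties properties = new Properties();
        try {
            properties.load(new StringReader(spec));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        Map<String, String> headers = new TreeMap<>();
        for (String name : properties.stringPropertyNames()) {
            // Properties keeps trailing whitespace in values, so trim it here.
            headers.put(name, properties.getProperty(name).trim());
        }
        return headers;
    }

    public static void main(String[] args) {
        System.out.println(parse("foo='bar' \n baz=payload.baz"));
    }
}
```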

6.6. Http Request Processor

A processor app that makes requests to an HTTP resource and emits the response body as a message payload.

6.6.1. Input

Headers

Any required HTTP headers must be explicitly set via the headers or headers-expression property. See examples below. Header values may also be used to construct:

  • the request body when referenced in the body-expression property.

  • the HTTP method when referenced in the http-method-expression property.

  • the URL when referenced in the url-expression property.

Payload

The payload is used as the request body for a POST request by default, and can be any Java type. It should be an empty String for a GET request. The payload may also be used to construct:

  • the request body when referenced in the body-expression property.

  • the HTTP method when referenced in the http-method-expression property.

  • the URL when referenced in the url-expression property.

The underlying WebClient supports Jackson JSON serialization to support any request and response types if necessary. The expected-response-type property, String.class by default, may be set to any class in your application class path. Note that user defined payload types will require adding required dependencies to your pom file.

6.6.2. Output

Headers

No HTTP message headers are mapped to the outbound Message.

Payload

The raw output object is ResponseEntity<?>. Any of its fields (e.g., body, headers) or accessor methods (statusCode) may be referenced as part of the reply-expression. By default, the outbound Message payload is the response body. Note that ResponseEntity (referenced by the expression #root) cannot be deserialized by Jackson by default, but may be rendered as a HashMap.

6.6.3. Options

The http-request processor has the following options:

http.request.body-expression

A SpEL expression to derive the request body from the incoming message. (Expression, default: <none>)

http.request.expected-response-type

The type used to interpret the response. (Class<?>, default: <none>)

http.request.headers-expression

A SpEL expression used to derive the http headers map to use. (Expression, default: <none>)

http.request.http-method-expression

A SpEL expression to derive the request method from the incoming message. (Expression, default: <none>)

http.request.maximum-buffer-size

Maximum buffer size in bytes allocated for input stream buffers. Defaults to 256k. Increase, as necessary, for posting or getting large binary content. (Integer, default: 0)

http.request.reply-expression

A SpEL expression used to compute the final result, applied against the whole HTTP org.springframework.http.ResponseEntity. (Expression, default: <none>)

http.request.timeout

Request timeout in milliseconds. (Long, default: 30000)

http.request.url-expression

A SpEL expression against incoming message to determine the URL to use. (Expression, default: <none>)

6.7. Image Recognition Processor

A processor that uses an Inception model to classify images in real time into different categories (e.g. labels).

The model implements a deep Convolutional Neural Network that can achieve reasonable performance on hard visual recognition tasks, matching or exceeding human performance in some domains like image recognition.

The input of the model is an image as a binary array.

The output is a JSON message in this format:

{
  "labels" : [
     {"giant panda":0.98649305}
  ]
}

The result contains the name of the recognized category (the label) along with the confidence that the image represents this category.

If response-size is set to a value higher than 1, the result includes the top response-size most probable labels. For example, response-size=3 would return:

{
  "labels": [
    {"giant panda":0.98649305},
    {"badger":0.010562794},
    {"ice bear":0.001130851}
  ]
}

Payload

If the incoming type is byte[] and the content type is set to application/octet-stream, then the application processes the input byte[] image and outputs an augmented byte[] image payload and a JSON header.

6.7.2. Options

image.recognition.cache-model

cache the pre-trained tensorflow model. (Boolean, default: true)

image.recognition.debug-output

<documentation missing> (Boolean, default: false)

image.recognition.debug-output-path

<documentation missing> (String, default: image-recognition-result.png)

image.recognition.model

pre-trained tensorflow image recognition model. Note that the model must match the selected model type! (String, default: https://storage.googleapis.com/mobilenet_v2/checkpoints/mobilenet_v2_1.4_224.tgz#mobilenet_v2_1.4_224_frozen.pb)

image.recognition.model-type

Supports three different pre-trained tensorflow image recognition models: Inception, MobileNetV1 and MobileNetV2 1. Inception graph uses 'input' as input and 'output' as output. 2. MobileNetV2 pre-trained models: https://github.com/tensorflow/models/tree/master/research/slim/nets/mobilenet#pretrained-models - normalized image size is always square (e.g. H=W) - graph uses 'input' as input and 'MobilenetV2/Predictions/Reshape_1' as output. 3. MobileNetV1 pre-trained models: https://github.com/tensorflow/models/blob/master/research/slim/nets/mobilenet_v1.md#pre-trained-models - graph uses 'input' as input and 'MobilenetV1/Predictions/Reshape_1' as output. (ModelType, default: <none>, possible values: inception,mobilenetv1,mobilenetv2)

image.recognition.normalized-image-size

Normalized image size. (Integer, default: 224)

image.recognition.response-size

number of recognized images. (Integer, default: 5)

6.8. Object Detection Processor

The Object Detection processor provides out-of-the-box support for the TensorFlow Object Detection API. It allows for real-time localization and identification of multiple objects in a single image or image stream. The Object Detection processor is built on top of the Object Detection Function.

You have to provide the Processor with a pre-trained object detection model, and the corresponding object labels.

Here are some sensible configuration defaults:

The following diagram shows a Spring Cloud Data Flow streaming pipeline that predicts, in real time, the object types in an input image stream.

[Figure: SCDF TensorFlow object detection architecture]

The processor's input is an image byte array, and the output is an augmented image and a header, called detected_objects, that provides a textual description of the detected objects:

{
  "labels" : [
     {"name":"person", "confidence":0.9996774,"x1":0.0,"y1":0.3940161,"x2":0.9465165,"y2":0.5592592,"cid":1},
     {"name":"person", "confidence":0.9996604,"x1":0.047891676,"y1":0.03169123,"x2":0.941098,"y2":0.2085562,"cid":1},
     {"name":"backpack", "confidence":0.96534747,"x1":0.15588468,"y1":0.85957795,"x2":0.5091308,"y2":0.9908878,"cid":23},
     {"name":"backpack", "confidence":0.963343,"x1":0.1273736,"y1":0.57658505,"x2":0.47765,"y2":0.6986431,"cid":23}
  ]
}

The detected_objects header format is:

  • name, confidence - human-readable name of the detected object (the label) with its confidence as a float in the range [0, 1]

  • x1, y1, x2, y2 - the bounding box of the detected object, represented as (x1, y1, x2, y2). The coordinates are relative to the image size.

  • cid - Classification identifier as defined in the provided labels configuration file.

Payload

The incoming type is byte[], and the content type is application/octet-stream. The processor processes the input byte[] image and outputs an augmented byte[] image payload and a JSON header (detected_objects).

6.8.2. Options

object.detection.cache-model

<documentation missing> (Boolean, default: true)

object.detection.confidence

<documentation missing> (Float, default: 0.4)

object.detection.debug-output

<documentation missing> (Boolean, default: false)

object.detection.debug-output-path

<documentation missing> (String, default: object-detection-result.png)

object.detection.labels

Labels URI. (String, default: https://storage.googleapis.com/scdf-tensorflow-models/object-detection/mscoco_label_map.pbtxt)

object.detection.model

pre-trained tensorflow object detection model. (String, default: https://download.tensorflow.org/models/object_detection/ssdlite_mobilenet_v2_coco_2018_05_09.tar.gz#frozen_inference_graph.pb)

object.detection.response-size

<documentation missing> (Integer, default: <none>)

object.detection.with-masks

<documentation missing> (Boolean, default: false)

6.9. Semantic Segmentation Processor

Image semantic segmentation based on the state-of-the-art DeepLab TensorFlow model.

Semantic segmentation is the process of associating each pixel of an image with a class label (such as flower, person, road, sky, ocean, or car). Unlike instance segmentation, which produces instance-aware region masks, semantic segmentation produces class-aware masks. To implement instance segmentation, consult the Object Detection Service instead.

The Semantic Segmentation Processor uses the Semantic Segmentation Function library and the TensorFlow Service.

Payload

The incoming type is byte[], and the content type is application/octet-stream. The processor processes the input byte[] image and outputs an augmented byte[] image payload and a JSON header.

The processor's input is an image byte array, and the output is an augmented image byte array and a JSON header, semantic_segmentation, in this format:

[
    [ 0, 0, 0 ],
    [ 127, 127, 127 ],
    [ 255, 255, 255 ]
    ...
]

The output header json format represents the color pixel map computed from the input image.

6.9.2. Options

semantic.segmentation.color-map-uri

Every pre-trained model is based on certain object color maps. The pre-defined options are: - classpath:/colormap/citymap_colormap.json - classpath:/colormap/ade20k_colormap.json - classpath:/colormap/black_white_colormap.json - classpath:/colormap/mapillary_colormap.json (String, default: classpath:/colormap/citymap_colormap.json)

semantic.segmentation.debug-output

Save the output image in the local debugOutputPath path. (Boolean, default: false)

semantic.segmentation.debug-output-path

<documentation missing> (String, default: semantic-segmentation-result.png)

semantic.segmentation.mask-transparency

The alpha color of the computed segmentation mask image. (Float, default: 0.45)

semantic.segmentation.model

pre-trained tensorflow semantic segmentation model. (String, default: https://download.tensorflow.org/models/deeplabv3_mnv2_cityscapes_train_2018_02_05.tar.gz#frozen_inference_graph.pb)

semantic.segmentation.output-type

Specifies the output image type. You can return either the input image with the computed mask overlay, or the mask alone. (OutputType, default: <none>, possible values: blended,mask)

6.10. Script Processor

Processor that transforms messages using a script. The script body is supplied directly as a property value. The language of the script can be specified (groovy/javascript/ruby/python).

6.10.1. Options

The script-processor processor has the following options:

script-processor.language

Language of the text in the script property. Supported: groovy, javascript, ruby, python. (String, default: <none>)

script-processor.script

Text of the script. (String, default: <none>)

script-processor.variables

Variable bindings as a new line delimited string of name-value pairs, e.g. 'foo=bar\n baz=car'. (Properties, default: <none>)

script-processor.variables-location

The location of a properties file containing custom script variable bindings. (Resource, default: <none>)

6.11. Splitter Processor

The splitter app builds upon the concept of the same name in Spring Integration and allows the splitting of a single message into several distinct messages. The processor uses a function that takes a Message<?> as input and produces a List<Message<?>> as output based on various properties (see below). You can use a SpEL expression or a delimiter to specify how you want to split the incoming message.

Payload
  • Incoming payload - Message<?>

If the incoming type is byte[] and the content type is set to text/plain or application/json, then the application converts the byte[] into String.

  • Outgoing payload - List<Message<?>>

6.11.2. Options

splitter.apply-sequence

Add correlation/sequence information in headers to facilitate later aggregation. (Boolean, default: true)

splitter.charset

The charset to use when converting bytes in text-based files to String. (String, default: <none>)

splitter.delimiters

When expression is null, delimiters to use when tokenizing String payloads. (String, default: <none>)

splitter.expression

A SpEL expression for splitting payloads. (String, default: <none>)

splitter.file-markers

Set to true or false to use a FileSplitter (to split text-based files by line) that includes (or not) beginning/end of file markers. (Boolean, default: <none>)

splitter.markers-json

When 'fileMarkers == true', specify if they should be produced as FileSplitter.FileMarker objects or JSON. (Boolean, default: true)
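
To make the effect of splitter.delimiters concrete, the following sketch tokenizes a String payload with java.util.StringTokenizer, where every character in the delimiter string acts as a separator. It assumes StringTokenizer-style tokenizing; the class and method names are hypothetical, and the real processor additionally wraps each part in a Message.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.StringTokenizer;

// Hypothetical illustration of delimiter-based splitting; not the app's code.
final class DelimiterSplitExample {

    // Splits the payload on any of the characters in 'delimiters'.
    static List<String> split(String payload, String delimiters) {
        List<String> parts = new ArrayList<>();
        StringTokenizer tokenizer = new StringTokenizer(payload, delimiters);
        while (tokenizer.hasMoreTokens()) {
            parts.add(tokenizer.nextToken());
        }
        return parts;
    }

    public static void main(String[] args) {
        // Both ',' and ';' are treated as delimiters.
        System.out.println(split("a,b;c", ",;")); // [a, b, c]
    }
}
```

With splitter.apply-sequence left at its default, each resulting message would also carry sequence headers so that a downstream aggregator can reassemble the parts.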

6.12. Transform Processor

Transformer processor allows you to convert the message payload structure based on a SpEL expression.

Here is an example of how you can run this application.

java -jar transform-processor-kafka-<version>.jar --spel.function.expression=toUpperCase()

Change kafka to rabbit if you want to run it against RabbitMQ.

Payload

The incoming message can contain any type of payload.

6.12.2. Options

spel.function.expression

A SpEL expression to apply. (String, default: <none>)

6.13. Twitter Trend and Trend Locations Processor

Processor that can return either the trending topics or the locations of the trending topics. The twitter.trend.trend-query-type property selects the query type.

For this mode set twitter.trend.trend-query-type to trend.

Processor based on Trends API. Returns the trending topics near a specific latitude, longitude location.

6.13.2. Retrieve trend Locations

For this mode set twitter.trend.trend-query-type to trendLocation.

Retrieve a full or nearby locations list of trending topics by location.

If the latitude and longitude parameters are not provided, the processor calls the Trends Available API and returns the locations that Twitter has trending topic information for.

If the latitude and longitude parameters are provided, the processor calls the Trends Closest API and returns the locations that Twitter has trending topic information for, closest to the specified location.

The response is an array of locations that encode each location’s WOEID and some other human-readable information, such as a canonical name and the country the location belongs to.

6.13.3. Options

Properties grouped by prefix:

twitter.trend.closest
lat

If provided with a lon parameter, the available trend locations will be sorted by distance, nearest to furthest, to the co-ordinate pair. The valid range for latitude is -90.0 to +90.0 (North is positive) inclusive. (Expression, default: <none>)

lon

If provided with a lat parameter, the available trend locations will be sorted by distance, nearest to furthest, to the co-ordinate pair. The valid range for longitude is -180.0 to +180.0 (West is negative, East is positive) inclusive. (Expression, default: <none>)

twitter.trend
location-id

The Yahoo! Where On Earth ID of the location to return trending information for. Global information is available by using 1 as the WOEID. (Expression, default: payload)

trend-query-type

<documentation missing> (TrendQueryType, default: <none>, possible values: trend,trendLocation)

7. Sinks

7.1. Cassandra Sink

This sink application writes the content of each message it receives into Cassandra.

It expects a JSON String payload and uses its properties to map to table columns.

Payload

A JSON String or byte array representing the entity (or a list of entities) to be persisted.

7.1.2. Options

The cassandra sink has the following options:

spring.data.cassandra.cluster-name

<documentation missing> (String, default: <none>)

spring.data.cassandra.compression

Compression supported by the Cassandra binary protocol. (Compression, default: none, possible values: LZ4,SNAPPY,NONE)

spring.data.cassandra.connect-timeout

<documentation missing> (Duration, default: <none>)

spring.data.cassandra.consistency-level

<documentation missing> (DefaultConsistencyLevel, default: <none>, possible values: ANY,ONE,TWO,THREE,QUORUM,ALL,LOCAL_ONE,LOCAL_QUORUM,EACH_QUORUM,SERIAL,LOCAL_SERIAL)

spring.data.cassandra.contact-points

Cluster node addresses in the form 'host:port', or a simple 'host' to use the configured port. (List<String>, default: [127.0.0.1:9042])

spring.data.cassandra.fetch-size

<documentation missing> (Integer, default: <none>)

spring.data.cassandra.keyspace-name

Keyspace name to use. (String, default: <none>)

spring.data.cassandra.local-datacenter

Datacenter that is considered "local". Contact points should be from this datacenter. (String, default: <none>)

spring.data.cassandra.password

Login password of the server. (String, default: <none>)

spring.data.cassandra.port

Port to use if a contact point does not specify one. (Integer, default: 9042)

spring.data.cassandra.read-timeout

<documentation missing> (Duration, default: <none>)

spring.data.cassandra.schema-action

Schema action to take at startup. (String, default: none)

spring.data.cassandra.serial-consistency-level

<documentation missing> (DefaultConsistencyLevel, default: <none>, possible values: ANY,ONE,TWO,THREE,QUORUM,ALL,LOCAL_ONE,LOCAL_QUORUM,EACH_QUORUM,SERIAL,LOCAL_SERIAL)

spring.data.cassandra.session-name

Name of the Cassandra session. (String, default: <none>)

spring.data.cassandra.ssl

Enable SSL support. (Boolean, default: false)

spring.data.cassandra.username

Login user of the server. (String, default: <none>)

7.2. Analytics Sink

Sink application, built on top of the Analytics Consumer, that computes analytics from the input messages and publishes the analytics as metrics to various monitoring systems. It leverages the micrometer library for providing a uniform programming experience across the most popular monitoring systems and exposes Spring Expression Language (SpEL) properties for defining how the metric Name, Values and Tags are computed from the input data.

The analytics sink can produce two metrics types:

  • Counter - reports a single metric, a count, that increments by a fixed, positive amount. Counters can be used for computing the rates of how the data changes in time.

  • Gauge - reports the current value. Typical examples for gauges would be the size of a collection or map or number of threads in a running state.

A Meter (e.g. Counter or Gauge) is uniquely identified by its name and dimensions (the terms dimensions and tags are used interchangeably). Dimensions allow a particular named metric to be sliced to drill down and reason about the data.

Because a metric is uniquely identified by its name and dimensions, you can assign multiple tags (key/value pairs) to every metric, but you cannot arbitrarily change those tags afterwards. Monitoring systems such as Prometheus will complain if a metric with the same name has different sets of tags.
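
The identity rule above can be sketched in plain Java without any monitoring library. The following hypothetical class (it is not the micrometer API) keys each counter by its name plus its sorted tags, so the same name with different tag values yields separate meters.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-in for a meter registry; illustrates identity only.
final class MeterIdentityExample {
    private final Map<String, Double> counters = new HashMap<>();

    // Builds the meter identity from the name and the tags, sorted by tag
    // name so that tag order does not affect the identity.
    static String meterId(String name, Map<String, String> tags) {
        return name + new TreeMap<>(tags);
    }

    // Increments the counter identified by name and tags; returns the new count.
    double increment(String name, Map<String, String> tags, double amount) {
        return counters.merge(meterId(name, tags), amount, Double::sum);
    }

    public static void main(String[] args) {
        MeterIdentityExample registry = new MeterIdentityExample();
        registry.increment("stocks", Map.of("exchange", "NYSE"), 1.0);
        registry.increment("stocks", Map.of("exchange", "NYSE"), 1.0);
        // A different tag value identifies a different meter.
        System.out.println(registry.increment("stocks", Map.of("exchange", "LSE"), 1.0)); // 1.0
        System.out.println(meterId("stocks", Map.of("exchange", "NYSE"))); // stocks{exchange=NYSE}
    }
}
```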

Use the analytics.name or analytics.name-expression property to set the name of the output analytics metrics. If neither is set, the metrics name defaults to the application name.

Use the analytics.tag.expression.<TAG_NAME>=<TAG_VALUE> property to add one or more tags to your metrics. The TAG_NAME used in the property definition appears as the tag name in the metrics. The TAG_VALUE is a SpEL expression that dynamically computes the tag value from the incoming message.

The SpEL expressions use the headers and payload keywords to access the message’s headers and payload values.

You can use literals (e.g. 'fixed value') to set tags with fixed values.

All Stream Applications support, out of the box, three of the most popular monitoring systems (Wavefront, Prometheus, and InfluxDB), and you can enable each of them declaratively. You can add support for additional monitoring systems by adding their micrometer meter-registry dependencies to the Analytics Sink applications.

Please visit the Spring Cloud Data Flow Stream Monitoring for detailed instructions for configuring the Monitoring Systems. The following quick snippets can help you start.

  • To enable the Prometheus meter registry, set the following properties.

management.metrics.export.prometheus.enabled=true
management.metrics.export.prometheus.rsocket.enabled=true
management.metrics.export.prometheus.rsocket.host=<YOUR PROMETHEUS-RSOCKET PROXY URI>
management.metrics.export.prometheus.rsocket.port=7001

  • To enable the Wavefront meter registry, set the following properties.

management.metrics.export.wavefront.enabled=true
management.metrics.export.wavefront.api-token=YOUR WAVEFRONT KEY
management.metrics.export.wavefront.uri=YOUR WAVEFRONT URI
management.metrics.export.wavefront.source=UNIQUE NAME TO IDENTIFY YOUR APP

  • To enable the InfluxDB meter registry, set the following properties.

management.metrics.export.influx.enabled=true
management.metrics.export.influx.uri={influxdb-server-url}

If the Data Flow Server Monitoring is enabled, the Analytics Sink reuses the provided metrics configurations.

The following diagram illustrates how the Analytics Sink helps to collect business insights for a real-time stock-exchange pipeline.

Analytics Architecture
Payload

The incoming message can contain any type of payload.

7.2.2. Options

Properties grouped by prefix:

analytics
amount-expression

A SpEL expression to compute the output metrics value (e.g. amount). It defaults to 1.0 (Expression, default: <none>)

meter-type

Micrometer meter type used to report the metrics to the backend. (MeterType, default: <none>, possible values: counter,gauge)

name

The name of the output metrics. The 'name' and 'nameExpression' are mutually exclusive. Only one of them can be set. (String, default: <none>)

name-expression

A SpEL expression to compute the output metrics name from the input message. The 'name' and 'nameExpression' are mutually exclusive. Only one of them can be set. (Expression, default: <none>)

analytics.tag
expression

Computes tags from SpEL expression. Single SpEL expression can produce an array of values, which in turn means distinct name/value tags. Every name/value tag will produce a separate meter increment. Tag expression format is: analytics.tag.expression.[tag-name]=[SpEL expression] (Map<String, Expression>, default: <none>)

fixed

DEPRECATED: Please use analytics.tag.expression with a literal SpEL expression instead. Custom, fixed tags. These tags have constant values, are created once, and are then sent along with every published metric. The convention for defining a fixed tag is: analytics.tag.fixed.[tag-name]=[tag-value] (Map<String, String>, default: <none>)

7.3. Elasticsearch Sink

Sink that indexes documents into Elasticsearch.

This Elasticsearch sink only supports indexing JSON documents. It consumes data from an input destination and then indexes it to Elasticsearch. The input data can be a plain JSON string or a java.util.Map that represents the JSON. It also accepts the data as the Elasticsearch-provided XContentBuilder. However, this is a rare case, as the middleware is unlikely to keep records as XContentBuilder; it is provided mainly for direct invocation of the consumer.

7.3.1. Options

The Elasticsearch sink has the following options:

Properties grouped by prefix:

elasticsearch.consumer
async

Indicates whether the indexing operation is async or not. By default indexing is done synchronously. (Boolean, default: false)

id

The id of the document index. (Expression, default: <none>)

index

Name of the index. (String, default: <none>)

routing

Indicates the shard to route to. If not provided, this resolves to the ID used on the document. (String, default: <none>)

timeout-seconds

Timeout for the shard to be available. If not set, it defaults to 1 minute set by the Elasticsearch client. (Long, default: 0)

spring.elasticsearch.rest
connection-timeout

Connection timeout. (Duration, default: 1s)

password

Credentials password. (String, default: <none>)

read-timeout

Read timeout. (Duration, default: 30s)

uris

Comma-separated list of the Elasticsearch instances to use. (List<String>, default: [http://localhost:9200])

username

Credentials username. (String, default: <none>)

7.3.2. Examples of running this sink

  1. From the folder elasticsearch-sink: ./mvnw clean package

  2. cd apps

  3. cd to the proper binder generated app (Kafka or RabbitMQ)

  4. ./mvnw clean package

  5. Make sure that you have Elasticsearch running. For example you can run it as a docker container using the following command. docker run -d --name es762 -p 9200:9200 -e "discovery.type=single-node" elasticsearch:7.6.2

  6. Start the middleware (Kafka or RabbitMQ) if it is not already running.

  7. java -jar target/elasticsearch-sink-<kafka|rabbit>-3.0.0-SNAPSHOT.jar --spring.cloud.stream.bindings.input.destination=els-in --elasticsearch.consumer.index=testing

  8. Send some JSON data into the middleware destination. For example: {"foo":"bar"}

  9. Verify that the data is indexed: curl localhost:9200/testing/_search

7.4. File Sink

The file sink app writes each message it receives to a file.

Payload
  • java.io.File

  • java.io.InputStream

  • byte[]

  • String

7.4.2. Options

The file-sink has the following options:

file.consumer.binary

A flag to indicate whether adding a newline after the write should be suppressed. (Boolean, default: false)

file.consumer.charset

The charset to use when writing text content. (String, default: UTF-8)

file.consumer.directory

The parent directory of the target file. (File, default: <none>)

file.consumer.directory-expression

The expression to evaluate for the parent directory of the target file. (String, default: <none>)

file.consumer.mode

The FileExistsMode to use if the target file already exists. (FileExistsMode, default: <none>, possible values: APPEND,APPEND_NO_FLUSH,FAIL,IGNORE,REPLACE,REPLACE_IF_MODIFIED)

file.consumer.name

The name of the target file. (String, default: file-consumer)

file.consumer.name-expression

The expression to evaluate for the name of the target file. (String, default: <none>)

file.consumer.suffix

The suffix to append to file name. (String, default: <empty string>)

7.5. FTP Sink

FTP sink is a simple option to push files to an FTP server from incoming messages.

It uses an ftp-outbound-adapter, so incoming messages can be a java.io.File object, a String (the content of the file), or an array of bytes (also the file content).

To use this sink, you need a username and a password to login.

By default Spring Integration will use o.s.i.file.DefaultFileNameGenerator if none is specified. DefaultFileNameGenerator will determine the file name based on the value of the file_name header (if it exists) in the MessageHeaders, or if the payload of the Message is already a java.io.File, then it will use the original name of that file.
Headers
  • file_name (See note above)
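
The file name resolution described in the note above can be sketched as follows. This is a simplified illustration, not the DefaultFileNameGenerator source: the class and method names are hypothetical, and the final fallback (a generated name ending in .msg) is an assumption made for this sketch only.

```java
import java.io.File;
import java.util.Map;
import java.util.UUID;

// Hypothetical sketch of the resolution order; not Spring Integration code.
final class FileNameResolutionExample {

    static String resolveFileName(Map<String, Object> headers, Object payload) {
        // 1. Prefer the file_name header when it is present.
        Object headerName = headers.get("file_name");
        if (headerName instanceof String && !((String) headerName).isEmpty()) {
            return (String) headerName;
        }
        // 2. Otherwise reuse the original name of a java.io.File payload.
        if (payload instanceof File) {
            return ((File) payload).getName();
        }
        // 3. Assumption for this sketch: fall back to a generated name.
        return UUID.randomUUID() + ".msg";
    }

    public static void main(String[] args) {
        System.out.println(resolveFileName(Map.of("file_name", "report.csv"), "data")); // report.csv
        System.out.println(resolveFileName(Map.of(), new File("/tmp/in.txt")));         // in.txt
    }
}
```

If you need a different naming scheme, the ftp.consumer.filename-expression option lets you compute the remote file name with a SpEL expression.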

Payload
  • java.io.File

  • java.io.InputStream

  • byte[]

  • String

7.5.3. Output

N/A (writes to the FTP server).

7.5.4. Options

The ftp sink has the following options:

Properties grouped by prefix:

ftp.consumer
auto-create-dir

Whether or not to create the remote directory. (Boolean, default: true)

filename-expression

A SpEL expression to generate the remote file name. (String, default: <none>)

mode

Action to take if the remote file already exists. (FileExistsMode, default: <none>, possible values: APPEND,APPEND_NO_FLUSH,FAIL,IGNORE,REPLACE,REPLACE_IF_MODIFIED)

remote-dir

The remote FTP directory. (String, default: /)

remote-file-separator

The remote file separator. (String, default: /)

temporary-remote-dir

A temporary directory where the file will be written if use-temporary-filename is true. (String, default: /)

tmp-file-suffix

The suffix to use while the transfer is in progress. (String, default: .tmp)

use-temporary-filename

Whether or not to write to a temporary file and rename. (Boolean, default: true)

ftp.factory
cache-sessions

Cache sessions. (Boolean, default: <none>)

client-mode

The client mode to use for the FTP session. (ClientMode, default: <none>, possible values: ACTIVE,PASSIVE)

host

The host name of the server. (String, default: localhost)

password

The password to use to connect to the server. (String, default: <none>)

port

The port of the server. (Integer, default: 21)

username

The username to use to connect to the server. (String, default: <none>)

7.6. Geode Sink

The Geode sink will write Message contents to a Geode region.

7.6.1. Options

The geode sink has the following options:

Properties grouped by prefix:

geode.consumer
json

Indicates if the Geode region stores json objects as PdxInstance. (Boolean, default: false)

key-expression

SpEL expression to use as a cache key. (String, default: <none>)

geode.pool
connect-type

Specifies connection type: 'server' or 'locator'. (ConnectType, default: <none>, possible values: locator,server)

host-addresses

Specifies one or more Gemfire locator or server addresses formatted as [host]:[port]. (InetSocketAddress[], default: <none>)

subscription-enabled

Set to true to enable subscriptions for the client pool. Required to sync updates to the client cache. (Boolean, default: false)

geode.region
region-name

The region name. (String, default: <none>)

geode.security
password

The cache password. (String, default: <none>)

username

The cache username. (String, default: <none>)

geode.security.ssl
ciphers

Configures the SSL ciphers used for secure Socket connections as an array of valid cipher names. (String, default: any)

keystore-type

Identifies the type of Keystore used for SSL communications (e.g. JKS, PKCS11, etc.). (String, default: JKS)

keystore-uri

Location of the pre-created Keystore URI to be used for connecting to the Geode cluster. (Resource, default: <none>)

ssl-keystore-password

Password for accessing the keystore. (String, default: <none>)

ssl-truststore-password

Password for accessing the trust store. (String, default: <none>)

truststore-type

Identifies the type of truststore used for SSL communications (e.g. JKS, PKCS11, etc.). (String, default: JKS)

truststore-uri

Location of the pre-created truststore URI to be used for connecting to the Geode cluster. (Resource, default: <none>)

user-home-directory

Local directory to cache the truststore and keystore files downloaded from the truststoreUri and keystoreUri locations. (String, default: user.home)

7.7. JDBC Sink

JDBC sink allows you to persist incoming payload into an RDBMS database.

The jdbc.consumer.columns property represents pairs of COLUMN_NAME[:EXPRESSION_FOR_VALUE], where EXPRESSION_FOR_VALUE (together with the colon) is optional. When the expression is omitted, the value is evaluated via a generated expression such as payload.COLUMN_NAME, which gives a direct mapping from object properties to table columns. For example, consider a JSON payload like:

{
  "name": "My Name",
  "address": {
     "city": "Big City",
     "street": "Narrow Alley"
  }
}

We can insert it into a table with name, city, and street columns by using the following configuration:

--jdbc.consumer.columns=name,city:address.city,street:address.street
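
The following sketch shows one way the columns value above could be expanded into a column-to-expression mapping. It is not the sink's actual parser: the class and method names are hypothetical, and it assumes that expressions contain no commas.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical parser for the COLUMN_NAME[:EXPRESSION_FOR_VALUE] format.
final class ColumnsMappingExample {

    static Map<String, String> parseColumns(String columns) {
        Map<String, String> mapping = new LinkedHashMap<>();
        for (String pair : columns.split(",")) {
            // Split on the first colon only; the expression part is optional.
            String[] parts = pair.trim().split(":", 2);
            String column = parts[0].trim();
            String expression = parts.length == 2
                    ? parts[1].trim()
                    : "payload." + column; // generated default expression
            mapping.put(column, expression);
        }
        return mapping;
    }

    public static void main(String[] args) {
        System.out.println(parseColumns("name,city:address.city,street:address.street"));
        // {name=payload.name, city=address.city, street=address.street}
    }
}
```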

This sink supports batch inserts, as far as supported by the underlying JDBC driver. Batch inserts are configured via the batch-size and idle-timeout properties: Incoming messages are aggregated until batch-size messages are present, then inserted as a batch. If idle-timeout milliseconds pass with no new messages, the aggregated batch is inserted even if it is smaller than batch-size, capping maximum latency.
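
The interplay of batch-size and idle-timeout can be sketched with a small, deterministic model. The class below is hypothetical and is not the sink's implementation: time is passed in explicitly, a flush stands in for one batch insert, and treating a negative idle timeout as "disabled" is an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of size- and idle-based batching; not the sink's code.
final class BatchingExample {
    private final int batchSize;
    private final long idleTimeoutMillis;
    private final List<String> pending = new ArrayList<>();
    private final List<List<String>> flushed = new ArrayList<>();
    private long lastMessageAt;

    BatchingExample(int batchSize, long idleTimeoutMillis) {
        this.batchSize = batchSize;
        this.idleTimeoutMillis = idleTimeoutMillis;
    }

    // Adds a message and flushes once batchSize messages are pending.
    void onMessage(String message, long nowMillis) {
        pending.add(message);
        lastMessageAt = nowMillis;
        if (pending.size() >= batchSize) {
            flush();
        }
    }

    // Called periodically; flushes a partial batch after the idle timeout.
    // Assumption: a negative timeout disables idle flushing.
    void onTick(long nowMillis) {
        boolean idle = nowMillis - lastMessageAt >= idleTimeoutMillis;
        if (idleTimeoutMillis >= 0 && !pending.isEmpty() && idle) {
            flush();
        }
    }

    private void flush() {
        flushed.add(new ArrayList<>(pending)); // stands in for one batch INSERT
        pending.clear();
    }

    List<List<String>> flushedBatches() {
        return flushed;
    }

    public static void main(String[] args) {
        BatchingExample batcher = new BatchingExample(3, 1000);
        batcher.onMessage("a", 0);
        batcher.onMessage("b", 100);
        batcher.onMessage("c", 200); // third message completes a batch
        batcher.onMessage("d", 300);
        batcher.onTick(1300);        // 1000 ms idle: the partial batch is flushed
        System.out.println(batcher.flushedBatches()); // [[a, b, c], [d]]
    }
}
```

A larger batch-size reduces the number of round trips to the database, while idle-timeout bounds how long a partial batch can wait.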

The module also uses Spring Boot’s DataSource support for configuring the database connection, so properties like spring.datasource.url etc. apply.

7.7.1. Examples

java -jar jdbc-sink.jar --jdbc.consumer.tableName=names --jdbc.consumer.columns=name --spring.datasource.driver-class-name=org.mariadb.jdbc.Driver \
--spring.datasource.url='jdbc:mysql://localhost:3306/test'
Payload

7.7.2. Options

The jdbc sink has the following options:

Properties grouped by prefix:

jdbc.consumer
batch-size

Threshold in number of messages when data will be flushed to database table. (Integer, default: 1)

columns

The comma separated colon-based pairs of column names and SpEL expressions for values to insert/update. Names are used at initialization time to issue the DDL. (String, default: payload:payload.toString())

idle-timeout

Idle timeout in milliseconds when data is automatically flushed to database table. (Long, default: -1)

initialize

'true', 'false' or the location of a custom initialization script for the table. (String, default: false)

table-name

The name of the table to write into. (String, default: messages)

spring.datasource
data

Data (DML) script resource references. (List<String>, default: <none>)

driver-class-name

Fully qualified name of the JDBC driver. Auto-detected based on the URL by default. (String, default: <none>)

initialization-mode

Mode to apply when determining if DataSource initialization should be performed using the available DDL and DML scripts. (DataSourceInitializationMode, default: embedded, possible values: ALWAYS,EMBEDDED,NEVER)

password

Login password of the database. (String, default: <none>)

schema

Schema (DDL) script resource references. (List<String>, default: <none>)

url

JDBC URL of the database. (String, default: <none>)

username

Login username of the database. (String, default: <none>)

7.8. Log Sink

The log sink uses the application logger to output the data for inspection.

Note that the log sink uses a type-less handler, which affects how the actual logging is performed. If the content-type is textual, the raw payload bytes are converted to String; otherwise, the raw bytes are logged. See the user guide for more information.

7.8.1. Options

The log sink has the following options:

log.expression

A SpEL expression (against the incoming message) to evaluate as the logged message. (String, default: payload)

log.level

The level at which to log messages. (Level, default: <none>, possible values: FATAL,ERROR,WARN,INFO,DEBUG,TRACE)

log.name

The name of the logger to use. (String, default: <none>)

7.9. MongoDB Sink

This sink application ingests incoming data into MongoDB. This application is fully based on the MongoDataAutoConfiguration, so refer to the Spring Boot MongoDB Support for more information.

7.9.1. Input

Payload
  • Any POJO

  • String

  • byte[]

7.9.2. Options

The mongodb sink has the following options:

Properties grouped by prefix:

mongodb.consumer
collection

The MongoDB collection to store data. (String, default: <none>)

collection-expression

The SpEL expression to evaluate MongoDB collection. (Expression, default: <none>)

spring.data.mongodb
authentication-database

Authentication database name. (String, default: <none>)

auto-index-creation

Whether to enable auto-index creation. (Boolean, default: <none>)

database

Database name. (String, default: <none>)

field-naming-strategy

Fully qualified name of the FieldNamingStrategy to use. (Class<?>, default: <none>)

grid-fs-database

GridFS database name. (String, default: <none>)

host

Mongo server host. Cannot be set with URI. (String, default: <none>)

password

Login password of the mongo server. Cannot be set with URI. (Character[], default: <none>)

port

Mongo server port. Cannot be set with URI. (Integer, default: <none>)

replica-set-name

Required replica set name for the cluster. Cannot be set with URI. (String, default: <none>)

uri

Mongo database URI. Cannot be set with host, port, credentials and replica set name. (String, default: mongodb://localhost/test)

username

Login user of the mongo server. Cannot be set with URI. (String, default: <none>)

uuid-representation

Representation to use when converting a UUID to a BSON binary value. (UuidRepresentation, default: java-legacy, possible values: UNSPECIFIED,STANDARD,C_SHARP_LEGACY,JAVA_LEGACY,PYTHON_LEGACY)

7.10. MQTT Sink

This module sends messages to MQTT.

Payload:
  • byte[]

  • String

7.10.2. Options

The mqtt sink has the following options:

Properties grouped by prefix:

mqtt
clean-session

whether the client and server should remember state across restarts and reconnects. (Boolean, default: true)

connection-timeout

the connection timeout in seconds. (Integer, default: 30)

keep-alive-interval

the ping interval in seconds. (Integer, default: 60)

password

the password to use when connecting to the broker. (String, default: guest)

persistence

'memory' or 'file'. (String, default: memory)

persistence-directory

Persistence directory. (String, default: /tmp/paho)

url

location of the mqtt broker(s) (comma-delimited list). (String[], default: [tcp://localhost:1883])

username

the username to use when connecting to the broker. (String, default: guest)

mqtt.consumer
async

whether or not to use async sends. (Boolean, default: false)

charset

the charset used to convert a String payload to byte[]. (String, default: UTF-8)

client-id

identifies the client. (String, default: stream.client.id.sink)

qos

the quality of service to use. (Integer, default: 1)

retained

whether to set the 'retained' flag. (Boolean, default: false)

topic

the topic to which the sink will publish. (String, default: stream.mqtt)

7.11. Pgcopy Sink

A module that writes its incoming payload to an RDBMS using the PostgreSQL COPY command.

7.11.1. Input

Headers
Payload
  • Any

Column expression will be evaluated against the message and the expression will usually be compatible with only one type (such as a Map or bean etc.)

7.11.2. Output

N/A

7.11.3. Options

The pgcopy sink has the following options:

spring.datasource.driver-class-name

Fully qualified name of the JDBC driver. Auto-detected based on the URL by default. (String, default: <none>)

spring.datasource.password

Login password of the database. (String, default: <none>)

spring.datasource.url

JDBC URL of the database. (String, default: <none>)

spring.datasource.username

Login username of the database. (String, default: <none>)

The module also uses Spring Boot’s DataSource support for configuring the database connection, so properties like spring.datasource.url etc. apply.

7.11.4. Build

$ ./mvnw clean install -PgenerateApps
$ cd apps

You can find the corresponding binder-based projects here. You can then cd into one of the folders and build it:

$ ./mvnw clean package

For integration tests to run, start a PostgreSQL database on localhost:

    docker run -e POSTGRES_PASSWORD=spring -e POSTGRES_DB=test -p 5432:5432 -d postgres:latest

7.11.5. Examples

java -jar pgcopy-sink.jar --tableName=names --columns=name --spring.datasource.driver-class-name=org.mariadb.jdbc.Driver \
--spring.datasource.url='jdbc:mysql://localhost:3306/test'

7.12. RabbitMQ Sink

This module sends messages to RabbitMQ.

7.12.1. Options

The rabbit sink has the following options:

(See the Spring Boot documentation for RabbitMQ connection properties)

Properties grouped by prefix:

rabbit
converter-bean-name

The bean name for a custom message converter; if omitted, a SimpleMessageConverter is used. If 'jsonConverter', a Jackson2JsonMessageConverter bean will be created for you. (String, default: <none>)

exchange

Exchange name - overridden by exchangeNameExpression, if supplied. (String, default: <empty string>)

exchange-expression

A SpEL expression that evaluates to an exchange name. (Expression, default: <none>)

mapped-request-headers

Headers that will be mapped. (String[], default: [*])

own-connection

When true, use a separate connection based on the boot properties. (Boolean, default: false)

persistent-delivery-mode

Default delivery mode when 'amqp_deliveryMode' header is not present, true for PERSISTENT. (Boolean, default: false)

routing-key

Routing key - overridden by routingKeyExpression, if supplied. (String, default: <none>)

routing-key-expression

A SpEL expression that evaluates to a routing key. (Expression, default: <none>)

spring.rabbitmq
addresses

Comma-separated list of addresses to which the client should connect. When set, the host and port are ignored. (String, default: <none>)

connection-timeout

Connection timeout. Set it to zero to wait forever. (Duration, default: <none>)

host

RabbitMQ host. Ignored if an address is set. (String, default: localhost)

password

Login to authenticate against the broker. (String, default: guest)

port

RabbitMQ port. Ignored if an address is set. Default to 5672, or 5671 if SSL is enabled. (Integer, default: <none>)

publisher-confirm-type

Type of publisher confirms to use. (ConfirmType, default: <none>, possible values: SIMPLE,CORRELATED,NONE)

publisher-returns

Whether to enable publisher returns. (Boolean, default: false)

requested-channel-max

Number of channels per connection requested by the client. Use 0 for unlimited. (Integer, default: 2047)

requested-heartbeat

Requested heartbeat timeout; zero for none. If a duration suffix is not specified, seconds will be used. (Duration, default: <none>)

username

Login user to authenticate to the broker. (String, default: guest)

virtual-host

Virtual host to use when connecting to the broker. (String, default: <none>)

7.13. Redis Sink

Sends messages to Redis.

7.13.1. Options

The redis sink has the following options:

Properties grouped by prefix:

redis.consumer
key

A literal key name to use when storing to a key. (String, default: <none>)

key-expression

A SpEL expression to use for storing to a key. (String, default: <none>)

queue

A literal queue name to use when storing in a queue. (String, default: <none>)

queue-expression

A SpEL expression to use for queue. (String, default: <none>)

topic

A literal topic name to use when publishing to a topic. (String, default: <none>)

topic-expression

A SpEL expression to use for topic. (String, default: <none>)

spring.redis
client-name

Client name to be set on connections with CLIENT SETNAME. (String, default: <none>)

database

Database index used by the connection factory. (Integer, default: 0)

host

Redis server host. (String, default: localhost)

password

Login password of the redis server. (String, default: <none>)

port

Redis server port. (Integer, default: 6379)

ssl

Whether to enable SSL support. (Boolean, default: false)

timeout

Connection timeout. (Duration, default: <none>)

url

Connection URL. Overrides host, port, and password. User is ignored. Example: redis://user:password@example.com:6379 (String, default: <none>)

spring.redis.jedis.pool
max-active

Maximum number of connections that can be allocated by the pool at a given time. Use a negative value for no limit. (Integer, default: 8)

max-idle

Maximum number of "idle" connections in the pool. Use a negative value to indicate an unlimited number of idle connections. (Integer, default: 8)

max-wait

Maximum amount of time a connection allocation should block before throwing an exception when the pool is exhausted. Use a negative value to block indefinitely. (Duration, default: -1ms)

min-idle

Target for the minimum number of idle connections to maintain in the pool. This setting only has an effect if both it and time between eviction runs are positive. (Integer, default: 0)

time-between-eviction-runs

Time between runs of the idle object evictor thread. When positive, the idle object evictor thread starts, otherwise no idle object eviction is performed. (Duration, default: <none>)

spring.redis.lettuce.pool
max-active

Maximum number of connections that can be allocated by the pool at a given time. Use a negative value for no limit. (Integer, default: 8)

max-idle

Maximum number of "idle" connections in the pool. Use a negative value to indicate an unlimited number of idle connections. (Integer, default: 8)

max-wait

Maximum amount of time a connection allocation should block before throwing an exception when the pool is exhausted. Use a negative value to block indefinitely. (Duration, default: -1ms)

min-idle

Target for the minimum number of idle connections to maintain in the pool. This setting only has an effect if both it and time between eviction runs are positive. (Integer, default: 0)

time-between-eviction-runs

Time between runs of the idle object evictor thread. When positive, the idle object evictor thread starts, otherwise no idle object eviction is performed. (Duration, default: <none>)

spring.redis.sentinel
master

Name of the Redis server. (String, default: <none>)

nodes

Comma-separated list of "host:port" pairs. (List<String>, default: <none>)

password

Password for authenticating with sentinel(s). (String, default: <none>)

7.14. Router Sink

This application routes messages to named channels.

7.14.1. Options

The router sink has the following options:

router.default-output-channel

Where to send un-routable messages. (String, default: nullChannel)

router.destination-mappings

Destination mappings as a newline-delimited string of name-value pairs, e.g. 'foo=bar\n baz=car'. (Properties, default: <none>)

router.expression

The expression to be applied to the message to determine the channel(s) to route to. Note that the payload wire format for content types such as text, json, or xml is byte[], not String. Consult the documentation for how to handle byte array payload content. (Expression, default: <none>)

router.refresh-delay

How often to check for script changes in ms (if present); < 0 means don't refresh. (Integer, default: 60000)

router.resolution-required

Whether or not channel resolution is required. (Boolean, default: false)

router.script

The location of a groovy script that returns channels or channel mapping resolution keys. (Resource, default: <none>)

router.variables

Variable bindings as a newline-delimited string of name-value pairs, e.g. 'foo=bar\n baz=car'. (Properties, default: <none>)

router.variables-location

The location of a properties file containing custom script variable bindings. (Resource, default: <none>)

Since this is a dynamic router, destinations are created as needed; therefore, by default the defaultOutputChannel and resolutionRequired will only be used if the Binder has some problem binding to the destination.

You can restrict the creation of dynamic bindings using the spring.cloud.stream.dynamicDestinations property. By default, all resolved destinations will be bound dynamically; if this property has a comma-delimited list of destination names, only those will be bound. Messages that resolve to a destination that is not in this list will be routed to the defaultOutputChannel, which must also appear in the list.

destinationMappings are used to map the evaluation results to an actual destination name.
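
The following is a minimal sketch of the resolution rules described above, not the sink's actual implementation. The class and method names are hypothetical, the mappings are parsed with java.util.Properties as a stand-in for the framework's own conversion, and the sketch assumes that an unmapped evaluation result is used directly as the destination name.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;
import java.util.Set;

// Hypothetical helper that models the routing rules in plain Java.
final class RouterResolutionSketch {

    // Parses a newline-delimited string of name-value pairs, e.g. "foo=bar\n baz=car".
    static Properties parseMappings(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    // Maps the evaluation result to a destination name, then applies the
    // dynamicDestinations allow-list. An empty allow-list means "bind anything".
    static String resolve(String key,
                          Properties destinationMappings,
                          Set<String> dynamicDestinations,
                          String defaultOutputChannel) {
        // Assumption: an unmapped key is treated as the destination name itself.
        String destination = destinationMappings.getProperty(key, key);
        if (dynamicDestinations.isEmpty() || dynamicDestinations.contains(destination)) {
            return destination;
        }
        return defaultOutputChannel;
    }

    public static void main(String[] args) {
        Properties mappings = parseMappings("foo=bar\n baz=car");
        Set<String> allowed = Set.of("bar", "nullChannel");
        System.out.println(resolve("foo", mappings, allowed, "nullChannel")); // bar
        System.out.println(resolve("baz", mappings, allowed, "nullChannel")); // nullChannel
    }
}
```

In this sketch the default output channel appears in the allow-list, mirroring the requirement that defaultOutputChannel must also be listed in spring.cloud.stream.dynamicDestinations.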

7.14.2. SpEL-based Routing

The expression evaluates against the message and returns either a channel name, or the key to a map of channel names.

For more information, please see the "Routers and the Spring Expression Language (SpEL)" subsection in the Spring Integration Reference manual Configuring (Generic) Router section.

Starting with Spring Cloud Stream 2.0, the message wire format for the json, text, and xml content types is byte[], not String. This is a breaking change from SCSt 1.x, which treated those types as Strings. Depending on the content type, different techniques for handling byte[] payloads are available. For plain text content types, you can convert the octet payload into a String by using the new String(payload) SpEL expression. For json types, the #jsonPath() SpEL utility already supports String and byte array content interchangeably. The same applies to the xml content type and the #xpath() SpEL utility.

For example, for the text content type, use:

 new String(payload).contains('a')

and for the json content type, use SpEL expressions like this:

 #jsonPath(payload, '$.person.name')
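
For readers less familiar with SpEL, the text expression above corresponds roughly to the following plain Java. This is only an illustration: the class name is hypothetical, and the explicit UTF-8 charset is an assumption (the SpEL form new String(payload) uses the platform default charset).

```java
import java.nio.charset.StandardCharsets;

// Hypothetical illustration of what new String(payload).contains('a') does.
final class BytePayloadSketch {

    // Decodes the byte[] wire payload and checks for the letter 'a'.
    static boolean containsA(byte[] payload) {
        // Assumption: the payload is UTF-8 encoded text.
        return new String(payload, StandardCharsets.UTF_8).contains("a");
    }

    public static void main(String[] args) {
        byte[] payload = "banana".getBytes(StandardCharsets.UTF_8);
        System.out.println(containsA(payload)); // true
    }
}
```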

7.14.3. Groovy-based Routing

Instead of SpEL expressions, Groovy scripts can also be used. Let’s create a Groovy script in the file system at "file:/my/path/router.groovy", or on the classpath at "classpath:/my/path/router.groovy":

println("Groovy processing payload '" + payload + "'");
if (payload.contains('a')) {
    return "foo"
}
else {
    return "bar"
}

If you want to pass variable values to your script, you can statically bind values by using the variables option or optionally pass the path to a properties file containing the bindings by using the variables-location option. All properties in the file are made available to the script as variables. You may specify both variables and variables-location, in which case any duplicate values provided as variables override values provided in the properties file. Note that payload and headers are implicitly bound to give you access to the data contained in a message.
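
The override rule for duplicate bindings can be pictured with a short sketch. The class and method names here are hypothetical, and the code only models the precedence described above; it is not how the application itself merges the bindings.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of the binding precedence for script variables.
final class ScriptVariablesSketch {

    // Values from the properties file are applied first; inline variables
    // are applied second, so they win when the same name appears in both.
    static Map<String, String> merge(Map<String, String> fromFile, Map<String, String> inline) {
        Map<String, String> merged = new HashMap<>(fromFile);
        merged.putAll(inline);
        return merged;
    }

    public static void main(String[] args) {
        Map<String, String> merged = merge(Map.of("foo", "fromFile"), Map.of("foo", "inline"));
        System.out.println(merged.get("foo")); // inline
    }
}
```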

For more information, see the Spring Integration Reference manual Groovy Support.

7.15. RSocket Sink

The RSocket sink sends data by using the RSocket protocol's fire-and-forget strategy.

7.15.1. Options

The rsocket sink has the following options:

rsocket.consumer.host

RSocket host. (String, default: localhost)

rsocket.consumer.port

RSocket port. (Integer, default: 7000)

rsocket.consumer.route

Route used for RSocket. (String, default: <none>)

rsocket.consumer.uri

URI that can be used for websocket based transport. (URI, default: <none>)

7.16. Amazon S3 Sink

This sink app supports transferring objects to an Amazon S3 bucket. File payloads (and directories, recursively) are transferred to the remote directory (S3 bucket) from the local directory where the application is deployed.

Messages accepted by this sink must contain a payload of one of the following types:

  • File, including directories for recursive upload;

  • InputStream;

  • byte[]

7.16.1. Options

The s3 sink has the following options:

Properties grouped by prefix:

s3.common
endpoint-url

Optional endpoint url to connect to s3 compatible storage. (String, default: <none>)

path-style-access

Use path style access. (Boolean, default: false)

s3.consumer
acl

S3 Object access control list. (CannedAccessControlList, default: <none>, possible values: private,public-read,public-read-write,authenticated-read,log-delivery-write,bucket-owner-read,bucket-owner-full-control,aws-exec-read)

acl-expression

Expression to evaluate S3 Object access control list. (Expression, default: <none>)

bucket

AWS bucket for target file(s) to store. (String, default: <none>)

bucket-expression

Expression to evaluate AWS bucket name. (Expression, default: <none>)

key-expression

Expression to evaluate S3 Object key. (Expression, default: <none>)

The target generated application based on the AmazonS3SinkConfiguration can be enhanced with the S3MessageHandler.UploadMetadataProvider and/or S3ProgressListener, which are injected into the S3MessageHandler bean. See Spring Integration AWS support for more details.

7.16.2. Amazon AWS common options

The Amazon S3 Sink (as all other Amazon AWS applications) is based on the Spring Cloud AWS project as a foundation, and its auto-configuration classes are used automatically by Spring Boot. Consult their documentation regarding required and useful auto-configuration properties.

Some of them are about AWS credentials:

  • cloud.aws.credentials.accessKey

  • cloud.aws.credentials.secretKey

  • cloud.aws.credentials.instanceProfile

  • cloud.aws.credentials.profileName

  • cloud.aws.credentials.profilePath

Others are for the AWS Region definition:

  • cloud.aws.region.auto

  • cloud.aws.region.static

And for AWS Stack:

  • cloud.aws.stack.auto

  • cloud.aws.stack.name

Examples

java -jar s3-sink.jar --s3.consumer.bucket=/tmp/bar

7.17. Tasklauncher Sink

This module consumes LaunchRequest messages from a PollableMessageSource and uses the Data Flow REST client to launch a registered task on a configured task platform. The client must be configured to connect to a remote Data Flow Server, including any required authentication (see Configuration Options below).

This application launches a registered task definition using the Data Flow Server REST API.

7.17.1. Input

A Task Launch Request including:

  • the task name (required and created as a task with the target Data Flow Server)

  • deployment properties (key value pairs, optional).

  • program arguments for the task (a list, optional).

The message payload can be a JSON document:

{
  "name":"foo",
  "deploymentProps": {"key1":"val1","key2":"val2"},
  "args":["--debug", "--foo", "bar"]
}

Minimally, it must include the task name.

{"name":"foo"}
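
An upstream producer could assemble such a payload as sketched here. The class is hypothetical and builds the JSON by hand to stay dependency-free; in practice a JSON library would normally be used. Omitting the optional fields when they are empty is a choice made for this sketch, based on the statement that only the task name is required.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical helper that renders a launch request as a JSON string.
final class LaunchRequestJsonSketch {

    // Escapes only backslashes and double quotes; control characters are not handled.
    static String quote(String s) {
        return "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
    }

    static String toJson(String name, Map<String, String> deploymentProps, List<String> args) {
        StringBuilder json = new StringBuilder("{\"name\":").append(quote(name));
        if (!deploymentProps.isEmpty()) {
            json.append(",\"deploymentProps\":").append(deploymentProps.entrySet().stream()
                    .map(e -> quote(e.getKey()) + ":" + quote(e.getValue()))
                    .collect(Collectors.joining(",", "{", "}")));
        }
        if (!args.isEmpty()) {
            json.append(",\"args\":").append(args.stream()
                    .map(LaunchRequestJsonSketch::quote)
                    .collect(Collectors.joining(",", "[", "]")));
        }
        return json.append('}').toString();
    }

    public static void main(String[] args) {
        System.out.println(toJson("foo", Map.of(), List.of())); // {"name":"foo"}
    }
}
```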

Options

The tasklauncher-dataflow sink supports the following configuration properties:

Properties grouped by prefix:

spring.cloud.dataflow.client.authentication
access-token

OAuth2 Access Token. (String, default: <none>)

client-id

OAuth2 Client Id. (String, default: <none>)

client-secret

OAuth2 Client Secret. (String, default: <none>)

scope

OAuth2 Scopes. (Set<String>, default: <none>)

token-uri

OAuth2 Token Uri. (String, default: <none>)

spring.cloud.dataflow.client.authentication.basic
password

The login password. (String, default: <none>)

username

The login username. (String, default: <none>)

spring.cloud.dataflow.client.authentication.oauth2
client-registration-id

<documentation missing> (String, default: <none>)

password

<documentation missing> (String, default: <none>)

username

<documentation missing> (String, default: <none>)

spring.cloud.dataflow.client
enable-dsl

Enable Data Flow DSL access. (Boolean, default: false)

server-uri

The Data Flow server URI. (String, default: http://localhost:9393)

skip-ssl-validation

Skip SSL validation. (Boolean, default: false)

tasklauncher
platform-name

The Spring Cloud Data Flow platform to use for launching tasks. (String, default: default)

trigger
initial-delay

The initial delay in milliseconds. (Integer, default: 1000)

max-period

The maximum polling period in milliseconds. Will be set to period if period > maxPeriod. (Integer, default: 30000)

period

The polling period in milliseconds. (Integer, default: 1000)

7.17.2. Using the TaskLauncher

The tasklauncher sink consumes LaunchRequest messages, as described above, and launches a task using the target Data Flow server (given by --spring.cloud.dataflow.client.server-uri). The task launcher periodically polls its input source for launch requests but pauses polling when the platform has reached its concurrent task execution limit, given by spring.cloud.dataflow.task.platform.<platform-type>.accounts[<account-name>].maximum-concurrent-tasks. This prevents the SCDF deployer’s deployment platform from exhausting its resources under heavy task load. The poller is scheduled using a DynamicPeriodicTrigger. By default, the initial polling period is 1 second, but it may be configured to any duration. When polling is paused, or if there are no launch requests present, the trigger period increases, applying exponential backoff, up to a configured maximum (30 seconds by default).

This version of the Data Flow task launcher requires SCDF version 2.4.x or higher.
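
The backoff behavior can be sketched as follows. This is not the DynamicPeriodicTrigger implementation: the class is hypothetical, the doubling factor is an assumption (the text above only says the backoff is exponential), and resetting to the initial period after a successful poll is likewise assumed.

```java
import java.time.Duration;

// Hypothetical model of a polling period with exponential backoff.
final class BackoffSketch {

    private final Duration initialPeriod;
    private final Duration maxPeriod;
    private Duration current;

    BackoffSketch(Duration period, Duration maxPeriod) {
        this.initialPeriod = period;
        // The maximum is raised to the period if period > maxPeriod.
        this.maxPeriod = period.compareTo(maxPeriod) > 0 ? period : maxPeriod;
        this.current = period;
    }

    // Called when polling is paused or no launch request was found.
    Duration backOff() {
        Duration doubled = current.multipliedBy(2); // assumed growth factor
        current = doubled.compareTo(maxPeriod) > 0 ? maxPeriod : doubled;
        return current;
    }

    // Called after a launch request was consumed (assumed behavior).
    Duration reset() {
        current = initialPeriod;
        return current;
    }

    public static void main(String[] args) {
        BackoffSketch trigger = new BackoffSketch(Duration.ofSeconds(1), Duration.ofSeconds(30));
        for (int i = 0; i < 6; i++) {
            System.out.println(trigger.backOff().getSeconds());
        }
    }
}
```

With the default period of 1 second and maximum of 30 seconds, this sketch yields periods of 2, 4, 8, 16, 30, and 30 seconds.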

The SCDF server may be configured to launch tasks on multiple platforms. Each task launcher instance is configured for a single platform, given by the platformName property (which is set to default if not specified). This limitation is enforced because, if the server has multiple task platforms configured, some of its task platforms may be at the limit while others are not. In this situation, we can only consume the next launch request if we know which task platform it targets. For this reason, if the SCDF server is configured for multiple task platforms (or a single non-default platform), we assume that all launch requests are targeted for that platform. The task launcher sets the required deployment property spring.cloud.dataflow.task.platformName if the request does not provide it.

If the request includes the deployment property spring.cloud.dataflow.task.platformName, and the value is not the same as the tasklauncher’s platformName, the task launcher will throw an exception.
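
The platform-name rule can be summarized in a few lines of Java. This is a sketch under assumptions: the class is hypothetical, and the exception type is chosen for illustration only, since the text above does not say which exception the task launcher throws.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of how the platform name is applied to a launch request.
final class PlatformNameSketch {

    static final String PLATFORM_KEY = "spring.cloud.dataflow.task.platformName";

    // Returns a copy of the deployment properties with the platform name set,
    // or fails if the request targets a different platform.
    static Map<String, String> applyPlatform(Map<String, String> deploymentProps, String launcherPlatform) {
        Map<String, String> result = new HashMap<>(deploymentProps);
        String requested = result.putIfAbsent(PLATFORM_KEY, launcherPlatform);
        if (requested != null && !requested.equals(launcherPlatform)) {
            // Assumption: the real exception type may differ.
            throw new IllegalStateException("Request targets platform '" + requested
                    + "' but this task launcher is configured for '" + launcherPlatform + "'");
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(applyPlatform(Map.of(), "default").get(PLATFORM_KEY)); // default
    }
}
```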

To launch tasks on multiple platforms, you must configure a task launcher instance per platform and use a router sink, or partitioning strategy, to route requests to the correct instance.

When the poller is paused it puts pressure on the message broker so some tuning will be necessary in extreme cases to balance resource utilization.

Client Authentication

If the Data Flow server requires authentication, the client must pass credentials with authorization to launch a task. The Data Flow client supports both basic and OAuth2 authentication.

For basic authentication set the username and password:

--spring.cloud.dataflow.client.authentication.basic.username=<username> --spring.cloud.dataflow.client.authentication.basic.password=<password>

For OAuth2 authentication, set the client-id, client-secret, and token-uri at a minimum. These values correspond to values set in the SCDF server’s OAuth2 configuration. For more details, see the Security section in the Data Flow reference.

--spring.cloud.dataflow.client.authentication.client-id=<client-id> --spring.cloud.dataflow.client.authentication.client-secret=<client-secret> --spring.cloud.dataflow.client.authentication.token-uri=<token-uri>

7.18. SFTP Sink

SFTP sink is a simple option to push files to an SFTP server from incoming messages.

It uses an sftp-outbound-adapter, therefore incoming messages can be either a java.io.File object, a String (content of the file) or an array of bytes (file content as well).

To use this sink, you need a username and a password to log in.

By default Spring Integration will use o.s.i.file.DefaultFileNameGenerator if none is specified. DefaultFileNameGenerator will determine the file name based on the value of the file_name header (if it exists) in the MessageHeaders, or if the payload of the Message is already a java.io.File, then it will use the original name of that file.
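
The lookup order described above can be sketched in plain Java. This is not the DefaultFileNameGenerator source: the class is hypothetical, and it returns an empty Optional for cases the paragraph above does not cover.

```java
import java.io.File;
import java.util.Map;
import java.util.Optional;

// Hypothetical model of the file name lookup order.
final class FileNameSketch {

    static Optional<String> resolveFileName(Map<String, Object> headers, Object payload) {
        // 1. Prefer the file_name header when it is present.
        Object header = headers.get("file_name");
        if (header instanceof String && !((String) header).isEmpty()) {
            return Optional.of((String) header);
        }
        // 2. Otherwise fall back to the original name of a File payload.
        if (payload instanceof File) {
            return Optional.of(((File) payload).getName());
        }
        // Cases not covered by the description above are left unresolved here.
        return Optional.empty();
    }

    public static void main(String[] args) {
        System.out.println(resolveFileName(Map.of("file_name", "report.csv"), "content").get()); // report.csv
    }
}
```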

When configuring the sftp.consumer.factory.known-hosts-expression option, the root object of the evaluation is the application context. An example might be sftp.consumer.factory.known-hosts-expression = @systemProperties['user.home'] + '/.ssh/known_hosts'.

7.18.1. Input

Headers
  • file_name (See note above)

Payload
  • java.io.File

  • java.io.InputStream

  • byte[]

  • String

7.18.2. Output

N/A (writes to the SFTP server).

7.18.3. Options

The sftp sink has the following options:

Properties grouped by prefix:

sftp.consumer
auto-create-dir

Whether or not to create the remote directory. (Boolean, default: true)

filename-expression

A SpEL expression to generate the remote file name. (String, default: <none>)

mode

Action to take if the remote file already exists. (FileExistsMode, default: <none>, possible values: APPEND,APPEND_NO_FLUSH,FAIL,IGNORE,REPLACE,REPLACE_IF_MODIFIED)

remote-dir

The remote FTP directory. (String, default: /)

remote-file-separator

The remote file separator. (String, default: /)

temporary-remote-dir

A temporary directory where the file will be written if 'isUseTemporaryFilename()' is true. (String, default: /)

tmp-file-suffix

The suffix to use while the transfer is in progress. (String, default: .tmp)

use-temporary-filename

Whether or not to write to a temporary file and rename. (Boolean, default: true)

sftp.consumer.factory
allow-unknown-keys

True to allow an unknown or changed key. (Boolean, default: false)

cache-sessions

Cache sessions. (Boolean, default: <none>)

host

The host name of the server. (String, default: localhost)

known-hosts-expression

A SpEL expression resolving to the location of the known hosts file. (Expression, default: <none>)

pass-phrase

Passphrase for user's private key. (String, default: <empty string>)

password

The password to use to connect to the server. (String, default: <none>)

port

The port of the server. (Integer, default: 22)

private-key

Resource location of user's private key. (Resource, default: <none>)

username

The username to use to connect to the server. (String, default: <none>)

7.19. TCP Sink

This module writes messages to TCP using an Encoder.

TCP is a streaming protocol and some mechanism is needed to frame messages on the wire. A number of encoders are available, the default being 'CRLF'.

7.19.1. Options

The tcp sink has the following options:

Properties grouped by prefix:

tcp.consumer
charset

The charset used when converting from bytes to String. (String, default: UTF-8)

close

Whether to close the socket after each message. (Boolean, default: false)

encoder

The encoder to use when sending messages. (Encoding, default: <none>, possible values: CRLF,LF,NULL,STXETX,RAW,L1,L2,L4)

host

The host to which this sink will connect. (String, default: <none>)

tcp
nio

Whether or not to use NIO. (Boolean, default: false)

port

The port on which to listen; 0 for the OS to choose a port. (Integer, default: 1234)

reverse-lookup

Perform a reverse DNS lookup on the remote IP Address; if false, just the IP address is included in the message headers. (Boolean, default: false)

socket-timeout

The timeout (ms) before closing the socket when no data is received. (Integer, default: 120000)

use-direct-buffers

Whether or not to use direct buffers. (Boolean, default: false)

7.19.2. Available Encoders

Text Data
CRLF (default)

text terminated by carriage return (0x0d) followed by line feed (0x0a)

LF

text terminated by line feed (0x0a)

NULL

text terminated by a null byte (0x00)

STXETX

text preceded by an STX (0x02) and terminated by an ETX (0x03)

Text and Binary Data
RAW

no structure - the client indicates a complete message by closing the socket

L1

data preceded by a one byte (unsigned) length field (supports up to 255 bytes)

L2

data preceded by a two byte (unsigned) length field (up to 2^16-1 bytes)

L4

data preceded by a four byte (signed) length field (up to 2^31-1 bytes)
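
To make the framing formats concrete, the sketch below builds the on-the-wire bytes for a few of the encoders by hand. It is illustrative only: the class is hypothetical, it does not use the sink's own serializers, and it assumes the length fields are written in network byte order (big-endian).

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical helper that frames a message the way the encoders are described.
final class TcpFramingSketch {

    // CRLF: data followed by carriage return (0x0d) and line feed (0x0a).
    static byte[] crlf(byte[] data) {
        return concat(data, new byte[] {0x0d, 0x0a});
    }

    // STXETX: data preceded by STX (0x02) and terminated by ETX (0x03).
    static byte[] stxEtx(byte[] data) {
        return concat(new byte[] {0x02}, data, new byte[] {0x03});
    }

    // L2: data preceded by a two-byte unsigned length field.
    static byte[] l2(byte[] data) {
        if (data.length > 0xFFFF) {
            throw new IllegalArgumentException("L2 supports up to 2^16-1 bytes");
        }
        // ByteBuffer writes big-endian by default (assumed wire order).
        byte[] header = ByteBuffer.allocate(2).putShort((short) data.length).array();
        return concat(header, data);
    }

    // L4: data preceded by a four-byte signed length field.
    static byte[] l4(byte[] data) {
        byte[] header = ByteBuffer.allocate(4).putInt(data.length).array();
        return concat(header, data);
    }

    private static byte[] concat(byte[]... parts) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        for (byte[] part : parts) {
            out.write(part, 0, part.length);
        }
        return out.toByteArray();
    }

    public static void main(String[] args) {
        byte[] framed = l2("hello".getBytes(StandardCharsets.US_ASCII));
        System.out.println(framed.length); // 7: two header bytes plus five payload bytes
    }
}
```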

7.20. Throughput Sink

Sink that will count messages and log the observed throughput at a selected interval.

7.20.1. Options

The throughput sink has the following options:

throughput.report-every-ms

How often to report, in milliseconds. (Integer, default: 1000)

7.21. Twitter Message Sink

Send Direct Messages to a specified user from the authenticating user. Requires a JSON POST body and Content-Type header to be set to application/json.

When a message is received from a user you may send up to 5 messages in response within a 24 hour window. Each message received resets the 24 hour window and the 5 allotted messages. Sending a 6th message within a 24 hour window or sending a message outside of a 24 hour window will count towards rate-limiting. This behavior only applies when using the POST direct_messages/events/new endpoint.

SpEL expressions are used to compute the request parameters from the input message.

7.21.1. Options

Use single quotes (') to wrap the literal values of the SpEL expression properties. For example to set a fixed message text use text='Fixed Text'. For fixed target userId use userId='666'.

twitter.message.update.media-id

A media id to associate with the message. A Direct Message may only reference a single media id. (Expression, default: <none>)

twitter.message.update.screen-name

The screen name of the user to whom to send the direct message. (Expression, default: <none>)

twitter.message.update.text

The direct message text. URL encode as necessary. Max length of 10,000 characters. (Expression, default: payload)

twitter.message.update.user-id

The user id of the user to whom to send the direct message. (Expression, default: <none>)

7.22. Twitter Update Sink

Updates the authenticating user’s current status (also known as Tweeting).

For each update attempt, the update text is compared with the authenticating user’s recent Tweets. Any attempt that would result in duplication will be blocked, resulting in a 403 error. A user cannot submit the same text twice in a row.

While not rate limited by the API, a user is limited in the number of Tweets they can create at a time. The update limit for the standard API is 300 per 3-hour window. If the number of updates posted by the user reaches the current allowed limit, this method returns an HTTP 403 error.

7.22.1. Options

Properties grouped by prefix:

twitter.update
attachment-url

(SpEL expression) In order for a URL to not be counted in the text body of an extended Tweet, provide a URL as a Tweet attachment. This URL must be a Tweet permalink or a Direct Message deep link. Arbitrary, non-Twitter URLs must remain in the status text. URLs passed to the attachment_url parameter that do not match either a Tweet permalink or a Direct Message deep link fail at Tweet creation and cause an exception. (Expression, default: <none>)

display-coordinates

(SpEL expression) Whether or not to put a pin on the exact coordinates a Tweet has been sent from. (Expression, default: <none>)

in-reply-to-status-id

(SpEL expression) The ID of an existing status that the update is in reply to. Note: This parameter is ignored unless the author of the Tweet this parameter references is mentioned within the status text. Therefore, you must include @username, where username is the author of the referenced Tweet, within the update. When inReplyToStatusId is set, auto_populate_reply_metadata is automatically set as well. The latter ensures that leading @mentions are looked up from the original Tweet and added to the new Tweet from there. This appends @mentions to the metadata of an extended Tweet as a reply chain grows, until the limit on @mentions is reached. In cases where the original Tweet has been deleted, the reply fails. (Expression, default: <none>)

media-ids

(SpEL expression) A comma-delimited list of media_ids to associate with the Tweet. You may include up to 4 photos or 1 animated GIF or 1 video in a Tweet. See Uploading Media for further details on uploading media. (Expression, default: <none>)

place-id

(SpEL expression) A place in the world. (Expression, default: <none>)

text

(SpEL expression) The text of the status update. URL encode as necessary. t.co link wrapping affects character counts. Defaults to the message's payload. (Expression, default: payload)

twitter.update.location
lat

The latitude of the location this Tweet refers to. This parameter is ignored unless it is inside the range -90.0 to +90.0 (North is positive) inclusive. It is also ignored if there is no corresponding lon parameter. (Expression, default: <none>)

lon

The longitude of the location this Tweet refers to. The valid range for longitude is -180.0 to +180.0 (East is positive) inclusive. This parameter is ignored if it is outside that range, if it is not a number, if geo_enabled is disabled, or if there is no corresponding lat parameter. (Expression, default: <none>)
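
The range rules for lat and lon can be checked before a message is sent, as in this sketch. The class is hypothetical and simplifies the rules to a single yes-or-no answer for the pair; it does not model the geo_enabled account setting.

```java
// Hypothetical pre-check for the lat/lon pair of a Tweet location.
final class TweetLocationSketch {

    // Returns true only when both values are present and inside their valid ranges.
    static boolean isUsable(Double lat, Double lon) {
        if (lat == null || lon == null) {
            return false; // each parameter is ignored without its counterpart
        }
        // NaN fails every comparison, so non-numeric values are rejected as well.
        return lat >= -90.0 && lat <= 90.0 && lon >= -180.0 && lon <= 180.0;
    }

    public static void main(String[] args) {
        System.out.println(isUsable(52.37, 4.89));  // true
        System.out.println(isUsable(95.0, 4.89));   // false
    }
}
```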

7.23. Wavefront Sink

The Wavefront sink consumes a Message<?>, converts it into a metric in Wavefront data format, and sends the metric directly to Wavefront or to a Wavefront proxy.

Supports common ETL use-cases, where existing (historical) metrics data has to be cleaned, transformed and stored in Wavefront for further analysis.

7.23.1. Options

The Wavefront sink has the following options:

wavefront.api-token

Wavefront API access token. (String, default: <none>)

wavefront.metric-expression

A SpEL expression that evaluates to a metric value. (Expression, default: <none>)

wavefront.metric-name

The name of the metric. Defaults to the application name. (String, default: <none>)

wavefront.proxy-uri

The URL of the Wavefront proxy. (String, default: <none>)

wavefront.source

Unique application, host, container, or instance that emits metrics. (String, default: <none>)

wavefront.tag-expression

Collection of custom metadata associated with the metric. Point tags cannot be empty. Valid characters for keys: alphanumeric, hyphen ('-'), underscore ('_'), dot ('.'). For values, any character is allowed, including spaces. To include a double quote, escape it with a backslash. A backslash cannot be the last character in the tag value. The maximum allowed length for a combination of a point tag key and value is 254 characters (255 including the '=' separating key and value). If the value is longer, the point is rejected and logged. (Map<String, Expression>, default: <none>)
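
The point tag constraints listed for wavefront.tag-expression can be expressed as a small validation routine. This sketch is not part of the sink: the class is hypothetical, and it interprets "cannot be empty" as requiring both a non-empty key and a non-empty value, which is an assumption.

```java
import java.util.regex.Pattern;

// Hypothetical validator for a single Wavefront point tag.
final class PointTagSketch {

    // Keys may contain alphanumerics, hyphen, underscore, and dot.
    private static final Pattern KEY = Pattern.compile("[A-Za-z0-9._-]+");

    static boolean isValid(String key, String value) {
        if (key == null || value == null || key.isEmpty() || value.isEmpty()) {
            return false; // assumed reading of "point tags cannot be empty"
        }
        if (!KEY.matcher(key).matches()) {
            return false;
        }
        if (value.endsWith("\\")) {
            return false; // a backslash cannot be the last character of the value
        }
        // Key plus value may be at most 254 characters (the '=' is not counted).
        return key.length() + value.length() <= 254;
    }

    public static void main(String[] args) {
        System.out.println(isValid("env", "prod"));     // true
        System.out.println(isValid("bad key", "prod")); // false
    }
}
```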

wavefront.timestamp-expression

A SpEL expression that evaluates to a timestamp of the metric (optional). (Expression, default: <none>)

wavefront.uri

The URL of the Wavefront environment. (String, default: <none>)

7.24. Websocket Sink

A simple Websocket Sink implementation.

7.24.1. Options

The following options are supported:

websocket.consumer.log-level

The log level for Netty channels. Default is WARN. (String, default: <none>)

websocket.consumer.path

The path on which a WebsocketSink consumer needs to connect. Default is /websocket. (String, default: /websocket)

websocket.consumer.port

The port on which the Netty server listens. Default is 9292. (Integer, default: 9292)

websocket.consumer.ssl

Whether or not to create an io.netty.handler.ssl.SslContext. (Boolean, default: false)

websocket.consumer.threads

The number of threads for the Netty io.netty.channel.EventLoopGroup. Default is 1. (Integer, default: 1)

7.24.2. Examples

To verify that the websocket-sink receives messages from other spring-cloud-stream apps, you can use the following simple end-to-end setup.

Step 1: Start Rabbitmq
Step 2: Deploy a time-source
Step 3: Deploy the websocket-sink

Finally start a websocket-sink in trace mode so that you see the messages produced by the time-source in the log:

java -jar <spring boot application for websocket-sink> --spring.cloud.stream.bindings.input=ticktock --server.port=9393 \
	--logging.level.org.springframework.cloud.fn.consumer.websocket=TRACE

You should start seeing log messages in the console where you started the WebsocketSink like this:

Handling message: GenericMessage [payload=2015-10-21 12:52:53, headers={id=09ae31e0-a04e-b811-d211-b4d4e75b6f29, timestamp=1445424778065}]
Handling message: GenericMessage [payload=2015-10-21 12:52:54, headers={id=75eaaf30-e5c6-494f-b007-9d5b5b920001, timestamp=1445424778065}]
Handling message: GenericMessage [payload=2015-10-21 12:52:55, headers={id=18b887db-81fc-c634-7a9a-16b1c72de291, timestamp=1445424778066}]

7.24.3. Actuators

There is an Endpoint that you can use to access the last n messages sent and received. You have to enable it by providing --endpoints.websocketconsumertrace.enabled=true. By default it shows the last 100 messages via the host:port/websocketconsumertrace. Here is a sample output:

[
  {
    "timestamp": 1445453703508,
    "info": {
      "type": "text",
      "direction": "out",
      "id": "2ff9be50-c9b2-724b-5404-1a6305c033e4",
      "payload": "2015-10-21 20:54:33"
    }
  },
  ...
  {
    "timestamp": 1445453703506,
    "info": {
      "type": "text",
      "direction": "out",
      "id": "2b9dbcaf-c808-084d-a51b-50f617ae6a75",
      "payload": "2015-10-21 20:54:32"
    }
  }
]

There is also a simple HTML page where you see forwarded messages in a text area. You can access it directly via host:port in your browser.

Appendices

Appendix A: Contributing

Spring Cloud is released under the non-restrictive Apache 2.0 license and follows a very standard GitHub development process, using the GitHub tracker for issues and merging pull requests into master. If you want to contribute even something trivial, please do not hesitate, but follow the guidelines below.

A.1. Sign the Contributor License Agreement

Before we accept a non-trivial patch or pull request we will need you to sign the contributor’s agreement. Signing the contributor’s agreement does not grant anyone commit rights to the main repository, but it does mean that we can accept your contributions, and you will get an author credit if we do. Active contributors might be asked to join the core team, and given the ability to merge pull requests.

A.2. Code Conventions and Housekeeping

None of these is essential for a pull request, but they will all help. They can also be added after the original pull request but before a merge.

  • Use the Spring Framework code format conventions. If you use Eclipse you can import formatter settings using the eclipse-code-formatter.xml file from the Spring Cloud Build project. If using IntelliJ, you can use the Eclipse Code Formatter Plugin to import the same file.

  • Make sure all new .java files have a simple Javadoc class comment with at least an @author tag identifying you, and preferably at least a paragraph on what the class is for.

  • Add the ASF license header comment to all new .java files (copy from existing files in the project)

  • Add yourself as an @author to the .java files that you modify substantially (more than cosmetic changes).

  • Add some Javadocs and, if you change the namespace, some XSD doc elements.

  • A few unit tests would help a lot as well — someone has to do it.

  • If no-one else is using your branch, please rebase it against the current master (or other target branch in the main project).

  • When writing a commit message, please follow these conventions. If you are fixing an existing issue, please add Fixes gh-XXXX at the end of the commit message (where XXXX is the issue number).