Reference Guide

This section provides you with a detailed overview of the out-of-the-box Spring Cloud Stream Applications. It assumes familiarity with general Spring Cloud Stream concepts, which you can find in the Spring Cloud Stream reference documentation.

These Spring Cloud Stream Applications provide you with out-of-the-box Spring Cloud Stream utility applications that you can run independently or with Spring Cloud Data Flow. They include:

  • Connectors (sources, processors, and sinks) for a variety of middleware technologies, including message brokers and storage (relational, non-relational, and filesystem).

  • Adapters for various network protocols.

  • Generic processors that you can customize with Spring Expression Language (SpEL) or by scripting.

You can find a detailed listing of all the applications and their options in the following sections of this guide.

Most of these applications are based on core elements that are exposed as a java.util.function component. You can learn more about these foundational components and how they are all connected to the applications by reading this README.
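
To illustrate, the three application types map onto the three java.util.function interfaces. The following is a minimal, hypothetical sketch (not one of the prebuilt functions):

```java
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

public class FunctionSketch {

    // A source is backed by a Supplier that produces data.
    static Supplier<String> timeSupplier = () -> java.time.Instant.EPOCH.toString();

    // A processor is backed by a Function that transforms data.
    static Function<String, String> upperCaseProcessor = String::toUpperCase;

    // A sink is backed by a Consumer that receives data.
    static Consumer<String> logConsumer = s -> System.out.println("received: " + s);

    public static void main(String[] args) {
        // Compose source -> processor -> sink, as the binder would at runtime.
        logConsumer.accept(upperCaseProcessor.apply(timeSupplier.get()));
    }
}
```

At runtime, the binder wires such components to the middleware, so the function body stays free of messaging concerns.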

1. Pre-built Applications

Out-of-the-box applications are Spring Boot applications that include a binder implementation on top of the basic logic of the app (a function for example) — a fully functional uber-jar. These uber-jars include the minimal code required for standalone execution. For each function application, the project provides a prebuilt version for Apache Kafka and Rabbit MQ Binders.

Prebuilt applications are generated by the stream apps generator Maven plugin.

2. Classification

Based on their target application type, the applications fall into one of the following categories:

  • A source that connects to an external resource to poll and receive data that is published to the default “output” channel;

  • A processor that receives data from an “input” channel and processes it, sending the result on the default “output” channel;

  • A sink that receives data from the default “input” channel and sends it to an external resource.

The prebuilt applications follow a naming convention: <functionality>-<type>-<binder>. For example, rabbit-sink-kafka is a RabbitMQ sink that uses the Kafka binder (that is, it runs with Kafka as the middleware).
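
The convention can be illustrated with a small sketch that splits an app name from the right, since the functionality part may itself contain hyphens (a hypothetical helper, not part of the project):

```java
public class AppNameParser {

    // Parses <functionality>-<type>-<binder>. The functionality part may itself
    // contain hyphens (e.g. load-generator), so we split from the right.
    static String[] parse(String appName) {
        int binderSep = appName.lastIndexOf('-');
        int typeSep = appName.lastIndexOf('-', binderSep - 1);
        return new String[] {
                appName.substring(0, typeSep),             // functionality
                appName.substring(typeSep + 1, binderSep), // type
                appName.substring(binderSep + 1)           // binder
        };
    }

    public static void main(String[] args) {
        String[] parts = parse("rabbit-sink-kafka");
        System.out.println(parts[0] + " / " + parts[1] + " / " + parts[2]);
    }
}
```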

2.1. Maven Access

The core functionality of the applications is available as functions. See the Java Functions section in the stream-applications repository for more details. Prebuilt applications are available as Maven artifacts, and you can download the executable jar artifacts from the Spring Maven repositories. The root directory of the Maven repository that hosts release versions is repo.spring.io/release/org/springframework/cloud/stream/app/. From there, you can navigate to the latest released version of a specific app. If you want to use the functions directly in a custom application, those artifacts are available under the directory structure org/springframework/cloud/fn. Use the Release, Milestone, and Snapshot repository locations for Release, Milestone, and Snapshot executable jar artifacts, respectively.

2.2. Docker Access

The Docker versions of the applications are available in Docker Hub, at hub.docker.com/r/springcloudstream/. Naming and versioning follows the same general conventions as Maven — for example:

docker pull springcloudstream/cassandra-sink-kafka

The preceding command pulls the latest Docker image of the Cassandra sink with the Kafka binder.

2.3. Build

You can build everything from the root of the repository.

./mvnw clean install

This is a long build and you may want to skip tests:

./mvnw clean install -DskipTests

However, you are probably interested in only a single application or a few of them. To build just the functions and applications you need, build them selectively, as shown below.

2.4. Building the root parent

First, we need to build the parent used in various components.

./mvnw clean install -f stream-applications-build

2.4.1. Building functions

./mvnw clean install -f functions -DskipTests

You can also build a single function or a group of functions. For example, if you are only interested in jdbc-supplier and log-consumer, do the following.

./mvnw clean install -pl :jdbc-supplier,:log-consumer

2.4.2. Building core for Stream Applications

./mvnw clean install -f applications/stream-applications-core -DskipTests

2.5. Building the applications

Let’s assume that you want to build the JDBC Source application based on the Kafka binder and the Log Sink application based on the Rabbit binder, and that you have already built both functions and stream-applications-core as described above. Here is what you need to do.

./mvnw clean package -pl :jdbc-source
cd applications/source/jdbc-source/apps/jdbc-source-kafka
./mvnw clean package

This will generate the Kafka binder based uber jar in the target folder.

Similarly for the log sink, do the following.

./mvnw clean package -pl :log-sink
cd applications/sink/log-sink/apps/log-sink-rabbit
./mvnw clean package

2.5.1. Building a Docker image

The apps use the Jib Maven Plugin to build and publish the Docker image. If you have made some changes to an app, you may want to build the image and test it locally.

If you plan to use the image with minikube, run the following command before building the image:
eval $(minikube docker-env)

To build the image to your local Docker daemon:

./mvnw clean package jib:dockerBuild

To publish the image to a remote registry:

./mvnw jib:build \
    -Djib.to.image=myregistry/myimage:latest \
    -Djib.to.auth.username=$USERNAME \
    -Djib.to.auth.password=$PASSWORD

3. Patching Pre-built Applications

3.1. Adding new dependencies

If you want to patch the pre-built applications to add new dependencies, you can use the following example as a reference. To add the MySQL driver to the jdbc-sink application:

  1. Clone the GitHub repository at github.com/spring-cloud/stream-applications

  2. Find the module that you want to patch (jdbc-sink in this case) and add the additional dependencies. For example, you can add the following mysql dependency to the application generator plugin’s configuration in the pom.xml:

<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.37</version>
</dependency>

This is how the complete plugin configuration should look:

<plugin>
    <groupId>org.springframework.cloud.stream.app.plugin</groupId>
    <artifactId>spring-cloud-stream-app-maven-plugin</artifactId>
    <configuration>
        <generatedApp>
            <name>jdbc</name>
            <type>sink</type>
            <version>${project.version}</version>
            <configClass>org.springframework.cloud.fn.consumer.jdbc.JdbcConsumerConfiguration.class</configClass>
        </generatedApp>
        <dependencies>
            <dependency>
                <groupId>mysql</groupId>
                <artifactId>mysql-connector-java</artifactId>
                <version>5.1.37</version>
            </dependency>
            <dependency>
                <groupId>org.springframework.cloud.fn</groupId>
                <artifactId>jdbc-consumer</artifactId>
                <version>${java-functions.version}</version>
            </dependency>
        </dependencies>
    </configuration>
</plugin>

Once the above changes are done, you can generate the binder-based apps as follows from the root of the repository.

./mvnw clean install -pl :jdbc-sink

This generates the binder-based applications in the apps folder under the jdbc-sink folder. To build the app with the binder flavor that you are interested in, do the following:

cd applications/sink/jdbc-sink
cd apps/jdbc-sink-kafka # (or Rabbit if you are interested in that)
./mvnw clean package
cd target

There you will find the binder based uber jar with your changes.

3.2. Update existing dependencies or add new resources in the application

Modifying the plugin as shown above works when there are new dependencies to add to the application. However, when we need to update existing dependencies, it is easier to make the Maven changes in the generated application itself. For example, if we have to update the binder dependencies from a new release of Spring Cloud Stream, those versions need to be updated in the generated application.

Here are the steps (again, we are using jdbc-sink-kafka as an example).

./mvnw clean install -pl :jdbc-sink
cd applications/sink/jdbc-sink/apps/jdbc-sink-kafka

Open the generated application’s pom.xml and update the dependencies. If a new Spring Cloud Stream version containing the enhancements we are looking for is available, it is easier to update the BOM itself. Find where the BOM is declared in pom.xml and update the version.

For example, if we have to update Spring Cloud Stream to 4.0.3, this version must be specified in the BOM declaration, as follows:

<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-dependencies</artifactId>
            <version>4.0.3</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>

We can also update any individual dependency directly, but it is preferable to use the dependencyManagement approach above when a BOM is available, because Maven then properly aligns any transitive dependencies.

If you have to modify the application further, this method of modifying the generated application is again the recommended approach.

For instance, if you want to add security certificate files, such as a key store or a trust store, to the application’s classpath, generate the application first and then add those resources to the classpath.

Make sure you are in the generated jdbc-sink-kafka folder, then do the following:

First, add the resources to the classpath by placing them under src/main/resources.

Then rebuild the application.

./mvnw clean package
cd target

Here you can find the modified application jar file.

4. Generating out of the box applications for other binders

By default, we provide out-of-the-box applications only for the Apache Kafka and RabbitMQ binders. However, other binder implementations exist, and we can generate the same out-of-the-box applications for them (for example, the Kinesis binder or the Solace binder) by following the instructions below.

As a first step, clone the stream applications repository.

cd applications/stream-applications-core

We need to edit the pom.xml in this module. Find the following configuration, which defines the Kafka and RabbitMQ binders for the Maven plugin:

<kafka>
    <maven>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-stream-binder-kafka</artifactId>
            </dependency>
        </dependencies>
    </maven>
</kafka>
<rabbit>
    <maven>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
            </dependency>
        </dependencies>
    </maven>
</rabbit>

Add the binder for which you want to generate new apps. For example, if we want to generate applications for the Kinesis binder, modify the configuration as follows.

<binders>
    <kafka>
        <maven>
            <dependencies>
                <dependency>
                    <groupId>org.springframework.cloud</groupId>
                    <artifactId>spring-cloud-stream-binder-kafka</artifactId>
                </dependency>
            </dependencies>
        </maven>
    </kafka>
    <rabbit>
        <maven>
            <dependencies>
                <dependency>
                    <groupId>org.springframework.cloud</groupId>
                    <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
                </dependency>
            </dependencies>
        </maven>
    </rabbit>
    <kinesis>
        <maven>
            <dependencies>
                <dependency>
                    <groupId>org.springframework.cloud</groupId>
                    <artifactId>spring-cloud-stream-binder-kinesis</artifactId>
                    <version>2.0.3.RELEASE</version>
                </dependency>
            </dependencies>
        </maven>
    </kinesis>
</binders>

Note that we need to specify the Kinesis binder version explicitly, while Kafka and RabbitMQ do not need one. This is because those versions come from dependency management, whereas the Kinesis binder is not available through such a mechanism, so its version must be stated explicitly. If a BOM that defines the version is available, it can be used instead; just ensure that it is declared in the proper BOM section of the Maven plugin.

If the binder for which you are generating the applications relies on a different version of Spring Cloud Stream, make sure it is updated in the maven properties.

Now, we can build: ./mvnw clean install -DskipTests.

If we go to the applications folder and look at the generated applications, we should see the new binder variants there. For instance, if we follow the configuration above for adding the Kinesis binder, then we should see the Kinesis binder based app in the generated apps. Let’s take time-source as an example.

cd applications/source/time-source/apps

Here, we should see three binder-based app projects: time-source-kafka, time-source-rabbit, and time-source-kinesis. The same applies to all the out-of-the-box application projects.

Keep in mind that these generated applications need to be built individually. To do so, go to the generated application’s folder and initiate a Maven build.

Applications

5. Sources

5.1. Debezium Source

Debezium Engine based Change Data Capture (CDC) source. The Debezium Source allows capturing database change events and streaming them over different message binders, such as Apache Kafka, RabbitMQ, and all other brokers supported by Spring Cloud Stream.

This source can be used with any Spring Cloud Stream message binder. It is neither restricted to nor dependent on the Kafka Connect framework. Though this approach is flexible, it comes with certain limitations.

All Debezium configuration properties are supported. Just precede any Debezium property with the debezium.properties. prefix. For example, to set Debezium’s connector.class property, use the debezium.properties.connector.class source property instead.
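
As a rough sketch of what this prefix convention implies (an illustration, not the source's actual implementation), the source effectively strips the prefix before handing the properties to the Debezium engine:

```java
import java.util.Map;
import java.util.Properties;

public class DebeziumPrefixSketch {

    static final String PREFIX = "debezium.properties.";

    // Keeps only keys carrying the prefix and strips it, so that
    // debezium.properties.connector.class becomes connector.class.
    static Properties stripPrefix(Map<String, String> source) {
        Properties result = new Properties();
        source.forEach((key, value) -> {
            if (key.startsWith(PREFIX)) {
                result.setProperty(key.substring(PREFIX.length()), value);
            }
        });
        return result;
    }

    public static void main(String[] args) {
        Properties p = stripPrefix(Map.of(
                "debezium.properties.connector.class", "io.debezium.connector.mysql.MySqlConnector",
                "some.other.property", "ignored"));
        System.out.println(p.getProperty("connector.class"));
    }
}
```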

5.1.1. Database Support

The Debezium Source currently supports CDC for multiple datastores: MySQL, PostgreSQL, MongoDB, Oracle, SQL Server, Db2, Vitess and Spanner databases.

5.1.2. Options

Event flattening configuration

Debezium provides a comprehensive message format that accurately details information about changes that happen in the system. Sometimes, though, this format might not be suitable for downstream consumers that require messages formatted so that field names and values are presented in a simplified, flattened structure.

To simplify the format of the event records that the Debezium connectors produce, you can use the Debezium event flattening message transformation. Using the flattening configuration, you can configure a simplified message format like this:

--debezium.properties.transforms=unwrap
--debezium.properties.transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
--debezium.properties.transforms.unwrap.drop.tombstones=false
--debezium.properties.transforms.unwrap.delete.handling.mode=rewrite
--debezium.properties.transforms.unwrap.add.fields=name,db
Debezium Offset Storage

When a Debezium source runs, it reads information from the source and periodically records offsets that define how much of that information it has processed. Should the source be restarted, it will use the last recorded offset to know where in the source information it should resume reading. Out of the box, the following offset storage configuration options are provided:

  • In-Memory

    Does not persist the offset data but keeps it in memory. Therefore, all offsets are lost on a Debezium source restart.
    --debezium.properties.offset.storage=org.apache.kafka.connect.storage.MemoryOffsetBackingStore
  • Local Filesystem

    Stores the offsets in a file on the local file system (the file can be named anything and stored anywhere). Additionally, although the connector records the offsets with every source record it produces, the engine flushes the offsets to the backing store periodically (in the example below, once a minute).
    --debezium.properties.offset.storage=org.apache.kafka.connect.storage.FileOffsetBackingStore
    --debezium.properties.offset.storage.file.filename=/tmp/offsets.dat (1)
    --debezium.properties.offset.flush.interval.ms=60000 (2)
    1 Path to the file where offsets are to be stored. Required when offset.storage is set to the FileOffsetBackingStore.
    2 Interval at which to try committing offsets. The default is 1 minute.
  • Kafka topic

    Uses a Kafka topic to store offset data.
    --debezium.properties.offset.storage=org.apache.kafka.connect.storage.KafkaOffsetBackingStore
    --debezium.properties.offset.storage.topic=my-kafka-offset-topic (1)
    --debezium.properties.offset.storage.partitions=2 (2)
    --debezium.properties.offset.storage.replication.factor=1 (3)
    --debezium.properties.offset.flush.interval.ms=60000 (4)
    1 The name of the Kafka topic where offsets are to be stored. Required when offset.storage is set to the KafkaOffsetBackingStore.
    2 The number of partitions used when creating the offset storage topic.
    3 Replication factor used when creating the offset storage topic.
    4 Interval at which to try committing offsets. The default is 1 minute.

You can implement the org.apache.kafka.connect.storage.OffsetBackingStore interface to provide offset storage bound to a custom key-value store backend.

5.1.3. Examples and Testing

The Debezium integration tests use database fixtures running on the local machine and leverage pre-built Debezium Docker database images with the help of Testcontainers.

To run and debug the tests from your IDE, you need to deploy the required database images from the command line. The instructions below explain how to run pre-configured test databases from Docker images.

MySQL

Start the debezium/example-mysql image in Docker:

docker run -it --rm --name mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=debezium -e MYSQL_USER=mysqluser -e MYSQL_PASSWORD=mysqlpw debezium/example-mysql:2.3.3.Final
(optional) Use the mysql client to connect to the database and create a debezium user with the required credentials:
docker run -it --rm --name mysqlterm --link mysql --rm mysql:5.7 sh -c 'exec mysql -h"$MYSQL_PORT_3306_TCP_ADDR" -P"$MYSQL_PORT_3306_TCP_PORT" -uroot -p"$MYSQL_ENV_MYSQL_ROOT_PASSWORD"'
mysql> GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'debezium' IDENTIFIED BY 'dbz';

Use the following properties to connect the Debezium Source to the MySQL database:

debezium.properties.connector.class=io.debezium.connector.mysql.MySqlConnector (1)

debezium.properties.name=my-connector (2)
debezium.properties.topic.prefix=my-topic (2)
debezium.properties.database.server.id=85744 (2)


debezium.properties.database.user=debezium (3)
debezium.properties.database.password=dbz (3)
debezium.properties.database.hostname=localhost (3)
debezium.properties.database.port=3306 (3)

debezium.properties.schema=true (4)
debezium.properties.key.converter.schemas.enable=true (4)
debezium.properties.value.converter.schemas.enable=true (4)

debezium.properties.transforms=unwrap (5)
debezium.properties.transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState (5)
debezium.properties.transforms.unwrap.add.fields=name,db (5)
debezium.properties.transforms.unwrap.delete.handling.mode=none (5)
debezium.properties.transforms.unwrap.drop.tombstones=true (5)

debezium.properties.schema.history.internal=io.debezium.relational.history.MemorySchemaHistory (6)
debezium.properties.offset.storage=org.apache.kafka.connect.storage.MemoryOffsetBackingStore (6)
1 Configures the Debezium Source to use MySqlConnector.
2 Metadata used to identify and dispatch the incoming events.
3 Connection to the MySQL server running on localhost:3306 as debezium user.
4 Includes the Change Event Value schema in the ChangeEvent message.
5 Enables the Change Event Flattening.
6 Source state to preserve between multiple starts.

You can also run the DebeziumDatabasesIntegrationTest#mysql() test using this MySQL configuration.

Disable the mysql GenericContainer test initialization code.
PostgreSQL

Start a pre-configured postgres server from the debezium/example-postgres:2.3.3.Final Docker image:

docker run -it --rm --name postgres -p 5432:5432 -e POSTGRES_USER=postgres -e POSTGRES_PASSWORD=postgres debezium/example-postgres:2.3.3.Final

You can connect to this server like this:

psql -U postgres -h localhost -p 5432

Use the following properties to connect the Debezium Source to PostgreSQL:

debezium.properties.connector.class=io.debezium.connector.postgresql.PostgresConnector (1)

debezium.properties.schema.history.internal=io.debezium.relational.history.MemorySchemaHistory (2)
debezium.properties.offset.storage=org.apache.kafka.connect.storage.MemoryOffsetBackingStore (2)

debezium.properties.topic.prefix=my-topic (3)
debezium.properties.name=my-connector (3)
debezium.properties.database.server.id=85744 (3)

debezium.properties.database.user=postgres  (4)
debezium.properties.database.password=postgres (4)
debezium.properties.database.dbname=postgres (4)
debezium.properties.database.hostname=localhost (4)
debezium.properties.database.port=5432 (4)

debezium.properties.schema=true (5)
debezium.properties.key.converter.schemas.enable=true (5)
debezium.properties.value.converter.schemas.enable=true (5)

debezium.properties.transforms=unwrap (6)
debezium.properties.transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState (6)
debezium.properties.transforms.unwrap.add.fields=name,db (6)
debezium.properties.transforms.unwrap.delete.handling.mode=none (6)
debezium.properties.transforms.unwrap.drop.tombstones=true (6)
1 Configures Debezium Source to use PostgresConnector.
2 Configures the Debezium engine to use memory stores.
3 Metadata used to identify and dispatch the incoming events.
4 Connection to the PostgreSQL server running on localhost:5432 as postgres user.
5 Includes the Change Event Value schema in the message.
6 Enables the Change Event Flattening.

You can also run the DebeziumDatabasesIntegrationTest#postgres() test using this postgres configuration.

Disable the postgres GenericContainer test initialization code.
MongoDB

Start a pre-configured mongodb from the debezium/example-mongodb:2.3.3.Final container image:

docker run -it --rm --name mongodb -p 27017:27017 -e MONGODB_USER=debezium -e MONGODB_PASSWORD=dbz  debezium/example-mongodb:2.3.3.Final

Initialize the inventory collections:

docker exec -it mongodb sh -c 'bash -c /usr/local/bin/init-inventory.sh'

In the mongodb terminal output, search for a log entry like host: "3f95a8a6516e:27017" :

2019-01-10T13:46:10.004+0000 I COMMAND  [conn1] command local.oplog.rs appName: "MongoDB Shell" command: replSetInitiate { replSetInitiate: { _id: "rs0", members: [ { _id: 0.0, host: "3f95a8a6516e:27017" } ] }, lsid: { id: UUID("5f477a16-d80d-41f2-9ab4-4ebecea46773") }, $db: "admin" } numYields:0 reslen:22 locks:{ Global: { acquireCount: { r: 36, w: 20, W: 2 }, acquireWaitCount: { W: 1 }, timeAcquiringMicros: { W: 312 } }, Database: { acquireCount: { r: 6, w: 4, W: 16 } }, Collection: { acquireCount: { r: 4, w: 2 } }, oplog: { acquireCount: { r: 2, w: 3 } } } protocol:op_msg 988ms

Add a 127.0.0.1 3f95a8a6516e entry to your /etc/hosts file.

Use the following properties to connect the Debezium Source to MongoDB:

debezium.properties.connector.class=io.debezium.connector.mongodb.MongoDbConnector (1)

debezium.properties.topic.prefix=my-topic
debezium.properties.name=my-connector
debezium.properties.database.server.id=85744

debezium.properties.schema.history.internal=io.debezium.relational.history.MemorySchemaHistory (2)
debezium.properties.offset.storage=org.apache.kafka.connect.storage.MemoryOffsetBackingStore (2)

debezium.properties.mongodb.hosts=rs0/localhost:27017 (3)
debezium.properties.topic.prefix=dbserver1 (3)
debezium.properties.mongodb.user=debezium (3)
debezium.properties.mongodb.password=dbz (3)
debezium.properties.database.whitelist=inventory (3)

debezium.properties.tasks.max=1 (4)

debezium.properties.schema=true (5)
debezium.properties.key.converter.schemas.enable=true (5)
debezium.properties.value.converter.schemas.enable=true (5)

debezium.properties.transforms=unwrap (6)
debezium.properties.transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState (6)
debezium.properties.transforms.unwrap.add.fields=name,db (6)
debezium.properties.transforms.unwrap.delete.handling.mode=none (6)
debezium.properties.transforms.unwrap.drop.tombstones=true (6)
1 Configures Debezium Source to use MongoDB Connector.
2 Configures the Debezium engine to use in-memory stores.
3 Connection to the MongoDB running on localhost:27017 as debezium user.
4 debezium.io/docs/connectors/mongodb/#tasks
5 Includes the Change Event Value schema in the SourceRecord events.
6 Enables the Change Event Flattening.

You can also run the DebeziumDatabasesIntegrationTest#mongodb() test using this mongodb configuration.

SQL Server

Start SQL Server from the microsoft/mssql-server-linux:2017-CU9-GDR2 Docker image:

docker run -it --rm --name sqlserver -p 1433:1433 -e ACCEPT_EULA=Y -e MSSQL_PID=Standard -e SA_PASSWORD=Password! -e MSSQL_AGENT_ENABLED=true microsoft/mssql-server-linux:2017-CU9-GDR2

Populate it with sample data from the Debezium SQL Server tutorial:

wget https://raw.githubusercontent.com/debezium/debezium-examples/master/tutorial/debezium-sqlserver-init/inventory.sql
cat ./inventory.sql | docker exec -i sqlserver bash -c '/opt/mssql-tools/bin/sqlcmd -U sa -P $SA_PASSWORD'

Use the following properties to connect the Debezium Source to SQL Server:

debezium.properties.connector.class=io.debezium.connector.sqlserver.SqlServerConnector (1)

debezium.properties.schema.history.internal=io.debezium.relational.history.MemorySchemaHistory (2)
debezium.properties.offset.storage=org.apache.kafka.connect.storage.MemoryOffsetBackingStore (2)

debezium.properties.topic.prefix=my-topic (3)
debezium.properties.name=my-connector (3)
debezium.properties.database.server.id=85744 (3)

debezium.properties.database.user=sa  (4)
debezium.properties.database.password=Password! (4)
debezium.properties.database.dbname=testDB (4)
debezium.properties.database.hostname=localhost (4)
debezium.properties.database.port=1433 (4)
1 Configures Debezium Source to use SqlServerConnector.
2 Configures the Debezium engine to use memory state stores.
3 Metadata used to identify and dispatch the incoming events.
4 Connection to the SQL Server running on localhost:1433 as sa user.

You can also run the DebeziumDatabasesIntegrationTest#sqlServer() test using this SQL Server configuration.

Oracle

Start Oracle reachable from localhost and set it up with the configuration, users, and grants described in the Debezium Vagrant set-up.

Populate it with sample data from the Debezium Oracle tutorial:

wget https://raw.githubusercontent.com/debezium/debezium-examples/master/tutorial/debezium-with-oracle-jdbc/init/inventory.sql
cat ./inventory.sql | docker exec -i dbz_oracle sqlplus debezium/dbz@//localhost:1521/ORCLPDB1

5.2. File Source

This application polls a directory and sends new files or their contents to the output channel. The file source provides the contents of a File as a byte array by default. However, this can be customized using the --file.supplier.mode option:

  • ref Provides a java.io.File reference

  • lines Will split files line-by-line and emit a new message for each line

  • contents The default. Provides the contents of a file as a byte array

When using --file.supplier.mode=lines, you can also provide the additional option --file.supplier.withMarkers=true. If set to true, the underlying FileSplitter will emit additional start-of-file and end-of-file marker messages before and after the actual data. The payload of these 2 additional marker messages is of type FileSplitter.FileMarker. The option withMarkers defaults to false if not explicitly set.
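
As a rough sketch of the message sequence that lines mode with markers produces (an illustration, not the actual FileSplitter implementation):

```java
import java.util.ArrayList;
import java.util.List;

public class LinesModeSketch {

    // Emulates --file.supplier.mode=lines with --file.supplier.withMarkers=true:
    // one message per line, framed by start-of-file and end-of-file markers.
    static List<String> emit(String fileName, String contents, boolean withMarkers) {
        List<String> messages = new ArrayList<>();
        if (withMarkers) {
            messages.add("START:" + fileName);
        }
        for (String line : contents.split("\n")) {
            messages.add(line);
        }
        if (withMarkers) {
            messages.add("END:" + fileName);
        }
        return messages;
    }

    public static void main(String[] args) {
        System.out.println(emit("data.txt", "a\nb", true));
    }
}
```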

5.2.1. Options

The file source has the following options:

5.3. FTP Source

This source application supports transfer of files using the FTP protocol. Files are transferred from the remote directory to the local directory where the app is deployed. Messages emitted by the source are provided as a byte array by default. However, this can be customized using the --mode option:

  • ref Provides a java.io.File reference

  • lines Will split files line-by-line and emit a new message for each line

  • contents The default. Provides the contents of a file as a byte array

When using --mode=lines, you can also provide the additional option --withMarkers=true. If set to true, the underlying FileSplitter will emit additional start-of-file and end-of-file marker messages before and after the actual data. The payload of these 2 additional marker messages is of type FileSplitter.FileMarker. The option withMarkers defaults to false if not explicitly set.

See also MetadataStore options for possible shared persistent store configuration used to prevent duplicate messages on restart.

5.3.1. Input

N/A (Fetches files from an FTP server).

5.3.2. Output

mode = contents
Headers:
  • Content-Type: application/octet-stream

  • file_originalFile: <java.io.File>

  • file_name: <file name>

Payload:

A byte[] filled with the file contents.

mode = lines
Headers:
  • Content-Type: text/plain

  • file_originalFile: <java.io.File>

  • file_name: <file name>

  • correlationId: <UUID> (same for each line)

  • sequenceNumber: <n>

  • sequenceSize: 0 (the number of lines is not known until the file is read)

Payload:

A String for each line.

The first line is optionally preceded by a message with a START marker payload. The last line is optionally followed by a message with an END marker payload.

Marker presence and format are determined by the with-markers and markers-json properties.

mode = ref
Headers:

None.

Payload:

A java.io.File object.

5.3.3. Options

The ftp source has the following options:

5.3.4. Examples

java -jar ftp_source.jar --ftp.supplier.remote-dir=foo --file.consumer.mode=lines --ftp.factory.host=ftpserver \
         --ftp.factory.username=user --ftp.factory.password=pw --ftp.local-dir=/foo

5.4. Http Source

A source application that listens for HTTP requests and emits the body as a message payload. If the Content-Type matches text/* or application/json, the payload will be a String, otherwise the payload will be a byte array.

5.4.1. Payload:

If content type matches text/* or application/json

  • String

If content type does not match text/* or application/json

  • byte array
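
The content-type rule can be sketched as follows (an illustration of the rule, not the source's actual code):

```java
public class HttpPayloadSketch {

    // Mirrors the rule: text/* or application/json bodies become a String,
    // anything else stays a byte array.
    static Object toPayload(String contentType, byte[] body) {
        if (contentType != null
                && (contentType.startsWith("text/") || contentType.startsWith("application/json"))) {
            return new String(body, java.nio.charset.StandardCharsets.UTF_8);
        }
        return body;
    }

    public static void main(String[] args) {
        System.out.println(toPayload("application/json", "{}".getBytes()));
    }
}
```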

5.4.2. Options

The http source supports the following configuration properties:

5.5. JDBC Source

This source polls data from an RDBMS. This source is fully based on the DataSourceAutoConfiguration, so refer to the Spring Boot JDBC Support for more information.

5.5.1. Payload

  • Map<String, Object> when jdbc.split == true (default) and List<Map<String, Object>> otherwise
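
A sketch of the effect of the split option (hypothetical, for illustration only):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class JdbcSplitSketch {

    // jdbc.split=true (the default): each row becomes its own message payload.
    // jdbc.split=false: the whole result set is a single List payload.
    static List<Object> toPayloads(List<Map<String, Object>> rows, boolean split) {
        if (split) {
            return new ArrayList<>(rows);
        }
        return List.<Object>of(rows);
    }

    public static void main(String[] args) {
        List<Map<String, Object>> rows = List.of(Map.of("id", 1), Map.of("id", 2));
        System.out.println(toPayloads(rows, true).size());   // one message per row
        System.out.println(toPayloads(rows, false).size());  // a single aggregated message
    }
}
```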

5.5.2. Options

The jdbc source has the following options:

Also see the Spring Boot documentation for additional DataSource properties, and see TriggerProperties and MaxMessagesProperties for polling options.

5.6. JMS Source

The JMS source enables receiving messages from JMS.

5.6.1. Options

The JMS source has the following options:

5.7. Apache Kafka Source

This module consumes messages from Apache Kafka.

5.7.1. Options

The kafka source has the following options:

(See the Spring Boot documentation for Spring for Apache Kafka configuration properties)

5.8. Load Generator Source

A source that sends generated data and dispatches it to the stream.

5.8.1. Options

The load-generator source has the following options:

load-generator.generate-timestamp

Whether a timestamp is generated. (Boolean, default: false)

load-generator.message-count

Message count. (Integer, default: 1000)

load-generator.message-size

Message size. (Integer, default: 1000)

load-generator.producers

Number of producers. (Integer, default: 1)

5.9. Mail Source

A source application that listens for emails and emits the message body as a message payload.

5.9.1. Options

The mail source has the following options:

5.10. MongoDB Source

This source polls data from MongoDB. This source is fully based on the MongoDataAutoConfiguration, so refer to the Spring Boot MongoDB Support for more information.

5.10.1. Options

The mongodb source has the following options:

Also see the Spring Boot documentation for additional MongoProperties properties, and see TriggerProperties for polling options.

5.11. MQTT Source

Source that enables receiving messages from MQTT.

5.11.1. Payload:

  • String if binary setting is false (default)

  • byte[] if binary setting is true

5.11.2. Options

The mqtt source has the following options:

5.12. RabbitMQ Source

The "rabbit" source enables receiving messages from RabbitMQ.

The queue(s) must exist before the stream is deployed; they are not created automatically. You can easily create a Queue using the RabbitMQ web UI.

5.12.1. Input

N/A

5.12.2. Output

Payload
  • byte[]

5.12.3. Options

The rabbit source has the following options:

Also see the Spring Boot documentation for additional properties for the broker connection and listener.

A Note About Retry
With the default ackMode (AUTO) and requeue (true) options, failed message deliveries will be retried indefinitely. Since there is not much processing in the rabbit source, the risk of failure in the source itself is small, unless the downstream Binder is not connected for some reason. Setting requeue to false will cause messages to be rejected on the first attempt (and possibly sent to a Dead Letter Exchange/Queue if the broker is so configured). The enableRetry option allows configuration of retry parameters such that a failed message delivery can be retried and eventually discarded (or dead-lettered) when retries are exhausted. The delivery thread is suspended during the retry interval(s). Retry options are enableRetry, maxAttempts, initialRetryInterval, retryMultiplier, and maxRetryInterval. Message deliveries failing with a MessageConversionException are never retried; the assumption being that if a message could not be converted on the first attempt, subsequent attempts will also fail. Such messages are discarded (or dead-lettered).

5.12.4. Build

$ ./mvnw clean install -PgenerateApps
$ cd apps

You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:

$ ./mvnw clean package

5.12.5. Examples

java -jar rabbit-source.jar --rabbit.queues=

5.13. Amazon S3 Source

This source app supports transfer of files using the Amazon S3 protocol. Files are transferred from the remote directory (S3 bucket) to the local directory where the application is deployed.

Messages emitted by the source are provided as a byte array by default. However, this can be customized using the --mode option:

  • ref Provides a java.io.File reference

  • lines Will split files line-by-line and emit a new message for each line

  • contents The default. Provides the contents of a file as a byte array

When using --mode=lines, you can also provide the additional option --withMarkers=true. If set to true, the underlying FileSplitter will emit additional start-of-file and end-of-file marker messages before and after the actual data. The payload of these 2 additional marker messages is of type FileSplitter.FileMarker. The option withMarkers defaults to false if not explicitly set.

See also MetadataStore options for possible shared persistent store configuration used to prevent duplicate messages on restart.

5.13.1. mode = lines

Headers:
  • Content-Type: text/plain

  • file_originalFile: <java.io.File>

  • file_name: <file name>

  • correlationId: <UUID> (same for each line)

  • sequenceNumber: <n>

  • sequenceSize: 0 (the number of lines is not known until the file is read)

Payload:

A String for each line.

The first line is optionally preceded by a message with a START marker payload. The last line is optionally followed by a message with an END marker payload.

Marker presence and format are determined by the with-markers and markers-json properties.

5.13.2. mode = ref

Headers:

None.

Payload:

A java.io.File object.

5.13.3. Options

The s3-source has the following options:

5.13.4. Amazon AWS common options

The Amazon S3 Source (like all other Amazon AWS applications) is based on the Spring Cloud AWS project as a foundation, and its auto-configuration classes are used automatically by Spring Boot. Consult their documentation regarding required and useful auto-configuration properties.

5.13.5. Examples

java -jar s3-source.jar --s3.remoteDir=/tmp/foo --file.consumer.mode=lines

5.14. SFTP Source

This source application supports transfer of files using the SFTP protocol. Files are transferred from the remote directory to the local directory where the app is deployed. Messages emitted by the source are provided as a byte array by default. However, this can be customized using the --mode option:

  • ref Provides a java.io.File reference

  • lines Will split files line-by-line and emit a new message for each line

  • contents The default. Provides the contents of a file as a byte array

When using --mode=lines, you can also provide the additional option --withMarkers=true. If set to true, the underlying FileSplitter will emit additional start-of-file and end-of-file marker messages before and after the actual data. The payload of these 2 additional marker messages is of type FileSplitter.FileMarker. The option withMarkers defaults to false if not explicitly set.

See sftp-supplier for advanced configuration options.

See also MetadataStore options for possible shared persistent store configuration used to prevent duplicate messages on restart.

5.14.1. Input

N/A (Fetches files from an SFTP server).

5.14.2. Output

mode = contents
Headers:
  • Content-Type: application/octet-stream

  • file_name: <file name>

  • file_remoteFileInfo: <file metadata>

  • file_remoteHostPort: <host:port>

  • file_remoteDirectory: <relative-path>

  • file_remoteFile: <file-name>

  • sftp_selectedServer: <server-key> (if multi-source)

Payload:

A byte[] filled with the file contents.

mode = lines
Headers:
  • Content-Type: text/plain

  • file_name: <file name>

  • correlationId: <UUID> (same for each line)

  • sequenceNumber: <n>

  • sequenceSize: 0 (the number of lines is not known until the file is read)

  • file_marker : <file marker> (if with-markers is enabled)

Payload:

A String for each line.

The first line is optionally preceded by a message with a START marker payload. The last line is optionally followed by a message with an END marker payload.

Marker presence and format are determined by the with-markers and markers-json properties.

mode = ref
Headers:
  • file_remoteHostPort: <host:port>

  • file_remoteDirectory: <relative-path>

  • file_remoteFile: <file-name>

  • file_originalFile: <absolute-path-of-local-file>

  • file_name: <local-file-name>

  • file_relativePath

  • file_remoteFile: <remote-file-name>

  • sftp_selectedServer: <server-key> (if multi-source)

Payload:

A java.io.File object.

5.14.3. Options

The sftp source has the following options:

5.14.4. Examples

java -jar sftp_source.jar --sftp.supplier.remote-dir=foo --file.consumer.mode=lines --sftp.supplier.factory.host=sftpserver \
         --sftp.supplier.factory.username=user --sftp.supplier.factory.password=pw --sftp.supplier.local-dir=/foo

5.15. SYSLOG

The syslog source receives SYSLOG packets over UDP, TCP, or both. RFC3164 (BSD) and RFC5424 formats are supported.
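A hypothetical invocation is shown below; the syslog.supplier.* property names are assumptions based on the supplier naming convention in this project, so verify them against the options listing.

```shell
java -jar syslog-source.jar --syslog.supplier.port=1514 \
    --syslog.supplier.protocol=udp --syslog.supplier.rfc=5424
```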

5.16. TCP

The tcp source acts as a server and allows a remote party to connect to it and submit data over a raw TCP socket.

TCP is a streaming protocol and some mechanism is needed to frame messages on the wire. A number of decoders are available, the default being 'CRLF' which is compatible with Telnet.

Messages produced by the TCP source application have a byte[] payload.

5.16.2. Available Decoders

Text Data
CRLF (default)

text terminated by carriage return (0x0d) followed by line feed (0x0a)

LF

text terminated by line feed (0x0a)

NULL

text terminated by a null byte (0x00)

STXETX

text preceded by an STX (0x02) and terminated by an ETX (0x03)

Text and Binary Data
RAW

no structure - the client indicates a complete message by closing the socket

L1

data preceded by a one byte (unsigned) length field (supports up to 255 bytes)

L2

data preceded by a two byte (unsigned) length field (up to 2^16-1 bytes)

L4

data preceded by a four byte (signed) length field (up to 2^31-1 bytes)
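To try a decoder end to end, you can start the source and push framed data to it with a tool such as netcat. The tcp.supplier.* property names below are assumptions; check the generated options listing for the exact names.

```shell
java -jar tcp-source.jar --tcp.supplier.port=1234 --tcp.supplier.decoder=LF

# each line-feed-terminated chunk becomes one message with a byte[] payload
printf 'hello\nworld\n' | nc localhost 1234
```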

5.17. Time Source

The time source emits a String with the current time at a configurable interval.

5.17.1. Options

The time source has the following options:

spring.integration.poller
cron

Cron expression for polling. Mutually exclusive with 'fixedDelay' and 'fixedRate'. (String, default: <none>)

fixed-delay

Polling delay period. Mutually exclusive with 'cron' and 'fixedRate'. (Duration, default: <none>)

fixed-rate

Polling rate period. Mutually exclusive with 'fixedDelay' and 'cron'. (Duration, default: <none>)

initial-delay

Polling initial delay. Applied for 'fixedDelay' and 'fixedRate'; ignored for 'cron'. (Duration, default: <none>)

max-messages-per-poll

Maximum number of messages to poll per polling cycle. (Integer, default: <none>)

receive-timeout

How long to wait for messages on poll. (Duration, default: 1s)
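For example, to emit the time every five seconds (a sketch using the poller properties listed above):

```shell
java -jar time-source.jar --spring.integration.poller.fixed-delay=5s
```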

5.18. Twitter Message Source

Repeatedly retrieves the direct messages (both sent and received) within the last 30 days, sorted in reverse-chronological order. The retrieved messages are cached (in a MetadataStore cache) to prevent duplicates. By default, an in-memory SimpleMetadataStore is used.

The twitter.message.source.count property controls the number of returned messages.

The spring.cloud.stream.poller properties control the message poll interval. The interval must be aligned with the rate limit of the Twitter API in use.

5.19. Twitter Search Source

Twitter’s Standard search API (search/tweets) allows simple queries against the indices of recent or popular Tweets. This source provides continuous searches against a sampling of recent Tweets published in the past 7 days. It is part of the 'public' set of APIs.

Returns a collection of relevant Tweets matching a specified query.

Use the spring.cloud.stream.poller properties to control the interval between consecutive search requests. The rate limit is 180 requests per 30-minute window (roughly 6 requests per minute, or one request every 10 seconds).

The twitter.search query properties allow querying by keywords and filtering the results by time and geolocation.

The twitter.search.count and twitter.search.page properties control the result pagination in accordance with the Search API.

Note: Twitter’s search service and, by extension, the Search API is not meant to be an exhaustive source of Tweets. Not all Tweets will be indexed or made available via the search interface.

5.20. Twitter Stream Source

Real-time Tweet streaming Filter and Sample APIs support.

  • The Filter API returns public statuses that match one or more filter predicates. Multiple parameters allow using a single connection to the Streaming API. TIP: The track, follow, and locations fields are combined with an OR operator! A query with track=foo and follow=1234 returns Tweets matching foo OR created by user 1234.

  • The Sample API returns a small random sample of all public statuses. The Tweets returned by the default access level are the same, so if two different clients connect to this endpoint, they will see the same Tweets.

The default access level allows up to 400 track keywords, 5,000 follow user Ids and 25 0.1-360 degree location boxes.

5.21. Websocket Source

A source that produces messages through a WebSocket.

5.21.2. Examples

To verify that the websocket-source receives messages from Websocket clients, you can use the following simple end-to-end setup.

Step 1: Start Kafka
Step 2: Deploy websocket-source on a specific port, say 8080
Step 3: Connect a websocket client on port 8080 path "/websocket", and send some messages.

You can start a Kafka console consumer and see the messages there.
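The steps above might look like the following; the jar name, port, and output topic are illustrative (the topic depends on your output binding destination).

```shell
# Step 2: run the source
java -jar websocket-source-kafka-<version>.jar --server.port=8080

# Verification: watch the output topic
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic <output-destination>
```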

5.22. XMPP Source

The "xmpp" source enables receiving messages from an XMPP Server.

5.22.1. Input

N/A

5.22.2. Output

Payload
  • byte[]

5.22.3. Options

The xmpp source has the following options:

Also see the Spring Boot Documentation for addition properties for the broker connections and listener properties.

5.22.4. Build

$ ./mvnw clean install -PgenerateApps
$ cd apps

You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:

$ ./mvnw clean package

5.22.5. Examples

java -jar xmpp-source.jar --xmpp.factory.host=localhost --xmpp.factory.port=5222 --xmpp.factory.user=jane --xmpp.factory.password=secret --xmpp.factory.service-name=localhost

5.23. ZeroMQ Source

The "zeromq" source enables receiving messages from ZeroMQ.

5.23.1. Input

N/A

5.23.2. Output

Payload
  • byte[]

5.23.3. Options

The zeromq source has the following options:

Also see the Spring Boot Documentation for addition properties for the broker connections and listener properties.

5.23.4. Build

$ ./mvnw clean install -PgenerateApps
$ cd apps

You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:

$ ./mvnw clean package

5.23.5. Examples

java -jar zeromq-source.jar --zeromq.supplier.connectUrl=tcp://server:port --zeromq.supplier.topics=

6. Processors

6.1. Aggregator Processor

The aggregator processor enables an application to aggregate incoming messages into groups and release them to an output destination.

java -jar aggregator-processor-kafka-<version>.jar --aggregator.message-store-type=jdbc

Change kafka to rabbit if you want to run it against RabbitMQ.

6.1.1. Payload

If the input payload is a byte[] and the content-type header is JSON, the JsonBytesToMap function tries to deserialize the payload into a Map for a better data representation on the output of the aggregator function. Such a Map representation also makes it easy to access the payload content from the SpEL expressions mentioned below. Otherwise (including on a deserialization error), the input payload is left as is, and it is up to the target application's configuration to convert it into the desired form.

6.2. Bridge Processor

A processor that bridges the input and output by simply passing the incoming payload to the output unchanged.

6.2.1. Payload

Any

6.3. Filter Processor

The filter processor enables an application to examine the incoming payload and apply a predicate that decides whether the record should continue downstream. For example, if the incoming payload is of type String and you want to filter out anything with fewer than five characters, you can run the filter processor as follows.

java -jar filter-processor-kafka-<version>.jar --filter.function.expression=payload.length() > 4

Change kafka to rabbit if you want to run it against RabbitMQ.

6.3.1. Payload

You can pass any type as payload and then apply SpEL expressions against it to filter. If the incoming type is byte[] and the content type is set to text/plain or application/json, then the application converts the byte[] into String.

6.4. Groovy Processor

A Processor that applies a Groovy script against messages.

6.4.1. Options

The groovy-processor processor has the following options:

groovy-processor.script

Reference to a script used to process messages. (Resource, default: <none>)

groovy-processor.variables

Variable bindings as a new line delimited string of name-value pairs, e.g. 'foo=bar\n baz=car'. (Properties, default: <none>)

groovy-processor.variables-location

The location of a properties file containing custom script variable bindings. (Resource, default: <none>)

6.5. Header Enricher Processor

Use the header-enricher app to add message headers.

The headers are provided in the form of new line delimited key value pairs, where the keys are the header names and the values are SpEL expressions. For example --headers='foo=payload.someProperty \n bar=payload.otherProperty'.
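For example, the following sketch adds two headers computed from the payload (someProperty and otherProperty are hypothetical payload fields taken from the description above):

```shell
java -jar header-enricher-processor-kafka-<version>.jar \
    --headers='foo=payload.someProperty \n bar=payload.otherProperty'
```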

6.5.1. Options

The header-enricher processor has the following options:

6.6. Http Request Processor

A processor app that makes requests to an HTTP resource and emits the response body as a message payload.

6.6.1. Input

Headers

Any required HTTP headers must be explicitly set via the headers or headers-expression property (see the examples below). Header values may also be used to construct:

  • the request body when referenced in the body-expression property.

  • the HTTP method when referenced in the http-method-expression property.

  • the URL when referenced in the url-expression property.

Payload

The payload is used as the request body for a POST request by default, and can be any Java type. It should be an empty String for a GET request. The payload may also be used to construct:

  • the request body when referenced in the body-expression property.

  • the HTTP method when referenced in the http-method-expression property.

  • the URL when referenced in the url-expression property.

The underlying WebClient supports Jackson JSON serialization to support any request and response types if necessary. The expected-response-type property, String.class by default, may be set to any class in your application class path. Note that user-defined payload types require adding the necessary dependencies to your pom file.

6.6.2. Output

Headers

No HTTP message headers are mapped to the outbound Message.

Payload

The raw output object is a ResponseEntity<?>; any of its fields (e.g., body, headers) or accessor methods (statusCode) may be referenced as part of the reply-expression. By default the outbound Message payload is the response body. Note that ResponseEntity (referenced by the expression #root) cannot be deserialized by Jackson by default, but may be rendered as a HashMap.

6.6.3. Options

The http-request processor has the following options:


6.7. Script Processor

Processor that transforms messages using a script. The script body is supplied directly as a property value. The language of the script can be specified (groovy/javascript/ruby/python).

6.7.1. Options

The script-processor processor has the following options:

script-processor.language

Language of the text in the script property. Supported: groovy, javascript, ruby, python. (String, default: <none>)

script-processor.script

Text of the script. (String, default: <none>)

script-processor.variables

Variable bindings as a new line delimited string of name-value pairs, e.g. 'foo=bar\n baz=car'. (Properties, default: <none>)

script-processor.variables-location

The location of a properties file containing custom script variable bindings. (Resource, default: <none>)

6.8. Splitter Processor

The splitter app builds upon the concept of the same name in Spring Integration and allows the splitting of a single message into several distinct messages. The processor uses a function that takes a Message<?> as input and produces a List<Message<?>> as output based on various properties (see below). You can use a SpEL expression or a delimiter to specify how you want to split the incoming message.

6.8.1. Payload

  • Incoming payload - Message<?>

If the incoming type is byte[] and the content type is set to text/plain or application/json, then the application converts the byte[] into String.

  • Outgoing payload - List<Message<?>>
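As a sketch, splitting a comma-delimited String payload into one message per token might look like this. The splitter.expression property name is an assumption based on the function property naming in this project; verify it against the options listing.

```shell
java -jar splitter-processor-kafka-<version>.jar \
    --splitter.expression="payload.split(',')"
```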

6.9. Transform Processor

Transformer processor allows you to convert the message payload structure based on a SpEL expression.

Here is an example of how you can run this application.

java -jar transform-processor-kafka-<version>.jar \
    --spel.function.expression=payload.toUpperCase()

Change kafka to rabbit if you want to run it against RabbitMQ.

6.9.1. Payload

The incoming message can contain any type of payload.

6.10. Twitter Trend and Trend Locations Processor

A processor that can return either trending topics or the locations of trending topics. The twitter.trend.trend-query-type property selects the query type.

6.10.1. Retrieve trending topic in a location (optionally)

For this mode set twitter.trend.trend-query-type to trend.

Processor based on Trends API. Returns the trending topics near a specific latitude, longitude location.

6.10.2. Retrieve trend Locations

For this mode set twitter.trend.trend-query-type to trendLocation.

Retrieve a full or nearby locations list of trending topics by location.

If the latitude, longitude parameters are NOT provided the processor performs the Trends Available API and returns the locations that Twitter has trending topic information for.

If the latitude, longitude parameters are provided the processor performs the Trends Closest API and returns the locations that Twitter has trending topic information for, closest to a specified location.

Response is an array of locations that encode the location’s WOEID and some other human-readable information such as a canonical name and country the location belongs in.

7. Sinks

7.1. Cassandra Sink

This sink application writes the content of each message it receives into Cassandra.

It expects a JSON String payload and uses its properties to map to table columns.

7.1.1. Payload

A JSON String or byte array representing the entity (or a list of entities) to be persisted.

7.1.2. Options

The cassandra sink has the following options:

7.2. Analytics Sink

Sink application, built on top of the Analytics Consumer, that computes analytics from the input messages and publishes them as metrics to various monitoring systems. It leverages the Micrometer library to provide a uniform programming experience across the most popular monitoring systems and exposes Spring Expression Language (SpEL) properties for defining how the metric name, values, and tags are computed from the input data.

The analytics sink can produce two metrics types:

  • Counter - reports a single metric, a count, that increments by a fixed, positive amount. Counters can be used for computing the rate at which the data changes over time.

  • Gauge - reports the current value. Typical examples for gauges would be the size of a collection or map or number of threads in a running state.

A Meter (e.g. Counter or Gauge) is uniquely identified by its name and dimensions (the terms dimensions and tags are used interchangeably). Dimensions allow a particular named metric to be sliced to drill down and reason about the data.

As a metric is uniquely identified by its name and dimensions, you can assign multiple tags (key/value pairs) to every metric, but you cannot change those tags afterwards! Monitoring systems such as Prometheus complain if a metric with the same name has different sets of tags.

Use the analytics.name or analytics.name-expression property to set the name of the output metrics. If not set, the metric name defaults to the application name.

Use the analytics.tag.expression.<TAG_NAME>=<TAG_VALUE> property to add one or more tags to your metrics. The TAG_NAME used in the property definition appears as the tag name in the metrics. The TAG_VALUE is a SpEL expression that dynamically computes the tag value from the incoming message.

The SpEL expressions use the headers and payload keywords to access the message's headers and payload values.

You can use literals (e.g. 'fixed value') to set tags with fixed values.
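Putting this together, a hypothetical invocation that counts processed orders, tagged by a payload field and a fixed region literal, might look like:

```shell
java -jar analytics-sink-kafka-<version>.jar \
    --analytics.name=orders \
    --analytics.tag.expression.status=payload.status \
    --analytics.tag.expression.region="'us-east'"
```

Here status is computed per message, while region is a literal tag value.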

All Stream Applications support, out of the box, three of the most popular monitoring systems (Wavefront, Prometheus, and InfluxDB), and you can enable each of them declaratively. You can add support for additional monitoring systems by adding their Micrometer meter-registry dependencies to the Analytics Sink applications.

Please visit the Spring Cloud Data Flow Stream Monitoring documentation for detailed instructions on configuring the monitoring systems. The following quick snippets can help you get started.

  • To enable the Prometheus meter registry, set the following properties.

management.metrics.export.prometheus.enabled=true
management.metrics.export.prometheus.rsocket.enabled=true
management.metrics.export.prometheus.rsocket.host=<YOUR PROMETHEUS RSOCKET PROXY HOST>
management.metrics.export.prometheus.rsocket.port=7001
  • To enable Wavefront meter registry, set the following properties.

management.metrics.export.wavefront.enabled=true
management.metrics.export.wavefront.api-token=YOUR WAVEFRONT KEY
management.metrics.export.wavefront.uri=YOUR WAVEFRONT URI
management.metrics.export.wavefront.source=UNIQUE NAME TO IDENTIFY YOUR APP
  • To enable InfluxDB meter registry, set the following property.

management.metrics.export.influx.enabled=true
management.metrics.export.influx.uri={influxdb-server-url}
If the Data Flow Server Monitoring is enabled then the Analytics Sink will reuse the provided metrics configurations.

The following diagram illustrates how the Analytics Sink helps to collect business insights for a real-time stock-exchange pipeline.

Analytics Architecture

7.2.1. Payload

The incoming message can contain any type of payload.

7.3. Elasticsearch Sink

Sink that indexes documents into Elasticsearch.

This Elasticsearch sink supports indexing only JSON documents. It consumes data from an input destination and indexes it into Elasticsearch. The input data can be a plain JSON string or a java.util.Map that represents the JSON. It also accepts the data as the Elasticsearch-provided XContentBuilder; however, this is a rare case, as it is unlikely that the middleware keeps the records as XContentBuilder. That form is provided mainly for direct invocation of the consumer.

7.3.1. Options

The Elasticsearch sink has the following options:

7.3.2. Examples of running this sink

  1. From the folder elasticsearch-sink: ./mvnw clean package

  2. cd apps

  3. cd to the proper binder generated app (Kafka or RabbitMQ)

  4. ./mvnw clean package

  5. Make sure that you have Elasticsearch running. For example you can run it as a docker container using the following command. docker run -d --name es762 -p 9200:9200 -e "discovery.type=single-node" elasticsearch:7.6.2

  6. Start the middleware (Kafka or RabbitMQ) if it is not already running.

  7. java -jar target/elasticsearch-sink-<kafka|rabbit>-3.0.0-SNAPSHOT.jar --spring.cloud.stream.bindings.input.destination=els-in --elasticsearch.consumer.index=testing

  8. Send some JSON data into the middleware destination. For example: {"foo":"bar"}

  9. Verify that the data is indexed: curl localhost:9200/testing/_search

7.4. File Sink

The file sink app writes each message it receives to a file.

7.4.1. Payload

  • java.io.File

  • java.io.InputStream

  • byte[]

  • String

7.4.2. Options

The file-sink has the following options:

7.5. FTP Sink

FTP sink is a simple option to push files to an FTP server from incoming messages.

It uses an ftp-outbound-adapter, therefore incoming messages can be either a java.io.File object, a String (content of the file) or an array of bytes (file content as well).

To use this sink, you need a username and a password to log in.

By default Spring Integration will use o.s.i.file.DefaultFileNameGenerator if none is specified. DefaultFileNameGenerator will determine the file name based on the value of the file_name header (if it exists) in the MessageHeaders, or if the payload of the Message is already a java.io.File, then it will use the original name of that file.

7.5.1. Headers

  • file_name (See note above)

7.5.2. Payload

  • java.io.File

  • java.io.InputStream

  • byte[]

  • String

7.5.3. Output

N/A (writes to the FTP server).

7.5.4. Options

The ftp sink has the following options:

7.6. JDBC Sink

The JDBC sink allows you to persist the incoming payload into an RDBMS.

The jdbc.consumer.columns property represents pairs of COLUMN_NAME[:EXPRESSION_FOR_VALUE], where EXPRESSION_FOR_VALUE (together with the colon) is optional. When the expression is omitted, the value is evaluated via a generated expression of the form payload.COLUMN_NAME, giving a direct mapping from object properties to table columns. For example, suppose we have a JSON payload like:

{
  "name": "My Name",
  "address": {
     "city": "Big City",
     "street": "Narrow Alley"
  }
}

We can insert it into a table with name, city, and street columns using the following configuration:

--jdbc.consumer.columns=name,city:address.city,street:address.street

This sink supports batch inserts, as far as supported by the underlying JDBC driver. Batch inserts are configured via the batch-size and idle-timeout properties: Incoming messages are aggregated until batch-size messages are present, then inserted as a batch. If idle-timeout milliseconds pass with no new messages, the aggregated batch is inserted even if it is smaller than batch-size, capping maximum latency.
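For illustration, a batched invocation might look like the following; the jdbc.consumer.batch-size and jdbc.consumer.idle-timeout property names assume the jdbc.consumer prefix used elsewhere in this section, so verify them against the options listing.

```shell
java -jar jdbc-sink.jar --jdbc.consumer.tableName=names --jdbc.consumer.columns=name \
    --jdbc.consumer.batch-size=500 --jdbc.consumer.idle-timeout=5000 \
    --spring.datasource.url='jdbc:mysql://localhost:3306/test'
```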

The module also uses Spring Boot’s DataSource support for configuring the database connection, so properties like spring.datasource.url etc. apply.

7.6.1. Examples

java -jar jdbc-sink.jar --jdbc.consumer.tableName=names \
            --jdbc.consumer.columns=name \
            --spring.datasource.driver-class-name=org.mariadb.jdbc.Driver \
            --spring.datasource.url='jdbc:mysql://localhost:3306/test'

7.6.2. Payload

7.6.3. Options

The jdbc sink has the following options:

7.7. Apache Kafka Sink

This module publishes messages to Apache Kafka.

7.7.1. Options

The kafka sink has the following options:

(See the Spring Boot documentation for Spring for Apache Kafka configuration properties)

7.8. Log Sink

The log sink uses the application logger to output the data for inspection.

Note that the log sink uses a type-less handler, which affects how the actual logging is performed. This means that if the content type is textual, the raw payload bytes are converted to a String; otherwise, the raw bytes are logged. See the user guide for more information.

7.8.1. Options

The log sink has the following options:

7.9. MongoDB Sink

This sink application ingests incoming data into MongoDB. This application is fully based on the MongoDataAutoConfiguration, so refer to the Spring Boot MongoDB Support for more information.

7.9.1. Input

Payload
  • Any POJO

  • String

  • byte[]

7.9.2. Options

The mongodb sink has the following options:
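A hypothetical invocation, assuming a mongodb.consumer.collection option together with the standard Spring Boot MongoDB connection properties:

```shell
java -jar mongodb-sink.jar --mongodb.consumer.collection=orders \
            --spring.data.mongodb.uri=mongodb://localhost:27017/test
```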

7.10. MQTT Sink

This module sends messages to MQTT.

7.10.1. Payload

  • byte[]

  • String

7.10.2. Options

The mqtt sink has the following options:
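A sketch of a typical invocation; the mqtt.consumer.topic option and the mqtt.url, mqtt.username, and mqtt.password connection properties are assumptions based on the app's property prefixes:

```shell
java -jar mqtt-sink.jar --mqtt.consumer.topic=sensors \
            --mqtt.url=tcp://localhost:1883 \
            --mqtt.username=guest --mqtt.password=guest
```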

7.11. Pgcopy Sink

A module that writes its incoming payload to an RDBMS using the PostgreSQL COPY command.

7.11.1. Input

Headers
Payload
  • Any

The column expression is evaluated against the message, and a given expression is usually compatible with only one payload type (such as a Map or a bean).

7.11.2. Output

N/A

7.11.3. Options

The pgcopy sink has the following options:

The module also uses Spring Boot’s DataSource support for configuring the database connection, so properties like spring.datasource.url etc. apply.

7.11.4. Build

$ ./mvnw clean install -PgenerateApps
$ cd apps

You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:

$ ./mvnw clean package

For integration tests to run, start a PostgreSQL database on localhost:

    docker run -e POSTGRES_PASSWORD=spring -e POSTGRES_DB=test -p 5432:5432 -d postgres:latest

7.11.5. Examples

java -jar pgcopy-sink.jar --tableName=names --columns=name --spring.datasource.driver-class-name=org.postgresql.Driver \
--spring.datasource.url='jdbc:postgresql://localhost:5432/test'

7.12. RabbitMQ Sink

This module sends messages to RabbitMQ.

7.12.1. Options

The rabbit sink has the following options:

(See the Spring Boot documentation for RabbitMQ connection properties)

7.13. Redis Sink

Sends messages to Redis.

7.13.1. Options

The redis sink has the following options:
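For example, to append each incoming payload to a Redis list (the redis.consumer.queue property name is an assumption; the connection settings are the standard Spring Boot Redis properties):

```shell
java -jar redis-sink.jar --redis.consumer.queue=my-list \
            --spring.data.redis.host=localhost --spring.data.redis.port=6379
```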

7.14. Router Sink

This application routes messages to named channels.

7.14.1. Options

The router sink has the following options:

router.default-output-binding

Where to send un-routable messages. (String, default: <none>)

router.destination-mappings

Destination mappings as a new line delimited string of name-value pairs, e.g. 'foo=bar\n baz=car'. (Properties, default: <none>)

router.expression

The expression to be applied to the message to determine the channel(s) to route to. Note that the payload wire format for content types such as text, json or xml is byte[] not String!. Consult the documentation for how to handle byte array payload content. (Expression, default: <none>)

router.refresh-delay

How often to check for script changes in ms (if present); < 0 means don't refresh. (Integer, default: 60000)

router.resolution-required

Whether channel resolution is required. (Boolean, default: false)

router.script

The location of a groovy script that returns channels or channel mapping resolution keys. (Resource, default: <none>)

router.variables

Variable bindings as a new line delimited string of name-value pairs, e.g. 'foo=bar\n baz=car'. (Properties, default: <none>)

router.variables-location

The location of a properties file containing custom script variable bindings. (Resource, default: <none>)

This router sink is based on the StreamBridge API from Spring Cloud Stream, so destinations can be created on demand. In this case, the defaultOutputBinding is reached only when the resolved key is not included in destinationMappings. Setting resolutionRequired = true disregards the defaultOutputBinding and throws an exception if there is no mapping and no corresponding binding has already been declared.

You can restrict the creation of dynamic bindings using the spring.cloud.stream.dynamicDestinations property. By default, all resolved destinations will be bound dynamically; if this property has a comma-delimited list of destination names, only those will be bound. Messages that resolve to a destination that is not in this list will be routed to the defaultOutputBinding, which must also appear in the list.

The destinationMappings are used to map the evaluation results to an actual destination name.

7.14.2. SpEL-based Routing

The expression evaluates against the message and returns either a channel name, or the key to a map of channel names.

For more information, please see the "Routers and the Spring Expression Language (SpEL)" subsection in the Spring Integration Reference manual Configuring a Generic Router section.
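For example, using only the options documented above, the following sketch routes on a message header, maps the evaluation results to concrete destinations, and falls back to a default destination when no mapping matches (the header name and destination names are illustrative):

```shell
java -jar router-sink.jar \
            --router.expression="headers['type']" \
            --router.destination-mappings='order=orders\n payment=payments' \
            --router.default-output-binding=unrouted
```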

7.14.3. Groovy-based Routing

Instead of SpEL expressions, Groovy scripts can also be used. Let’s create a Groovy script in the file system at "file:/my/path/router.groovy", or "classpath:/my/path/router.groovy" :

println("Groovy processing payload '" + payload + "'")
if (payload.contains('a')) {
    return "foo"
}
else {
    return "bar"
}

If you want to pass variable values to your script, you can statically bind values using the variables option or optionally pass the path to a properties file containing the bindings using the variablesLocation option. All properties in the file will be made available to the script as variables. You may specify both variables and variablesLocation, in which case any duplicate values provided as variables override values provided in variablesLocation. Note that payload and headers are implicitly bound to give you access to the data contained in a message.

For more information, see the Spring Integration Reference manual Groovy Support.
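The script above could then be wired in like this (the file path and variable values are illustrative):

```shell
java -jar router-sink.jar --router.script=file:/my/path/router.groovy \
            --router.variables='foo=bar\n baz=car' \
            --router.refresh-delay=30000
```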

7.15. RSocket Sink

RSocket sink to send data using the RSocket protocol's fire-and-forget strategy.

7.15.1. Options

The rsocket sink has the following options:

7.16. Amazon S3 Sink

This sink app supports transferring objects to an Amazon S3 bucket. File payloads (and directories, recursively) are transferred from the local directory where the application is deployed to the remote S3 bucket.

Messages accepted by this sink must contain a payload of one of the following types:

  • File, including directories for recursive upload;

  • InputStream;

  • byte[]

7.16.1. Options

The s3 sink has the following options:

The target generated application based on the AmazonS3SinkConfiguration can be enhanced with the S3MessageHandler.UploadMetadataProvider and/or S3ProgressListener, which are injected into S3MessageHandler bean. See Spring Integration AWS support for more details.

7.16.2. Amazon AWS common options

The Amazon S3 Sink (as all other Amazon AWS applications) is based on the Spring Cloud AWS project as a foundation, and its auto-configuration classes are used automatically by Spring Boot. Consult their documentation regarding required and useful auto-configuration properties.

Some of them are about AWS credentials:

  • spring.cloud.aws.credentials.accessKey

  • spring.cloud.aws.credentials.secretKey

  • spring.cloud.aws.credentials.instanceProfile

  • spring.cloud.aws.credentials.profileName

  • spring.cloud.aws.credentials.profilePath

Others define the AWS Region:

  • spring.cloud.aws.region.auto

  • spring.cloud.aws.region.static

7.16.3. Examples
java -jar s3-sink.jar --s3.bucket=/tmp/bar

7.17. SFTP Sink

SFTP sink is a simple option to push files to an SFTP server from incoming messages.

It uses an sftp-outbound-adapter, so incoming messages can be a java.io.File object, a String (the file content), or an array of bytes (also the file content).

To use this sink, you need a username and a password to log in.

By default Spring Integration will use o.s.i.file.DefaultFileNameGenerator if none is specified. DefaultFileNameGenerator will determine the file name based on the value of the file_name header (if it exists) in the MessageHeaders, or if the payload of the Message is already a java.io.File, then it will use the original name of that file.

When configuring the sftp.factory.known-hosts-expression option, the root object of the evaluation is the application context, an example might be sftp.factory.known-hosts-expression = @systemProperties['user.home'] + '/.ssh/known_hosts'.

7.17.1. Input

Headers
  • file_name (See note above)

Payload
  • java.io.File

  • java.io.InputStream

  • byte[]

  • String

7.17.2. Output

N/A (writes to the SFTP server).

7.17.3. Options

The sftp sink has the following options:
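A hypothetical invocation, mirroring the sftp.factory prefix used in the known-hosts example above; the exact option names may differ in your version:

```shell
java -jar sftp-sink.jar --sftp.remote-dir=/upload \
            --sftp.factory.host=sftp.example.com \
            --sftp.factory.username=user --sftp.factory.password=secret
```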

7.18. TCP Sink

This module writes messages to TCP using an Encoder.

TCP is a streaming protocol and some mechanism is needed to frame messages on the wire. A number of encoders are available, the default being 'CRLF'.

7.18.1. Options

The tcp sink has the following options:

7.18.2. Available Encoders

Text Data
CRLF (default)

text terminated by carriage return (0x0d) followed by line feed (0x0a)

LF

text terminated by line feed (0x0a)

NULL

text terminated by a null byte (0x00)

STXETX

text preceded by an STX (0x02) and terminated by an ETX (0x03)

Text and Binary Data
RAW

no structure - the client indicates a complete message by closing the socket

L1

data preceded by a one byte (unsigned) length field (supports up to 255 bytes)

L2

data preceded by a two byte (unsigned) length field (up to 2^16-1 bytes)

L4

data preceded by a four byte (signed) length field (up to 2^31-1 bytes)
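As an illustration, the following sketch selects one of the encoders above (the tcp.consumer.host, tcp.consumer.port, and tcp.consumer.encoder property names are assumptions):

```shell
java -jar tcp-sink.jar --tcp.consumer.host=localhost --tcp.consumer.port=1234 \
            --tcp.consumer.encoder=LF
```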

7.19. Throughput Sink

Sink that will count messages and log the observed throughput at a selected interval.

7.19.1. Options

The throughput sink has the following options:

throughput.report-every-ms

how often to report. (Integer, default: 1000)
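For example, to report observed throughput every five seconds instead of every second:

```shell
java -jar throughput-sink.jar --throughput.report-every-ms=5000
```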

7.20. Twitter Message Sink

Send Direct Messages to a specified user from the authenticating user. Requires a JSON POST body and Content-Type header to be set to application/json.

When a message is received from a user you may send up to 5 messages in response within a 24 hour window. Each message received resets the 24 hour window and the 5 allotted messages. Sending a 6th message within a 24 hour window or sending a message outside of a 24 hour window will count towards rate-limiting. This behavior only applies when using the POST direct_messages/events/new endpoint.

SpEL expressions are used to compute the request parameters from the input message.

7.20.1. Options

Use single quotes (') to wrap the literal values of the SpEL expression properties. For example, to set a fixed message text, use text='Fixed Text'. For a fixed target userId, use userId='666'.

7.21. Twitter Update Sink

Updates the authenticating user’s current status (i.e., Tweets).

For each update attempt, the update text is compared with the authenticating user’s recent Tweets. Any attempt that would result in duplication will be blocked, resulting in a 403 error. A user cannot submit the same text twice in a row.

While not rate limited by the API, a user is limited in the number of Tweets they can create at a time. The update limit for the standard API is 300 in a 3-hour window. If the number of updates posted by the user reaches the current allowed limit, this method returns an HTTP 403 error.

7.22. Wavefront Sink

The Wavefront sink consumes a Message<?>, converts it into a metric in the Wavefront data format, and sends the metric directly to Wavefront or to a Wavefront proxy.

It supports common ETL use cases, where existing (historical) metrics data has to be cleaned, transformed, and stored in Wavefront for further analysis.

7.22.1. Options

The Wavefront sink has the following options:

7.23. Websocket Sink

A simple Websocket Sink implementation.

7.23.1. Options

The following options are supported:

7.23.2. Examples

To verify that the websocket-sink receives messages from other spring-cloud-stream apps, you can use the following simple end-to-end setup.

Step 1: Start Rabbitmq
Step 2: Deploy a time-source
Step 3: Deploy the websocket-sink

Finally start a websocket-sink in trace mode so that you see the messages produced by the time-source in the log:

java -jar <spring boot application for websocket-sink> --spring.cloud.stream.bindings.input=ticktock --server.port=9393 \
	--logging.level.org.springframework.cloud.fn.consumer.websocket=TRACE

You should start seeing log messages in the console where you started the WebsocketSink like this:

Handling message: GenericMessage [payload=2015-10-21 12:52:53, headers={id=09ae31e0-a04e-b811-d211-b4d4e75b6f29, timestamp=1445424778065}]
Handling message: GenericMessage [payload=2015-10-21 12:52:54, headers={id=75eaaf30-e5c6-494f-b007-9d5b5b920001, timestamp=1445424778065}]
Handling message: GenericMessage [payload=2015-10-21 12:52:55, headers={id=18b887db-81fc-c634-7a9a-16b1c72de291, timestamp=1445424778066}]

7.23.3. Actuators

There is an actuator endpoint that you can use to access the last n messages sent and received. You have to enable it by providing --endpoints.websocketconsumertrace.enabled=true. By default, it shows the last 100 messages at host:port/websocketconsumertrace. Here is a sample output:

 [
   {
    "timestamp": 1445453703508,
    "info": {
      "type": "text",
      "direction": "out",
      "id": "2ff9be50-c9b2-724b-5404-1a6305c033e4",
      "payload": "2015-10-21 20:54:33"
    }
  },
  ...
  {
    "timestamp": 1445453703506,
    "info": {
      "type": "text",
      "direction": "out",
      "id": "2b9dbcaf-c808-084d-a51b-50f617ae6a75",
      "payload": "2015-10-21 20:54:32"
    }
  }
]

There is also a simple HTML page where you see forwarded messages in a text area. You can access it directly via host:port in your browser.

7.24. XMPP Sink

The "xmpp" sink enables sending messages to an XMPP server.

7.24.1. Input

  • byte[]

7.24.2. Output

Payload

N/A

7.24.3. Options

The xmpp sink has the following options:

7.24.4. Build

$ ./mvnw clean install -PgenerateApps
$ cd apps

You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:

$ ./mvnw clean package

7.25. ZeroMQ Sink

The "zeromq" sink enables sending messages to a ZeroMQ socket.

7.25.1. Input

  • byte[]

7.25.2. Output

Payload

N/A

7.25.3. Options

The zeromq sink has the following options:

7.25.4. Build

$ ./mvnw clean install -PgenerateApps
$ cd apps

You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:

$ ./mvnw clean package

7.25.5. Examples

java -jar zeromq-sink.jar --zeromq.consumer.connectUrl=tcp://server:port --zeromq.consumer.topic=

Appendices

Appendix A: Contributing

Spring Cloud is released under the non-restrictive Apache 2.0 license and follows a very standard GitHub development process, using the GitHub tracker for issues and merging pull requests into master. If you want to contribute, even something trivial, please do not hesitate, but follow the guidelines below.

A.1. Sign the Contributor License Agreement

Before we accept a non-trivial patch or pull request we will need you to sign the contributor’s agreement. Signing the contributor’s agreement does not grant anyone commit rights to the main repository, but it does mean that we can accept your contributions, and you will get an author credit if we do. Active contributors might be asked to join the core team, and given the ability to merge pull requests.

A.2. Code Conventions and Housekeeping

None of these is essential for a pull request, but they will all help. They can also be added after the original pull request but before a merge.

  • Use the Spring Framework code format conventions. If you use Eclipse you can import formatter settings using the eclipse-code-formatter.xml file from the Spring Cloud Build project. If using IntelliJ, you can use the Eclipse Code Formatter Plugin to import the same file.

  • Make sure all new .java files have a simple Javadoc class comment with at least an @author tag identifying you, and preferably at least a paragraph on what the class is for.

  • Add the ASF license header comment to all new .java files (copy from existing files in the project)

  • Add yourself as an @author to the .java files that you modify substantially (more than cosmetic changes).

  • Add some Javadocs and, if you change the namespace, some XSD doc elements.

  • A few unit tests would help a lot as well — someone has to do it.

  • If no one else is using your branch, please rebase it against the current master (or other target branch in the main project).

  • When writing a commit message, please follow these conventions; if you are fixing an existing issue, please add Fixes gh-XXXX at the end of the commit message (where XXXX is the issue number).