Reference Guide

This section provides you with a detailed overview of the out-of-the-box Spring Cloud Stream Applications. It assumes familiarity with general Spring Cloud Stream concepts, which you can find in the Spring Cloud Stream reference documentation.

These are out-of-the-box Spring Cloud Stream utility applications that you can run independently or with Spring Cloud Data Flow. They include:

  • Connectors (sources, processors, and sinks) for a variety of middleware technologies, including message brokers and storage (relational, non-relational, and filesystem).

  • Adapters for various network protocols.

  • Generic processors that you can customize with Spring Expression Language (SpEL) or by scripting.

You can find a detailed listing of all the applications and their options in the following sections of this guide.

Most of these applications are based on core elements that are exposed as a java.util.function component. You can learn more about these foundational components and how they are all connected to the applications by reading this README.

1. Pre-built Applications

Out-of-the-box applications are Spring Boot applications that include a binder implementation on top of the basic logic of the app (a function, for example), packaged as a fully functional uber-jar. These uber-jars include the minimal code required for standalone execution. For each function application, the project provides prebuilt versions for the Apache Kafka and RabbitMQ binders.

Prebuilt applications are generated by the stream apps generator Maven plugin.

2. Classification

Based on their target application type, the prebuilt applications can be one of the following:

  • A source that connects to an external resource to poll and receive data that is published to the default “output” channel;

  • A processor that receives data from an “input” channel and processes it, sending the result on the default “output” channel;

  • A sink that receives data from the default “input” channel and sends it to an external resource.

The prebuilt applications follow a naming convention: <functionality>-<type>-<binder>. For example, rabbit-sink-kafka is a Rabbit sink that uses the Kafka binder, with Kafka as the middleware.
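
As an illustration of the naming convention, the following Python sketch splits an application name into its three parts. The function and type names are hypothetical helpers, not part of the project, and the sketch assumes that only the functionality part may itself contain hyphens.

```python
from typing import NamedTuple

# Application types listed in the classification above.
APP_TYPES = {"source", "processor", "sink"}


class AppName(NamedTuple):
    functionality: str
    app_type: str
    binder: str


def parse_app_name(name: str) -> AppName:
    """Split '<functionality>-<type>-<binder>' into its three parts."""
    # Split from the right so that a hyphenated functionality stays intact.
    parts = name.rsplit("-", 2)
    if len(parts) != 3 or parts[1] not in APP_TYPES:
        raise ValueError(f"not a <functionality>-<type>-<binder> name: {name!r}")
    return AppName(*parts)


def format_app_name(functionality: str, app_type: str, binder: str) -> str:
    """Build an application name from its three parts."""
    return f"{functionality}-{app_type}-{binder}"
```

For example, parse_app_name("rabbit-sink-kafka") yields the functionality rabbit, the type sink, and the binder kafka.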

2.1. Maven Access

The core functionality of the applications is available as functions. See the Java Functions section in the stream-applications repository for more details. Prebuilt applications are available as Maven artifacts. You can download the executable jar artifacts from the Spring Maven repositories. The root directory of the Maven repository that hosts release versions is repo.spring.io/release/org/springframework/cloud/stream/app/. From there, you can navigate to the latest released version of a specific app. If you want to use functions directly in a custom application, those artifacts are available under the directory structure org/springframework/cloud/fn. You need to use the Release, Milestone and Snapshot repository locations for Release, Milestone and Snapshot executable jar artifacts respectively.

2.2. Docker Access

The Docker versions of the applications are available in Docker Hub, at hub.docker.com/r/springcloudstream/. Naming and versioning follow the same general conventions as Maven. For example:

docker pull springcloudstream/cassandra-sink-kafka

The preceding command pulls the latest Docker image of the Cassandra sink with the Kafka binder.

2.3. Build

You can build everything from the root of the repository.

./mvnw clean install

This is a long build and you may want to skip tests:

./mvnw clean install -DskipTests

However, you are probably interested in only a single application or a few of them. In that case, build the functions and applications that you need selectively, as shown below.

2.4. Building the root parent

First, we need to build the parent used in various components.

./mvnw clean install -f stream-applications-build

2.4.1. Building functions

./mvnw clean install -f functions -DskipTests

You can also build a single function or a group of functions. For example, if you are only interested in jdbc-supplier and log-consumer, run the following:

./mvnw clean install -pl :jdbc-supplier,:log-consumer

2.4.2. Building core for Stream Applications

./mvnw clean install -f applications/stream-applications-core -DskipTests

2.5. Building the applications

Let’s assume that you want to build the JDBC Source application based on the Kafka binder and the Log Sink application based on the Rabbit binder. Assuming that you built both the functions and stream-applications-core as described above, run the following:

./mvnw clean package -pl :jdbc-source
cd applications/source/jdbc-source/apps/jdbc-source-kafka
./mvnw clean package

This will generate the Kafka binder based uber jar in the target folder.

Similarly for the log sink, do the following.

./mvnw clean package -pl :log-sink
cd applications/sink/log-sink/apps/log-sink-rabbit
./mvnw clean package
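
The two examples above follow the same pattern, which can be sketched as a small Python helper that prints the commands for a given application. The helper name is hypothetical, and the directory layout (applications/<type>/<name>-<type>/apps/<name>-<type>-<binder>) is inferred from the jdbc-source and log-sink examples only, so verify it for other applications.

```python
from typing import List


def build_commands(functionality: str, app_type: str, binder: str) -> List[str]:
    """Return the shell commands that build one binder-specific application."""
    module = f"{functionality}-{app_type}"
    # Layout inferred from the jdbc-source and log-sink examples (assumption).
    app_dir = f"applications/{app_type}/{module}/apps/{module}-{binder}"
    return [
        f"./mvnw clean package -pl :{module}",  # generates the binder-based apps
        f"cd {app_dir}",
        "./mvnw clean package",                 # builds the uber jar into target/
    ]


if __name__ == "__main__":
    print("\n".join(build_commands("log", "sink", "rabbit")))
```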

2.5.1. Building a Docker image

The apps use the Jib Maven Plugin to build and publish the Docker image. If you have made some changes to an app, you may want to build the image and test it locally.

If you plan to use the image with minikube, run the following command before building the image:

eval $(minikube docker-env)

To build the image in your local registry:

./mvnw clean package jib:dockerBuild

To publish the image to a remote registry:

./mvnw jib:build \
    -Djib.to.image=myregistry/myimage:latest \
    -Djib.to.auth.username=$USERNAME \
    -Djib.to.auth.password=$PASSWORD

3. Patching Pre-built Applications

3.1. Adding new dependencies

If you want to patch the pre-built applications to add new dependencies, you can use the following example as a reference. To add the MySQL driver to the jdbc-sink application:

  1. Clone the GitHub repository at github.com/spring-cloud/stream-applications

  2. Find the module that you want to patch (jdbc-sink in this case) and add the additional dependencies. For example, you can add the following mysql dependency to the application generator plugin’s configuration in the pom.xml:

<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.37</version>
</dependency>

The complete plugin configuration should look like this:

<plugin>
    <groupId>org.springframework.cloud.stream.app.plugin</groupId>
    <artifactId>spring-cloud-stream-app-maven-plugin</artifactId>
    <configuration>
        <generatedApp>
            <name>jdbc</name>
            <type>sink</type>
            <version>${project.version}</version>
            <configClass>org.springframework.cloud.fn.consumer.jdbc.JdbcConsumerConfiguration.class</configClass>
        </generatedApp>
        <dependencies>
            <dependency>
                <groupId>mysql</groupId>
                <artifactId>mysql-connector-java</artifactId>
                <version>5.1.37</version>
            </dependency>
            <dependency>
                <groupId>org.springframework.cloud.fn</groupId>
                <artifactId>jdbc-consumer</artifactId>
                <version>${java-functions.version}</version>
            </dependency>
        </dependencies>
    </configuration>
</plugin>

Once the above changes are done, you can generate the binder based apps as below from the root of the repository.

./mvnw clean install -pl :jdbc-sink

This generates the binder based applications in the apps folder under the jdbc-sink folder. To build the app with the binder flavor that you are interested in, run the following:

cd applications/sink/jdbc-sink
cd apps/jdbc-sink-kafka # (or Rabbit if you are interested in that)
./mvnw clean package
cd target

There you will find the binder based uber jar with your changes.

3.2. Update existing dependencies or add new resources in the application

Modifying the plugin as above works when there are new dependencies to add to the application. However, when we need to update any existing dependencies, it is easier to make the Maven changes in the generated application itself. For example, if we have to update the binder dependencies to a new release of Spring Cloud Stream, then those versions need to be updated in the generated application.

Here are the steps (again, we are using jdbc-sink-kafka as an example).

./mvnw clean install -pl :jdbc-sink
cd applications/sink/jdbc-sink/apps/jdbc-sink-kafka

Open the generated application’s pom.xml and update the dependencies. If there is a new version of Spring Cloud Stream update available that contains the enhancements we are looking for, then it is easier to update the BOM itself. Find where the bom is declared in pom.xml and update the version.

For example, if we have to update Spring Cloud Stream to 4.0.3, this version must be specified in the BOM declaration as below:

<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-dependencies</artifactId>
            <version>4.0.3</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>

We can also update any individual dependencies directly, but the dependencyManagement approach above is preferred if a BOM is available. This is because, when using a BOM, Maven properly aligns any transitive dependencies.

If you have to modify the application further, this method of modifying the generated application is again the recommended approach.

For instance, if you want to add security certificate files such as a key store, or a trust store to the application’s classpath, then generate the application first and add those resources to the classpath.

Make sure you are in the generated jdbc-sink-kafka folder, then do the following:

First, add the resources to the classpath by placing them under src/main/resources.

Then rebuild the application.

./mvnw clean package
cd target

Here you can find the modified application jar file.

4. Generating out of the box applications for other binders

By default, we only provide out of the box applications for the Apache Kafka and RabbitMQ binders. Other binder implementations exist, and we can generate the same out of the box applications for them. For example, to generate these applications for the Kinesis binder or the Solace binder, follow the instructions below.

As a first step, clone the stream applications repository.

cd applications/stream-applications-core

We need to edit the pom.xml in this module. Find the following configuration where it defines the Kafka and RabbitMQ binders for the maven plugin.

<kafka>
    <maven>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-stream-binder-kafka</artifactId>
            </dependency>
        </dependencies>
    </maven>
</kafka>
<rabbit>
    <maven>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
            </dependency>
        </dependencies>
    </maven>
</rabbit>

Add the binder for which you want to generate new apps. For example, if we want to generate applications for the Kinesis binder, then modify the configuration as below.

<binders>
    <kafka>
        <maven>
            <dependencies>
                <dependency>
                    <groupId>org.springframework.cloud</groupId>
                    <artifactId>spring-cloud-stream-binder-kafka</artifactId>
                </dependency>
            </dependencies>
        </maven>
    </kafka>
    <rabbit>
        <maven>
            <dependencies>
                <dependency>
                    <groupId>org.springframework.cloud</groupId>
                    <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
                </dependency>
            </dependencies>
        </maven>
    </rabbit>
    <kinesis>
        <maven>
            <dependencies>
                <dependency>
                    <groupId>org.springframework.cloud</groupId>
                    <artifactId>spring-cloud-stream-binder-kinesis</artifactId>
                    <version>2.0.3.RELEASE</version>
                </dependency>
            </dependencies>
        </maven>
    </kinesis>
</binders>

Note that we need to specify the Kinesis binder version explicitly, while the Kafka and RabbitMQ binders do not need one. This is because their versions come from dependency management, while the Kinesis binder is not available through such a mechanism. If a BOM is available that defines the version, then that can be used instead; just ensure that it is declared in the proper BOM section of the Maven plugin.

If the binder for which you are generating the applications relies on a different version of Spring Cloud Stream, make sure it is updated in the Maven properties.

Now, we can build: ./mvnw clean install -DskipTests.

If we go to the applications folder and look at the generated applications, we should see the new binder variants there. For instance, if we follow the configuration above for adding the Kinesis binder, then we should see the Kinesis binder based app in the generated apps. Let’s take time-source as an example.

cd applications/source/time-source/apps

Here, we should see three different binder based app projects: time-source-kafka, time-source-rabbit, and time-source-kinesis. The same should happen for all the out of the box application projects.

Keep in mind that these generated applications still need to be built individually. To do so, go to the generated application's folder and run a Maven build.

Applications

5. Sources

5.1. Debezium Source

Debezium Engine based Change Data Capture (CDC) source. The Debezium Source captures database change events and streams them over different message binders, such as Apache Kafka, RabbitMQ, and all other Spring Cloud Stream supported brokers.

This source can be used with any Spring Cloud Stream message binder. It is neither restricted to nor dependent on the Kafka Connect framework. Though this approach is flexible, it comes with certain limitations.

All Debezium configuration properties are supported. Precede any Debezium property with the debezium.properties. prefix. For example, to set Debezium’s connector.class property, use the debezium.properties.connector.class source property instead.
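
The prefixing rule can be sketched in a few lines of Python. The helper names below are hypothetical; the sketch only shows how a set of native Debezium properties maps to source properties and to the --key=value argument form used in the examples of this section.

```python
from typing import Dict, List

PREFIX = "debezium.properties."


def to_source_properties(native: Dict[str, str]) -> Dict[str, str]:
    """Prefix every native Debezium property with 'debezium.properties.'."""
    return {PREFIX + key: value for key, value in native.items()}


def to_cli_args(properties: Dict[str, str]) -> List[str]:
    """Render properties as '--key=value' command-line arguments."""
    return [f"--{key}={value}" for key, value in properties.items()]


native = {"connector.class": "io.debezium.connector.mysql.MySqlConnector"}
args = to_cli_args(to_source_properties(native))
# args holds one entry:
# --debezium.properties.connector.class=io.debezium.connector.mysql.MySqlConnector
```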

5.1.1. Database Support

The Debezium Source currently supports CDC for multiple datastores: MySQL, PostgreSQL, MongoDB, Oracle, SQL Server, Db2, Vitess and Spanner databases.

5.1.2. Options

Properties grouped by prefix:

debezium
debezium-native-configuration

<documentation missing> (Properties, default: <none>)

header-format

ChangeEvent header format. Defaults to 'JSON'. (DebeziumFormat, default: <none>, possible values: JSON,AVRO,PROTOBUF)

offset-commit-policy

The policy that defines when the offsets should be committed to offset storage. (DebeziumOffsetCommitPolicy, default: <none>, possible values: ALWAYS,PERIODIC,DEFAULT)

payload-format

ChangeEvent Key and Payload formats. Defaults to 'JSON'. (DebeziumFormat, default: <none>, possible values: JSON,AVRO,PROTOBUF)

properties

Spring pass-through wrapper for Debezium configuration properties. All properties with a 'debezium.properties.*' prefix are native Debezium properties. (Map<String, String>, default: <none>)

debezium.supplier
copy-headers

Copy Change Event headers into Message headers. (Boolean, default: true)

Event flattening configuration

Debezium provides a comprehensive message format that accurately details information about changes that happen in the system. Sometimes, though, this format might not be suitable for downstream consumers, which might require messages in which field names and values are presented in a simplified, flattened structure.

To simplify the format of the event records that the Debezium connectors produce, you can use the Debezium event flattening message transformation. Using the flattening configuration, you can configure a simple message format like this:

--debezium.properties.transforms=unwrap
--debezium.properties.transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
--debezium.properties.transforms.unwrap.drop.tombstones=false
--debezium.properties.transforms.unwrap.delete.handling.mode=rewrite
--debezium.properties.transforms.unwrap.add.fields=name,db
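
To give a feel for what flattening means for a downstream consumer, the following Python sketch is a toy model of the configuration above. It is not the transformation's implementation: the event shape is a hypothetical, heavily reduced change event, and the "__" field prefix and the "__deleted" marker are assumptions about the transformation's default output with delete.handling.mode=rewrite. Tombstone handling is not modeled.

```python
def flatten(event, add_fields=("name", "db")):
    """Toy model: reduce a change event envelope to a flat record."""
    deleted = event.get("after") is None
    # For deletes there is no 'after' state, so keep the last known row.
    row = event["before"] if deleted else event["after"]
    flat = dict(row)
    source = event.get("source", {})
    for field in add_fields:
        # Metadata fields are copied from the 'source' block (assumed prefix).
        flat["__" + field] = source.get(field)
    flat["__deleted"] = "true" if deleted else "false"
    return flat


# Hypothetical, reduced change event for an inserted row.
change_event = {
    "before": None,
    "after": {"id": 1001, "first_name": "Sally"},
    "source": {"name": "my-topic", "db": "inventory", "table": "customers"},
    "op": "c",
}
flat_record = flatten(change_event)
```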

Debezium Offset Storage

When a Debezium source runs, it reads information from the source and periodically records offsets that define how much of that information it has processed. Should the source be restarted, it will use the last recorded offset to know where in the source information it should resume reading. Out of the box, the following offset storage configuration options are provided:

  • In-Memory

    Doesn't persist the offset data but keeps it in memory. Therefore all offsets are lost on debezium source restart.
    --debezium.properties.offset.storage=org.apache.kafka.connect.storage.MemoryOffsetBackingStore
  • Local Filesystem

    Store the offsets in a file on the local file system (the file can be named anything and stored anywhere). Additionally, although the connector records the offsets with every source record it produces, the engine flushes the offsets to the backing store periodically (in the example below, once each minute).
    --debezium.properties.offset.storage=org.apache.kafka.connect.storage.FileOffsetBackingStore
    --debezium.properties.offset.storage.file.filename=/tmp/offsets.dat (1)
    --debezium.properties.offset.flush.interval.ms=60000 (2)
    1 Path to file where offsets are to be stored. Required when offset.storage is set to the FileOffsetBackingStore.
    2 Interval at which to try committing offsets. The default is 1 minute.
  • Kafka topic

    Uses a Kafka topic to store offset data.
    --debezium.properties.offset.storage=org.apache.kafka.connect.storage.KafkaOffsetBackingStore
    --debezium.properties.offset.storage.topic=my-kafka-offset-topic (1)
    --debezium.properties.offset.storage.partitions=2 (2)
    --debezium.properties.offset.storage.replication.factor=1 (3)
    --debezium.properties.offset.flush.interval.ms=60000 (4)
    1 The name of the Kafka topic where offsets are to be stored. Required when offset.storage is set to the KafkaOffsetBackingStore.
    2 The number of partitions used when creating the offset storage topic.
    3 Replication factor used when creating the offset storage topic.
    4 Interval at which to try committing offsets. The default is 1 minute.

You can implement the org.apache.kafka.connect.storage.OffsetBackingStore interface to provide offset storage bound to a custom backend key-value store.
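
The difference between recording an offset and flushing it can be sketched with a toy file-backed store in Python. This is only an illustration of the restart behavior described above; the class is hypothetical and does not implement the OffsetBackingStore interface.

```python
import json
import os
import tempfile


class FileOffsetStore:
    """Toy file-backed offset store: flushed offsets survive a restart."""

    def __init__(self, path):
        self.path = path
        self.offsets = {}
        if os.path.exists(path):
            # On restart, resume from the last flushed offsets.
            with open(path, encoding="utf-8") as f:
                self.offsets = json.load(f)

    def record(self, partition, position):
        # Recorded in memory only until the next flush.
        self.offsets[partition] = position

    def flush(self):
        # Write to a temporary file first, then swap it into place.
        tmp = self.path + ".tmp"
        with open(tmp, "w", encoding="utf-8") as f:
            json.dump(self.offsets, f)
        os.replace(tmp, self.path)


def demo():
    path = os.path.join(tempfile.mkdtemp(), "offsets.json")
    first_run = FileOffsetStore(path)
    first_run.record("inventory", 41)
    first_run.flush()
    first_run.record("inventory", 42)  # never flushed: lost on restart
    restarted = FileOffsetStore(path)  # simulates a source restart
    return restarted.offsets
```

In this sketch, the restarted store resumes from offset 41, because 42 was recorded but never flushed. An in-memory store corresponds to never flushing at all.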

5.1.3. Examples and Testing

The Debezium integration tests use database fixtures running on the local machine. They leverage pre-built Debezium Docker database images, started with the help of Testcontainers.

To run and debug the tests from your IDE, you need to deploy the required database images from the command line. The instructions below explain how to run the pre-configured test databases from Docker images.

MySQL

Start the debezium/example-mysql image in Docker:

docker run -it --rm --name mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=debezium -e MYSQL_USER=mysqluser -e MYSQL_PASSWORD=mysqlpw debezium/example-mysql:2.3.3.Final

(Optional) Use the mysql client to connect to the database and create a debezium user with the required credentials:

docker run -it --rm --name mysqlterm --link mysql mysql:5.7 sh -c 'exec mysql -h"$MYSQL_PORT_3306_TCP_ADDR" -P"$MYSQL_PORT_3306_TCP_PORT" -uroot -p"$MYSQL_ENV_MYSQL_ROOT_PASSWORD"'
mysql> GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'debezium' IDENTIFIED BY 'dbz';

Use the following properties to connect the Debezium Source to the MySQL database:

debezium.properties.connector.class=io.debezium.connector.mysql.MySqlConnector (1)

debezium.properties.name=my-connector (2)
debezium.properties.topic.prefix=my-topic (2)
debezium.properties.database.server.id=85744 (2)


debezium.properties.database.user=debezium (3)
debezium.properties.database.password=dbz (3)
debezium.properties.database.hostname=localhost (3)
debezium.properties.database.port=3306 (3)

debezium.properties.schema=true (4)
debezium.properties.key.converter.schemas.enable=true (4)
debezium.properties.value.converter.schemas.enable=true (4)

debezium.properties.transforms=unwrap (5)
debezium.properties.transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState (5)
debezium.properties.transforms.unwrap.add.fields=name,db (5)
debezium.properties.transforms.unwrap.delete.handling.mode=none (5)
debezium.properties.transforms.unwrap.drop.tombstones=true (5)

debezium.properties.schema.history.internal=io.debezium.relational.history.MemorySchemaHistory (6)
debezium.properties.offset.storage=org.apache.kafka.connect.storage.MemoryOffsetBackingStore (6)
1 Configures the Debezium Source to use MySqlConnector.
2 Metadata used to identify and dispatch the incoming events.
3 Connection to the MySQL server running on localhost:3306 as debezium user.
4 Includes the Change Event Value schema in the ChangeEvent message.
5 Enables the Change Event Flattening.
6 Source state to preserve between multiple starts.

You can also run the DebeziumDatabasesIntegrationTest#mysql() test using this mysql configuration.

Disable the mysql GenericContainer test initialization code.

PostgreSQL

Start a pre-configured postgres server from the debezium/example-postgres:2.3.3.Final Docker image:

docker run -it --rm --name postgres -p 5432:5432 -e POSTGRES_USER=postgres -e POSTGRES_PASSWORD=postgres debezium/example-postgres:2.3.3.Final

You can connect to this server like this:

psql -U postgres -h localhost -p 5432

Use the following properties to connect the Debezium Source to PostgreSQL:

debezium.properties.connector.class=io.debezium.connector.postgresql.PostgresConnector (1)

debezium.properties.schema.history.internal=io.debezium.relational.history.MemorySchemaHistory (2)
debezium.properties.offset.storage=org.apache.kafka.connect.storage.MemoryOffsetBackingStore (2)

debezium.properties.topic.prefix=my-topic (3)
debezium.properties.name=my-connector (3)
debezium.properties.database.server.id=85744 (3)

debezium.properties.database.user=postgres  (4)
debezium.properties.database.password=postgres (4)
debezium.properties.database.dbname=postgres (4)
debezium.properties.database.hostname=localhost (4)
debezium.properties.database.port=5432 (4)

debezium.properties.schema=true (5)
debezium.properties.key.converter.schemas.enable=true (5)
debezium.properties.value.converter.schemas.enable=true (5)

debezium.properties.transforms=unwrap (6)
debezium.properties.transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState (6)
debezium.properties.transforms.unwrap.add.fields=name,db (6)
debezium.properties.transforms.unwrap.delete.handling.mode=none (6)
debezium.properties.transforms.unwrap.drop.tombstones=true (6)
1 Configures Debezium Source to use PostgresConnector.
2 Configures the Debezium engine to use memory stores.
3 Metadata used to identify and dispatch the incoming events.
4 Connection to the PostgreSQL server running on localhost:5432 as postgres user.
5 Includes the Change Event Value schema in the message.
6 Enables the Change Event Flattening.

You can also run the DebeziumDatabasesIntegrationTest#postgres() test using this postgres configuration.

Disable the postgres GenericContainer test initialization code.

MongoDB

Start a pre-configured mongodb from the debezium/example-mongodb:2.3.3.Final container image:

docker run -it --rm --name mongodb -p 27017:27017 -e MONGODB_USER=debezium -e MONGODB_PASSWORD=dbz  debezium/example-mongodb:2.3.3.Final

Initialize the inventory collections:

docker exec -it mongodb sh -c 'bash -c /usr/local/bin/init-inventory.sh'

In the mongodb terminal output, search for a log entry like host: "3f95a8a6516e:27017" :

2019-01-10T13:46:10.004+0000 I COMMAND  [conn1] command local.oplog.rs appName: "MongoDB Shell" command: replSetInitiate { replSetInitiate: { _id: "rs0", members: [ { _id: 0.0, host: "3f95a8a6516e:27017" } ] }, lsid: { id: UUID("5f477a16-d80d-41f2-9ab4-4ebecea46773") }, $db: "admin" } numYields:0 reslen:22 locks:{ Global: { acquireCount: { r: 36, w: 20, W: 2 }, acquireWaitCount: { W: 1 }, timeAcquiringMicros: { W: 312 } }, Database: { acquireCount: { r: 6, w: 4, W: 16 } }, Collection: { acquireCount: { r: 4, w: 2 } }, oplog: { acquireCount: { r: 2, w: 3 } } } protocol:op_msg 988ms

Add a 127.0.0.1 3f95a8a6516e entry to your /etc/hosts file.

Use the following properties to connect the Debezium Source to MongoDB:

debezium.properties.connector.class=io.debezium.connector.mongodb.MongoDbConnector (1)

debezium.properties.topic.prefix=my-topic
debezium.properties.name=my-connector
debezium.properties.database.server.id=85744

debezium.properties.schema.history.internal=io.debezium.relational.history.MemorySchemaHistory (2)
debezium.properties.offset.storage=org.apache.kafka.connect.storage.MemoryOffsetBackingStore (2)

debezium.properties.mongodb.hosts=rs0/localhost:27017 (3)
debezium.properties.topic.prefix=dbserver1 (3)
debezium.properties.mongodb.user=debezium (3)
debezium.properties.mongodb.password=dbz (3)
debezium.properties.database.whitelist=inventory (3)

debezium.properties.tasks.max=1 (4)

debezium.properties.schema=true (5)
debezium.properties.key.converter.schemas.enable=true (5)
debezium.properties.value.converter.schemas.enable=true (5)

debezium.properties.transforms=unwrap (6)
debezium.properties.transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState (6)
debezium.properties.transforms.unwrap.add.fields=name,db (6)
debezium.properties.transforms.unwrap.delete.handling.mode=none (6)
debezium.properties.transforms.unwrap.drop.tombstones=true (6)
1 Configures Debezium Source to use MongoDB Connector.
2 Configures the Debezium engine to use memory.
3 Connection to the MongoDB running on localhost:27017 as debezium user.
4 debezium.io/docs/connectors/mongodb/#tasks
5 Includes the Change Event Value schema in the SourceRecord events.
6 Enables the Change Event Flattening.

You can also run the DebeziumDatabasesIntegrationTest#mongodb() test using this mongodb configuration.

SQL Server

Start a SQL Server instance from the microsoft/mssql-server-linux:2017-CU9-GDR2 Docker image:

docker run -it --rm --name sqlserver -p 1433:1433 -e ACCEPT_EULA=Y -e MSSQL_PID=Standard -e SA_PASSWORD=Password! -e MSSQL_AGENT_ENABLED=true microsoft/mssql-server-linux:2017-CU9-GDR2

Populate it with sample data from the Debezium SQL Server tutorial:

wget https://raw.githubusercontent.com/debezium/debezium-examples/master/tutorial/debezium-sqlserver-init/inventory.sql
cat ./inventory.sql | docker exec -i sqlserver bash -c '/opt/mssql-tools/bin/sqlcmd -U sa -P $SA_PASSWORD'

Use the following properties to connect the Debezium Source to SQL Server:

debezium.properties.connector.class=io.debezium.connector.sqlserver.SqlServerConnector (1)

debezium.properties.schema.history.internal=io.debezium.relational.history.MemorySchemaHistory (2)
debezium.properties.offset.storage=org.apache.kafka.connect.storage.MemoryOffsetBackingStore (2)

debezium.properties.topic.prefix=my-topic (3)
debezium.properties.name=my-connector (3)
debezium.properties.database.server.id=85744 (3)

debezium.properties.database.user=sa  (4)
debezium.properties.database.password=Password! (4)
debezium.properties.database.dbname=testDB (4)
debezium.properties.database.hostname=localhost (4)
debezium.properties.database.port=1433 (4)
1 Configures Debezium Source to use SqlServerConnector.
2 Configures the Debezium engine to use memory state stores.
3 Metadata used to identify and dispatch the incoming events.
4 Connection to the SQL Server running on localhost:1433 as sa user.

You can also run the DebeziumDatabasesIntegrationTest#sqlServer() test using this SQL Server configuration.

Oracle

Start an Oracle instance that is reachable from localhost and set up with the configuration, users, and grants described in the Debezium Vagrant set-up.

Populate it with sample data from the Debezium Oracle tutorial:

wget https://raw.githubusercontent.com/debezium/debezium-examples/master/tutorial/debezium-with-oracle-jdbc/init/inventory.sql
cat ./inventory.sql | docker exec -i dbz_oracle sqlplus debezium/dbz@//localhost:1521/ORCLPDB1

5.2. File Source

This application polls a directory and sends new files or their contents to the output channel. The file source provides the contents of a File as a byte array by default. However, this can be customized using the --file.supplier.mode option:

  • ref Provides a java.io.File reference

  • lines Will split files line-by-line and emit a new message for each line

  • contents The default. Provides the contents of a file as a byte array

When using --file.supplier.mode=lines, you can also provide the additional option --file.supplier.withMarkers=true. If set to true, the underlying FileSplitter will emit additional start-of-file and end-of-file marker messages before and after the actual data. The payload of these 2 additional marker messages is of type FileSplitter.FileMarker. The option withMarkers defaults to false if not explicitly set.
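
The three modes and the marker option can be pictured with the following Python sketch. It only models the shape of the emitted payloads; the function name and the tuple used for markers are hypothetical stand-ins, not the FileSplitter API.

```python
import tempfile
from pathlib import Path


def read_payloads(path, mode="contents", with_markers=False):
    """Return the payloads one file would produce in the given mode."""
    path = Path(path)
    if mode == "ref":
        return [path]                  # a reference to the file itself
    if mode == "contents":
        return [path.read_bytes()]     # one payload with all the bytes
    if mode == "lines":
        lines = path.read_text(encoding="utf-8").splitlines()
        if with_markers:
            # Hypothetical stand-ins for the start/end-of-file markers.
            return [("START", str(path)), *lines, ("END", str(path))]
        return lines                   # one payload per line
    raise ValueError(f"unknown mode: {mode!r}")


def demo():
    with tempfile.TemporaryDirectory() as directory:
        sample = Path(directory) / "sample.txt"
        sample.write_bytes(b"a\nb\n")
        return (
            read_payloads(sample, "contents"),
            read_payloads(sample, "lines"),
            read_payloads(sample, "lines", with_markers=True),
        )
```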

5.2.1. Options

The file source has the following options:

Properties grouped by prefix:

file.consumer
markers-json

When 'fileMarkers == true', specify if they should be produced as FileSplitter.FileMarker objects or JSON. (Boolean, default: true)

mode

The FileReadingMode to use for file reading sources. Values are 'ref' - The File object, 'lines' - a message per line, or 'contents' - the contents as bytes. (FileReadingMode, default: <none>, possible values: ref,lines,contents)

with-markers

Set to true to emit start of file/end of file marker messages before/after the data. Only valid with FileReadingMode 'lines'. (Boolean, default: <none>)

file.supplier
delay-when-empty

Duration of delay when no new files are detected. (Duration, default: 1s)

directory

The directory to poll for new files. (File, default: <none>)

filename-pattern

A simple ant pattern to match files. (String, default: <none>)

filename-regex

A regex pattern to match files. (Pattern, default: <none>)

prevent-duplicates

Set to true to include an AcceptOnceFileListFilter which prevents duplicates. (Boolean, default: true)

metadata.store.dynamo-db
create-delay

Delay between create table retries. (Integer, default: 1)

create-retries

Retry number for create table request. (Integer, default: 25)

read-capacity

Read capacity on the table. (Long, default: 1)

table

Table name for metadata. (String, default: <none>)

time-to-live

TTL for table entries. (Integer, default: <none>)

write-capacity

Write capacity on the table. (Long, default: 1)

metadata.store.jdbc
region

Unique grouping identifier for messages persisted with this store. (String, default: DEFAULT)

table-prefix

Prefix for the custom table name. (String, default: <none>)

metadata.store.mongo-db
collection

MongoDB collection name for metadata. (String, default: metadataStore)

metadata.store.redis
key

Redis key for metadata. (String, default: <none>)

metadata.store
type

Indicates the type of metadata store to configure (default is 'memory'). You must include the corresponding Spring Integration dependency to use a persistent store. (StoreType, default: <none>, possible values: mongodb,redis,dynamodb,jdbc,zookeeper,hazelcast,memory)

metadata.store.zookeeper
connect-string

Zookeeper connect string in form HOST:PORT. (String, default: 127.0.0.1:2181)

encoding

Encoding to use when storing data in Zookeeper. (Charset, default: UTF-8)

retry-interval

Retry interval for Zookeeper operations in milliseconds. (Integer, default: 1000)

root

Root node - store entries are children of this node. (String, default: /SpringIntegration-MetadataStore)

5.3. FTP Source

This source application supports transfer of files using the FTP protocol. Files are transferred from the remote directory to the local directory where the app is deployed. Messages emitted by the source are provided as a byte array by default. However, this can be customized using the --mode option:

  • ref Provides a java.io.File reference

  • lines Will split files line-by-line and emit a new message for each line

  • contents The default. Provides the contents of a file as a byte array

When using --mode=lines, you can also provide the additional option --withMarkers=true. If set to true, the underlying FileSplitter will emit additional start-of-file and end-of-file marker messages before and after the actual data. The payload of these 2 additional marker messages is of type FileSplitter.FileMarker. The option withMarkers defaults to false if not explicitly set.

See also MetadataStore options for possible shared persistent store configuration used to prevent duplicate messages on restart.
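The idea behind a shared persistent store can be illustrated with a minimal sketch. It is not the Spring Integration implementation: the class name `AcceptOnceSketch` is hypothetical, a plain `Map` stands in for the metadata store, and the file name plus modification time is assumed to be the deduplication key.

```java
import java.util.Map;

// Hypothetical illustration only; not the Spring Integration implementation.
class AcceptOnceSketch {

    // Stands in for a metadata store: file name -> last seen modification time.
    private final Map<String, Long> store;

    AcceptOnceSketch(Map<String, Long> store) {
        this.store = store;
    }

    // Returns true only the first time a (name, lastModified) pair is seen.
    boolean accept(String fileName, long lastModified) {
        Long previous = store.get(fileName);
        if (previous != null && previous == lastModified) {
            return false; // already emitted, skip it
        }
        store.put(fileName, lastModified);
        return true;
    }
}
```

If the map survives a restart (as a persistent store would), a new filter instance still rejects files that were already emitted. With a fresh in-memory map, the same file is accepted again.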

5.3.1. Input

N/A (Fetches files from an FTP server).

5.3.2. Output

mode = contents
Headers:
  • Content-Type: application/octet-stream

  • file_originalFile: <java.io.File>

  • file_name: <file name>

Payload:

A byte[] filled with the file contents.

mode = lines
Headers:
  • Content-Type: text/plain

  • file_originalFile: <java.io.File>

  • file_name: <file name>

  • correlationId: <UUID> (same for each line)

  • sequenceNumber: <n>

  • sequenceSize: 0 (number of lines is not known until the file is read)

Payload:

A String for each line.

The first line is optionally preceded by a message with a START marker payload. The last line is optionally followed by a message with an END marker payload.

Marker presence and format are determined by the with-markers and markers-json properties.
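The message sequence described for mode = lines can be sketched as follows. This is a simplified illustration, not the actual FileSplitter: the class `LinesModeSketch` is hypothetical, and the strings "START" and "END" are placeholders for the real marker payloads (FileSplitter.FileMarker objects or JSON).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch; the real application delegates to Spring Integration's FileSplitter.
class LinesModeSketch {

    // Returns the payloads emitted for one file, in order.
    static List<String> emit(String fileContents, boolean withMarkers) {
        List<String> messages = new ArrayList<>();
        if (withMarkers) {
            messages.add("START"); // placeholder for the start-of-file marker
        }
        for (String line : fileContents.split("\\R")) {
            messages.add(line);    // one message per line
        }
        if (withMarkers) {
            messages.add("END");   // placeholder for the end-of-file marker
        }
        return messages;
    }
}
```

A consumer that needs to know when a file is complete can rely on the END marker, because sequenceSize is 0 and so cannot be used for that purpose.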

mode = ref
Headers:

None.

Payload:

A java.io.File object.

5.3.3. Options

The ftp source has the following options:

Properties grouped by prefix:

file.consumer
markers-json

When 'fileMarkers == true', specify if they should be produced as FileSplitter.FileMarker objects or JSON. (Boolean, default: true)

mode

The FileReadingMode to use for file reading sources. Values are 'ref' - The File object, 'lines' - a message per line, or 'contents' - the contents as bytes. (FileReadingMode, default: <none>, possible values: ref,lines,contents)

with-markers

Set to true to emit start of file/end of file marker messages before/after the data. Only valid with FileReadingMode 'lines'. (Boolean, default: <none>)

ftp.factory
cache-sessions

Cache sessions. (Boolean, default: <none>)

client-mode

The client mode to use for the FTP session. (ClientMode, default: <none>, possible values: ACTIVE,PASSIVE)

host

The host name of the server. (String, default: localhost)

password

The password to use to connect to the server. (String, default: <none>)

port

The port of the server. (Integer, default: 21)

username

The username to use to connect to the server. (String, default: <none>)

ftp.supplier
auto-create-local-dir

Set to true to create the local directory if it does not exist. (Boolean, default: true)

delay-when-empty

Duration of delay when no new files are detected. (Duration, default: 1s)

delete-remote-files

Set to true to delete remote files after successful transfer. (Boolean, default: false)

filename-pattern

A filter pattern to match the names of files to transfer. (String, default: <none>)

filename-regex

A filter regex pattern to match the names of files to transfer. (Pattern, default: <none>)

local-dir

The local directory to use for file transfers. (File, default: <none>)

preserve-timestamp

Set to true to preserve the original timestamp. (Boolean, default: true)

remote-dir

The remote FTP directory. (String, default: /)

remote-file-separator

The remote file separator. (String, default: /)

tmp-file-suffix

The suffix to use while the transfer is in progress. (String, default: .tmp)

metadata.store.dynamo-db
create-delay

Delay between create table retries. (Integer, default: 1)

create-retries

Retry number for create table request. (Integer, default: 25)

read-capacity

Read capacity on the table. (Long, default: 1)

table

Table name for metadata. (String, default: <none>)

time-to-live

TTL for table entries. (Integer, default: <none>)

write-capacity

Write capacity on the table. (Long, default: 1)

metadata.store.jdbc
region

Unique grouping identifier for messages persisted with this store. (String, default: DEFAULT)

table-prefix

Prefix for the custom table name. (String, default: <none>)

metadata.store.mongo-db
collection

MongoDB collection name for metadata. (String, default: metadataStore)

metadata.store.redis
key

Redis key for metadata. (String, default: <none>)

metadata.store
type

Indicates the type of metadata store to configure (default is 'memory'). You must include the corresponding Spring Integration dependency to use a persistent store. (StoreType, default: <none>, possible values: mongodb,redis,dynamodb,jdbc,zookeeper,hazelcast,memory)

metadata.store.zookeeper
connect-string

Zookeeper connect string in form HOST:PORT. (String, default: 127.0.0.1:2181)

encoding

Encoding to use when storing data in Zookeeper. (Charset, default: UTF-8)

retry-interval

Retry interval for Zookeeper operations in milliseconds. (Integer, default: 1000)

root

Root node - store entries are children of this node. (String, default: /SpringIntegration-MetadataStore)

5.3.4. Examples

java -jar ftp_source.jar --ftp.supplier.remote-dir=foo --file.consumer.mode=lines --ftp.factory.host=ftpserver \
         --ftp.factory.username=user --ftp.factory.password=pw --ftp.supplier.local-dir=/foo

5.4. Http Source

A source application that listens for HTTP requests and emits the body as a message payload. If the Content-Type matches text/* or application/json, the payload will be a String, otherwise the payload will be a byte array.

5.4.1. Payload:

If content type matches text/* or application/json

  • String

If content type does not match text/* or application/json

  • byte array
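The payload rule above can be sketched in a few lines. This is an illustration under stated assumptions, not the application's code: the class `HttpPayloadSketch` is hypothetical, and both stripping Content-Type parameters and decoding as UTF-8 are assumptions made for the example.

```java
import java.nio.charset.StandardCharsets;
import java.util.Locale;

// Hypothetical sketch of the Content-Type rule; not the application's actual code.
class HttpPayloadSketch {

    static boolean isTextual(String contentType) {
        // Assumption: ignore parameters such as "; charset=UTF-8" and compare case-insensitively.
        String mediaType = contentType.split(";", 2)[0].trim().toLowerCase(Locale.ROOT);
        return mediaType.startsWith("text/") || mediaType.equals("application/json");
    }

    static Object toPayload(String contentType, byte[] body) {
        // Assumption: textual bodies are decoded as UTF-8.
        return isTextual(contentType) ? new String(body, StandardCharsets.UTF_8) : body;
    }
}
```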

5.4.2. Options

The http source supports the following configuration properties:

Properties grouped by prefix:

http.cors
allow-credentials

Whether the browser should include any cookies associated with the domain of the request being annotated. (Boolean, default: <none>)

allowed-headers

List of request headers that can be used during the actual request. (String[], default: <none>)

allowed-origins

List of allowed origins, e.g. https://domain1.com. (String[], default: <none>)

http
mapped-request-headers

Headers that will be mapped. (String[], default: <none>)

path-pattern

HTTP endpoint path mapping. (String, default: /)

server
port

Server HTTP port. (Integer, default: 8080)

5.5. JDBC Source

This source polls data from an RDBMS. This source is fully based on the DataSourceAutoConfiguration, so refer to the Spring Boot JDBC Support for more information.

5.5.1. Payload

  • Map<String, Object> when jdbc.supplier.split == true (default) and List<Map<String, Object>> otherwise
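The effect of the split option on the emitted payloads can be sketched as follows. The class `JdbcSplitSketch` is hypothetical and only models the payload shapes; it does not run a query.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the payload shapes; it does not touch a database.
class JdbcSplitSketch {

    // Returns the message payloads produced for one query result.
    static List<Object> payloads(List<Map<String, Object>> rows, boolean split) {
        List<Object> result = new ArrayList<>();
        if (split) {
            result.addAll(rows); // one Map<String, Object> payload per row
        } else {
            result.add(rows);    // a single List<Map<String, Object>> payload
        }
        return result;
    }
}
```

With two rows, split == true yields two messages and split == false yields one message that carries both rows.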

5.5.2. Options

The jdbc source has the following options:

Properties grouped by prefix:

jdbc.supplier
max-rows

Maximum number of rows to process for the query. (Integer, default: 0)

query

The query to use to select data. (String, default: <none>)

split

Whether to split the SQL result as individual messages. (Boolean, default: true)

update

An SQL update statement to execute for marking polled messages as 'seen'. (String, default: <none>)

spring.datasource
driver-class-name

Fully qualified name of the JDBC driver. Auto-detected based on the URL by default. (String, default: <none>)

password

Login password of the database. (String, default: <none>)

url

JDBC URL of the database. (String, default: <none>)

username

Login username of the database. (String, default: <none>)

spring.integration.poller
cron

Cron expression for polling. Mutually exclusive with 'fixedDelay' and 'fixedRate'. (String, default: <none>)

fixed-delay

Polling delay period. Mutually exclusive with 'cron' and 'fixedRate'. (Duration, default: <none>)

fixed-rate

Polling rate period. Mutually exclusive with 'fixedDelay' and 'cron'. (Duration, default: <none>)

initial-delay

Polling initial delay. Applied for 'fixedDelay' and 'fixedRate'; ignored for 'cron'. (Duration, default: <none>)

max-messages-per-poll

Maximum number of messages to poll per polling cycle. (Integer, default: <none>)

receive-timeout

How long to wait for messages on poll. (Duration, default: 1s)

Also see the Spring Boot Documentation for additional DataSource properties and TriggerProperties and MaxMessagesProperties for polling options.

5.6. JMS Source

The JMS source enables receiving messages from JMS.

5.6.1. Options

The JMS source has the following options:

Properties grouped by prefix:

jms.supplier
client-id

Client id for durable subscriptions. (String, default: <none>)

destination

The destination from which to receive messages (queue or topic). (String, default: <none>)

message-selector

A selector for messages. (String, default: <none>)

session-transacted

True to enable transactions and select a DefaultMessageListenerContainer, false to select a SimpleMessageListenerContainer. (Boolean, default: true)

subscription-durable

True for a durable subscription. (Boolean, default: <none>)

subscription-name

The name of a durable or shared subscription. (String, default: <none>)

subscription-shared

True for a shared subscription. (Boolean, default: <none>)

spring.jms
jndi-name

Connection factory JNDI name. When set, takes precedence over other connection factory auto-configurations. (String, default: <none>)

pub-sub-domain

Whether the default destination type is topic. (Boolean, default: false)

spring.jms.listener
acknowledge-mode

Acknowledge mode of the container. By default, the listener is transacted with automatic acknowledgment. (AcknowledgeMode, default: <none>, possible values: AUTO,CLIENT,DUPS_OK)

auto-startup

Start the container automatically on startup. (Boolean, default: true)

concurrency

Minimum number of concurrent consumers. When max-concurrency is not specified the minimum will also be used as the maximum. (Integer, default: <none>)

max-concurrency

Maximum number of concurrent consumers. (Integer, default: <none>)

receive-timeout

Timeout to use for receive calls. Use -1 for a no-wait receive or 0 for no timeout at all. The latter is only feasible if not running within a transaction manager and is generally discouraged since it prevents clean shutdown. (Duration, default: 1s)

5.7. Apache Kafka Source

This module consumes messages from Apache Kafka.

5.7.1. Options

The kafka source has the following options:

(See the Spring Boot documentation for Spring for Apache Kafka configuration properties)

Properties grouped by prefix:

kafka.supplier
ack-discarded

Whether to acknowledge discarded records after 'RecordFilterStrategy'. (Boolean, default: false)

record-filter

SpEL expression for 'RecordFilterStrategy' with a 'ConsumerRecord' as a root object. (Expression, default: <none>)

topic-pattern

Apache Kafka topics pattern to subscribe. (Pattern, default: <none>)

topics

Apache Kafka topics to subscribe. (String[], default: <none>)

spring.kafka
bootstrap-servers

Comma-delimited list of host:port pairs to use for establishing the initial connections to the Kafka cluster. Applies to all components unless overridden. (List<String>, default: <none>)

client-id

ID to pass to the server when making requests. Used for server-side logging. (String, default: <none>)

properties

Additional properties, common to producers and consumers, used to configure the client. (Map<String, String>, default: <none>)

spring.kafka.consumer
auto-commit-interval

Frequency with which the consumer offsets are auto-committed to Kafka if 'enable.auto.commit' is set to true. (Duration, default: <none>)

auto-offset-reset

What to do when there is no initial offset in Kafka or if the current offset no longer exists on the server. (String, default: <none>)

bootstrap-servers

Comma-delimited list of host:port pairs to use for establishing the initial connections to the Kafka cluster. Overrides the global property, for consumers. (List<String>, default: <none>)

client-id

ID to pass to the server when making requests. Used for server-side logging. (String, default: <none>)

enable-auto-commit

Whether the consumer's offset is periodically committed in the background. (Boolean, default: <none>)

fetch-max-wait

Maximum amount of time the server blocks before answering the fetch request if there isn't sufficient data to immediately satisfy the requirement given by "fetch-min-size". (Duration, default: <none>)

fetch-min-size

Minimum amount of data the server should return for a fetch request. (DataSize, default: <none>)

group-id

Unique string that identifies the consumer group to which this consumer belongs. (String, default: <none>)

heartbeat-interval

Expected time between heartbeats to the consumer coordinator. (Duration, default: <none>)

isolation-level

Isolation level for reading messages that have been written transactionally. (IsolationLevel, default: read-uncommitted, possible values: READ_UNCOMMITTED,READ_COMMITTED)

key-deserializer

Deserializer class for keys. (Class<?>, default: <none>)

max-poll-records

Maximum number of records returned in a single call to poll(). (Integer, default: <none>)

properties

Additional consumer-specific properties used to configure the client. (Map<String, String>, default: <none>)

value-deserializer

Deserializer class for values. (Class<?>, default: <none>)

spring.kafka.listener
ack-count

Number of records between offset commits when ackMode is "COUNT" or "COUNT_TIME". (Integer, default: <none>)

ack-mode

Listener AckMode. See the spring-kafka documentation. (AckMode, default: <none>, possible values: RECORD,BATCH,TIME,COUNT,COUNT_TIME,MANUAL,MANUAL_IMMEDIATE)

ack-time

Time between offset commits when ackMode is "TIME" or "COUNT_TIME". (Duration, default: <none>)

async-acks

Support for asynchronous record acknowledgements. Only applies when spring.kafka.listener.ack-mode is manual or manual-immediate. (Boolean, default: <none>)

auto-startup

Whether to auto start the container. (Boolean, default: true)

client-id

Prefix for the listener's consumer client.id property. (String, default: <none>)

concurrency

Number of threads to run in the listener containers. (Integer, default: <none>)

idle-between-polls

Sleep interval between Consumer.poll(Duration) calls. (Duration, default: 0)

idle-event-interval

Time between publishing idle consumer events (no data received). (Duration, default: <none>)

idle-partition-event-interval

Time between publishing idle partition consumer events (no data received for partition). (Duration, default: <none>)

immediate-stop

Whether the container stops after the current record is processed or after all the records from the previous poll are processed. (Boolean, default: false)

log-container-config

Whether to log the container configuration during initialization (INFO level). (Boolean, default: <none>)

missing-topics-fatal

Whether the container should fail to start if at least one of the configured topics are not present on the broker. (Boolean, default: false)

monitor-interval

Time between checks for non-responsive consumers. If a duration suffix is not specified, seconds will be used. (Duration, default: <none>)

no-poll-threshold

Multiplier applied to "pollTimeout" to determine if a consumer is non-responsive. (Float, default: <none>)

poll-timeout

Timeout to use when polling the consumer. (Duration, default: <none>)

type

Listener type. (Type, default: single)

5.8. Load Generator Source

A source that sends generated data and dispatches it to the stream.

5.8.1. Options

The load-generator source has the following options:

load-generator.generate-timestamp

Whether to generate a timestamp. (Boolean, default: false)

load-generator.message-count

Message count. (Integer, default: 1000)

load-generator.message-size

Message size. (Integer, default: 1000)

load-generator.producers

Number of producers. (Integer, default: 1)

5.9. Mail Source

A source application that listens for emails and emits the message body as a message payload.

5.9.1. Options

The mail source has the following options:

mail.supplier.charset

The charset for byte[] mail-to-string transformation. (String, default: UTF-8)

mail.supplier.delete

Set to true to delete email after download. (Boolean, default: false)

mail.supplier.expression

Configure a SpEL expression to select messages. (String, default: true)

mail.supplier.idle-imap

Set to true to use IdleImap Configuration. (Boolean, default: false)

mail.supplier.java-mail-properties

JavaMail properties as a new line delimited string of name-value pairs, e.g. 'foo=bar\n baz=car'. (Properties, default: <none>)

mail.supplier.mark-as-read

Set to true to mark email as read. (Boolean, default: false)

mail.supplier.url

Mail connection URL for connection to Mail server e.g. 'imaps://<username>:<password>@<host>:993/Inbox'. (URLName, default: <none>)

mail.supplier.user-flag

The flag to mark messages when the server does not support \Recent. (String, default: <none>)
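The newline-delimited format accepted by mail.supplier.java-mail-properties can be illustrated with standard java.util.Properties parsing. This sketch assumes the value follows the usual Properties syntax; the class `JavaMailPropertiesSketch` is hypothetical, and the keys come from the option's own 'foo=bar\n baz=car' example.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical sketch; assumes the option value uses standard java.util.Properties syntax.
class JavaMailPropertiesSketch {

    static Properties parse(String value) {
        Properties properties = new Properties();
        try {
            // Each line is one name=value pair; leading whitespace on a line is ignored.
            properties.load(new StringReader(value));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return properties;
    }
}
```

Parsing "foo=bar\n baz=car" this way yields two entries, foo and baz, which shows why the space after the newline in the documented example is harmless.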

5.10. MongoDB Source

This source polls data from MongoDB. This source is fully based on the MongoDataAutoConfiguration, so refer to the Spring Boot MongoDB Support for more information.

5.10.1. Options

The mongodb source has the following options:

Properties grouped by prefix:

mongodb.supplier
collection

The MongoDB collection to query. (String, default: <none>)

query

The MongoDB query. (String, default: { })

query-expression

The SpEL expression in MongoDB query DSL style. (Expression, default: <none>)

split

Whether to split the query result as individual messages. (Boolean, default: true)

update-expression

The SpEL expression in MongoDB update DSL style. (Expression, default: <none>)

spring.data.mongodb
additional-hosts

Additional server hosts. Cannot be set with URI or if 'host' is not specified. Additional hosts will use the default mongo port of 27017. If you want to use a different port you can use the "host:port" syntax. (List<String>, default: <none>)

authentication-database

Authentication database name. (String, default: <none>)

auto-index-creation

Whether to enable auto-index creation. (Boolean, default: <none>)

database

Database name. Overrides database in URI. (String, default: <none>)

field-naming-strategy

Fully qualified name of the FieldNamingStrategy to use. (Class<?>, default: <none>)

host

Mongo server host. Cannot be set with URI. (String, default: <none>)

password

Login password of the mongo server. Cannot be set with URI. (Character[], default: <none>)

port

Mongo server port. Cannot be set with URI. (Integer, default: <none>)

replica-set-name

Required replica set name for the cluster. Cannot be set with URI. (String, default: <none>)

uri

Mongo database URI. Overrides host, port, username, and password. (String, default: mongodb://localhost/test)

username

Login user of the mongo server. Cannot be set with URI. (String, default: <none>)

uuid-representation

Representation to use when converting a UUID to a BSON binary value. (UuidRepresentation, default: java-legacy, possible values: UNSPECIFIED,STANDARD,C_SHARP_LEGACY,JAVA_LEGACY,PYTHON_LEGACY)

spring.data.mongodb.gridfs
bucket

GridFS bucket name. (String, default: <none>)

database

GridFS database name. (String, default: <none>)

spring.data.mongodb.ssl
bundle

SSL bundle name. (String, default: <none>)

enabled

Whether to enable SSL support. Enabled automatically if "bundle" is provided unless specified otherwise. (Boolean, default: <none>)

Also see the Spring Boot Documentation for additional MongoProperties properties. See also TriggerProperties for polling options.

5.11. MQTT Source

Source that enables receiving messages from MQTT.

5.11.1. Payload:

  • String if binary setting is false (default)

  • byte[] if binary setting is true
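The relationship between the binary and charset settings and the payload type can be sketched as follows. The class `MqttPayloadSketch` is hypothetical; it only models the conversion rule stated above, not the MQTT client itself.

```java
import java.nio.charset.Charset;

// Hypothetical sketch of the payload conversion rule; it does not connect to a broker.
class MqttPayloadSketch {

    static Object convert(byte[] mqttBytes, boolean binary, String charset) {
        // binary == true leaves the bytes untouched; otherwise decode with the given charset.
        return binary ? mqttBytes : new String(mqttBytes, Charset.forName(charset));
    }
}
```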

5.11.2. Options

The mqtt source has the following options:

Properties grouped by prefix:

mqtt
clean-session

whether the client and server should remember state across restarts and reconnects. (Boolean, default: true)

connection-timeout

the connection timeout in seconds. (Integer, default: 30)

keep-alive-interval

the ping interval in seconds. (Integer, default: 60)

password

the password to use when connecting to the broker. (String, default: guest)

persistence

'memory' or 'file'. (String, default: memory)

persistence-directory

Persistence directory. (String, default: /tmp/paho)

ssl-properties

MQTT Client SSL properties. (Map<String, String>, default: <none>)

url

location of the mqtt broker(s) (comma-delimited list). (String[], default: [tcp://localhost:1883])

username

the username to use when connecting to the broker. (String, default: guest)

mqtt.supplier
binary

true to leave the payload as bytes. (Boolean, default: false)

charset

the charset used to convert bytes to String (when binary is false). (String, default: UTF-8)

client-id

identifies the client. (String, default: stream.client.id.source)

qos

the qos; a single value for all topics or a comma-delimited list to match the topics. (Integer[], default: [0])

topics

the topic(s) (comma-delimited) to which the source will subscribe. (String[], default: [stream.mqtt])
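The qos description (a single value for all topics, or one value per topic) can be sketched as a small mapping function. The class `MqttQosSketch` is hypothetical, and rejecting a list of the wrong length is an assumption made for the example.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch of how qos values pair up with topics.
class MqttQosSketch {

    static Map<String, Integer> assign(String[] topics, int[] qos) {
        // Assumption: any other length is treated as a configuration error.
        if (qos.length != 1 && qos.length != topics.length) {
            throw new IllegalArgumentException("qos must have one value or one value per topic");
        }
        Map<String, Integer> result = new LinkedHashMap<>();
        for (int i = 0; i < topics.length; i++) {
            // A single qos value applies to every topic; otherwise match by position.
            result.put(topics[i], qos.length == 1 ? qos[0] : qos[i]);
        }
        return result;
    }
}
```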

5.12. RabbitMQ Source

The "rabbit" source enables receiving messages from RabbitMQ.

The queue(s) must exist before the stream is deployed; they are not created automatically. You can easily create a Queue using the RabbitMQ web UI.

5.12.1. Input

N/A

5.12.2. Output

Payload
  • byte[]

5.12.3. Options

The rabbit source has the following options:

Properties grouped by prefix:

rabbit.supplier
enable-retry

true to enable retry. (Boolean, default: false)

initial-retry-interval

Initial retry interval when retry is enabled. (Integer, default: 1000)

mapped-request-headers

Headers that will be mapped. (String[], default: [STANDARD_REQUEST_HEADERS])

max-attempts

The maximum delivery attempts when retry is enabled. (Integer, default: 3)

max-retry-interval

Max retry interval when retry is enabled. (Integer, default: 30000)

own-connection

When true, use a separate connection based on the boot properties. (Boolean, default: false)

queues

The queues to which the source will listen for messages. (String[], default: <none>)

requeue

Whether rejected messages should be requeued. (Boolean, default: true)

retry-multiplier

Retry backoff multiplier when retry is enabled. (Double, default: 2)

transacted

Whether the channel is transacted. (Boolean, default: false)

spring.rabbitmq
address-shuffle-mode

Mode used to shuffle configured addresses. (AddressShuffleMode, default: none, possible values: NONE,RANDOM,INORDER)

addresses

Comma-separated list of addresses to which the client should connect. When set, the host and port are ignored. (String, default: <none>)

channel-rpc-timeout

Continuation timeout for RPC calls in channels. Set it to zero to wait forever. (Duration, default: 10m)

connection-timeout

Connection timeout. Set it to zero to wait forever. (Duration, default: <none>)

host

RabbitMQ host. Ignored if an address is set. (String, default: localhost)

password

Login to authenticate against the broker. (String, default: guest)

port

RabbitMQ port. Ignored if an address is set. Default to 5672, or 5671 if SSL is enabled. (Integer, default: <none>)

publisher-confirm-type

Type of publisher confirms to use. (ConfirmType, default: <none>, possible values: SIMPLE,CORRELATED,NONE)

publisher-returns

Whether to enable publisher returns. (Boolean, default: false)

requested-channel-max

Number of channels per connection requested by the client. Use 0 for unlimited. (Integer, default: 2047)

requested-heartbeat

Requested heartbeat timeout; zero for none. If a duration suffix is not specified, seconds will be used. (Duration, default: <none>)

username

Login user to authenticate to the broker. (String, default: guest)

virtual-host

Virtual host to use when connecting to the broker. (String, default: <none>)

spring.rabbitmq.listener.simple
acknowledge-mode

Acknowledge mode of container. (AcknowledgeMode, default: <none>, possible values: NONE,MANUAL,AUTO)

auto-startup

Whether to start the container automatically on startup. (Boolean, default: true)

batch-size

Batch size, expressed as the number of physical messages, to be used by the container. (Integer, default: <none>)

concurrency

Minimum number of listener invoker threads. (Integer, default: <none>)

consumer-batch-enabled

Whether the container creates a batch of messages based on the 'receive-timeout' and 'batch-size'. Coerces 'de-batching-enabled' to true to include the contents of a producer created batch in the batch as discrete records. (Boolean, default: false)

de-batching-enabled

Whether the container should present batched messages as discrete messages or call the listener with the batch. (Boolean, default: true)

default-requeue-rejected

Whether rejected deliveries are re-queued by default. (Boolean, default: <none>)

idle-event-interval

How often idle container events should be published. (Duration, default: <none>)

max-concurrency

Maximum number of listener invoker threads. (Integer, default: <none>)

missing-queues-fatal

Whether to fail if the queues declared by the container are not available on the broker and/or whether to stop the container if one or more queues are deleted at runtime. (Boolean, default: true)

prefetch

Maximum number of unacknowledged messages that can be outstanding at each consumer. (Integer, default: <none>)

spring.rabbitmq.listener
type

Listener container type. (ContainerType, default: simple, possible values: SIMPLE,DIRECT,STREAM)

Also see the Spring Boot Documentation for additional properties for the broker connections and listener properties.

A Note About Retry
With the default ackMode (AUTO) and requeue (true) options, failed message deliveries will be retried indefinitely. Since there is not much processing in the rabbit source, the risk of failure in the source itself is small, unless the downstream Binder is not connected for some reason. Setting requeue to false will cause messages to be rejected on the first attempt (and possibly sent to a Dead Letter Exchange/Queue if the broker is so configured). The enableRetry option allows configuration of retry parameters such that a failed message delivery can be retried and eventually discarded (or dead-lettered) when retries are exhausted. The delivery thread is suspended during the retry interval(s). Retry options are enableRetry, maxAttempts, initialRetryInterval, retryMultiplier, and maxRetryInterval. Message deliveries failing with a MessageConversionException are never retried; the assumption being that if a message could not be converted on the first attempt, subsequent attempts will also fail. Such messages are discarded (or dead-lettered).
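To see how the retry options interact, the following sketch computes the waits between delivery attempts. It assumes a conventional exponential backoff (each wait is the previous one times retryMultiplier, capped at maxRetryInterval); the class `RetryBackoffSketch` is hypothetical and is not the retry policy the application configures.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch; assumes conventional exponential backoff semantics.
class RetryBackoffSketch {

    // Returns the waits (in milliseconds) between consecutive delivery attempts.
    static List<Long> waits(int maxAttempts, long initialInterval, double multiplier, long maxInterval) {
        List<Long> waits = new ArrayList<>();
        double interval = initialInterval;
        // maxAttempts deliveries means maxAttempts - 1 waits between them.
        for (int attempt = 1; attempt < maxAttempts; attempt++) {
            waits.add(Math.min((long) interval, maxInterval));
            interval *= multiplier;
        }
        return waits;
    }
}
```

Under these assumptions, the defaults listed above (maxAttempts 3, initialRetryInterval 1000, retryMultiplier 2, maxRetryInterval 30000) give two waits of 1000 ms and 2000 ms before the message is discarded or dead-lettered.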

5.12.4. Build

$ ./mvnw clean install -PgenerateApps
$ cd apps

You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:

$ ./mvnw clean package

5.12.5. Examples

java -jar rabbit-source.jar --rabbit.supplier.queues=

5.13. Amazon S3 Source

This source app supports transfer of files using the Amazon S3 protocol. Files are transferred from the remote directory (S3 bucket) to the local directory where the application is deployed.

Messages emitted by the source are provided as a byte array by default. However, this can be customized using the --mode option:

  • ref Provides a java.io.File reference

  • lines Will split files line-by-line and emit a new message for each line

  • contents The default. Provides the contents of a file as a byte array

When using --mode=lines, you can also provide the additional option --withMarkers=true. If set to true, the underlying FileSplitter will emit additional start-of-file and end-of-file marker messages before and after the actual data. The payload of these 2 additional marker messages is of type FileSplitter.FileMarker. The option withMarkers defaults to false if not explicitly set.

See also MetadataStore options for possible shared persistent store configuration used to prevent duplicate messages on restart.

5.13.1. mode = lines

Headers:
  • Content-Type: text/plain

  • file_originalFile: <java.io.File>

  • file_name: <file name>

  • correlationId: <UUID> (same for each line)

  • sequenceNumber: <n>

  • sequenceSize: 0 (number of lines is not known until the file is read)

Payload:

A String for each line.

The first line is optionally preceded by a message with a START marker payload. The last line is optionally followed by a message with an END marker payload.

Marker presence and format are determined by the with-markers and markers-json properties.

5.13.2. mode = ref

Headers:

None.

Payload:

A java.io.File object.

5.13.3. Options

The s3-source has the following options:

Properties grouped by prefix:

file.consumer
markers-json

When 'fileMarkers == true', specify if they should be produced as FileSplitter.FileMarker objects or JSON. (Boolean, default: true)

mode

The FileReadingMode to use for file reading sources. Values are 'ref' - The File object, 'lines' - a message per line, or 'contents' - the contents as bytes. (FileReadingMode, default: <none>, possible values: ref,lines,contents)

with-markers

Set to true to emit start of file/end of file marker messages before/after the data. Only valid with FileReadingMode 'lines'. (Boolean, default: <none>)

metadata.store.dynamo-db
create-delay

Delay between create table retries. (Integer, default: 1)

create-retries

Retry number for create table request. (Integer, default: 25)

read-capacity

Read capacity on the table. (Long, default: 1)

table

Table name for metadata. (String, default: <none>)

time-to-live

TTL for table entries. (Integer, default: <none>)

write-capacity

Write capacity on the table. (Long, default: 1)

metadata.store.jdbc
region

Unique grouping identifier for messages persisted with this store. (String, default: DEFAULT)

table-prefix

Prefix for the custom table name. (String, default: <none>)

metadata.store.mongo-db
collection

MongoDB collection name for metadata. (String, default: metadataStore)

metadata.store.redis
key

Redis key for metadata. (String, default: <none>)

metadata.store
type

Indicates the type of metadata store to configure (default is 'memory'). You must include the corresponding Spring Integration dependency to use a persistent store. (StoreType, default: <none>, possible values: mongodb,redis,dynamodb,jdbc,zookeeper,hazelcast,memory)

metadata.store.zookeeper
connect-string

Zookeeper connect string in form HOST:PORT. (String, default: 127.0.0.1:2181)

encoding

Encoding to use when storing data in Zookeeper. (Charset, default: UTF-8)

retry-interval

Retry interval for Zookeeper operations in milliseconds. (Integer, default: 1000)

root

Root node - store entries are children of this node. (String, default: /SpringIntegration-MetadataStore)

s3.supplier
auto-create-local-dir

Whether to create the local directory if it does not exist. (Boolean, default: true)

delete-remote-files

Whether to delete remote files after processing. (Boolean, default: false)

filename-pattern

The pattern to filter remote files. (String, default: <none>)

filename-regex

The regexp to filter remote files. (Pattern, default: <none>)

list-only

Set to true to return s3 object metadata without copying file to a local directory. (Boolean, default: false)

local-dir

The local directory to store files. (File, default: <none>)

preserve-timestamp

Whether to transfer the timestamp of the remote file to the local one. (Boolean, default: true)

remote-dir

AWS S3 bucket resource. (String, default: bucket)

remote-file-separator

Remote File separator. (String, default: /)

tmp-file-suffix

Temporary file suffix. (String, default: .tmp)

spring.cloud.aws.credentials
access-key

The access key to be used with a static provider. (String, default: <none>)

instance-profile

Configures an instance profile credentials provider with no further configuration. (Boolean, default: false)

profile

The AWS profile. (Profile, default: <none>)

secret-key

The secret key to be used with a static provider. (String, default: <none>)

spring.cloud.aws.region
instance-profile

Configures an instance profile region provider with no further configuration. (Boolean, default: false)

profile

The AWS profile. (Profile, default: <none>)

static

<documentation missing> (String, default: <none>)

spring.cloud.aws.s3
accelerate-mode-enabled

Option to enable using the accelerate endpoint when accessing S3. Accelerate endpoints allow faster transfer of objects by using Amazon CloudFront's globally distributed edge locations. (Boolean, default: <none>)

checksum-validation-enabled

Option to disable doing a validation of the checksum of an object stored in S3. (Boolean, default: <none>)

chunked-encoding-enabled

Option to enable using chunked encoding when signing the request payload for PutObjectRequest and UploadPartRequest (both in software.amazon.awssdk.services.s3.model). (Boolean, default: <none>)

cross-region-enabled

Enables cross-region bucket access. (Boolean, default: <none>)

endpoint

Overrides the default endpoint. (URI, default: <none>)

path-style-access-enabled

Option to enable using path style access for accessing S3 objects instead of DNS style access. DNS style access is preferred as it will result in better load balancing when accessing S3. (Boolean, default: <none>)

region

Overrides the default region. (String, default: <none>)

use-arn-region-enabled

If an S3 resource ARN is passed in as the target of an S3 operation that has a different region to the one the client was configured with, this flag must be set to 'true' to permit the client to make a cross-region call to the region specified in the ARN otherwise an exception will be thrown. (Boolean, default: <none>)

5.13.4. Amazon AWS common options

The Amazon S3 Source (like all other Amazon AWS applications) is built on the Spring Cloud AWS project, and Spring Boot applies its auto-configuration classes automatically. Consult the Spring Cloud AWS documentation for the required and useful auto-configuration properties.

5.13.5. Examples

java -jar s3-source.jar --s3.supplier.remote-dir=/tmp/foo --file.consumer.mode=lines

5.14. SFTP Source

This source application supports transfer of files using the SFTP protocol. Files are transferred from the remote directory to the local directory where the app is deployed. Messages emitted by the source are provided as a byte array by default. However, this can be customized using the --mode option:

  • ref Provides a java.io.File reference

  • lines Will split files line-by-line and emit a new message for each line

  • contents The default. Provides the contents of a file as a byte array

When using --mode=lines, you can also provide the additional option --withMarkers=true. If set to true, the underlying FileSplitter emits additional start-of-file and end-of-file marker messages before and after the actual data. The payload of these two marker messages is of type FileSplitter.FileMarker. The withMarkers option defaults to false if not explicitly set.

See sftp-supplier for advanced configuration options.

See also MetadataStore options for possible shared persistent store configuration used to prevent duplicate messages on restart.
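The duplicate-prevention idea can be sketched as follows. This is a minimal Python illustration, not the Spring Integration implementation: InMemoryMetadataStore and accept_new_files are hypothetical names, and keying each file by name with its modification time as the value is an assumption made only for this example. A persistent store keeps the same entries across restarts, which is what allows already-processed files to be skipped.

```python
from typing import Dict, Iterable, List, Optional, Tuple


class InMemoryMetadataStore:
    """Hypothetical stand-in for a metadata store: a plain key-value map."""

    def __init__(self) -> None:
        self._entries: Dict[str, str] = {}

    def get(self, key: str) -> Optional[str]:
        return self._entries.get(key)

    def put(self, key: str, value: str) -> None:
        self._entries[key] = value


def accept_new_files(
    store: InMemoryMetadataStore, listing: Iterable[Tuple[str, int]]
) -> List[str]:
    """Return the names of files that are new or changed since the last poll."""
    accepted: List[str] = []
    for name, modified in listing:
        # A file passes only if its stored modification time differs (or is absent).
        if store.get(name) != str(modified):
            store.put(name, str(modified))
            accepted.append(name)
    return accepted


store = InMemoryMetadataStore()
first_poll = accept_new_files(store, [("a.txt", 100), ("b.txt", 200)])
# Second poll against the same store: only the changed and the new file pass.
second_poll = accept_new_files(store, [("a.txt", 100), ("b.txt", 250), ("c.txt", 300)])
```

With an in-memory store the entries are lost on restart, so every file would be accepted again; that is the case a shared persistent store is meant to avoid.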

5.14.1. Input

N/A (Fetches files from an SFTP server).

5.14.2. Output

mode = contents
Headers:
  • Content-Type: application/octet-stream

  • file_name: <file name>

  • file_remoteFileInfo: <file metadata>

  • file_remoteHostPort: <host:port>

  • file_remoteDirectory: <relative-path>

  • file_remoteFile: <file-name>

  • sftp_selectedServer: <server-key> (if multi-source)

Payload:

A byte[] filled with the file contents.

mode = lines
Headers:
  • Content-Type: text/plain

  • file_name: <file name>

  • correlationId: <UUID> (same for each line)

  • sequenceNumber: <n>

  • sequenceSize: 0 (number of lines is not known until the file is read)

  • file_marker: <file marker> (if with-markers is enabled)

Payload:

A String for each line.

The first line is optionally preceded by a message with a START marker payload. The last line is optionally followed by a message with an END marker payload.

Marker presence and format are determined by the with-markers and markers-json properties.

mode = ref
Headers:
  • file_remoteHostPort: <host:port>

  • file_remoteDirectory: <relative-path>

  • file_remoteFile: <file-name>

  • file_originalFile: <absolute-path-of-local-file>

  • file_name: <local-file-name>

  • file_relativePath

  • sftp_selectedServer: <server-key> (if multi-source)

Payload:

A java.io.File object.

5.14.3. Options

The sftp source has the following options:

Properties grouped by prefix:

file.consumer
markers-json

When 'fileMarkers == true', specify if they should be produced as FileSplitter.FileMarker objects or JSON. (Boolean, default: true)

mode

The FileReadingMode to use for file reading sources. Values are 'ref' - The File object, 'lines' - a message per line, or 'contents' - the contents as bytes. (FileReadingMode, default: <none>, possible values: ref,lines,contents)

with-markers

Set to true to emit start of file/end of file marker messages before/after the data. Only valid with FileReadingMode 'lines'. (Boolean, default: <none>)

metadata.store.dynamo-db
create-delay

Delay between create table retries. (Integer, default: 1)

create-retries

Retry number for create table request. (Integer, default: 25)

read-capacity

Read capacity on the table. (Long, default: 1)

table

Table name for metadata. (String, default: <none>)

time-to-live

TTL for table entries. (Integer, default: <none>)

write-capacity

Write capacity on the table. (Long, default: 1)

metadata.store.jdbc
region

Unique grouping identifier for messages persisted with this store. (String, default: DEFAULT)

table-prefix

Prefix for the custom table name. (String, default: <none>)

metadata.store.mongo-db
collection

MongoDB collection name for metadata. (String, default: metadataStore)

metadata.store.redis
key

Redis key for metadata. (String, default: <none>)

metadata.store
type

Indicates the type of metadata store to configure (default is 'memory'). You must include the corresponding Spring Integration dependency to use a persistent store. (StoreType, default: <none>, possible values: mongodb,redis,dynamodb,jdbc,zookeeper,hazelcast,memory)

metadata.store.zookeeper
connect-string

Zookeeper connect string in form HOST:PORT. (String, default: 127.0.0.1:2181)

encoding

Encoding to use when storing data in Zookeeper. (Charset, default: UTF-8)

retry-interval

Retry interval for Zookeeper operations in milliseconds. (Integer, default: 1000)

root

Root node - store entries are children of this node. (String, default: /SpringIntegration-MetadataStore)

sftp.supplier
auto-create-local-dir

Set to true to create the local directory if it does not exist. (Boolean, default: true)

delay-when-empty

Duration of delay when no new files are detected. (Duration, default: 1s)

delete-remote-files

Set to true to delete remote files after successful transfer. (Boolean, default: false)

directories

A list of factory "name.directory" pairs. (String[], default: <none>)

factories

A map of factory names to factories. (Map<String, Factory>, default: <none>)

fair

True for fair rotation of multiple servers/directories. This is false by default so if a source has more than one entry, these will be received before the other sources are visited. (Boolean, default: false)

filename-pattern

A filter pattern to match the names of files to transfer. (String, default: <none>)

filename-regex

A filter regex pattern to match the names of files to transfer. (Pattern, default: <none>)

list-only

Set to true to return file metadata without the entire payload. (Boolean, default: false)

local-dir

The local directory to use for file transfers. (File, default: <none>)

max-fetch

The maximum number of remote files to fetch per poll; default unlimited. Does not apply when listing files or building task launch requests. (Integer, default: <none>)

preserve-timestamp

Set to true to preserve the original timestamp. (Boolean, default: true)

remote-dir

The remote SFTP directory. (String, default: /)

remote-file-separator

The remote file separator. (String, default: /)

rename-remote-files-to

A SpEL expression resolving to the new name remote files must be renamed to after successful transfer. (Expression, default: <none>)

stream

Set to true to stream the file rather than copy to a local directory. (Boolean, default: false)

tmp-file-suffix

The suffix to use while the transfer is in progress. (String, default: .tmp)

sftp.supplier.factory
allow-unknown-keys

True to allow an unknown or changed key. (Boolean, default: false)

host

The host name of the server. (String, default: localhost)

known-hosts-expression

A SpEL expression resolving to the location of the known hosts file. (Expression, default: <none>)

pass-phrase

Passphrase for user's private key. (String, default: <empty string>)

password

The password to use to connect to the server. (String, default: <none>)

port

The port of the server. (Integer, default: 22)

private-key

Resource location of user's private key. (Resource, default: <none>)

username

The username to use to connect to the server. (String, default: <none>)

sftp.supplier.sort-by
attribute

Attribute of the file listing entry to sort by (FILENAME, ATIME: last access time, MTIME: last modified time). (Attribute, default: <none>)

dir

Sorting direction (ASC or DESC). (Dir, default: <none>)

5.14.4. Examples

java -jar sftp_source.jar --sftp.supplier.remote-dir=foo --file.consumer.mode=lines --sftp.supplier.factory.host=sftpserver \
         --sftp.supplier.factory.username=user --sftp.supplier.factory.password=pw --sftp.supplier.local-dir=/foo

5.15. SYSLOG

The syslog source receives SYSLOG packets over UDP, TCP, or both. RFC3164 (BSD) and RFC5424 formats are supported.
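To make the RFC 3164 (BSD) format concrete, the following Python sketch builds one packet of that shape. The PRI value is facility * 8 + severity as defined by the RFC; the function names, host name, tag, and message text are hypothetical, and send_udp assumes a syslog source listening locally on UDP port 1514 (the default port listed in the options).

```python
import socket
from datetime import datetime

MONTHS = ["Jan", "Feb", "Mar", "Apr", "May", "Jun",
          "Jul", "Aug", "Sep", "Oct", "Nov", "Dec"]


def rfc3164_message(facility: int, severity: int, when: datetime,
                    host: str, tag: str, text: str) -> bytes:
    """Build an RFC 3164 (BSD) syslog packet."""
    pri = facility * 8 + severity  # PRI value defined by the RFC
    # The RFC 3164 timestamp is "Mmm dd hh:mm:ss" with a space-padded day.
    timestamp = f"{MONTHS[when.month - 1]} {when.day:2d} {when:%H:%M:%S}"
    return f"<{pri}>{timestamp} {host} {tag}: {text}".encode("ascii")


def send_udp(packet: bytes, host: str = "localhost", port: int = 1514) -> None:
    """Send one packet over UDP; host and port are assumptions for a local test."""
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
        sock.sendto(packet, (host, port))


# Facility 4 (security/auth) and severity 5 (notice) give PRI 37.
packet = rfc3164_message(4, 5, datetime(2024, 1, 5, 13, 7, 9),
                         "web01", "sshd", "session opened")
```

Calling send_udp(packet) would deliver the message to a source started with the udp protocol; the call is left out here so the sketch runs without a server.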

5.15.1. Options

syslog.supplier.buffer-size

The buffer size used when decoding messages; larger messages will be rejected. (Integer, default: 2048)

syslog.supplier.nio

Whether or not to use NIO (when supporting a large number of connections). (Boolean, default: false)

syslog.supplier.port

The port to listen on. (Integer, default: 1514)

syslog.supplier.protocol

Protocol used for SYSLOG (tcp, udp, or both). (Protocol, default: <none>, possible values: tcp,udp,both)

syslog.supplier.reverse-lookup

Whether or not to perform a reverse lookup on the incoming socket. (Boolean, default: false)

syslog.supplier.rfc

'5424' or '3164' - the syslog format according to the RFC; 3164 is aka 'BSD' format. (String, default: 3164)

syslog.supplier.socket-timeout

The socket timeout. (Integer, default: 0)

5.16. TCP

The tcp source acts as a server and allows a remote party to connect to it and submit data over a raw TCP socket.

TCP is a streaming protocol and some mechanism is needed to frame messages on the wire. A number of decoders are available, the default being 'CRLF' which is compatible with Telnet.

Messages produced by the TCP source application have a byte[] payload.

5.16.1. Options

Properties grouped by prefix:

tcp
nio

Whether or not to use NIO. (Boolean, default: false)

port

The port on which to listen; 0 for the OS to choose a port. (Integer, default: 1234)

reverse-lookup

Perform a reverse DNS lookup on the remote IP Address; if false, just the IP address is included in the message headers. (Boolean, default: false)

socket-timeout

The timeout (ms) before closing the socket when no data is received. (Integer, default: 120000)

use-direct-buffers

Whether or not to use direct buffers. (Boolean, default: false)

tcp.supplier
buffer-size

The buffer size used when decoding messages; larger messages will be rejected. (Integer, default: 2048)

decoder

The decoder to use when receiving messages. (Encoding, default: <none>, possible values: CRLF,LF,NULL,STXETX,RAW,L1,L2,L4)

5.16.2. Available Decoders

Text Data
CRLF (default)

text terminated by carriage return (0x0d) followed by line feed (0x0a)

LF

text terminated by line feed (0x0a)

NULL

text terminated by a null byte (0x00)

STXETX

text preceded by an STX (0x02) and terminated by an ETX (0x03)

Text and Binary Data
RAW

no structure - the client indicates a complete message by closing the socket

L1

data preceded by a one byte (unsigned) length field (supports up to 255 bytes)

L2

data preceded by a two byte (unsigned) length field (up to 2^16-1 bytes)

L4

data preceded by a four byte (signed) length field (up to 2^31-1 bytes)
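The framing rules above can be sketched from the client side. The following Python function is a hypothetical helper (frame is not part of any Spring API) that wraps a payload the way each decoder expects to read it. It assumes the L1, L2, and L4 length fields are sent in network byte order (big-endian).

```python
import struct

# struct formats for the length-header decoders (assumed big-endian).
LENGTH_FORMATS = {"L1": ">B", "L2": ">H", "L4": ">i"}


def frame(payload: bytes, decoder: str = "CRLF") -> bytes:
    """Wrap a payload so that the named decoder can find the message boundary."""
    if decoder == "CRLF":
        return payload + b"\r\n"
    if decoder == "LF":
        return payload + b"\n"
    if decoder == "NULL":
        return payload + b"\x00"
    if decoder == "STXETX":
        return b"\x02" + payload + b"\x03"
    if decoder == "RAW":
        # No framing: the client signals the end by closing the socket.
        return payload
    if decoder in LENGTH_FORMATS:
        # struct.pack raises struct.error if the length does not fit the field.
        return struct.pack(LENGTH_FORMATS[decoder], len(payload)) + payload
    raise ValueError(f"unknown decoder: {decoder}")


crlf_frame = frame(b"hi")
l2_frame = frame(b"hi", "L2")
```

A client would write the returned bytes to the socket as they are. The delimiter-based decoders suit text that never contains the delimiter itself, while the length-header decoders can carry arbitrary binary data.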

5.17. Time Source

The time source emits a String with the current time at the interval configured by the poller options.

5.17.1. Options

The time source has the following options:

spring.integration.poller.cron

Cron expression for polling. Mutually exclusive with 'fixedDelay' and 'fixedRate'. (String, default: <none>)

spring.integration.poller.fixed-delay

Polling delay period. Mutually exclusive with 'cron' and 'fixedRate'. (Duration, default: <none>)

spring.integration.poller.fixed-rate

Polling rate period. Mutually exclusive with 'fixedDelay' and 'cron'. (Duration, default: <none>)

spring.integration.poller.initial-delay

Polling initial delay. Applied for 'fixedDelay' and 'fixedRate'; ignored for 'cron'. (Duration, default: <none>)

spring.integration.poller.max-messages-per-poll

Maximum number of messages to poll per polling cycle. (Integer, default: <none>)

spring.integration.poller.receive-timeout

How long to wait for messages on poll. (Duration, default: 1s)

time.date-format

Format for the date value. (String, default: MM/dd/yy HH:mm:ss)
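A downstream consumer can parse the emitted String back into a timestamp. The sketch below is in Python and assumes the default time.date-format is unchanged; the strptime pattern is a hand translation of the Java pattern MM/dd/yy HH:mm:ss, and parse_time_payload is a hypothetical helper name.

```python
from datetime import datetime

# Python equivalent of the default Java pattern "MM/dd/yy HH:mm:ss".
DEFAULT_FORMAT = "%m/%d/%y %H:%M:%S"


def parse_time_payload(payload: str) -> datetime:
    """Parse a time source payload that uses the default date format."""
    return datetime.strptime(payload, DEFAULT_FORMAT)


parsed = parse_time_payload("03/09/24 14:05:30")
```

If time.date-format is overridden, the pattern on the consumer side has to change with it.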

5.18. Twitter Message Source

Repeatedly retrieves the direct messages (both sent and received) within the last 30 days, sorted in reverse-chronological order. The retrieved messages are cached (in a MetadataStore cache) to prevent duplicates. By default, an in-memory SimpleMetadataStore is used.

The twitter.message.source.count property controls the number of returned messages.

The spring.integration.poller properties control the message poll interval. The interval must be aligned with the rate limit of the API in use.

5.18.1. Options

Properties grouped by prefix:

spring.integration.poller
cron

Cron expression for polling. Mutually exclusive with 'fixedDelay' and 'fixedRate'. (String, default: <none>)

fixed-delay

Polling delay period. Mutually exclusive with 'cron' and 'fixedRate'. (Duration, default: <none>)

fixed-rate

Polling rate period. Mutually exclusive with 'fixedDelay' and 'cron'. (Duration, default: <none>)

initial-delay

Polling initial delay. Applied for 'fixedDelay' and 'fixedRate'; ignored for 'cron'. (Duration, default: <none>)

max-messages-per-poll

Maximum number of messages to poll per polling cycle. (Integer, default: <none>)

receive-timeout

How long to wait for messages on poll. (Duration, default: 1s)

twitter.connection
access-token

Your Twitter token. (String, default: <none>)

access-token-secret

Your Twitter token secret. (String, default: <none>)

consumer-key

Your Twitter key. (String, default: <none>)

consumer-secret

Your Twitter secret. (String, default: <none>)

debug-enabled

Enables Twitter4J debug mode. (Boolean, default: false)

raw-json

Enable caching the original (raw) JSON objects as returned by the Twitter APIs. When set to false, the result uses the Twitter4J JSON representation. When set to true, the result uses the original Twitter API JSON representation. (Boolean, default: true)

twitter.message.source
count

Max number of events to be returned. 20 default. 50 max. (Integer, default: 20)

5.19. Twitter Search Source

Twitter’s Standard search API (search/tweets) allows simple queries against the indices of recent or popular Tweets. This source provides continuous searches against a sampling of recent Tweets published in the past 7 days. It is part of the 'public' set of APIs.

Returns a collection of relevant Tweets matching a specified query.

Use the spring.integration.poller properties to control the interval between consecutive search requests. The rate limit is 180 requests per 30-minute window (about 6 requests per minute, or 1 request every 10 seconds).
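The arithmetic behind the poll interval can be written out as a short Python sketch. It takes the rate limit figures exactly as stated above; min_poll_interval is a hypothetical helper, not part of the application.

```python
from datetime import timedelta


def min_poll_interval(requests_per_window: int, window: timedelta) -> timedelta:
    """Shortest interval between requests that stays within the rate limit."""
    return window / requests_per_window


# 180 requests per 30-minute window, as stated in the text.
interval = min_poll_interval(180, timedelta(minutes=30))
interval_seconds = interval.total_seconds()
```

Under these figures, a fixed-delay of at least 10 seconds on the poller keeps a single instance within the limit; running several instances with the same credentials divides the budget between them.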

The twitter.search query properties allow querying by keywords and filtering the results by time and geolocation.

The twitter.search.count and twitter.search.page properties control the result pagination in accordance with the Search API.

Note: Twitter’s search service and, by extension, the Search API is not meant to be an exhaustive source of Tweets. Not all Tweets will be indexed or made available via the search interface.

5.19.1. Options

Properties grouped by prefix:

spring.integration.poller
cron

Cron expression for polling. Mutually exclusive with 'fixedDelay' and 'fixedRate'. (String, default: <none>)

fixed-delay

Polling delay period. Mutually exclusive with 'cron' and 'fixedRate'. (Duration, default: <none>)

fixed-rate

Polling rate period. Mutually exclusive with 'fixedDelay' and 'cron'. (Duration, default: <none>)

initial-delay

Polling initial delay. Applied for 'fixedDelay' and 'fixedRate'; ignored for 'cron'. (Duration, default: <none>)

max-messages-per-poll

Maximum number of messages to poll per polling cycle. (Integer, default: <none>)

receive-timeout

How long to wait for messages on poll. (Duration, default: 1s)

twitter.connection
access-token

Your Twitter token. (String, default: <none>)

access-token-secret

Your Twitter token secret. (String, default: <none>)

consumer-key

Your Twitter key. (String, default: <none>)

consumer-secret

Your Twitter secret. (String, default: <none>)

debug-enabled

Enables Twitter4J debug mode. (Boolean, default: false)

raw-json

Enable caching the original (raw) JSON objects as returned by the Twitter APIs. When set to false, the result uses the Twitter4J JSON representation. When set to true, the result uses the original Twitter API JSON representation. (Boolean, default: true)

twitter.search
count

Number of tweets to return per page (e.g. per single request), up to a max of 100. (Integer, default: 100)

lang

Restricts searched tweets to the given language, given by an ISO 639-1 code (http://en.wikipedia.org/wiki/ISO_639-1). (String, default: <none>)

page

Number of pages (e.g. requests) to search backwards (from the most recent to the oldest tweets) before restarting the search from the most recent tweets. The total number of tweets searched backwards is (page * count). (Integer, default: 3)

query

Search tweets by search query string. (String, default: <none>)

restart-from-most-recent-on-empty-response

Restart search from the most recent tweets on empty response. Applied only after the first restart (e.g. when since_id != UNBOUNDED) (Boolean, default: false)

result-type

Specifies what type of search results you would prefer to receive. The current default is "mixed". Valid values are: mixed (include both popular and real-time results in the response), recent (return only the most recent results in the response), and popular (return only the most popular results in the response). (ResultType, default: <none>, possible values: popular,mixed,recent)

since

If specified, returns tweets since the given date. The date should be formatted as YYYY-MM-DD. (String, default: <none>)

twitter.search.geocode
latitude

User's latitude. (Double, default: -1)

longitude

User's longitude. (Double, default: -1)

radius

Radius (in kilometers) around the (latitude, longitude) point. (Double, default: -1)

5.20. Twitter Stream Source

Supports the real-time Tweet streaming Filter and Sample APIs.

  • The Filter API returns public statuses that match one or more filter predicates. Multiple parameters can be specified, which allows using a single connection to the Streaming API. TIP: The track, follow, and locations fields are combined with an OR operator. A query with track=foo and follow=1234 returns Tweets matching foo OR created by user 1234.

  • The Sample API returns a small random sample of all public statuses. The Tweets returned by the default access level are the same, so if two different clients connect to this endpoint, they will see the same Tweets.
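The OR combination described for the Filter API can be illustrated with a small predicate. This Python sketch is deliberately simplified: keyword matching is a case-insensitive substring test rather than Twitter's actual matching rules, tweets are plain dictionaries with hypothetical text and user_id fields, and the locations field is left out.

```python
from typing import Dict, Iterable


def matches_filter(tweet: Dict[str, object],
                   track: Iterable[str] = (),
                   follow: Iterable[int] = ()) -> bool:
    """Return True if the tweet matches a tracked keyword OR a followed user."""
    text = str(tweet["text"]).lower()
    keyword_hit = any(keyword.lower() in text for keyword in track)
    author_hit = tweet["user_id"] in set(follow)
    # The fields are combined with OR, so one hit is enough.
    return keyword_hit or author_hit


by_keyword = matches_filter({"text": "Hello foo", "user_id": 1}, ["foo"], [1234])
by_author = matches_filter({"text": "Hello", "user_id": 1234}, ["foo"], [1234])
no_match = matches_filter({"text": "Hello", "user_id": 1}, ["foo"], [1234])
```

Because the predicates are not combined with AND, adding more track keywords or follow IDs widens the stream rather than narrowing it.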

The default access level allows up to 400 track keywords, 5,000 follow user IDs, and 25 0.1-360 degree location boxes.

5.20.1. Options

Properties grouped by prefix:

twitter.connection
access-token

Your Twitter token. (String, default: <none>)

access-token-secret

Your Twitter token secret. (String, default: <none>)

consumer-key

Your Twitter key. (String, default: <none>)

consumer-secret

Your Twitter secret. (String, default: <none>)

debug-enabled

Enables Twitter4J debug mode. (Boolean, default: false)

raw-json

Enable caching the original (raw) JSON objects as returned by the Twitter APIs. When set to false, the result uses the Twitter4J JSON representation. When set to true, the result uses the original Twitter API JSON representation. (Boolean, default: true)

twitter.stream.filter
count

Indicates the number of previous statuses to stream before transitioning to the live stream. (Integer, default: 0)

filter-level

The filter level limits what tweets appear in the stream to those with a minimum filterLevel attribute value. One of either none, low, or medium. (FilterLevel, default: <none>)

follow

Specifies the users, by ID, to receive public tweets from. (List<Long>, default: <none>)

language

Specifies the tweets language of the stream. (List<String>, default: <none>)

locations

Locations to track. Internally represented as a 2D array. For example, the bounding box 52.38, 4.90, 51.51, -0.12 is invalid because the first pair must be the SW corner of the box. (List<BoundingBox>, default: <none>)

track

Specifies keywords to track. (List<String>, default: <none>)

twitter.stream
type

<documentation missing> (StreamType, default: <none>, possible values: sample,filter,firehose,link)

5.21. Websocket Source

The Websocket source produces messages received through a web socket.

5.21.1. Options

Properties grouped by prefix:

websocket.supplier
allowed-origins

The allowed origins. (String, default: *)

path

The path on which server WebSocket handler is exposed. (String, default: /websocket)

websocket.supplier.sock-js
enable

Enable SockJS service on the server. (Boolean, default: false)

5.21.2. Examples

To verify that the websocket-source receives messages from Websocket clients, you can use the following simple end-to-end setup.

Step 1: Start Kafka.
Step 2: Deploy websocket-source on a specific port, say 8080.
Step 3: Connect a websocket client on port 8080 path "/websocket", and send some messages.

You can start a Kafka console consumer and see the messages there.

5.22. XMPP Source

The "xmpp" source enables receiving messages from an XMPP Server.

5.22.1. Input

N/A

5.22.2. Output

Payload
  • byte[]

5.22.3. Options

The xmpp source has the following options:

Properties grouped by prefix:

xmpp.factory
host

XMPP Host server to connect to. (String, default: <none>)

password

The Password for the connected user. (String, default: <none>)

port

Port for connecting to the host. The default client port is 5222. (Integer, default: 5222)

resource

The Resource to bind to on the XMPP Host. Can be empty; the server generates one if not set. (String, default: <none>)

security-mode

<documentation missing> (SecurityMode, default: <none>, possible values: required,ifpossible,disabled)

service-name

The Service Name to set for the XMPP Domain. (String, default: <none>)

subscription-mode

<documentation missing> (SubscriptionMode, default: <none>, possible values: accept_all,reject_all,manual)

user

The User the connection should connect as. (String, default: <none>)

xmpp.supplier
payload-expression

<documentation missing> (Expression, default: <none>)

stanza-filter

<documentation missing> (StanzaFilter, default: <none>)

Also see the Spring Boot Documentation for additional properties for the broker connections and listener properties.

5.22.4. Build

$ ./mvnw clean install -PgenerateApps
$ cd apps

You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:

$ ./mvnw clean package

5.22.5. Examples

java -jar xmpp-source.jar --xmpp.factory.host=localhost --xmpp.factory.port=5222 --xmpp.factory.user=jane --xmpp.factory.password=secret --xmpp.factory.service-name=localhost

5.23. ZeroMQ Source

The "zeromq" source enables receiving messages from ZeroMQ.

5.23.1. Input

N/A

5.23.2. Output

Payload
  • byte[]

5.23.3. Options

The zeromq source has the following options:

zeromq.supplier.bind-port

Bind Port for creating a ZeroMQ Socket; 0 selects a random port. (Integer, default: 0)

zeromq.supplier.connect-url

Connection URL for the ZeroMQ Socket. (String, default: <none>)

zeromq.supplier.consume-delay

The delay to consume from the ZeroMQ Socket when no data is received. (Duration, default: 1s)

zeromq.supplier.socket-type

The Socket Type the connection should make. (SocketType, default: <none>, possible values: PAIR,PUB,SUB,REQ,REP,DEALER,ROUTER,PULL,PUSH,XPUB,XSUB,STREAM,CLIENT,SERVER,RADIO,DISH,CHANNEL,PEER,RAW,SCATTER,GATHER)

zeromq.supplier.topics

The Topics to subscribe to. (String[], default: [])

Also see the Spring Boot Documentation for additional properties for the broker connections and listener properties.

5.23.4. Build

$ ./mvnw clean install -PgenerateApps
$ cd apps

You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:

$ ./mvnw clean package

5.23.5. Examples

java -jar zeromq-source.jar --zeromq.supplier.connectUrl=tcp://server:port --zeromq.supplier.topics=

6. Processors

6.1. Aggregator Processor

The Aggregator processor enables an application to aggregate incoming messages into groups and release them to an output destination.

java -jar aggregator-processor-kafka-<version>.jar --aggregator.message-store-type=jdbc

Change kafka to rabbit if you want to run it against RabbitMQ.

6.1.1. Payload

If an input payload is a byte[] and the content-type header indicates JSON, the JsonBytesToMap function tries to deserialize the payload to a Map for better data representation on the output of the aggregator function. Such a Map representation also makes it easy to access the payload content from the SpEL expressions mentioned below. Otherwise (including on a deserialization error), the input payload is left as is, and it is up to the target application configuration to convert it into the desired form.
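The payload handling described above can be sketched in a few lines. This is a Python illustration of the described behavior, not the actual JsonBytesToMap code: the function name, the content-type header key, and the check for the substring "json" are assumptions made for the example.

```python
import json
from typing import Any, Mapping


def json_bytes_to_map(payload: Any, headers: Mapping[str, str]) -> Any:
    """Return a dict for JSON byte payloads; otherwise return the payload as is."""
    content_type = headers.get("content-type", "")
    if isinstance(payload, (bytes, bytearray)) and "json" in content_type.lower():
        try:
            decoded = json.loads(payload)
        except ValueError:
            # Deserialization error: leave the payload untouched.
            return payload
        if isinstance(decoded, dict):
            return decoded
    return payload


json_headers = {"content-type": "application/json"}
as_map = json_bytes_to_map(b'{"a": 1}', json_headers)
broken = json_bytes_to_map(b"not json", json_headers)
plain = json_bytes_to_map(b'{"a": 1}', {"content-type": "text/plain"})
```

Only the first call yields a map; the other two return the original bytes, which matches the "left as is" fallback in the text.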

6.1.2. Options

Properties grouped by prefix:

aggregator
aggregation

SpEL expression for aggregation strategy. Default is collection of payloads. (Expression, default: <none>)

correlation

SpEL expression for the correlation key. Defaults to the correlationId header. (Expression, default: <none>)

group-timeout

SpEL expression for the timeout after which uncompleted groups expire. (Expression, default: <none>)

message-store-entity

Persistence message store entity: table prefix in RDBMS, collection name in MongoDb, etc. (String, default: <none>)

message-store-type

Message store type. (String, default: <none>)

release

SpEL expression for release strategy. Default is based on the sequenceSize header. (Expression, default: <none>)
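The default strategies named in the aggregator options (correlate on the correlationId header, release based on the sequenceSize header, aggregate into a collection of payloads) can be sketched as follows. This is a simplified Python illustration with a hypothetical SimpleAggregator class; it ignores group timeouts and message stores, and is not the Spring Integration implementation.

```python
from collections import defaultdict
from typing import Any, DefaultDict, List, Mapping, Optional


class SimpleAggregator:
    """Groups payloads by correlationId and releases a group when it is complete."""

    def __init__(self) -> None:
        self._groups: DefaultDict[Any, List[Any]] = defaultdict(list)

    def handle(self, payload: Any, headers: Mapping[str, Any]) -> Optional[List[Any]]:
        key = headers["correlationId"]      # default correlation key
        group = self._groups[key]
        group.append(payload)
        # Default release: the group holds as many messages as sequenceSize says.
        if len(group) >= headers["sequenceSize"]:
            return self._groups.pop(key)    # default aggregation: list of payloads
        return None


aggregator = SimpleAggregator()
headers = {"correlationId": "order-1", "sequenceSize": 2}
first = aggregator.handle("a", headers)
second = aggregator.handle("b", headers)
```

The first call returns nothing because the group is incomplete; the second call releases the whole group. Custom correlation, release, and aggregation expressions replace the corresponding steps.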

spring.data.mongodb
additional-hosts

Additional server hosts. Cannot be set with URI or if 'host' is not specified. Additional hosts will use the default mongo port of 27017. If you want to use a different port you can use the "host:port" syntax. (List<String>, default: <none>)

authentication-database

Authentication database name. (String, default: <none>)

auto-index-creation

Whether to enable auto-index creation. (Boolean, default: <none>)

database

Database name. Overrides database in URI. (String, default: <none>)

field-naming-strategy

Fully qualified name of the FieldNamingStrategy to use. (Class<?>, default: <none>)

host

Mongo server host. Cannot be set with URI. (String, default: <none>)

password

Login password of the mongo server. Cannot be set with URI. (Character[], default: <none>)

port

Mongo server port. Cannot be set with URI. (Integer, default: <none>)

replica-set-name

Required replica set name for the cluster. Cannot be set with URI. (String, default: <none>)

uri

Mongo database URI. Overrides host, port, username, and password. (String, default: mongodb://localhost/test)

username

Login user of the mongo server. Cannot be set with URI. (String, default: <none>)

uuid-representation

Representation to use when converting a UUID to a BSON binary value. (UuidRepresentation, default: java-legacy, possible values: UNSPECIFIED,STANDARD,C_SHARP_LEGACY,JAVA_LEGACY,PYTHON_LEGACY)

spring.data.mongodb.gridfs
bucket

GridFS bucket name. (String, default: <none>)

database

GridFS database name. (String, default: <none>)

spring.data.mongodb.ssl
bundle

SSL bundle name. (String, default: <none>)

enabled

Whether to enable SSL support. Enabled automatically if "bundle" is provided unless specified otherwise. (Boolean, default: <none>)

spring.data.redis
client-name

Client name to be set on connections with CLIENT SETNAME. (String, default: <none>)

client-type

Type of client to use. By default, auto-detected according to the classpath. (ClientType, default: <none>, possible values: LETTUCE,JEDIS)

connect-timeout

Connection timeout. (Duration, default: <none>)

database

Database index used by the connection factory. (Integer, default: 0)

host

Redis server host. (String, default: localhost)

password

Login password of the redis server. (String, default: <none>)

port

Redis server port. (Integer, default: 6379)

timeout

Read timeout. (Duration, default: <none>)

url

Connection URL. Overrides host, port, username, and password. Example: redis://user:password@example.com:6379 (String, default: <none>)

username

Login username of the redis server. (String, default: <none>)

spring.data.redis.cluster
max-redirects

Maximum number of redirects to follow when executing commands across the cluster. (Integer, default: <none>)

nodes

Comma-separated list of "host:port" pairs to bootstrap from. This represents an "initial" list of cluster nodes and is required to have at least one entry. (List<String>, default: <none>)

spring.data.redis.jedis.pool
enabled

Whether to enable the pool. Enabled automatically if "commons-pool2" is available. With Jedis, pooling is implicitly enabled in sentinel mode and this setting only applies to single node setup. (Boolean, default: <none>)

max-active

Maximum number of connections that can be allocated by the pool at a given time. Use a negative value for no limit. (Integer, default: 8)

max-idle

Maximum number of "idle" connections in the pool. Use a negative value to indicate an unlimited number of idle connections. (Integer, default: 8)

max-wait

Maximum amount of time a connection allocation should block before throwing an exception when the pool is exhausted. Use a negative value to block indefinitely. (Duration, default: -1ms)

min-idle

Target for the minimum number of idle connections to maintain in the pool. This setting only has an effect if both it and time between eviction runs are positive. (Integer, default: 0)

time-between-eviction-runs

Time between runs of the idle object evictor thread. When positive, the idle object evictor thread starts, otherwise no idle object eviction is performed. (Duration, default: <none>)

spring.data.redis.lettuce.pool
enabled

Whether to enable the pool. Enabled automatically if "commons-pool2" is available. With Jedis, pooling is implicitly enabled in sentinel mode and this setting only applies to single node setup. (Boolean, default: <none>)

max-active

Maximum number of connections that can be allocated by the pool at a given time. Use a negative value for no limit. (Integer, default: 8)

max-idle

Maximum number of "idle" connections in the pool. Use a negative value to indicate an unlimited number of idle connections. (Integer, default: 8)

max-wait

Maximum amount of time a connection allocation should block before throwing an exception when the pool is exhausted. Use a negative value to block indefinitely. (Duration, default: -1ms)

min-idle

Target for the minimum number of idle connections to maintain in the pool. This setting only has an effect if both it and time between eviction runs are positive. (Integer, default: 0)

time-between-eviction-runs

Time between runs of the idle object evictor thread. When positive, the idle object evictor thread starts, otherwise no idle object eviction is performed. (Duration, default: <none>)

spring.data.redis.lettuce
shutdown-timeout

Shutdown timeout. (Duration, default: 100ms)

spring.data.redis.sentinel
master

Name of the Redis server. (String, default: <none>)

nodes

Comma-separated list of "host:port" pairs. (List<String>, default: <none>)

password

Password for authenticating with sentinel(s). (String, default: <none>)

username

Login username for authenticating with sentinel(s). (String, default: <none>)

spring.data.redis.ssl
bundle

SSL bundle name. (String, default: <none>)

enabled

Whether to enable SSL support. Enabled automatically if "bundle" is provided unless specified otherwise. (Boolean, default: <none>)

spring.datasource
driver-class-name

Fully qualified name of the JDBC driver. Auto-detected based on the URL by default. (String, default: <none>)

password

Login password of the database. (String, default: <none>)

url

JDBC URL of the database. (String, default: <none>)

username

Login username of the database. (String, default: <none>)

6.2. Bridge Processor

A processor that bridges the input and output by simply passing the incoming payload to the outbound.

6.2.1. Payload

Any

6.3. Filter Processor

The filter processor lets an application examine the incoming payload and apply a predicate to it that decides whether the record continues downstream. For example, if the incoming payload is of type String and you want to filter out anything that has fewer than five characters, you can run the filter processor as follows.

java -jar filter-processor-kafka-<version>.jar --filter.function.expression='payload.length() > 4'

Change kafka to rabbit if you want to run it against RabbitMQ.
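As a rough illustration of what the expression above decides, the following plain-Java sketch applies the same length check to a few sample payloads. It does not use SpEL or the filter processor itself; the class name, the predicate constant, and the sample payloads are hypothetical and exist only for this example.

```java
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

public class FilterSketch {

    // Plain-Java stand-in for the SpEL expression "payload.length() > 4" (illustrative only).
    public static final Predicate<String> LONGER_THAN_FOUR = payload -> payload.length() > 4;

    // Keeps only the payloads accepted by the predicate; the others are dropped.
    public static List<String> filter(List<String> payloads, Predicate<String> predicate) {
        return payloads.stream().filter(predicate).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // "hi" (2 characters) and "four" (4 characters) are dropped.
        System.out.println(filter(List.of("hi", "hello", "four", "stream"), LONGER_THAN_FOUR));
    }
}
```

Payloads with exactly four characters are dropped because the comparison is strict.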

6.3.1. Payload

You can pass any type as payload and then apply SpEL expressions against it to filter. If the incoming type is byte[] and the content type is set to text/plain or application/json, then the application converts the byte[] into String.

6.3.2. Options

filter.function.expression

Boolean SpEL expression to apply against request message to filter. (Expression, default: <none>)

6.4. Groovy Processor

A Processor that applies a Groovy script against messages.

6.4.1. Options

The groovy-processor processor has the following options:

groovy-processor.script

Reference to a script used to process messages. (Resource, default: <none>)

groovy-processor.variables

Variable bindings as a new line delimited string of name-value pairs, e.g. 'foo=bar\n baz=car'. (Properties, default: <none>)

groovy-processor.variables-location

The location of a properties file containing custom script variable bindings. (Resource, default: <none>)

6.5. Header Enricher Processor

Use the header-enricher app to add message headers.

The headers are provided as newline-delimited key-value pairs, where the keys are the header names and the values are SpEL expressions. For example: --headers='foo=payload.someProperty \n bar=payload.otherProperty'.
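To make the key-value format concrete, here is a minimal sketch that reads such a string with java.util.Properties, which is the type listed for this option. It assumes the separator has already reached the application as a real newline character, and it only extracts the header names and the expression text; it does not evaluate the SpEL expressions. The class and method names are hypothetical.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;

public class HeaderPropertiesSketch {

    // Parses newline-delimited name=expression pairs into a sorted map (illustrative only).
    public static Map<String, String> parse(String headers) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(headers));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        Map<String, String> result = new TreeMap<>();
        for (String name : props.stringPropertyNames()) {
            // Properties.load keeps trailing whitespace in values, so trim it here.
            result.put(name, props.getProperty(name).trim());
        }
        return result;
    }

    public static void main(String[] args) {
        // The "\n" below is a Java escape for a real newline character.
        System.out.println(parse("foo=payload.someProperty \n bar=payload.otherProperty"));
    }
}
```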

6.5.1. Options

The header-enricher processor has the following options:

header.enricher.headers

Properties, separated by \n, that represent headers whose values are SpEL expressions, e.g. foo='bar' \n baz=payload.baz. (Properties, default: <none>)

header.enricher.overwrite

Set to true to overwrite any existing message headers. (Boolean, default: false)

6.6. Http Request Processor

A processor app that makes requests to an HTTP resource and emits the response body as a message payload.

6.6.1. Input

Headers

Any required HTTP headers must be explicitly set via the headers or headers-expression property. See examples below. Header values may also be used to construct:

  • the request body when referenced in the body-expression property.

  • the HTTP method when referenced in the http-method-expression property.

  • the URL when referenced in the url-expression property.

Payload

The payload is used as the request body for a POST request by default, and can be any Java type. It should be an empty String for a GET request. The payload may also be used to construct:

  • the request body when referenced in the body-expression property.

  • the HTTP method when referenced in the http-method-expression property.

  • the URL when referenced in the url-expression property.

The underlying WebClient supports Jackson JSON serialization to support any request and response types if necessary. The expected-response-type property, String.class by default, may be set to any class in your application class path. Note that user-defined payload types require adding the necessary dependencies to your pom file.

6.6.2. Output

Headers

No HTTP message headers are mapped to the outbound Message.

Payload

The raw output object is a ResponseEntity<?>. Any of its fields (e.g., body, headers) or accessor methods (statusCode) may be referenced as part of the reply-expression. By default, the outbound Message payload is the response body. Note that ResponseEntity (referenced by the expression #root) cannot be deserialized by Jackson by default, but may be rendered as a HashMap.

6.6.3. Options

The http-request processor has the following options:

Properties grouped by prefix:

http.request
body-expression

A SpEL expression to derive the request body from the incoming message. (Expression, default: <none>)

expected-response-type

The type used to interpret the response. (Class<?>, default: <none>)

headers-expression

A SpEL expression used to derive the http headers map to use. (Expression, default: <none>)

http-method-expression

A SpEL expression to derive the request method from the incoming message. (Expression, default: <none>)

reply-expression

A SpEL expression used to compute the final result, applied against the whole HTTP org.springframework.http.ResponseEntity. (Expression, default: <none>)

timeout

Request timeout in milliseconds. (Long, default: 30000)

url-expression

A SpEL expression against incoming message to determine the URL to use. (Expression, default: <none>)

spring.codec
log-request-details

Whether to log form data at DEBUG level, and headers at TRACE level. (Boolean, default: false)

max-in-memory-size

Limit on the number of bytes that can be buffered whenever the input stream needs to be aggregated. This applies only to the auto-configured WebFlux server and WebClient instances. By default this is not set, in which case individual codec defaults apply. Most codecs are limited to 256K by default. (DataSize, default: <none>)

6.7. Image Recognition Processor

A processor that uses an Inception model to classify images in real time into different categories (that is, labels).

The model implements a deep Convolutional Neural Network that can achieve reasonable performance on hard visual recognition tasks, matching or exceeding human performance in some domains such as image recognition.

The input of the model is an image as a binary array.

The output is a JSON message in this format:

{
  "labels" : [
     {"giant panda":0.98649305}
  ]
}

The result contains the name of the recognized category (the label) along with the confidence that the image represents this category.

If response-size is set to a value higher than 1, the result includes the top response-size most probable labels. For example, response-size=3 would return:

{
  "labels": [
    {"giant panda":0.98649305},
    {"badger":0.010562794},
    {"ice bear":0.001130851}
  ]
}
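The effect of response-size can be pictured as a top-N selection over per-label confidences. The sketch below is an assumption about the general idea, not the processor's actual implementation. The class name, the method name, and the fourth label with its score are hypothetical.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class TopLabelsSketch {

    // Returns the labels with the highest confidence, best first, limited to responseSize entries.
    public static List<String> topLabels(Map<String, Double> scores, int responseSize) {
        return scores.entrySet().stream()
                .sorted(Map.Entry.<String, Double>comparingByValue().reversed())
                .limit(responseSize)
                .map(Map.Entry::getKey)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Map<String, Double> scores = Map.of(
                "giant panda", 0.98649305,
                "badger", 0.010562794,
                "ice bear", 0.001130851,
                "lesser panda", 0.0005); // hypothetical extra label, not from the guide
        System.out.println(topLabels(scores, 3));
    }
}
```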

6.7.1. Payload

If the incoming type is byte[] and the content type is set to application/octet-stream, then the application processes the input byte[] image and outputs an augmented byte[] image payload and a JSON header.

6.7.2. Options

image.recognition.cache-model

cache the pre-trained tensorflow model. (Boolean, default: true)

image.recognition.debug-output

<documentation missing> (Boolean, default: false)

image.recognition.debug-output-path

<documentation missing> (String, default: image-recognition-result.png)

image.recognition.model

pre-trained tensorflow image recognition model. Note that the model must match the selected model type! (String, default: https://storage.googleapis.com/mobilenet_v2/checkpoints/mobilenet_v2_1.4_224.tgz#mobilenet_v2_1.4_224_frozen.pb)

image.recognition.model-type

Supports three different pre-trained tensorflow image recognition models: Inception, MobileNetV1 and MobileNetV2.

  1. Inception: the graph uses 'input' as input and 'output' as output.

  2. MobileNetV2 pre-trained models (https://github.com/tensorflow/models/tree/master/research/slim/nets/mobilenet#pretrained-models): the normalized image size is always square (e.g. H=W), and the graph uses 'input' as input and 'MobilenetV2/Predictions/Reshape_1' as output.

  3. MobileNetV1 pre-trained models (https://github.com/tensorflow/models/blob/master/research/slim/nets/mobilenet_v1.md#pre-trained-models): the graph uses 'input' as input and 'MobilenetV1/Predictions/Reshape_1' as output.

(ModelType, default: <none>, possible values: inception,mobilenetv1,mobilenetv2)

image.recognition.normalized-image-size

Normalized image size. (Integer, default: 224)

image.recognition.response-size

number of recognized images. (Integer, default: 5)

6.8. Object Detection Processor

The Object Detection processor provides out-of-the-box support for the TensorFlow Object Detection API. It allows for real-time localization and identification of multiple objects in a single image or image stream. The Object Detection processor is built on top of the Object Detection Function.

You have to provide the Processor with a pre-trained object detection model, and the corresponding object labels.

Here are some sensible configuration defaults:

The following diagram shows a Spring Cloud Data Flow streaming pipeline that predicts, in real time, the object types in the input image stream.

scdf tensorflow object detection arch

The processor’s input is an image byte array. The output is an augmented image and a header, called detected_objects, that provides a textual description of the detected objects:

{
  "labels" : [
     {"name":"person", "confidence":0.9996774,"x1":0.0,"y1":0.3940161,"x2":0.9465165,"y2":0.5592592,"cid":1},
     {"name":"person", "confidence":0.9996604,"x1":0.047891676,"y1":0.03169123,"x2":0.941098,"y2":0.2085562,"cid":1},
     {"name":"backpack", "confidence":0.96534747,"x1":0.15588468,"y1":0.85957795,"x2":0.5091308,"y2":0.9908878,"cid":23},
     {"name":"backpack", "confidence":0.963343,"x1":0.1273736,"y1":0.57658505,"x2":0.47765,"y2":0.6986431,"cid":23}
  ]
}

The detected_objects header format is:

  • name, confidence - Human-readable name of the detected object (its label) and the detection confidence as a float in the range [0, 1].

  • x1, y1, x2, y2 - Bounding box of the detected object, represented as (x1, y1, x2, y2). The coordinates are relative to the image size.

  • cid - Classification identifier as defined in the provided labels configuration file.
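Because the bounding-box coordinates are relative to the image size, a consumer typically scales them to pixels before drawing or cropping. The following sketch shows one way to do that. It assumes that the x values scale with the image width and the y values with the image height, which the description above does not state explicitly, so verify the axis order against real output. The class and method names are hypothetical.

```java
import java.util.Arrays;

public class BoundingBoxSketch {

    // Scales relative coordinates (0.0 to 1.0) to pixel coordinates.
    // Assumption: x values scale with the width, y values with the height.
    public static int[] toPixels(double x1, double y1, double x2, double y2, int width, int height) {
        return new int[] {
            (int) Math.round(x1 * width),
            (int) Math.round(y1 * height),
            (int) Math.round(x2 * width),
            (int) Math.round(y2 * height)
        };
    }

    public static void main(String[] args) {
        // Hypothetical box on a 640x480 image.
        System.out.println(Arrays.toString(toPixels(0.25, 0.5, 0.75, 1.0, 640, 480)));
    }
}
```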

6.8.1. Payload

The incoming type is byte[], and the content type is application/octet-stream. The processor processes the input byte[] image and outputs an augmented byte[] image payload and a JSON header (detected_objects).

6.8.2. Options

object.detection.cache-model

<documentation missing> (Boolean, default: true)

object.detection.confidence

<documentation missing> (Float, default: 0.4)

object.detection.debug-output

<documentation missing> (Boolean, default: false)

object.detection.debug-output-path

<documentation missing> (String, default: object-detection-result.png)

object.detection.labels

Labels URI. (String, default: https://storage.googleapis.com/scdf-tensorflow-models/object-detection/mscoco_label_map.pbtxt)

object.detection.model

pre-trained tensorflow object detection model. (String, default: https://download.tensorflow.org/models/object_detection/ssdlite_mobilenet_v2_coco_2018_05_09.tar.gz#frozen_inference_graph.pb)

object.detection.response-size

<documentation missing> (Integer, default: <none>)

object.detection.with-masks

<documentation missing> (Boolean, default: false)

6.9. Semantic Segmentation Processor

Image Semantic Segmentation based on the state-of-the-art DeepLab Tensorflow model.

Semantic Segmentation is the process of associating each pixel of an image with a class label (such as flower, person, road, sky, ocean, or car). Unlike Instance Segmentation, which produces instance-aware region masks, Semantic Segmentation produces class-aware masks. To implement Instance Segmentation, consult the Object Detection Service instead.

The Semantic Segmentation Processor uses the Semantic Segmentation Function library and the TensorFlow Service.

6.9.1. Payload

The incoming type is byte[], and the content type is application/octet-stream. The processor processes the input byte[] image and outputs an augmented byte[] image payload and a JSON header.

The JSON header, called semantic_segmentation, has this format:

[
    [ 0, 0, 0 ],
    [ 127, 127, 127 ],
    [ 255, 255, 255 ],
    ...
]

The output header json format represents the color pixel map computed from the input image.

6.9.2. Options

semantic.segmentation.color-map-uri

Every pre-trained model is based on certain object color maps. The pre-defined options are:

  • classpath:/colormap/citymap_colormap.json

  • classpath:/colormap/ade20k_colormap.json

  • classpath:/colormap/black_white_colormap.json

  • classpath:/colormap/mapillary_colormap.json

(String, default: classpath:/colormap/citymap_colormap.json)

semantic.segmentation.debug-output

Save the output image in the local debugOutputPath path. (Boolean, default: false)

semantic.segmentation.debug-output-path

<documentation missing> (String, default: semantic-segmentation-result.png)

semantic.segmentation.mask-transparency

The alpha color of the computed segmentation mask image. (Float, default: 0.45)

semantic.segmentation.model

pre-trained tensorflow semantic segmentation model. (String, default: https://download.tensorflow.org/models/deeplabv3_mnv2_cityscapes_train_2018_02_05.tar.gz#frozen_inference_graph.pb)

semantic.segmentation.output-type

Specifies the output image type. You can return either the input image with the computed mask overlay, or the mask alone. (OutputType, default: <none>, possible values: blended,mask)

6.10. Script Processor

Processor that transforms messages using a script. The script body is supplied directly as a property value. The language of the script can be specified (groovy/javascript/ruby/python).

6.10.1. Options

The script-processor processor has the following options:

script-processor.language

Language of the text in the script property. Supported: groovy, javascript, ruby, python. (String, default: <none>)

script-processor.script

Text of the script. (String, default: <none>)

script-processor.variables

Variable bindings as a new line delimited string of name-value pairs, e.g. 'foo=bar\n baz=car'. (Properties, default: <none>)

script-processor.variables-location

The location of a properties file containing custom script variable bindings. (Resource, default: <none>)

6.11. Splitter Processor

The splitter app builds upon the concept of the same name in Spring Integration and allows the splitting of a single message into several distinct messages. The processor uses a function that takes a Message<?> as input and then produces a List<Message<?>> as output based on various properties (see below). You can use a SpEL expression or a delimiter to specify how you want to split the incoming message.
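As an illustration of delimiter-based splitting, the sketch below tokenizes a String payload with java.util.StringTokenizer, where every character in the delimiter string acts as a separator. That the splitter app tokenizes in exactly this way is an assumption made for the example. The sketch works on plain strings rather than Message<?> objects, and the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.StringTokenizer;

public class DelimiterSplitSketch {

    // Splits a payload into tokens; each character of "delimiters" is a separator.
    public static List<String> split(String payload, String delimiters) {
        List<String> parts = new ArrayList<>();
        StringTokenizer tokenizer = new StringTokenizer(payload, delimiters);
        while (tokenizer.hasMoreTokens()) {
            parts.add(tokenizer.nextToken());
        }
        return parts;
    }

    public static void main(String[] args) {
        // Both ',' and ';' act as separators here.
        System.out.println(split("a,b;c", ",;"));
    }
}
```

In the real processor, each resulting part would travel as its own message rather than as a list element.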

6.11.1. Payload

  • Incoming payload - Message<?>

If the incoming type is byte[] and the content type is set to text/plain or application/json, then the application converts the byte[] into String.

  • Outgoing payload - List<Message<?>>

6.11.2. Options

splitter.apply-sequence

Add correlation/sequence information in headers to facilitate later aggregation. (Boolean, default: true)

splitter.charset

The charset to use when converting bytes in text-based files to String. (String, default: <none>)

splitter.delimiters

When expression is null, delimiters to use when tokenizing String payloads. (String, default: <none>)

splitter.expression

A SpEL expression for splitting payloads. (String, default: <none>)

splitter.file-markers

Set to true or false to use a FileSplitter (to split text-based files by line) that includes (or not) beginning/end of file markers. (Boolean, default: <none>)

splitter.markers-json

When 'fileMarkers == true', specify if they should be produced as FileSplitter.FileMarker objects or JSON. (Boolean, default: true)

6.12. Transform Processor

Transformer processor allows you to convert the message payload structure based on a SpEL expression.

Here is an example of how you can run this application.

java -jar transform-processor-kafka-<version>.jar \
    --spel.function.expression=payload.toUpperCase()

Change kafka to rabbit if you want to run it against RabbitMQ.

6.12.1. Payload

The incoming message can contain any type of payload.

6.12.2. Options

spel.function.expression

A SpEL expression to apply. (String, default: <none>)

6.13. Twitter Trend and Trend Locations Processor

Processor that can return either the trending topics or the locations of the trending topics. The twitter.trend.trend-query-type property lets you select the query type.

6.13.1. Retrieve trending topic in a location (optionally)

For this mode set twitter.trend.trend-query-type to trend.

Processor based on Trends API. Returns the trending topics near a specific latitude, longitude location.

6.13.2. Retrieve trend Locations

For this mode set twitter.trend.trend-query-type to trendLocation.

Retrieves a full or nearby list of locations that have trending topics.

If the latitude and longitude parameters are not provided, the processor calls the Trends Available API and returns the locations for which Twitter has trending topic information.

If the latitude and longitude parameters are provided, the processor calls the Trends Closest API and returns the locations for which Twitter has trending topic information that are closest to the specified location.

The response is an array of locations that encode the location’s WOEID and some other human-readable information, such as a canonical name and the country the location belongs to.

6.13.3. Options

Properties grouped by prefix:

twitter.trend.closest
lat

If provided with a lon parameter, the available trend locations will be sorted by distance, nearest to furthest, to the co-ordinate pair. The valid range for latitude is -90.0 to +90.0 (South is negative, North is positive) inclusive. (Expression, default: <none>)

lon

If provided with a lat parameter, the available trend locations will be sorted by distance, nearest to furthest, to the co-ordinate pair. The valid range for longitude is -180.0 to +180.0 (West is negative, East is positive) inclusive. (Expression, default: <none>)

twitter.trend
location-id

The Yahoo! Where On Earth ID of the location to return trending information for. Global information is available by using 1 as the WOEID. (Expression, default: payload)

trend-query-type

<documentation missing> (TrendQueryType, default: <none>, possible values: trend,trendLocation)

7. Sinks

7.1. Cassandra Sink

This sink application writes the content of each message it receives into Cassandra.

It expects a payload of JSON String and uses its properties to map to table columns.

7.1.1. Payload

A JSON String or byte array representing the entity (or a list of entities) to be persisted.

7.1.2. Options

The cassandra sink has the following options:

Properties grouped by prefix:

cassandra.cluster
create-keyspace

Flag to create (or not) keyspace on application startup. (Boolean, default: false)

entity-base-packages

Base packages to scan for entities annotated with Table annotations. (String[], default: [])

init-script

Resource with CQL scripts (delimited by ';') to initialize keyspace schema. (Resource, default: <none>)

skip-ssl-validation

Flag to skip validation of the servers' SSL certificates. (Boolean, default: false)

cassandra
consistency-level

The consistency level for write operation. (ConsistencyLevel, default: <none>)

ingest-query

Ingest Cassandra query. (String, default: <none>)

query-type

QueryType for Cassandra Sink. (Type, default: <none>, possible values: INSERT,UPDATE,DELETE,STATEMENT)

statement-expression

Expression in Cassandra query DSL style. (Expression, default: <none>)

ttl

Time-to-live option of WriteOptions. (Integer, default: 0)

spring.cassandra
compression

Compression supported by the Cassandra binary protocol. (Compression, default: none, possible values: LZ4,SNAPPY,NONE)

config

Location of the configuration file to use. (Resource, default: <none>)

contact-points

Cluster node addresses in the form 'host:port', or a simple 'host' to use the configured port. (List<String>, default: [127.0.0.1:9042])

keyspace-name

Keyspace name to use. (String, default: <none>)

local-datacenter

Datacenter that is considered "local". Contact points should be from this datacenter. (String, default: <none>)

password

Login password of the server. (String, default: <none>)

port

Port to use if a contact point does not specify one. (Integer, default: 9042)

schema-action

Schema action to take at startup. (String, default: none)

session-name

Name of the Cassandra session. (String, default: <none>)

username

Login user of the server. (String, default: <none>)

spring.cassandra.connection
connect-timeout

Timeout to use when establishing driver connections. (Duration, default: 5s)

init-query-timeout

Timeout to use for internal queries that run as part of the initialization process, just after a connection is opened. (Duration, default: 5s)

spring.cassandra.controlconnection
timeout

Timeout to use for control queries. (Duration, default: 5s)

spring.cassandra.pool
heartbeat-interval

Heartbeat interval after which a message is sent on an idle connection to make sure it's still alive. (Duration, default: 30s)

idle-timeout

Idle timeout before an idle connection is removed. (Duration, default: 5s)

spring.cassandra.request
consistency

Queries consistency level. (DefaultConsistencyLevel, default: <none>, possible values: ANY,ONE,TWO,THREE,QUORUM,ALL,LOCAL_ONE,LOCAL_QUORUM,EACH_QUORUM,SERIAL,LOCAL_SERIAL)

page-size

How many rows will be retrieved simultaneously in a single network round-trip. (Integer, default: 5000)

serial-consistency

Queries serial consistency level. (DefaultConsistencyLevel, default: <none>, possible values: ANY,ONE,TWO,THREE,QUORUM,ALL,LOCAL_ONE,LOCAL_QUORUM,EACH_QUORUM,SERIAL,LOCAL_SERIAL)

timeout

How long the driver waits for a request to complete. (Duration, default: 2s)

spring.cassandra.request.throttler
drain-interval

How often the throttler attempts to dequeue requests. Set this high enough that each attempt will process multiple entries in the queue, but not delay requests too much. (Duration, default: <none>)

max-concurrent-requests

Maximum number of requests that are allowed to execute in parallel. (Integer, default: <none>)

max-queue-size

Maximum number of requests that can be enqueued when the throttling threshold is exceeded. (Integer, default: <none>)

max-requests-per-second

Maximum allowed request rate. (Integer, default: <none>)

type

Request throttling type. (ThrottlerType, default: none, possible values: CONCURRENCY_LIMITING,RATE_LIMITING,NONE)

spring.cassandra.ssl
bundle

SSL bundle name. (String, default: <none>)

enabled

Whether to enable SSL support. (Boolean, default: <none>)

7.2. Analytics Sink

Sink application, built on top of the Analytics Consumer, that computes analytics from the input messages and publishes the analytics as metrics to various monitoring systems. It leverages the micrometer library for providing a uniform programming experience across the most popular monitoring systems and exposes Spring Expression Language (SpEL) properties for defining how the metric Name, Values and Tags are computed from the input data.

The analytics sink can produce two metrics types:

  • Counter - reports a single metric, a count, that increments by a fixed, positive amount. Counters can be used for computing the rates of how the data changes in time.

  • Gauge - reports the current value. Typical examples for gauges would be the size of a collection or map or number of threads in a running state.

A Meter (e.g. a Counter or a Gauge) is uniquely identified by its name and dimensions (the terms dimensions and tags are used interchangeably). Dimensions allow a particular named metric to be sliced to drill down and reason about the data.

Because a metric is uniquely identified by its name and dimensions, you can assign multiple tags (key/value pairs) to every metric, but you cannot arbitrarily change those tags afterwards. Monitoring systems such as Prometheus will complain if a metric with the same name has different sets of tags.

Use the analytics.name or analytics.name-expression property to set the name of the output analytics metrics. If not set, the metrics name defaults to the application's name.

Use the analytics.tag.expression.<TAG_NAME>=<TAG_VALUE> property to add one or many tags to your metrics. The TAG_NAME used in the property definition appears as the tag name in the metrics. The TAG_VALUE is a SpEL expression that dynamically computes the tag value from the incoming message.

The SpEL expressions use the headers and payload keywords to access message’s headers and payload values.

You can use literals (e.g. 'fixed value') to set tags with fixed values.
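The idea that a meter is identified by its name together with its tags can be sketched in plain Java, without Micrometer. The class below is purely illustrative and is not how the Analytics Sink or Micrometer is implemented. The class name, the metric name, and the tag values are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

public class MeterIdentitySketch {

    // One running total per distinct (name, tags) combination.
    private final Map<String, Double> counters = new HashMap<>();

    // Builds an identity from the name and the tags sorted by key, e.g. "stocks{symbol=VMW}".
    public static String id(String name, Map<String, String> tags) {
        return name + new TreeMap<>(tags);
    }

    // Adds a positive amount to the counter identified by name and tags.
    public void increment(String name, Map<String, String> tags, double amount) {
        counters.merge(id(name, tags), amount, Double::sum);
    }

    // Returns the current total, or 0.0 if the counter was never incremented.
    public double count(String name, Map<String, String> tags) {
        return counters.getOrDefault(id(name, tags), 0.0);
    }

    public static void main(String[] args) {
        MeterIdentitySketch sketch = new MeterIdentitySketch();
        sketch.increment("stocks", Map.of("symbol", "VMW"), 1.0);
        sketch.increment("stocks", Map.of("symbol", "VMW"), 1.0);
        sketch.increment("stocks", Map.of("symbol", "IBM"), 1.0);
        System.out.println(sketch.count("stocks", Map.of("symbol", "VMW")));
        System.out.println(sketch.count("stocks", Map.of("symbol", "IBM")));
    }
}
```

The same metric name with a different tag value is tracked as a separate series, which is why each name/value tag yields its own increment.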

All Stream Applications support, out of the box, three of the most popular monitoring systems (Wavefront, Prometheus, and InfluxDB), and you can enable each of them declaratively. You can add support for additional monitoring systems by adding their micrometer meter-registry dependencies to the Analytics Sink applications.

Please visit the Spring Cloud Data Flow Stream Monitoring for detailed instructions for configuring the Monitoring Systems. The following quick snippets can help you start.

  • To enable the Prometheus meter registry, set the following properties.

management.metrics.export.prometheus.enabled=true
management.metrics.export.prometheus.rsocket.enabled=true
management.metrics.export.prometheus.rsocket.host=<YOUR PROMETHEUS-RSOCKET PROXY URI>
management.metrics.export.prometheus.rsocket.port=7001

  • To enable the Wavefront meter registry, set the following properties.

management.metrics.export.wavefront.enabled=true
management.metrics.export.wavefront.api-token=YOUR WAVEFRONT KEY
management.metrics.export.wavefront.uri=YOUR WAVEFRONT URI
management.metrics.export.wavefront.source=UNIQUE NAME TO IDENTIFY YOUR APP

  • To enable the InfluxDB meter registry, set the following properties.

management.metrics.export.influx.enabled=true
management.metrics.export.influx.uri={influxdb-server-url}

If the Data Flow Server Monitoring is enabled, the Analytics Sink reuses the provided metrics configuration.

The following diagram illustrates how the Analytics Sink helps to collect business insights for a real-time stock-exchange pipeline.

Analytics Architecture

7.2.1. Payload

The incoming message can contain any type of payload.

7.2.2. Options

Properties grouped by prefix:

analytics
amount-expression

A SpEL expression to compute the output metrics value (e.g. amount). It defaults to 1.0. (Expression, default: <none>)

meter-type

Micrometer meter type used to report the metrics to the backend. (MeterType, default: <none>, possible values: counter,gauge)

name

The name of the output metrics. The 'name' and 'nameExpression' are mutually exclusive. Only one of them can be set. (String, default: <none>)

name-expression

A SpEL expression to compute the output metrics name from the input message. The 'name' and 'nameExpression' are mutually exclusive. Only one of them can be set. (Expression, default: <none>)

analytics.tag
expression

Computes tags from SpEL expression. Single SpEL expression can produce an array of values, which in turn means distinct name/value tags. Every name/value tag will produce a separate meter increment. Tag expression format is: analytics.tag.expression.[tag-name]=[SpEL expression] (Map<String, Expression>, default: <none>)

fixed

DEPRECATED: Please use the analytics.tag.expression with literal SpEL expression. Custom, fixed tags. These tags have constant values, created once and then sent along with every published metric. The convention for defining a fixed tag is: analytics.tag.fixed.[tag-name]=[tag-value] (Map<String, String>, default: <none>)

7.3. Elasticsearch Sink

Sink that indexes documents into Elasticsearch.

This Elasticsearch sink only supports indexing JSON documents. It consumes data from an input destination and then indexes it to Elasticsearch. The input data can be a plain json string, or a java.util.Map that represents the JSON. It also accepts the data as the Elasticsearch provided XContentBuilder. However, this is a rare case as it is not likely the middleware keeps the records as XContentBuilder. This is provided mainly for direct invocation of the consumer.

7.3.1. Options

The Elasticsearch sink has the following options:

Properties grouped by prefix:

elasticsearch.consumer
async

Indicates whether the indexing operation is async or not. By default indexing is done synchronously. (Boolean, default: false)

batch-size

Number of items to index for each request. It defaults to 1. For values greater than 1 bulk indexing API will be used. (Integer, default: 1)

group-timeout

Timeout in milliseconds after which message group is flushed when bulk indexing is active. It defaults to -1, meaning no automatic flush of idle message groups occurs. (Long, default: -1)

id

The id of the document to index. If set, the INDEX_ID header value overrides this property on a per message basis. (Expression, default: <none>)

index

Name of the index. If set, the INDEX_NAME header value overrides this property on a per message basis. (String, default: <none>)

routing

Indicates the shard to route to. If not provided, Elasticsearch will default to a hash of the document id. (String, default: <none>)

timeout-seconds

Timeout for the shard to be available. If not set, it defaults to 1 minute set by the Elasticsearch client. (Long, default: 0)

spring.elasticsearch
connection-timeout

Connection timeout used when communicating with Elasticsearch. (Duration, default: 1s)

password

Password for authentication with Elasticsearch. (String, default: <none>)

path-prefix

Prefix added to the path of every request sent to Elasticsearch. (String, default: <none>)

socket-keep-alive

Whether to enable socket keep alive between client and Elasticsearch. (Boolean, default: false)

socket-timeout

Socket timeout used when communicating with Elasticsearch. (Duration, default: 30s)

uris

Comma-separated list of the Elasticsearch instances to use. (List<String>, default: [http://localhost:9200])

username

Username for authentication with Elasticsearch. (String, default: <none>)

spring.elasticsearch.restclient.sniffer
delay-after-failure

Delay of a sniff execution scheduled after a failure. (Duration, default: 1m)

interval

Interval between consecutive ordinary sniff executions. (Duration, default: 5m)

spring.elasticsearch.restclient.ssl
bundle

SSL bundle name. (String, default: <none>)

7.3.2. Examples of running this sink

  1. From the folder elasticsearch-sink: ./mvnw clean package

  2. cd apps

  3. cd to the proper binder generated app (Kafka or RabbitMQ)

  4. ./mvnw clean package

  5. Make sure that you have Elasticsearch running. For example, you can run it as a Docker container by using the following command: docker run -d --name es762 -p 9200:9200 -e "discovery.type=single-node" elasticsearch:7.6.2

  6. Start the middleware (Kafka or RabbitMQ) if it is not already running.

  7. java -jar target/elasticsearch-sink-<kafka|rabbit>-3.0.0-SNAPSHOT.jar --spring.cloud.stream.bindings.input.destination=els-in --elasticsearch.consumer.index=testing

  8. Send some JSON data into the middleware destination. For example: {"foo":"bar"}

  9. Verify that the data is indexed: curl localhost:9200/testing/_search

7.4. File Sink

The file sink app writes each message it receives to a file.

7.4.1. Payload

  • java.io.File

  • java.io.InputStream

  • byte[]

  • String

7.4.2. Options

The file-sink has the following options:

file.consumer.binary

A flag to indicate whether adding a newline after the write should be suppressed. (Boolean, default: false)

file.consumer.charset

The charset to use when writing text content. (String, default: UTF-8)

file.consumer.directory

The parent directory of the target file. (File, default: <none>)

file.consumer.directory-expression

The expression to evaluate for the parent directory of the target file. (String, default: <none>)

file.consumer.mode

The FileExistsMode to use if the target file already exists. (FileExistsMode, default: <none>, possible values: APPEND,APPEND_NO_FLUSH,FAIL,IGNORE,REPLACE,REPLACE_IF_MODIFIED)

file.consumer.name

The name of the target file. (String, default: file-consumer)

file.consumer.name-expression

The expression to evaluate for the name of the target file. (String, default: <none>)

file.consumer.suffix

The suffix to append to file name. (String, default: <empty string>)
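
The interplay of the binary and charset options can be sketched in plain Java. This is a minimal illustration under stated assumptions, not the sink's implementation: FileSinkWriteSketch and its append method are hypothetical names, appending to an existing file is chosen only for the example (the mode option has no default), and the platform line separator is assumed as the newline.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical illustration; not the file sink's own code.
final class FileSinkWriteSketch {

    // Writes one text payload to directory/name, appending to the file if it exists.
    // A newline follows the payload unless binary is true.
    static Path append(Path directory, String name, String text, Charset charset, boolean binary) {
        try {
            Files.createDirectories(directory);
            Path target = directory.resolve(name);
            // Assumption: the newline is the platform line separator.
            String content = binary ? text : text + System.lineSeparator();
            Files.write(target, content.getBytes(charset),
                    StandardOpenOption.CREATE, StandardOpenOption.APPEND);
            return target;
        }
        catch (IOException ex) {
            throw new UncheckedIOException(ex);
        }
    }

    public static void main(String[] args) throws IOException {
        Path directory = Files.createTempDirectory("file-sink-sketch");
        Path target = append(directory, "file-consumer", "first message", StandardCharsets.UTF_8, false);
        append(directory, "file-consumer", "second message", StandardCharsets.UTF_8, false);
        System.out.println(target + " now holds " + Files.size(target) + " bytes");
    }
}
```

With binary left at false, each payload ends up on its own line of the target file; with binary set to true, payloads are written back to back.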

7.5. FTP Sink

FTP sink is a simple option to push files to an FTP server from incoming messages.

It uses an ftp-outbound-adapter, therefore incoming messages can be either a java.io.File object, a String (content of the file) or an array of bytes (file content as well).

To use this sink, you need a username and a password to login.

By default Spring Integration will use o.s.i.file.DefaultFileNameGenerator if none is specified. DefaultFileNameGenerator will determine the file name based on the value of the file_name header (if it exists) in the MessageHeaders, or if the payload of the Message is already a java.io.File, then it will use the original name of that file.
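
The lookup order described above can be sketched in plain Java. This is a minimal illustration only: FileNameResolutionSketch and its resolve method are hypothetical names, the headers are modeled as a plain Map rather than Spring's MessageHeaders, and the sketch returns an empty result for the cases the paragraph above does not describe.

```java
import java.io.File;
import java.util.Map;
import java.util.Optional;

// Hypothetical illustration; this is not the Spring Integration class itself.
final class FileNameResolutionSketch {

    // Header name taken from the text above.
    static final String FILE_NAME_HEADER = "file_name";

    static Optional<String> resolve(Map<String, Object> headers, Object payload) {
        Object header = headers.get(FILE_NAME_HEADER);
        // 1. A file_name header, when present, decides the name.
        if (header instanceof String && !((String) header).isEmpty()) {
            return Optional.of((String) header);
        }
        // 2. Otherwise a java.io.File payload contributes its own name.
        if (payload instanceof File) {
            return Optional.of(((File) payload).getName());
        }
        // 3. Any other case is outside what this sketch covers.
        return Optional.empty();
    }

    public static void main(String[] args) {
        System.out.println(resolve(Map.of("file_name", "report.csv"), "a,b,c"));
        System.out.println(resolve(Map.of(), new File("data.txt")));
    }
}
```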

7.5.1. Headers

  • file_name (See note above)

7.5.2. Payload

  • java.io.File

  • java.io.InputStream

  • byte[]

  • String

7.5.3. Output

N/A (writes to the FTP server).

7.5.4. Options

The ftp sink has the following options:

Properties grouped by prefix:

ftp.consumer
auto-create-dir

Whether or not to create the remote directory. (Boolean, default: true)

filename-expression

A SpEL expression to generate the remote file name. (String, default: <none>)

mode

Action to take if the remote file already exists. (FileExistsMode, default: <none>, possible values: APPEND,APPEND_NO_FLUSH,FAIL,IGNORE,REPLACE,REPLACE_IF_MODIFIED)

remote-dir

The remote FTP directory. (String, default: /)

remote-file-separator

The remote file separator. (String, default: /)

temporary-remote-dir

A temporary directory where the file will be written if use-temporary-filename is true. (String, default: /)

tmp-file-suffix

The suffix to use while the transfer is in progress. (String, default: .tmp)

use-temporary-filename

Whether or not to write to a temporary file and rename. (Boolean, default: true)

ftp.factory
cache-sessions

Cache sessions. (Boolean, default: <none>)

client-mode

The client mode to use for the FTP session. (ClientMode, default: <none>, possible values: ACTIVE,PASSIVE)

host

The host name of the server. (String, default: localhost)

password

The password to use to connect to the server. (String, default: <none>)

port

The port of the server. (Integer, default: 21)

username

The username to use to connect to the server. (String, default: <none>)

7.6. JDBC Sink

JDBC sink allows you to persist incoming payload into an RDBMS database.

The jdbc.consumer.columns property takes comma-separated pairs of COLUMN_NAME[:EXPRESSION_FOR_VALUE], where EXPRESSION_FOR_VALUE (together with the colon) is optional. When the expression is omitted, the value is evaluated with a generated expression such as payload.COLUMN_NAME, which gives a direct mapping from object properties to table columns. For example, consider a JSON payload like the following:

{
  "name": "My Name",
  "address": {
     "city": "Big City",
     "street": "Narrow Alley"
  }
}

We can insert it into a table with name, city, and street columns by using the following configuration:

--jdbc.consumer.columns=name,city:address.city,street:address.street
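
To make the shorthand concrete, the following sketch reads such a value into column/expression pairs. It is an illustration under stated assumptions, not the sink's own parser: ColumnMappingSketch and parse are hypothetical names, and the only rule applied is the one described above (a column without an expression gets payload.COLUMN_NAME). How each expression is later evaluated is not modeled here.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, not part of the jdbc sink.
final class ColumnMappingSketch {

    // Turns "name,city:address.city" into an ordered column -> expression map.
    static Map<String, String> parse(String columns) {
        Map<String, String> mappings = new LinkedHashMap<>();
        for (String entry : columns.split(",")) {
            String pair = entry.trim();
            if (pair.isEmpty()) {
                continue;
            }
            int colon = pair.indexOf(':');
            if (colon < 0) {
                // No expression given: fall back to the generated payload.COLUMN_NAME form.
                mappings.put(pair, "payload." + pair);
            }
            else {
                // Expression text is kept exactly as written after the first colon.
                mappings.put(pair.substring(0, colon).trim(), pair.substring(colon + 1).trim());
            }
        }
        return mappings;
    }

    public static void main(String[] args) {
        parse("name,city:address.city,street:address.street")
                .forEach((column, expression) -> System.out.println(column + " <- " + expression));
    }
}
```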

This sink supports batch inserts, as far as they are supported by the underlying JDBC driver. Batch inserts are configured via the batch-size and idle-timeout properties: incoming messages are aggregated until batch-size messages are present, then inserted as a batch. If idle-timeout milliseconds pass with no new messages, the aggregated batch is inserted even if it is smaller than batch-size, capping maximum latency.
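
The flushing rule can be sketched as a small buffer with two triggers. This is a simplified model, not the sink's implementation: BatchBufferSketch, add, and tick are hypothetical names, time is passed in explicitly so the behavior is easy to follow, and treating a negative idle timeout as "never flush on idle" is an assumption based on the -1 default.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical illustration of size-based and idle-based flushing.
final class BatchBufferSketch<T> {

    private final int batchSize;
    private final long idleTimeoutMillis; // negative disables the idle flush (assumption)
    private final Consumer<List<T>> batchWriter;
    private final List<T> buffer = new ArrayList<>();
    private long lastArrivalMillis;

    BatchBufferSketch(int batchSize, long idleTimeoutMillis, Consumer<List<T>> batchWriter) {
        this.batchSize = batchSize;
        this.idleTimeoutMillis = idleTimeoutMillis;
        this.batchWriter = batchWriter;
    }

    // Buffers one message and writes the batch as soon as batchSize is reached.
    void add(T message, long nowMillis) {
        buffer.add(message);
        lastArrivalMillis = nowMillis;
        if (buffer.size() >= batchSize) {
            flush();
        }
    }

    // Called periodically; writes a partial batch once no message arrived for idleTimeoutMillis.
    void tick(long nowMillis) {
        boolean idle = nowMillis - lastArrivalMillis >= idleTimeoutMillis;
        if (idleTimeoutMillis >= 0 && !buffer.isEmpty() && idle) {
            flush();
        }
    }

    private void flush() {
        batchWriter.accept(new ArrayList<>(buffer));
        buffer.clear();
    }

    public static void main(String[] args) {
        BatchBufferSketch<String> batches =
                new BatchBufferSketch<>(3, 1000, batch -> System.out.println("insert " + batch));
        batches.add("a", 0);
        batches.add("b", 10);
        batches.add("c", 20);  // third message completes the batch
        batches.add("d", 30);
        batches.tick(1030);    // 1000 ms without new messages: the partial batch is written
    }
}
```

A larger batch-size reduces the number of round trips to the database, while idle-timeout bounds how long a partially filled batch can wait.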

The module also uses Spring Boot’s DataSource support for configuring the database connection, so properties like spring.datasource.url etc. apply.

7.6.1. Examples

java -jar jdbc-sink.jar --jdbc.consumer.tableName=names \
            --jdbc.consumer.columns=name \
            --spring.datasource.driver-class-name=org.mariadb.jdbc.Driver \
            --spring.datasource.url='jdbc:mysql://localhost:3306/test'

7.6.2. Payload

7.6.3. Options

The jdbc sink has the following options:

Properties grouped by prefix:

jdbc.consumer
batch-size

Threshold in number of messages when data will be flushed to database table. (Integer, default: 1)

columns

The comma separated colon-based pairs of column names and SpEL expressions for values to insert/update. Names are used at initialization time to issue the DDL. (String, default: payload:payload.toString())

idle-timeout

Idle timeout in milliseconds when data is automatically flushed to database table. (Long, default: -1)

initialize

'true', 'false' or the location of a custom initialization script for the table. (String, default: false)

table-name

The name of the table to write into. (String, default: messages)

spring.datasource
driver-class-name

Fully qualified name of the JDBC driver. Auto-detected based on the URL by default. (String, default: <none>)

password

Login password of the database. (String, default: <none>)

url

JDBC URL of the database. (String, default: <none>)

username

Login username of the database. (String, default: <none>)

7.7. Apache Kafka Sink

This module publishes messages to Apache Kafka.

7.7.1. Options

The kafka sink has the following options:

(See the Spring Boot documentation for Spring for Apache Kafka configuration properties)

Properties grouped by prefix:

kafka.publisher
key

Kafka record key - overridden by keyExpression, if supplied. (String, default: <none>)

key-expression

A SpEL expression that evaluates to a Kafka record key. (Expression, default: <none>)

mapped-headers

Headers that will be mapped. (String[], default: [*])

partition

Kafka topic partition - overridden by partitionExpression, if supplied. (Integer, default: <none>)

partition-expression

A SpEL expression that evaluates to a Kafka topic partition. (Expression, default: <none>)

send-timeout

How long Kafka producer handler should wait for send operation results. Defaults to 10 seconds. (Duration, default: 10s)

sync

True if the Kafka producer handler should operate in sync mode. (Boolean, default: false)

timestamp

Kafka record timestamp - overridden by timestampExpression, if supplied. (Long, default: <none>)

timestamp-expression

A SpEL expression that evaluates to a Kafka record timestamp. (Expression, default: <none>)

topic

Kafka topic - overridden by topicExpression, if supplied. Defaults to KafkaTemplate.getDefaultTopic() (String, default: <none>)

topic-expression

A SpEL expression that evaluates to a Kafka topic. (Expression, default: <none>)

use-template-converter

Whether to use the template's message converter to create a Kafka record. (Boolean, default: false)

spring.kafka
bootstrap-servers

Comma-delimited list of host:port pairs to use for establishing the initial connections to the Kafka cluster. Applies to all components unless overridden. (List<String>, default: <none>)

client-id

ID to pass to the server when making requests. Used for server-side logging. (String, default: <none>)

properties

Additional properties, common to producers and consumers, used to configure the client. (Map<String, String>, default: <none>)

spring.kafka.producer
acks

Number of acknowledgments the producer requires the leader to have received before considering a request complete. (String, default: <none>)

batch-size

Default batch size. A small batch size will make batching less common and may reduce throughput (a batch size of zero disables batching entirely). (DataSize, default: <none>)

bootstrap-servers

Comma-delimited list of host:port pairs to use for establishing the initial connections to the Kafka cluster. Overrides the global property, for producers. (List<String>, default: <none>)

buffer-memory

Total memory size the producer can use to buffer records waiting to be sent to the server. (DataSize, default: <none>)

client-id

ID to pass to the server when making requests. Used for server-side logging. (String, default: <none>)

compression-type

Compression type for all data generated by the producer. (String, default: <none>)

key-serializer

Serializer class for keys. (Class<?>, default: <none>)

properties

Additional producer-specific properties used to configure the client. (Map<String, String>, default: <none>)

retries

When greater than zero, enables retrying of failed sends. (Integer, default: <none>)

transaction-id-prefix

When non empty, enables transaction support for producer. (String, default: <none>)

value-serializer

Serializer class for values. (Class<?>, default: <none>)

spring.kafka.template
default-topic

Default topic to which messages are sent. (String, default: <none>)

transaction-id-prefix

Transaction id prefix; overrides the transaction id prefix in the producer factory. (String, default: <none>)

7.8. Log Sink

The log sink uses the application logger to output the data for inspection.

Note that the log sink uses a type-less handler, which affects how the actual logging is performed: if the content type is textual, the raw payload bytes are converted to a String; otherwise, the raw bytes are logged. See the user guide for more information.
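
The distinction between textual and non-textual payloads can be sketched as follows. This is an illustration only: LogPayloadSketch is a hypothetical name, the rule that text/* and JSON content types count as textual is an assumption (the exact check is not given here), and printing the byte values is just one way to show "raw bytes".

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Locale;

// Hypothetical illustration; not the log sink's handler.
final class LogPayloadSketch {

    // Assumption: "textual" means a text/* type or a type that mentions JSON.
    static boolean isTextual(String contentType) {
        if (contentType == null) {
            return false;
        }
        String normalized = contentType.toLowerCase(Locale.ROOT);
        return normalized.startsWith("text/") || normalized.contains("json");
    }

    // Textual payloads become a String; everything else is rendered as raw byte values.
    static String render(byte[] payload, String contentType) {
        return isTextual(contentType)
                ? new String(payload, StandardCharsets.UTF_8)
                : Arrays.toString(payload);
    }

    public static void main(String[] args) {
        byte[] payload = "hello".getBytes(StandardCharsets.UTF_8);
        System.out.println(render(payload, "text/plain"));
        System.out.println(render(payload, "application/octet-stream"));
    }
}
```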

7.8.1. Options

The log sink has the following options:

log.expression

A SpEL expression (against the incoming message) to evaluate as the logged message. (String, default: payload)

log.level

The level at which to log messages. (Level, default: <none>, possible values: FATAL,ERROR,WARN,INFO,DEBUG,TRACE)

log.name

The name of the logger to use. (String, default: <none>)

7.9. MongoDB Sink

This sink application ingests incoming data into MongoDB. This application is fully based on the MongoDataAutoConfiguration, so refer to the Spring Boot MongoDB Support for more information.

7.9.1. Input

Payload
  • Any POJO

  • String

  • byte[]

7.9.2. Options

The mongodb sink has the following options:

Properties grouped by prefix:

mongodb.consumer
collection

The MongoDB collection to store data. (String, default: <none>)

collection-expression

The SpEL expression to evaluate MongoDB collection. (Expression, default: <none>)

spring.data.mongodb
additional-hosts

Additional server hosts. Cannot be set with URI or if 'host' is not specified. Additional hosts will use the default mongo port of 27017. If you want to use a different port you can use the "host:port" syntax. (List<String>, default: <none>)

authentication-database

Authentication database name. (String, default: <none>)

auto-index-creation

Whether to enable auto-index creation. (Boolean, default: <none>)

database

Database name. Overrides database in URI. (String, default: <none>)

field-naming-strategy

Fully qualified name of the FieldNamingStrategy to use. (Class<?>, default: <none>)

host

Mongo server host. Cannot be set with URI. (String, default: <none>)

password

Login password of the mongo server. Cannot be set with URI. (Character[], default: <none>)

port

Mongo server port. Cannot be set with URI. (Integer, default: <none>)

replica-set-name

Required replica set name for the cluster. Cannot be set with URI. (String, default: <none>)

uri

Mongo database URI. Overrides host, port, username, and password. (String, default: mongodb://localhost/test)

username

Login user of the mongo server. Cannot be set with URI. (String, default: <none>)

uuid-representation

Representation to use when converting a UUID to a BSON binary value. (UuidRepresentation, default: java-legacy, possible values: UNSPECIFIED,STANDARD,C_SHARP_LEGACY,JAVA_LEGACY,PYTHON_LEGACY)

spring.data.mongodb.gridfs
bucket

GridFS bucket name. (String, default: <none>)

database

GridFS database name. (String, default: <none>)

spring.data.mongodb.ssl
bundle

SSL bundle name. (String, default: <none>)

enabled

Whether to enable SSL support. Enabled automatically if "bundle" is provided unless specified otherwise. (Boolean, default: <none>)

7.10. MQTT Sink

This module sends messages to MQTT.

7.10.1. Payload

  • byte[]

  • String

7.10.2. Options

The mqtt sink has the following options:

Properties grouped by prefix:

mqtt
clean-session

whether the client and server should remember state across restarts and reconnects. (Boolean, default: true)

connection-timeout

the connection timeout in seconds. (Integer, default: 30)

keep-alive-interval

the ping interval in seconds. (Integer, default: 60)

password

the password to use when connecting to the broker. (String, default: guest)

persistence

'memory' or 'file'. (String, default: memory)

persistence-directory

Persistence directory. (String, default: /tmp/paho)

ssl-properties

MQTT Client SSL properties. (Map<String, String>, default: <none>)

url

location of the mqtt broker(s) (comma-delimited list). (String[], default: [tcp://localhost:1883])

username

the username to use when connecting to the broker. (String, default: guest)

mqtt.consumer
async

whether or not to use async sends. (Boolean, default: false)

charset

the charset used to convert a String payload to byte[]. (String, default: UTF-8)

client-id

identifies the client. (String, default: stream.client.id.sink)

qos

the quality of service to use. (Integer, default: 1)

retained

whether to set the 'retained' flag. (Boolean, default: false)

topic

the topic to which the sink will publish. (String, default: stream.mqtt)

7.11. Pgcopy Sink

A module that writes its incoming payload to an RDBMS using the PostgreSQL COPY command.

7.11.1. Input

Headers
Payload
  • Any

The column expressions are evaluated against the message, and an expression is usually compatible with only one payload type (such as a Map or a bean).

7.11.2. Output

N/A

7.11.3. Options

The pgcopy sink has the following options:

spring.datasource.driver-class-name

Fully qualified name of the JDBC driver. Auto-detected based on the URL by default. (String, default: <none>)

spring.datasource.password

Login password of the database. (String, default: <none>)

spring.datasource.url

JDBC URL of the database. (String, default: <none>)

spring.datasource.username

Login username of the database. (String, default: <none>)

The module also uses Spring Boot’s DataSource support for configuring the database connection, so properties like spring.datasource.url etc. apply.

7.11.4. Build

$ ./mvnw clean install -PgenerateApps
$ cd apps

You can find the corresponding binder-based projects here. You can then cd into one of the folders and build it:

$ ./mvnw clean package

For integration tests to run, start a PostgreSQL database on localhost:

    docker run -e POSTGRES_PASSWORD=spring -e POSTGRES_DB=test -p 5432:5432 -d postgres:latest

7.11.5. Examples

java -jar pgcopy-sink.jar --tableName=names --columns=name --spring.datasource.driver-class-name=org.postgresql.Driver \
--spring.datasource.url='jdbc:postgresql://localhost:5432/test'

7.12. RabbitMQ Sink

This module sends messages to RabbitMQ.

7.12.1. Options

The rabbit sink has the following options:

(See the Spring Boot documentation for RabbitMQ connection properties)

Properties grouped by prefix:

rabbit
converter-bean-name

The bean name for a custom message converter; if omitted, a SimpleMessageConverter is used. If 'jsonConverter', a Jackson2JsonMessageConverter bean will be created for you. (String, default: <none>)

exchange

Exchange name - overridden by exchangeNameExpression, if supplied. (String, default: <empty string>)

exchange-expression

A SpEL expression that evaluates to an exchange name. (Expression, default: <none>)

headers-mapped-last

When mapping headers for the outbound message, determine whether the headers are mapped before the message is converted, or afterwards. (Boolean, default: true)

mapped-request-headers

Headers that will be mapped. (String[], default: [*])

own-connection

When true, use a separate connection based on the boot properties. (Boolean, default: false)

persistent-delivery-mode

Default delivery mode when 'amqp_deliveryMode' header is not present, true for PERSISTENT. (Boolean, default: false)

routing-key

Routing key - overridden by routingKeyExpression, if supplied. (String, default: <none>)

routing-key-expression

A SpEL expression that evaluates to a routing key. (Expression, default: <none>)

spring.rabbitmq
address-shuffle-mode

Mode used to shuffle configured addresses. (AddressShuffleMode, default: none, possible values: NONE,RANDOM,INORDER)

addresses

Comma-separated list of addresses to which the client should connect. When set, the host and port are ignored. (String, default: <none>)

channel-rpc-timeout

Continuation timeout for RPC calls in channels. Set it to zero to wait forever. (Duration, default: 10m)

connection-timeout

Connection timeout. Set it to zero to wait forever. (Duration, default: <none>)

host

RabbitMQ host. Ignored if an address is set. (String, default: localhost)

password

Login to authenticate against the broker. (String, default: guest)

port

RabbitMQ port. Ignored if an address is set. Default to 5672, or 5671 if SSL is enabled. (Integer, default: <none>)

publisher-confirm-type

Type of publisher confirms to use. (ConfirmType, default: <none>, possible values: SIMPLE,CORRELATED,NONE)

publisher-returns

Whether to enable publisher returns. (Boolean, default: false)

requested-channel-max

Number of channels per connection requested by the client. Use 0 for unlimited. (Integer, default: 2047)

requested-heartbeat

Requested heartbeat timeout; zero for none. If a duration suffix is not specified, seconds will be used. (Duration, default: <none>)

username

Login user to authenticate to the broker. (String, default: guest)

virtual-host

Virtual host to use when connecting to the broker. (String, default: <none>)

spring.rabbitmq.template
default-receive-queue

Name of the default queue to receive messages from when none is specified explicitly. (String, default: <none>)

exchange

Name of the default exchange to use for send operations. (String, default: <empty string>)

mandatory

Whether to enable mandatory messages. (Boolean, default: <none>)

receive-timeout

Timeout for receive() operations. (Duration, default: <none>)

reply-timeout

Timeout for sendAndReceive() operations. (Duration, default: <none>)

routing-key

Value of a default routing key to use for send operations. (String, default: <empty string>)

7.13. Redis Sink

Sends messages to Redis.

7.13.1. Options

The redis sink has the following options:

Properties grouped by prefix:

redis.consumer
key

A literal key name to use when storing to a key. (String, default: <none>)

key-expression

A SpEL expression to use for storing to a key. (String, default: <none>)

queue

A literal queue name to use when storing in a queue. (String, default: <none>)

queue-expression

A SpEL expression to use for queue. (String, default: <none>)

topic

A literal topic name to use when publishing to a topic. (String, default: <none>)

topic-expression

A SpEL expression to use for topic. (String, default: <none>)

spring.data.redis
client-name

Client name to be set on connections with CLIENT SETNAME. (String, default: <none>)

client-type

Type of client to use. By default, auto-detected according to the classpath. (ClientType, default: <none>, possible values: LETTUCE,JEDIS)

connect-timeout

Connection timeout. (Duration, default: <none>)

database

Database index used by the connection factory. (Integer, default: 0)

host

Redis server host. (String, default: localhost)

password

Login password of the redis server. (String, default: <none>)

port

Redis server port. (Integer, default: 6379)

timeout

Read timeout. (Duration, default: <none>)

url

Connection URL. Overrides host, port, username, and password. Example: redis://user:password@example.com:6379 (String, default: <none>)

username

Login username of the redis server. (String, default: <none>)

spring.data.redis.cluster
max-redirects

Maximum number of redirects to follow when executing commands across the cluster. (Integer, default: <none>)

nodes

Comma-separated list of "host:port" pairs to bootstrap from. This represents an "initial" list of cluster nodes and is required to have at least one entry. (List<String>, default: <none>)

spring.data.redis.jedis.pool
enabled

Whether to enable the pool. Enabled automatically if "commons-pool2" is available. With Jedis, pooling is implicitly enabled in sentinel mode and this setting only applies to single node setup. (Boolean, default: <none>)

max-active

Maximum number of connections that can be allocated by the pool at a given time. Use a negative value for no limit. (Integer, default: 8)

max-idle

Maximum number of "idle" connections in the pool. Use a negative value to indicate an unlimited number of idle connections. (Integer, default: 8)

max-wait

Maximum amount of time a connection allocation should block before throwing an exception when the pool is exhausted. Use a negative value to block indefinitely. (Duration, default: -1ms)

min-idle

Target for the minimum number of idle connections to maintain in the pool. This setting only has an effect if both it and time between eviction runs are positive. (Integer, default: 0)

time-between-eviction-runs

Time between runs of the idle object evictor thread. When positive, the idle object evictor thread starts, otherwise no idle object eviction is performed. (Duration, default: <none>)

spring.data.redis.lettuce.pool
enabled

Whether to enable the pool. Enabled automatically if "commons-pool2" is available. With Jedis, pooling is implicitly enabled in sentinel mode and this setting only applies to single node setup. (Boolean, default: <none>)

max-active

Maximum number of connections that can be allocated by the pool at a given time. Use a negative value for no limit. (Integer, default: 8)

max-idle

Maximum number of "idle" connections in the pool. Use a negative value to indicate an unlimited number of idle connections. (Integer, default: 8)

max-wait

Maximum amount of time a connection allocation should block before throwing an exception when the pool is exhausted. Use a negative value to block indefinitely. (Duration, default: -1ms)

min-idle

Target for the minimum number of idle connections to maintain in the pool. This setting only has an effect if both it and time between eviction runs are positive. (Integer, default: 0)

time-between-eviction-runs

Time between runs of the idle object evictor thread. When positive, the idle object evictor thread starts, otherwise no idle object eviction is performed. (Duration, default: <none>)

spring.data.redis.lettuce
shutdown-timeout

Shutdown timeout. (Duration, default: 100ms)

spring.data.redis.sentinel
master

Name of the Redis server. (String, default: <none>)

nodes

Comma-separated list of "host:port" pairs. (List<String>, default: <none>)

password

Password for authenticating with sentinel(s). (String, default: <none>)

username

Login username for authenticating with sentinel(s). (String, default: <none>)

spring.data.redis.ssl
bundle

SSL bundle name. (String, default: <none>)

enabled

Whether to enable SSL support. Enabled automatically if "bundle" is provided unless specified otherwise. (Boolean, default: <none>)

7.14. Router Sink

This application routes messages to named channels.

7.14.1. Options

The router sink has the following options:

router.default-output-binding

Where to send un-routable messages. (String, default: <none>)

router.destination-mappings

Destination mappings as a new line delimited string of name-value pairs, e.g. 'foo=bar\n baz=car'. (Properties, default: <none>)

router.expression

The expression to be applied to the message to determine the channel(s) to route to. Note that the payload wire format for content types such as text, json or xml is byte[], not String. Consult the documentation for how to handle byte array payload content. (Expression, default: <none>)

router.refresh-delay

How often to check for script changes in ms (if present); < 0 means don't refresh. (Integer, default: 60000)

router.resolution-required

Whether channel resolution is required. (Boolean, default: false)

router.script

The location of a groovy script that returns channels or channel mapping resolution keys. (Resource, default: <none>)

router.variables

Variable bindings as a new line delimited string of name-value pairs, e.g. 'foo=bar\n baz=car'. (Properties, default: <none>)

router.variables-location

The location of a properties file containing custom script variable bindings. (Resource, default: <none>)

This router sink is based on the StreamBridge API from Spring Cloud Stream, so destinations can be created as needed. In this case, the defaultOutputBinding can be reached only if the key is not included in destinationMappings. Setting resolutionRequired = true disregards the defaultOutputBinding and throws an exception if there is no mapping and no corresponding binding has already been declared.

You can restrict the creation of dynamic bindings using the spring.cloud.stream.dynamicDestinations property. By default, all resolved destinations will be bound dynamically; if this property has a comma-delimited list of destination names, only those will be bound. Messages that resolve to a destination that is not in this list will be routed to the defaultOutputBinding, which must also appear in the list.

The destinationMappings are used to map the evaluation results to an actual destination name.
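
The newline-delimited name-value format of destinationMappings matches what java.util.Properties can read, so the lookup can be sketched with it. This is an illustration, not the router's code: DestinationMappingSketch is a hypothetical name, and using the evaluation result itself as the destination name when no mapping exists is an assumption of this sketch that leaves out defaultOutputBinding and resolutionRequired.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical illustration of mapping an evaluation result to a destination name.
final class DestinationMappingSketch {

    // Reads a newline-delimited string of name=value pairs, e.g. "foo=bar\n baz=car".
    static Properties parse(String mappings) {
        Properties properties = new Properties();
        try {
            properties.load(new StringReader(mappings));
        }
        catch (IOException ex) {
            throw new UncheckedIOException(ex);
        }
        return properties;
    }

    // Assumption: an unmapped evaluation result is used as the destination name as-is.
    static String resolve(Properties mappings, String evaluationResult) {
        return mappings.getProperty(evaluationResult, evaluationResult);
    }

    public static void main(String[] args) {
        Properties mappings = parse("foo=bar\n baz=car");
        System.out.println(resolve(mappings, "foo"));
        System.out.println(resolve(mappings, "orders"));
    }
}
```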

7.14.2. SpEL-based Routing

The expression evaluates against the message and returns either a channel name, or the key to a map of channel names.

For more information, please see the "Routers and the Spring Expression Language (SpEL)" subsection in the Spring Integration Reference manual Configuring a Generic Router section.

7.14.3. Groovy-based Routing

Instead of SpEL expressions, Groovy scripts can also be used. Let’s create a Groovy script in the file system at "file:/my/path/router.groovy", or "classpath:/my/path/router.groovy" :

println("Groovy processing payload '" + payload + "'")
if (payload.contains('a')) {
    return "foo"
}
else {
    return "bar"
}

If you want to pass variable values to your script, you can statically bind values by using the variables option, or you can pass the path to a properties file containing the bindings by using the variables-location option. All properties in the file are made available to the script as variables. You may specify both variables and variables-location, in which case any duplicate values provided as variables override values provided in the properties file. Note that payload and headers are implicitly bound to give you access to the data contained in a message.
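
The precedence between the two sources of script variables can be sketched as a simple merge. This is an illustration only: ScriptVariablesSketch and merge are hypothetical names, and both inputs are modeled as java.util.Properties without reading any file.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;

// Hypothetical illustration of how inline variables win over file-provided ones.
final class ScriptVariablesSketch {

    static Map<String, Object> merge(Properties fromFile, Properties inline) {
        Map<String, Object> bindings = new LinkedHashMap<>();
        // Values from the properties file go in first...
        fromFile.stringPropertyNames().forEach(name -> bindings.put(name, fromFile.getProperty(name)));
        // ...and inline variables replace any duplicates.
        inline.stringPropertyNames().forEach(name -> bindings.put(name, inline.getProperty(name)));
        return bindings;
    }

    public static void main(String[] args) {
        Properties fromFile = new Properties();
        fromFile.setProperty("threshold", "10");
        fromFile.setProperty("region", "eu");

        Properties inline = new Properties();
        inline.setProperty("threshold", "25");

        System.out.println(merge(fromFile, inline));
    }
}
```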

For more information, see the Spring Integration Reference manual Groovy Support.

7.15. RSocket Sink

The RSocket sink sends data by using the RSocket protocol's fire-and-forget strategy.

7.15.1. Options

The rsocket sink has the following options:

rsocket.consumer.host

RSocket host. (String, default: localhost)

rsocket.consumer.port

RSocket port. (Integer, default: 7000)

rsocket.consumer.route

Route used for RSocket. (String, default: <none>)

rsocket.consumer.uri

URI that can be used for websocket based transport. (URI, default: <none>)

7.16. Amazon S3 Sink

This sink app supports transferring objects to an Amazon S3 bucket. File payloads (and directories, recursively) are transferred from the local directory where the application is deployed to the remote directory (S3 bucket).

Messages accepted by this sink must contain a payload of one of the following types:

  • File, including directories for recursive upload;

  • InputStream;

  • byte[]

7.16.1. Options

The s3 sink has the following options:

Properties grouped by prefix:

s3.consumer
acl

S3 Object access control list. (ObjectCannedACL, default: <none>, possible values: private,public-read,public-read-write,authenticated-read,aws-exec-read,bucket-owner-read,bucket-owner-full-control,null)

acl-expression

Expression to evaluate S3 Object access control list. (Expression, default: <none>)

bucket

AWS bucket for target file(s) to store. (String, default: <none>)

bucket-expression

Expression to evaluate AWS bucket name. (Expression, default: <none>)

key-expression

Expression to evaluate S3 Object key. (Expression, default: <none>)

spring.cloud.aws.credentials
access-key

The access key to be used with a static provider. (String, default: <none>)

instance-profile

Configures an instance profile credentials provider with no further configuration. (Boolean, default: false)

profile

The AWS profile. (Profile, default: <none>)

secret-key

The secret key to be used with a static provider. (String, default: <none>)

spring.cloud.aws.region
instance-profile

Configures an instance profile region provider with no further configuration. (Boolean, default: false)

profile

The AWS profile. (Profile, default: <none>)

static

<documentation missing> (String, default: <none>)

spring.cloud.aws.s3
accelerate-mode-enabled

Option to enable using the accelerate endpoint when accessing S3. Accelerate endpoints allow faster transfer of objects by using Amazon CloudFront's globally distributed edge locations. (Boolean, default: <none>)

checksum-validation-enabled

Option to disable doing a validation of the checksum of an object stored in S3. (Boolean, default: <none>)

chunked-encoding-enabled

Option to enable using chunked encoding when signing the request payload for software.amazon.awssdk.services.s3.model.PutObjectRequest and software.amazon.awssdk.services.s3.model.UploadPartRequest. (Boolean, default: <none>)

cross-region-enabled

Enables cross-region bucket access. (Boolean, default: <none>)

endpoint

Overrides the default endpoint. (URI, default: <none>)

path-style-access-enabled

Option to enable using path style access for accessing S3 objects instead of DNS style access. DNS style access is preferred as it will result in better load balancing when accessing S3. (Boolean, default: <none>)

region

Overrides the default region. (String, default: <none>)

use-arn-region-enabled

If an S3 resource ARN is passed in as the target of an S3 operation and its region differs from the one the client was configured with, this flag must be set to 'true' to permit the client to make a cross-region call to the region specified in the ARN; otherwise, an exception is thrown. (Boolean, default: <none>)

spring.cloud.aws.s3.crt
initial-read-buffer-size-in-bytes

Configure the starting buffer size the client will use to buffer the parts downloaded from S3. Maintain a larger window to keep up a high download throughput; parts cannot download in parallel unless the window is large enough to hold multiple parts. Maintain a smaller window to limit the amount of data buffered in memory. (Long, default: <none>)

max-concurrency

Specifies the maximum number of S3 connections that should be established during transfer. (Integer, default: <none>)

minimum-part-size-in-bytes

Sets the minimum part size for transfer parts. Decreasing the minimum part size causes multipart transfer to be split into a larger number of smaller parts. Setting this value too low has a negative effect on transfer speeds, causing extra latency and network communication for each part. (Long, default: <none>)

target-throughput-in-gbps

The target throughput for transfer requests. Higher value means more S3 connections will be opened. Whether the transfer manager can achieve the configured target throughput depends on various factors such as the network bandwidth of the environment and the configured `max-concurrency`. (Double, default: <none>)

The generated application, which is based on AmazonS3SinkConfiguration, can be enhanced with an S3MessageHandler.UploadMetadataProvider and/or an S3ProgressListener, which are injected into the S3MessageHandler bean. See Spring Integration AWS support for more details.

7.16.2. Amazon AWS common options

The Amazon S3 Sink (like all other Amazon AWS applications) is based on the Spring Cloud AWS project, and its auto-configuration classes are used automatically by Spring Boot. Consult the Spring Cloud AWS documentation for the required and useful auto-configuration properties.

Some of them are about AWS credentials:

  • spring.cloud.aws.credentials.accessKey

  • spring.cloud.aws.credentials.secretKey

  • spring.cloud.aws.credentials.instanceProfile

  • spring.cloud.aws.credentials.profileName

  • spring.cloud.aws.credentials.profilePath

Others are for AWS Region definition:

  • cloud.aws.region.auto

  • cloud.aws.region.static

Examples
java -jar s3-sink.jar --s3.consumer.bucket=/tmp/bar

7.17. SFTP Sink

SFTP sink is a simple option to push files to an SFTP server from incoming messages.

It uses an sftp-outbound-adapter, therefore incoming messages can be either a java.io.File object, a String (content of the file) or an array of bytes (file content as well).

To use this sink, you need a username and a password to log in.

By default, Spring Integration uses o.s.i.file.DefaultFileNameGenerator if none is specified. DefaultFileNameGenerator determines the file name based on the value of the file_name header (if it exists) in the MessageHeaders. If the payload of the Message is already a java.io.File, it uses the original name of that file.

When configuring the sftp.consumer.factory.known-hosts-expression option, the root object of the evaluation is the application context. An example might be sftp.consumer.factory.known-hosts-expression = @systemProperties['user.home'] + '/.ssh/known_hosts'.

7.17.1. Input

Headers
  • file_name (See note above)

Payload
  • java.io.File

  • java.io.InputStream

  • byte[]

  • String

7.17.2. Output

N/A (writes to the SFTP server).

7.17.3. Options

The sftp sink has the following options:

Properties grouped by prefix:

sftp.consumer
auto-create-dir

Whether to create the remote directory. (Boolean, default: true)

filename-expression

A SpEL expression to generate the remote file name. (String, default: <none>)

mode

Action to take if the remote file already exists. (FileExistsMode, default: <none>, possible values: APPEND,APPEND_NO_FLUSH,FAIL,IGNORE,REPLACE,REPLACE_IF_MODIFIED)

remote-dir

The remote FTP directory. (String, default: /)

remote-file-separator

The remote file separator. (String, default: /)

temporary-remote-dir

A temporary directory where the file will be written if use-temporary-filename is true. (String, default: /)

tmp-file-suffix

The suffix to use while the transfer is in progress. (String, default: .tmp)

use-temporary-filename

Whether to write to a temporary file and rename. (Boolean, default: true)

sftp.consumer.factory
allow-unknown-keys

True to allow an unknown or changed key. (Boolean, default: false)

cache-sessions

Cache sessions. (Boolean, default: <none>)

host

The host name of the server. (String, default: localhost)

known-hosts-expression

A SpEL expression resolving to the location of the known hosts file. (Expression, default: <none>)

pass-phrase

Passphrase for user's private key. (String, default: <empty string>)

password

The password to use to connect to the server. (String, default: <none>)

port

The port of the server. (Integer, default: 22)

private-key

Resource location of user's private key. (Resource, default: <none>)

username

The username to use to connect to the server. (String, default: <none>)

7.18. TCP Sink

This module writes messages to TCP using an Encoder.

TCP is a streaming protocol and some mechanism is needed to frame messages on the wire. A number of encoders are available, the default being 'CRLF'.

7.18.1. Options

The tcp sink has the following options:

Properties grouped by prefix:

tcp.consumer
charset

The charset used when converting from bytes to String. (String, default: UTF-8)

close

Whether to close the socket after each message. (Boolean, default: false)

encoder

The encoder to use when sending messages. (Encoding, default: <none>, possible values: CRLF,LF,NULL,STXETX,RAW,L1,L2,L4)

host

The host to which this sink will connect. (String, default: <none>)

tcp
nio

Whether or not to use NIO. (Boolean, default: false)

port

The port on which to listen; 0 for the OS to choose a port. (Integer, default: 1234)

reverse-lookup

Perform a reverse DNS lookup on the remote IP Address; if false, just the IP address is included in the message headers. (Boolean, default: false)

socket-timeout

The timeout (ms) before closing the socket when no data is received. (Integer, default: 120000)

use-direct-buffers

Whether or not to use direct buffers. (Boolean, default: false)

7.18.2. Available Encoders

Text Data
CRLF (default)

text terminated by carriage return (0x0d) followed by line feed (0x0a)

LF

text terminated by line feed (0x0a)

NULL

text terminated by a null byte (0x00)

STXETX

text preceded by an STX (0x02) and terminated by an ETX (0x03)

Text and Binary Data
RAW

no structure - the client indicates a complete message by closing the socket

L1

data preceded by a one byte (unsigned) length field (supports up to 255 bytes)

L2

data preceded by a two byte (unsigned) length field (up to 2^16-1 bytes)

L4

data preceded by a four byte (signed) length field (up to 2^31-1 bytes)
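
The wire layouts listed above can be sketched in plain Java. This is a minimal illustration only, not the serializer classes that the sink uses internally: the class name TcpFramingSketch and its method names are hypothetical, and the length header is assumed to be written in network byte order (big-endian), which the descriptions above do not state.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical helper that only illustrates the byte layouts described above.
public class TcpFramingSketch {

    // CRLF, LF, and NULL framing: the payload followed by a terminator.
    public static byte[] terminated(byte[] payload, byte... terminator) {
        ByteBuffer buf = ByteBuffer.allocate(payload.length + terminator.length);
        buf.put(payload).put(terminator);
        return buf.array();
    }

    // STXETX framing: STX (0x02), then the payload, then ETX (0x03).
    public static byte[] stxEtx(byte[] payload) {
        ByteBuffer buf = ByteBuffer.allocate(payload.length + 2);
        buf.put((byte) 0x02).put(payload).put((byte) 0x03);
        return buf.array();
    }

    // L1, L2, and L4 framing: a 1-, 2-, or 4-byte length header, then the payload.
    // Assumption: the header is big-endian (ByteBuffer's default byte order).
    public static byte[] lengthPrefixed(byte[] payload, int headerBytes) {
        long max;
        switch (headerBytes) {
            case 1: max = 0xFFL; break;              // up to 255 bytes
            case 2: max = 0xFFFFL; break;            // up to 2^16-1 bytes
            case 4: max = Integer.MAX_VALUE; break;  // up to 2^31-1 bytes
            default: throw new IllegalArgumentException("header must be 1, 2, or 4 bytes");
        }
        if (payload.length > max) {
            throw new IllegalArgumentException("payload too long for a " + headerBytes + "-byte header");
        }
        ByteBuffer buf = ByteBuffer.allocate(headerBytes + payload.length);
        switch (headerBytes) {
            case 1: buf.put((byte) payload.length); break;
            case 2: buf.putShort((short) payload.length); break;
            default: buf.putInt(payload.length);
        }
        buf.put(payload);
        return buf.array();
    }

    public static void main(String[] args) {
        byte[] payload = "hi".getBytes(StandardCharsets.US_ASCII);
        System.out.println(Arrays.toString(terminated(payload, (byte) 0x0d, (byte) 0x0a))); // [104, 105, 13, 10]
        System.out.println(Arrays.toString(stxEtx(payload)));                               // [2, 104, 105, 3]
        System.out.println(Arrays.toString(lengthPrefixed(payload, 2)));                    // [0, 2, 104, 105]
    }
}
```

A receiver has to be configured with the matching decoder. Text encoders such as CRLF only work when the payload itself never contains the terminator bytes, which is why the length-header encoders are the ones listed for binary data.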

7.19. Throughput Sink

Sink that will count messages and log the observed throughput at a selected interval.

7.19.1. Options

The throughput sink has the following options:

throughput.report-every-ms

How often to report, in milliseconds. (Integer, default: 1000)

7.20. Twitter Message Sink

Send Direct Messages to a specified user from the authenticating user. Requires a JSON POST body and Content-Type header to be set to application/json.

When a message is received from a user you may send up to 5 messages in response within a 24 hour window. Each message received resets the 24 hour window and the 5 allotted messages. Sending a 6th message within a 24 hour window or sending a message outside of a 24 hour window will count towards rate-limiting. This behavior only applies when using the POST direct_messages/events/new endpoint.

SpEL expressions are used to compute the request parameters from the input message.

7.20.1. Options

Use single quotes (') to wrap the literal values of the SpEL expression properties. For example, to set a fixed message text, use text='Fixed Text'. For a fixed target userId, use userId='666'.

twitter.message.update.media-id

A media id to associate with the message. A Direct Message may only reference a single media id. (Expression, default: <none>)

twitter.message.update.screen-name

The screen name of the user to whom to send the direct message. (Expression, default: <none>)

twitter.message.update.text

The direct message text. URL encode as necessary. Max length of 10,000 characters. (Expression, default: payload)

twitter.message.update.user-id

The user id of the user to whom to send the direct message. (Expression, default: <none>)

7.21. Twitter Update Sink

Updates the authenticating user’s current status (that is, Tweeting).

For each update attempt, the update text is compared with the authenticating user’s recent Tweets. Any attempt that would result in duplication will be blocked, resulting in a 403 error. A user cannot submit the same text twice in a row.

While not rate limited by the API, a user is limited in the number of Tweets they can create at a time. The update limit for the standard API is 300 per 3-hour window. If the number of updates posted by the user reaches the current allowed limit, this method returns an HTTP 403 error.

7.21.1. Options

Properties grouped by prefix:

twitter.update
attachment-url

(SpEL expression) In order for a URL to not be counted in the text body of an extended Tweet, provide a URL as a Tweet attachment. This URL must be a Tweet permalink or a Direct Message deep link. Arbitrary, non-Twitter URLs must remain in the Tweet text. URLs passed to the attachment_url parameter that match neither a Tweet permalink nor a Direct Message deep link fail at Tweet creation and cause an exception. (Expression, default: <none>)

display-coordinates

(SpEL expression) Whether or not to put a pin on the exact coordinates a Tweet has been sent from. (Expression, default: <none>)

in-reply-to-status-id

(SpEL expression) The ID of an existing status that the update is in reply to. Note: This parameter is ignored unless the author of the Tweet that this parameter references is mentioned within the status text. Therefore, you must include @username, where username is the author of the referenced Tweet, within the update. When inReplyToStatusId is set, auto_populate_reply_metadata is automatically set as well. The latter ensures that leading @mentions are looked up from the original Tweet and added to the new Tweet from there. This appends @mentions to the metadata of an extended Tweet as a reply chain grows, until the limit on @mentions is reached. If the original Tweet has been deleted, the reply fails. (Expression, default: <none>)

media-ids

(SpEL expression) A comma-delimited list of media_ids to associate with the Tweet. You may include up to 4 photos or 1 animated GIF or 1 video in a Tweet. See Uploading Media for further details on uploading media. (Expression, default: <none>)

place-id

(SpEL expression) A place in the world. (Expression, default: <none>)

text

(SpEL expression) The text of the status update. URL encode as necessary. t.co link wrapping will affect character counts. Defaults to the message's payload. (Expression, default: payload)

twitter.update.location
lat

The latitude of the location this Tweet refers to. This parameter will be ignored unless it is inside the range -90.0 to +90.0 (North is positive) inclusive. It will also be ignored if there is no corresponding lon parameter. (Expression, default: <none>)

lon

The longitude of the location this Tweet refers to. The valid ranges for longitude are -180.0 to +180.0 (East is positive) inclusive. This parameter will be ignored if outside that range, if it is not a number, if geo_enabled is disabled, or if there is no corresponding lat parameter. (Expression, default: <none>)
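
The range rules for lat and lon can be checked before a message reaches the sink. The following is a minimal sketch, assuming a hypothetical helper class TweetLocationSketch that is not part of the application. It covers only the documented numeric ranges and the requirement that both values be present. It does not model the account's geo_enabled setting.

```java
// Hypothetical pre-check for the twitter.update.location values described above.
public class TweetLocationSketch {

    // Returns true only when both coordinates are present, are numbers,
    // and fall inside the documented inclusive ranges.
    public static boolean isUsable(Double lat, Double lon) {
        if (lat == null || lon == null) {
            return false; // each parameter is ignored without its counterpart
        }
        if (lat.isNaN() || lon.isNaN()) {
            return false; // not a number
        }
        return lat >= -90.0 && lat <= 90.0
                && lon >= -180.0 && lon <= 180.0;
    }

    public static void main(String[] args) {
        System.out.println(isUsable(37.78, -122.40)); // true
        System.out.println(isUsable(95.0, 10.0));     // false
        System.out.println(isUsable(10.0, null));     // false
    }
}
```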

7.22. Wavefront Sink

The Wavefront sink consumes Message<?> instances, converts each one into a metric in Wavefront data format, and sends the metric directly to Wavefront or a Wavefront proxy.

Supports common ETL use-cases, where existing (historical) metrics data has to be cleaned, transformed and stored in Wavefront for further analysis.

7.22.1. Options

The Wavefront sink has the following options:

wavefront.api-token

Wavefront API access token. (String, default: <none>)

wavefront.metric-expression

A SpEL expression that evaluates to a metric value. (Expression, default: <none>)

wavefront.metric-name

The name of the metric. Defaults to the application name. (String, default: <none>)

wavefront.proxy-uri

The URL of the Wavefront proxy. (String, default: <none>)

wavefront.source

Unique application, host, container, or instance that emits metrics. (String, default: <none>)

wavefront.tag-expression

Collection of custom metadata associated with the metric. Point tags cannot be empty. Valid characters for keys: alphanumeric, hyphen ('-'), underscore ('_'), dot ('.'). For values any character is allowed, including spaces. To include a double quote, escape it with a backslash. A backslash cannot be the last character in the tag value. Maximum allowed length for a combination of a point tag key and value is 254 characters (255 including the '=' separating key and value). If the value is longer, the point is rejected and logged. (Map<String, Expression>, default: <none>)

wavefront.timestamp-expression

A SpEL expression that evaluates to a timestamp of the metric (optional). (Expression, default: <none>)

wavefront.uri

The URL of the Wavefront environment. (String, default: <none>)
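
The point tag rules described for wavefront.tag-expression can be checked up front, before a point is rejected. The following is a minimal sketch under stated assumptions: the class PointTagCheckSketch and its methods are hypothetical and not part of the sink, "alphanumeric" is read as ASCII letters and digits, "cannot be empty" is read as applying to both key and value, and the length is counted in Java characters before any escaping.

```java
import java.util.regex.Pattern;

// Hypothetical validator for the documented point tag constraints.
public class PointTagCheckSketch {

    // Keys: alphanumeric, hyphen, underscore, dot (ASCII is an assumption).
    private static final Pattern KEY = Pattern.compile("[A-Za-z0-9._-]+");

    // Documented limit for key and value combined, excluding the '=' separator.
    public static final int MAX_KEY_PLUS_VALUE = 254;

    public static boolean isValid(String key, String value) {
        if (key == null || value == null || value.isEmpty()) {
            return false; // empty tags are not allowed (assumed to cover key and value)
        }
        if (!KEY.matcher(key).matches()) {
            return false; // key is empty or contains a disallowed character
        }
        if (value.endsWith("\\")) {
            return false; // a backslash cannot be the last character of the value
        }
        return key.length() + value.length() <= MAX_KEY_PLUS_VALUE;
    }

    // Escapes double quotes with a backslash, as the option description requires.
    public static String escapeQuotes(String value) {
        return value.replace("\"", "\\\"");
    }

    public static void main(String[] args) {
        System.out.println(isValid("env", "prod"));   // true
        System.out.println(isValid("bad key", "x"));  // false
        System.out.println(escapeQuotes("say \"hi\""));
    }
}
```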

7.23. Websocket Sink

A simple Websocket Sink implementation.

7.23.1. Options

The following options are supported:

websocket.consumer.log-level

The log level for Netty channels. Default is WARN. (String, default: <none>)

websocket.consumer.path

The path on which a WebsocketSink consumer needs to connect. Default is /websocket. (String, default: /websocket)

websocket.consumer.port

The port on which the Netty server listens. Default is 9292. (Integer, default: 9292)

websocket.consumer.ssl

Whether or not to create an io.netty.handler.ssl.SslContext. (Boolean, default: false)

websocket.consumer.threads

The number of threads for the Netty io.netty.channel.EventLoopGroup. Default is 1. (Integer, default: 1)

7.23.2. Examples

To verify that the websocket-sink receives messages from other spring-cloud-stream apps, you can use the following simple end-to-end setup.

Step 1: Start RabbitMQ
Step 2: Deploy a time-source
Step 3: Deploy the websocket-sink

Finally, start a websocket-sink in trace mode so that you see the messages produced by the time-source in the log:

java -jar <spring boot application for websocket-sink> --spring.cloud.stream.bindings.input=ticktock --server.port=9393 \
	--logging.level.org.springframework.cloud.fn.consumer.websocket=TRACE

You should start seeing log messages in the console where you started the WebsocketSink like this:

Handling message: GenericMessage [payload=2015-10-21 12:52:53, headers={id=09ae31e0-a04e-b811-d211-b4d4e75b6f29, timestamp=1445424778065}]
Handling message: GenericMessage [payload=2015-10-21 12:52:54, headers={id=75eaaf30-e5c6-494f-b007-9d5b5b920001, timestamp=1445424778065}]
Handling message: GenericMessage [payload=2015-10-21 12:52:55, headers={id=18b887db-81fc-c634-7a9a-16b1c72de291, timestamp=1445424778066}]

7.23.3. Actuators

There is an Endpoint that you can use to access the last n messages sent and received. You have to enable it by providing --endpoints.websocketconsumertrace.enabled=true. By default it shows the last 100 messages via the host:port/websocketconsumertrace. Here is a sample output:

[
  {
    "timestamp": 1445453703508,
    "info": {
      "type": "text",
      "direction": "out",
      "id": "2ff9be50-c9b2-724b-5404-1a6305c033e4",
      "payload": "2015-10-21 20:54:33"
    }
  },
  ...
  {
    "timestamp": 1445453703506,
    "info": {
      "type": "text",
      "direction": "out",
      "id": "2b9dbcaf-c808-084d-a51b-50f617ae6a75",
      "payload": "2015-10-21 20:54:32"
    }
  }
]

There is also a simple HTML page where you see forwarded messages in a text area. You can access it directly via host:port in your browser.

7.24. XMPP Sink

The "xmpp" sink enables sending messages to an XMPP server.

7.24.1. Input

  • byte[]

7.24.2. Output

Payload

N/A

7.24.3. Options

The xmpp sink has the following options:

Properties grouped by prefix:

xmpp.consumer
chat-to

XMPP handle to send message to. (String, default: <none>)

xmpp.factory
host

XMPP Host server to connect to. (String, default: <none>)

password

The Password for the connected user. (String, default: <none>)

port

Port for connecting to the host. - Default Client Port: 5222 (Integer, default: 5222)

resource

The Resource to bind to on the XMPP Host. - Can be empty, server will generate one if not set (String, default: <none>)

security-mode

<documentation missing> (SecurityMode, default: <none>, possible values: required,ifpossible,disabled)

service-name

The Service Name to set for the XMPP Domain. (String, default: <none>)

subscription-mode

<documentation missing> (SubscriptionMode, default: <none>, possible values: accept_all,reject_all,manual)

user

The User the connection should connect as. (String, default: <none>)

7.24.4. Build

$ ./mvnw clean install -PgenerateApps
$ cd apps

You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:

$ ./mvnw clean package

7.25. ZeroMQ Sink

The "zeromq" sink enables sending messages to a ZeroMQ socket.

7.25.1. Input

  • byte[]

7.25.2. Output

Payload

N/A

7.25.3. Options

The zeromq sink has the following options:

zeromq.consumer.connect-url

Connection URL for connecting to the ZeroMQ Socket. (String, default: <none>)

zeromq.consumer.socket-type

The Socket Type the connection should establish. (SocketType, default: <none>, possible values: PAIR,PUB,SUB,REQ,REP,DEALER,ROUTER,PULL,PUSH,XPUB,XSUB,STREAM,CLIENT,SERVER,RADIO,DISH,CHANNEL,PEER,RAW,SCATTER,GATHER)

zeromq.consumer.topic

A Topic SpEL expression to evaluate a topic before sending messages to subscribers. (Expression, default: <none>)

7.25.4. Build

$ ./mvnw clean install -PgenerateApps
$ cd apps

You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:

$ ./mvnw clean package

7.25.5. Examples

java -jar zeromq-sink.jar --zeromq.consumer.connectUrl=tcp://server:port --zeromq.consumer.topic=

Appendices

Appendix A: Contributing

Spring Cloud is released under the non-restrictive Apache 2.0 license and follows a very standard GitHub development process, using the GitHub tracker for issues and merging pull requests into master. If you want to contribute even something trivial, please do not hesitate, but follow the guidelines below.

A.1. Sign the Contributor License Agreement

Before we accept a non-trivial patch or pull request we will need you to sign the contributor’s agreement. Signing the contributor’s agreement does not grant anyone commit rights to the main repository, but it does mean that we can accept your contributions, and you will get an author credit if we do. Active contributors might be asked to join the core team, and given the ability to merge pull requests.

A.2. Code Conventions and Housekeeping

None of these is essential for a pull request, but they will all help. They can also be added after the original pull request but before a merge.

  • Use the Spring Framework code format conventions. If you use Eclipse you can import formatter settings using the eclipse-code-formatter.xml file from the Spring Cloud Build project. If using IntelliJ, you can use the Eclipse Code Formatter Plugin to import the same file.

  • Make sure all new .java files have a simple Javadoc class comment with at least an @author tag identifying you, and preferably at least a paragraph on what the class is for.

  • Add the ASF license header comment to all new .java files (copy from existing files in the project).

  • Add yourself as an @author to the .java files that you modify substantially (more than cosmetic changes).

  • Add some Javadocs and, if you change the namespace, some XSD doc elements.

  • A few unit tests would help a lot as well — someone has to do it.

  • If no-one else is using your branch, please rebase it against the current master (or other target branch in the main project).

  • When writing a commit message, please follow these conventions. If you are fixing an existing issue, please add Fixes gh-XXXX at the end of the commit message (where XXXX is the issue number).