Reference Guide
This section provides you with a detailed overview of the out-of-the-box Spring Cloud Stream Applications. It assumes familiarity with general Spring Cloud Stream concepts, which you can find in the Spring Cloud Stream reference documentation.
These Spring Cloud Stream Applications provide you with out-of-the-box Spring Cloud Stream utility applications that you can run independently or with Spring Cloud Data Flow. They include:
- Connectors (sources, processors, and sinks) for a variety of middleware technologies, including message brokers and storage (relational, non-relational, and filesystem).
- Adapters for various network protocols.
- Generic processors that you can customize with Spring Expression Language (SpEL) or by scripting.
You can find a detailed listing of all the applications and their options in the following sections of this guide.
Most of these applications are based on core elements that are exposed as java.util.function components.
You can learn more about these foundational components and how they connect to the applications by reading this README.
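As a rough illustration of the functional core idea, the following minimal sketch chains a Supplier (the role a source plays), a Function (a processor), and a Consumer (a sink). The class and method names are hypothetical and are not taken from the stream-applications repository; the sketch only shows how the three java.util.function types fit together.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical illustration; none of these names come from the stream-applications repository.
class FunctionPipelineSketch {

    // Chains a supplier (source role), a function (processor role), and a consumer (sink role)
    // into a single step that moves one item through all three.
    static <T, R> Runnable pipeline(Supplier<T> source, Function<T, R> processor, Consumer<R> sink) {
        return () -> sink.accept(processor.apply(source.get()));
    }

    public static void main(String[] args) {
        List<String> received = new ArrayList<>();
        Supplier<String> source = () -> "hello";
        Function<String, String> processor = String::toUpperCase;
        Consumer<String> sink = received::add;

        pipeline(source, processor, sink).run();
        System.out.println(received);
    }
}
```

In the prebuilt applications, a binder takes care of connecting such components to the middleware, so the function itself stays free of messaging code.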
1. Pre-built Applications
Out-of-the-box applications are Spring Boot applications that combine the basic logic of the app (a function, for example) with a binder implementation, packaged as a fully functional uber-jar. These uber-jars include the minimal code required for standalone execution. For each function application, the project provides prebuilt versions for the Apache Kafka and RabbitMQ binders.
Prebuilt applications are generated by the stream apps generator Maven plugin.
2. Classification
Based on their target application type, the applications can be one of the following:
- A source that connects to an external resource to poll and receive data, which it publishes to the default “output” channel.
- A processor that receives data from an “input” channel and processes it, sending the result to the default “output” channel.
- A sink that receives data from the default “input” channel and sends it to an external resource.
The prebuilt applications follow a naming convention: <functionality>-<type>-<binder>. For example, rabbit-sink-kafka is a Rabbit sink that uses the Kafka binder, which runs with Kafka as the middleware.
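The naming convention can be sketched in code as follows. This is a hypothetical helper written for illustration only (AppNameSketch, compose, and parse are not part of the project); it assumes that the type and the binder never contain hyphens, so it splits the name from the right.

```java
// Hypothetical helper for illustration; it is not part of the stream-applications project.
class AppNameSketch {

    // Builds a name such as "rabbit-sink-kafka" from its three parts.
    static String compose(String functionality, String type, String binder) {
        return functionality + "-" + type + "-" + binder;
    }

    // Splits a name into {functionality, type, binder}. The split works from the right,
    // so a functionality that itself contains hyphens stays in one piece.
    static String[] parse(String name) {
        int binderSep = name.lastIndexOf('-');
        int typeSep = binderSep > 0 ? name.lastIndexOf('-', binderSep - 1) : -1;
        if (typeSep <= 0) {
            throw new IllegalArgumentException("Expected <functionality>-<type>-<binder>: " + name);
        }
        return new String[] {
            name.substring(0, typeSep),
            name.substring(typeSep + 1, binderSep),
            name.substring(binderSep + 1)
        };
    }

    public static void main(String[] args) {
        System.out.println(compose("cassandra", "sink", "kafka"));
        System.out.println(String.join(" | ", parse("rabbit-sink-kafka")));
    }
}
```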
2.1. Maven Access
The core functionality of the applications is available as functions.
See the Java Functions section in the stream-applications repository for more details.
Prebuilt applications are available as Maven artifacts.
You can download the executable jar artifacts from the Spring Maven repositories.
The root directory of the Maven repository that hosts release versions is repo.spring.io/release/org/springframework/cloud/stream/app/.
From there, you can navigate to the latest released version of a specific app.
If you want to use functions directly in a custom application, those artifacts are available under the directory structure org/springframework/cloud/fn.
Use the Release, Milestone, and Snapshot repository locations for Release, Milestone, and Snapshot executable jar artifacts, respectively.
2.2. Docker Access
The Docker versions of the applications are available in Docker Hub, at hub.docker.com/r/springcloudstream/.
Naming and versioning follow the same general conventions as Maven. For example:
docker pull springcloudstream/cassandra-sink-kafka
The preceding command pulls the latest Docker image of the Cassandra sink with the Kafka binder.
2.3. Build
You can build everything from the root of the repository.
./mvnw clean install
This is a long build and you may want to skip tests:
./mvnw clean install -DskipTests
However, you are probably interested in only a single application or a few of them. To build just the functions and applications that you need, build them selectively, as shown below.
2.4. Building the root parent
First, we need to build the parent used in various components.
./mvnw clean install -f stream-applications-build
2.4.1. Building functions
./mvnw clean install -f functions -DskipTests
You can also build a single function or a group of functions. For example, if you are only interested in jdbc-supplier and log-consumer, do the following.
./mvnw clean install -pl :jdbc-supplier,:log-consumer
2.4.2. Building core for Stream Applications
./mvnw clean install -f applications/stream-applications-core -DskipTests
2.5. Building the applications
Let’s assume that you want to build the JDBC Source application based on the Kafka binder in Spring Cloud Stream and the Log Sink application based on the Rabbit binder. Assuming you built both the functions and stream-applications-core as described above, here is what you need to do.
./mvnw clean package -pl :jdbc-source
cd applications/source/jdbc-source/apps/jdbc-source-kafka
./mvnw clean package
This will generate the Kafka binder based uber jar in the target folder.
Similarly for the log sink, do the following.
./mvnw clean package -pl :log-sink
cd applications/sink/log-sink/apps/log-sink-rabbit
./mvnw clean package
2.5.1. Building a Docker image
The apps use the Jib Maven Plugin to build and publish the Docker image. If you have made some changes to an app, you may want to build the image and test it locally.
If you plan to use the image with minikube, run the following command before building the image:
eval $(minikube docker-env)
To build the image in your local registry:
./mvnw clean package jib:dockerBuild
To publish the image to a remote registry:
./mvnw jib:build \
-Djib.to.image=myregistry/myimage:latest \
-Djib.to.auth.username=$USERNAME \
-Djib.to.auth.password=$PASSWORD
3. Patching Pre-built Applications
3.1. Adding new dependencies
If you want to patch the pre-built applications to add new dependencies, you can use the following example as a reference.
To add the mysql driver to the jdbc-sink application:
- Clone the GitHub repository at github.com/spring-cloud/stream-applications
- Find the module that you want to patch, jdbc-sink in this case, and add the additional dependencies. For example, you can add the following mysql dependency to the application generator plugin’s configuration in the pom.xml:
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.37</version>
</dependency>
This is how the complete plugin configuration should look:
<plugin>
<groupId>org.springframework.cloud.stream.app.plugin</groupId>
<artifactId>spring-cloud-stream-app-maven-plugin</artifactId>
<configuration>
<generatedApp>
<name>jdbc</name>
<type>sink</type>
<version>${project.version}</version>
<configClass>org.springframework.cloud.fn.consumer.jdbc.JdbcConsumerConfiguration.class</configClass>
</generatedApp>
<dependencies>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.37</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud.fn</groupId>
<artifactId>jdbc-consumer</artifactId>
<version>${java-functions.version}</version>
</dependency>
</dependencies>
</configuration>
</plugin>
Once the above changes are done, you can generate the binder based apps from the root of the repository, as follows.
./mvnw clean install -pl :jdbc-sink
This generates the binder based applications in the apps folder under the jdbc-sink folder.
To build the app with the binder flavor that you are interested in, do the following:
cd applications/sink/jdbc-sink
cd apps/jdbc-sink-kafka # (or Rabbit if you are interested in that)
./mvnw clean package
cd target
There you will find the binder based uber jar with your changes.
3.2. Update existing dependencies or add new resources in the application
Modifying the plugin as above works when there are new dependencies to add to the application. However, when we need to update any existing dependencies, it is easier to make the Maven changes in the generated application itself. For example, if we have to update the binder dependencies to a new release of Spring Cloud Stream, those versions need to be updated in the generated application.
Here are the steps (again, we are using jdbc-sink-kafka as an example).
./mvnw clean install -pl :jdbc-sink
cd applications/sink/jdbc-sink/apps/jdbc-sink-kafka
Open the generated application’s pom.xml and update the dependencies.
If a new version of Spring Cloud Stream contains the enhancements we are looking for, it is easier to update the BOM itself.
Find where the BOM is declared in pom.xml and update the version.
For example, if we have to update Spring Cloud Stream to 4.0.3, this version must be specified in the BOM declaration as below:
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-dependencies</artifactId>
<version>4.0.3</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
We can also update any individual dependencies directly, but the above dependencyManagement approach is preferred if a BOM is available.
This is because, when using a BOM, Maven properly aligns any transitive dependencies.
If you have to modify the application further, this method of modifying the generated application is again the recommended approach.
For instance, if you want to add security certificate files, such as a key store or a trust store, to the application’s classpath, generate the application first and then add those resources to the classpath.
Make sure you are in the generated jdbc-sink-kafka folder, then do the following:
First, add the resources to the classpath by placing them under src/main/resources.
Then rebuild the application.
./mvnw clean package
cd target
Here you can find the modified application jar file.
4. Generating out of the box applications for other binders
By default, we only provide out of the box applications for the Apache Kafka and RabbitMQ binders. Other binder implementations exist, for which we can generate these same out of the box applications. For example, to generate these applications for the Kinesis binder or the Solace binder, follow the instructions below.
As a first step, clone the stream applications repository.
cd applications/stream-applications-core
We need to edit the pom.xml in this module. Find the following configuration, which defines the Kafka and RabbitMQ binders for the Maven plugin.
<kafka>
<maven>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
</dependencies>
</maven>
</kafka>
<rabbit>
<maven>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>
</dependencies>
</maven>
</rabbit>
Add the binder for which you want to generate new apps. For example, if we want to generate applications for the Kinesis binder, modify the configuration as below.
<binders>
<kafka>
<maven>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
</dependencies>
</maven>
</kafka>
<rabbit>
<maven>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>
</dependencies>
</maven>
</rabbit>
<kinesis>
<maven>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kinesis</artifactId>
<version>2.0.3.RELEASE</version>
</dependency>
</dependencies>
</maven>
</kinesis>
</binders>
Note that we need to specify the Kinesis binder version explicitly here, while Kafka and RabbitMQ do not need one. This is because those versions come from dependency management, while the Kinesis binder is not available through such a mechanism. If a BOM that defines the version is available, it can be used instead; just ensure that it is declared in the proper BOM section of the Maven plugin.
If the binder for which you are generating the applications relies on a different version of Spring Cloud Stream, make sure it is updated in the Maven properties.
Now, we can build:
./mvnw clean install -DskipTests
If we go to the applications folder and look at the generated applications, we should see the new binder variants there.
For instance, if we follow the configuration above for adding the Kinesis binder, we should see the Kinesis binder based app in the generated apps.
Let’s take time-source as an example.
cd applications/source/time-source/apps
Here, we should see three different binder based app projects: time-source-kafka, time-source-rabbit, and time-source-kinesis.
Similarly, this should happen for all the out of the box application projects.
Keep in mind that these generated applications still need to be built individually. To do that, go to the generated application's folder and initiate a Maven build.
Applications
5. Sources
5.1. Debezium Source
Debezium Engine based Change Data Capture (CDC) source.
The Debezium Source allows capturing database change events and streaming them over different message binders, such as Apache Kafka, RabbitMQ, and all other Spring Cloud Stream supported brokers.
This source can be used with any Spring Cloud Stream message binder. It is neither restricted to nor dependent on the Kafka Connect framework. Though this approach is flexible, it comes with certain limitations.
All Debezium configuration properties are supported.
Just precede any Debezium property with the debezium.properties. prefix.
For example, to set Debezium’s connector.class property, use the debezium.properties.connector.class source property instead.
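The prefix rule can be pictured with a small sketch. The code below is not the application's implementation; DebeziumPrefixSketch and nativeProperties are hypothetical names used only to show how prefixed source properties map to native Debezium property names.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration of the prefix mapping; not the code used by the Debezium Source.
class DebeziumPrefixSketch {

    static final String PREFIX = "debezium.properties.";

    // Keeps only the entries that carry the prefix and strips it from their keys.
    static Map<String, String> nativeProperties(Map<String, String> sourceProperties) {
        Map<String, String> result = new LinkedHashMap<>();
        for (Map.Entry<String, String> entry : sourceProperties.entrySet()) {
            if (entry.getKey().startsWith(PREFIX)) {
                result.put(entry.getKey().substring(PREFIX.length()), entry.getValue());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> sourceProperties = new LinkedHashMap<>();
        sourceProperties.put("debezium.properties.connector.class",
                "io.debezium.connector.mysql.MySqlConnector");
        sourceProperties.put("debezium.supplier.copy-headers", "true"); // not a native property
        System.out.println(nativeProperties(sourceProperties));
    }
}
```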
5.1.1. Database Support
The Debezium Source currently supports CDC for multiple datastores: MySQL, PostgreSQL, MongoDB, Oracle, SQL Server, Db2, Vitess, and Spanner databases.
5.1.2. Options
Properties grouped by prefix:
debezium
- debezium-native-configuration: <documentation missing> (Properties, default: <none>)
- header-format: ChangeEvent header format. Defaults to 'JSON'. (DebeziumFormat, default: <none>, possible values: JSON, AVRO, PROTOBUF)
- offset-commit-policy: The policy that defines when the offsets should be committed to offset storage. (DebeziumOffsetCommitPolicy, default: <none>, possible values: ALWAYS, PERIODIC, DEFAULT)
- payload-format: ChangeEvent key and payload formats. Defaults to 'JSON'. (DebeziumFormat, default: <none>, possible values: JSON, AVRO, PROTOBUF)
- properties: Spring pass-through wrapper for Debezium configuration properties. All properties with a 'debezium.properties.*' prefix are native Debezium properties. (Map<String, String>, default: <none>)
debezium.supplier
- copy-headers: Copy Change Event headers into Message headers. (Boolean, default: true)
Event flattening configuration
Debezium provides a comprehensive message format that accurately details information about changes that happen in the system.
Sometimes, though, this format might not be suitable for downstream consumers, which might require messages formatted so that field names and values are presented in a simplified, flattened structure.
To simplify the format of the event records that the Debezium connectors produce, you can use the Debezium event flattening message transformation. Using the flattening configuration, you can configure a simple message format like this:
--debezium.properties.transforms=unwrap
--debezium.properties.transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
--debezium.properties.transforms.unwrap.drop.tombstones=false
--debezium.properties.transforms.unwrap.delete.handling.mode=rewrite
--debezium.properties.transforms.unwrap.add.fields=name,db
Debezium Offset Storage
When a Debezium source runs, it reads information from the source and periodically records offsets that define how much of that information it has processed.
Should the source be restarted, it uses the last recorded offset to know where in the source information it should resume reading.
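The resume-from-offset behavior can be modeled with a toy sketch. It does not use the Debezium or Kafka Connect APIs: the change log is a plain list, the offset store is a plain map, and OffsetResumeSketch and poll are hypothetical names. It only shows why a store that survives a restart lets reading continue, while a lost store makes reading start over.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of offset tracking; it does not use the Debezium or Kafka Connect APIs.
class OffsetResumeSketch {

    // Reads up to maxEvents entries from the change log, starting at the offset last
    // recorded in the store for this source, then records the new offset.
    static List<String> poll(List<String> changeLog, Map<String, Integer> offsetStore,
            String sourceName, int maxEvents) {
        int start = offsetStore.getOrDefault(sourceName, 0);
        int end = Math.min(changeLog.size(), start + maxEvents);
        List<String> batch = new ArrayList<>(changeLog.subList(start, end));
        offsetStore.put(sourceName, end);
        return batch;
    }

    public static void main(String[] args) {
        List<String> changeLog = Arrays.asList("insert#1", "update#1", "insert#2", "delete#1");
        Map<String, Integer> store = new HashMap<>();

        // First run reads from the beginning.
        System.out.println(poll(changeLog, store, "my-connector", 2));
        // A restart that kept the store resumes where the first run stopped.
        System.out.println(poll(changeLog, store, "my-connector", 2));
        // A restart with an empty store (offsets lost) starts over.
        System.out.println(poll(changeLog, new HashMap<>(), "my-connector", 2));
    }
}
```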
Out of the box, the following offset storage configuration options are provided:
- In-Memory
Doesn't persist the offset data but keeps it in memory. Therefore, all offsets are lost when the Debezium source restarts.
--debezium.properties.offset.storage=org.apache.kafka.connect.storage.MemoryOffsetBackingStore
- Local Filesystem
Stores the offsets in a file on the local file system (the file can be named anything and stored anywhere). Additionally, although the connector records the offsets with every source record it produces, the engine flushes the offsets to the backing store periodically (in the example below, once each minute).
--debezium.properties.offset.storage=org.apache.kafka.connect.storage.FileOffsetBackingStore
--debezium.properties.offset.storage.file.filename=/tmp/offsets.dat (1)
--debezium.properties.offset.flush.interval.ms=60000 (2)
1 | Path to the file where offsets are to be stored. Required when offset.storage is set to the FileOffsetBackingStore. |
2 | Interval at which to try committing offsets. The default is 1 minute. |
- Kafka topic
Uses a Kafka topic to store offset data.
--debezium.properties.offset.storage=org.apache.kafka.connect.storage.KafkaOffsetBackingStore
--debezium.properties.offset.storage.topic=my-kafka-offset-topic (1)
--debezium.properties.offset.storage.partitions=2 (2)
--debezium.properties.offset.storage.replication.factor=1 (3)
--debezium.properties.offset.flush.interval.ms=60000 (4)
1 | The name of the Kafka topic where offsets are to be stored. Required when offset.storage is set to the KafkaOffsetBackingStore. |
2 | The number of partitions used when creating the offset storage topic. |
3 | Replication factor used when creating the offset storage topic. |
4 | Interval at which to try committing offsets. The default is 1 minute. |
One can implement the org.apache.kafka.connect.storage.OffsetBackingStore interface to provide offset storage bound to a custom backend key-value store.
Connectors properties
The table below lists all available Debezium properties for each connector.
Those properties can be used by adding the debezium.properties. prefix to them.
5.1.3. Examples and Testing
The Debezium integration tests use database fixtures running on the local machine. They leverage pre-built Debezium Docker database images with the help of Testcontainers.
To run and debug the tests from your IDE, you need to deploy the required database images from the command line. The instructions below explain how to run pre-configured test databases from Docker images.
MySQL
Start the debezium/example-mysql image in Docker:
docker run -it --rm --name mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=debezium -e MYSQL_USER=mysqluser -e MYSQL_PASSWORD=mysqlpw debezium/example-mysql:2.3.3.Final
(Optional) Use the mysql client to connect to the database and create a debezium user with the required credentials:
docker run -it --rm --name mysqlterm --link mysql mysql:5.7 sh -c 'exec mysql -h"$MYSQL_PORT_3306_TCP_ADDR" -P"$MYSQL_PORT_3306_TCP_PORT" -uroot -p"$MYSQL_ENV_MYSQL_ROOT_PASSWORD"'
mysql> GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'debezium' IDENTIFIED BY 'dbz';
Use the following properties to connect the Debezium Source to the MySQL DB:
debezium.properties.connector.class=io.debezium.connector.mysql.MySqlConnector (1)
debezium.properties.name=my-connector (2)
debezium.properties.topic.prefix=my-topic (2)
debezium.properties.database.server.id=85744 (2)
debezium.properties.database.user=debezium (3)
debezium.properties.database.password=dbz (3)
debezium.properties.database.hostname=localhost (3)
debezium.properties.database.port=3306 (3)
debezium.properties.schema=true (4)
debezium.properties.key.converter.schemas.enable=true (4)
debezium.properties.value.converter.schemas.enable=true (4)
debezium.properties.transforms=unwrap (5)
debezium.properties.transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState (5)
debezium.properties.transforms.unwrap.add.fields=name,db (5)
debezium.properties.transforms.unwrap.delete.handling.mode=none (5)
debezium.properties.transforms.unwrap.drop.tombstones=true (5)
debezium.properties.schema.history.internal=io.debezium.relational.history.MemorySchemaHistory (6)
debezium.properties.offset.storage=org.apache.kafka.connect.storage.MemoryOffsetBackingStore (6)
1 | Configures the Debezium Source to use MySqlConnector. |
2 | Metadata used to identify and dispatch the incoming events. |
3 | Connection to the MySQL server running on localhost:3306 as debezium user. |
4 | Includes the Change Event Value schema in the ChangeEvent message. |
5 | Enables the Change Event Flattening. |
6 | Source state to preserve between multiple starts. |
You can also run the DebeziumDatabasesIntegrationTest#mysql() test using this mysql configuration.
Disable the mysql GenericContainer test initialization code.
PostgreSQL
Start a pre-configured postgres server from the debezium/example-postgres:2.3.3.Final Docker image:
docker run -it --rm --name postgres -p 5432:5432 -e POSTGRES_USER=postgres -e POSTGRES_PASSWORD=postgres debezium/example-postgres:2.3.3.Final
You can connect to this server like this:
psql -U postgres -h localhost -p 5432
Use the following properties to connect the Debezium Source to PostgreSQL:
debezium.properties.connector.class=io.debezium.connector.postgresql.PostgresConnector (1)
debezium.properties.schema.history.internal=io.debezium.relational.history.MemorySchemaHistory (2)
debezium.properties.offset.storage=org.apache.kafka.connect.storage.MemoryOffsetBackingStore (2)
debezium.properties.topic.prefix=my-topic (3)
debezium.properties.name=my-connector (3)
debezium.properties.database.server.id=85744 (3)
debezium.properties.database.user=postgres (4)
debezium.properties.database.password=postgres (4)
debezium.properties.database.dbname=postgres (4)
debezium.properties.database.hostname=localhost (4)
debezium.properties.database.port=5432 (4)
debezium.properties.schema=true (5)
debezium.properties.key.converter.schemas.enable=true (5)
debezium.properties.value.converter.schemas.enable=true (5)
debezium.properties.transforms=unwrap (6)
debezium.properties.transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState (6)
debezium.properties.transforms.unwrap.add.fields=name,db (6)
debezium.properties.transforms.unwrap.delete.handling.mode=none (6)
debezium.properties.transforms.unwrap.drop.tombstones=true (6)
1 | Configures Debezium Source to use PostgresConnector. |
2 | Configures the Debezium engine to use memory stores. |
3 | Metadata used to identify and dispatch the incoming events. |
4 | Connection to the PostgreSQL server running on localhost:5432 as postgres user. |
5 | Includes the Change Event Value schema in the message. |
6 | Enables the Change Event Flattening. |
You can also run the DebeziumDatabasesIntegrationTest#postgres() test using this postgres configuration.
Disable the postgres GenericContainer test initialization code.
MongoDB
Start a pre-configured mongodb from the debezium/example-mongodb:2.3.3.Final container image:
docker run -it --rm --name mongodb -p 27017:27017 -e MONGODB_USER=debezium -e MONGODB_PASSWORD=dbz debezium/example-mongodb:2.3.3.Final
Initialize the inventory collections
docker exec -it mongodb sh -c 'bash -c /usr/local/bin/init-inventory.sh'
In the mongodb terminal output, search for a log entry like host: "3f95a8a6516e:27017":
2019-01-10T13:46:10.004+0000 I COMMAND [conn1] command local.oplog.rs appName: "MongoDB Shell" command: replSetInitiate { replSetInitiate: { _id: "rs0", members: [ { _id: 0.0, host: "3f95a8a6516e:27017" } ] }, lsid: { id: UUID("5f477a16-d80d-41f2-9ab4-4ebecea46773") }, $db: "admin" } numYields:0 reslen:22 locks:{ Global: { acquireCount: { r: 36, w: 20, W: 2 }, acquireWaitCount: { W: 1 }, timeAcquiringMicros: { W: 312 } }, Database: { acquireCount: { r: 6, w: 4, W: 16 } }, Collection: { acquireCount: { r: 4, w: 2 } }, oplog: { acquireCount: { r: 2, w: 3 } } } protocol:op_msg 988ms
Add a 127.0.0.1 3f95a8a6516e entry to your /etc/hosts file.
Use the following properties to connect the Debezium Source to MongoDB:
debezium.properties.connector.class=io.debezium.connector.mongodb.MongoDbConnector (1)
debezium.properties.topic.prefix=my-topic
debezium.properties.name=my-connector
debezium.properties.database.server.id=85744
debezium.properties.schema.history.internal=io.debezium.relational.history.MemorySchemaHistory (2)
debezium.properties.offset.storage=org.apache.kafka.connect.storage.MemoryOffsetBackingStore (2)
debezium.properties.mongodb.hosts=rs0/localhost:27017 (3)
debezium.properties.topic.prefix=dbserver1 (3)
debezium.properties.mongodb.user=debezium (3)
debezium.properties.mongodb.password=dbz (3)
debezium.properties.database.whitelist=inventory (3)
debezium.properties.tasks.max=1 (4)
debezium.properties.schema=true (5)
debezium.properties.key.converter.schemas.enable=true (5)
debezium.properties.value.converter.schemas.enable=true (5)
debezium.properties.transforms=unwrap (6)
debezium.properties.transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState (6)
debezium.properties.transforms.unwrap.add.fields=name,db (6)
debezium.properties.transforms.unwrap.delete.handling.mode=none (6)
debezium.properties.transforms.unwrap.drop.tombstones=true (6)
1 | Configures Debezium Source to use MongoDB Connector. |
2 | Configures the Debezium engine to use memory stores. |
3 | Connection to the MongoDB running on localhost:27017 as debezium user. |
4 | debezium.io/docs/connectors/mongodb/#tasks |
5 | Includes the Change Event Value schema in the SourceRecord events. |
6 | Enables the Change Event Flattening. |
You can also run the DebeziumDatabasesIntegrationTest#mongodb() test using this mongodb configuration.
SQL Server
Start a sqlserver from the microsoft/mssql-server-linux:2017-CU9-GDR2 Docker image:
docker run -it --rm --name sqlserver -p 1433:1433 -e ACCEPT_EULA=Y -e MSSQL_PID=Standard -e SA_PASSWORD=Password! -e MSSQL_AGENT_ENABLED=true microsoft/mssql-server-linux:2017-CU9-GDR2
Populate it with sample data from the Debezium SQL Server tutorial:
wget https://raw.githubusercontent.com/debezium/debezium-examples/master/tutorial/debezium-sqlserver-init/inventory.sql
cat ./inventory.sql | docker exec -i sqlserver bash -c '/opt/mssql-tools/bin/sqlcmd -U sa -P $SA_PASSWORD'
Use the following properties to connect the Debezium Source to SQL Server:
debezium.properties.connector.class=io.debezium.connector.sqlserver.SqlServerConnector (1)
debezium.properties.schema.history.internal=io.debezium.relational.history.MemorySchemaHistory (2)
debezium.properties.offset.storage=org.apache.kafka.connect.storage.MemoryOffsetBackingStore (2)
debezium.properties.topic.prefix=my-topic (3)
debezium.properties.name=my-connector (3)
debezium.properties.database.server.id=85744 (3)
debezium.properties.database.user=sa (4)
debezium.properties.database.password=Password! (4)
debezium.properties.database.dbname=testDB (4)
debezium.properties.database.hostname=localhost (4)
debezium.properties.database.port=1433 (4)
1 | Configures Debezium Source to use SqlServerConnector. |
2 | Configures the Debezium engine to use memory state stores. |
3 | Metadata used to identify and dispatch the incoming events. |
4 | Connection to the SQL Server running on localhost:1433 as sa user. |
You can also run the DebeziumDatabasesIntegrationTest#sqlServer() test using this SqlServer configuration.
Oracle
Start an Oracle instance reachable from localhost and set it up with the configuration, users, and grants described in the Debezium Vagrant set-up.
Populate it with sample data from the Debezium Oracle tutorial:
wget https://raw.githubusercontent.com/debezium/debezium-examples/master/tutorial/debezium-with-oracle-jdbc/init/inventory.sql
cat ./inventory.sql | docker exec -i dbz_oracle sqlplus debezium/dbz@//localhost:1521/ORCLPDB1
5.2. File Source
This application polls a directory and sends new files or their contents to the output channel. By default, the file source provides the contents of a File as a byte array. However, this can be customized using the --file.consumer.mode option:
- ref: Provides a java.io.File reference.
- lines: Splits files line by line and emits a new message for each line.
- contents: The default. Provides the contents of a file as a byte array.
When using --file.consumer.mode=lines, you can also provide the additional option --file.consumer.withMarkers=true.
If set to true, the underlying FileSplitter emits additional start-of-file and end-of-file marker messages before and after the actual data.
The payload of these two additional marker messages is of type FileSplitter.FileMarker. The withMarkers option defaults to false if not explicitly set.
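To make the effect of the lines mode and the marker option more concrete, here is a simplified stand-in written for illustration. It does not use Spring Integration: the real FileSplitter emits FileSplitter.FileMarker payloads, whereas the "START" and "END" strings below, along with the LineSplitSketch class itself, are placeholders assumed for this sketch.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-in for line splitting with markers. The real FileSplitter emits
// FileSplitter.FileMarker payloads; the "START"/"END" strings here are placeholders.
class LineSplitSketch {

    // Returns one message per line, optionally wrapped in start and end marker messages.
    static List<String> split(String fileName, String contents, boolean withMarkers) {
        List<String> messages = new ArrayList<>();
        if (withMarkers) {
            messages.add("START " + fileName);
        }
        if (!contents.isEmpty()) {
            // \R matches any line break sequence (\n, \r\n, and so on).
            for (String line : contents.split("\\R")) {
                messages.add(line);
            }
        }
        if (withMarkers) {
            messages.add("END " + fileName);
        }
        return messages;
    }

    public static void main(String[] args) {
        System.out.println(split("data.txt", "first\nsecond", true));
        System.out.println(split("data.txt", "first\nsecond", false));
    }
}
```

With markers enabled, a downstream consumer can tell where one file ends and the next begins, which is otherwise lost once the file is split into individual line messages.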
5.2.1. Options
The file source has the following options:
Properties grouped by prefix:
file.consumer
- markers-json: When 'fileMarkers == true', specify if they should be produced as FileSplitter.FileMarker objects or JSON. (Boolean, default: true)
- mode: The FileReadingMode to use for file reading sources. Values are 'ref' - The File object, 'lines' - a message per line, or 'contents' - the contents as bytes. (FileReadingMode, default: <none>, possible values: ref, lines, contents)
- with-markers: Set to true to emit start of file/end of file marker messages before/after the data. Only valid with FileReadingMode 'lines'. (Boolean, default: <none>)
file.supplier
- delay-when-empty: Duration of delay when no new files are detected. (Duration, default: 1s)
- directory: The directory to poll for new files. (File, default: <none>)
- filename-pattern: A simple ant pattern to match files. (String, default: <none>)
- filename-regex: A regex pattern to match files. (Pattern, default: <none>)
- prevent-duplicates: Set to true to include an AcceptOnceFileListFilter which prevents duplicates. (Boolean, default: true)
metadata.store.dynamo-db
- create-delay: Delay between create table retries. (Integer, default: 1)
- create-retries: Retry number for create table request. (Integer, default: 25)
- read-capacity: Read capacity on the table. (Long, default: 1)
- table: Table name for metadata. (String, default: <none>)
- time-to-live: TTL for table entries. (Integer, default: <none>)
- write-capacity: Write capacity on the table. (Long, default: 1)
metadata.store.jdbc
- region: Unique grouping identifier for messages persisted with this store. (String, default: DEFAULT)
- table-prefix: Prefix for the custom table name. (String, default: <none>)
metadata.store.mongo-db
- collection: MongoDB collection name for metadata. (String, default: metadataStore)
metadata.store.redis
- key: Redis key for metadata. (String, default: <none>)
metadata.store
- type: Indicates the type of metadata store to configure (default is 'memory'). You must include the corresponding Spring Integration dependency to use a persistent store. (StoreType, default: <none>, possible values: mongodb, redis, dynamodb, jdbc, zookeeper, hazelcast, memory)
metadata.store.zookeeper
- connect-string: Zookeeper connect string in form HOST:PORT. (String, default: 127.0.0.1:2181)
- encoding: Encoding to use when storing data in Zookeeper. (Charset, default: UTF-8)
- retry-interval: Retry interval for Zookeeper operations in milliseconds. (Integer, default: 1000)
- root: Root node - store entries are children of this node. (String, default: /SpringIntegration-MetadataStore)
5.3. FTP Source
This source application supports transfer of files using the FTP protocol.
Files are transferred from the remote directory to the local directory where the app is deployed.
By default, messages emitted by the source are provided as a byte array. However, this can be customized using the --mode option:
- ref: Provides a java.io.File reference.
- lines: Splits files line by line and emits a new message for each line.
- contents: The default. Provides the contents of a file as a byte array.
When using --mode=lines, you can also provide the additional option --withMarkers=true.
If set to true, the underlying FileSplitter emits additional start-of-file and end-of-file marker messages before and after the actual data.
The payload of these two additional marker messages is of type FileSplitter.FileMarker. The withMarkers option defaults to false if not explicitly set.
See also MetadataStore options for possible shared persistent store configuration used to prevent duplicate messages on restart.
5.3.1. Input
N/A (Fetches files from an FTP server).
5.3.2. Output
mode = contents
Headers:
-
Content-Type: application/octet-stream
-
file_originalFile: <java.io.File>
-
file_name: <file name>
Payload:
A byte[]
filled with the file contents.
mode = lines
Headers:
-
Content-Type: text/plain
-
file_originalFile: <java.io.File>
-
file_name: <file name>
-
correlationId: <UUID> (same for each line)
-
sequenceNumber: <n>
-
sequenceSize: 0 (number of lines is not known until the file is read)
Payload:
A String for each line.
The first line is optionally preceded by a message with a START marker payload.
The last line is optionally followed by a message with an END marker payload.
Marker presence and format are determined by the with-markers and markers-json properties.
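The ordering of messages in lines mode can be sketched in plain Java. This is only an illustration of the sequence described above, not the FileSplitter implementation: the class name, the method name, and the use of the strings "START" and "END" as stand-ins for the marker messages are all assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of the ftp source.
final class LinesModeSketch {

    // Returns the payloads emitted for one file, in order.
    // "START" and "END" are placeholders for the marker messages.
    static List<String> emit(List<String> lines, boolean withMarkers) {
        List<String> out = new ArrayList<>();
        if (withMarkers) {
            out.add("START"); // precedes the first line
        }
        out.addAll(lines);    // one message per line
        if (withMarkers) {
            out.add("END");   // follows the last line
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("alpha", "beta");
        System.out.println(emit(lines, false)); // [alpha, beta]
        System.out.println(emit(lines, true));  // [START, alpha, beta, END]
    }
}
```

With markers enabled, a consumer can treat the END message as the signal that a file is complete, which is useful because sequenceSize is always 0 in this mode.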
mode = ref
Headers:
None.
Payload:
A java.io.File
object.
5.3.3. Options
The ftp source has the following options:
Properties grouped by prefix:
file.consumer
- markers-json
-
When 'fileMarkers == true', specify if they should be produced as FileSplitter.FileMarker objects or JSON. (Boolean, default:
true
) - mode
-
The FileReadingMode to use for file reading sources. Values are 'ref' - The File object, 'lines' - a message per line, or 'contents' - the contents as bytes. (FileReadingMode, default:
<none>
, possible values:ref
,lines
,contents
) - with-markers
-
Set to true to emit start of file/end of file marker messages before/after the data. Only valid with FileReadingMode 'lines'. (Boolean, default:
<none>
)
ftp.factory
- cache-sessions
-
Cache sessions. (Boolean, default:
<none>
) - client-mode
-
The client mode to use for the FTP session. (ClientMode, default:
<none>
, possible values:ACTIVE
,PASSIVE
) - host
-
The host name of the server. (String, default:
localhost
) - password
-
The password to use to connect to the server. (String, default:
<none>
) - port
-
The port of the server. (Integer, default:
21
) - username
-
The username to use to connect to the server. (String, default:
<none>
)
ftp.supplier
- auto-create-local-dir
-
Set to true to create the local directory if it does not exist. (Boolean, default:
true
) - delay-when-empty
-
Duration of delay when no new files are detected. (Duration, default:
1s
) - delete-remote-files
-
Set to true to delete remote files after successful transfer. (Boolean, default:
false
) - filename-pattern
-
A filter pattern to match the names of files to transfer. (String, default:
<none>
) - filename-regex
-
A filter regex pattern to match the names of files to transfer. (Pattern, default:
<none>
) - local-dir
-
The local directory to use for file transfers. (File, default:
<none>
) - preserve-timestamp
-
Set to true to preserve the original timestamp. (Boolean, default:
true
) - remote-dir
-
The remote FTP directory. (String, default:
/
) - remote-file-separator
-
The remote file separator. (String, default:
/
) - tmp-file-suffix
-
The suffix to use while the transfer is in progress. (String, default:
.tmp
)
metadata.store.dynamo-db
- create-delay
-
Delay between create table retries. (Integer, default:
1
) - create-retries
-
Retry number for create table request. (Integer, default:
25
) - read-capacity
-
Read capacity on the table. (Long, default:
1
) - table
-
Table name for metadata. (String, default:
<none>
) - time-to-live
-
TTL for table entries. (Integer, default:
<none>
) - write-capacity
-
Write capacity on the table. (Long, default:
1
)
metadata.store.jdbc
- region
-
Unique grouping identifier for messages persisted with this store. (String, default:
DEFAULT
) - table-prefix
-
Prefix for the custom table name. (String, default:
<none>
)
metadata.store.mongo-db
- collection
-
MongoDB collection name for metadata. (String, default:
metadataStore
)
metadata.store.redis
- key
-
Redis key for metadata. (String, default:
<none>
)
metadata.store
- type
-
Indicates the type of metadata store to configure (default is 'memory'). You must include the corresponding Spring Integration dependency to use a persistent store. (StoreType, default:
<none>
, possible values:mongodb
,redis
,dynamodb
,jdbc
,zookeeper
,hazelcast
,memory
)
metadata.store.zookeeper
- connect-string
-
Zookeeper connect string in form HOST:PORT. (String, default:
127.0.0.1:2181
) - encoding
-
Encoding to use when storing data in Zookeeper. (Charset, default:
UTF-8
) - retry-interval
-
Retry interval for Zookeeper operations in milliseconds. (Integer, default:
1000
) - root
-
Root node - store entries are children of this node. (String, default:
/SpringIntegration-MetadataStore
)
5.3.4. Examples
java -jar ftp_source.jar --ftp.supplier.remote-dir=foo --file.consumer.mode=lines --ftp.factory.host=ftpserver \
--ftp.factory.username=user --ftp.factory.password=pw --ftp.supplier.local-dir=/foo
5.4. Http Source
A source application that listens for HTTP requests and emits the body as a message payload.
If the Content-Type matches text/* or application/json, the payload will be a String; otherwise the payload will be a byte array.
5.4.1. Payload:
If content type matches text/*
or application/json
-
String
If content type does not match text/*
or application/json
-
byte array
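The payload rule above can be sketched as a small decision function. This is a minimal illustration, not the http source's own code: the class and method names are hypothetical, and two details are assumptions made for the example, namely that media type parameters such as "; charset=UTF-8" are ignored when matching and that text is decoded as UTF-8.

```java
import java.nio.charset.StandardCharsets;
import java.util.Locale;

// Hypothetical helper, not part of the http source.
final class HttpPayloadSketch {

    // True when the media type is text/* or application/json.
    static boolean isTextual(String contentType) {
        if (contentType == null) {
            return false;
        }
        // Assumption: drop parameters such as "; charset=UTF-8" before matching.
        String mediaType = contentType.split(";", 2)[0].trim().toLowerCase(Locale.ROOT);
        return mediaType.startsWith("text/") || mediaType.equals("application/json");
    }

    // Returns a String for textual content and the raw byte[] otherwise.
    static Object toPayload(String contentType, byte[] body) {
        // Assumption: UTF-8 decoding for textual bodies.
        return isTextual(contentType) ? new String(body, StandardCharsets.UTF_8) : body;
    }

    public static void main(String[] args) {
        byte[] body = "{\"a\":1}".getBytes(StandardCharsets.UTF_8);
        System.out.println(toPayload("application/json", body) instanceof String);
        System.out.println(toPayload("application/octet-stream", body) instanceof byte[]);
    }
}
```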
5.4.2. Options
The http source supports the following configuration properties:
Properties grouped by prefix:
http.cors
- allow-credentials
-
Whether the browser should include any cookies associated with the domain of the request being annotated. (Boolean, default:
<none>
) - allowed-headers
-
List of request headers that can be used during the actual request. (String[], default:
<none>
) - allowed-origins
-
List of allowed origins, e.g. https://domain1.com. (String[], default:
<none>
)
http
- mapped-request-headers
-
Headers that will be mapped. (String[], default:
<none>
) - path-pattern
-
HTTP endpoint path mapping. (String, default:
/
)
server
- port
-
Server HTTP port. (Integer, default:
8080
)
5.5. JDBC Source
This source polls data from an RDBMS.
This source is fully based on the DataSourceAutoConfiguration, so refer to the Spring Boot JDBC Support for more information.
5.5.1. Payload
-
Map<String, Object> when jdbc.supplier.split == true (default) and List<Map<String, Object>> otherwise
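The effect of the split option on the emitted payloads can be sketched as follows. This is an illustration only: the class, the method names, and the sample rows are hypothetical, and the sketch does not model what the source does when a poll returns no rows.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of the jdbc source.
final class JdbcSplitSketch {

    // Returns the message payloads produced for one poll result.
    static List<Object> payloads(List<Map<String, Object>> rows, boolean split) {
        List<Object> out = new ArrayList<>();
        if (split) {
            out.addAll(rows); // one Map<String, Object> payload per row
        } else {
            out.add(rows);    // a single List<Map<String, Object>> payload
        }
        return out;
    }

    // Two made-up rows used for the demonstration.
    static List<Map<String, Object>> sampleRows() {
        List<Map<String, Object>> rows = new ArrayList<>();
        for (int id = 1; id <= 2; id++) {
            Map<String, Object> row = new LinkedHashMap<>();
            row.put("id", id);
            row.put("name", "row-" + id);
            rows.add(row);
        }
        return rows;
    }

    public static void main(String[] args) {
        System.out.println(payloads(sampleRows(), true).size());  // 2
        System.out.println(payloads(sampleRows(), false).size()); // 1
    }
}
```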
5.5.2. Options
The jdbc source has the following options:
Properties grouped by prefix:
jdbc.supplier
- max-rows
-
Max numbers of rows to process for query. (Integer, default:
0
) - query
-
The query to use to select data. (String, default:
<none>
) - split
-
Whether to split the SQL result as individual messages. (Boolean, default:
true
) - update
-
An SQL update statement to execute for marking polled messages as 'seen'. (String, default:
<none>
)
spring.datasource
- driver-class-name
-
Fully qualified name of the JDBC driver. Auto-detected based on the URL by default. (String, default:
<none>
) - password
-
Login password of the database. (String, default:
<none>
) - url
-
JDBC URL of the database. (String, default:
<none>
) - username
-
Login username of the database. (String, default:
<none>
)
spring.integration.poller
- cron
-
Cron expression for polling. Mutually exclusive with 'fixedDelay' and 'fixedRate'. (String, default:
<none>
) - fixed-delay
-
Polling delay period. Mutually exclusive with 'cron' and 'fixedRate'. (Duration, default:
<none>
) - fixed-rate
-
Polling rate period. Mutually exclusive with 'fixedDelay' and 'cron'. (Duration, default:
<none>
) - initial-delay
-
Polling initial delay. Applied for 'fixedDelay' and 'fixedRate'; ignored for 'cron'. (Duration, default:
<none>
) - max-messages-per-poll
-
Maximum number of messages to poll per polling cycle. (Integer, default:
<none>
) - receive-timeout
-
How long to wait for messages on poll. (Duration, default:
1s
)
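The poller options above state that cron, fixed-delay, and fixed-rate are mutually exclusive. A minimal sketch of that constraint follows. The class and method are hypothetical, and how the framework itself reacts to a conflicting configuration is not covered here.

```java
import java.time.Duration;

// Hypothetical helper, not part of Spring Integration.
final class PollerOptionsSketch {

    // True when at most one of the three trigger options is set.
    static boolean isValid(String cron, Duration fixedDelay, Duration fixedRate) {
        int configured = 0;
        if (cron != null) {
            configured++;
        }
        if (fixedDelay != null) {
            configured++;
        }
        if (fixedRate != null) {
            configured++;
        }
        return configured <= 1;
    }

    public static void main(String[] args) {
        System.out.println(isValid("0 * * * * *", null, null));                      // true
        System.out.println(isValid(null, Duration.ofSeconds(5), Duration.ofSeconds(5))); // false
    }
}
```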
Also see the Spring Boot Documentation for additional DataSource properties and TriggerProperties and MaxMessagesProperties for polling options.
5.6. JMS Source
The JMS source enables receiving messages from JMS.
5.6.1. Options
The JMS source has the following options:
Properties grouped by prefix:
jms.supplier
- client-id
-
Client id for durable subscriptions. (String, default:
<none>
) - destination
-
The destination from which to receive messages (queue or topic). (String, default:
<none>
) - message-selector
-
A selector for messages. (String, default:
<none>
) - session-transacted
-
True to enable transactions and select a DefaultMessageListenerContainer, false to select a SimpleMessageListenerContainer. (Boolean, default:
true
) - subscription-durable
-
True for a durable subscription. (Boolean, default:
<none>
) - subscription-name
-
The name of a durable or shared subscription. (String, default:
<none>
) - subscription-shared
-
True for a shared subscription. (Boolean, default:
<none>
)
spring.jms
- jndi-name
-
Connection factory JNDI name. When set, takes precedence over other connection factory auto-configurations. (String, default:
<none>
) - pub-sub-domain
-
Whether the default destination type is topic. (Boolean, default:
false
)
spring.jms.listener
- acknowledge-mode
-
Acknowledge mode of the container. By default, the listener is transacted with automatic acknowledgment. (AcknowledgeMode, default:
<none>
, possible values:AUTO
,CLIENT
,DUPS_OK
) - auto-startup
-
Start the container automatically on startup. (Boolean, default:
true
) - concurrency
-
Minimum number of concurrent consumers. When max-concurrency is not specified the minimum will also be used as the maximum. (Integer, default:
<none>
) - max-concurrency
-
Maximum number of concurrent consumers. (Integer, default:
<none>
) - receive-timeout
-
Timeout to use for receive calls. Use -1 for a no-wait receive or 0 for no timeout at all. The latter is only feasible if not running within a transaction manager and is generally discouraged since it prevents clean shutdown. (Duration, default:
1s
)
5.7. Apache Kafka Source
This module consumes messages from Apache Kafka.
5.7.1. Options
The kafka source has the following options:
(See the Spring Boot documentation for Spring for Apache Kafka configuration properties)
Properties grouped by prefix:
kafka.supplier
- ack-discarded
-
Whether to acknowledge discarded records after 'RecordFilterStrategy'. (Boolean, default:
false
) - record-filter
-
SpEL expression for 'RecordFilterStrategy' with a 'ConsumerRecord' as a root object. (Expression, default:
<none>
) - topic-pattern
-
Apache Kafka topics pattern to subscribe. (Pattern, default:
<none>
) - topics
-
Apache Kafka topics to subscribe. (String[], default:
<none>
)
spring.kafka
- bootstrap-servers
-
Comma-delimited list of host:port pairs to use for establishing the initial connections to the Kafka cluster. Applies to all components unless overridden. (List<String>, default:
<none>
) - client-id
-
ID to pass to the server when making requests. Used for server-side logging. (String, default:
<none>
) - properties
-
Additional properties, common to producers and consumers, used to configure the client. (Map<String, String>, default:
<none>
)
spring.kafka.consumer
- auto-commit-interval
-
Frequency with which the consumer offsets are auto-committed to Kafka if 'enable.auto.commit' is set to true. (Duration, default:
<none>
) - auto-offset-reset
-
What to do when there is no initial offset in Kafka or if the current offset no longer exists on the server. (String, default:
<none>
) - bootstrap-servers
-
Comma-delimited list of host:port pairs to use for establishing the initial connections to the Kafka cluster. Overrides the global property, for consumers. (List<String>, default:
<none>
) - client-id
-
ID to pass to the server when making requests. Used for server-side logging. (String, default:
<none>
) - enable-auto-commit
-
Whether the consumer's offset is periodically committed in the background. (Boolean, default:
<none>
) - fetch-max-wait
-
Maximum amount of time the server blocks before answering the fetch request if there isn't sufficient data to immediately satisfy the requirement given by "fetch-min-size". (Duration, default:
<none>
) - fetch-min-size
-
Minimum amount of data the server should return for a fetch request. (DataSize, default:
<none>
) - group-id
-
Unique string that identifies the consumer group to which this consumer belongs. (String, default:
<none>
) - heartbeat-interval
-
Expected time between heartbeats to the consumer coordinator. (Duration, default:
<none>
) - isolation-level
-
Isolation level for reading messages that have been written transactionally. (IsolationLevel, default:
read-uncommitted
, possible values:READ_UNCOMMITTED
,READ_COMMITTED
) - key-deserializer
-
Deserializer class for keys. (Class<?>, default:
<none>
) - max-poll-records
-
Maximum number of records returned in a single call to poll(). (Integer, default:
<none>
) - properties
-
Additional consumer-specific properties used to configure the client. (Map<String, String>, default:
<none>
) - value-deserializer
-
Deserializer class for values. (Class<?>, default:
<none>
)
spring.kafka.listener
- ack-count
-
Number of records between offset commits when ackMode is "COUNT" or "COUNT_TIME". (Integer, default:
<none>
) - ack-mode
-
Listener AckMode. See the spring-kafka documentation. (AckMode, default:
<none>
, possible values:RECORD
,BATCH
,TIME
,COUNT
,COUNT_TIME
,MANUAL
,MANUAL_IMMEDIATE
) - ack-time
-
Time between offset commits when ackMode is "TIME" or "COUNT_TIME". (Duration, default:
<none>
) - async-acks
-
Support for asynchronous record acknowledgements. Only applies when spring.kafka.listener.ack-mode is manual or manual-immediate. (Boolean, default:
<none>
) - auto-startup
-
Whether to auto start the container. (Boolean, default:
true
) - client-id
-
Prefix for the listener's consumer client.id property. (String, default:
<none>
) - concurrency
-
Number of threads to run in the listener containers. (Integer, default:
<none>
) - idle-between-polls
-
Sleep interval between Consumer.poll(Duration) calls. (Duration, default:
0
) - idle-event-interval
-
Time between publishing idle consumer events (no data received). (Duration, default:
<none>
) - idle-partition-event-interval
-
Time between publishing idle partition consumer events (no data received for partition). (Duration, default:
<none>
) - immediate-stop
-
Whether the container stops after the current record is processed or after all the records from the previous poll are processed. (Boolean, default:
false
) - log-container-config
-
Whether to log the container configuration during initialization (INFO level). (Boolean, default:
<none>
) - missing-topics-fatal
-
Whether the container should fail to start if at least one of the configured topics is not present on the broker. (Boolean, default:
false
) - monitor-interval
-
Time between checks for non-responsive consumers. If a duration suffix is not specified, seconds will be used. (Duration, default:
<none>
) - no-poll-threshold
-
Multiplier applied to "pollTimeout" to determine if a consumer is non-responsive. (Float, default:
<none>
) - poll-timeout
-
Timeout to use when polling the consumer. (Duration, default:
<none>
) - type
-
Listener type. (Type, default:
single
)
5.8. Load Generator Source
A source that sends generated data and dispatches it to the stream.
5.8.1. Options
The load-generator source has the following options:
- load-generator.generate-timestamp
-
Whether to generate a timestamp. (Boolean, default:
false
) - load-generator.message-count
-
Message count. (Integer, default:
1000
) - load-generator.message-size
-
Message size. (Integer, default:
1000
) - load-generator.producers
-
Number of producers. (Integer, default:
1
)
5.9. Mail Source
A source application that listens for emails and emits the message body as a message payload.
5.9.1. Options
The mail source has the following options:
- mail.supplier.charset
-
The charset for byte[] mail-to-string transformation. (String, default:
UTF-8
) - mail.supplier.delete
-
Set to true to delete email after download. (Boolean, default:
false
) - mail.supplier.expression
-
Configure a SpEL expression to select messages. (String, default:
true
) - mail.supplier.idle-imap
-
Set to true to use IdleImap Configuration. (Boolean, default:
false
) - mail.supplier.java-mail-properties
-
JavaMail properties as a new line delimited string of name-value pairs, e.g. 'foo=bar\n baz=car'. (Properties, default:
<none>
) - mail.supplier.mark-as-read
-
Set to true to mark email as read. (Boolean, default:
false
) - mail.supplier.url
-
Mail connection URL for connection to Mail server e.g. 'imaps://username:password@<host>:993/Inbox'. (URLName, default:
<none>
) - mail.supplier.user-flag
-
The flag to mark messages when the server does not support \Recent. (String, default:
<none>
)
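The java-mail-properties option takes a newline-delimited string of name-value pairs. The sketch below shows how such a string maps to a java.util.Properties object using the standard Properties.load method. It is an assumption that this matches the conversion the mail source applies internally; the class and method names are hypothetical.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical helper, not part of the mail source.
final class MailPropertiesSketch {

    // Parses "name=value" pairs separated by newlines.
    static Properties parse(String newlineDelimited) {
        Properties props = new Properties();
        try {
            // Properties.load skips leading whitespace on each line.
            props.load(new StringReader(newlineDelimited));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    public static void main(String[] args) {
        // Same shape as the example in the option description.
        Properties props = parse("foo=bar\n baz=car");
        System.out.println(props.getProperty("foo")); // bar
        System.out.println(props.getProperty("baz")); // car
    }
}
```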
5.10. MongoDB Source
This source polls data from MongoDB.
This source is fully based on the MongoDataAutoConfiguration, so refer to the Spring Boot MongoDB Support for more information.
5.10.1. Options
The mongodb source has the following options:
Properties grouped by prefix:
mongodb.supplier
- collection
-
The MongoDB collection to query. (String, default:
<none>
) - query
-
The MongoDB query. (String, default:
{ }
) - query-expression
-
The SpEL expression in MongoDB query DSL style. (Expression, default:
<none>
) - split
-
Whether to split the query result as individual messages. (Boolean, default:
true
) - update-expression
-
The SpEL expression in MongoDB update DSL style. (Expression, default:
<none>
)
spring.data.mongodb
- additional-hosts
-
Additional server hosts. Cannot be set with URI or if 'host' is not specified. Additional hosts will use the default mongo port of 27017. If you want to use a different port you can use the "host:port" syntax. (List<String>, default:
<none>
) - authentication-database
-
Authentication database name. (String, default:
<none>
) - auto-index-creation
-
Whether to enable auto-index creation. (Boolean, default:
<none>
) - database
-
Database name. Overrides database in URI. (String, default:
<none>
) - field-naming-strategy
-
Fully qualified name of the FieldNamingStrategy to use. (Class<?>, default:
<none>
) - host
-
Mongo server host. Cannot be set with URI. (String, default:
<none>
) - password
-
Login password of the mongo server. Cannot be set with URI. (Character[], default:
<none>
) - port
-
Mongo server port. Cannot be set with URI. (Integer, default:
<none>
) - replica-set-name
-
Required replica set name for the cluster. Cannot be set with URI. (String, default:
<none>
) - uri
-
Mongo database URI. Overrides host, port, username, and password. (String, default:
mongodb://localhost/test
) - username
-
Login user of the mongo server. Cannot be set with URI. (String, default:
<none>
) - uuid-representation
-
Representation to use when converting a UUID to a BSON binary value. (UuidRepresentation, default:
java-legacy
, possible values:UNSPECIFIED
,STANDARD
,C_SHARP_LEGACY
,JAVA_LEGACY
,PYTHON_LEGACY
)
spring.data.mongodb.gridfs
- bucket
-
GridFS bucket name. (String, default:
<none>
) - database
-
GridFS database name. (String, default:
<none>
)
spring.data.mongodb.ssl
- bundle
-
SSL bundle name. (String, default:
<none>
) - enabled
-
Whether to enable SSL support. Enabled automatically if "bundle" is provided unless specified otherwise. (Boolean, default:
<none>
)
Also see the Spring Boot Documentation for additional MongoProperties properties.
See TriggerProperties for polling options.
5.11. MQTT Source
Source that enables receiving messages from MQTT.
5.11.1. Payload:
-
String if the binary setting is false (default)
-
byte[] if the binary setting is true
5.11.2. Options
The mqtt source has the following options:
Properties grouped by prefix:
mqtt
- clean-session
-
whether the client and server should remember state across restarts and reconnects. (Boolean, default:
true
) - connection-timeout
-
the connection timeout in seconds. (Integer, default:
30
) - keep-alive-interval
-
the ping interval in seconds. (Integer, default:
60
) - password
-
the password to use when connecting to the broker. (String, default:
guest
) - persistence
-
'memory' or 'file'. (String, default:
memory
) - persistence-directory
-
Persistence directory. (String, default:
/tmp/paho
) - ssl-properties
-
MQTT Client SSL properties. (Map<String, String>, default:
<none>
) - url
-
location of the mqtt broker(s) (comma-delimited list). (String[], default:
[tcp://localhost:1883]
) - username
-
the username to use when connecting to the broker. (String, default:
guest
)
mqtt.supplier
- binary
-
true to leave the payload as bytes. (Boolean, default:
false
) - charset
-
the charset used to convert bytes to String (when binary is false). (String, default:
UTF-8
) - client-id
-
identifies the client. (String, default:
stream.client.id.source
) - qos
-
the qos; a single value for all topics or a comma-delimited list to match the topics. (Integer[], default:
[0]
) - topics
-
the topic(s) (comma-delimited) to which the source will subscribe. (String[], default:
[stream.mqtt]
)
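The qos option accepts either a single value for all topics or one value per topic. A minimal sketch of that pairing rule follows. The class and method are hypothetical, and rejecting a mismatched list with an exception is an assumption made for the example rather than documented behavior.

```java
import java.util.Arrays;

// Hypothetical helper, not part of the mqtt source.
final class MqttQosSketch {

    // Returns one QoS value per topic.
    static int[] resolveQos(String[] topics, int[] qos) {
        if (qos.length == 1) {
            // A single value applies to every topic.
            int[] expanded = new int[topics.length];
            Arrays.fill(expanded, qos[0]);
            return expanded;
        }
        if (qos.length != topics.length) {
            // Assumption: a list must match the topics one to one.
            throw new IllegalArgumentException("qos list must match the number of topics");
        }
        return qos.clone();
    }

    public static void main(String[] args) {
        String[] topics = {"sensors/temp", "sensors/humidity"};
        System.out.println(Arrays.toString(resolveQos(topics, new int[] {1})));    // [1, 1]
        System.out.println(Arrays.toString(resolveQos(topics, new int[] {0, 2}))); // [0, 2]
    }
}
```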
5.12. RabbitMQ Source
The "rabbit" source enables receiving messages from RabbitMQ.
The queue(s) must exist before the stream is deployed; they are not created automatically. You can easily create a Queue using the RabbitMQ web UI.
5.12.1. Input
N/A
5.12.2. Output
Payload
-
byte[]
5.12.3. Options
The rabbit source has the following options:
Properties grouped by prefix:
rabbit.supplier
- enable-retry
-
true to enable retry. (Boolean, default:
false
) - initial-retry-interval
-
Initial retry interval when retry is enabled. (Integer, default:
1000
) - mapped-request-headers
-
Headers that will be mapped. (String[], default:
[STANDARD_REQUEST_HEADERS]
) - max-attempts
-
The maximum delivery attempts when retry is enabled. (Integer, default:
3
) - max-retry-interval
-
Max retry interval when retry is enabled. (Integer, default:
30000
) - own-connection
-
When true, use a separate connection based on the boot properties. (Boolean, default:
false
) - queues
-
The queues to which the source will listen for messages. (String[], default:
<none>
) - requeue
-
Whether rejected messages should be requeued. (Boolean, default:
true
) - retry-multiplier
-
Retry backoff multiplier when retry is enabled. (Double, default:
2
) - transacted
-
Whether the channel is transacted. (Boolean, default:
false
)
spring.rabbitmq
- address-shuffle-mode
-
Mode used to shuffle configured addresses. (AddressShuffleMode, default:
none
, possible values:NONE
,RANDOM
,INORDER
) - addresses
-
Comma-separated list of addresses to which the client should connect. When set, the host and port are ignored. (String, default:
<none>
) - channel-rpc-timeout
-
Continuation timeout for RPC calls in channels. Set it to zero to wait forever. (Duration, default:
10m
) - connection-timeout
-
Connection timeout. Set it to zero to wait forever. (Duration, default:
<none>
) - host
-
RabbitMQ host. Ignored if an address is set. (String, default:
localhost
) - password
-
Login to authenticate against the broker. (String, default:
guest
) - port
-
RabbitMQ port. Ignored if an address is set. Default to 5672, or 5671 if SSL is enabled. (Integer, default:
<none>
) - publisher-confirm-type
-
Type of publisher confirms to use. (ConfirmType, default:
<none>
, possible values:SIMPLE
,CORRELATED
,NONE
) - publisher-returns
-
Whether to enable publisher returns. (Boolean, default:
false
) - requested-channel-max
-
Number of channels per connection requested by the client. Use 0 for unlimited. (Integer, default:
2047
) - requested-heartbeat
-
Requested heartbeat timeout; zero for none. If a duration suffix is not specified, seconds will be used. (Duration, default:
<none>
) - username
-
Login user to authenticate to the broker. (String, default:
guest
) - virtual-host
-
Virtual host to use when connecting to the broker. (String, default:
<none>
)
spring.rabbitmq.listener.simple
- acknowledge-mode
-
Acknowledge mode of container. (AcknowledgeMode, default:
<none>
, possible values:NONE
,MANUAL
,AUTO
) - auto-startup
-
Whether to start the container automatically on startup. (Boolean, default:
true
) - batch-size
-
Batch size, expressed as the number of physical messages, to be used by the container. (Integer, default:
<none>
) - concurrency
-
Minimum number of listener invoker threads. (Integer, default:
<none>
) - consumer-batch-enabled
-
Whether the container creates a batch of messages based on the 'receive-timeout' and 'batch-size'. Coerces 'de-batching-enabled' to true to include the contents of a producer created batch in the batch as discrete records. (Boolean, default:
false
) - de-batching-enabled
-
Whether the container should present batched messages as discrete messages or call the listener with the batch. (Boolean, default:
true
) - default-requeue-rejected
-
Whether rejected deliveries are re-queued by default. (Boolean, default:
<none>
) - idle-event-interval
-
How often idle container events should be published. (Duration, default:
<none>
) - max-concurrency
-
Maximum number of listener invoker threads. (Integer, default:
<none>
) - missing-queues-fatal
-
Whether to fail if the queues declared by the container are not available on the broker and/or whether to stop the container if one or more queues are deleted at runtime. (Boolean, default:
true
) - prefetch
-
Maximum number of unacknowledged messages that can be outstanding at each consumer. (Integer, default:
<none>
)
spring.rabbitmq.listener
- type
-
Listener container type. (ContainerType, default:
simple
, possible values:SIMPLE
,DIRECT
,STREAM
)
Also see the Spring Boot Documentation for additional properties for the broker connections and listener properties.
A Note About Retry
With the default ackMode (AUTO) and requeue (true) options, failed message deliveries will be retried
indefinitely.
Since there is not much processing in the rabbit source, the risk of failure in the source itself is small, unless
the downstream Binder is not connected for some reason.
Setting requeue to false will cause messages to be rejected on the first attempt (and possibly sent to a Dead Letter
Exchange/Queue if the broker is so configured).
The enableRetry option allows configuration of retry parameters such that a failed message delivery can be retried and
eventually discarded (or dead-lettered) when retries are exhausted.
The delivery thread is suspended during the retry interval(s).
Retry options are enableRetry, maxAttempts, initialRetryInterval, retryMultiplier, and maxRetryInterval.
Message deliveries failing with a MessageConversionException are never retried; the assumption being that if a message
could not be converted on the first attempt, subsequent attempts will also fail.
Such messages are discarded (or dead-lettered).
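To make the retry options concrete, the sketch below computes the waits between delivery attempts for a given configuration. It assumes a standard exponential backoff in which max-attempts counts the first delivery and each wait is capped at max-retry-interval; the class and method names are hypothetical and the code is not taken from the rabbit source.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of the rabbit source.
final class RetryBackoffSketch {

    // Returns the wait (in milliseconds) before each retry.
    static List<Long> backoffIntervals(int maxAttempts, long initialInterval,
                                       double multiplier, long maxInterval) {
        List<Long> waits = new ArrayList<>();
        double next = initialInterval;
        // Assumption: maxAttempts includes the first delivery,
        // so there are maxAttempts - 1 waits.
        for (int attempt = 1; attempt < maxAttempts; attempt++) {
            waits.add((long) Math.min(next, maxInterval));
            next *= multiplier;
        }
        return waits;
    }

    public static void main(String[] args) {
        // Defaults from the options list: 3 attempts, 1000 ms, multiplier 2, cap 30000 ms.
        System.out.println(backoffIntervals(3, 1000, 2.0, 30000)); // [1000, 2000]
        // More attempts show the cap taking effect.
        System.out.println(backoffIntervals(7, 1000, 2.0, 30000)); // [1000, 2000, 4000, 8000, 16000, 30000]
    }
}
```

Because the delivery thread is suspended during each wait, the sum of these intervals is a rough lower bound on how long one failing message can block its consumer under these assumptions.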
5.12.4. Build
$ ./mvnw clean install -PgenerateApps
$ cd apps
You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:
$ ./mvnw clean package
5.12.5. Examples
java -jar rabbit-source.jar --rabbit.supplier.queues=
5.13. Amazon S3 Source
This source app supports transfer of files using the Amazon S3 protocol.
Files are transferred from the remote directory (S3 bucket) to the local directory where the application is deployed.
Messages emitted by the source are provided as a byte array by default. However, this can be customized using the --mode option (full property name: file.consumer.mode):
-
ref: Provides a java.io.File reference
-
lines: Splits files line by line and emits a new message for each line
-
contents: The default. Provides the contents of a file as a byte array
When using --mode=lines, you can also provide the additional option --withMarkers=true (full property name: file.consumer.with-markers).
If set to true, the underlying FileSplitter emits additional start-of-file and end-of-file marker messages before and after the actual data.
The payload of these two additional marker messages is of type FileSplitter.FileMarker. The withMarkers option defaults to false if not explicitly set.
See also MetadataStore options for possible shared persistent store configuration used to prevent duplicate messages on restart.
5.13.1. mode = lines
Headers:
-
Content-Type: text/plain
-
file_originalFile: <java.io.File>
-
file_name: <file name>
-
correlationId: <UUID> (same for each line)
-
sequenceNumber: <n>
-
sequenceSize: 0 (number of lines is not known until the file is read)
Payload:
A String for each line.
The first line is optionally preceded by a message with a START marker payload.
The last line is optionally followed by a message with an END marker payload.
Marker presence and format are determined by the with-markers and markers-json properties.
5.13.2. mode = ref
Headers:
None.
Payload:
A java.io.File
object.
5.13.3. Options
The s3-source has the following options:
Properties grouped by prefix:
file.consumer
- markers-json
-
When 'fileMarkers == true', specify if they should be produced as FileSplitter.FileMarker objects or JSON. (Boolean, default:
true
) - mode
-
The FileReadingMode to use for file reading sources. Values are 'ref' - The File object, 'lines' - a message per line, or 'contents' - the contents as bytes. (FileReadingMode, default:
<none>
, possible values:ref
,lines
,contents
) - with-markers
-
Set to true to emit start of file/end of file marker messages before/after the data. Only valid with FileReadingMode 'lines'. (Boolean, default:
<none>
)
metadata.store.dynamo-db
- create-delay
-
Delay between create table retries. (Integer, default:
1
) - create-retries
-
Retry number for create table request. (Integer, default:
25
) - read-capacity
-
Read capacity on the table. (Long, default:
1
) - table
-
Table name for metadata. (String, default:
<none>
) - time-to-live
-
TTL for table entries. (Integer, default:
<none>
) - write-capacity
-
Write capacity on the table. (Long, default:
1
)
metadata.store.jdbc
- region
-
Unique grouping identifier for messages persisted with this store. (String, default:
DEFAULT
) - table-prefix
-
Prefix for the custom table name. (String, default:
<none>
)
metadata.store.mongo-db
- collection
-
MongoDB collection name for metadata. (String, default:
metadataStore
)
metadata.store.redis
- key
-
Redis key for metadata. (String, default:
<none>
)
metadata.store
- type
-
Indicates the type of metadata store to configure (default is 'memory'). You must include the corresponding Spring Integration dependency to use a persistent store. (StoreType, default:
<none>
, possible values:mongodb
,redis
,dynamodb
,jdbc
,zookeeper
,hazelcast
,memory
)
metadata.store.zookeeper
- connect-string
-
Zookeeper connect string in form HOST:PORT. (String, default:
127.0.0.1:2181
) - encoding
-
Encoding to use when storing data in Zookeeper. (Charset, default:
UTF-8
) - retry-interval
-
Retry interval for Zookeeper operations in milliseconds. (Integer, default:
1000
) - root
-
Root node - store entries are children of this node. (String, default:
/SpringIntegration-MetadataStore
)
s3.supplier
- auto-create-local-dir
-
Create or not the local directory. (Boolean, default:
true
) - delete-remote-files
-
Delete or not remote files after processing. (Boolean, default:
false
) - filename-pattern
-
The pattern to filter remote files. (String, default:
<none>
) - filename-regex
-
The regexp to filter remote files. (Pattern, default:
<none>
) - list-only
-
Set to true to return s3 object metadata without copying file to a local directory. (Boolean, default:
false
) - local-dir
-
The local directory to store files. (File, default:
<none>
) - preserve-timestamp
-
To transfer or not the timestamp of the remote file to the local one. (Boolean, default:
true
) - remote-dir
-
AWS S3 bucket resource. (String, default:
bucket
) - remote-file-separator
-
Remote File separator. (String, default:
/
) - tmp-file-suffix
-
Temporary file suffix. (String, default:
.tmp
)
spring.cloud.aws.credentials
- access-key
-
The access key to be used with a static provider. (String, default:
<none>
) - instance-profile
-
Configures an instance profile credentials provider with no further configuration. (Boolean, default:
false
) - profile
-
The AWS profile. (Profile, default:
<none>
) - secret-key
-
The secret key to be used with a static provider. (String, default:
<none>
)
spring.cloud.aws.region
- instance-profile
-
Configures an instance profile region provider with no further configuration. (Boolean, default:
false
) - profile
-
The AWS profile. (Profile, default:
<none>
) - static
-
<documentation missing> (String, default:
<none>
)
spring.cloud.aws.s3
- accelerate-mode-enabled
-
Option to enable using the accelerate endpoint when accessing S3. Accelerate endpoints allow faster transfer of objects by using Amazon CloudFront's globally distributed edge locations. (Boolean, default:
<none>
) - checksum-validation-enabled
-
Option to disable doing a validation of the checksum of an object stored in S3. (Boolean, default:
<none>
) - chunked-encoding-enabled
-
Option to enable using chunked encoding when signing the request payload for PutObjectRequest and UploadPartRequest (both in software.amazon.awssdk.services.s3.model). (Boolean, default:
<none>
) - cross-region-enabled
-
Enables cross-region bucket access. (Boolean, default:
<none>
) - endpoint
-
Overrides the default endpoint. (URI, default:
<none>
) - path-style-access-enabled
-
Option to enable using path style access for accessing S3 objects instead of DNS style access. DNS style access is preferred as it will result in better load balancing when accessing S3. (Boolean, default:
<none>
) - region
-
Overrides the default region. (String, default:
<none>
) - use-arn-region-enabled
-
If an S3 resource ARN is passed in as the target of an S3 operation that has a different region to the one the client was configured with, this flag must be set to 'true' to permit the client to make a cross-region call to the region specified in the ARN otherwise an exception will be thrown. (Boolean, default:
<none>
)
5.13.4. Amazon AWS common options
The Amazon S3 Source (like all other Amazon AWS applications) is built on the Spring Cloud AWS project, and Spring Boot applies its auto-configuration classes automatically. Consult the Spring Cloud AWS documentation for required and useful auto-configuration properties.
5.13.5. Examples
java -jar s3-source.jar --s3.supplier.remote-dir=/tmp/foo --file.consumer.mode=lines
5.14. SFTP Source
This source application supports transfer of files using the SFTP protocol.
Files are transferred from the remote
directory to the local
directory where the app is deployed.
Messages emitted by the source are provided as a byte array by default. However, this can be
customized using the --mode
option:
-
ref Provides a
java.io.File
reference -
lines Will split files line-by-line and emit a new message for each line
-
contents The default. Provides the contents of a file as a byte array
When using --mode=lines
, you can also provide the additional option --withMarkers=true
.
If set to true
, the underlying FileSplitter
will emit additional start-of-file and end-of-file marker messages before and after the actual data.
The payload of these 2 additional marker messages is of type FileSplitter.FileMarker
. The option withMarkers
defaults to false
if not explicitly set.
See sftp-supplier
for advanced configuration options.
See also MetadataStore options for possible shared persistent store configuration used to prevent duplicate messages on restart.
5.14.1. Input
N/A (Fetches files from an SFTP server).
5.14.2. Output
mode = contents
Headers:
-
Content-Type: application/octet-stream
-
file_name: <file name>
-
file_remoteFileInfo: <file metadata>
-
file_remoteHostPort: <host:port>
-
file_remoteDirectory: <relative-path>
-
file_remoteFile: <file-name>
-
sftp_selectedServer: <server-key>
(if multi-source)
Payload:
A byte[]
filled with the file contents.
mode = lines
Headers:
-
Content-Type: text/plain
-
file_name: <file name>
-
correlationId: <UUID>
(same for each line) -
sequenceNumber: <n>
-
sequenceSize: 0
(number of lines is not known until the file is read) -
file_marker: <file marker>
(if with-markers is enabled)
Payload:
A String
for each line.
The first line is optionally preceded by a message with a START
marker payload.
The last line is optionally followed by a message with an END
marker payload.
Marker presence and format are determined by the with-markers
and markers-json
properties.
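The message sequence produced in lines mode can be pictured with a small stand-alone sketch. This is a simplified model, not the Spring Integration FileSplitter: the class name LinesModeSketch is hypothetical, and the plain strings "START" and "END" stand in for the FileSplitter.FileMarker (or JSON) marker payloads.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of "mode = lines"; not the actual FileSplitter implementation.
final class LinesModeSketch {

    // Returns, in order, the payloads that would be emitted for one file.
    // "START" and "END" are placeholders for the real marker payloads.
    static List<String> split(String fileContents, boolean withMarkers) {
        List<String> payloads = new ArrayList<>();
        if (withMarkers) {
            payloads.add("START");
        }
        // One payload per line of the file.
        fileContents.lines().forEach(payloads::add);
        if (withMarkers) {
            payloads.add("END");
        }
        return payloads;
    }

    public static void main(String[] args) {
        System.out.println(split("a\nb\nc", false)); // [a, b, c]
        System.out.println(split("a\nb\nc", true));  // [START, a, b, c, END]
    }
}
```

With markers enabled, a consumer can detect file boundaries without relying on sequenceSize, which is 0 in this mode.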
mode = ref
Headers:
-
file_remoteHostPort: <host:port>
-
file_remoteDirectory: <relative-path>
-
file_remoteFile: <file-name>
-
file_originalFile: <absolute-path-of-local-file>
-
file_name: <local-file-name>
-
file_relativePath
-
sftp_selectedServer: <server-key>
(if multi-source)
Payload:
A java.io.File
object.
5.14.3. Options
The sftp source has the following options:
Properties grouped by prefix:
file.consumer
- markers-json
-
When 'fileMarkers == true', specify if they should be produced as FileSplitter.FileMarker objects or JSON. (Boolean, default:
true
) - mode
-
The FileReadingMode to use for file reading sources. Values are 'ref' - The File object, 'lines' - a message per line, or 'contents' - the contents as bytes. (FileReadingMode, default:
<none>
, possible values:ref
,lines
,contents
) - with-markers
-
Set to true to emit start of file/end of file marker messages before/after the data. Only valid with FileReadingMode 'lines'. (Boolean, default:
<none>
)
metadata.store.dynamo-db
- create-delay
-
Delay between create table retries. (Integer, default:
1
) - create-retries
-
Retry number for create table request. (Integer, default:
25
) - read-capacity
-
Read capacity on the table. (Long, default:
1
) - table
-
Table name for metadata. (String, default:
<none>
) - time-to-live
-
TTL for table entries. (Integer, default:
<none>
) - write-capacity
-
Write capacity on the table. (Long, default:
1
)
metadata.store.jdbc
- region
-
Unique grouping identifier for messages persisted with this store. (String, default:
DEFAULT
) - table-prefix
-
Prefix for the custom table name. (String, default:
<none>
)
metadata.store.mongo-db
- collection
-
MongoDB collection name for metadata. (String, default:
metadataStore
)
metadata.store.redis
- key
-
Redis key for metadata. (String, default:
<none>
)
metadata.store
- type
-
Indicates the type of metadata store to configure (default is 'memory'). You must include the corresponding Spring Integration dependency to use a persistent store. (StoreType, default:
<none>
, possible values:mongodb
,redis
,dynamodb
,jdbc
,zookeeper
,hazelcast
,memory
)
metadata.store.zookeeper
- connect-string
-
Zookeeper connect string in form HOST:PORT. (String, default:
127.0.0.1:2181
) - encoding
-
Encoding to use when storing data in Zookeeper. (Charset, default:
UTF-8
) - retry-interval
-
Retry interval for Zookeeper operations in milliseconds. (Integer, default:
1000
) - root
-
Root node - store entries are children of this node. (String, default:
/SpringIntegration-MetadataStore
)
sftp.supplier
- auto-create-local-dir
-
Set to true to create the local directory if it does not exist. (Boolean, default:
true
) - delay-when-empty
-
Duration of delay when no new files are detected. (Duration, default:
1s
) - delete-remote-files
-
Set to true to delete remote files after successful transfer. (Boolean, default:
false
) - directories
-
A list of factory "name.directory" pairs. (String[], default:
<none>
) - factories
-
A map of factory names to factories. (Map<String, Factory>, default:
<none>
) - fair
-
True for fair rotation of multiple servers/directories. This is false by default so if a source has more than one entry, these will be received before the other sources are visited. (Boolean, default:
false
) - filename-pattern
-
A filter pattern to match the names of files to transfer. (String, default:
<none>
) - filename-regex
-
A filter regex pattern to match the names of files to transfer. (Pattern, default:
<none>
) - list-only
-
Set to true to return file metadata without the entire payload. (Boolean, default:
false
) - local-dir
-
The local directory to use for file transfers. (File, default:
<none>
) - max-fetch
-
The maximum number of remote files to fetch per poll; default unlimited. Does not apply when listing files or building task launch requests. (Integer, default:
<none>
) - preserve-timestamp
-
Set to true to preserve the original timestamp. (Boolean, default:
true
) - remote-dir
-
The remote SFTP directory. (String, default:
/
) - remote-file-separator
-
The remote file separator. (String, default:
/
) - rename-remote-files-to
-
A SpEL expression resolving to the new name remote files must be renamed to after successful transfer. (Expression, default:
<none>
) - stream
-
Set to true to stream the file rather than copy to a local directory. (Boolean, default:
false
) - tmp-file-suffix
-
The suffix to use while the transfer is in progress. (String, default:
.tmp
)
sftp.supplier.factory
- allow-unknown-keys
-
True to allow an unknown or changed key. (Boolean, default:
false
) - host
-
The host name of the server. (String, default:
localhost
) - known-hosts-expression
-
A SpEL expression resolving to the location of the known hosts file. (Expression, default:
<none>
) - pass-phrase
-
Passphrase for user's private key. (String, default:
<empty string>
) - password
-
The password to use to connect to the server. (String, default:
<none>
) - port
-
The port of the server. (Integer, default:
22
) - private-key
-
Resource location of user's private key. (Resource, default:
<none>
) - username
-
The username to use to connect to the server. (String, default:
<none>
)
sftp.supplier.sort-by
- attribute
-
Attribute of the file listing entry to sort by (FILENAME, ATIME: last access time, MTIME: last modified time). (Attribute, default:
<none>
) - dir
-
Sorting direction (ASC or DESC). (Dir, default:
<none>
)
5.14.4. Examples
java -jar sftp_source.jar --sftp.supplier.remote-dir=foo --file.consumer.mode=lines --sftp.supplier.factory.host=sftpserver \
--sftp.supplier.factory.username=user --sftp.supplier.factory.password=pw --sftp.supplier.local-dir=/foo
5.15. SYSLOG
The syslog source receives SYSLOG packets over UDP, TCP, or both. RFC3164 (BSD) and RFC5424 formats are supported.
5.15.1. Options
- syslog.supplier.buffer-size
-
the buffer size used when decoding messages; larger messages will be rejected. (Integer, default:
2048
) - syslog.supplier.nio
-
whether or not to use NIO (when supporting a large number of connections). (Boolean, default:
false
) - syslog.supplier.port
-
The port to listen on. (Integer, default:
1514
) - syslog.supplier.protocol
-
Protocol used for SYSLOG (tcp, udp, or both). (Protocol, default:
<none>
, possible values:tcp
,udp
,both
) - syslog.supplier.reverse-lookup
-
whether or not to perform a reverse lookup on the incoming socket. (Boolean, default:
false
) - syslog.supplier.rfc
-
'5424' or '3164' - the syslog format according to the RFC; 3164 is aka 'BSD' format. (String, default:
3164
) - syslog.supplier.socket-timeout
-
the socket timeout. (Integer, default:
0
)
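To try the source by hand, a test client only needs to send one datagram. The sketch below is an illustration under stated assumptions: the source is running locally with the protocol set to udp on the default port 1514, and the class name, host name, tag, and message text are all made up. The priority calculation (facility * 8 + severity) follows RFC 3164.

```java
import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.nio.charset.StandardCharsets;

// Hypothetical test client; not part of the syslog source application.
final class SyslogClientSketch {

    // RFC 3164 priority value: facility * 8 + severity.
    static int priority(int facility, int severity) {
        return facility * 8 + severity;
    }

    // Builds a BSD-style line: <PRI>TIMESTAMP HOSTNAME TAG: MESSAGE
    static String bsdMessage(int facility, int severity, String timestamp,
                             String host, String tag, String text) {
        return "<" + priority(facility, severity) + ">"
                + timestamp + " " + host + " " + tag + ": " + text;
    }

    public static void main(String[] args) {
        String line = bsdMessage(4, 2, "Oct 11 22:14:15", "mymachine", "su", "test message");
        byte[] data = line.getBytes(StandardCharsets.US_ASCII);
        // Assumption: the source listens for UDP on localhost:1514 (the default port).
        try (DatagramSocket socket = new DatagramSocket()) {
            socket.send(new DatagramPacket(data, data.length,
                    InetAddress.getLoopbackAddress(), 1514));
            System.out.println("sent: " + line);
        } catch (IOException e) {
            System.err.println("could not send: " + e);
        }
    }
}
```

Messages longer than syslog.supplier.buffer-size (2048 by default) are rejected, so keep test payloads short.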
5.16. TCP
The tcp
source acts as a server and allows a remote party to connect to it and submit data over a raw tcp socket.
TCP is a streaming protocol and some mechanism is needed to frame messages on the wire. A number of decoders are available, the default being 'CRLF' which is compatible with Telnet.
Messages produced by the TCP source application have a byte[]
payload.
5.16.1. Options
Properties grouped by prefix:
tcp
- nio
-
Whether or not to use NIO. (Boolean, default:
false
) - port
-
The port on which to listen; 0 for the OS to choose a port. (Integer, default:
1234
) - reverse-lookup
-
Perform a reverse DNS lookup on the remote IP Address; if false, just the IP address is included in the message headers. (Boolean, default:
false
) - socket-timeout
-
The timeout (ms) before closing the socket when no data is received. (Integer, default:
120000
) - use-direct-buffers
-
Whether or not to use direct buffers. (Boolean, default:
false
)
tcp.supplier
- buffer-size
-
The buffer size used when decoding messages; larger messages will be rejected. (Integer, default:
2048
) - decoder
-
The decoder to use when receiving messages. (Encoding, default:
<none>
, possible values:CRLF
,LF
,NULL
,STXETX
,RAW
,L1
,L2
,L4
)
5.16.2. Available Decoders
- CRLF (default)
-
text terminated by carriage return (0x0d) followed by line feed (0x0a)
- LF
-
text terminated by line feed (0x0a)
- NULL
-
text terminated by a null byte (0x00)
- STXETX
-
text preceded by an STX (0x02) and terminated by an ETX (0x03)
- RAW
-
no structure - the client indicates a complete message by closing the socket
- L1
-
data preceded by a one byte (unsigned) length field (supports up to 255 bytes)
- L2
-
data preceded by a two byte (unsigned) length field (up to 2^16-1 bytes)
- L4
-
data preceded by a four byte (signed) length field (up to 2^31-1 bytes)
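A client has to frame each message the way the configured decoder expects. The following sketch shows how a sender might build frames for the CRLF and L2 decoders. The class and method names are hypothetical, and the L2 variant assumes the length field is written in network byte order (big-endian); check this against the decoder you actually use.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical client-side helpers; not part of the tcp source application.
final class TcpFramingSketch {

    // CRLF framing: the text followed by carriage return (0x0d) and line feed (0x0a).
    static byte[] crlf(String text) {
        return (text + "\r\n").getBytes(StandardCharsets.UTF_8);
    }

    // L2 framing: a two-byte unsigned length field followed by the payload.
    // Assumption: the length is big-endian, which is ByteBuffer's default order.
    static byte[] l2(byte[] payload) {
        if (payload.length > 0xFFFF) {
            throw new IllegalArgumentException("L2 supports at most 2^16-1 bytes");
        }
        ByteBuffer buffer = ByteBuffer.allocate(2 + payload.length);
        buffer.putShort((short) payload.length); // low 16 bits of the length
        buffer.put(payload);
        return buffer.array();
    }

    public static void main(String[] args) {
        byte[] frame = l2("hi".getBytes(StandardCharsets.UTF_8));
        System.out.println(java.util.Arrays.toString(frame)); // [0, 2, 104, 105]
    }
}
```

Whatever the framing, the decoded message must still fit in tcp.supplier.buffer-size (2048 by default), or it is rejected.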
5.17. Time Source
The time source emits a String containing the current time at each polling interval.
5.17.1. Options
The time source has the following options:
- spring.integration.poller.cron
-
Cron expression for polling. Mutually exclusive with 'fixedDelay' and 'fixedRate'. (String, default:
<none>
) - spring.integration.poller.fixed-delay
-
Polling delay period. Mutually exclusive with 'cron' and 'fixedRate'. (Duration, default:
<none>
) - spring.integration.poller.fixed-rate
-
Polling rate period. Mutually exclusive with 'fixedDelay' and 'cron'. (Duration, default:
<none>
) - spring.integration.poller.initial-delay
-
Polling initial delay. Applied for 'fixedDelay' and 'fixedRate'; ignored for 'cron'. (Duration, default:
<none>
) - spring.integration.poller.max-messages-per-poll
-
Maximum number of messages to poll per polling cycle. (Integer, default:
<none>
) - spring.integration.poller.receive-timeout
-
How long to wait for messages on poll. (Duration, default:
1s
) - time.date-format
-
Format for the date value. (String, default:
MM/dd/yy HH:mm:ss
)
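To preview what a given time.date-format value produces, you can format a fixed timestamp with the same pattern. This is only a sketch: it uses java.time.format.DateTimeFormatter, which may not be the formatter class the application uses internally, although the pattern letters shown here (M, d, y, H, m, s) have the same meaning in java.text.SimpleDateFormat. The class name is hypothetical.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

// Hypothetical helper for previewing a date-format pattern.
final class TimeFormatSketch {

    static String format(LocalDateTime time, String pattern) {
        return DateTimeFormatter.ofPattern(pattern).format(time);
    }

    public static void main(String[] args) {
        LocalDateTime fixed = LocalDateTime.of(2024, 3, 5, 14, 7, 9);
        // Default pattern of the time source.
        System.out.println(format(fixed, "MM/dd/yy HH:mm:ss"));     // 03/05/24 14:07:09
        // A custom pattern, as it might be passed with --time.date-format.
        System.out.println(format(fixed, "yyyy-MM-dd'T'HH:mm:ss")); // 2024-03-05T14:07:09
    }
}
```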
5.18. Twitter Message Source
Repeatedly retrieves the direct messages (both sent and received) within the last 30 days, sorted in reverse-chronological order.
The retrieved messages are cached (in a MetadataStore
cache) to prevent duplications.
By default an in-memory SimpleMetadataStore
is used.
The twitter.message.source.count
controls the number of returned messages.
The spring.integration.poller
properties control the message poll interval.
The interval must be aligned with the rate limit of the API used.
5.18.1. Options
Properties grouped by prefix:
spring.integration.poller
- cron
-
Cron expression for polling. Mutually exclusive with 'fixedDelay' and 'fixedRate'. (String, default:
<none>
) - fixed-delay
-
Polling delay period. Mutually exclusive with 'cron' and 'fixedRate'. (Duration, default:
<none>
) - fixed-rate
-
Polling rate period. Mutually exclusive with 'fixedDelay' and 'cron'. (Duration, default:
<none>
) - initial-delay
-
Polling initial delay. Applied for 'fixedDelay' and 'fixedRate'; ignored for 'cron'. (Duration, default:
<none>
) - max-messages-per-poll
-
Maximum number of messages to poll per polling cycle. (Integer, default:
<none>
) - receive-timeout
-
How long to wait for messages on poll. (Duration, default:
1s
)
twitter.connection
- access-token
-
Your Twitter token. (String, default:
<none>
) - access-token-secret
-
Your Twitter token secret. (String, default:
<none>
) - consumer-key
-
Your Twitter key. (String, default:
<none>
) - consumer-secret
-
Your Twitter secret. (String, default:
<none>
) - debug-enabled
-
Enables Twitter4J debug mode. (Boolean, default:
false
) - raw-json
-
Enable caching the original (raw) JSON objects as returned by the Twitter APIs. When set to false, the result uses Twitter4J's JSON representations. When set to true, the result uses the original Twitter API's JSON representations. (Boolean, default:
true
)
twitter.message.source
- count
-
Max number of events to be returned. 20 default. 50 max. (Integer, default:
20
)
5.19. Twitter Search Source
Twitter’s Standard search API (search/tweets) allows simple queries against the indices of recent or popular Tweets. This Source
provides continuous searches against a sampling of recent Tweets published in the past 7 days. It is part of the 'public' set of APIs.
Returns a collection of relevant Tweets matching a specified query.
Use the spring.integration.poller
properties to control the interval between consecutive search requests. Rate limit: 180 requests per 30-minute window (about 6 requests per minute, or 1 request every 10 seconds).
The twitter.search
query properties allow querying by keyword and filtering the results by time and geolocation.
The twitter.search.count
and twitter.search.page
properties control result pagination in accordance with the Search API.
Note: Twitter’s search service and, by extension, the Search API is not meant to be an exhaustive source of Tweets. Not all Tweets will be indexed or made available via the search interface.
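The polling arithmetic above can be checked with a few lines of Java. The sketch derives the shortest safe poll interval from the rate limit quoted in this section, and the number of tweets covered by one backward search cycle from the twitter.search.page and twitter.search.count defaults (3 and 100) listed in the options below. The class and method names are hypothetical.

```java
import java.time.Duration;

// Hypothetical helper; the figures come from the rate limit and defaults quoted in this guide.
final class SearchPollingSketch {

    // Shortest interval between requests that stays within the rate limit.
    static Duration minInterval(int requestsPerWindow, Duration window) {
        return window.dividedBy(requestsPerWindow);
    }

    // Tweets covered by one backward search cycle: page * count.
    static int tweetsPerCycle(int page, int count) {
        return page * count;
    }

    public static void main(String[] args) {
        System.out.println(minInterval(180, Duration.ofMinutes(30))); // PT10S
        System.out.println(tweetsPerCycle(3, 100));                   // 300
    }
}
```

A poller setting such as a fixed delay of 10 seconds or more would therefore stay within the quoted limit.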
5.19.1. Options
Properties grouped by prefix:
spring.integration.poller
- cron
-
Cron expression for polling. Mutually exclusive with 'fixedDelay' and 'fixedRate'. (String, default:
<none>
) - fixed-delay
-
Polling delay period. Mutually exclusive with 'cron' and 'fixedRate'. (Duration, default:
<none>
) - fixed-rate
-
Polling rate period. Mutually exclusive with 'fixedDelay' and 'cron'. (Duration, default:
<none>
) - initial-delay
-
Polling initial delay. Applied for 'fixedDelay' and 'fixedRate'; ignored for 'cron'. (Duration, default:
<none>
) - max-messages-per-poll
-
Maximum number of messages to poll per polling cycle. (Integer, default:
<none>
) - receive-timeout
-
How long to wait for messages on poll. (Duration, default:
1s
)
twitter.connection
- access-token
-
Your Twitter token. (String, default:
<none>
) - access-token-secret
-
Your Twitter token secret. (String, default:
<none>
) - consumer-key
-
Your Twitter key. (String, default:
<none>
) - consumer-secret
-
Your Twitter secret. (String, default:
<none>
) - debug-enabled
-
Enables Twitter4J debug mode. (Boolean, default:
false
) - raw-json
-
Enable caching the original (raw) JSON objects as returned by the Twitter APIs. When set to false, the result uses Twitter4J's JSON representations. When set to true, the result uses the original Twitter API's JSON representations. (Boolean, default:
true
)
twitter.search
- count
-
Number of tweets to return per page (e.g. per single request), up to a max of 100. (Integer, default:
100
) - lang
-
Restricts searched tweets to the given language, specified as an ISO 639-1 code (http://en.wikipedia.org/wiki/ISO_639-1). (String, default:
<none>
) - page
-
Number of pages (that is, requests) to search backwards (from the most recent to the oldest tweets) before restarting the search from the most recent tweets. The total number of tweets searched backwards is (page * count). (Integer, default:
3
) - query
-
Search tweets by search query string. (String, default:
<none>
) - restart-from-most-recent-on-empty-response
-
Restart search from the most recent tweets on empty response. Applied only after the first restart (e.g. when since_id != UNBOUNDED) (Boolean, default:
false
) - result-type
-
Specifies what type of search results you would prefer to receive. The current default is "mixed". Valid values are: mixed (include both popular and real-time results in the response), recent (return only the most recent results), and popular (return only the most popular results). (ResultType, default:
<none>
, possible values:popular
,mixed
,recent
) - since
-
If specified, returns tweets created since the given date. The date should be formatted as YYYY-MM-DD. (String, default:
<none>
)
twitter.search.geocode
- latitude
-
User's latitude. (Double, default:
-1
) - longitude
-
User's longitude. (Double, default:
-1
) - radius
-
Radius (in kilometers) around the (latitude, longitude) point. (Double, default:
-1
)
5.20. Twitter Stream Source
-
The
Filter API
returns public statuses that match one or more filter predicates. Multiple parameters allow using a single connection to the Streaming API. TIP: The track
, follow
, and locations
fields are combined with an OR operator! A query with track=foo
and follow=1234
returns Tweets matching foo
OR created by user 1234
. -
The
Sample API
returns a small random sample of all public statuses. The Tweets returned by the default access level are the same, so if two different clients connect to this endpoint, they will see the same Tweets.
The default access level allows up to 400 track keywords, 5,000 follow user IDs, and 25 location boxes of 0.1-360 degrees.
5.20.1. Options
Properties grouped by prefix:
twitter.connection
- access-token
-
Your Twitter token. (String, default:
<none>
) - access-token-secret
-
Your Twitter token secret. (String, default:
<none>
) - consumer-key
-
Your Twitter key. (String, default:
<none>
) - consumer-secret
-
Your Twitter secret. (String, default:
<none>
) - debug-enabled
-
Enables Twitter4J debug mode. (Boolean, default:
false
) - raw-json
-
Enable caching the original (raw) JSON objects as returned by the Twitter APIs. When set to false, the result uses Twitter4J's JSON representations. When set to true, the result uses the original Twitter API's JSON representations. (Boolean, default:
true
)
twitter.stream.filter
- count
-
Indicates the number of previous statuses to stream before transitioning to the live stream. (Integer, default:
0
) - filter-level
-
The filter level limits what tweets appear in the stream to those with a minimum filterLevel attribute value. One of either none, low, or medium. (FilterLevel, default:
<none>
) - follow
-
Specifies the users, by ID, to receive public tweets from. (List<Long>, default:
<none>
) - language
-
Specifies the tweets language of the stream. (List<String>, default:
<none>
) - locations
-
Locations to track, given as bounding boxes. Internally represented as a 2D array. The first coordinate pair must be the SW corner of the box; for example, the bounding box 52.38, 4.90, 51.51, -0.12 is invalid. (List<BoundingBox>, default:
<none>
) - track
-
Specifies keywords to track. (List<String>, default:
<none>
)
twitter.stream
- type
-
<documentation missing> (StreamType, default:
<none>
, possible values:sample
,filter
,firehose
,link
)
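The corner-ordering rule for the locations option can be expressed as a small check. This is an illustrative sketch only, not how the application or Twitter4J validates boxes: the class name is hypothetical, it treats a box as two coordinate pairs and requires both components of the first pair to be smaller than those of the second, and it ignores boxes that cross the antimeridian.

```java
// Hypothetical validation helper for "first pair must be the SW corner".
final class BoundingBoxSketch {

    // The first pair (a1, a2) is the SW corner only if it is smaller than the
    // second pair (b1, b2) in both components. The check is the same whether
    // the pairs are read as (lat, lon) or (lon, lat).
    static boolean isValid(double a1, double a2, double b1, double b2) {
        return a1 < b1 && a2 < b2;
    }

    public static void main(String[] args) {
        // The invalid example from the locations description.
        System.out.println(isValid(52.38, 4.90, 51.51, -0.12)); // false
        // The same two corners with the SW corner first.
        System.out.println(isValid(51.51, -0.12, 52.38, 4.90)); // true
    }
}
```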
5.21. Websocket Source
The Websocket
source produces messages received over a WebSocket connection.
5.21.1. Options
Properties grouped by prefix:
websocket.supplier
- allowed-origins
-
The allowed origins. (String, default:
*
) - path
-
The path on which server WebSocket handler is exposed. (String, default:
/websocket
)
websocket.supplier.sock-js
- enable
-
Enable SockJS service on the server. Default is 'false' (Boolean, default:
false
)
5.21.2. Examples
To verify that the websocket-source receives messages from Websocket clients, you can use the following simple end-to-end setup.
Step 1: Start kafka
Step 2: Deploy websocket-source
on a specific port, say 8080
Step 3: Connect a websocket client on port 8080 path "/websocket", and send some messages.
You can start a kafka console consumer and see the messages there.
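For Step 3, any WebSocket client will do. As one possibility, the sketch below uses the JDK's built-in java.net.http.WebSocket client (Java 11+). It assumes the source was deployed on localhost port 8080 with the default path /websocket; the class name and message text are made up.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.WebSocket;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical test client for the websocket source.
final class WebsocketClientSketch {

    static URI endpoint(String host, int port, String path) {
        return URI.create("ws://" + host + ":" + port + path);
    }

    public static void main(String[] args) throws InterruptedException {
        // Assumption: websocket-source runs on localhost:8080 with the default path.
        URI uri = endpoint("localhost", 8080, "/websocket");
        try {
            WebSocket ws = HttpClient.newHttpClient()
                    .newWebSocketBuilder()
                    .buildAsync(uri, new WebSocket.Listener() { })
                    .get(5, TimeUnit.SECONDS);
            // Send one complete text message, then close the connection.
            ws.sendText("hello from the sketch", true).get(5, TimeUnit.SECONDS);
            ws.sendClose(WebSocket.NORMAL_CLOSURE, "done").get(5, TimeUnit.SECONDS);
            System.out.println("Sent one message to " + uri);
        } catch (ExecutionException | TimeoutException e) {
            System.err.println("Could not reach " + uri + ": " + e);
        }
    }
}
```

Each message sent this way should then appear in the Kafka console consumer attached to the source's output destination.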
5.22. XMPP Source
The "xmpp" source enables receiving messages from an XMPP Server.
5.22.1. Input
N/A
5.22.2. Output
Payload
-
byte[]
5.22.3. Options
The xmpp source has the following options:
Properties grouped by prefix:
xmpp.factory
- host
-
XMPP Host server to connect to. (String, default:
<none>
) - password
-
The Password for the connected user. (String, default:
<none>
) - port
-
Port for connecting to the host. - Default Client Port: 5222 (Integer, default:
5222
) - resource
-
The Resource to bind to on the XMPP Host. - Can be empty, server will generate one if not set (String, default:
<none>
) - security-mode
-
<documentation missing> (SecurityMode, default:
<none>
, possible values:required
,ifpossible
,disabled
) - service-name
-
The Service Name to set for the XMPP Domain. (String, default:
<none>
) - subscription-mode
-
<documentation missing> (SubscriptionMode, default:
<none>
, possible values:accept_all
,reject_all
,manual
) - user
-
The User the connection should connect as. (String, default:
<none>
)
xmpp.supplier
- payload-expression
-
<documentation missing> (Expression, default:
<none>
) - stanza-filter
-
<documentation missing> (StanzaFilter, default:
<none>
)
Also see the Spring Boot Documentation for additional properties for the broker connections and listener properties.
5.22.4. Build
$ ./mvnw clean install -PgenerateApps
$ cd apps
You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:
$ ./mvnw clean package
5.22.5. Examples
java -jar xmpp-source.jar --xmpp.factory.host=localhost --xmpp.factory.port=5222 --xmpp.factory.user=jane --xmpp.factory.password=secret --xmpp.factory.service-name=localhost
5.23. ZeroMQ Source
The "zeromq" source enables receiving messages from ZeroMQ.
5.23.1. Input
N/A
5.23.2. Output
Payload
-
byte[]
5.23.3. Options
The zeromq source has the following options:
- zeromq.supplier.bind-port
-
Bind Port for creating a ZeroMQ Socket; 0 selects a random port. (Integer, default:
0
) - zeromq.supplier.connect-url
-
Connection URL for connecting to the ZeroMQ Socket. (String, default:
<none>
) - zeromq.supplier.consume-delay
-
The delay to consume from the ZeroMQ Socket when no data received. (Duration, default:
1s
) - zeromq.supplier.socket-type
-
The Socket Type the connection should make. (SocketType, default:
<none>
, possible values:PAIR
,PUB
,SUB
,REQ
,REP
,DEALER
,ROUTER
,PULL
,PUSH
,XPUB
,XSUB
,STREAM
,CLIENT
,SERVER
,RADIO
,DISH
,CHANNEL
,PEER
,RAW
,SCATTER
,GATHER
) - zeromq.supplier.topics
-
The Topics to subscribe to. (String[], default:
[]
)
Also see the Spring Boot Documentation for additional properties for the broker connections and listener properties.
5.23.4. Build
$ ./mvnw clean install -PgenerateApps
$ cd apps
You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:
$ ./mvnw clean package
5.23.5. Examples
java -jar zeromq-source.jar --zeromq.supplier.connectUrl=tcp://server:port --zeromq.supplier.topics=
6. Processors
6.1. Aggregator Processor
The Aggregator processor enables an application to aggregate incoming messages into groups and release them to an output destination.
java -jar aggregator-processor-kafka-<version>.jar --aggregator.message-store-type=jdbc
Change kafka to rabbit if you want to run it against RabbitMQ.
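Conceptually, with the defaults described in the options (correlation on the correlationId header, release based on the sequenceSize header, aggregation as a collection of payloads), the processor behaves roughly like the following in-memory model. This is a simplified sketch and not the Spring Integration aggregator: the class name is hypothetical, and group timeouts and persistent message stores are left out.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Simplified in-memory model of the default aggregation behavior.
final class AggregatorSketch {

    // Open groups, keyed by correlation id.
    private final Map<String, List<String>> groups = new HashMap<>();

    // Adds one message to its group. Returns the collected payloads once the
    // group has reached sequenceSize; otherwise returns an empty Optional.
    Optional<List<String>> accept(String correlationId, int sequenceSize, String payload) {
        List<String> group = groups.computeIfAbsent(correlationId, key -> new ArrayList<>());
        group.add(payload);
        if (group.size() >= sequenceSize) {
            groups.remove(correlationId); // the group is complete and released
            return Optional.of(group);
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        AggregatorSketch aggregator = new AggregatorSketch();
        System.out.println(aggregator.accept("a", 2, "x")); // Optional.empty
        System.out.println(aggregator.accept("a", 2, "y")); // Optional[[x, y]]
    }
}
```

The aggregation, correlation, and release options replace these three defaults with SpEL expressions.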
6.1.1. Payload
If an input payload is a byte[]
and the content-type header is JSON, then the JsonBytesToMap
function tries to deserialize this payload to a Map
for better data representation on the output of the aggregator function.
Also, such a Map
data representation makes it easy to access the payload content from the SpEL expressions mentioned below.
Otherwise (including on a deserialization error), the input payload is left as is, and it is up to the target application configuration to convert it into the desired form.
6.1.2. Options
Properties grouped by prefix:
aggregator
- aggregation
-
SpEL expression for aggregation strategy. Default is collection of payloads. (Expression, default:
<none>
) - correlation
-
SpEL expression for correlation key. Default to correlationId header. (Expression, default:
<none>
) - group-timeout
-
SpEL expression for the timeout after which uncompleted groups expire. (Expression, default:
<none>
) - message-store-entity
-
Persistence message store entity: table prefix in RDBMS, collection name in MongoDb, etc. (String, default:
<none>
) - message-store-type
-
Message store type. (String, default:
<none>
) - release
-
SpEL expression for release strategy. Default is based on the sequenceSize header. (Expression, default:
<none>
)
spring.data.mongodb
- additional-hosts
-
Additional server hosts. Cannot be set with URI or if 'host' is not specified. Additional hosts will use the default mongo port of 27017. If you want to use a different port you can use the "host:port" syntax. (List<String>, default:
<none>
) - authentication-database
-
Authentication database name. (String, default:
<none>
) - auto-index-creation
-
Whether to enable auto-index creation. (Boolean, default:
<none>
) - database
-
Database name. Overrides database in URI. (String, default:
<none>
) - field-naming-strategy
-
Fully qualified name of the FieldNamingStrategy to use. (Class<?>, default:
<none>
) - host
-
Mongo server host. Cannot be set with URI. (String, default:
<none>
) - password
-
Login password of the mongo server. Cannot be set with URI. (Character[], default:
<none>
) - port
-
Mongo server port. Cannot be set with URI. (Integer, default:
<none>
) - replica-set-name
-
Required replica set name for the cluster. Cannot be set with URI. (String, default:
<none>
) - uri
-
Mongo database URI. Overrides host, port, username, and password. (String, default:
mongodb://localhost/test
) - username
-
Login user of the mongo server. Cannot be set with URI. (String, default:
<none>
) - uuid-representation
-
Representation to use when converting a UUID to a BSON binary value. (UuidRepresentation, default:
java-legacy
, possible values:UNSPECIFIED
,STANDARD
,C_SHARP_LEGACY
,JAVA_LEGACY
,PYTHON_LEGACY
)
spring.data.mongodb.gridfs
- bucket
-
GridFS bucket name. (String, default:
<none>
) - database
-
GridFS database name. (String, default:
<none>
)
spring.data.mongodb.ssl
- bundle
-
SSL bundle name. (String, default:
<none>
) - enabled
-
Whether to enable SSL support. Enabled automatically if "bundle" is provided unless specified otherwise. (Boolean, default:
<none>
)
spring.data.redis
- client-name
-
Client name to be set on connections with CLIENT SETNAME. (String, default:
<none>
) - client-type
-
Type of client to use. By default, auto-detected according to the classpath. (ClientType, default:
<none>
, possible values:LETTUCE
,JEDIS
) - connect-timeout
-
Connection timeout. (Duration, default:
<none>
) - database
-
Database index used by the connection factory. (Integer, default:
0
) - host
-
Redis server host. (String, default:
localhost
) - password
-
Login password of the redis server. (String, default:
<none>
) - port
-
Redis server port. (Integer, default:
6379
) - timeout
-
Read timeout. (Duration, default:
<none>
) - url
-
Connection URL. Overrides host, port, username, and password. Example: redis://user:password@example.com:6379 (String, default:
<none>
) - username
-
Login username of the redis server. (String, default:
<none>
)
spring.data.redis.cluster
- max-redirects
-
Maximum number of redirects to follow when executing commands across the cluster. (Integer, default:
<none>
) - nodes
-
Comma-separated list of "host:port" pairs to bootstrap from. This represents an "initial" list of cluster nodes and is required to have at least one entry. (List<String>, default:
<none>
)
spring.data.redis.jedis.pool
- enabled
-
Whether to enable the pool. Enabled automatically if "commons-pool2" is available. With Jedis, pooling is implicitly enabled in sentinel mode and this setting only applies to single node setup. (Boolean, default:
<none>
) - max-active
-
Maximum number of connections that can be allocated by the pool at a given time. Use a negative value for no limit. (Integer, default:
8
) - max-idle
-
Maximum number of "idle" connections in the pool. Use a negative value to indicate an unlimited number of idle connections. (Integer, default:
8
) - max-wait
-
Maximum amount of time a connection allocation should block before throwing an exception when the pool is exhausted. Use a negative value to block indefinitely. (Duration, default:
-1ms
) - min-idle
-
Target for the minimum number of idle connections to maintain in the pool. This setting only has an effect if both it and time between eviction runs are positive. (Integer, default:
0
) - time-between-eviction-runs
-
Time between runs of the idle object evictor thread. When positive, the idle object evictor thread starts, otherwise no idle object eviction is performed. (Duration, default:
<none>
)
spring.data.redis.lettuce.pool
- enabled
-
Whether to enable the pool. Enabled automatically if "commons-pool2" is available. With Jedis, pooling is implicitly enabled in sentinel mode and this setting only applies to single node setup. (Boolean, default:
<none>
) - max-active
-
Maximum number of connections that can be allocated by the pool at a given time. Use a negative value for no limit. (Integer, default:
8
) - max-idle
-
Maximum number of "idle" connections in the pool. Use a negative value to indicate an unlimited number of idle connections. (Integer, default:
8
) - max-wait
-
Maximum amount of time a connection allocation should block before throwing an exception when the pool is exhausted. Use a negative value to block indefinitely. (Duration, default:
-1ms
) - min-idle
-
Target for the minimum number of idle connections to maintain in the pool. This setting only has an effect if both it and time between eviction runs are positive. (Integer, default:
0
) - time-between-eviction-runs
-
Time between runs of the idle object evictor thread. When positive, the idle object evictor thread starts, otherwise no idle object eviction is performed. (Duration, default:
<none>
)
spring.data.redis.lettuce
- shutdown-timeout
-
Shutdown timeout. (Duration, default:
100ms
)
spring.data.redis.sentinel
- master
-
Name of the Redis server. (String, default:
<none>
) - nodes
-
Comma-separated list of "host:port" pairs. (List<String>, default:
<none>
) - password
-
Password for authenticating with sentinel(s). (String, default:
<none>
) - username
-
Login username for authenticating with sentinel(s). (String, default:
<none>
)
spring.data.redis.ssl
- bundle
-
SSL bundle name. (String, default:
<none>
) - enabled
-
Whether to enable SSL support. Enabled automatically if "bundle" is provided unless specified otherwise. (Boolean, default:
<none>
)
spring.datasource
- driver-class-name
-
Fully qualified name of the JDBC driver. Auto-detected based on the URL by default. (String, default:
<none>
) - password
-
Login password of the database. (String, default:
<none>
) - url
-
JDBC URL of the database. (String, default:
<none>
) - username
-
Login username of the database. (String, default:
<none>
)
6.2. Bridge Processor
A processor that bridges the input and output by simply passing the incoming payload to the outbound.
6.2.1. Payload
Any
6.3. Filter Processor
The filter processor examines the incoming payload and applies a predicate to it, which decides whether the record continues downstream.
For example, if the incoming payload is of type String
and you want to filter out anything that has fewer than five characters, you can run the filter processor as shown below.
java -jar filter-processor-kafka-<version>.jar --filter.function.expression='payload.length() > 4'
Change kafka to rabbit if you want to run it against RabbitMQ.
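To make the filtering rule concrete, here is a minimal plain-Java sketch of the predicate that the SpEL expression above describes. It does not use SpEL or the filter application itself; the class name FilterSketch and the sample payloads are hypothetical.

```java
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

public class FilterSketch {

    // Plain-Java stand-in for the SpEL expression "payload.length() > 4".
    public static final Predicate<String> LONGER_THAN_FOUR = payload -> payload.length() > 4;

    // Keeps only the payloads the predicate accepts; the rest are dropped.
    public static List<String> filter(List<String> payloads) {
        return payloads.stream().filter(LONGER_THAN_FOUR).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // "hi" and "four" have at most four characters, so they are filtered out.
        System.out.println(filter(List.of("hi", "hello", "four", "stream"))); // prints [hello, stream]
    }
}
```

Only payloads for which the predicate returns true are passed on; everything else is discarded.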
6.3.1. Payload
You can pass any type as payload and then apply SpEL expressions against it to filter.
If the incoming type is byte[]
and the content type is set to text/plain
or application/json
, then the application converts the byte[]
into String
.
6.3.2. Options
- filter.function.expression
-
Boolean SpEL expression to apply against request message to filter. (Expression, default:
<none>
)
6.4. Groovy Processor
A Processor that applies a Groovy script against messages.
6.4.1. Options
The groovy-processor processor has the following options:
- groovy-processor.script
-
Reference to a script used to process messages. (Resource, default:
<none>
) - groovy-processor.variables
-
Variable bindings as a new line delimited string of name-value pairs, e.g. 'foo=bar\n baz=car'. (Properties, default:
<none>
) - groovy-processor.variables-location
-
The location of a properties file containing custom script variable bindings. (Resource, default:
<none>
)
6.5. Header Enricher Processor
Use the header-enricher app to add message headers.
The headers are provided in the form of new line delimited key value pairs, where the keys are the header names and the values are SpEL expressions.
For example --headers='foo=payload.someProperty \n bar=payload.otherProperty'
.
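The option type (Properties) suggests that the key-value pairs follow the usual java.util.Properties rules. The sketch below rests on that assumption and is not the application's actual code; the class name HeaderPropertiesSketch is hypothetical. It uses a real line break between the pairs and leaves the values as unevaluated SpEL expression strings.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

public class HeaderPropertiesSketch {

    // Parses new-line delimited key=value pairs into a Properties object.
    public static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            // A StringReader does not normally fail; rethrow unchecked just in case.
            throw new UncheckedIOException(e);
        }
        return props;
    }

    public static void main(String[] args) {
        // Keys are header names; values are SpEL expressions kept as plain strings here.
        Properties headers = parse("foo=payload.someProperty\nbar=payload.otherProperty");
        headers.forEach((name, expression) -> System.out.println(name + " -> " + expression));
    }
}
```

With java.util.Properties, leading whitespace before a key is skipped, but trailing whitespace after a value is kept, so a space before the line break would become part of the expression string.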
6.5.1. Options
The header-enricher processor has the following options:
- header.enricher.headers
-
\n separated properties representing headers in which values are SpEL expressions, e.g. foo='bar' \n baz=payload.baz. (Properties, default:
<none>
) - header.enricher.overwrite
-
Set to true to overwrite any existing message headers. (Boolean, default:
false
)
6.6. Http Request Processor
A processor app that makes requests to an HTTP resource and emits the response body as a message payload.
6.6.1. Input
Headers
Any required HTTP headers must be explicitly set via the headers
or headers-expression
property. See examples below.
Header values may also be used to construct:
-
the request body when referenced in the
body-expression
property. -
the HTTP method when referenced in the
http-method-expression
property. -
the URL when referenced in the
url-expression
property.
Payload
The payload is used as the request body for a POST request by default, and can be any Java type. It should be an empty String for a GET request. The payload may also be used to construct:
-
the request body when referenced in the
body-expression
property. -
the HTTP method when referenced in the
http-method-expression
property. -
the URL when referenced in the
url-expression
property.
The underlying WebClient supports Jackson JSON serialization to support any request and response types if necessary.
The expected-response-type
property, String.class
by default, may be set to any class in your application class path.
Note that user defined payload types will require adding required dependencies to your pom file.
6.6.2. Output
Headers
No HTTP message headers are mapped to the outbound Message.
Payload
The raw output object is ResponseEntity<?>; any of its fields (e.g., body
, headers
) or accessor methods (statusCode
) may be referenced as part of the reply-expression
.
By default the outbound Message payload is the response body.
Note that ResponseEntity (referenced by the expression #root
) cannot be deserialized by Jackson by default, but may be rendered as a HashMap
.
6.6.3. Options
The http-request processor has the following options:
Properties grouped by prefix:
http.request
- body-expression
-
A SpEL expression to derive the request body from the incoming message. (Expression, default:
<none>
) - expected-response-type
-
The type used to interpret the response. (Class<?>, default:
<none>
) - headers-expression
-
A SpEL expression used to derive the http headers map to use. (Expression, default:
<none>
) - http-method-expression
-
A SpEL expression to derive the request method from the incoming message. (Expression, default:
<none>
) - reply-expression
-
A SpEL expression used to compute the final result, applied against the whole HTTP org.springframework.http.ResponseEntity. (Expression, default:
<none>
) - timeout
-
Request timeout in milliseconds. (Long, default:
30000
) - url-expression
-
A SpEL expression against incoming message to determine the URL to use. (Expression, default:
<none>
)
spring.codec
- log-request-details
-
Whether to log form data at DEBUG level, and headers at TRACE level. (Boolean, default:
false
) - max-in-memory-size
-
Limit on the number of bytes that can be buffered whenever the input stream needs to be aggregated. This applies only to the auto-configured WebFlux server and WebClient instances. By default this is not set, in which case individual codec defaults apply. Most codecs are limited to 256K by default. (DataSize, default:
<none>
)
6.7. Image Recognition Processor
A processor that uses an Inception model to classify images in real time into different categories (i.e. labels).
The model implements a deep convolutional neural network that can achieve reasonable performance on hard visual recognition tasks, matching or exceeding human performance in some domains such as image recognition.
The input of the model is an image as a binary array.
The output is a JSON message in this format:
{
"labels" : [
{"giant panda":0.98649305}
]
}
The result contains the name of the recognized category (the label) along with the confidence that the image represents this category.
If the response-size
is set to a value higher than 1, the result includes the top response-size
most probable labels. For example, response-size=3
would return:
{
"labels": [
{"giant panda":0.98649305},
{"badger":0.010562794},
{"ice bear":0.001130851}
]
}
6.7.1. Payload
If the incoming type is byte[]
and the content type is set to application/octet-stream
, then the application processes the input byte[]
image and outputs an augmented byte[]
image payload and a JSON header.
6.7.2. Options
- image.recognition.cache-model
-
Cache the pre-trained TensorFlow model. (Boolean, default:
true
) - image.recognition.debug-output
-
<documentation missing> (Boolean, default:
false
) - image.recognition.debug-output-path
-
<documentation missing> (String, default:
image-recognition-result.png
) - image.recognition.model
-
pre-trained tensorflow image recognition model. Note that the model must match the selected model type! (String, default:
https://storage.googleapis.com/mobilenet_v2/checkpoints/mobilenet_v2_1.4_224.tgz#mobilenet_v2_1.4_224_frozen.pb
) - image.recognition.model-type
-
Supports three different pre-trained TensorFlow image recognition models: Inception, MobileNetV1, and MobileNetV2. 1. Inception: the graph uses 'input' as input and 'output' as output. 2. MobileNetV2 pre-trained models (https://github.com/tensorflow/models/tree/master/research/slim/nets/mobilenet#pretrained-models): the normalized image size is always square (e.g. H=W), and the graph uses 'input' as input and 'MobilenetV2/Predictions/Reshape_1' as output. 3. MobileNetV1 pre-trained models (https://github.com/tensorflow/models/blob/master/research/slim/nets/mobilenet_v1.md#pre-trained-models): the graph uses 'input' as input and 'MobilenetV1/Predictions/Reshape_1' as output. (ModelType, default:
<none>
, possible values:inception
,mobilenetv1
,mobilenetv2
) - image.recognition.normalized-image-size
-
Normalized image size. (Integer, default:
224
) - image.recognition.response-size
-
number of recognized images. (Integer, default:
5
)
6.8. Object Detection Processor
The Object Detection processor provides out-of-the-box support for the TensorFlow Object Detection API. It allows for real-time localization and identification of multiple objects in a single image or image stream. The Object Detection processor is built on top of the Object Detection Function.
You have to provide the Processor with a pre-trained object detection model, and the corresponding object labels.
Here are some sensible configuration defaults:
-
object.detection.model
:storage.googleapis.com/scdf-tensorflow-models/object-detection/faster_rcnn_resnet101_coco_2018_01_28_frozen_inference_graph.pb
-
object.detection.labels
:storage.googleapis.com/scdf-tensorflow-models/object-detection/mscoco_label_map.pbtxt
-
object.detection.with-masks
:false
The following diagram shows a Spring Cloud Data Flow streaming pipeline that predicts, in real time, the object types in the input image stream.
The processor’s input is an image byte array, and the output is an augmented image and a header, called detected_objects
, that provides a textual description of the detected objects:
{
"labels" : [
{"name":"person", "confidence":0.9996774,"x1":0.0,"y1":0.3940161,"x2":0.9465165,"y2":0.5592592,"cid":1},
{"name":"person", "confidence":0.9996604,"x1":0.047891676,"y1":0.03169123,"x2":0.941098,"y2":0.2085562,"cid":1},
{"name":"backpack", "confidence":0.96534747,"x1":0.15588468,"y1":0.85957795,"x2":0.5091308,"y2":0.9908878,"cid":23},
{"name":"backpack", "confidence":0.963343,"x1":0.1273736,"y1":0.57658505,"x2":0.47765,"y2":0.6986431,"cid":23}
]
}
The detected_objects
header format is:
-
object-name:confidence - human readable name of the detected object (e.g. label) with its confidence as a float between [0-1]
-
x1, y1, x2, y2 - Response also provides the bounding box of the detected objects represented as
(x1, y1, x2, y2)
. The coordinates are relative to the size of the image. -
cid - Classification identifier as defined in the provided labels configuration file.
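Because the bounding box coordinates are relative to the image size, a consumer has to scale them to get pixel positions. The following is a minimal sketch of that arithmetic, assuming that x values scale with the image width and y values with the image height (this guide does not state the axis convention). The class name BoundingBoxSketch, the 640x480 image size, and the box values are hypothetical.

```java
import java.util.Arrays;

public class BoundingBoxSketch {

    // Converts a relative coordinate in [0, 1] to a pixel offset along one image dimension.
    public static int toPixels(double relative, int dimension) {
        return (int) Math.round(relative * dimension);
    }

    // Returns {x1, y1, x2, y2} in pixels.
    // Assumption: x scales with the width and y with the height of the image.
    public static int[] toPixelBox(double x1, double y1, double x2, double y2, int width, int height) {
        return new int[] {
            toPixels(x1, width), toPixels(y1, height),
            toPixels(x2, width), toPixels(y2, height)
        };
    }

    public static void main(String[] args) {
        // Hypothetical box on a hypothetical 640x480 image.
        int[] box = toPixelBox(0.25, 0.5, 0.75, 1.0, 640, 480);
        System.out.println(Arrays.toString(box)); // prints [160, 240, 480, 480]
    }
}
```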
6.8.1. Payload
The incoming type is byte[]
, and the content type is application/octet-stream
. The processor processes the input byte[]
image and outputs an augmented byte[]
image payload and a JSON header (detected_objects
).
6.8.2. Options
- object.detection.cache-model
-
<documentation missing> (Boolean, default:
true
) - object.detection.confidence
-
<documentation missing> (Float, default:
0.4
) - object.detection.debug-output
-
<documentation missing> (Boolean, default:
false
) - object.detection.debug-output-path
-
<documentation missing> (String, default:
object-detection-result.png
) - object.detection.labels
-
Labels URI. (String, default:
https://storage.googleapis.com/scdf-tensorflow-models/object-detection/mscoco_label_map.pbtxt
) - object.detection.model
-
pre-trained tensorflow object detection model. (String, default:
https://download.tensorflow.org/models/object_detection/ssdlite_mobilenet_v2_coco_2018_05_09.tar.gz#frozen_inference_graph.pb
) - object.detection.response-size
-
<documentation missing> (Integer, default:
<none>
) - object.detection.with-masks
-
<documentation missing> (Boolean, default:
false
)
6.9. Semantic Segmentation Processor
Image Semantic Segmentation based on the state-of-the-art DeepLab TensorFlow model.
The Semantic Segmentation
is the process of associating each pixel of an image with a class label (such as flower, person, road, sky, ocean, or car).
Unlike the Instance Segmentation
, which produces instance-aware region masks, the Semantic Segmentation
produces class-aware masks.
For implementing Instance Segmentation
consult the Object Detection Service instead.
The Semantic Segmentation Processor
uses the Semantic Segmentation Function library and the TensorFlow Service.
6.9.1. Payload
The incoming type is byte[]
, and the content type is application/octet-stream
. The processor processes the input byte[]
image and outputs an augmented byte[]
image payload and a JSON header.
The processor’s input is an image byte array, and the output is an augmented image byte array and a JSON header, semantic_segmentation
, in this format:
[
[ 0, 0, 0 ],
[ 127, 127, 127 ],
[ 255, 255, 255 ],
...
]
The JSON in the output header represents the color pixel map computed from the input image.
6.9.2. Options
- semantic.segmentation.color-map-uri
-
Every pre-trained model is based on certain object color maps. The pre-defined options are: - classpath:/colormap/citymap_colormap.json - classpath:/colormap/ade20k_colormap.json - classpath:/colormap/black_white_colormap.json - classpath:/colormap/mapillary_colormap.json (String, default:
classpath:/colormap/citymap_colormap.json
) - semantic.segmentation.debug-output
-
Save the output image in the local debugOutputPath path. (Boolean, default:
false
) - semantic.segmentation.debug-output-path
-
<documentation missing> (String, default:
semantic-segmentation-result.png
) - semantic.segmentation.mask-transparency
-
The alpha color of the computed segmentation mask image. (Float, default:
0.45
) - semantic.segmentation.model
-
pre-trained tensorflow semantic segmentation model. (String, default:
https://download.tensorflow.org/models/deeplabv3_mnv2_cityscapes_train_2018_02_05.tar.gz#frozen_inference_graph.pb
) - semantic.segmentation.output-type
-
Specifies the output image type. You can return either the input image with the computed mask overlay, or the mask alone. (OutputType, default:
<none>
, possible values:blended
,mask
)
6.10. Script Processor
Processor that transforms messages using a script. The script body is supplied directly as a property value. The language of the script can be specified (groovy/javascript/ruby/python).
6.10.1. Options
The script-processor processor has the following options:
- script-processor.language
-
Language of the text in the script property. Supported: groovy, javascript, ruby, python. (String, default:
<none>
) - script-processor.script
-
Text of the script. (String, default:
<none>
) - script-processor.variables
-
Variable bindings as a new line delimited string of name-value pairs, e.g. 'foo=bar\n baz=car'. (Properties, default:
<none>
) - script-processor.variables-location
-
The location of a properties file containing custom script variable bindings. (Resource, default:
<none>
)
6.11. Splitter Processor
The splitter app builds upon the concept of the same name in Spring Integration and allows the splitting of a single message into several distinct messages.
The processor uses a function that takes a Message<?>
as input and then produces a List<Message<?>>
as output based on various properties (see below).
You can use a SpEL expression or a delimiter to specify how you want to split the incoming message.
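As an illustration of delimiter-based splitting, the following plain-Java sketch tokenizes a String payload with java.util.StringTokenizer. It is a simplified stand-in and not the Spring Integration splitter itself; the class name DelimiterSplitSketch and the sample payload are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.StringTokenizer;

public class DelimiterSplitSketch {

    // Tokenizes a String payload; every character in 'delimiters' acts as a separator.
    public static List<String> split(String payload, String delimiters) {
        List<String> parts = new ArrayList<>();
        StringTokenizer tokenizer = new StringTokenizer(payload, delimiters);
        while (tokenizer.hasMoreTokens()) {
            parts.add(tokenizer.nextToken());
        }
        return parts;
    }

    public static void main(String[] args) {
        // Both ',' and ';' are treated as delimiters.
        System.out.println(split("a,b;c", ",;")); // prints [a, b, c]
    }
}
```

In the real application, each resulting part would become the payload of its own outgoing message. Note that StringTokenizer skips empty tokens, so consecutive delimiters do not produce empty parts in this sketch.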
6.11.1. Payload
-
Incoming payload -
Message<?>
If the incoming type is byte[]
and the content type is set to text/plain
or application/json
, then the application converts the byte[]
into String
.
-
Outgoing payload -
List<Message<?>>
6.11.2. Options
- splitter.apply-sequence
-
Add correlation/sequence information in headers to facilitate later aggregation. (Boolean, default:
true
) - splitter.charset
-
The charset to use when converting bytes in text-based files to String. (String, default:
<none>
) - splitter.delimiters
-
When expression is null, delimiters to use when tokenizing String payloads. (String, default:
<none>
) - splitter.expression
-
A SpEL expression for splitting payloads. (String, default:
<none>
) - splitter.file-markers
-
Set to true or false to use a FileSplitter (to split text-based files by line) that includes (or not) beginning/end of file markers. (Boolean, default:
<none>
) - splitter.markers-json
-
When 'fileMarkers == true', specify if they should be produced as FileSplitter.FileMarker objects or JSON. (Boolean, default:
true
)
6.12. Transform Processor
The transform processor allows you to convert the message payload structure based on a SpEL expression.
Here is an example of how you can run this application.
java -jar transform-processor-kafka-<version>.jar \
--spel.function.expression='payload.toUpperCase()'
Change kafka to rabbit if you want to run it against RabbitMQ.
6.12.1. Payload
The incoming message can contain any type of payload.
6.12.2. Options
- spel.function.expression
-
A SpEL expression to apply. (String, default:
<none>
)
6.13. Twitter Trend and Trend Locations Processor
A processor that returns either the trending topics or the locations of trending topics.
The twitter.trend.trend-query-type
property selects the query type.
6.13.1. Retrieve trending topic in a location (optionally)
For this mode set twitter.trend.trend-query-type
to trend
.
Processor based on Trends API. Returns the trending topics near a specific latitude, longitude location.
6.13.2. Retrieve trend Locations
For this mode set twitter.trend.trend-query-type
to trendLocation
.
Retrieves either the full list of locations that have trending topics or only the locations nearest to a given position.
If the latitude
, longitude
parameters are NOT provided, the processor calls the Trends Available API and returns the locations that Twitter has trending topic information for.
If the latitude
, longitude
parameters are provided, the processor calls the Trends Closest API and returns the locations that Twitter has trending topic information for, closest to the specified location.
The response is an array of locations
that encode each location’s WOEID and some other human-readable information, such as a canonical name and the country the location belongs to.
6.13.3. Options
Properties grouped by prefix:
twitter.trend.closest
- lat
-
If provided with a lon parameter, the available trend locations are sorted by distance, nearest to furthest, to the coordinate pair. The valid range for latitude is -90.0 to +90.0 (South is negative, North is positive) inclusive. (Expression, default:
<none>
) - lon
-
If provided with a lat parameter, the available trend locations are sorted by distance, nearest to furthest, to the coordinate pair. The valid range for longitude is -180.0 to +180.0 (West is negative, East is positive) inclusive. (Expression, default:
<none>
)
twitter.trend
- location-id
-
The Yahoo! Where On Earth ID of the location to return trending information for. Global information is available by using 1 as the WOEID. (Expression, default:
payload
) - trend-query-type
-
<documentation missing> (TrendQueryType, default:
<none>
, possible values:trend
,trendLocation
)
7. Sinks
7.1. Cassandra Sink
This sink application writes the content of each message it receives into Cassandra.
It expects a payload of JSON String and uses its properties to map to table columns.
7.1.1. Payload
A JSON String or byte array representing the entity (or a list of entities) to be persisted.
7.1.2. Options
The cassandra sink has the following options:
Properties grouped by prefix:
cassandra.cluster
- create-keyspace
-
Flag to create (or not) keyspace on application startup. (Boolean, default:
false
) - entity-base-packages
-
Base packages to scan for entities annotated with Table annotations. (String[], default:
[]
) - init-script
-
Resource with CQL scripts (delimited by ';') to initialize keyspace schema. (Resource, default:
<none>
) - skip-ssl-validation
-
Flag to skip validation of the servers' SSL certificates. (Boolean, default:
false
)
cassandra
- consistency-level
-
The consistency level for write operation. (ConsistencyLevel, default:
<none>
) - ingest-query
-
Ingest Cassandra query. (String, default:
<none>
) - query-type
-
QueryType for Cassandra Sink. (Type, default:
<none>
, possible values:INSERT
,UPDATE
,DELETE
,STATEMENT
) - statement-expression
-
Expression in Cassandra query DSL style. (Expression, default:
<none>
) - ttl
-
Time-to-live option of WriteOptions. (Integer, default:
0
)
spring.cassandra
- compression
-
Compression supported by the Cassandra binary protocol. (Compression, default:
none
, possible values:LZ4
,SNAPPY
,NONE
) - config
-
Location of the configuration file to use. (Resource, default:
<none>
) - contact-points
-
Cluster node addresses in the form 'host:port', or a simple 'host' to use the configured port. (List<String>, default:
[127.0.0.1:9042]
) - keyspace-name
-
Keyspace name to use. (String, default:
<none>
) - local-datacenter
-
Datacenter that is considered "local". Contact points should be from this datacenter. (String, default:
<none>
) - password
-
Login password of the server. (String, default:
<none>
) - port
-
Port to use if a contact point does not specify one. (Integer, default:
9042
) - schema-action
-
Schema action to take at startup. (String, default:
none
) - session-name
-
Name of the Cassandra session. (String, default:
<none>
) - username
-
Login user of the server. (String, default:
<none>
)
spring.cassandra.connection
- connect-timeout
-
Timeout to use when establishing driver connections. (Duration, default:
5s
) - init-query-timeout
-
Timeout to use for internal queries that run as part of the initialization process, just after a connection is opened. (Duration, default:
5s
)
spring.cassandra.controlconnection
- timeout
-
Timeout to use for control queries. (Duration, default:
5s
)
spring.cassandra.pool
- heartbeat-interval
-
Heartbeat interval after which a message is sent on an idle connection to make sure it's still alive. (Duration, default:
30s
) - idle-timeout
-
Idle timeout before an idle connection is removed. (Duration, default:
5s
)
spring.cassandra.request
- consistency
-
Queries consistency level. (DefaultConsistencyLevel, default:
<none>
, possible values:ANY
,ONE
,TWO
,THREE
,QUORUM
,ALL
,LOCAL_ONE
,LOCAL_QUORUM
,EACH_QUORUM
,SERIAL
,LOCAL_SERIAL
) - page-size
-
How many rows will be retrieved simultaneously in a single network round-trip. (Integer, default:
5000
) - serial-consistency
-
Queries serial consistency level. (DefaultConsistencyLevel, default:
<none>
, possible values:ANY
,ONE
,TWO
,THREE
,QUORUM
,ALL
,LOCAL_ONE
,LOCAL_QUORUM
,EACH_QUORUM
,SERIAL
,LOCAL_SERIAL
) - timeout
-
How long the driver waits for a request to complete. (Duration, default:
2s
)
spring.cassandra.request.throttler
- drain-interval
-
How often the throttler attempts to dequeue requests. Set this high enough that each attempt will process multiple entries in the queue, but not delay requests too much. (Duration, default:
<none>
) - max-concurrent-requests
-
Maximum number of requests that are allowed to execute in parallel. (Integer, default:
<none>
) - max-queue-size
-
Maximum number of requests that can be enqueued when the throttling threshold is exceeded. (Integer, default:
<none>
) - max-requests-per-second
-
Maximum allowed request rate. (Integer, default:
<none>
) - type
-
Request throttling type. (ThrottlerType, default:
none
, possible values:CONCURRENCY_LIMITING
,RATE_LIMITING
,NONE
)
spring.cassandra.ssl
- bundle
-
SSL bundle name. (String, default:
<none>
) - enabled
-
Whether to enable SSL support. (Boolean, default:
<none>
)
7.2. Analytics Sink
Sink application, built on top of the Analytics Consumer, that computes analytics from the input messages and publishes the analytics as metrics to various monitoring systems. It leverages the micrometer library for providing a uniform programming experience across the most popular monitoring systems and exposes Spring Expression Language (SpEL) properties for defining how the metric Name, Values and Tags are computed from the input data.
The analytics sink can produce two metrics types:
-
Counter - reports a single metric, a count, that increments by a fixed, positive amount. Counters can be used for computing the rates of how the data changes in time.
-
Gauge - reports the current value. Typical examples for gauges would be the size of a collection or map or number of threads in a running state.
A Meter (e.g. Counter or Gauge) is uniquely identified by its name
and dimensions
(the terms dimensions and tags are used interchangeably). Dimensions allow a particular named metric to be sliced to drill down and reason about the data.
Because a metric is uniquely identified by its name and dimensions, you can assign multiple tags (key/value pairs) to every metric, but you cannot arbitrarily change those tags afterwards! Monitoring systems such as Prometheus will complain if a metric with the same name has different sets of tags.
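The identity rule above can be illustrated with a small plain-Java sketch in which every distinct combination of name and tags gets its own counter. This is a simplified model and not the Micrometer API; the class name MeterIdentitySketch, the metric name, and the tag values are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

public class MeterIdentitySketch {

    // One counter value per unique (name, tags) combination.
    private final Map<String, Double> counters = new HashMap<>();

    // Builds an identity key from the meter name and its tags, sorted by tag name.
    public static String id(String name, Map<String, String> tags) {
        return name + new TreeMap<>(tags);
    }

    // Increments the counter identified by name and tags and returns its new value.
    public double increment(String name, Map<String, String> tags, double amount) {
        return counters.merge(id(name, tags), amount, Double::sum);
    }

    public static void main(String[] args) {
        MeterIdentitySketch registry = new MeterIdentitySketch();
        registry.increment("stocks", Map.of("symbol", "AAA"), 1.0);
        registry.increment("stocks", Map.of("symbol", "AAA"), 1.0);
        // A different tag value identifies a different counter.
        registry.increment("stocks", Map.of("symbol", "BBB"), 1.0);
        registry.counters.forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

In this model, the two increments tagged symbol=AAA accumulate in one counter, while symbol=BBB is tracked separately, which is why changing tag values produces new time series rather than updating an existing one.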
Use the analytics.name
or analytics.name-expression
properties to set the name of the output analytics metrics. If not set, the metrics name defaults to the application’s name.
Use the analytics.tag.expression.<TAG_NAME>=<TAG_VALUE>
property to add one or more tags to your metrics. The TAG_NAME
used in the property definition will appear as tag name in the metrics. The TAG_VALUE is a SpEL
expression that dynamically computes the tag value from the incoming message.
The SpEL
expressions use the headers
and payload
keywords to access the message’s headers and payload values.
You can use literals (e.g. 'fixed value' ) to set tags with fixed values.
All Stream Applications support, out of the box, three of the most popular monitoring systems, Wavefront
, Prometheus
and InfluxDB
and you can enable each of them declaratively.
You can add support for additional monitoring systems by just adding their micrometer meter-registry dependencies to the Analytics Sink
applications.
Please visit the Spring Cloud Data Flow Stream Monitoring for detailed instructions for configuring the Monitoring Systems. The following quick snippets can help you start.
-
To enable the Prometheus meter registry, set the following properties.
management.metrics.export.prometheus.enabled=true
management.metrics.export.prometheus.rsocket.enabled=true
management.metrics.export.prometheus.rsocket.host=<YOUR PROMETHEUS-RSOCKET PROXY URI>
management.metrics.export.prometheus.rsocket.port=7001
-
To enable Wavefront meter registry, set the following properties.
management.metrics.export.wavefront.enabled=true
management.metrics.export.wavefront.api-token=YOUR WAVEFRONT KEY
management.metrics.export.wavefront.uri=YOUR WAVEFRONT URI
management.metrics.export.wavefront.source=UNIQUE NAME TO IDENTIFY YOUR APP
-
To enable InfluxDB meter registry, set the following property.
management.metrics.export.influx.enabled=true
management.metrics.export.influx.uri={influxdb-server-url}
If the Data Flow Server Monitoring is enabled then the Analytics Sink will reuse the provided metrics configurations.
The following diagram illustrates how the Analytics Sink
helps to collect business insights for a real-time stock-exchange pipeline.
7.2.1. Payload
The incoming message can contain any type of payload.
7.2.2. Options
Properties grouped by prefix:
analytics
- amount-expression
-
A SpEL expression to compute the output metrics value (e.g. amount). It defaults to 1.0 (Expression, default:
<none>
) - meter-type
-
Micrometer meter type used to report the metrics to the backend. (MeterType, default:
<none>
, possible values:counter
,gauge
) - name
-
The name of the output metrics. The 'name' and 'nameExpression' are mutually exclusive. Only one of them can be set. (String, default:
<none>
) - name-expression
-
A SpEL expression to compute the output metrics name from the input message. The 'name' and 'nameExpression' are mutually exclusive. Only one of them can be set. (Expression, default:
<none>
)
analytics.tag
- expression
-
Computes tags from SpEL expression. Single SpEL expression can produce an array of values, which in turn means distinct name/value tags. Every name/value tag will produce a separate meter increment. Tag expression format is: analytics.tag.expression.[tag-name]=[SpEL expression] (Map<String, Expression>, default:
<none>
) - fixed
-
DEPRECATED: Please use the analytics.tag.expression with literal SpEL expression. Custom, fixed Tags. Those tags have constant values, created once and then sent along with every published metrics. The convention to define a fixed Tags is: analytics.tag.fixed.[tag-name]=[tag-value] (Map<String, String>, default:
<none>
)
7.3. Elasticsearch Sink
Sink that indexes documents into Elasticsearch.
This Elasticsearch sink only supports indexing JSON documents.
It consumes data from an input destination and then indexes it to Elasticsearch.
The input data can be a plain JSON string, or a java.util.Map
that represents the JSON.
It also accepts the data as the Elasticsearch provided XContentBuilder
.
However, this is a rare case as it is not likely the middleware keeps the records as XContentBuilder
.
This is provided mainly for direct invocation of the consumer.
7.3.1. Options
The Elasticsearch sink has the following options:
Properties grouped by prefix:
elasticsearch.consumer
- async
-
Indicates whether the indexing operation is async or not. By default indexing is done synchronously. (Boolean, default:
false
) - batch-size
-
Number of items to index for each request. It defaults to 1. For values greater than 1 bulk indexing API will be used. (Integer, default:
1
) - group-timeout
-
Timeout in milliseconds after which message group is flushed when bulk indexing is active. It defaults to -1, meaning no automatic flush of idle message groups occurs. (Long, default:
-1
) - id
-
The id of the document to index. If set, the INDEX_ID header value overrides this property on a per message basis. (Expression, default:
<none>
) - index
-
Name of the index. If set, the INDEX_NAME header value overrides this property on a per message basis. (String, default:
<none>
) - routing
-
Indicates the shard to route to. If not provided, Elasticsearch will default to a hash of the document id. (String, default:
<none>
) - timeout-seconds
-
Timeout for the shard to be available. If not set, it defaults to 1 minute set by the Elasticsearch client. (Long, default:
0
)
spring.elasticsearch
- connection-timeout
-
Connection timeout used when communicating with Elasticsearch. (Duration, default:
1s
) - password
-
Password for authentication with Elasticsearch. (String, default:
<none>
) - path-prefix
-
Prefix added to the path of every request sent to Elasticsearch. (String, default:
<none>
) - socket-keep-alive
-
Whether to enable socket keep alive between client and Elasticsearch. (Boolean, default:
false
) - socket-timeout
-
Socket timeout used when communicating with Elasticsearch. (Duration, default:
30s
) - uris
-
Comma-separated list of the Elasticsearch instances to use. (List<String>, default:
[http://localhost:9200]
) - username
-
Username for authentication with Elasticsearch. (String, default:
<none>
)
spring.elasticsearch.restclient.sniffer
- delay-after-failure
-
Delay of a sniff execution scheduled after a failure. (Duration, default:
1m
) - interval
-
Interval between consecutive ordinary sniff executions. (Duration, default:
5m
)
spring.elasticsearch.restclient.ssl
- bundle
-
SSL bundle name. (String, default:
<none>
)
7.3.2. Examples of running this sink
-
From the folder elasticsearch-sink: ./mvnw clean package
-
cd apps
-
cd to the proper binder generated app (Kafka or RabbitMQ)
-
./mvnw clean package
-
Make sure that you have Elasticsearch running. For example you can run it as a docker container using the following command.
docker run -d --name es762 -p 9200:9200 -e "discovery.type=single-node" elasticsearch:7.6.2
-
Start the middleware (Kafka or RabbitMQ) if it is not already running.
-
java -jar target/elasticsearch-sink-<kafka|rabbit>-3.0.0-SNAPSHOT.jar --spring.cloud.stream.bindings.input.destination=els-in --elasticsearch.consumer.index=testing
-
Send some JSON data into the middleware destination. For example:
{"foo":"bar"}
-
Verify that the data is indexed:
curl localhost:9200/testing/_search
7.4. File Sink
The file sink app writes each message it receives to a file.
7.4.1. Payload
-
java.io.File
-
java.io.InputStream
-
byte[]
-
String
7.4.2. Options
The file-sink
has the following options:
- file.consumer.binary
-
A flag to indicate whether adding a newline after the write should be suppressed. (Boolean, default:
false
) - file.consumer.charset
-
The charset to use when writing text content. (String, default:
UTF-8
) - file.consumer.directory
-
The parent directory of the target file. (File, default:
<none>
) - file.consumer.directory-expression
-
The expression to evaluate for the parent directory of the target file. (String, default:
<none>
) - file.consumer.mode
-
The FileExistsMode to use if the target file already exists. (FileExistsMode, default:
<none>
, possible values:APPEND
,APPEND_NO_FLUSH
,FAIL
,IGNORE
,REPLACE
,REPLACE_IF_MODIFIED
) - file.consumer.name
-
The name of the target file. (String, default:
file-consumer
) - file.consumer.name-expression
-
The expression to evaluate for the name of the target file. (String, default:
<none>
) - file.consumer.suffix
-
The suffix to append to file name. (String, default:
<empty string>
)
7.5. FTP Sink
FTP sink is a simple option to push files to an FTP server from incoming messages.
It uses an ftp-outbound-adapter, so an incoming message payload can be a java.io.File object, a String (the content of the file), or an array of bytes (also the file content).
To use this sink, you need a username and a password to log in.
By default, Spring Integration uses o.s.i.file.DefaultFileNameGenerator if no file name generator is specified. DefaultFileNameGenerator determines the file name from the value of the file_name header in the MessageHeaders, if that header exists. Otherwise, if the payload of the Message is already a java.io.File, it uses the original name of that file.
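The naming rule described in the note above can be sketched in plain Java. This is a minimal illustration, not Spring Integration's code: FileNameSketch and resolve are hypothetical names, message headers are modeled as a plain Map, and the final fallback (the message id plus ".msg") is an assumption of this sketch.

```java
import java.io.File;
import java.util.Map;

// Illustrative only: a plain-Java stand-in for the naming rule described above.
class FileNameSketch {

    static String resolve(Map<String, Object> headers, Object payload) {
        Object header = headers.get("file_name");
        if (header instanceof String && !((String) header).isBlank()) {
            // 1. A non-empty file_name header wins.
            return (String) header;
        }
        if (payload instanceof File) {
            // 2. Otherwise a File payload keeps its original name.
            return ((File) payload).getName();
        }
        // 3. Assumption of this sketch: derive a name from the message id header.
        return headers.get("id") + ".msg";
    }

    public static void main(String[] args) {
        System.out.println(resolve(Map.of("file_name", "report.csv"), "a,b,c"));
        System.out.println(resolve(Map.of(), new File("/tmp/data/input.txt")));
    }
}
```

In practice this means that String and byte[] payloads should normally carry a file_name header (or the sink should set ftp.consumer.filename-expression) so that the remote file gets a predictable name.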
7.5.1. Headers
-
file_name
(See note above)
7.5.2. Payload
-
java.io.File
-
java.io.InputStream
-
byte[]
-
String
7.5.3. Output
N/A (writes to the FTP server).
7.5.4. Options
The ftp sink has the following options:
Properties grouped by prefix:
ftp.consumer
- auto-create-dir
-
Whether or not to create the remote directory. (Boolean, default:
true
) - filename-expression
-
A SpEL expression to generate the remote file name. (String, default:
<none>
) - mode
-
Action to take if the remote file already exists. (FileExistsMode, default:
<none>
, possible values:APPEND
,APPEND_NO_FLUSH
,FAIL
,IGNORE
,REPLACE
,REPLACE_IF_MODIFIED
) - remote-dir
-
The remote FTP directory. (String, default:
/
) - remote-file-separator
-
The remote file separator. (String, default:
/
) - temporary-remote-dir
-
A temporary directory where the file will be written if use-temporary-filename is true. (String, default:
/
) - tmp-file-suffix
-
The suffix to use while the transfer is in progress. (String, default:
.tmp
) - use-temporary-filename
-
Whether or not to write to a temporary file and rename. (Boolean, default:
true
)
ftp.factory
- cache-sessions
-
Cache sessions. (Boolean, default:
<none>
) - client-mode
-
The client mode to use for the FTP session. (ClientMode, default:
<none>
, possible values:ACTIVE
,PASSIVE
) - host
-
The host name of the server. (String, default:
localhost
) - password
-
The password to use to connect to the server. (String, default:
<none>
) - port
-
The port of the server. (Integer, default:
21
) - username
-
The username to use to connect to the server. (String, default:
<none>
)
7.6. JDBC Sink
The JDBC sink persists incoming payloads to a relational database.
The jdbc.consumer.columns property represents pairs of COLUMN_NAME[:EXPRESSION_FOR_VALUE], where EXPRESSION_FOR_VALUE (together with the colon) is optional.
When the expression is omitted, the value is evaluated via a generated expression such as payload.COLUMN_NAME, which gives a direct mapping from object properties to table columns.
For example, given a JSON payload like:
{
  "name": "My Name",
  "address": {
    "city": "Big City",
    "street": "Narrow Alley"
  }
}
we can insert it into a table with name, city, and street columns by using the configuration:
--jdbc.consumer.columns=name,city:address.city,street:address.street
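To make the shorthand concrete, the following sketch shows one way such a columns value could be split into column names and expressions. It is a minimal illustration under stated assumptions, not the sink's actual parser: ColumnSpecSketch and parse are hypothetical names, a missing expression is filled in as payload.COLUMN_NAME as described above, and expressions that themselves contain commas are not handled.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative only: ColumnSpecSketch is a hypothetical name, not a class from the JDBC sink.
class ColumnSpecSketch {

    // Parses "name,city:address.city" into an ordered column -> expression map.
    static Map<String, String> parse(String columns) {
        Map<String, String> result = new LinkedHashMap<>();
        for (String spec : columns.split(",")) {
            String trimmed = spec.trim();
            if (trimmed.isEmpty()) {
                continue;
            }
            int colon = trimmed.indexOf(':');
            if (colon < 0) {
                // No expression given: fall back to a generated payload.COLUMN_NAME expression.
                result.put(trimmed, "payload." + trimmed);
            }
            else {
                // Everything after the first colon is kept as the expression, unchanged.
                result.put(trimmed.substring(0, colon).trim(), trimmed.substring(colon + 1).trim());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        parse("name,city:address.city,street:address.street")
                .forEach((column, expression) -> System.out.println(column + " <- " + expression));
    }
}
```

Splitting on the first colon only keeps expressions that contain further colons (for example a ternary) intact.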
This sink supports batch inserts, to the extent that the underlying JDBC driver supports them.
Batch inserts are configured via the batch-size and idle-timeout properties:
Incoming messages are aggregated until batch-size messages are present, then inserted as a batch.
If idle-timeout milliseconds pass with no new messages, the aggregated batch is inserted even if it is smaller than batch-size, capping maximum latency.
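The interplay of batch-size and idle-timeout can be sketched as a small buffer with an explicit clock. This is a simplified model rather than the sink's implementation: BatchBufferSketch, add, and onTick are hypothetical names, time is passed in as a parameter to keep the example deterministic, and treating a negative idle timeout as "never flush on idle" is an assumption of the sketch.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative only: BatchBufferSketch is a hypothetical name, not a class from the JDBC sink.
class BatchBufferSketch {

    private final int batchSize;
    private final long idleTimeoutMillis; // negative: no idle flush (assumption of this sketch)
    private final List<String> buffer = new ArrayList<>();
    private long lastArrivalMillis;

    BatchBufferSketch(int batchSize, long idleTimeoutMillis) {
        this.batchSize = batchSize;
        this.idleTimeoutMillis = idleTimeoutMillis;
    }

    // Adds a message; returns the full batch once batchSize is reached, otherwise an empty list.
    List<String> add(String message, long nowMillis) {
        buffer.add(message);
        lastArrivalMillis = nowMillis;
        return buffer.size() >= batchSize ? drain() : List.of();
    }

    // Called periodically; returns a partial batch once no message has arrived for idleTimeoutMillis.
    List<String> onTick(long nowMillis) {
        boolean idle = idleTimeoutMillis >= 0 && nowMillis - lastArrivalMillis >= idleTimeoutMillis;
        return idle && !buffer.isEmpty() ? drain() : List.of();
    }

    private List<String> drain() {
        List<String> batch = new ArrayList<>(buffer);
        buffer.clear();
        return batch;
    }

    public static void main(String[] args) {
        BatchBufferSketch sketch = new BatchBufferSketch(3, 1000);
        System.out.println(sketch.add("a", 0));   // []
        System.out.println(sketch.add("b", 10));  // []
        System.out.println(sketch.add("c", 20));  // [a, b, c]
        System.out.println(sketch.add("d", 30));  // []
        System.out.println(sketch.onTick(500));   // []
        System.out.println(sketch.onTick(1030));  // [d]
    }
}
```

A larger batch-size trades latency for fewer round trips to the database, and idle-timeout bounds how long a partial batch can wait.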
The module also uses Spring Boot’s DataSource support for configuring the database connection, so properties like spring.datasource.url etc. apply.
7.6.1. Examples
java -jar jdbc-sink.jar --jdbc.consumer.tableName=names \
    --jdbc.consumer.columns=name \
    --spring.datasource.driver-class-name=org.mariadb.jdbc.Driver \
    --spring.datasource.url='jdbc:mariadb://localhost:3306/test'
7.6.2. Payload
7.6.3. Options
The jdbc sink has the following options:
Properties grouped by prefix:
jdbc.consumer
- batch-size
-
Threshold in number of messages when data will be flushed to database table. (Integer, default:
1
) - columns
-
The comma separated colon-based pairs of column names and SpEL expressions for values to insert/update. Names are used at initialization time to issue the DDL. (String, default:
payload:payload.toString()
) - idle-timeout
-
Idle timeout in milliseconds when data is automatically flushed to database table. (Long, default:
-1
) - initialize
-
'true', 'false' or the location of a custom initialization script for the table. (String, default:
false
) - table-name
-
The name of the table to write into. (String, default:
messages
)
spring.datasource
- driver-class-name
-
Fully qualified name of the JDBC driver. Auto-detected based on the URL by default. (String, default:
<none>
) - password
-
Login password of the database. (String, default:
<none>
) - url
-
JDBC URL of the database. (String, default:
<none>
) - username
-
Login username of the database. (String, default:
<none>
)
7.7. Apache Kafka Sink
This module publishes messages to Apache Kafka.
7.7.1. Options
The kafka sink has the following options:
(See the Spring Boot documentation for Spring for Apache Kafka configuration properties)
Properties grouped by prefix:
kafka.publisher
- key
-
Kafka record key - overridden by keyExpression, if supplied. (String, default:
<none>
) - key-expression
-
A SpEL expression that evaluates to a Kafka record key. (Expression, default:
<none>
) - mapped-headers
-
Headers that will be mapped. (String[], default:
[*]
) - partition
-
Kafka topic partition - overridden by partitionExpression, if supplied. (Integer, default:
<none>
) - partition-expression
-
A SpEL expression that evaluates to a Kafka topic partition. (Expression, default:
<none>
) - send-timeout
-
How long Kafka producer handler should wait for send operation results. Defaults to 10 seconds. (Duration, default:
10s
) - sync
-
True if the Kafka producer handler should operate in sync mode. (Boolean, default:
false
) - timestamp
-
Kafka record timestamp - overridden by timestampExpression, if supplied. (Long, default:
<none>
) - timestamp-expression
-
A SpEL expression that evaluates to a Kafka record timestamp. (Expression, default:
<none>
) - topic
-
Kafka topic - overridden by topicExpression, if supplied. Defaults to KafkaTemplate.getDefaultTopic() (String, default:
<none>
) - topic-expression
-
A SpEL expression that evaluates to a Kafka topic. (Expression, default:
<none>
) - use-template-converter
-
Whether to use the template's message converter to create a Kafka record. (Boolean, default:
false
)
spring.kafka
- bootstrap-servers
-
Comma-delimited list of host:port pairs to use for establishing the initial connections to the Kafka cluster. Applies to all components unless overridden. (List<String>, default:
<none>
) - client-id
-
ID to pass to the server when making requests. Used for server-side logging. (String, default:
<none>
) - properties
-
Additional properties, common to producers and consumers, used to configure the client. (Map<String, String>, default:
<none>
)
spring.kafka.producer
- acks
-
Number of acknowledgments the producer requires the leader to have received before considering a request complete. (String, default:
<none>
) - batch-size
-
Default batch size. A small batch size will make batching less common and may reduce throughput (a batch size of zero disables batching entirely). (DataSize, default:
<none>
) - bootstrap-servers
-
Comma-delimited list of host:port pairs to use for establishing the initial connections to the Kafka cluster. Overrides the global property, for producers. (List<String>, default:
<none>
) - buffer-memory
-
Total memory size the producer can use to buffer records waiting to be sent to the server. (DataSize, default:
<none>
) - client-id
-
ID to pass to the server when making requests. Used for server-side logging. (String, default:
<none>
) - compression-type
-
Compression type for all data generated by the producer. (String, default:
<none>
) - key-serializer
-
Serializer class for keys. (Class<?>, default:
<none>
) - properties
-
Additional producer-specific properties used to configure the client. (Map<String, String>, default:
<none>
) - retries
-
When greater than zero, enables retrying of failed sends. (Integer, default:
<none>
) - transaction-id-prefix
-
When non empty, enables transaction support for producer. (String, default:
<none>
) - value-serializer
-
Serializer class for values. (Class<?>, default:
<none>
)
spring.kafka.template
- default-topic
-
Default topic to which messages are sent. (String, default:
<none>
) - transaction-id-prefix
-
Transaction id prefix; overrides the transaction id prefix in the producer factory. (String, default:
<none>
)
7.8. Log Sink
The log sink uses the application logger to output the data for inspection.
Note that the log sink uses a type-less handler, which affects how the actual logging is performed.
If the content-type is textual, the raw payload bytes are converted to a String; otherwise, the raw bytes are logged.
See the user guide for more information.
7.8.1. Options
The log sink has the following options:
- log.expression
-
A SpEL expression (against the incoming message) to evaluate as the logged message. (String, default:
payload
) - log.level
-
The level at which to log messages. (Level, default:
<none>
, possible values:FATAL
,ERROR
,WARN
,INFO
,DEBUG
,TRACE
) - log.name
-
The name of the logger to use. (String, default:
<none>
)
7.9. MongoDB Sink
This sink application ingests incoming data into MongoDB.
This application is fully based on MongoDataAutoConfiguration, so refer to the Spring Boot MongoDB Support for more information.
7.9.1. Input
Payload
-
Any POJO
-
String
-
byte[]
7.9.2. Options
The mongodb sink has the following options:
Properties grouped by prefix:
mongodb.consumer
- collection
-
The MongoDB collection to store data. (String, default:
<none>
) - collection-expression
-
The SpEL expression to evaluate MongoDB collection. (Expression, default:
<none>
)
spring.data.mongodb
- additional-hosts
-
Additional server hosts. Cannot be set with URI or if 'host' is not specified. Additional hosts will use the default mongo port of 27017. If you want to use a different port you can use the "host:port" syntax. (List<String>, default:
<none>
) - authentication-database
-
Authentication database name. (String, default:
<none>
) - auto-index-creation
-
Whether to enable auto-index creation. (Boolean, default:
<none>
) - database
-
Database name. Overrides database in URI. (String, default:
<none>
) - field-naming-strategy
-
Fully qualified name of the FieldNamingStrategy to use. (Class<?>, default:
<none>
) - host
-
Mongo server host. Cannot be set with URI. (String, default:
<none>
) - password
-
Login password of the mongo server. Cannot be set with URI. (Character[], default:
<none>
) - port
-
Mongo server port. Cannot be set with URI. (Integer, default:
<none>
) - replica-set-name
-
Required replica set name for the cluster. Cannot be set with URI. (String, default:
<none>
) - uri
-
Mongo database URI. Overrides host, port, username, and password. (String, default:
mongodb://localhost/test
) - username
-
Login user of the mongo server. Cannot be set with URI. (String, default:
<none>
) - uuid-representation
-
Representation to use when converting a UUID to a BSON binary value. (UuidRepresentation, default:
java-legacy
, possible values:UNSPECIFIED
,STANDARD
,C_SHARP_LEGACY
,JAVA_LEGACY
,PYTHON_LEGACY
)
spring.data.mongodb.gridfs
- bucket
-
GridFS bucket name. (String, default:
<none>
) - database
-
GridFS database name. (String, default:
<none>
)
spring.data.mongodb.ssl
- bundle
-
SSL bundle name. (String, default:
<none>
) - enabled
-
Whether to enable SSL support. Enabled automatically if "bundle" is provided unless specified otherwise. (Boolean, default:
<none>
)
7.10. MQTT Sink
This module sends messages to MQTT.
7.10.1. Payload
-
byte[]
-
String
7.10.2. Options
The mqtt sink has the following options:
Properties grouped by prefix:
mqtt
- clean-session
-
whether the client and server should remember state across restarts and reconnects. (Boolean, default:
true
) - connection-timeout
-
the connection timeout in seconds. (Integer, default:
30
) - keep-alive-interval
-
the ping interval in seconds. (Integer, default:
60
) - password
-
the password to use when connecting to the broker. (String, default:
guest
) - persistence
-
'memory' or 'file'. (String, default:
memory
) - persistence-directory
-
Persistence directory. (String, default:
/tmp/paho
) - ssl-properties
-
MQTT Client SSL properties. (Map<String, String>, default:
<none>
) - url
-
location of the mqtt broker(s) (comma-delimited list). (String[], default:
[tcp://localhost:1883]
) - username
-
the username to use when connecting to the broker. (String, default:
guest
)
mqtt.consumer
- async
-
whether or not to use async sends. (Boolean, default:
false
) - charset
-
the charset used to convert a String payload to byte[]. (String, default:
UTF-8
) - client-id
-
identifies the client. (String, default:
stream.client.id.sink
) - qos
-
the quality of service to use. (Integer, default:
1
) - retained
-
whether to set the 'retained' flag. (Boolean, default:
false
) - topic
-
the topic to which the sink will publish. (String, default:
stream.mqtt
)
7.11. Pgcopy Sink
A module that writes its incoming payload to an RDBMS using the PostgreSQL COPY command.
7.11.1. Input
Headers
Payload
-
Any
Column expression will be evaluated against the message and the expression will usually be compatible with only one type (such as a Map or bean etc.)
7.11.2. Output
N/A
7.11.3. Options
The pgcopy sink has the following options:
- spring.datasource.driver-class-name
-
Fully qualified name of the JDBC driver. Auto-detected based on the URL by default. (String, default:
<none>
) - spring.datasource.password
-
Login password of the database. (String, default:
<none>
) - spring.datasource.url
-
JDBC URL of the database. (String, default:
<none>
) - spring.datasource.username
-
Login username of the database. (String, default:
<none>
)
The module also uses Spring Boot’s DataSource support for configuring the database connection, so properties like spring.datasource.url etc. apply.
7.11.4. Build
$ ./mvnw clean install -PgenerateApps
$ cd apps
You can find the corresponding binder-based projects here. You can then cd into one of the folders and build it:
$ ./mvnw clean package
For integration tests to run, start a PostgreSQL database on localhost:
docker run -e POSTGRES_PASSWORD=spring -e POSTGRES_DB=test -p 5432:5432 -d postgres:latest
7.11.5. Examples
java -jar pgcopy-sink.jar --tableName=names --columns=name --spring.datasource.driver-class-name=org.postgresql.Driver \
    --spring.datasource.url='jdbc:postgresql://localhost:5432/test'
7.12. RabbitMQ Sink
This module sends messages to RabbitMQ.
7.12.1. Options
The rabbit sink has the following options:
(See the Spring Boot documentation for RabbitMQ connection properties)
Properties grouped by prefix:
rabbit
- converter-bean-name
-
The bean name for a custom message converter; if omitted, a SimpleMessageConverter is used. If 'jsonConverter', a Jackson2JsonMessageConverter bean will be created for you. (String, default:
<none>
) - exchange
-
Exchange name - overridden by exchangeNameExpression, if supplied. (String, default:
<empty string>
) - exchange-expression
-
A SpEL expression that evaluates to an exchange name. (Expression, default:
<none>
) - headers-mapped-last
-
When mapping headers for the outbound message, determine whether the headers are mapped before the message is converted, or afterwards. (Boolean, default:
true
) - mapped-request-headers
-
Headers that will be mapped. (String[], default:
[*]
) - own-connection
-
When true, use a separate connection based on the boot properties. (Boolean, default:
false
) - persistent-delivery-mode
-
Default delivery mode when 'amqp_deliveryMode' header is not present, true for PERSISTENT. (Boolean, default:
false
) - routing-key
-
Routing key - overridden by routingKeyExpression, if supplied. (String, default:
<none>
) - routing-key-expression
-
A SpEL expression that evaluates to a routing key. (Expression, default:
<none>
)
spring.rabbitmq
- address-shuffle-mode
-
Mode used to shuffle configured addresses. (AddressShuffleMode, default:
none
, possible values:NONE
,RANDOM
,INORDER
) - addresses
-
Comma-separated list of addresses to which the client should connect. When set, the host and port are ignored. (String, default:
<none>
) - channel-rpc-timeout
-
Continuation timeout for RPC calls in channels. Set it to zero to wait forever. (Duration, default:
10m
) - connection-timeout
-
Connection timeout. Set it to zero to wait forever. (Duration, default:
<none>
) - host
-
RabbitMQ host. Ignored if an address is set. (String, default:
localhost
) - password
-
Login to authenticate against the broker. (String, default:
guest
) - port
-
RabbitMQ port. Ignored if an address is set. Default to 5672, or 5671 if SSL is enabled. (Integer, default:
<none>
) - publisher-confirm-type
-
Type of publisher confirms to use. (ConfirmType, default:
<none>
, possible values:SIMPLE
,CORRELATED
,NONE
) - publisher-returns
-
Whether to enable publisher returns. (Boolean, default:
false
) - requested-channel-max
-
Number of channels per connection requested by the client. Use 0 for unlimited. (Integer, default:
2047
) - requested-heartbeat
-
Requested heartbeat timeout; zero for none. If a duration suffix is not specified, seconds will be used. (Duration, default:
<none>
) - username
-
Login user to authenticate to the broker. (String, default:
guest
) - virtual-host
-
Virtual host to use when connecting to the broker. (String, default:
<none>
)
spring.rabbitmq.template
- default-receive-queue
-
Name of the default queue to receive messages from when none is specified explicitly. (String, default:
<none>
) - exchange
-
Name of the default exchange to use for send operations. (String, default:
<empty string>
) - mandatory
-
Whether to enable mandatory messages. (Boolean, default:
<none>
) - receive-timeout
-
Timeout for receive() operations. (Duration, default:
<none>
) - reply-timeout
-
Timeout for sendAndReceive() operations. (Duration, default:
<none>
) - routing-key
-
Value of a default routing key to use for send operations. (String, default:
<empty string>
)
7.13. Redis Sink
Sends messages to Redis.
7.13.1. Options
The redis sink has the following options:
Properties grouped by prefix:
redis.consumer
- key
-
A literal key name to use when storing to a key. (String, default:
<none>
) - key-expression
-
A SpEL expression to use for storing to a key. (String, default:
<none>
) - queue
-
A literal queue name to use when storing in a queue. (String, default:
<none>
) - queue-expression
-
A SpEL expression to use for queue. (String, default:
<none>
) - topic
-
A literal topic name to use when publishing to a topic. (String, default:
<none>
) - topic-expression
-
A SpEL expression to use for topic. (String, default:
<none>
)
spring.data.redis
- client-name
-
Client name to be set on connections with CLIENT SETNAME. (String, default:
<none>
) - client-type
-
Type of client to use. By default, auto-detected according to the classpath. (ClientType, default:
<none>
, possible values:LETTUCE
,JEDIS
) - connect-timeout
-
Connection timeout. (Duration, default:
<none>
) - database
-
Database index used by the connection factory. (Integer, default:
0
) - host
-
Redis server host. (String, default:
localhost
) - password
-
Login password of the redis server. (String, default:
<none>
) - port
-
Redis server port. (Integer, default:
6379
) - timeout
-
Read timeout. (Duration, default:
<none>
) - url
-
Connection URL. Overrides host, port, username, and password. Example: redis://user:[email protected]:6379 (String, default:
<none>
) - username
-
Login username of the redis server. (String, default:
<none>
)
spring.data.redis.cluster
- max-redirects
-
Maximum number of redirects to follow when executing commands across the cluster. (Integer, default:
<none>
) - nodes
-
Comma-separated list of "host:port" pairs to bootstrap from. This represents an "initial" list of cluster nodes and is required to have at least one entry. (List<String>, default:
<none>
)
spring.data.redis.jedis.pool
- enabled
-
Whether to enable the pool. Enabled automatically if "commons-pool2" is available. With Jedis, pooling is implicitly enabled in sentinel mode and this setting only applies to single node setup. (Boolean, default:
<none>
) - max-active
-
Maximum number of connections that can be allocated by the pool at a given time. Use a negative value for no limit. (Integer, default:
8
) - max-idle
-
Maximum number of "idle" connections in the pool. Use a negative value to indicate an unlimited number of idle connections. (Integer, default:
8
) - max-wait
-
Maximum amount of time a connection allocation should block before throwing an exception when the pool is exhausted. Use a negative value to block indefinitely. (Duration, default:
-1ms
) - min-idle
-
Target for the minimum number of idle connections to maintain in the pool. This setting only has an effect if both it and time between eviction runs are positive. (Integer, default:
0
) - time-between-eviction-runs
-
Time between runs of the idle object evictor thread. When positive, the idle object evictor thread starts, otherwise no idle object eviction is performed. (Duration, default:
<none>
)
spring.data.redis.lettuce.pool
- enabled
-
Whether to enable the pool. Enabled automatically if "commons-pool2" is available. With Jedis, pooling is implicitly enabled in sentinel mode and this setting only applies to single node setup. (Boolean, default:
<none>
) - max-active
-
Maximum number of connections that can be allocated by the pool at a given time. Use a negative value for no limit. (Integer, default:
8
) - max-idle
-
Maximum number of "idle" connections in the pool. Use a negative value to indicate an unlimited number of idle connections. (Integer, default:
8
) - max-wait
-
Maximum amount of time a connection allocation should block before throwing an exception when the pool is exhausted. Use a negative value to block indefinitely. (Duration, default:
-1ms
) - min-idle
-
Target for the minimum number of idle connections to maintain in the pool. This setting only has an effect if both it and time between eviction runs are positive. (Integer, default:
0
) - time-between-eviction-runs
-
Time between runs of the idle object evictor thread. When positive, the idle object evictor thread starts, otherwise no idle object eviction is performed. (Duration, default:
<none>
)
spring.data.redis.lettuce
- shutdown-timeout
-
Shutdown timeout. (Duration, default:
100ms
)
spring.data.redis.sentinel
- master
-
Name of the Redis server. (String, default:
<none>
) - nodes
-
Comma-separated list of "host:port" pairs. (List<String>, default:
<none>
) - password
-
Password for authenticating with sentinel(s). (String, default:
<none>
) - username
-
Login username for authenticating with sentinel(s). (String, default:
<none>
)
spring.data.redis.ssl
- bundle
-
SSL bundle name. (String, default:
<none>
) - enabled
-
Whether to enable SSL support. Enabled automatically if "bundle" is provided unless specified otherwise. (Boolean, default:
<none>
)
7.14. Router Sink
This application routes messages to named channels.
7.14.1. Options
The router sink has the following options:
- router.default-output-binding
-
Where to send un-routable messages. (String, default:
<none>
) - router.destination-mappings
-
Destination mappings as a new line delimited string of name-value pairs, e.g. 'foo=bar\n baz=car'. (Properties, default:
<none>
) - router.expression
-
The expression to be applied to the message to determine the channel(s) to route to. Note that the payload wire format for content types such as text, json or xml is byte[], not String. Consult the documentation for how to handle byte array payload content. (Expression, default:
<none>
) - router.refresh-delay
-
How often to check for script changes in ms (if present); < 0 means don't refresh. (Integer, default:
60000
) - router.resolution-required
-
Whether channel resolution is required. (Boolean, default:
false
) - router.script
-
The location of a groovy script that returns channels or channel mapping resolution keys. (Resource, default:
<none>
) - router.variables
-
Variable bindings as a new line delimited string of name-value pairs, e.g. 'foo=bar\n baz=car'. (Properties, default:
<none>
) - router.variables-location
-
The location of a properties file containing custom script variable bindings. (Resource, default:
<none>
)
This router sink is based on the StreamBridge API from Spring Cloud Stream, so destinations can be created as needed.
In this case, the defaultOutputBinding can be reached only if the key is not included in destinationMappings.
Setting resolutionRequired = true ignores the defaultOutputBinding and throws an exception if there is no mapping and no corresponding binding is already declared.
You can restrict the creation of dynamic bindings using the spring.cloud.stream.dynamicDestinations
property.
By default, all resolved destinations will be bound dynamically; if this property has a comma-delimited list of destination names, only those will be bound.
Messages that resolve to a destination that is not in this list will be routed to the defaultOutputBinding
, which must also appear in the list.
The destinationMappings
are used to map the evaluation results to an actual destination name.
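The rules above (mapping lookup, the dynamic destination allow-list, and the fallback to the default binding) can be modeled in a few lines of plain Java. This is a sketch of the described behavior only, not the router's code and not the StreamBridge API: RouterResolutionSketch and resolve are hypothetical names, an empty set stands for an unset spring.cloud.stream.dynamicDestinations property, and resolutionRequired is left out.

```java
import java.util.Map;
import java.util.Set;

// Illustrative only: models the routing rules described above without Spring Cloud Stream.
class RouterResolutionSketch {

    static String resolve(String key, Map<String, String> destinationMappings,
            Set<String> dynamicDestinations, String defaultOutputBinding) {
        // Map the evaluation result to a destination name; unmapped keys are used as-is.
        String destination = destinationMappings.getOrDefault(key, key);
        // An empty set stands for "property not set": every destination may be bound.
        if (dynamicDestinations.isEmpty() || dynamicDestinations.contains(destination)) {
            return destination;
        }
        // The destination is not allowed, so the message goes to the default binding.
        return defaultOutputBinding;
    }

    public static void main(String[] args) {
        Map<String, String> mappings = Map.of("foo", "orders", "bar", "invoices");
        Set<String> allowed = Set.of("orders", "unrouted");
        System.out.println(resolve("foo", mappings, allowed, "unrouted"));  // orders
        System.out.println(resolve("bar", mappings, allowed, "unrouted"));  // unrouted
        System.out.println(resolve("baz", mappings, Set.of(), "unrouted")); // baz
    }
}
```

Note that the allow-list in the example contains the default binding name ("unrouted"), matching the requirement that defaultOutputBinding must also appear in the list.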
7.14.2. SpEL-based Routing
The expression evaluates against the message and returns either a channel name, or the key to a map of channel names.
For more information, please see the "Routers and the Spring Expression Language (SpEL)" subsection in the Spring Integration Reference manual Configuring a Generic Router section.
7.14.3. Groovy-based Routing
Instead of SpEL expressions, Groovy scripts can also be used. Let’s create a Groovy script in the file system at "file:/my/path/router.groovy", or "classpath:/my/path/router.groovy" :
println("Groovy processing payload '" + payload + "'")
if (payload.contains('a')) {
    return "foo"
}
else {
    return "bar"
}
If you want to pass variable values to your script, you can statically bind values by using the variables option or pass the path to a properties file containing the bindings by using the variablesLocation option. All properties in the file are made available to the script as variables. You may specify both variables and variablesLocation, in which case any duplicate values provided as variables override values provided in variablesLocation. Note that payload and headers are implicitly bound to give you access to the data contained in a message.
For more information, see the Spring Integration Reference manual Groovy Support.
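The precedence between a properties file and inline variables can be illustrated with java.util.Properties. This is a minimal sketch of the override rule only, not the router's code: ScriptVariablesSketch and merge are hypothetical names, and both inputs are passed as strings so the example needs no files.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Illustrative only: shows how inline variables can take precedence over file-based bindings.
class ScriptVariablesSketch {

    // Loads the file-based bindings first, then the inline variables, so inline values win on duplicates.
    static Properties merge(String fileContents, String inlineVariables) {
        Properties merged = new Properties();
        try {
            merged.load(new StringReader(fileContents));
            merged.load(new StringReader(inlineVariables));
        }
        catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return merged;
    }

    public static void main(String[] args) {
        Properties merged = merge("foo=fromFile\nqux=fromFile", "foo=bar\nbaz=car");
        System.out.println(merged.getProperty("foo")); // bar
        System.out.println(merged.getProperty("qux")); // fromFile
        System.out.println(merged.getProperty("baz")); // car
    }
}
```

The inline string uses the same newline-delimited name=value format as the router.variables option.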
7.15. RSocket Sink
Sink that sends data by using the RSocket protocol's fire-and-forget strategy.
7.15.1. Options
The rsocket sink has the following options:
- rsocket.consumer.host
-
RSocket host. (String, default:
localhost
) - rsocket.consumer.port
-
RSocket port. (Integer, default:
7000
) - rsocket.consumer.route
-
Route used for RSocket. (String, default:
<none>
) - rsocket.consumer.uri
-
URI that can be used for websocket based transport. (URI, default:
<none>
)
7.16. Amazon S3 Sink
This sink app supports transferring objects to an Amazon S3 bucket.
File payloads (and directories, recursively) are transferred from the local directory where the application is deployed to the remote directory (the S3 bucket).
Messages accepted by this sink must contain a payload of one of the following types:
-
File, including directories for recursive upload;
-
InputStream;
-
byte[]
7.16.1. Options
The s3 sink has the following options:
Properties grouped by prefix:
s3.consumer
- acl
-
S3 Object access control list. (ObjectCannedACL, default:
<none>
, possible values:private
,public-read
,public-read-write
,authenticated-read
,aws-exec-read
,bucket-owner-read
,bucket-owner-full-control
,null
) - acl-expression
-
Expression to evaluate S3 Object access control list. (Expression, default:
<none>
) - bucket
-
AWS bucket for target file(s) to store. (String, default:
<none>
) - bucket-expression
-
Expression to evaluate AWS bucket name. (Expression, default:
<none>
) - key-expression
-
Expression to evaluate S3 Object key. (Expression, default:
<none>
)
spring.cloud.aws.credentials
- access-key
  The access key to be used with a static provider. (String, default: <none>)
- instance-profile
  Configures an instance profile credentials provider with no further configuration. (Boolean, default: false)
- profile
  The AWS profile. (Profile, default: <none>)
- secret-key
  The secret key to be used with a static provider. (String, default: <none>)
spring.cloud.aws.region
- instance-profile
  Configures an instance profile region provider with no further configuration. (Boolean, default: false)
- profile
  The AWS profile. (Profile, default: <none>)
- static
  <documentation missing> (String, default: <none>)
spring.cloud.aws.s3
- accelerate-mode-enabled
  Option to enable using the accelerate endpoint when accessing S3. Accelerate endpoints allow faster transfer of objects by using Amazon CloudFront's globally distributed edge locations. (Boolean, default: <none>)
- checksum-validation-enabled
  Option to disable validation of the checksum of an object stored in S3. (Boolean, default: <none>)
- chunked-encoding-enabled
  Option to enable using chunked encoding when signing the request payload for software.amazon.awssdk.services.s3.model.PutObjectRequest and software.amazon.awssdk.services.s3.model.UploadPartRequest. (Boolean, default: <none>)
- cross-region-enabled
  Enables cross-region bucket access. (Boolean, default: <none>)
- endpoint
  Overrides the default endpoint. (URI, default: <none>)
- path-style-access-enabled
  Option to enable using path style access for accessing S3 objects instead of DNS style access. DNS style access is preferred as it will result in better load balancing when accessing S3. (Boolean, default: <none>)
- region
  Overrides the default region. (String, default: <none>)
- use-arn-region-enabled
  If an S3 resource ARN is passed in as the target of an S3 operation and it has a different region from the one the client was configured with, this flag must be set to 'true' to permit the client to make a cross-region call to the region specified in the ARN; otherwise, an exception will be thrown. (Boolean, default: <none>)
spring.cloud.aws.s3.crt
- initial-read-buffer-size-in-bytes
  Configure the starting buffer size the client will use to buffer the parts downloaded from S3. Maintain a larger window to keep up a high download throughput; parts cannot download in parallel unless the window is large enough to hold multiple parts. Maintain a smaller window to limit the amount of data buffered in memory. (Long, default: <none>)
- max-concurrency
  Specifies the maximum number of S3 connections that should be established during transfer. (Integer, default: <none>)
- minimum-part-size-in-bytes
  Sets the minimum part size for transfer parts. Decreasing the minimum part size causes multipart transfer to be split into a larger number of smaller parts. Setting this value too low has a negative effect on transfer speeds, causing extra latency and network communication for each part. (Long, default: <none>)
- target-throughput-in-gbps
  The target throughput for transfer requests. Higher value means more S3 connections will be opened. Whether the transfer manager can achieve the configured target throughput depends on various factors such as the network bandwidth of the environment and the configured `max-concurrency`. (Double, default: <none>)
The generated application, which is based on AmazonS3SinkConfiguration, can be enhanced with an S3MessageHandler.UploadMetadataProvider and/or an S3ProgressListener, which are injected into the S3MessageHandler bean.
See Spring Integration AWS support for more details.
7.16.2. Amazon AWS common options
The Amazon S3 Sink (like all other Amazon AWS applications) is based on the Spring Cloud AWS project as a foundation, and its auto-configuration classes are used automatically by Spring Boot. Consult their documentation regarding required and useful auto-configuration properties.
Some of them are about AWS credentials:
- spring.cloud.aws.credentials.accessKey
- spring.cloud.aws.credentials.secretKey
- spring.cloud.aws.credentials.instanceProfile
- spring.cloud.aws.credentials.profileName
- spring.cloud.aws.credentials.profilePath
Others are for the AWS Region definition:
- cloud.aws.region.auto
- cloud.aws.region.static
Examples
java -jar s3-sink.jar --s3.consumer.bucket=/tmp/bar
7.17. SFTP Sink
SFTP sink is a simple option to push files to an SFTP server from incoming messages.
It uses an sftp-outbound-adapter; therefore, incoming messages can be either a java.io.File object, a String (content of the file), or an array of bytes (file content as well).
To use this sink, you need a username and a password to log in.
By default, Spring Integration uses o.s.i.file.DefaultFileNameGenerator if none is specified. DefaultFileNameGenerator determines the file name based on the value of the file_name header (if it exists) in the MessageHeaders or, if the payload of the Message is already a java.io.File, it uses the original name of that file.
When configuring the sftp.consumer.factory.known-hosts-expression option, the root object of the evaluation is the application context. For example: sftp.consumer.factory.known-hosts-expression = @systemProperties['user.home'] + '/.ssh/known_hosts'.
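The default file-name resolution described above can be sketched in plain Java. This is only an illustration, not Spring Integration's code: the class and method names are hypothetical, headers are modeled as a plain Map, and the final fallback to the message id plus a ".msg" suffix is not stated in this guide and is included here as an assumption about the default generator's behavior.

```java
import java.io.File;
import java.util.Map;
import java.util.UUID;

// Hypothetical sketch of the default file-name resolution; not Spring Integration's code.
final class FileNameSketch {

    /** Resolves the remote file name from the file_name header, the File payload, or the message id. */
    static String resolve(Map<String, Object> headers, Object payload, UUID messageId) {
        Object header = headers.get("file_name");
        if (header instanceof String && !((String) header).trim().isEmpty()) {
            // 1. A non-empty file_name header wins.
            return (String) header;
        }
        if (payload instanceof File) {
            // 2. Otherwise a File payload keeps its original name.
            return ((File) payload).getName();
        }
        // 3. Assumed fallback: message id plus ".msg".
        return messageId + ".msg";
    }

    public static void main(String[] args) {
        UUID id = UUID.randomUUID();
        System.out.println(resolve(Map.of("file_name", "orders.csv"), "a,b,c", id));
        System.out.println(resolve(Map.of(), new File("/tmp/report.csv"), id));
        System.out.println(resolve(Map.of(), "a,b,c", id));
    }
}
```

To control the remote name explicitly instead of relying on these defaults, the filename-expression option listed under Options can be used.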
7.17.1. Input
Headers
- file_name (See note above)
Payload
- java.io.File
- java.io.InputStream
- byte[]
- String
7.17.2. Output
N/A (writes to the SFTP server).
7.17.3. Options
The sftp sink has the following options:
Properties grouped by prefix:
sftp.consumer
- auto-create-dir
  Whether to create the remote directory. (Boolean, default: true)
- filename-expression
  A SpEL expression to generate the remote file name. (String, default: <none>)
- mode
  Action to take if the remote file already exists. (FileExistsMode, default: <none>, possible values: APPEND, APPEND_NO_FLUSH, FAIL, IGNORE, REPLACE, REPLACE_IF_MODIFIED)
- remote-dir
  The remote SFTP directory. (String, default: /)
- remote-file-separator
  The remote file separator. (String, default: /)
- temporary-remote-dir
  A temporary directory where the file will be written if 'isUseTemporaryFilename()' is true. (String, default: /)
- tmp-file-suffix
  The suffix to use while the transfer is in progress. (String, default: .tmp)
- use-temporary-filename
  Whether to write to a temporary file and rename. (Boolean, default: true)
sftp.consumer.factory
- allow-unknown-keys
  True to allow an unknown or changed key. (Boolean, default: false)
- cache-sessions
  Cache sessions. (Boolean, default: <none>)
- host
  The host name of the server. (String, default: localhost)
- known-hosts-expression
  A SpEL expression resolving to the location of the known hosts file. (Expression, default: <none>)
- pass-phrase
  Passphrase for user's private key. (String, default: <empty string>)
- password
  The password to use to connect to the server. (String, default: <none>)
- port
  The port of the server. (Integer, default: 22)
- private-key
  Resource location of user's private key. (Resource, default: <none>)
- username
  The username to use to connect to the server. (String, default: <none>)
7.18. TCP Sink
This module writes messages to TCP using an Encoder.
TCP is a streaming protocol and some mechanism is needed to frame messages on the wire. A number of encoders are available, the default being 'CRLF'.
7.18.1. Options
The tcp sink has the following options:
Properties grouped by prefix:
tcp.consumer
- charset
  The charset used when converting from bytes to String. (String, default: UTF-8)
- close
  Whether to close the socket after each message. (Boolean, default: false)
- encoder
  The encoder to use when sending messages. (Encoding, default: <none>, possible values: CRLF, LF, NULL, STXETX, RAW, L1, L2, L4)
- host
  The host to which this sink will connect. (String, default: <none>)
tcp
- nio
  Whether or not to use NIO. (Boolean, default: false)
- port
  The port on which to listen; 0 for the OS to choose a port. (Integer, default: 1234)
- reverse-lookup
  Perform a reverse DNS lookup on the remote IP Address; if false, just the IP address is included in the message headers. (Boolean, default: false)
- socket-timeout
  The timeout (ms) before closing the socket when no data is received. (Integer, default: 120000)
- use-direct-buffers
  Whether or not to use direct buffers. (Boolean, default: false)
7.18.2. Available Encoders
- CRLF (default)
  text terminated by carriage return (0x0d) followed by line feed (0x0a)
- LF
  text terminated by line feed (0x0a)
- NULL
  text terminated by a null byte (0x00)
- STXETX
  text preceded by an STX (0x02) and terminated by an ETX (0x03)
- RAW
  no structure - the client indicates a complete message by closing the socket
- L1
  data preceded by a one byte (unsigned) length field (supports up to 255 bytes)
- L2
  data preceded by a two byte (unsigned) length field (up to 2^16-1 bytes)
- L4
  data preceded by a four byte (signed) length field (up to 2^31-1 bytes)
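The framing rules listed above can be illustrated with a small, self-contained Java sketch. It is not the sink's implementation (the sink relies on Spring Integration's serializers); the class and method names are hypothetical, and the length headers are assumed to be written in network (big-endian) byte order.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical illustration of the framing styles listed above; not the sink's actual code.
final class TcpFramingSketch {

    /** Wraps a payload in the framing that the named encoder describes. */
    static byte[] frame(String encoder, byte[] payload) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        switch (encoder) {
            case "CRLF":   // payload + 0x0d 0x0a
                write(out, payload); out.write(0x0d); out.write(0x0a); break;
            case "LF":     // payload + 0x0a
                write(out, payload); out.write(0x0a); break;
            case "NULL":   // payload + 0x00
                write(out, payload); out.write(0x00); break;
            case "STXETX": // 0x02 + payload + 0x03
                out.write(0x02); write(out, payload); out.write(0x03); break;
            case "RAW":    // no framing; the end of the message is signaled by closing the socket
                write(out, payload); break;
            case "L1":     // one-byte unsigned length header
                requireMax(payload, 0xFF);
                out.write(payload.length); write(out, payload); break;
            case "L2":     // two-byte unsigned length header (assumed big-endian)
                requireMax(payload, 0xFFFF);
                write(out, ByteBuffer.allocate(2).putShort((short) payload.length).array());
                write(out, payload); break;
            case "L4":     // four-byte signed length header (assumed big-endian)
                write(out, ByteBuffer.allocate(4).putInt(payload.length).array());
                write(out, payload); break;
            default:
                throw new IllegalArgumentException("Unknown encoder: " + encoder);
        }
        return out.toByteArray();
    }

    private static void write(ByteArrayOutputStream out, byte[] bytes) {
        out.write(bytes, 0, bytes.length);
    }

    private static void requireMax(byte[] payload, int max) {
        if (payload.length > max) {
            throw new IllegalArgumentException("Payload exceeds " + max + " bytes");
        }
    }

    public static void main(String[] args) {
        byte[] framed = frame("L2", "hi".getBytes(StandardCharsets.US_ASCII));
        System.out.println(Arrays.toString(framed)); // prints [0, 2, 104, 105]
    }
}
```

The delimiter-based styles (CRLF, LF, NULL, STXETX) only work if the delimiter bytes never occur inside the payload, whereas the length-header styles (L1, L2, L4) can carry arbitrary binary data up to the stated size limit.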
7.19. Throughput Sink
Sink that will count messages and log the observed throughput at a selected interval.
7.19.1. Options
The throughput sink has the following options:
- throughput.report-every-ms
  how often to report. (Integer, default: 1000)
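The counting-and-reporting behavior described for this sink can be pictured with the following minimal Java sketch. It is an assumption-laden illustration rather than the sink's code: the class name, the report text, and the choice to pass timestamps in explicitly (which keeps the example deterministic) are all hypothetical.

```java
import java.util.Locale;

// Hypothetical sketch of interval-based throughput reporting; not the sink's actual code.
final class ThroughputSketch {

    private final long reportEveryMs; // plays the role of throughput.report-every-ms
    private long windowStartMs;
    private long count;

    ThroughputSketch(long reportEveryMs, long startMs) {
        this.reportEveryMs = reportEveryMs;
        this.windowStartMs = startMs;
    }

    /** Counts one message at time nowMs; returns a report line once the interval has elapsed, else null. */
    String onMessage(long nowMs) {
        count++;
        long elapsed = nowMs - windowStartMs;
        if (elapsed < reportEveryMs) {
            return null;
        }
        double perSecond = count * 1000.0 / elapsed;
        String report = String.format(Locale.ROOT, "Messages: %d in %d ms = %.1f/s", count, elapsed, perSecond);
        // Start a new measurement window.
        windowStartMs = nowMs;
        count = 0;
        return report;
    }

    public static void main(String[] args) {
        ThroughputSketch sketch = new ThroughputSketch(1000, 0);
        sketch.onMessage(100);
        sketch.onMessage(500);
        System.out.println(sketch.onMessage(1000)); // prints Messages: 3 in 1000 ms = 3.0/s
    }
}
```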
7.20. Twitter Message Sink
Send Direct Messages to a specified user from the authenticating user.
Requires a JSON POST body and the Content-Type header to be set to application/json.
When a message is received from a user, you may send up to 5 messages in response within a 24 hour window. Each message received resets the 24 hour window and the 5 allotted messages. Sending a 6th message within a 24 hour window or sending a message outside of a 24 hour window will count towards rate-limiting. This behavior only applies when using the POST direct_messages/events/new endpoint.
SpEL expressions are used to compute the request parameters from the input message.
7.20.1. Options
Use single quotes (') to wrap the literal values of the SpEL expression properties.
For example, to set a fixed message text, use text='Fixed Text'.
For a fixed target userId, use userId='666'.
- twitter.message.update.media-id
  A media id to associate with the message. A Direct Message may only reference a single media id. (Expression, default: <none>)
- twitter.message.update.screen-name
  The screen name of the user to whom to send the direct message. (Expression, default: <none>)
- twitter.message.update.text
  The direct message text. URL encode as necessary. Max length of 10,000 characters. (Expression, default: payload)
- twitter.message.update.user-id
  The user id of the user to whom to send the direct message. (Expression, default: <none>)
7.21. Twitter Update Sink
Updates the authenticating user’s current status (that is, Tweeting).
For each update attempt, the update text is compared with the authenticating user’s recent Tweets. Any attempt that would result in duplication will be blocked, resulting in a 403 error. A user cannot submit the same text twice in a row.
While not rate limited by the API, a user is limited in the number of Tweets they can create at a time. The update limit for the standard API is 300 per 3-hour window. If the number of updates posted by the user reaches the current allowed limit, this method returns an HTTP 403 error.
You can find details for the Update API here: developer.twitter.com/en/docs/tweets/post-and-engage/api-reference/post-statuses-update
7.21.1. Options
Properties grouped by prefix:
twitter.update
- attachment-url
  (SpEL expression) In order for a URL to not be counted in the text body of an extended Tweet, provide a URL as a Tweet attachment. This URL must be a Tweet permalink or a Direct Message deep link. Arbitrary, non-Twitter URLs must remain in the Tweet text. URLs passed to the attachment_url parameter that do not match either a Tweet permalink or a Direct Message deep link will fail at Tweet creation and cause an exception. (Expression, default: <none>)
- display-coordinates
  (SpEL expression) Whether or not to put a pin on the exact coordinates a Tweet has been sent from. (Expression, default: <none>)
- in-reply-to-status-id
  (SpEL expression) The ID of an existing Tweet that the update is in reply to. Note: This parameter is ignored unless the author of the Tweet that this parameter references is mentioned within the Tweet text. Therefore, you must include @username, where username is the author of the referenced Tweet, within the update. When inReplyToStatusId is set, auto_populate_reply_metadata is automatically set as well. The latter ensures that leading @mentions are looked up from the original Tweet and added to the new Tweet from there. This appends @mentions to the metadata of an extended Tweet as a reply chain grows, until the limit on @mentions is reached. If the original Tweet has been deleted, the reply fails. (Expression, default: <none>)
- media-ids
  (SpEL expression) A comma-delimited list of media_ids to associate with the Tweet. You may include up to 4 photos or 1 animated GIF or 1 video in a Tweet. See Uploading Media for further details on uploading media. (Expression, default: <none>)
- place-id
  (SpEL expression) A place in the world. (Expression, default: <none>)
- text
  (SpEL expression) The text of the status update. URL encode as necessary. t.co link wrapping will affect character counts. Defaults to the message's payload. (Expression, default: payload)
twitter.update.location
- lat
  The latitude of the location this Tweet refers to. This parameter will be ignored unless it is inside the range -90.0 to +90.0 (North is positive) inclusive. It will also be ignored if there is no corresponding long parameter. (Expression, default: <none>)
- lon
  The longitude of the location this Tweet refers to. The valid ranges for longitude are -180.0 to +180.0 (East is positive) inclusive. This parameter will be ignored if it is outside that range, if it is not a number, if geo_enabled is disabled, or if there is no corresponding lat parameter. (Expression, default: <none>)
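The range rules for lat and lon described above amount to a simple pairwise check. The following Java sketch shows one way to pre-validate coordinates before they are sent; the class and method names are hypothetical, and the account-level geo_enabled setting is deliberately not modeled.

```java
// Hypothetical pre-validation of the lat/lon pair; the geo_enabled account setting is not modeled.
final class GeoParamsSketch {

    /** Returns true when both coordinates are present, numeric, and within the documented ranges. */
    static boolean isUsable(Double lat, Double lon) {
        if (lat == null || lon == null) {
            // Each coordinate is ignored when its counterpart is missing.
            return false;
        }
        if (lat.isNaN() || lon.isNaN()) {
            return false;
        }
        boolean latOk = lat >= -90.0 && lat <= 90.0;   // North is positive
        boolean lonOk = lon >= -180.0 && lon <= 180.0; // East is positive
        return latOk && lonOk;
    }

    public static void main(String[] args) {
        System.out.println(isUsable(52.52, 13.40)); // prints true
        System.out.println(isUsable(91.0, 13.40));  // prints false
        System.out.println(isUsable(52.52, null));  // prints false
    }
}
```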
7.22. Wavefront Sink
The Wavefront sink consumes Message<?> instances, converts each one into a metric in Wavefront data format, and sends the metric directly to Wavefront or to a Wavefront proxy.
Supports common ETL use-cases, where existing (historical) metrics data has to be cleaned, transformed and stored in Wavefront for further analysis.
7.22.1. Options
The Wavefront sink has the following options:
- wavefront.api-token
  Wavefront API access token. (String, default: <none>)
- wavefront.metric-expression
  A SpEL expression that evaluates to a metric value. (Expression, default: <none>)
- wavefront.metric-name
  The name of the metric. Defaults to the application name. (String, default: <none>)
- wavefront.proxy-uri
  The URL of the Wavefront proxy. (String, default: <none>)
- wavefront.source
  Unique application, host, container, or instance that emits metrics. (String, default: <none>)
- wavefront.tag-expression
  Collection of custom metadata associated with the metric. Point tags cannot be empty. Valid characters for keys: alphanumeric, hyphen ('-'), underscore ('_'), dot ('.'). For values, any character is allowed, including spaces. To include a double quote, escape it with a backslash. A backslash cannot be the last character in the tag value. The maximum allowed length for a combination of a point tag key and value is 254 characters (255 including the '=' separating key and value). If the value is longer, the point is rejected and logged. (Map<String, Expression>, default: <none>)
- wavefront.timestamp-expression
  A SpEL expression that evaluates to a timestamp of the metric (optional). (Expression, default: <none>)
- wavefront.uri
  The URL of the Wavefront environment. (String, default: <none>)
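To make the point tag rules above more concrete, here is a minimal Java sketch that validates tags and assembles a metric line. It is not the sink's code: the class and method names are hypothetical, and the line layout `<metricName> <metricValue> [<timestamp>] source=<source> [pointTags]` is assumed from the general Wavefront data format rather than taken from this guide.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Pattern;

// Hypothetical sketch of building a Wavefront-style metric line; not the sink's actual code.
final class WavefrontLineSketch {

    // Valid tag key characters: alphanumeric, hyphen, underscore, dot.
    private static final Pattern TAG_KEY = Pattern.compile("[A-Za-z0-9._-]+");

    static String format(String name, double value, Long timestamp, String source, Map<String, String> tags) {
        StringBuilder line = new StringBuilder();
        line.append(name).append(' ').append(value);
        if (timestamp != null) {
            line.append(' ').append(timestamp); // the timestamp is optional
        }
        line.append(" source=").append(source);
        // Sort the tags so that the output is deterministic.
        for (Map.Entry<String, String> tag : new TreeMap<>(tags).entrySet()) {
            String key = tag.getKey();
            String val = tag.getValue();
            if (!TAG_KEY.matcher(key).matches()) {
                throw new IllegalArgumentException("Invalid tag key: " + key);
            }
            if (val.isEmpty() || val.endsWith("\\")) {
                throw new IllegalArgumentException("Invalid tag value for key: " + key);
            }
            if (key.length() + val.length() > 254) {
                throw new IllegalArgumentException("Tag too long: " + key);
            }
            // Escape double quotes in the value with a backslash.
            line.append(' ').append(key).append("=\"").append(val.replace("\"", "\\\"")).append('"');
        }
        return line.toString();
    }

    public static void main(String[] args) {
        System.out.println(format("cpu.load", 0.5, 1533529977L, "host-1", Map.of("env", "prod")));
        // prints cpu.load 0.5 1533529977 source=host-1 env="prod"
    }
}
```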
7.23. Websocket Sink
A simple Websocket Sink implementation.
7.23.1. Options
The following options are supported:
- websocket.consumer.log-level
  the logLevel for netty channels. Default is WARN (String, default: <none>)
- websocket.consumer.path
  the path on which a WebsocketSink consumer needs to connect. Default is /websocket (String, default: /websocket)
- websocket.consumer.port
  the port on which the Netty server listens. Default is 9292 (Integer, default: 9292)
- websocket.consumer.ssl
  whether or not to create an io.netty.handler.ssl.SslContext. (Boolean, default: false)
- websocket.consumer.threads
  the number of threads for the Netty io.netty.channel.EventLoopGroup. Default is 1 (Integer, default: 1)
7.23.2. Examples
To verify that the websocket-sink receives messages from other spring-cloud-stream apps, you can use the following simple end-to-end setup.
Step 1: Start RabbitMQ
Step 2: Deploy a time-source
Step 3: Deploy the websocket-sink
Finally, start a websocket-sink in trace mode so that you see the messages produced by the time-source in the log:
java -jar <spring boot application for websocket-sink> --spring.cloud.stream.bindings.input=ticktock --server.port=9393 \
--logging.level.org.springframework.cloud.fn.consumer.websocket=TRACE
You should start seeing log messages in the console where you started the WebsocketSink like this:
Handling message: GenericMessage [payload=2015-10-21 12:52:53, headers={id=09ae31e0-a04e-b811-d211-b4d4e75b6f29, timestamp=1445424778065}]
Handling message: GenericMessage [payload=2015-10-21 12:52:54, headers={id=75eaaf30-e5c6-494f-b007-9d5b5b920001, timestamp=1445424778065}]
Handling message: GenericMessage [payload=2015-10-21 12:52:55, headers={id=18b887db-81fc-c634-7a9a-16b1c72de291, timestamp=1445424778066}]
7.23.3. Actuators
There is an Endpoint that you can use to access the last n messages sent and received. You have to enable it by providing --endpoints.websocketconsumertrace.enabled=true. By default, it shows the last 100 messages via host:port/websocketconsumertrace. Here is a sample output:
[
{
"timestamp": 1445453703508,
"info": {
"type": "text",
"direction": "out",
"id": "2ff9be50-c9b2-724b-5404-1a6305c033e4",
"payload": "2015-10-21 20:54:33"
}
},
...
{
"timestamp": 1445453703506,
"info": {
"type": "text",
"direction": "out",
"id": "2b9dbcaf-c808-084d-a51b-50f617ae6a75",
"payload": "2015-10-21 20:54:32"
}
}
]
There is also a simple HTML page where you can see forwarded messages in a text area. You can access it directly via host:port in your browser.
7.24. XMPP Sink
The "xmpp" sink enables sending messages to an XMPP server.
7.24.1. Input
-
byte[]
7.24.2. Output
Payload
N/A
7.24.3. Options
The xmpp sink has the following options:
Properties grouped by prefix:
xmpp.consumer
- chat-to
  XMPP handle to send message to. (String, default: <none>)
xmpp.factory
- host
  XMPP Host server to connect to. (String, default: <none>)
- password
  The Password for the connected user. (String, default: <none>)
- port
  Port for connecting to the host. Default client port: 5222 (Integer, default: 5222)
- resource
  The Resource to bind to on the XMPP Host. Can be empty; the server generates one if not set. (String, default: <none>)
- security-mode
  <documentation missing> (SecurityMode, default: <none>, possible values: required, ifpossible, disabled)
- service-name
  The Service Name to set for the XMPP Domain. (String, default: <none>)
- subscription-mode
  <documentation missing> (SubscriptionMode, default: <none>, possible values: accept_all, reject_all, manual)
- user
  The User the connection should connect as. (String, default: <none>)
7.24.4. Build
$ ./mvnw clean install -PgenerateApps
$ cd apps
You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:
$ ./mvnw clean package
7.25. ZeroMQ Sink
The "zeromq" sink enables sending messages to a ZeroMQ socket.
7.25.1. Input
-
byte[]
7.25.2. Output
Payload
N/A
7.25.3. Options
The zeromq sink has the following options:
- zeromq.consumer.connect-url
  Connection URL for connecting to the ZeroMQ Socket. (String, default: <none>)
- zeromq.consumer.socket-type
  The Socket Type the connection should establish. (SocketType, default: <none>, possible values: PAIR, PUB, SUB, REQ, REP, DEALER, ROUTER, PULL, PUSH, XPUB, XSUB, STREAM, CLIENT, SERVER, RADIO, DISH, CHANNEL, PEER, RAW, SCATTER, GATHER)
- zeromq.consumer.topic
  A Topic SpEL expression to evaluate a topic before sending messages to subscribers. (Expression, default: <none>)
7.25.4. Build
$ ./mvnw clean install -PgenerateApps
$ cd apps
You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:
$ ./mvnw clean package
7.25.5. Examples
java -jar zeromq-sink.jar --zeromq.consumer.connectUrl=tcp://server:port --zeromq.consumer.topic=
Appendices
Appendix A: Contributing
Spring Cloud is released under the non-restrictive Apache 2.0 license and follows a very standard GitHub development process, using the GitHub tracker for issues and merging pull requests into master. If you want to contribute even something trivial, please do not hesitate, but follow the guidelines below.
A.1. Sign the Contributor License Agreement
Before we accept a non-trivial patch or pull request we will need you to sign the contributor’s agreement. Signing the contributor’s agreement does not grant anyone commit rights to the main repository, but it does mean that we can accept your contributions, and you will get an author credit if we do. Active contributors might be asked to join the core team, and given the ability to merge pull requests.
A.2. Code Conventions and Housekeeping
None of these is essential for a pull request, but they will all help. They can also be added after the original pull request but before a merge.
- Use the Spring Framework code format conventions. If you use Eclipse, you can import formatter settings using the eclipse-code-formatter.xml file from the Spring Cloud Build project. If you use IntelliJ, you can use the Eclipse Code Formatter Plugin to import the same file.
- Make sure all new .java files have a simple Javadoc class comment with at least an @author tag identifying you, and preferably at least a paragraph on what the class is for.
- Add the ASF license header comment to all new .java files (copy from existing files in the project).
- Add yourself as an @author to the .java files that you modify substantially (more than cosmetic changes).
- Add some Javadocs and, if you change the namespace, some XSD doc elements.
- A few unit tests would help a lot as well; someone has to do it.
- If no one else is using your branch, please rebase it against the current master (or other target branch in the main project).
- When writing a commit message, please follow these conventions. If you are fixing an existing issue, please add Fixes gh-XXXX at the end of the commit message (where XXXX is the issue number).