2. Sources

2.1 File Source

This application polls a directory and sends new files or their contents to the output channel. By default, the file source provides the contents of a file as a byte array. You can change this with the --file.consumer.mode option:

  • ref: Provides a java.io.File reference
  • lines: Will split files line-by-line and emit a new message for each line
  • contents: The default. Provides the contents of a file as a byte array

When using --file.consumer.mode=lines, you can also set --file.consumer.with-markers=true. If set to true, the underlying FileSplitter emits additional start-of-file and end-of-file marker messages before and after the actual data. The payload of these two marker messages is of type FileSplitter.FileMarker. The with-markers option defaults to false if not explicitly set.

2.1.1 Input

N/A (Reads from a file system directory).

2.1.2 Output

mode = contents

Headers:
  • Content-Type: application/octet-stream
  • file_originalFile: <java.io.File>
  • file_name: <file name>
Payload:

A byte[] filled with the file contents.

mode = lines

Headers:
  • Content-Type: text/plain
  • file_originalFile: <java.io.File>
  • file_name: <file name>
  • correlationId: <UUID> (same for each line)
  • sequenceNumber: <n>
  • sequenceSize: 0 (number of lines is not known until the file is read)
Payload:

A String for each line.

The first line is optionally preceded by a message with a START marker payload. The last line is optionally followed by a message with an END marker payload.

Marker presence and format are determined by the with-markers and markers-json properties.
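
As a rough illustration of lines mode, the sketch below shows the order in which payloads would be emitted with and without markers. It uses only the JDK. The class name, the method name, and the plain-string "START"/"END" placeholders are hypothetical; the actual source relies on FileSplitter and FileSplitter.FileMarker payloads, and headers are not modelled here.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for lines mode; the real source uses FileSplitter.
class LinesModeSketch {

    // Returns payloads in emission order: optional START marker,
    // one entry per line, optional END marker.
    static List<String> emit(String fileName, List<String> lines, boolean withMarkers) {
        List<String> payloads = new ArrayList<>();
        if (withMarkers) {
            payloads.add("START " + fileName); // placeholder for a FileMarker payload
        }
        payloads.addAll(lines);
        if (withMarkers) {
            payloads.add("END " + fileName);   // placeholder for a FileMarker payload
        }
        return payloads;
    }

    public static void main(String[] args) throws IOException {
        // Write a small temporary file, read it back, and print the emitted payloads.
        Path file = Files.createTempFile("lines-mode", ".txt");
        Files.write(file, Arrays.asList("alpha", "beta"), StandardCharsets.UTF_8);
        List<String> lines = Files.readAllLines(file, StandardCharsets.UTF_8);
        System.out.println(emit(file.getFileName().toString(), lines, true));
        Files.delete(file);
    }
}
```

With markers disabled, the result is simply the list of lines, which matches the default behaviour described above.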

mode = ref

Headers:

None.

Payload:

A java.io.File object.

2.1.3 Options

The file source has the following options:

file.consumer.markers-json
When 'fileMarkers == true', specify if they should be produced as FileSplitter.FileMarker objects or JSON. (Boolean, default: true)
file.consumer.mode
The FileReadingMode to use for file reading sources. Values are 'ref' - The File object, 'lines' - a message per line, or 'contents' - the contents as bytes. (FileReadingMode, default: <none>, possible values: ref,lines,contents)
file.consumer.with-markers
Set to true to emit start of file/end of file marker messages before/after the data. Only valid with FileReadingMode 'lines'. (Boolean, default: <none>)
file.directory
The directory to poll for new files. (String, default: <none>)
file.filename-pattern
A simple ant pattern to match files. (String, default: <none>)
file.filename-regex
A regex pattern to match files. (Pattern, default: <none>)
file.prevent-duplicates
Set to true to include an AcceptOnceFileListFilter which prevents duplicates. (Boolean, default: true)
trigger.cron
Cron expression value for the Cron Trigger. (String, default: <none>)
trigger.date-format
Format for the date value. (String, default: <none>)
trigger.fixed-delay
Fixed delay for periodic triggers. (Integer, default: 1)
trigger.initial-delay
Initial delay for periodic triggers. (Integer, default: 0)
trigger.max-messages
Maximum messages per poll, -1 means infinity. (Long, default: -1)
trigger.time-unit
The TimeUnit to apply to delay values. (TimeUnit, default: SECONDS, possible values: NANOSECONDS,MICROSECONDS,MILLISECONDS,SECONDS,MINUTES,HOURS,DAYS)

The ref option is useful in some cases in which the file contents are large and it would be more efficient to send the file path.
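
The file.filename-regex and file.prevent-duplicates options listed above combine into a simple idea: a file name must match the pattern, and it is passed on only the first time it is seen. The sketch below mimics that idea with plain JDK classes. It is not the AcceptOnceFileListFilter implementation; the class name is hypothetical, and tracking names in an in-memory set is an assumption made for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.regex.Pattern;

// Hypothetical accept-once filter over file names; state is kept in memory only.
class AcceptOnceSketch {
    private final Set<String> seen = new HashSet<>();
    private final Pattern namePattern;

    AcceptOnceSketch(String filenameRegex) {
        this.namePattern = Pattern.compile(filenameRegex);
    }

    // Returns the names from this poll that match the regex
    // and were not accepted by an earlier poll.
    List<String> filter(List<String> polledNames) {
        List<String> accepted = new ArrayList<>();
        for (String name : polledNames) {
            // Set.add returns false when the name was already recorded.
            if (namePattern.matcher(name).matches() && seen.add(name)) {
                accepted.add(name);
            }
        }
        return accepted;
    }

    public static void main(String[] args) {
        AcceptOnceSketch filter = new AcceptOnceSketch(".*\\.csv");
        System.out.println(filter.filter(Arrays.asList("a.csv", "b.txt")));
        System.out.println(filter.filter(Arrays.asList("a.csv", "c.csv")));
    }
}
```

Because the set lives in memory, a restart of this sketch would forget what it has seen.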

2.1.4 Build

$ ./mvnw clean install -PgenerateApps
$ cd apps

You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:

$ ./mvnw clean package

2.1.5 Examples

java -jar file_source.jar --file.directory=/tmp/foo --file.consumer.mode=lines --trigger.fixed-delay=60

2.2 FTP Source

This source application supports transfer of files using the FTP protocol. Files are transferred from the remote directory to the local directory where the app is deployed. By default, messages emitted by the source carry the file contents as a byte array. You can change this with the --file.consumer.mode option:

  • ref: Provides a java.io.File reference
  • lines: Will split files line-by-line and emit a new message for each line
  • contents: The default. Provides the contents of a file as a byte array

When using --file.consumer.mode=lines, you can also set --file.consumer.with-markers=true. If set to true, the underlying FileSplitter emits additional start-of-file and end-of-file marker messages before and after the actual data. The payload of these two marker messages is of type FileSplitter.FileMarker. The with-markers option defaults to false if not explicitly set.

See also MetaDataStore options for possible shared persistent store configuration for the FtpPersistentAcceptOnceFileListFilter used in the FTP Source.

2.2.1 Input

N/A (Fetches files from an FTP server).

2.2.2 Output

mode = contents

Headers:
  • Content-Type: application/octet-stream
  • file_originalFile: <java.io.File>
  • file_name: <file name>
Payload:

A byte[] filled with the file contents.

mode = lines

Headers:
  • Content-Type: text/plain
  • file_originalFile: <java.io.File>
  • file_name: <file name>
  • correlationId: <UUID> (same for each line)
  • sequenceNumber: <n>
  • sequenceSize: 0 (number of lines is not known until the file is read)
Payload:

A String for each line.

The first line is optionally preceded by a message with a START marker payload. The last line is optionally followed by a message with an END marker payload.

Marker presence and format are determined by the with-markers and markers-json properties.

mode = ref

Headers:

None.

Payload:

A java.io.File object.

2.2.3 Options

The ftp source has the following options:

file.consumer.markers-json
When 'fileMarkers == true', specify if they should be produced as FileSplitter.FileMarker objects or JSON. (Boolean, default: true)
file.consumer.mode
The FileReadingMode to use for file reading sources. Values are 'ref' - The File object, 'lines' - a message per line, or 'contents' - the contents as bytes. (FileReadingMode, default: <none>, possible values: ref,lines,contents)
file.consumer.with-markers
Set to true to emit start of file/end of file marker messages before/after the data. Only valid with FileReadingMode 'lines'. (Boolean, default: <none>)
ftp.auto-create-local-dir
Set to true to create the local directory if it does not exist. (Boolean, default: true)
ftp.delete-remote-files
Set to true to delete remote files after successful transfer. (Boolean, default: false)
ftp.factory.cache-sessions
<documentation missing> (Boolean, default: <none>)
ftp.factory.client-mode
The client mode to use for the FTP session. (ClientMode, default: <none>, possible values: ACTIVE,PASSIVE)
ftp.factory.host
<documentation missing> (String, default: <none>)
ftp.factory.password
<documentation missing> (String, default: <none>)
ftp.factory.port
The port of the server. (Integer, default: 21)
ftp.factory.username
<documentation missing> (String, default: <none>)
ftp.filename-pattern
A filter pattern to match the names of files to transfer. (String, default: <none>)
ftp.filename-regex
A filter regex pattern to match the names of files to transfer. (Pattern, default: <none>)
ftp.local-dir
The local directory to use for file transfers. (File, default: <none>)
ftp.preserve-timestamp
Set to true to preserve the original timestamp. (Boolean, default: true)
ftp.remote-dir
The remote FTP directory. (String, default: /)
ftp.remote-file-separator
The remote file separator. (String, default: /)
ftp.tmp-file-suffix
The suffix to use while the transfer is in progress. (String, default: .tmp)
trigger.cron
Cron expression value for the Cron Trigger. (String, default: <none>)
trigger.date-format
Format for the date value. (String, default: <none>)
trigger.fixed-delay
Fixed delay for periodic triggers. (Integer, default: 1)
trigger.initial-delay
Initial delay for periodic triggers. (Integer, default: 0)
trigger.max-messages
Maximum messages per poll, -1 means infinity. (Long, default: -1)
trigger.time-unit
The TimeUnit to apply to delay values. (TimeUnit, default: SECONDS, possible values: NANOSECONDS,MICROSECONDS,MILLISECONDS,SECONDS,MINUTES,HOURS,DAYS)

2.2.4 Build

$ ./mvnw clean install -PgenerateApps
$ cd apps

You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:

$ ./mvnw clean package

2.2.5 Examples

java -jar ftp_source.jar --ftp.remote-dir=foo --file.consumer.mode=lines --trigger.fixed-delay=60 --ftp.factory.host=ftpserver \
         --ftp.factory.username=user --ftp.factory.password=pw --ftp.local-dir=/foo

2.3 Gemfire Source

This source subscribes to creates and updates in a Gemfire region. The application configures a client cache and a client region with the necessary subscriptions enabled. By default, the payload contains the updated entry value, but you can control it by passing in a SpEL expression that uses the EntryEvent as the evaluation context.

To enable SSL communication between the Geode Source and the Geode cluster, you need to provide the URIs of the Keystore and Truststore files using the gemfire.security.ssl.keystore-uri and gemfire.security.ssl.truststore-uri properties. (If a single file is used for both stores, point both URIs to it.)

2.3.1 Input

N/A

2.3.2 Output

Headers

  • content-type: text/plain

Payload

  • String

2.3.3 Options

The gemfire source supports the following configuration properties:

gemfire.client.pdx-read-serialized
Deserialize the Geode objects into PdxInstance instead of the domain class. (Boolean, default: false)
gemfire.pool.connect-type
Specifies connection type: 'server' or 'locator'. (ConnectType, default: <none>, possible values: locator,server)
gemfire.pool.host-addresses
Specifies one or more Gemfire locator or server addresses formatted as [host]:[port]. (InetSocketAddress[], default: <none>)
gemfire.pool.subscription-enabled
Set to true to enable subscriptions for the client pool. Required to sync updates to the client cache. (Boolean, default: false)
gemfire.region.region-name
The region name. (String, default: <none>)
gemfire.security.password
The cache password. (String, default: <none>)
gemfire.security.ssl.ciphers
Configures the SSL ciphers used for secure Socket connections as an array of valid cipher names. (String, default: any)
gemfire.security.ssl.keystore-type
Identifies the type of Keystore used for SSL communications (e.g. JKS, PKCS11, etc.). (String, default: JKS)
gemfire.security.ssl.keystore-uri
Location of the pre-created Keystore URI to be used for connecting to the Geode cluster. (Resource, default: <none>)
gemfire.security.ssl.ssl-keystore-password
Password for accessing the keystore. (String, default: <none>)
gemfire.security.ssl.ssl-truststore-password
Password for accessing the trust store. (String, default: <none>)
gemfire.security.ssl.truststore-type
Identifies the type of truststore used for SSL communications (e.g. JKS, PKCS11, etc.). (String, default: JKS)
gemfire.security.ssl.truststore-uri
Location of the pre-created truststore URI to be used for connecting to the Geode cluster. (Resource, default: <none>)
gemfire.security.ssl.user-home-directory
Local directory to cache the truststore and keystore files downloaded from the truststoreUri and keystoreUri locations. (String, default: user.home)
gemfire.security.username
The cache username. (String, default: <none>)
gemfire.source.cache-event-expression
SpEL expression to extract fields from a cache event. (Expression, default: <none>)
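
To make the [host]:[port] format of gemfire.pool.host-addresses concrete, the sketch below parses such values into InetSocketAddress instances with the JDK alone. It assumes that several addresses are separated by commas, which is not stated in the option description. The class name and the sample host names and ports are hypothetical, and this is not the conversion code the application itself uses.

```java
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;

// Hypothetical parser for "host:port" entries; comma separation is an assumption.
class HostAddressSketch {

    static List<InetSocketAddress> parse(String value) {
        List<InetSocketAddress> addresses = new ArrayList<>();
        for (String entry : value.split(",")) {
            String trimmed = entry.trim();
            int colon = trimmed.lastIndexOf(':');
            if (colon <= 0 || colon == trimmed.length() - 1) {
                throw new IllegalArgumentException("Expected host:port but got '" + trimmed + "'");
            }
            String host = trimmed.substring(0, colon);
            int port = Integer.parseInt(trimmed.substring(colon + 1));
            // createUnresolved avoids a DNS lookup while parsing.
            addresses.add(InetSocketAddress.createUnresolved(host, port));
        }
        return addresses;
    }

    public static void main(String[] args) {
        // Sample values only; replace with your own locator or server addresses.
        System.out.println(parse("localhost:10334, cachehost:40404"));
    }
}
```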

2.3.4 Build

$ ./mvnw clean install -PgenerateApps
$ cd apps

You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:

$ ./mvnw clean package

2.3.5 Examples

java -jar ./gemfire-source.jar --gemfire.region.region-name=MyRegion --gemfire.source.cacheEventExpression="newValue"

2.4 Gemfire-CQ Source

Continuous query allows client applications to create a GemFire query using Object Query Language (OQL) and to register a CQ listener which subscribes to the query and is notified every time the query’s result set changes. The gemfire-cq source registers a CQ which will post CQEvent messages to the stream.

To enable SSL communication between the Geode CQ source and the Geode cluster, you need to provide the URIs of the Keystore and Truststore files using the gemfire.security.ssl.keystore-uri and gemfire.security.ssl.truststore-uri properties. (If a single file is used for both stores, point both URIs to it.)

2.4.1 Input

N/A

2.4.2 Output

Headers

  • content-type: text/plain

Payload

  • String

2.4.3 Options

The gemfire-cq source supports the following configuration properties:

gemfire.client.pdx-read-serialized
Deserialize the Geode objects into PdxInstance instead of the domain class. (Boolean, default: false)
gemfire.cq.event-expression
SpEL expression to use to extract data from a cq event. (Expression, default: <none>)
gemfire.cq.query
The OQL query (String, default: <none>)
gemfire.pool.connect-type
Specifies connection type: 'server' or 'locator'. (ConnectType, default: <none>, possible values: locator,server)
gemfire.pool.host-addresses
Specifies one or more Gemfire locator or server addresses formatted as [host]:[port]. (InetSocketAddress[], default: <none>)
gemfire.pool.subscription-enabled
Set to true to enable subscriptions for the client pool. Required to sync updates to the client cache. (Boolean, default: false)
gemfire.security.password
The cache password. (String, default: <none>)
gemfire.security.ssl.ciphers
Configures the SSL ciphers used for secure Socket connections as an array of valid cipher names. (String, default: any)
gemfire.security.ssl.keystore-type
Identifies the type of Keystore used for SSL communications (e.g. JKS, PKCS11, etc.). (String, default: JKS)
gemfire.security.ssl.keystore-uri
Location of the pre-created Keystore URI to be used for connecting to the Geode cluster. (Resource, default: <none>)
gemfire.security.ssl.ssl-keystore-password
Password for accessing the keystore. (String, default: <none>)
gemfire.security.ssl.ssl-truststore-password
Password for accessing the trust store. (String, default: <none>)
gemfire.security.ssl.truststore-type
Identifies the type of truststore used for SSL communications (e.g. JKS, PKCS11, etc.). (String, default: JKS)
gemfire.security.ssl.truststore-uri
Location of the pre-created truststore URI to be used for connecting to the Geode cluster. (Resource, default: <none>)
gemfire.security.ssl.user-home-directory
Local directory to cache the truststore and keystore files downloaded from the truststoreUri and keystoreUri locations. (String, default: user.home)
gemfire.security.username
The cache username. (String, default: <none>)

2.4.4 Build

$ ./mvnw clean install -PgenerateApps
$ cd apps

You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:

$ ./mvnw clean package

2.4.5 Examples

java -jar gemfire-cq-source.jar --gemfire.cq.query=

2.5 Http Source

A source application that listens for HTTP requests and emits the body as a message payload. If the Content-Type matches text/* or application/json, the payload will be a String, otherwise the payload will be a byte array.

2.5.1 Input

N/A

2.5.2 Output

Headers:

  • Content-Type: Any

Payload:

If content type matches text/* or application/json

  • String

If content type does not match text/* or application/json

  • byte array
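
The payload rule above can be sketched as a small decision function. This is only an illustration with JDK classes: the class and method names are hypothetical, and decoding the body as UTF-8 is an assumption, since the text above does not say which charset the source applies.

```java
import java.nio.charset.StandardCharsets;
import java.util.Locale;

// Hypothetical illustration of the String-versus-byte[] payload rule.
class HttpPayloadSketch {

    // True for text/* and application/json, ignoring parameters such as "; charset=...".
    static boolean isTextual(String contentType) {
        if (contentType == null) {
            return false;
        }
        String mediaType = contentType.split(";", 2)[0].trim().toLowerCase(Locale.ROOT);
        return mediaType.startsWith("text/") || mediaType.equals("application/json");
    }

    // Returns a String for textual content types and the raw bytes otherwise.
    static Object toPayload(String contentType, byte[] body) {
        if (isTextual(contentType)) {
            return new String(body, StandardCharsets.UTF_8); // UTF-8 is an assumption
        }
        return body;
    }

    public static void main(String[] args) {
        byte[] body = "{\"a\":1}".getBytes(StandardCharsets.UTF_8);
        System.out.println(toPayload("application/json", body).getClass().getSimpleName());
        System.out.println(toPayload("application/octet-stream", body).getClass().getSimpleName());
    }
}
```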

2.5.3 Options

The http source supports the following configuration properties:

http.cors.allow-credentials
Whether the browser should include any cookies associated with the domain of the request being annotated. (Boolean, default: <none>)
http.cors.allowed-headers
List of request headers that can be used during the actual request. (String[], default: <none>)
http.cors.allowed-origins
List of allowed origins, e.g. "https://domain1.com". (String[], default: <none>)
http.mapped-request-headers
Headers that will be mapped. (String[], default: <none>)
http.path-pattern
An Ant-Style pattern to determine which http requests will be captured. (String, default: /)
server.port
Server HTTP port. (Integer, default: 8080)
Note

Security is disabled for this application by default. To enable it, set the http.enable-security property to true.

2.5.4 Build

$ ./mvnw clean install -PgenerateApps
$ cd apps

You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:

$ ./mvnw clean package

2.5.5 Examples

java -jar http_source.jar

2.6 JDBC Source

This source polls data from an RDBMS. This source is fully based on the DataSourceAutoConfiguration, so refer to the Spring Boot JDBC Support for more information.

2.6.1 Input

N/A

2.6.2 Output

Headers

  • Content-Type: application/x-java-object

Payload

  • Map<String, Object> when jdbc.split == true (default) and List<Map<String, Object>> otherwise
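
The effect of jdbc.split on the emitted payloads can be sketched as follows. The rows are modelled as already-fetched maps, so no database is involved. The class and method names are hypothetical, and the handling of an empty result (no message at all) is an assumption rather than something stated above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Hypothetical illustration of how jdbc.split shapes the payloads of one poll.
class JdbcSplitSketch {

    // Returns one payload per message that a single poll would produce.
    static List<Object> toPayloads(List<Map<String, Object>> rows, boolean split) {
        List<Object> payloads = new ArrayList<>();
        if (rows.isEmpty()) {
            return payloads; // assumption: an empty result produces no message
        }
        if (split) {
            payloads.addAll(rows); // one Map<String, Object> per message
        } else {
            payloads.add(rows);    // a single List<Map<String, Object>> message
        }
        return payloads;
    }

    public static void main(String[] args) {
        Map<String, Object> first = Collections.singletonMap("id", 1);
        Map<String, Object> second = Collections.singletonMap("id", 2);
        List<Map<String, Object>> rows = Arrays.asList(first, second);
        System.out.println(toPayloads(rows, true).size());
        System.out.println(toPayloads(rows, false).size());
    }
}
```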

2.6.3 Options

The jdbc source has the following options:

jdbc.max-rows-per-poll
Maximum number of rows to process for each poll. (Integer, default: 0)
jdbc.query
The query to use to select data. (String, default: <none>)
jdbc.split
Whether to split the SQL result as individual messages. (Boolean, default: true)
jdbc.update
An SQL update statement to execute for marking polled messages as 'seen'. (String, default: <none>)
spring.datasource.data
Data (DML) script resource references. (List<String>, default: <none>)
spring.datasource.driver-class-name
Fully qualified name of the JDBC driver. Auto-detected based on the URL by default. (String, default: <none>)
spring.datasource.initialization-mode
Initialize the datasource using available DDL and DML scripts. (DataSourceInitializationMode, default: embedded, possible values: ALWAYS,EMBEDDED,NEVER)
spring.datasource.password
Login password of the database. (String, default: <none>)
spring.datasource.schema
Schema (DDL) script resource references. (List<String>, default: <none>)
spring.datasource.url
JDBC url of the database. (String, default: <none>)
spring.datasource.username
Login username of the database. (String, default: <none>)
trigger.cron
Cron expression value for the Cron Trigger. (String, default: <none>)
trigger.date-format
Format for the date value. (String, default: <none>)
trigger.fixed-delay
Fixed delay for periodic triggers. (Integer, default: 1)
trigger.initial-delay
Initial delay for periodic triggers. (Integer, default: 0)
trigger.max-messages
Maximum messages per poll, -1 means infinity. (Long, default: 1)
trigger.time-unit
The TimeUnit to apply to delay values. (TimeUnit, default: <none>, possible values: NANOSECONDS,MICROSECONDS,MILLISECONDS,SECONDS,MINUTES,HOURS,DAYS)

Also see the Spring Boot Documentation for additional DataSource properties and TriggerProperties and MaxMessagesProperties for polling options.
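
The trigger.initial-delay, trigger.fixed-delay and trigger.time-unit options describe a periodic poll. A rough JDK analogy is ScheduledExecutorService.scheduleWithFixedDelay, shown below. This assumes the trigger behaves like a fixed-delay schedule; the class and method names are hypothetical, and the counter stands in for the actual query.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical analogy for a fixed-delay poll; not the application's trigger code.
class FixedDelaySketch {

    // Schedules a dummy poll and returns how many times it ran within observeMillis.
    static int countPolls(long initialDelay, long fixedDelay, TimeUnit unit, long observeMillis) {
        AtomicInteger polls = new AtomicInteger();
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        try {
            // The next poll starts fixedDelay after the previous one finished.
            scheduler.scheduleWithFixedDelay(polls::incrementAndGet, initialDelay, fixedDelay, unit);
            Thread.sleep(observeMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            scheduler.shutdownNow();
        }
        return polls.get();
    }

    public static void main(String[] args) {
        // Poll every 50 ms with no initial delay and observe for half a second.
        System.out.println(countPolls(0, 50, TimeUnit.MILLISECONDS, 500));
    }
}
```

The exact count printed depends on scheduling and timing on the machine running the sketch.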

2.6.4 Build

$ ./mvnw clean install -PgenerateApps
$ cd apps

You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:

$ ./mvnw clean package

2.6.5 Examples

java -jar jdbc_source.jar --jdbc.query=<QUERY> [datasource credentials]

2.7 JMS Source

The "jms" source enables receiving messages from JMS.

2.7.1 Input

N/A

2.7.2 Output

Headers

  • content-type: text/plain

Payload

  • String

Headers

  • content-type: application/octet-stream

Payload

  • byte[]

Headers

  • content-type: application/x-java-serialized-object

Payload

  • java.io.Serializable

Headers

  • content-type: application/x-java-object

Payload

  • Map

2.7.3 Options

The jms source has the following options:

jms.client-id
Client id for durable subscriptions. (String, default: <none>)
jms.destination
The destination from which to receive messages (queue or topic). (String, default: <none>)
jms.message-selector
A selector for messages. (String, default: <none>)
jms.session-transacted
True to enable transactions and select a DefaultMessageListenerContainer, false to select a SimpleMessageListenerContainer. (Boolean, default: true)
jms.subscription-durable
True for a durable subscription. (Boolean, default: <none>)
jms.subscription-name
The name of a durable or shared subscription. (String, default: <none>)
jms.subscription-shared
True for a shared subscription. (Boolean, default: <none>)
spring.jms.jndi-name
Connection factory JNDI name. When set, takes precedence over other connection factory auto-configurations. (String, default: <none>)
spring.jms.listener.acknowledge-mode
Acknowledge mode of the container. By default, the listener is transacted with automatic acknowledgment. (AcknowledgeMode, default: <none>, possible values: AUTO,CLIENT,DUPS_OK)
spring.jms.listener.auto-startup
Start the container automatically on startup. (Boolean, default: true)
spring.jms.listener.concurrency
Minimum number of concurrent consumers. (Integer, default: <none>)
spring.jms.listener.max-concurrency
Maximum number of concurrent consumers. (Integer, default: <none>)
spring.jms.pub-sub-domain
Whether the default destination type is topic. (Boolean, default: false)
Note

Spring Boot broker configuration is used; refer to the Spring Boot Documentation for more information. The spring.jms.* properties above are also handled by the Spring Boot JMS support.

2.7.4 Build

$ ./mvnw clean install -PgenerateApps
$ cd apps

You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:

$ ./mvnw clean package

2.7.5 Examples

java -jar jms-source.jar --jms.destination=

2.8 Load Generator Source

A source that generates data and dispatches it to the stream. It gives users a way to measure the performance of Spring Cloud Data Flow in different environments and deployment types.

2.8.1 Input

N/A

2.8.2 Output

Headers:

  • Content-Type: application/octet-stream

Payload:

  • byte[]

2.8.3 Options

The load-generator source has the following options:

load-generator.generate-timestamp
<documentation missing> (Boolean, default: false)
load-generator.message-count
<documentation missing> (Integer, default: 1000)
load-generator.message-size
<documentation missing> (Integer, default: 1000)
load-generator.producers
<documentation missing> (Integer, default: 1)

2.8.4 Build

$ ./mvnw clean install -PgenerateApps
$ cd apps

You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:

$ ./mvnw clean package

2.8.5 Examples

java -jar load-generator-source.jar

2.9 Loggregator Source

A source that can be used to read logging messages from Cloud Foundry Loggregator.

2.9.1 Input

N/A

2.9.2 Output

Headers:

  • Content-Type: text/plain

Payload:

A String with the log message.

2.9.3 Options

The loggregator source has the following options:

loggregator.application-name
<documentation missing> (String, default: <none>)
loggregator.cloud-foundry-api
<documentation missing> (String, default: <none>)
loggregator.cloud-foundry-password
<documentation missing> (String, default: <none>)
loggregator.cloud-foundry-user
<documentation missing> (String, default: <none>)

2.9.4 Build

$ ./mvnw clean install -PgenerateApps
$ cd apps

You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:

$ ./mvnw clean package

2.9.5 Examples

java -jar loggregator-source.jar --cloudFoundryApi="api" --cloudFoundryUser="CF user" --cloudFoundryPassword="CF password" \
    --applicationName=app-name

2.10 Mail Source

A source module that listens for emails and emits the message body as a message payload.

2.10.1 Input

N/A

2.10.2 Output

Headers

  • content-type: text/plain

Payload

  • String

2.10.3 Options

The mail source supports the following configuration properties:

mail.charset
The charset for byte[] mail-to-string transformation. (String, default: UTF-8)
mail.delete
Set to true to delete email after download. (Boolean, default: false)
mail.expression
Configure a SpEL expression to select messages. (String, default: true)
mail.idle-imap
Set to true to use IdleImap Configuration. (Boolean, default: false)
mail.java-mail-properties
JavaMail properties as a new line delimited string of name-value pairs, e.g. 'foo=bar\n baz=car'. (Properties, default: <none>)
mail.mark-as-read
Set to true to mark email as read. (Boolean, default: false)
mail.url
Mail connection URL for connection to Mail server e.g. 'imaps://<username>:<password>@<host>:993/Inbox'. (URLName, default: <none>)
mail.user-flag
The flag to mark messages when the server does not support \Recent (String, default: <none>)
trigger.cron
Cron expression value for the Cron Trigger. (String, default: <none>)
trigger.date-format
Format for the date value. (String, default: <none>)
trigger.fixed-delay
Fixed delay for periodic triggers. (Integer, default: 1)
trigger.initial-delay
Initial delay for periodic triggers. (Integer, default: 0)
trigger.max-messages
Maximum messages per poll, -1 means infinity. (Long, default: 1)
trigger.time-unit
The TimeUnit to apply to delay values. (TimeUnit, default: <none>, possible values: NANOSECONDS,MICROSECONDS,MILLISECONDS,SECONDS,MINUTES,HOURS,DAYS)
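
The newline-delimited format of mail.java-mail-properties is the same shape that java.util.Properties can load. The sketch below parses the example value from the option description with the JDK alone. It is an illustration of the format, not the conversion the application performs, and the class name is hypothetical.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical parser showing how a newline-delimited string maps to Properties.
class JavaMailPropertiesSketch {

    static Properties parse(String newlineDelimited) {
        Properties props = new Properties();
        try {
            // Properties.load skips leading whitespace on each line.
            props.load(new StringReader(newlineDelimited));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    public static void main(String[] args) {
        Properties props = parse("foo=bar\n baz=car");
        System.out.println(props.getProperty("foo"));
        System.out.println(props.getProperty("baz"));
    }
}
```

When passing such a value on a command line, the shell decides how a newline is written, so quoting needs care.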

2.10.4 Build

$ ./mvnw clean install -PgenerateApps
$ cd apps

You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:

$ ./mvnw clean package

2.10.5 Examples

java -jar mail-source.jar --mail.javaMailProperties= --mail.url= --mail.expression= \
    --mail.charset= --mail.userFlag=

2.11 MongoDB Source

This source polls data from MongoDB. This source is fully based on the MongoDataAutoConfiguration, so refer to the Spring Boot MongoDB Support for more information.

2.11.1 Input

N/A

2.11.2 Output

Headers:

  • Content-Type: text/plain

Payload:

  • String

2.11.3 Options

The mongodb source has the following options:

mongodb.collection
The MongoDB collection to query (String, default: <none>)
mongodb.query
The MongoDB query (String, default: { })
mongodb.query-expression
The SpEL expression in MongoDB query DSL style (Expression, default: <none>)
mongodb.split
Whether to split the query result as individual messages. (Boolean, default: true)
spring.data.mongodb.authentication-database
Authentication database name. (String, default: <none>)
spring.data.mongodb.database
Database name. (String, default: <none>)
spring.data.mongodb.field-naming-strategy
Fully qualified name of the FieldNamingStrategy to use. (Class<?>, default: <none>)
spring.data.mongodb.grid-fs-database
GridFS database name. (String, default: <none>)
spring.data.mongodb.host
Mongo server host. Cannot be set with URI. (String, default: <none>)
spring.data.mongodb.password
Login password of the mongo server. Cannot be set with URI. (Character[], default: <none>)
spring.data.mongodb.port
Mongo server port. Cannot be set with URI. (Integer, default: <none>)
spring.data.mongodb.uri
Mongo database URI. Cannot be set with host, port and credentials. (String, default: mongodb://localhost/test)
spring.data.mongodb.username
Login user of the mongo server. Cannot be set with URI. (String, default: <none>)
trigger.cron
Cron expression value for the Cron Trigger. (String, default: <none>)
trigger.date-format
Format for the date value. (String, default: <none>)
trigger.fixed-delay
Fixed delay for periodic triggers. (Integer, default: 1)
trigger.initial-delay
Initial delay for periodic triggers. (Integer, default: 0)
trigger.max-messages
Maximum messages per poll, -1 means infinity. (Long, default: -1)
trigger.time-unit
The TimeUnit to apply to delay values. (TimeUnit, default: SECONDS, possible values: NANOSECONDS,MICROSECONDS,MILLISECONDS,SECONDS,MINUTES,HOURS,DAYS)

Also see the Spring Boot Documentation for additional MongoProperties properties. See also TriggerProperties for polling options.

2.11.4 Build

$ ./mvnw clean install -PgenerateApps
$ cd apps

You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:

$ ./mvnw clean package

2.11.5 Examples

java -jar mongodb-source.jar --mongodb.query= --mongodb.collection=

2.12 MQTT Source

The "mqtt" source enables receiving messages from MQTT.

2.12.1 Input

N/A

2.12.2 Output

Headers:

Payload:

  • String if binary setting is false (default)
  • byte[] if binary setting is true

2.12.3 Options

The mqtt source has the following options:

mqtt.binary
true to leave the payload as bytes (Boolean, default: false)
mqtt.charset
the charset used to convert bytes to String (when binary is false) (String, default: UTF-8)
mqtt.clean-session
whether the client and server should remember state across restarts and reconnects (Boolean, default: true)
mqtt.client-id
identifies the client (String, default: stream.client.id.source)
mqtt.connection-timeout
the connection timeout in seconds (Integer, default: 30)
mqtt.keep-alive-interval
the ping interval in seconds (Integer, default: 60)
mqtt.password
the password to use when connecting to the broker (String, default: guest)
mqtt.persistence
'memory' or 'file' (String, default: memory)
mqtt.persistence-directory
Persistence directory (String, default: /tmp/paho)
mqtt.qos
the qos; a single value for all topics or a comma-delimited list to match the topics (Integer[], default: [0])
mqtt.topics
the topic(s) (comma-delimited) to which the source will subscribe (String[], default: [stream.mqtt])
mqtt.url
location of the mqtt broker(s) (comma-delimited list) (String[], default: [tcp://localhost:1883])
mqtt.username
the username to use when connecting to the broker (String, default: guest)
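
The mqtt.qos description above allows either a single value for all topics or one value per topic. The sketch below shows one way to expand that rule into a per-topic mapping. The class and method names are hypothetical, and rejecting a list whose length matches neither case is an assumption about how a mismatch would be handled.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical expansion of mqtt.qos onto mqtt.topics.
class MqttQosSketch {

    static Map<String, Integer> assign(String[] topics, int[] qos) {
        if (qos.length != 1 && qos.length != topics.length) {
            // Assumption: any other length is treated as a configuration error.
            throw new IllegalArgumentException("qos needs one value or one value per topic");
        }
        Map<String, Integer> result = new LinkedHashMap<>();
        for (int i = 0; i < topics.length; i++) {
            // A single qos value applies to every topic.
            result.put(topics[i], qos.length == 1 ? qos[0] : qos[i]);
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(assign(new String[] {"sensors", "alerts"}, new int[] {1}));
        System.out.println(assign(new String[] {"sensors", "alerts"}, new int[] {0, 2}));
    }
}
```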

2.12.4 Build

$ ./mvnw clean install -PgenerateApps
$ cd apps

You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:

$ ./mvnw clean package

2.12.5 Examples

java -jar mqtt-source.jar --mqtt.clientId=

2.13 RabbitMQ Source

The "rabbit" source enables receiving messages from RabbitMQ.

The queue(s) must exist before the stream is deployed; they are not created automatically. You can easily create a Queue using the RabbitMQ web UI.

2.13.1 Input

N/A

2.13.2 Output

Headers

  • content-type: text/plain

Payload

  • String

Headers

  • content-type: application/octet-stream

Payload

  • byte[]

Headers

  • content-type: application/x-java-serialized-object

Payload

  • java.io.Serializable

2.13.3 Options

The rabbit source has the following options:

rabbit.enable-retry
true to enable retry. (Boolean, default: false)
rabbit.initial-retry-interval
Initial retry interval when retry is enabled. (Integer, default: 1000)
rabbit.mapped-request-headers
Headers that will be mapped. (String[], default: [STANDARD_REQUEST_HEADERS])
rabbit.max-attempts
The maximum delivery attempts when retry is enabled. (Integer, default: 3)
rabbit.max-retry-interval
Max retry interval when retry is enabled. (Integer, default: 30000)
rabbit.own-connection
When true, use a separate connection based on the boot properties. (Boolean, default: false)
rabbit.queues
The queues to which the source will listen for messages. (String[], default: <none>)
rabbit.requeue
Whether rejected messages should be requeued. (Boolean, default: true)
rabbit.retry-multiplier
Retry backoff multiplier when retry is enabled. (Double, default: 2)
rabbit.transacted
Whether the channel is transacted. (Boolean, default: false)
spring.rabbitmq.addresses
Comma-separated list of addresses to which the client should connect. (String, default: <none>)
spring.rabbitmq.connection-timeout
Connection timeout. Set it to zero to wait forever. (Duration, default: <none>)
spring.rabbitmq.host
RabbitMQ host. (String, default: localhost)
spring.rabbitmq.password
Login to authenticate against the broker. (String, default: guest)
spring.rabbitmq.port
RabbitMQ port. (Integer, default: 5672)
spring.rabbitmq.publisher-confirms
Whether to enable publisher confirms. (Boolean, default: false)
spring.rabbitmq.publisher-returns
Whether to enable publisher returns. (Boolean, default: false)
spring.rabbitmq.requested-heartbeat
Requested heartbeat timeout; zero for none. If a duration suffix is not specified, seconds will be used. (Duration, default: <none>)
spring.rabbitmq.username
Login user to authenticate to the broker. (String, default: guest)
spring.rabbitmq.virtual-host
Virtual host to use when connecting to the broker. (String, default: <none>)

Also see the Spring Boot Documentation for additional properties for the broker connections and listener properties.

A Note About Retry

With the default ackMode (AUTO) and requeue (true) options, failed message deliveries will be retried indefinitely. Since there is not much processing in the rabbit source, the risk of failure in the source itself is small, unless the downstream Binder is not connected for some reason. Setting requeue to false will cause messages to be rejected on the first attempt (and possibly sent to a Dead Letter Exchange/Queue if the broker is so configured). The enableRetry option allows configuration of retry parameters such that a failed message delivery can be retried and eventually discarded (or dead-lettered) when retries are exhausted. The delivery thread is suspended during the retry interval(s). Retry options are enableRetry, maxAttempts, initialRetryInterval, retryMultiplier, and maxRetryInterval. Message deliveries failing with a MessageConversionException are never retried; the assumption being that if a message could not be converted on the first attempt, subsequent attempts will also fail. Such messages are discarded (or dead-lettered).
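
As a rough illustration of how the retry options interact, the following Python sketch computes the waits between delivery attempts. It assumes the usual exponential back-off rule (each wait is the previous wait multiplied by retryMultiplier, capped at maxRetryInterval) and that maxAttempts counts the first delivery. The function name retry_intervals is hypothetical and is not part of the app.

```python
def retry_intervals(max_attempts=3, initial_interval=1000,
                    multiplier=2.0, max_interval=30000):
    """Return the waits (ms) between attempts under exponential back-off.

    Assumption: max_attempts includes the first delivery, so there are
    max_attempts - 1 waits before the message is discarded or dead-lettered.
    """
    waits = []
    interval = float(initial_interval)
    for _ in range(max(0, max_attempts - 1)):
        # Each wait is capped at max_interval.
        waits.append(int(min(interval, max_interval)))
        interval *= multiplier
    return waits
```

Under these assumptions, the defaults from the options list give two waits of 1000 ms and 2000 ms; raising maxAttempts makes the 30000 ms cap visible on the later waits.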

2.13.4 Build

$ ./mvnw clean install -PgenerateApps
$ cd apps

You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:

$ ./mvnw clean package

2.13.5 Examples

java -jar rabbit-source.jar --rabbit.queues=

2.14 Amazon S3 Source

This source app supports transfer of files using the Amazon S3 protocol. Files are transferred from the remote directory (S3 bucket) to the local directory where the application is deployed.

Messages emitted by the source are provided as a byte array by default. However, this can be customized using the --mode option:

  • ref Provides a java.io.File reference
  • lines Will split files line-by-line and emit a new message for each line
  • contents The default. Provides the contents of a file as a byte array

When using --mode=lines, you can also provide the additional option --withMarkers=true. If set to true, the underlying FileSplitter will emit additional start-of-file and end-of-file marker messages before and after the actual data. The payload of these 2 additional marker messages is of type FileSplitter.FileMarker. The option withMarkers defaults to false if not explicitly set.

2.14.1 Input

N/A

2.14.2 Output

mode = contents

Headers:
  • Content-Type: application/octet-stream
  • file_originalFile: <java.io.File>
  • file_name: <file name>
Payload:

A byte[] filled with the file contents.

mode = lines

Headers:
  • Content-Type: text/plain
  • file_originalFile: <java.io.File>
  • file_name: <file name>
  • correlationId: <UUID> (same for each line)
  • sequenceNumber: <n>
  • sequenceSize: 0 (number of lines is not known until the file is read)
Payload:

A String for each line.

The first line is optionally preceded by a message with a START marker payload. The last line is optionally followed by a message with an END marker payload.

Marker presence and format are determined by the with-markers and markers-json properties.
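
The message sequence described for mode = lines can be modelled with a short Python sketch. This is a simplified, hypothetical model: messages are plain dicts, marker payloads are shown as the strings "START" and "END" rather than real FileSplitter.FileMarker objects or their JSON form, and sequenceNumber is assumed to start at 1.

```python
import uuid


def split_into_messages(file_name, text, with_markers=False):
    """Mimic the message sequence of mode=lines for one file.

    Hypothetical model: each message is a dict with "headers" and "payload".
    """
    correlation_id = str(uuid.uuid4())  # same for every line of the file
    messages = []
    if with_markers:
        messages.append({"headers": {"file_name": file_name},
                         "payload": "START"})
    for number, line in enumerate(text.splitlines(), start=1):
        messages.append({
            "headers": {
                "file_name": file_name,
                "correlationId": correlation_id,
                "sequenceNumber": number,  # assumed to start at 1
                "sequenceSize": 0,         # line count unknown while reading
            },
            "payload": line,
        })
    if with_markers:
        messages.append({"headers": {"file_name": file_name},
                         "payload": "END"})
    return messages
```

For a two-line file with markers enabled, the sketch yields four messages: the START marker, one message per line, and the END marker.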

mode = ref

Headers:

None.

Payload:

A java.io.File object.

2.14.3 Options

The s3 source has the following options:

file.consumer.markers-json
When 'fileMarkers == true', specify if they should be produced as FileSplitter.FileMarker objects or JSON. (Boolean, default: true)
file.consumer.mode
The FileReadingMode to use for file reading sources. Values are 'ref' - The File object, 'lines' - a message per line, or 'contents' - the contents as bytes. (FileReadingMode, default: <none>, possible values: ref,lines,contents)
file.consumer.with-markers
Set to true to emit start of file/end of file marker messages before/after the data. Only valid with FileReadingMode 'lines'. (Boolean, default: <none>)
s3.auto-create-local-dir
Set to true to create the local directory if it does not exist. (Boolean, default: true)
s3.delete-remote-files
Set to true to delete remote files after processing. (Boolean, default: false)
s3.filename-pattern
The pattern to filter remote files. (String, default: <none>)
s3.filename-regex
The regexp to filter remote files. (Pattern, default: <none>)
s3.local-dir
The local directory to store files. (File, default: <none>)
s3.preserve-timestamp
Set to true to transfer the timestamp of the remote file to the local one. (Boolean, default: true)
s3.remote-dir
AWS S3 bucket resource. (String, default: bucket)
s3.remote-file-separator
Remote File separator. (String, default: /)
s3.tmp-file-suffix
Temporary file suffix. (String, default: .tmp)
trigger.cron
Cron expression value for the Cron Trigger. (String, default: <none>)
trigger.date-format
Format for the date value. (String, default: <none>)
trigger.fixed-delay
Fixed delay for periodic triggers. (Integer, default: 1)
trigger.initial-delay
Initial delay for periodic triggers. (Integer, default: 0)
trigger.max-messages
Maximum messages per poll, -1 means infinity. (Long, default: -1)
trigger.time-unit
The TimeUnit to apply to delay values. (TimeUnit, default: SECONDS, possible values: NANOSECONDS,MICROSECONDS,MILLISECONDS,SECONDS,MINUTES,HOURS,DAYS)

2.14.4 Amazon AWS common options

The Amazon S3 Source (as all other Amazon AWS applications) is based on the Spring Cloud AWS project as a foundation, and its auto-configuration classes are used automatically by Spring Boot. Consult their documentation regarding required and useful auto-configuration properties.

Some of them are about AWS credentials:

  • cloud.aws.credentials.accessKey
  • cloud.aws.credentials.secretKey
  • cloud.aws.credentials.instanceProfile
  • cloud.aws.credentials.profileName
  • cloud.aws.credentials.profilePath

Others are for the AWS Region definition:

  • cloud.aws.region.auto
  • cloud.aws.region.static

And for AWS Stack:

  • cloud.aws.stack.auto
  • cloud.aws.stack.name

2.14.5 Build

$ ./mvnw clean install -PgenerateApps
$ cd apps

You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:

$ ./mvnw clean package

2.14.6 Examples

java -jar s3-source.jar --s3.remoteDir=/tmp/foo --file.consumer.mode=lines --trigger.fixed-delay=60

2.15 SFTP Source

This source app supports transfer of files using the SFTP protocol. Files are transferred from the remote directory to the local directory where the application is deployed.

Messages emitted by the source are provided as a byte array by default. However, this can be customized using the --mode option:

  • ref Provides a java.io.File reference
  • lines Will split files line-by-line and emit a new message for each line
  • contents The default. Provides the contents of a file as a byte array

When using --mode=lines, you can also provide the additional option --withMarkers=true. If set to true, the underlying FileSplitter will emit additional start-of-file and end-of-file marker messages before and after the actual data. The payload of these 2 additional marker messages is of type FileSplitter.FileMarker. The option withMarkers defaults to false if not explicitly set.

When configuring the sftp.factory.known-hosts-expression option, the root object of the evaluation is the application context. An example might be sftp.factory.known-hosts-expression = @systemProperties['user.home'] + '/.ssh/known_hosts'.

See also MetadataStore options for possible shared persistent store configuration for the SftpPersistentAcceptOnceFileListFilter and IdempotentReceiverInterceptor used in the SFTP Source.

2.15.1 Multiple SFTP Servers

This source supports polling multiple sftp servers. This requires configuring multiple session factories. The following configuration will poll two sftp servers, consuming files in a round-robin fashion:

sftp.factories.one.host=host1
sftp.factories.one.port=1234
sftp.factories.one.username=user1
sftp.factories.one.password=pass1
...
sftp.factories.two.host=host2
sftp.factories.two.port=2345
sftp.factories.two.username=user2
sftp.factories.two.password=pass2
sftp.directories=one.sftpSource,two.sftpSecondSource
sftp.max-fetch=1
sftp.fair=true
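
To make the effect of sftp.fair and sftp.max-fetch=1 more concrete, here is a small Python simulation of the polling order. The rotation rule is an assumption for illustration and is not taken from the app's code: with fair=true the poller moves to the next "name.directory" entry after every poll, while with fair=false it stays on the current entry until a poll there finds no file. The function poll_order and the file names are hypothetical.

```python
from collections import deque


def poll_order(sources, fair, polls):
    """Simulate which source each poll fetches from, one file per poll.

    sources: dict mapping a "factory.directory" key to a list of file names.
    Assumed rule: fair=True rotates after every poll; fair=False rotates
    only when the current source had nothing to fetch.
    """
    pending = {key: deque(files) for key, files in sources.items()}
    keys = list(pending)
    current = 0
    fetched = []
    for _ in range(polls):
        key = keys[current]
        got_file = bool(pending[key])
        if got_file:
            fetched.append((key, pending[key].popleft()))
        if fair or not got_file:
            current = (current + 1) % len(keys)
    return fetched
```

With two files waiting on each server, the fair setting alternates between the two servers, whereas the non-fair setting drains the first server before moving on to the second.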

Note

The TaskLaunchRequest output functionality is currently supported here for legacy reasons. If you are interested in this feature, we recommend using the sftp-dataflow-source, which is intended specifically for this use case. A task launch request posted to the Data Flow Server API is much simpler to use than the TaskLaunchRequest supported by this app, which supports launching tasks using one of the provided platform-specific task launchers. Using a platform-specific task launcher makes it possible to launch tasks when a Data Flow server is not deployed, but requires several additional configuration parameters.

2.15.2 Input

N/A (Fetches files from an SFTP server).

2.15.3 Output

mode = contents

Headers:
  • Content-Type: application/octet-stream
  • file_originalFile: <java.io.File>
  • file_name: <file name>
Payload:

A byte[] filled with the file contents.

mode = lines

Headers:
  • Content-Type: text/plain
  • file_originalFile: <java.io.File>
  • file_name: <file name>
  • correlationId: <UUID> (same for each line)
  • sequenceNumber: <n>
  • sequenceSize: 0 (number of lines is not known until the file is read)
Payload:

A String for each line.

The first line is optionally preceded by a message with a START marker payload. The last line is optionally followed by a message with an END marker payload.

Marker presence and format are determined by the with-markers and markers-json properties.

mode = ref

Headers:

None.

Payload:

A java.io.File object.

task-launcher-output = true

Headers:
  • Content-Type: application/json
  • file_remoteDirectory: <java.lang.String>
Payload:

A TaskLaunchRequest object with the following set as command line arguments (also bound to job parameters for Spring Batch):

  • <task.local-file-path-parameter-name>=<task.local-file-path-parameter-value>
  • <task.remote-file-path-parameter-name>=<task.remote-file-path-parameter-value>
  • Any provided task.parameters

task.resource-uri is required. task.deployment-properties and task.environment-properties are optional.

2.15.4 Options

The sftp source has the following options:

file.consumer.markers-json
When 'fileMarkers == true', specify if they should be produced as FileSplitter.FileMarker objects or JSON. (Boolean, default: true)
file.consumer.mode
The FileReadingMode to use for file reading sources. Values are 'ref' - The File object, 'lines' - a message per line, or 'contents' - the contents as bytes. (FileReadingMode, default: <none>, possible values: ref,lines,contents)
file.consumer.with-markers
Set to true to emit start of file/end of file marker messages before/after the data. Only valid with FileReadingMode 'lines'. (Boolean, default: <none>)
sftp.auto-create-local-dir
Set to true to create the local directory if it does not exist. (Boolean, default: true)
sftp.delete-remote-files
Set to true to delete remote files after successful transfer. (Boolean, default: false)
sftp.directories
A list of factory "name.directory" pairs. (String[], default: <none>)
sftp.factories
A map of factory names to factories. (Map<String, Factory>, default: <none>)
sftp.factory.allow-unknown-keys
True to allow an unknown or changed key. (Boolean, default: false)
sftp.factory.host
The host name of the server. (String, default: localhost)
sftp.factory.known-hosts-expression
A SpEL expression resolving to the location of the known hosts file. (Expression, default: <none>)
sftp.factory.pass-phrase
Passphrase for user's private key. (String, default: <empty string>)
sftp.factory.password
The password to use to connect to the server. (String, default: <none>)
sftp.factory.port
The port of the server. (Integer, default: 22)
sftp.factory.private-key
Resource location of user's private key. (Resource, default: <none>)
sftp.factory.username
The username to use to connect to the server. (String, default: <none>)
sftp.fair
True for fair polling of multiple servers/directories. (Boolean, default: false)
sftp.filename-pattern
A filter pattern to match the names of files to transfer. (String, default: <none>)
sftp.filename-regex
A filter regex pattern to match the names of files to transfer. (Pattern, default: <none>)
sftp.list-only
Set to true to return file metadata without the entire payload. (Boolean, default: false)
sftp.local-dir
The local directory to use for file transfers. (File, default: <none>)
sftp.max-fetch
The maximum number of remote files to fetch per poll; default unlimited. Does not apply when listing files or building task launch requests. (Integer, default: <none>)
sftp.preserve-timestamp
Set to true to preserve the original timestamp. (Boolean, default: true)
sftp.remote-dir
The remote FTP directory. (String, default: /)
sftp.remote-file-separator
The remote file separator. (String, default: /)
sftp.stream
Set to true to stream the file rather than copy to a local directory. (Boolean, default: false)
sftp.task-launcher-output
Set to true to create output suitable for a task launch request. (Boolean, default: false)
sftp.task.application-name
The task application name. (String, default: <none>)
sftp.task.data-source-password
The datasource password to be applied to the TaskLaunchRequest. (String, default: <none>)
sftp.task.data-source-url
The datasource url to be applied to the TaskLaunchRequest. Defaults to h2 in-memory JDBC datasource url. (String, default: jdbc:h2:tcp://localhost:19092/mem:dataflow)
sftp.task.data-source-user-name
The datasource user name to be applied to the TaskLaunchRequest. Defaults to "sa" (String, default: sa)
sftp.task.deployment-properties
Comma delimited list of deployment properties to be applied to the TaskLaunchRequest. (String, default: <none>)
sftp.task.environment-properties
Comma delimited list of environment properties to be applied to the TaskLaunchRequest. (String, default: <none>)
sftp.task.local-file-path-parameter-name
Value to use as the local file parameter name. (String, default: localFilePath)
sftp.task.local-file-path-parameter-value
The file path to use as the local file parameter value. Defaults to "java.io.tmpdir". (String, default: <none>)
sftp.task.parameters
Comma separated list of optional parameters in key=value format. (List<String>, default: <none>)
sftp.task.remote-file-path-parameter-name
Value to use as the remote file parameter name. (String, default: remoteFilePath)
sftp.task.resource-uri
The URI of the task artifact to be applied to the TaskLaunchRequest. (String, default: <empty string>)
sftp.tmp-file-suffix
The suffix to use while the transfer is in progress. (String, default: .tmp)
trigger.cron
Cron expression value for the Cron Trigger. (String, default: <none>)
trigger.date-format
Format for the date value. (String, default: <none>)
trigger.fixed-delay
Fixed delay for periodic triggers. (Integer, default: 1)
trigger.initial-delay
Initial delay for periodic triggers. (Integer, default: 0)
trigger.max-messages
Maximum messages per poll, -1 means infinity. (Long, default: -1)
trigger.time-unit
The TimeUnit to apply to delay values. (TimeUnit, default: SECONDS, possible values: NANOSECONDS,MICROSECONDS,MILLISECONDS,SECONDS,MINUTES,HOURS,DAYS)

2.15.5 Build

$ ./mvnw clean install -PgenerateApps
$ cd apps

You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:

$ ./mvnw clean package

2.15.6 Examples

java -jar sftp_source.jar --sftp.remote-dir=foo --file.consumer.mode=lines --trigger.fixed-delay=60 \
         --sftp.factory.host=sftpserver --sftp.factory.username=user --sftp.factory.password=pw --sftp.local-dir=/foo

2.16 SFTP Data Flow Source

This is a specialized version of the sftp source which always outputs a Data Flow Task Launch Request as JSON, compatible with the tasklauncher-data-flow sink.

Note

A task launch request posted to the Data Flow Server API is much simpler to use than the TaskLaunchRequest supported by the original sftp source, which is designed to launch tasks using one of the provided platform-specific task launchers. Using a platform-specific task launcher makes it possible to launch tasks when a Data Flow server is not deployed, but requires several additional configuration parameters.

This source transfers files using the SFTP protocol from the remote directory to the local directory where the application is deployed.

Note

When running on a cloud platform such as Cloud Foundry 2.3+ or Kubernetes, local-dir should be set to a shared mount path provided by platform volume services, which can also be accessed from the launched task container.

Optionally, if list-only=true, the Task Launch Request provides command line arguments to enable the launched task to access the remote file from the SFTP server. Using a local or shared volume is the recommended approach. The list-only option is only provided as a fallback for cases in which a shared volume is not an option.

When configuring the sftp.factory.known-hosts-expression option, the root object of the evaluation is the application context. An example might be sftp.factory.known-hosts-expression = @systemProperties['user.home'] + '/.ssh/known_hosts'.

See also MetadataStore options for possible shared persistent store configuration for the SftpPersistentAcceptOnceFileListFilter and IdempotentReceiverInterceptor used in the SFTP Source.

2.16.1 Multiple SFTP Servers

This source supports polling multiple sftp servers. This requires configuring multiple session factories. The following configuration will poll two SFTP servers, consuming files in a round-robin fashion.

sftp.factories.one.host=host1
sftp.factories.one.port=1234
sftp.factories.one.username=user1
sftp.factories.one.password=pass1
...
sftp.factories.two.host=host2
sftp.factories.two.port=2345
sftp.factories.two.username=user2
sftp.factories.two.password=pass2
sftp.directories=one.sftpSource,two.sftpSecondSource
sftp.max-fetch=1
sftp.fair=true

Multi-source task names

Just as for a single source, the task name may be configured statically to launch the same task to process files from any server, by setting either task.launch.request.task-name or task.launch.request.task-name-expression. To launch a different task for each remote source, set the corresponding sftp.multisource.task-names properties. For example:

sftp.multisource.task-names.one=task1
sftp.multisource.task-names.two=task2

2.16.2 Input

N/A (Fetches files from an SFTP server).

2.16.3 Output

Headers:

  • Content-Type: application/json

Payload:

A Data Flow task launch request as JSON: {"name":"<task-name>", "deploymentProps":{"key":"val",…}, "args":[]} with the following set as command line arguments (also bound to job parameters for Spring Batch):

  • localFilePath=<the local file path>
  • If list-only=true: remoteFilePath=<the remote file path>, plus the SFTP connection parameters
  • Any provided task.launch.request.args

The task name must be the same as a Data Flow task definition and is given by one of the available options listed below.
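
The shape of this payload can be sketched in Python as follows. Only the field names name, deploymentProps, and args come from the description above. The helper task_launch_request, the sample values, and the exact formatting of each argument are assumptions made for illustration.

```python
import json


def task_launch_request(task_name, local_file_path, extra_args=(),
                        deployment_props=None):
    """Build a JSON string shaped like the task launch request above.

    Assumption: each command line argument is rendered as "key=value".
    """
    args = ["localFilePath=" + local_file_path]
    args.extend(extra_args)
    return json.dumps({
        "name": task_name,
        "deploymentProps": dict(deployment_props or {}),
        "args": args,
    })
```

For example, task_launch_request("myTask", "/foo/data.csv", ["foo=bar"]) produces a request for the task myTask with two arguments and no deployment properties.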

2.16.4 Options

The sftp dataflow source has the following options:

sftp.auto-create-local-dir
Set to true to create the local directory if it does not exist. (Boolean, default: true)
sftp.delete-remote-files
Set to true to delete remote files after successful transfer. (Boolean, default: false)
sftp.directories
A list of factory "name.directory" pairs. (String[], default: <none>)
sftp.factories
A map of factory names to factories. (Map<String, Factory>, default: <none>)
sftp.factory.allow-unknown-keys
True to allow an unknown or changed key. (Boolean, default: false)
sftp.factory.host
The host name of the server. (String, default: localhost)
sftp.factory.known-hosts-expression
A SpEL expression resolving to the location of the known hosts file. (Expression, default: <none>)
sftp.factory.pass-phrase
Passphrase for user's private key. (String, default: <empty string>)
sftp.factory.password
The password to use to connect to the server. (String, default: <none>)
sftp.factory.port
The port of the server. (Integer, default: 22)
sftp.factory.private-key
Resource location of user's private key. (Resource, default: <none>)
sftp.factory.username
The username to use to connect to the server. (String, default: <none>)
sftp.fair
True for fair polling of multiple servers/directories. (Boolean, default: false)
sftp.filename-pattern
A filter pattern to match the names of files to transfer. (String, default: <none>)
sftp.filename-regex
A filter regex pattern to match the names of files to transfer. (Pattern, default: <none>)
sftp.list-only
Set to true to return file metadata without the entire payload. (Boolean, default: false)
sftp.local-dir
The local directory to use for file transfers. (File, default: <none>)
sftp.max-fetch
The maximum number of remote files to fetch per poll; default unlimited. Does not apply when listing files or building task launch requests. (Integer, default: <none>)
sftp.multisource.task-names
Map of task names to multi-source server keys. (Map<String, String>, default: <none>)
sftp.preserve-timestamp
Set to true to preserve the original timestamp. (Boolean, default: true)
sftp.remote-dir
The remote FTP directory. (String, default: /)
sftp.remote-file-separator
The remote file separator. (String, default: /)
task.launch.request.arg-expressions
Comma separated list of optional args as SpEL expressions in key=value format. (String, default: <empty string>)
task.launch.request.args
Comma separated list of optional args in key=value format. (List<String>, default: <none>)
task.launch.request.deployment-properties
Comma delimited list of deployment properties to be applied to the TaskLaunchRequest. (String, default: <empty string>)
task.launch.request.task-name
The Data Flow task name. (String, default: <none>)
task.launch.request.task-name-expression
A SpEL expression to extract the task name from each Message, using the Message as the evaluation context. (String, default: <none>)
trigger.cron
Cron expression value for the Cron Trigger. (String, default: <none>)
trigger.date-format
Format for the date value. (String, default: <none>)
trigger.fixed-delay
Fixed delay for periodic triggers. (Integer, default: 1)
trigger.initial-delay
Initial delay for periodic triggers. (Integer, default: 0)
trigger.max-messages
Maximum messages per poll, -1 means infinity. (Long, default: -1)
trigger.time-unit
The TimeUnit to apply to delay values. (TimeUnit, default: SECONDS, possible values: NANOSECONDS,MICROSECONDS,MILLISECONDS,SECONDS,MINUTES,HOURS,DAYS)

2.16.5 Build

$ ./mvnw clean install -PgenerateApps
$ cd apps

You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:

$ ./mvnw clean package

2.16.6 Examples

java -jar sftp_dataflow_source.jar --task.launch.request.task-name=myTask --sftp.remote-dir=foo --trigger.fixed-delay=60 \
         --sftp.factory.host=sftpserver --sftp.factory.username=user --sftp.factory.password=pw --sftp.local-dir=/foo

If the original payload is a File object, you may be able to leverage file name conventions to assign a different task name based on the file name:

java -jar sftp_dataflow_source.jar --task.launch.request.task-name-expression="'task-'+payload.name.substring(0,5)" --sftp.remote-dir=foo --trigger.fixed-delay=60 \
         --sftp.factory.host=sftpserver --sftp.factory.username=user --sftp.factory.password=pw --sftp.local-dir=/foo

2.17 SYSLOG Source

The syslog source receives SYSLOG packets over UDP, TCP, or both. RFC3164 (BSD) and RFC5424 formats are supported.

2.17.1 Input

N/A

2.17.2 Output

Headers

  • content-type: application/json

Payload

  • Map of field/values

2.17.3 Options

The syslog source has the following options:

syslog.buffer-size
The buffer size used when decoding messages; larger messages will be rejected. (Integer, default: 2048)
syslog.nio
Whether or not to use NIO (when supporting a large number of connections). (Boolean, default: false)
syslog.port
The port to listen on. (Integer, default: 1514)
syslog.protocol
tcp or udp (String, default: tcp)
syslog.reverse-lookup
Whether or not to perform a reverse lookup on the incoming socket. (Boolean, default: false)
syslog.rfc
'5424' or '3164' - the syslog format according to the RFC; 3164 is aka 'BSD' format. (String, default: 3164)
syslog.socket-timeout
The socket timeout. (Integer, default: 0)

2.17.4 Build

$ ./mvnw clean install -PgenerateApps
$ cd apps

You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:

$ ./mvnw clean package

2.17.5 Examples

java -jar syslog-source.jar --syslog.rfc=5424 --syslog.protocol=tcp
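
To try the source out, a test client needs to produce a well-formed syslog line. The Python sketch below formats a minimal RFC 5424 message and sends it as a single UDP datagram. It assumes the source was started with --syslog.protocol=udp on the default port 1514 (TCP framing is deliberately left out). The function names and the sample field values are hypothetical.

```python
import socket
from datetime import datetime, timezone


def rfc5424_message(facility, severity, hostname, app_name, text,
                    timestamp=None):
    """Format a minimal RFC 5424 line (no PROCID, MSGID or structured data)."""
    pri = facility * 8 + severity  # PRI value as defined by the RFC
    when = (timestamp or datetime.now(timezone.utc)).astimezone(timezone.utc)
    # Millisecond precision, UTC, e.g. 2020-01-02T03:04:05.678Z
    stamp = when.strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] + "Z"
    # "-" is the RFC 5424 NILVALUE for the three fields left empty here.
    return "<%d>1 %s %s %s - - - %s" % (pri, stamp, hostname, app_name, text)


def send_udp(message, host="localhost", port=1514):
    """Send one datagram; assumes the source listens for UDP on this port."""
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
        sock.sendto(message.encode("utf-8"), (host, port))
```

A call such as send_udp(rfc5424_message(1, 6, "myhost", "myapp", "hello")) sends a user-level informational message.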

2.18 TCP

The tcp source acts as a server and allows a remote party to connect to it and submit data over a raw tcp socket.

TCP is a streaming protocol and some mechanism is needed to frame messages on the wire. A number of decoders are available, the default being 'CRLF' which is compatible with Telnet.

Messages produced by the TCP source application have a byte[] payload.

2.18.1 Input

N/A

2.18.2 Output

Headers:

  • Content-Type: application/octet-stream

Payload:

  • byte[]

2.18.3 Options

tcp.buffer-size
The buffer size used when decoding messages; larger messages will be rejected. (Integer, default: 2048)
tcp.decoder
The decoder to use when receiving messages. (Encoding, default: <none>, possible values: CRLF,LF,NULL,STXETX,RAW,L1,L2,L4)
tcp.nio
Whether or not to use NIO. (Boolean, default: false)
tcp.port
The port on which to listen; 0 for the OS to choose a port. (Integer, default: 1234)
tcp.reverse-lookup
Perform a reverse DNS lookup on the remote IP Address; if false, just the IP address is included in the message headers. (Boolean, default: false)
tcp.socket-timeout
The timeout (ms) before closing the socket when no data is received. (Integer, default: 120000)
tcp.use-direct-buffers
Whether or not to use direct buffers. (Boolean, default: false)

2.18.4 Available Decoders

Text Data

CRLF (default)
text terminated by carriage return (0x0d) followed by line feed (0x0a)
LF
text terminated by line feed (0x0a)
NULL
text terminated by a null byte (0x00)
STXETX
text preceded by an STX (0x02) and terminated by an ETX (0x03)

Text and Binary Data

RAW
no structure - the client indicates a complete message by closing the socket
L1
data preceded by a one byte (unsigned) length field (supports up to 255 bytes)
L2
data preceded by a two byte (unsigned) length field (up to 2^16-1 bytes)
L4
data preceded by a four byte (signed) length field (up to 2^31-1 bytes)
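
From the sending side, the decoders above correspond to the following framing rules, sketched here in Python. The helper frame is hypothetical, and the length headers are assumed to be written in big-endian (network) byte order.

```python
import struct


def frame(payload: bytes, decoder: str = "CRLF") -> bytes:
    """Wrap payload the way a client would for the given decoder name."""
    if decoder == "CRLF":
        return payload + b"\r\n"
    if decoder == "LF":
        return payload + b"\n"
    if decoder == "NULL":
        return payload + b"\x00"
    if decoder == "STXETX":
        return b"\x02" + payload + b"\x03"
    if decoder == "RAW":
        return payload  # the sender closes the socket to end the message
    # Length-header decoders; big-endian byte order is an assumption.
    formats = {"L1": ">B", "L2": ">H", "L4": ">i"}
    if decoder in formats:
        # struct.pack raises struct.error if the length does not fit.
        return struct.pack(formats[decoder], len(payload)) + payload
    raise ValueError("unknown decoder: " + decoder)
```

A client would write frame(data, "L2") to the socket when the source runs with --tcp.decoder=L2, and so on for the other decoder names.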

2.18.5 Build

$ ./mvnw clean install -PgenerateApps
$ cd apps

You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:

$ ./mvnw clean package

2.18.6 Examples

java -jar tcp_source.jar --tcp.decoder=LF

The "decoder" property determines the message format (default is termination with CRLF).

2.19 TCP Client Source

The tcp-client source acts as a TCP client: it connects to a TCP server and receives data.

2.19.1 Input

N/A

2.19.2 Output

Headers:

  • Content-Type: application/octet-stream

Payload:

  • byte[]

2.19.3 Options

The tcp-client source has the following options:

tcp.buffer-size
The buffer size used when decoding messages; larger messages will be rejected. (Integer, default: 2048)
tcp.charset
The charset used when converting from bytes to String. (String, default: UTF-8)
tcp.decoder
The decoder to use when receiving messages. (Encoding, default: <none>, possible values: CRLF,LF,NULL,STXETX,RAW,L1,L2,L4)
tcp.host
The host to which this client will connect. (String, default: localhost)
tcp.nio
Whether or not to use NIO. (Boolean, default: false)
tcp.port
The port on which to listen; 0 for the OS to choose a port. (Integer, default: 1234)
tcp.retry-interval
Retry interval (in milliseconds) to check the connection and reconnect. (Long, default: 60000)
tcp.reverse-lookup
Perform a reverse DNS lookup on the remote IP Address; if false, just the IP address is included in the message headers. (Boolean, default: false)
tcp.socket-timeout
The timeout (ms) before closing the socket when no data is received. (Integer, default: 120000)
tcp.use-direct-buffers
Whether or not to use direct buffers. (Boolean, default: false)

2.19.4 Build

$ ./mvnw clean install -PgenerateApps
$ cd apps

You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:

$ ./mvnw clean package

2.19.5 Examples

java -jar tcp_client_source.jar --tcp.decoder=LF
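
For local testing, the source needs a server to connect to. The Python sketch below is one minimal way to provide one: it accepts a single connection and sends LF-terminated lines, matching --tcp.decoder=LF. The function names, the bind address, and the sample lines are assumptions. Port 1234 is used only because it is the documented tcp.port default.

```python
import socket


def send_lines(conn, lines, terminator=b"\n"):
    """Send each text line over a connected socket, LF-terminated by default."""
    for line in lines:
        conn.sendall(line.encode("utf-8") + terminator)


def serve_once(host="localhost", port=1234, lines=("first", "second")):
    """Accept one client (for example the tcp-client source) and feed it lines."""
    with socket.create_server((host, port)) as listener:
        conn, _addr = listener.accept()
        with conn:
            send_lines(conn, lines)
```

Calling serve_once() before starting the source should let the source connect and receive one message per line, assuming it is pointed at the same host and port.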

2.20 Time Source

The time source emits a String with the current time at a regular interval, as configured by the trigger options below.

2.20.1 Input

N/A

2.20.2 Output

Headers:

  • Content-Type: text/plain

Payload:

A String with the time output.

2.20.3 Options

The time source has the following options:

trigger.cron
Cron expression value for the Cron Trigger. (String, default: <none>)
trigger.date-format
Format for the date value. (String, default: <none>)
trigger.fixed-delay
Fixed delay for periodic triggers. (Integer, default: 1)
trigger.initial-delay
Initial delay for periodic triggers. (Integer, default: 0)
trigger.max-messages
Maximum messages per poll, -1 means infinity. (Long, default: 1)
trigger.time-unit
The TimeUnit to apply to delay values. (TimeUnit, default: <none>, possible values: NANOSECONDS,MICROSECONDS,MILLISECONDS,SECONDS,MINUTES,HOURS,DAYS)

2.20.4 Build

$ ./mvnw clean install -PgenerateApps
$ cd apps

You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:

$ ./mvnw clean package

2.20.5 Examples

java -jar time-source.jar

2.21 Trigger Source

This app sends a trigger message based on a fixed delay, a date, or a cron expression. A payload that is evaluated using SpEL can also be sent each time the trigger fires.

2.21.1 Input

N/A

2.21.2 Output

Headers:

Payload:

  • Any

2.21.3 Options

The trigger source has the following options:

trigger.cron
Cron expression value for the Cron Trigger. (String, default: <none>)
trigger.date-format
Format for the date value. (String, default: <none>)
trigger.fixed-delay
Fixed delay for periodic triggers. (Integer, default: 1)
trigger.initial-delay
Initial delay for periodic triggers. (Integer, default: 0)
trigger.max-messages
Maximum messages per poll, -1 means infinity. (Long, default: 1)
trigger.source.payload
The expression for the payload of the Source module. (Expression, default: <none>)
trigger.time-unit
The TimeUnit to apply to delay values. (TimeUnit, default: <none>, possible values: NANOSECONDS,MICROSECONDS,MILLISECONDS,SECONDS,MINUTES,HOURS,DAYS)

2.21.4 Build

$ ./mvnw clean install -PgenerateApps
$ cd apps

You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:

$ ./mvnw clean package

2.21.5 Examples

java -jar trigger_source.jar --trigger.source.payload=payload-expression

2.22 Twitter Stream Source

This source ingests data from Twitter’s streaming API. It uses the sample and filter stream endpoints rather than the full "firehose" which needs special access. The endpoint used will depend on the parameters you supply in the stream definition (some are specific to the filter endpoint).

You need to supply all keys and secrets (both consumer and accessToken) to authenticate for this source, so it is easiest if you just add these as the following environment variables: CONSUMER_KEY, CONSUMER_SECRET, ACCESS_TOKEN and ACCESS_TOKEN_SECRET.

2.22.1 Input

N/A

2.22.2 Output

Headers

  • Content-Type: text/plain

Payload

  • String

2.22.3 Options

The twitterstream source has the following options:

twitter.credentials.access-token
Access token (String, default: <none>)
twitter.credentials.access-token-secret
Access token secret (String, default: <none>)
twitter.credentials.consumer-key
Consumer key (String, default: <none>)
twitter.credentials.consumer-secret
Consumer secret (String, default: <none>)
twitter.stream.follow
A comma separated list of user IDs, indicating the users to return statuses for in the stream. (String, default: <none>)
twitter.stream.language
The language of the tweet text. (String, default: <none>)
twitter.stream.locations
A set of bounding boxes to track. (String, default: <none>)
twitter.stream.stream-type
Twitter stream type (such as sample, firehose). Default is sample. (TwitterStreamType, default: <none>, possible values: SAMPLE,FIREHOSE,FILTER)
twitter.stream.track
Keywords to track. (String, default: <none>)
Note

The twitterstream source emits JSON in the native Twitter format.

2.22.4 Build

$ ./mvnw clean install -PgenerateApps
$ cd apps

You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:

$ ./mvnw clean package

2.22.5 Examples

java -jar twitter_stream_source.jar --twitter.credentials.consumerKey=<CONSUMER_KEY> --twitter.credentials.consumerSecret=<CONSUMER_SECRET> \
    --twitter.credentials.accessToken=<ACCESS_TOKEN> --twitter.credentials.accessTokenSecret=<ACCESS_TOKEN_SECRET>

2.23 CDC Source

Change Data Capture (CDC) source that captures and streams change events from various databases. Currently it supports MySQL, PostgreSQL, MongoDB, Oracle and SQL Server databases.

Built upon the Debezium Embedded Connector, the CDC Source allows capturing and streaming database changes over different message binders such as Apache Kafka, RabbitMQ and all other brokers supported by Spring Cloud Stream.

All Debezium configuration properties are supported; just add the cdc.config. prefix to the property name. For example, to set Debezium's connector.class property, use cdc.config.connector.class.

For convenience, some of the most relevant properties have Spring-style shortcuts that allow easy configuration and auto-completion. For example, instead of cdc.config.connector.class=io.debezium.connector.mysql.MySqlConnector you can use the shorter cdc.connector=mysql. The complete list of shortcuts and their Debezium counterparts is given in the options below. When both the Debezium property (e.g. cdc.config.XXX) and its shortcut are set for the same property, cdc.config.XXX takes precedence.
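
The prefix handling and the precedence rule can be sketched as follows. This is a hypothetical, simplified model and not the CDC Source's actual code: the class and method names are made up, only the cdc.connector shortcut is modelled, and the connector class names are the ones quoted in this section.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical model of how cdc.* properties could be turned into a Debezium configuration.
public class CdcConfigSketch {

    static final String PREFIX = "cdc.config.";

    // Maps the cdc.connector shortcut to a connector class name.
    static String connectorClass(String shortcut) {
        switch (shortcut) {
            case "mysql":
                return "io.debezium.connector.mysql.MySqlConnector";
            case "postgres":
                return "io.debezium.connector.postgresql.PostgresConnector";
            case "sqlserver":
                return "io.debezium.connector.sqlserver.SqlServerConnector";
            default:
                throw new IllegalArgumentException("Not covered by this sketch: " + shortcut);
        }
    }

    static Map<String, String> toDebeziumConfig(Map<String, String> props) {
        Map<String, String> config = new LinkedHashMap<>();
        // 1. Apply the shortcut first ...
        String shortcut = props.get("cdc.connector");
        if (shortcut != null) {
            config.put("connector.class", connectorClass(shortcut));
        }
        // 2. ... then copy every cdc.config.* entry with the prefix dropped,
        //    so an explicit cdc.config.XXX value overrides the shortcut.
        for (Map.Entry<String, String> e : props.entrySet()) {
            if (e.getKey().startsWith(PREFIX)) {
                config.put(e.getKey().substring(PREFIX.length()), e.getValue());
            }
        }
        return config;
    }

    public static void main(String[] args) {
        Map<String, String> props = new LinkedHashMap<>();
        props.put("cdc.connector", "mysql");
        props.put("cdc.config.database.hostname", "localhost");
        props.put("cdc.config.database.port", "3306");
        System.out.println(toDebeziumConfig(props));
    }
}
```

Applying the shortcut before the prefixed entries is what gives cdc.config.XXX the final word in this sketch.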

A new default BackingOffsetStore, based on the MetadataStore service, is also provided. It allows the offset metadata to be stored in a microservice-friendly way.

2.23.1 Options

cdc.config
Spring pass-through wrapper for the Debezium configuration properties. All properties with the 'cdc.config' prefix are converted into a Debezium io.debezium.config.Configuration and the prefix is dropped. (Map<String, String>, default: <none>)
cdc.connector
Shortcut for the cdc.config.connector.class property. Either can be used as long as they do not contradict each other. (ConnectorType, default: <none>, possible values: mysql,postgres,mongodb,oracle,sqlserver)
cdc.flattering.add-source-fields
Fields from the change event’s source structure to add as metadata (prefixed with "__") to the flattened record (String, default: <none>)
cdc.flattering.delete-handling-mode
How to handle delete records. Options are: (1) none - records are passed, (2) drop - records are removed and (3) rewrite - adds '__deleted' field to the records. (DeleteHandlingMode, default: <none>, possible values: drop,rewrite,none)
cdc.flattering.drop-tombstones
Debezium by default generates a tombstone record to enable Kafka compaction after a delete record was generated. This record is usually filtered out to avoid duplicates as a delete record is converted to a tombstone record, too. (Boolean, default: true)
cdc.flattering.enabled
Enable flattening of the source record events (https://debezium.io/docs/configuration/event-flattening). (Boolean, default: true)
cdc.flattering.operation-header
Adds the event operation (as obtained from the op field of the original record) as a message header called cdc_operation. (Boolean, default: false)
cdc.name
Unique name for this sourceConnector instance. (String, default: <none>)
cdc.offset.commit-timeout
Maximum number of milliseconds to wait for records to flush and partition offset data to be committed to offset storage before cancelling the process and restoring the offset data to be committed in a future attempt. (Duration, default: 5000ms)
cdc.offset.flush-interval
Interval at which to try committing offsets. The default is 1 minute. (Duration, default: 60000ms)
cdc.offset.policy
Offset storage commit policy. (OffsetPolicy, default: <none>)
cdc.offset.storage
When a Kafka Connect connector runs, it reads information from the source and periodically records "offsets" that define how much of that information it has processed. Should the connector be restarted, it will use the last recorded offset to know where in the source information it should resume reading. (OffsetStorageType, default: <none>, possible values: memory,file,kafka,metadata)
cdc.schema
If set, the value's schema is included as part of the outbound message. (Boolean, default: false)
cdc.stream.header.offset
When true the source record's offset metadata is serialized into the outbound message header under cdc.offset. (Boolean, default: false)

2.23.2 Database Support

The CDC Source is based on Debezium, which currently supports the following five datastores: MySQL, PostgreSQL, MongoDB, Oracle and SQL Server.

To run the CdcSourceIntegrationTests integration tests, you need to configure the required source databases.

The instructions below explain how to run pre-configured test databases from Docker images.

MySQL

Start the debezium/example-mysql image in Docker:

docker run -it --rm --name mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=debezium -e MYSQL_USER=mysqluser -e MYSQL_PASSWORD=mysqlpw debezium/example-mysql:1.0
Tip

(Optional) Use the mysql client to connect to the database and create a debezium user with the required credentials:

docker run -it --rm --name mysqlterm --link mysql mysql:5.7 sh -c 'exec mysql -h"$MYSQL_PORT_3306_TCP_ADDR" -P"$MYSQL_PORT_3306_TCP_PORT" -uroot -p"$MYSQL_ENV_MYSQL_ROOT_PASSWORD"'
mysql> GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'debezium' IDENTIFIED BY 'dbz';

Use the following properties to connect the CDC Source to the MySQL database:

cdc.connector=mysql 1

cdc.name=my-sql-connector 2
cdc.config.database.server.id=85744 3
cdc.config.database.server.name=my-app-connector 4

cdc.config.database.user=debezium  5
cdc.config.database.password=dbz 6
cdc.config.database.hostname=localhost 7
cdc.config.database.port=3306 8

cdc.schema=true 9
cdc.flattering.enabled=true 10

1

Configures the CDC Source to use MySqlConnector (equivalent to setting cdc.config.connector.class=io.debezium.connector.mysql.MySqlConnector).

2 3 4

Metadata used to identify and dispatch the incoming events.

5 6 7 8

Connection to the MySQL server running on localhost:3306 as debezium user.

9

Includes the Change Event Value schema in the SourceRecord events.

10

Enables the CDC event flattening.

You can also run the CdcSourceIntegrationTests#CdcMysqlTests using this MySQL configuration.
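
When event flattening is enabled, the cdc.flattering.delete-handling-mode option decides what happens to delete records. The sketch below models the three documented modes (none, drop, rewrite) on plain maps. It is a hypothetical illustration, not Debezium's implementation: the class names are made up, and the string value written into the '__deleted' field is an assumption.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of the delete-handling modes; not the Debezium implementation.
public class DeleteHandlingSketch {

    enum Mode { NONE, DROP, REWRITE }

    // A flattened record plus a flag telling whether it stems from a delete event.
    static final class FlatRecord {
        final boolean delete;
        final Map<String, Object> fields;

        FlatRecord(boolean delete, Map<String, Object> fields) {
            this.delete = delete;
            this.fields = fields;
        }
    }

    static List<FlatRecord> apply(List<FlatRecord> records, Mode mode) {
        List<FlatRecord> out = new ArrayList<>();
        for (FlatRecord r : records) {
            if (mode == Mode.DROP && r.delete) {
                continue; // drop: delete records are removed
            }
            if (mode == Mode.REWRITE) {
                // rewrite: add a '__deleted' field (the value format is an assumption)
                Map<String, Object> copy = new LinkedHashMap<>(r.fields);
                copy.put("__deleted", String.valueOf(r.delete));
                out.add(new FlatRecord(r.delete, copy));
            } else {
                out.add(r); // none, or a non-delete record in drop mode: passed through
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, Object> row = new LinkedHashMap<>();
        row.put("id", 1001);
        List<FlatRecord> in = new ArrayList<>();
        in.add(new FlatRecord(false, row));
        in.add(new FlatRecord(true, row));
        System.out.println("records kept in drop mode: " + apply(in, Mode.DROP).size());
        System.out.println("delete record in rewrite mode: " + apply(in, Mode.REWRITE).get(1).fields);
    }
}
```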

PostgreSQL

Start a pre-configured postgres server from the debezium/example-postgres:1.0 Docker image:

docker run -it --rm --name postgres -p 5432:5432 -e POSTGRES_USER=postgres -e POSTGRES_PASSWORD=postgres debezium/example-postgres:1.0

You can connect to this server like this:

psql -U postgres -h localhost -p 5432

Use the following properties to connect the CDC Source to PostgreSQL:

cdc.connector=postgres 1
cdc.offset.storage=memory 2

cdc.name=my-sql-connector 3
cdc.config.database.server.id=85744 4
cdc.config.database.server.name=my-app-connector 5

cdc.config.database.user=postgres  6
cdc.config.database.password=postgres 7
cdc.config.database.dbname=postgres 8
cdc.config.database.hostname=localhost 9
cdc.config.database.port=5432 10

cdc.schema=true 11
cdc.flattering.enabled=true 12

1

Configures the CDC Source to use PostgresConnector. Equivalent to setting cdc.config.connector.class=io.debezium.connector.postgresql.PostgresConnector.

2

Configures the Debezium engine to use the in-memory backing offset store (that is, cdc.config.offset.storage=org.apache.kafka.connect.storage.MemoryOffsetBackingStore).

3 4 5

Metadata used to identify and dispatch the incoming events.

6 7 8 9 10

Connection to the PostgreSQL server running on localhost:5432 as postgres user.

11

Includes the Change Event Value schema in the SourceRecord events.

12

Enables the CDC event flattening.

You can also run the CdcSourceIntegrationTests#CdcPostgresTests using this PostgreSQL configuration.

MongoDB

Start a pre-configured mongodb from the debezium/example-mongodb:0.10 Docker image:

docker run -it --rm --name mongodb -p 27017:27017 -e MONGODB_USER=debezium -e MONGODB_PASSWORD=dbz  debezium/example-mongodb:0.10

Initialize the inventory collections:

docker exec -it mongodb sh -c 'bash -c /usr/local/bin/init-inventory.sh'

In the mongodb terminal output, search for a log entry like host: "3f95a8a6516e:27017" :

2019-01-10T13:46:10.004+0000 I COMMAND  [conn1] command local.oplog.rs appName: "MongoDB Shell" command: replSetInitiate { replSetInitiate: { _id: "rs0", members: [ { _id: 0.0, host: "3f95a8a6516e:27017" } ] }, lsid: { id: UUID("5f477a16-d80d-41f2-9ab4-4ebecea46773") }, $db: "admin" } numYields:0 reslen:22 locks:{ Global: { acquireCount: { r: 36, w: 20, W: 2 }, acquireWaitCount: { W: 1 }, timeAcquiringMicros: { W: 312 } }, Database: { acquireCount: { r: 6, w: 4, W: 16 } }, Collection: { acquireCount: { r: 4, w: 2 } }, oplog: { acquireCount: { r: 2, w: 3 } } } protocol:op_msg 988ms

Add a 127.0.0.1 3f95a8a6516e entry to your /etc/hosts file (use the host name from your own log output).
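
If you prefer not to search the log by eye, the host name can be extracted with a regular expression. The following is a minimal sketch; the class name and the pattern are assumptions based on the single log line shown above, so other MongoDB versions may need a different pattern.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper for building the /etc/hosts entry from a MongoDB log line.
public class ReplicaSetHostSketch {

    // Matches host: "<name>:<port>" as it appears in the replSetInitiate log entry.
    private static final Pattern HOST = Pattern.compile("host: \"([^\":]+):(\\d+)\"");

    // Returns the line to add to /etc/hosts, or empty if the log line has no host entry.
    static Optional<String> hostsEntry(String logLine) {
        Matcher m = HOST.matcher(logLine);
        return m.find() ? Optional.of("127.0.0.1 " + m.group(1)) : Optional.empty();
    }

    public static void main(String[] args) {
        String line = "command: replSetInitiate { replSetInitiate: { _id: \"rs0\", "
                + "members: [ { _id: 0.0, host: \"3f95a8a6516e:27017\" } ] } }";
        // prints: 127.0.0.1 3f95a8a6516e
        System.out.println(hostsEntry(line).orElse("no host found"));
    }
}
```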

Use the following properties to connect the CDC Source to MongoDB:

cdc.connector=mongodb 1
cdc.offset.storage=memory 2

cdc.config.mongodb.hosts=rs0/localhost:27017 3
cdc.config.mongodb.name=dbserver1 4
cdc.config.mongodb.user=debezium 5
cdc.config.mongodb.password=dbz 6
cdc.config.database.whitelist=inventory 7

cdc.config.tasks.max=1 8

cdc.schema=true 9
cdc.flattering.enabled=true 10

1

Configures the CDC Source to use the MongoDB connector. This maps to cdc.config.connector.class=io.debezium.connector.mongodb.MongoDbConnector.

2

Configures the Debezium engine to use the in-memory backing offset store (that is, cdc.config.offset.storage=org.apache.kafka.connect.storage.MemoryOffsetBackingStore).

3 4 5 6 7

Connection to the MongoDB running on localhost:27017 as debezium user.

8

See debezium.io/docs/connectors/mongodb/#tasks.

9

Includes the Change Event Value schema in the SourceRecord events.

10

Enables the CDC event flattening.

You can also run the corresponding CdcSourceIntegrationTests tests using this MongoDB configuration.

SQL Server

Start a SQL Server instance from the microsoft/mssql-server-linux:2017-CU9-GDR2 Docker image:

docker run -it --rm --name sqlserver -p 1433:1433 -e ACCEPT_EULA=Y -e MSSQL_PID=Standard -e SA_PASSWORD=Password! -e MSSQL_AGENT_ENABLED=true microsoft/mssql-server-linux:2017-CU9-GDR2

Populate it with sample data from Debezium’s SQL Server tutorial:

wget https://raw.githubusercontent.com/debezium/debezium-examples/master/tutorial/debezium-sqlserver-init/inventory.sql
cat ./inventory.sql | docker exec -i sqlserver bash -c '/opt/mssql-tools/bin/sqlcmd -U sa -P $SA_PASSWORD'

Use the following properties to connect the CDC Source to SQL Server:

cdc.connector=sqlserver 1
cdc.offset.storage=memory 2

cdc.name=my-sql-connector 3
cdc.config.database.server.id=85744 4
cdc.config.database.server.name=my-app-connector 5

cdc.config.database.user=sa  6
cdc.config.database.password=Password! 7
cdc.config.database.dbname=testDB 8
cdc.config.database.hostname=localhost 9
cdc.config.database.port=1433 10

1

Configures the CDC Source to use SqlServerConnector. Equivalent to setting cdc.config.connector.class=io.debezium.connector.sqlserver.SqlServerConnector.

2

Configures the Debezium engine to use the in-memory backing offset store (that is, cdc.config.offset.storage=org.apache.kafka.connect.storage.MemoryOffsetBackingStore).

3 4 5

Metadata used to identify and dispatch the incoming events.

6 7 8 9 10

Connection to the SQL Server running on localhost:1433 as sa user.

You can also run the CdcSourceIntegrationTests#CdcSqlServerTests using this SQL Server configuration.

Oracle

Start an Oracle instance that is reachable from localhost and set it up with the configuration, users and grants described in the Debezium Vagrant set-up.

Populate it with sample data from Debezium’s Oracle tutorial:

wget https://raw.githubusercontent.com/debezium/debezium-examples/master/tutorial/debezium-with-oracle-jdbc/init/inventory.sql
cat ./inventory.sql | docker exec -i dbz_oracle sqlplus debezium/dbz@//localhost:1521/ORCLPDB1