2. Sources

2.1 File Source

This application polls a directory and sends new files or their contents to the output channel. The file source provides the contents of a File as a byte array by default. However, this can be customized using the --mode option:

  • ref Provides a java.io.File reference
  • lines Will split files line-by-line and emit a new message for each line
  • contents The default. Provides the contents of a file as a byte array

When using --mode=lines, you can also set the additional option --withMarkers=true. The underlying FileSplitter then emits additional start-of-file and end-of-file marker messages before and after the actual data. The payload of these two marker messages is of type FileSplitter.FileMarker. The withMarkers option defaults to false if not explicitly set.
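
As a rough illustration of the message sequence in lines mode, the following Python sketch models what a downstream consumer might see with and without markers. It is not the FileSplitter implementation: the function name split_file and the dictionary shape used for the markers are assumptions made for this example.

```python
import os
import tempfile


def split_file(path, with_markers=False):
    """Yield one payload per line, optionally wrapped in START/END markers."""
    if with_markers:
        # Hypothetical marker shape, standing in for FileSplitter.FileMarker.
        yield {"mark": "START", "filePath": path}
    count = 0
    with open(path, encoding="utf-8") as handle:
        for line in handle:
            count += 1
            yield line.rstrip("\r\n")
    if with_markers:
        yield {"mark": "END", "filePath": path, "lineCount": count}


# Demo with a throwaway two-line file.
with tempfile.NamedTemporaryFile("w", suffix=".txt", delete=False) as tmp:
    tmp.write("alpha\nbeta\n")
plain = list(split_file(tmp.name))
marked = list(split_file(tmp.name, with_markers=True))
os.remove(tmp.name)

print(plain)
print([m["mark"] if isinstance(m, dict) else m for m in marked])
```

In this model the data messages are identical in both cases; markers only add one message before the first line and one after the last.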

2.1.1 Options

The file source has the following options:

file.consumer.markers-json
When 'fileMarkers == true', specify if they should be produced as FileSplitter.FileMarker objects or JSON. (Boolean, default: true)
file.consumer.mode
The FileReadingMode to use for file reading sources. Values are 'ref' - The File object, 'lines' - a message per line, or 'contents' - the contents as bytes. (FileReadingMode, default: <none>, possible values: ref,lines,contents)
file.consumer.with-markers
Set to true to emit start of file/end of file marker messages before/after the data. Only valid with FileReadingMode 'lines'. (Boolean, default: <none>)
file.directory
The directory to poll for new files. (String, default: <none>)
file.filename-pattern
A simple ant pattern to match files. (String, default: <none>)
file.filename-regex
A regex pattern to match files. (Pattern, default: <none>)
file.prevent-duplicates
Set to true to include an AcceptOnceFileListFilter which prevents duplicates. (Boolean, default: true)
trigger.cron
Cron expression value for the Cron Trigger. (String, default: <none>)
trigger.date-format
Format for the date value. (String, default: <none>)
trigger.fixed-delay
Fixed delay for periodic triggers. (Integer, default: 1)
trigger.initial-delay
Initial delay for periodic triggers. (Integer, default: 0)
trigger.max-messages
Maximum messages per poll, -1 means infinity. (Long, default: -1)
trigger.time-unit
The TimeUnit to apply to delay values. (TimeUnit, default: SECONDS, possible values: NANOSECONDS,MICROSECONDS,MILLISECONDS,SECONDS,MINUTES,HOURS,DAYS)

The ref option is useful when the file contents are large and it is more efficient to send the file path than the contents.
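
The polling behaviour described in this section (scan a directory, match file names against a simple pattern, and skip files that were already emitted) can be modelled roughly as follows. This is a sketch under stated assumptions, not the source's implementation: the poll function and the seen set are hypothetical names, and fnmatch only approximates an Ant-style pattern.

```python
import fnmatch
import os
import tempfile


def poll(directory, seen, pattern="*"):
    """Return files matching pattern that earlier polls have not returned."""
    fresh = []
    for name in sorted(os.listdir(directory)):
        path = os.path.join(directory, name)
        if not os.path.isfile(path) or not fnmatch.fnmatch(name, pattern):
            continue
        if path in seen:
            # Accept-once filtering: a file is emitted at most one time.
            continue
        seen.add(path)
        fresh.append(path)
    return fresh


# Demo: three polls over a temporary directory.
with tempfile.TemporaryDirectory() as inbox:
    for name in ("a.txt", "b.log"):
        with open(os.path.join(inbox, name), "w") as handle:
            handle.write("data")
    seen = set()
    first = [os.path.basename(p) for p in poll(inbox, seen, "*.txt")]
    second = poll(inbox, seen, "*.txt")
    with open(os.path.join(inbox, "c.txt"), "w") as handle:
        handle.write("data")
    third = [os.path.basename(p) for p in poll(inbox, seen, "*.txt")]

print(first, second, third)
```

The sketch returns paths, which corresponds to the ref mode; in contents mode the bytes of each file would be sent instead.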

2.2 FTP Source

This source application supports transfer of files using the FTP protocol. Files are transferred from the remote directory to the local directory where the app is deployed. Messages emitted by the source are provided as a byte array by default. However, this can be customized using the --mode option:

  • ref Provides a java.io.File reference
  • lines Will split files line-by-line and emit a new message for each line
  • contents The default. Provides the contents of a file as a byte array

When using --mode=lines, you can also set the additional option --withMarkers=true. The underlying FileSplitter then emits additional start-of-file and end-of-file marker messages before and after the actual data. The payload of these two marker messages is of type FileSplitter.FileMarker. The withMarkers option defaults to false if not explicitly set.

2.2.1 Options

The ftp source has the following options:

file.consumer.markers-json
When 'fileMarkers == true', specify if they should be produced as FileSplitter.FileMarker objects or JSON. (Boolean, default: true)
file.consumer.mode
The FileReadingMode to use for file reading sources. Values are 'ref' - The File object, 'lines' - a message per line, or 'contents' - the contents as bytes. (FileReadingMode, default: <none>, possible values: ref,lines,contents)
file.consumer.with-markers
Set to true to emit start of file/end of file marker messages before/after the data. Only valid with FileReadingMode 'lines'. (Boolean, default: <none>)
ftp.auto-create-local-dir
<documentation missing> (Boolean, default: <none>)
ftp.delete-remote-files
<documentation missing> (Boolean, default: <none>)
ftp.factory.cache-sessions
<documentation missing> (Boolean, default: <none>)
ftp.factory.client-mode
The client mode to use for the FTP session. (ClientMode, default: <none>, possible values: ACTIVE,PASSIVE)
ftp.factory.host
<documentation missing> (String, default: <none>)
ftp.factory.password
<documentation missing> (String, default: <none>)
ftp.factory.port
The port of the server. (Integer, default: 21)
ftp.factory.username
<documentation missing> (String, default: <none>)
ftp.filename-pattern
<documentation missing> (String, default: <none>)
ftp.filename-regex
<documentation missing> (Pattern, default: <none>)
ftp.local-dir
<documentation missing> (File, default: <none>)
ftp.preserve-timestamp
<documentation missing> (Boolean, default: <none>)
ftp.remote-dir
<documentation missing> (String, default: <none>)
ftp.remote-file-separator
<documentation missing> (String, default: <none>)
ftp.tmp-file-suffix
<documentation missing> (String, default: <none>)
trigger.cron
Cron expression value for the Cron Trigger. (String, default: <none>)
trigger.date-format
Format for the date value. (String, default: <none>)
trigger.fixed-delay
Fixed delay for periodic triggers. (Integer, default: 1)
trigger.initial-delay
Initial delay for periodic triggers. (Integer, default: 0)
trigger.max-messages
Maximum messages per poll, -1 means infinity. (Long, default: -1)
trigger.time-unit
The TimeUnit to apply to delay values. (TimeUnit, default: SECONDS, possible values: NANOSECONDS,MICROSECONDS,MILLISECONDS,SECONDS,MINUTES,HOURS,DAYS)

2.3 Gemfire Source

This source allows you to subscribe to any creates or updates to a Gemfire region. The application configures a client cache and client region, with the necessary subscriptions enabled. By default the payload contains the updated entry value, but this can be changed by passing in a SpEL expression that uses the EntryEvent as the evaluation context.

2.3.1 Options

The gemfire source supports the following configuration properties:

gemfire.cache-event-expression
SpEL expression to extract fields from a cache event. (Expression, default: <none>)
gemfire.pool.connect-type
Specifies connection type: 'server' or 'locator'. (ConnectType, default: <none>, possible values: locator,server)
gemfire.pool.host-addresses
Specifies one or more Gemfire locator or server addresses formatted as [host]:[port]. (InetSocketAddress[], default: <none>)
gemfire.pool.subscription-enabled
Set to true to enable subscriptions for the client pool. Required to sync updates to the client cache. (Boolean, default: false)
gemfire.region.region-name
The region name. (String, default: <none>)

2.4 Gemfire-CQ Source

Continuous query allows client applications to create a GemFire query using Object Query Language (OQL) and to register a CQ listener which subscribes to the query and is notified every time the query’s result set changes. The gemfire-cq source registers a CQ which will post CQEvent messages to the stream.

2.4.1 Options

The gemfire-cq source supports the following configuration properties:

gemfire.cq-event-expression
SpEL expression to use to extract data from a cq event. (Expression, default: <none>)
gemfire.pool.connect-type
Specifies connection type: 'server' or 'locator'. (ConnectType, default: <none>, possible values: locator,server)
gemfire.pool.host-addresses
Specifies one or more Gemfire locator or server addresses formatted as [host]:[port]. (InetSocketAddress[], default: <none>)
gemfire.pool.subscription-enabled
Set to true to enable subscriptions for the client pool. Required to sync updates to the client cache. (Boolean, default: false)
gemfire.query
The OQL query (String, default: <none>)

2.5 Http Source

A source application that listens for HTTP requests and emits the body as a message payload. If the Content-Type matches text/* or application/json, the payload will be a String, otherwise the payload will be a byte array.
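
The payload rule above can be expressed as a small Python sketch. It only models the rule as stated in this section; the function name payload_for and the choice of UTF-8 for decoding are assumptions made for the example.

```python
def payload_for(content_type, body):
    """Return str for text/* or application/json bodies, raw bytes otherwise."""
    # Drop parameters such as "; charset=UTF-8" before comparing.
    media_type = content_type.split(";", 1)[0].strip().lower()
    if media_type.startswith("text/") or media_type == "application/json":
        return body.decode("utf-8")  # assumption: UTF-8 when decoding
    return body


text = payload_for("text/plain; charset=UTF-8", b"hello")
json_payload = payload_for("application/json", b'{"id": 1}')
binary = payload_for("application/octet-stream", b"\x00\x01")

print(type(text).__name__, type(json_payload).__name__, type(binary).__name__)
```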

2.5.1 Options

The http source supports the following configuration properties:

http.mapped-request-headers
Headers that will be mapped. (String[], default: <none>)
http.path-pattern
An Ant-Style pattern to determine which http requests will be captured. (String, default: /)
http.secured
Whether the HTTP source path is secured. (Boolean, default: false)
server.port
Server HTTP port. (Integer, default: <none>)

2.6 JDBC Source

This source polls data from an RDBMS. This source is fully based on the DataSourceAutoConfiguration, so refer to the Spring Boot JDBC Support for more information.

2.6.1 Options

The jdbc source has the following options:

jdbc.max-rows-per-poll
Maximum number of rows to process for each poll. (Integer, default: 0)
jdbc.query
The query to use to select data. (String, default: <none>)
jdbc.split
Whether to split the SQL result as individual messages. (Boolean, default: true)
jdbc.update
An SQL update statement to execute for marking polled messages as 'seen'. (String, default: <none>)
spring.datasource.data
Data (DML) script resource references. (java.util.List<java.lang.String>, default: <none>)
spring.datasource.driver-class-name
Fully qualified name of the JDBC driver. Auto-detected based on the URL by default. (String, default: <none>)
spring.datasource.initialize
Populate the database using 'data.sql'. (Boolean, default: true)
spring.datasource.password
Login password of the database. (String, default: <none>)
spring.datasource.schema
Schema (DDL) script resource references. (java.util.List<java.lang.String>, default: <none>)
spring.datasource.url
JDBC url of the database. (String, default: <none>)
spring.datasource.username
Login user of the database. (String, default: <none>)
trigger.cron
Cron expression value for the Cron Trigger. (String, default: <none>)
trigger.date-format
Format for the date value. (String, default: <none>)
trigger.fixed-delay
Fixed delay for periodic triggers. (Integer, default: 1)
trigger.initial-delay
Initial delay for periodic triggers. (Integer, default: 0)
trigger.max-messages
Maximum messages per poll, -1 means infinity. (Long, default: 1)
trigger.time-unit
The TimeUnit to apply to delay values. (TimeUnit, default: <none>, possible values: NANOSECONDS,MICROSECONDS,MILLISECONDS,SECONDS,MINUTES,HOURS,DAYS)

Also see the Spring Boot Documentation for additional DataSource properties, and TriggerProperties and MaxMessagesProperties for polling options.
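
To show how jdbc.query, jdbc.update, jdbc.max-rows-per-poll and jdbc.split might interact, here is a minimal Python model of one polling cycle against an in-memory SQLite database. It is a sketch, not the source's implementation. The poll_once function, the orders table, the :id parameter in the update statement, and the reading of 0 as "no row limit" are all assumptions made for this example.

```python
import sqlite3


def poll_once(conn, query, update=None, max_rows=0, split=True):
    """Select rows, mark them as seen, and return the resulting messages."""
    cursor = conn.execute(query)
    columns = [col[0] for col in cursor.description]
    rows = [dict(zip(columns, row)) for row in cursor.fetchall()]
    if max_rows > 0:  # assumption: 0 means "no limit"
        rows = rows[:max_rows]
    if not rows:
        return []
    if update:
        # Mark every polled row so the next poll does not return it again.
        conn.executemany(update, [{"id": row["id"]} for row in rows])
        conn.commit()
    # split=True: one message per row; split=False: one message with all rows.
    return rows if split else [rows]


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE orders (id INTEGER PRIMARY KEY, item TEXT, seen INTEGER DEFAULT 0)"
)
conn.executemany("INSERT INTO orders (item) VALUES (?)", [("tea",), ("milk",), ("rice",)])

query = "SELECT id, item FROM orders WHERE seen = 0 ORDER BY id"
update = "UPDATE orders SET seen = 1 WHERE id = :id"
first = poll_once(conn, query, update)
second = poll_once(conn, query, update)

print([message["item"] for message in first], second)
```

Without an update statement, the same rows would be returned on every poll in this model, which is why a query that filters on a "seen" flag is paired with an update that sets it.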

2.7 JMS Source

The "jms" source enables receiving messages from JMS.

2.7.1 Options

The jms source has the following options:

jms.client-id
Client id for durable subscriptions. (String, default: <none>)
jms.destination
The destination from which to receive messages (queue or topic). (String, default: <none>)
jms.message-selector
A selector for messages. (String, default: <none>)
jms.session-transacted
True to enable transactions and select a DefaultMessageListenerContainer, false to select a SimpleMessageListenerContainer. (Boolean, default: true)
jms.subscription-durable
True for a durable subscription. (Boolean, default: <none>)
jms.subscription-name
The name of a durable or shared subscription. (String, default: <none>)
jms.subscription-shared
True for a shared subscription. (Boolean, default: <none>)
spring.jms.jndi-name
Connection factory JNDI name. When set, takes precedence over other connection factory auto-configurations. (String, default: <none>)
spring.jms.listener.acknowledge-mode
Acknowledge mode of the container. By default, the listener is transacted with automatic acknowledgment. (AcknowledgeMode, default: <none>, possible values: AUTO,CLIENT,DUPS_OK)
spring.jms.listener.auto-startup
Start the container automatically on startup. (Boolean, default: true)
spring.jms.listener.concurrency
Minimum number of concurrent consumers. (Integer, default: <none>)
spring.jms.listener.max-concurrency
Maximum number of concurrent consumers. (Integer, default: <none>)
spring.jms.pub-sub-domain
Specify if the default destination type is topic. (Boolean, default: false)

Note: Spring Boot broker configuration is used; refer to the Spring Boot Documentation for more information. The spring.jms.* properties above are also handled by the Spring Boot JMS support.

2.8 Load Generator Source

A source that generates data and dispatches it to the stream. It gives users a way to measure the performance of Spring Cloud Data Flow in different environments and deployment types.

2.8.1 Options

The load-generator source has the following options:

load-generator.generate-timestamp
<documentation missing> (Boolean, default: false)
load-generator.message-count
<documentation missing> (Integer, default: 1000)
load-generator.message-size
<documentation missing> (Integer, default: 1000)
load-generator.producers
<documentation missing> (Integer, default: 1)

2.9 Mail Source

A source module that listens for emails and emits the message body as a message payload.

2.9.1 Options

The mail source supports the following configuration properties:

mail.charset
The charset for byte[] mail-to-string transformation. (String, default: UTF-8)
mail.delete
Set to true to delete email after download. (Boolean, default: false)
mail.expression
Configure a SpEL expression to select messages. (String, default: true)
mail.idle-imap
Set to true to use IdleImap Configuration. (Boolean, default: false)
mail.java-mail-properties
JavaMail properties as a new line delimited string of name-value pairs, e.g. 'foo=bar\n baz=car'. (Properties, default: <none>)
mail.mark-as-read
Set to true to mark email as read. (Boolean, default: false)
mail.user-flag
The flag to mark messages when the server does not support \Recent. (String, default: spring-integration-mail-adapter)
mail.url
Mail connection URL for connection to Mail server e.g. 'imaps://username:<password>@<host>:993/Inbox'. (URLName, default: <none>)
trigger.cron
Cron expression value for the Cron Trigger. (String, default: <none>)
trigger.date-format
Format for the date value. (String, default: <none>)
trigger.fixed-delay
Fixed delay for periodic triggers. (Integer, default: 1)
trigger.initial-delay
Initial delay for periodic triggers. (Integer, default: 0)
trigger.max-messages
Maximum messages per poll, -1 means infinity. (Long, default: 1)
trigger.time-unit
The TimeUnit to apply to delay values. (TimeUnit, default: <none>, possible values: NANOSECONDS,MICROSECONDS,MILLISECONDS,SECONDS,MINUTES,HOURS,DAYS)

2.10 MongoDB Source

This source polls data from MongoDB. This source is fully based on the MongoDataAutoConfiguration, so refer to the Spring Boot MongoDB Support for more information.

2.10.1 Options

The mongodb source has the following options:

mongodb.collection
The MongoDB collection to query (String, default: <none>)
mongodb.query
The MongoDB query (String, default: { })
mongodb.query-expression
The SpEL expression in MongoDB query DSL style (String, default: <none>)
mongodb.split
Whether to split the query result as individual messages. (Boolean, default: true)
spring.data.mongodb.authentication-database
Authentication database name. (String, default: <none>)
spring.data.mongodb.database
Database name. (String, default: <none>)
spring.data.mongodb.field-naming-strategy
Fully qualified name of the FieldNamingStrategy to use. (java.lang.Class<?>, default: <none>)
spring.data.mongodb.grid-fs-database
GridFS database name. (String, default: <none>)
spring.data.mongodb.host
Mongo server host. (String, default: <none>)
spring.data.mongodb.password
Login password of the mongo server. (char[], default: <none>)
spring.data.mongodb.port
Mongo server port. (Integer, default: <none>)
spring.data.mongodb.uri
Mongo database URI. When set, host and port are ignored. (String, default: mongodb://localhost/test)
spring.data.mongodb.username
Login user of the mongo server. (String, default: <none>)
trigger.cron
Cron expression value for the Cron Trigger. (String, default: <none>)
trigger.date-format
Format for the date value. (String, default: <none>)
trigger.fixed-delay
Fixed delay for periodic triggers. (Integer, default: 1)
trigger.initial-delay
Initial delay for periodic triggers. (Integer, default: 0)
trigger.max-messages
Maximum messages per poll, -1 means infinity. (Long, default: -1)
trigger.time-unit
The TimeUnit to apply to delay values. (TimeUnit, default: SECONDS, possible values: NANOSECONDS,MICROSECONDS,MILLISECONDS,SECONDS,MINUTES,HOURS,DAYS)

Also see the Spring Boot Documentation for additional MongoProperties properties. See TriggerProperties for polling options.

2.11 RabbitMQ Source

The "rabbit" source enables receiving messages from RabbitMQ.

The queue(s) must exist before the stream is deployed; they are not created automatically. You can easily create a Queue using the RabbitMQ web UI.

2.11.1 Options

The rabbit source has the following options:

rabbit.enable-retry
Set to true to enable retry. (Boolean, default: false)
rabbit.initial-retry-interval
Initial retry interval when retry is enabled. (Integer, default: 1000)
rabbit.mapped-request-headers
Headers that will be mapped. (String[], default: [STANDARD_REQUEST_HEADERS])
rabbit.max-attempts
The maximum delivery attempts when retry is enabled. (Integer, default: 3)
rabbit.max-retry-interval
Max retry interval when retry is enabled. (Integer, default: 30000)
rabbit.queues
The queues to which the source will listen for messages. (String[], default: <none>)
rabbit.requeue
Whether rejected messages should be requeued. (Boolean, default: true)
rabbit.retry-multiplier
Retry backoff multiplier when retry is enabled. (Double, default: 2)
rabbit.transacted
Whether the channel is transacted. (Boolean, default: false)
spring.rabbitmq.addresses
Comma-separated list of addresses to which the client should connect. (String, default: <none>)
spring.rabbitmq.host
RabbitMQ host. (String, default: localhost)
spring.rabbitmq.password
Login password to authenticate against the broker. (String, default: <none>)
spring.rabbitmq.port
RabbitMQ port. (Integer, default: 5672)
spring.rabbitmq.requested-heartbeat
Requested heartbeat timeout, in seconds; zero for none. (Integer, default: <none>)
spring.rabbitmq.username
Login user to authenticate to the broker. (String, default: <none>)
spring.rabbitmq.virtual-host
Virtual host to use when connecting to the broker. (String, default: <none>)

Also see the Spring Boot Documentation for additional properties for the broker connections and listener properties.

A Note About Retry

With the default ackMode (AUTO) and requeue (true) options, failed message deliveries will be retried indefinitely. Since there is not much processing in the rabbit source, the risk of failure in the source itself is small, unless the downstream Binder is not connected for some reason. Setting requeue to false will cause messages to be rejected on the first attempt (and possibly sent to a Dead Letter Exchange/Queue if the broker is so configured). The enableRetry option allows configuration of retry parameters such that a failed message delivery can be retried and eventually discarded (or dead-lettered) when retries are exhausted. The delivery thread is suspended during the retry interval(s). Retry options are enableRetry, maxAttempts, initialRetryInterval, retryMultiplier, and maxRetryInterval. Message deliveries failing with a MessageConversionException are never retried; the assumption being that if a message could not be converted on the first attempt, subsequent attempts will also fail. Such messages are discarded (or dead-lettered).
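
The retry options can be read as parameters of an exponential backoff. The following sketch assumes the usual scheme, in which the wait starts at initialRetryInterval, is multiplied by retryMultiplier after each failure, and is capped at maxRetryInterval. The function retry_intervals is a hypothetical helper for illustration, not part of the rabbit source.

```python
def retry_intervals(max_attempts=3, initial=1000, multiplier=2.0, max_interval=30000):
    """Return the waits in milliseconds between consecutive delivery attempts."""
    intervals = []
    delay = float(initial)
    for _ in range(max_attempts - 1):  # no wait follows the final attempt
        intervals.append(int(min(delay, max_interval)))
        delay *= multiplier
    return intervals


defaults = retry_intervals()
longer = retry_intervals(max_attempts=8)

print(defaults)
print(longer)
```

Under that assumption, the default settings give three delivery attempts separated by waits of 1000 ms and 2000 ms; with more attempts, the wait stops growing once it reaches 30000 ms.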

2.12 Amazon S3 Source

This source app supports transfer of files using the Amazon S3 protocol. Files are transferred from the remote directory (S3 bucket) to the local directory where the application is deployed.

Messages emitted by the source are provided as a byte array by default. However, this can be customized using the --mode option:

  • ref Provides a java.io.File reference
  • lines Will split files line-by-line and emit a new message for each line
  • contents The default. Provides the contents of a file as a byte array

When using --mode=lines, you can also set the additional option --withMarkers=true. The underlying FileSplitter then emits additional start-of-file and end-of-file marker messages before and after the actual data. The payload of these two marker messages is of type FileSplitter.FileMarker. The withMarkers option defaults to false if not explicitly set.

2.12.1 Options

The s3 source has the following options:

file.consumer.markers-json
When 'fileMarkers == true', specify if they should be produced as FileSplitter.FileMarker objects or JSON. (Boolean, default: true)
file.consumer.mode
The FileReadingMode to use for file reading sources. Values are 'ref' - The File object, 'lines' - a message per line, or 'contents' - the contents as bytes. (FileReadingMode, default: <none>, possible values: ref,lines,contents)
file.consumer.with-markers
Set to true to emit start of file/end of file marker messages before/after the data. Only valid with FileReadingMode 'lines'. (Boolean, default: <none>)
s3.auto-create-local-dir
<documentation missing> (Boolean, default: <none>)
s3.delete-remote-files
<documentation missing> (Boolean, default: <none>)
s3.filename-pattern
<documentation missing> (String, default: <none>)
s3.filename-regex
<documentation missing> (Pattern, default: <none>)
s3.local-dir
<documentation missing> (File, default: <none>)
s3.preserve-timestamp
<documentation missing> (Boolean, default: <none>)
s3.remote-dir
<documentation missing> (String, default: <none>)
s3.remote-file-separator
<documentation missing> (String, default: <none>)
s3.tmp-file-suffix
<documentation missing> (String, default: <none>)
trigger.cron
Cron expression value for the Cron Trigger. (String, default: <none>)
trigger.date-format
Format for the date value. (String, default: <none>)
trigger.fixed-delay
Fixed delay for periodic triggers. (Integer, default: 1)
trigger.initial-delay
Initial delay for periodic triggers. (Integer, default: 0)
trigger.max-messages
Maximum messages per poll, -1 means infinity. (Long, default: -1)
trigger.time-unit
The TimeUnit to apply to delay values. (TimeUnit, default: SECONDS, possible values: NANOSECONDS,MICROSECONDS,MILLISECONDS,SECONDS,MINUTES,HOURS,DAYS)

2.12.2 Amazon AWS common options

The Amazon S3 Source (like all other Amazon AWS applications) is based on the Spring Cloud AWS project, and its auto-configuration classes are used automatically by Spring Boot. Consult the Spring Cloud AWS documentation for the required and useful auto-configuration properties.

Some of them are about AWS credentials:

  • cloud.aws.credentials.accessKey
  • cloud.aws.credentials.secretKey
  • cloud.aws.credentials.instanceProfile
  • cloud.aws.credentials.profileName
  • cloud.aws.credentials.profilePath

Others are for the AWS Region definition:

  • cloud.aws.region.auto
  • cloud.aws.region.static

And for AWS Stack:

  • cloud.aws.stack.auto
  • cloud.aws.stack.name

2.13 SFTP Source

This source app supports transfer of files using the SFTP protocol. Files are transferred from the remote directory to the local directory where the application is deployed.

Messages emitted by the source are provided as a byte array by default. However, this can be customized using the --mode option:

  • ref Provides a java.io.File reference
  • lines Will split files line-by-line and emit a new message for each line
  • contents The default. Provides the contents of a file as a byte array

When using --mode=lines, you can also set the additional option --withMarkers=true. The underlying FileSplitter then emits additional start-of-file and end-of-file marker messages before and after the actual data. The payload of these two marker messages is of type FileSplitter.FileMarker. The withMarkers option defaults to false if not explicitly set.

When configuring the sftp.factory.known-hosts-expression option, the root object of the evaluation is the application context. An example might be sftp.factory.known-hosts-expression = @systemProperties['user.home'] + '/.ssh/known_hosts'.

2.13.1 Options

The sftp source has the following options:

file.consumer.markers-json
When 'fileMarkers == true', specify if they should be produced as FileSplitter.FileMarker objects or JSON. (Boolean, default: true)
file.consumer.mode
The FileReadingMode to use for file reading sources. Values are 'ref' - The File object, 'lines' - a message per line, or 'contents' - the contents as bytes. (FileReadingMode, default: <none>, possible values: ref,lines,contents)
file.consumer.with-markers
Set to true to emit start of file/end of file marker messages before/after the data. Only valid with FileReadingMode 'lines'. (Boolean, default: <none>)
sftp.auto-create-local-dir
Set to true to create the local directory if it does not exist. (Boolean, default: true)
sftp.delete-remote-files
Set to true to delete remote files after successful transfer. (Boolean, default: false)
sftp.factory.allow-unknown-keys
True to allow an unknown or changed key. (Boolean, default: false)
sftp.factory.cache-sessions
Cache sessions (Boolean, default: <none>)
sftp.factory.host
The host name of the server. (String, default: localhost)
sftp.factory.known-hosts-expression
A SpEL expression resolving to the location of the known hosts file. (Expression, default: <none>)
sftp.factory.pass-phrase
Passphrase for user's private key. (String, default: <empty string>)
sftp.factory.password
The password to use to connect to the server. (String, default: <none>)
sftp.factory.port
The port of the server. (Integer, default: 22)
sftp.factory.private-key
Resource location of user's private key. (String, default: <empty string>)
sftp.factory.username
The username to use to connect to the server. (String, default: <none>)
sftp.filename-pattern
A filter pattern to match the names of files to transfer. (String, default: <none>)
sftp.filename-regex
A filter regex pattern to match the names of files to transfer. (Pattern, default: <none>)
sftp.local-dir
The local directory to use for file transfers. (File, default: <none>)
sftp.preserve-timestamp
Set to true to preserve the original timestamp. (Boolean, default: true)
sftp.remote-dir
The remote FTP directory. (String, default: /)
sftp.remote-file-separator
The remote file separator. (String, default: /)
sftp.stream
Set to true to stream the file rather than copy to a local directory. (Boolean, default: false)
sftp.tmp-file-suffix
The suffix to use while the transfer is in progress. (String, default: .tmp)
trigger.cron
Cron expression value for the Cron Trigger. (String, default: <none>)
trigger.date-format
Format for the date value. (String, default: <none>)
trigger.fixed-delay
Fixed delay for periodic triggers. (Integer, default: 1)
trigger.initial-delay
Initial delay for periodic triggers. (Integer, default: 0)
trigger.max-messages
Maximum messages per poll, -1 means infinity. (Long, default: -1)
trigger.time-unit
The TimeUnit to apply to delay values. (TimeUnit, default: SECONDS, possible values: NANOSECONDS,MICROSECONDS,MILLISECONDS,SECONDS,MINUTES,HOURS,DAYS)

2.14 SYSLOG Source

The syslog source receives SYSLOG packets over UDP, TCP, or both. RFC3164 (BSD) and RFC5424 formats are supported.
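
For reference, a message in the RFC 3164 (BSD) format consists of a priority value in angle brackets, a timestamp, a host name, and a tagged message. The sketch below builds such a line in Python so that the format is easy to see. The helper name format_rfc3164 and the sample host, tag and text are made up for this example.

```python
from datetime import datetime

MONTHS = ("Jan", "Feb", "Mar", "Apr", "May", "Jun",
          "Jul", "Aug", "Sep", "Oct", "Nov", "Dec")


def format_rfc3164(facility, severity, when, host, tag, text):
    """Build a BSD-style syslog line: <PRI>Mmm dd hh:mm:ss host tag: text."""
    pri = facility * 8 + severity
    # Days below 10 are padded with a space rather than a zero.
    stamp = "%s %2d %02d:%02d:%02d" % (
        MONTHS[when.month - 1], when.day, when.hour, when.minute, when.second)
    return "<%d>%s %s %s: %s" % (pri, stamp, host, tag, text)


packet = format_rfc3164(1, 5, datetime(2016, 10, 5, 14, 30, 0),
                        "web01", "app", "disk nearly full")
print(packet)
```

Here facility 1 (user-level) and severity 5 (notice) combine into the priority value 13.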

2.14.1 Options

The syslog source has the following options:

syslog.buffer-size
The buffer size used when decoding messages; larger messages will be rejected. (Integer, default: 2048)
syslog.nio
Whether or not to use NIO (when supporting a large number of connections). (Boolean, default: false)
syslog.port
The port to listen on. (Integer, default: 1514)
syslog.protocol
The protocol to use: tcp or udp. (String, default: tcp)
syslog.reverse-lookup
Whether or not to perform a reverse lookup on the incoming socket. (Boolean, default: false)
syslog.rfc
'5424' or '3164' - the syslog format according to the RFC; 3164 is also known as the 'BSD' format. (String, default: 3164)
syslog.socket-timeout
The socket timeout. (Integer, default: 0)

2.15 TCP Source

The tcp source acts as a server and allows a remote party to connect to it and submit data over a raw tcp socket.

TCP is a streaming protocol and some mechanism is needed to frame messages on the wire. A number of decoders are available, the default being 'CRLF' which is compatible with Telnet.

Messages produced by the TCP source application have a byte[] payload.

2.15.1 Options

tcp.buffer-size
The buffer size used when decoding messages; larger messages will be rejected. (Integer, default: 2048)
tcp.decoder
The decoder to use when receiving messages. (Encoding, default: <none>, possible values: CRLF,LF,NULL,STXETX,RAW,L1,L2,L4)
tcp.nio
<documentation missing> (Boolean, default: <none>)
tcp.port
<documentation missing> (Integer, default: <none>)
tcp.reverse-lookup
<documentation missing> (Boolean, default: <none>)
tcp.socket-timeout
<documentation missing> (Integer, default: <none>)
tcp.use-direct-buffers
<documentation missing> (Boolean, default: <none>)

2.15.2 Available Decoders

Text Data

CRLF (default)
text terminated by carriage return (0x0d) followed by line feed (0x0a)
LF
text terminated by line feed (0x0a)
NULL
text terminated by a null byte (0x00)
STXETX
text preceded by an STX (0x02) and terminated by an ETX (0x03)

Text and Binary Data

RAW
no structure - the client indicates a complete message by closing the socket
L1
data preceded by a one byte (unsigned) length field (supports up to 255 bytes)
L2
data preceded by a two byte (unsigned) length field (up to 2^16-1 bytes)
L4
data preceded by a four byte (signed) length field (up to 2^31-1 bytes)
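
The framing rules listed above can be sketched from the point of view of a client that sends data to the tcp source. The Python below is an illustration only: the frame and unframe_l2 helpers are hypothetical, and the length headers are assumed to be in network byte order (big-endian).

```python
import struct

# Assumption: length headers use network byte order (big-endian).
LENGTH_HEADERS = {"L1": ">B", "L2": ">H", "L4": ">i"}


def frame(payload, decoder="CRLF"):
    """Wrap payload bytes the way the named decoder expects to receive them."""
    if decoder == "CRLF":
        return payload + b"\r\n"
    if decoder == "LF":
        return payload + b"\n"
    if decoder == "NULL":
        return payload + b"\x00"
    if decoder == "STXETX":
        return b"\x02" + payload + b"\x03"
    if decoder == "RAW":
        return payload  # the sender closes the socket to end the message
    # struct.pack raises struct.error if the length does not fit the header.
    return struct.pack(LENGTH_HEADERS[decoder], len(payload)) + payload


def unframe_l2(data):
    """Read one L2-framed message from the start of data."""
    (length,) = struct.unpack(">H", data[:2])
    return data[2:2 + length]


print(frame(b"hi"))
print(frame(b"hi", "L2"))
print(unframe_l2(frame(b"hello", "L2")))
```

The text decoders rely on a terminator that must not occur inside the payload, which is why the length-header decoders are the ones listed for binary data.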

2.16 TCP Client Source

The tcp-client source connects to a TCP server and receives data.

2.16.1 Options

The tcp-client source has the following options:

tcp.buffer-size
The buffer size used when decoding messages; larger messages will be rejected. (Integer, default: 2048)
tcp.charset
The charset used when converting from bytes to String. (String, default: UTF-8)
tcp.decoder
The decoder to use when receiving messages. (Encoding, default: <none>, possible values: CRLF,LF,NULL,STXETX,RAW,L1,L2,L4)
tcp.host
The host to which this client will connect. (String, default: localhost)
tcp.nio
<documentation missing> (Boolean, default: <none>)
tcp.port
<documentation missing> (Integer, default: <none>)
tcp.retry-interval
Retry interval (in milliseconds) to check the connection and reconnect. (Long, default: 60000)
tcp.reverse-lookup
<documentation missing> (Boolean, default: <none>)
tcp.socket-timeout
<documentation missing> (Integer, default: <none>)
tcp.use-direct-buffers
<documentation missing> (Boolean, default: <none>)

2.17 Time Source

The time source emits a String with the current time at an interval determined by the trigger options.

2.17.1 Options

The time source has the following options:

trigger.cron
Cron expression value for the Cron Trigger. (String, default: <none>)
trigger.date-format
Format for the date value. (String, default: <none>)
trigger.fixed-delay
Fixed delay for periodic triggers. (Integer, default: 1)
trigger.initial-delay
Initial delay for periodic triggers. (Integer, default: 0)
trigger.max-messages
Maximum messages per poll, -1 means infinity. (Long, default: 1)
trigger.time-unit
The TimeUnit to apply to delay values. (TimeUnit, default: <none>, possible values: NANOSECONDS,MICROSECONDS,MILLISECONDS,SECONDS,MINUTES,HOURS,DAYS)

2.18 Trigger Source

This app sends a trigger message based on a fixed delay, date or cron expression. A payload which is evaluated using SpEL can also be sent each time the trigger fires.

2.18.1 Options

The trigger source has the following options:

trigger.cron
Cron expression value for the Cron Trigger. (String, default: <none>)
trigger.date-format
Format for the date value. (String, default: <none>)
trigger.fixed-delay
Fixed delay for periodic triggers. (Integer, default: 1)
trigger.initial-delay
Initial delay for periodic triggers. (Integer, default: 0)
trigger.max-messages
Maximum messages per poll, -1 means infinity. (Long, default: 1)
trigger.source.payload
The expression for the payload of the Source module. (Expression, default: <none>)
trigger.time-unit
The TimeUnit to apply to delay values. (TimeUnit, default: <none>, possible values: NANOSECONDS,MICROSECONDS,MILLISECONDS,SECONDS,MINUTES,HOURS,DAYS)
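
The periodic options above (trigger.initial-delay, trigger.fixed-delay, and trigger.time-unit) correspond closely to the arguments of a fixed-delay schedule. The following sketch shows that relationship using the JDK's ScheduledExecutorService. It is an illustration only: the class and method names are hypothetical, and the app's actual trigger implementation may differ.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of a fixed-delay trigger; not the trigger source's own code.
public class PeriodicTriggerSketch {

    // Fires 'count' times and returns the System.nanoTime() value of each firing.
    // initialDelay, fixedDelay and unit play the roles of trigger.initial-delay,
    // trigger.fixed-delay and trigger.time-unit.
    public static List<Long> fire(int count, long initialDelay, long fixedDelay, TimeUnit unit) {
        List<Long> firedAt = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch remaining = new CountDownLatch(count);
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleWithFixedDelay(() -> {
            // Ignore any extra run that starts after the requested count is reached.
            if (remaining.getCount() > 0) {
                firedAt.add(System.nanoTime());
                remaining.countDown();
            }
        }, initialDelay, fixedDelay, unit);
        try {
            remaining.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            scheduler.shutdownNow();
        }
        return new ArrayList<>(firedAt);
    }

    public static void main(String[] args) {
        // Three firings, starting immediately, 100 ms after each previous run ends.
        System.out.println(fire(3, 0, 100, TimeUnit.MILLISECONDS).size() + " triggers fired");
    }
}
```

With a fixed delay, the wait is measured from the end of one run to the start of the next, so a slow run pushes later firings back rather than causing them to overlap.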

2.19 TriggerTask Source

The TriggerTask app sends a TaskLaunchRequest based on a fixed delay, a date, or a cron expression. You can set the command-line arguments as well as the Spring Boot properties that are used by the task.

2.19.1 Options

The triggertask source has the following options:

trigger.cron
Cron expression value for the Cron Trigger. (String, default: <none>)
trigger.date-format
Format for the date value. (String, default: <none>)
trigger.fixed-delay
Fixed delay for periodic triggers. (Integer, default: 1)
trigger.initial-delay
Initial delay for periodic triggers. (Integer, default: 0)
trigger.max-messages
Maximum messages per poll, -1 means infinity. (Long, default: 1)
trigger.source.payload
The expression for the payload of the Source module. (Expression, default: <none>)
trigger.time-unit
The TimeUnit to apply to delay values. (TimeUnit, default: <none>, possible values: NANOSECONDS,MICROSECONDS,MILLISECONDS,SECONDS,MINUTES,HOURS,DAYS)
triggertask.application-name
The name to be applied to the launched task. (String, default: <empty string>)
triggertask.command-line-args
Space-delimited key=value pairs to be used as command-line arguments for the task. (String, default: <empty string>)
triggertask.deployment-properties
Comma-delimited key=value pairs to be used as deploymentProperties for the task. (String, default: <empty string>)
triggertask.environment-properties
Comma-delimited key=value pairs to be used as environmentProperties for the task. (String, default: <empty string>)
triggertask.uri
The URI of the task artifact. (String, default: <empty string>)
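
Several of the options above take delimited key=value pairs (space-delimited for triggertask.command-line-args, comma-delimited for the deployment and environment properties). The following sketch shows one plausible way such a String could be split into a map. It is not the app's actual parsing code; the class name, method name, and error handling are assumptions made for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical parser for delimited key=value pairs; for illustration only.
public class KeyValuePairsSketch {

    // Splits 'text' on 'delimiterRegex' and each pair on its first '='.
    // Insertion order is preserved; a later duplicate key overwrites an earlier one.
    public static Map<String, String> parse(String text, String delimiterRegex) {
        Map<String, String> result = new LinkedHashMap<>();
        if (text == null || text.trim().isEmpty()) {
            return result;
        }
        for (String raw : text.trim().split(delimiterRegex)) {
            String pair = raw.trim();
            if (pair.isEmpty()) {
                continue; // tolerate repeated delimiters
            }
            int eq = pair.indexOf('=');
            if (eq <= 0) {
                throw new IllegalArgumentException("Expected key=value but got: " + pair);
            }
            result.put(pair.substring(0, eq).trim(), pair.substring(eq + 1).trim());
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(parse("app.mode=batch, app.retries=3", ","));
        System.out.println(parse("--input=a.txt --output=b.txt", "\\s+"));
    }
}
```

Splitting each pair on the first '=' only means that a value may itself contain '=' characters, as in a URL with query parameters. A value that contains the delimiter itself would still be split, so this sketch does not handle quoting or escaping.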

2.19.2 Using the triggertask

A trigger task is a source that dispatches TaskLaunchRequest messages, which are eventually consumed by a tasklauncher-* sink. The tasklauncher-* sink then deploys the task and launches it. An example of this using Spring Cloud Data Flow would look like this:

stream create foo --definition "triggertask --uri=maven://org.springframework.cloud.task.app:timestamp-task:jar:1.1.0.RELEASE --fixed-delay=5 | task-launcher-local"  --deploy

In the case above, the tasklauncher-local sink receives the TaskLaunchRequest and launches the task identified by the --uri property, using the Spring Cloud Local Deployer.

The only required property for the triggertask is --uri, which specifies the artifact to be launched by the tasklauncher-* sink that you have selected.

Note: When the tasklauncher-* sink is used with a URI that uses maven as the resource, and the jar is not in your local repository, be sure to set the --maven.remote-repositories property for the tasklauncher-*. For example:

stream create foo --definition "triggertask --uri=maven://org.springframework.cloud.task.app:timestamp-task:jar:1.0.1.RELEASE --fixed-delay=5 | task-launcher-local --maven.remote-repositories.repo1.url=http://repo.spring.io/libs-snapshot" --deploy

2.20 Twitter Stream Source

This source ingests data from Twitter’s streaming API v1.1. It uses the sample and filter stream endpoints rather than the full "firehose" which needs special access. The endpoint used will depend on the parameters you supply in the stream definition (some are specific to the filter endpoint).

You need to supply all keys and secrets (both consumer and access token) to authenticate for this source. The easiest way to do so is to set them as the following environment variables: CONSUMER_KEY, CONSUMER_SECRET, ACCESS_TOKEN and ACCESS_TOKEN_SECRET.

2.20.1 Options

The twitterstream source has the following options:

twitter.credentials.access-token
Access token (String, default: <none>)
twitter.credentials.access-token-secret
Access token secret (String, default: <none>)
twitter.credentials.consumer-key
Consumer key (String, default: <none>)
twitter.credentials.consumer-secret
Consumer secret (String, default: <none>)
twitter.stream.language
The language of the tweet text. (String, default: <none>)
twitter.stream.stream-type
Twitter stream type (such as sample, firehose). Default is sample. (TwitterStreamType, default: <none>, possible values: SAMPLE,FIREHOSE)

Note: The twitterstream source emits JSON in the native Twitter format.