This application polls a directory and sends new files or their contents to the output channel.
The file source provides the contents of a File as a byte array by default.
However, this can be customized using the --mode option:
- ref: Provides a java.io.File reference
- lines: Will split files line-by-line and emit a new message for each line
- contents: The default. Provides the contents of a file as a byte array
When using --mode=lines, you can also provide the additional option --withMarkers=true. If set to true, the underlying FileSplitter will emit additional start-of-file and end-of-file marker messages before and after the actual data. The payload of these two additional marker messages is of type FileSplitter.FileMarker. The option withMarkers defaults to false if not explicitly set.
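The three modes and the effect of withMarkers can be pictured with a small emulation. This is only a sketch in Python, not the source's actual implementation: the function name emit and the ("START", path) / ("END", path) tuples are hypothetical stand-ins for the real java.io.File and FileSplitter.FileMarker payloads.

```python
from pathlib import Path


def emit(path, mode="contents", with_markers=False):
    """Yield the payloads a file source would conceptually emit for one file."""
    path = Path(path)
    if mode == "ref":
        # Stand-in for a java.io.File reference: only the path is sent.
        yield path
    elif mode == "contents":
        # The default: the whole file as a byte array.
        yield path.read_bytes()
    elif mode == "lines":
        if with_markers:
            # Hypothetical stand-in for the start-of-file FileMarker.
            yield ("START", str(path))
        with path.open(encoding="utf-8") as handle:
            for line in handle:
                # One message per line, without the line terminator.
                yield line.rstrip("\r\n")
        if with_markers:
            # Hypothetical stand-in for the end-of-file FileMarker.
            yield ("END", str(path))
    else:
        raise ValueError(f"unknown mode: {mode}")
```

With markers enabled, a two-line file yields four payloads: the start marker, the two lines, and the end marker.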
The file source has the following options:
- file.consumer.markers-json
  When 'fileMarkers == true', specify if they should be produced as FileSplitter.FileMarker objects or JSON. (Boolean, default: true)
- file.consumer.mode
  The FileReadingMode to use for file reading sources. Values are 'ref' - The File object, 'lines' - a message per line, or 'contents' - the contents as bytes. (FileReadingMode, default: <none>, possible values: ref, lines, contents)
- file.consumer.with-markers
  Set to true to emit start of file/end of file marker messages before/after the data. Only valid with FileReadingMode 'lines'. (Boolean, default: <none>)
- file.directory
  The directory to poll for new files. (String, default: <none>)
- file.filename-pattern
  A simple ant pattern to match files. (String, default: <none>)
- file.filename-regex
  A regex pattern to match files. (Pattern, default: <none>)
- file.prevent-duplicates
  Set to true to include an AcceptOnceFileListFilter which prevents duplicates. (Boolean, default: true)
- trigger.cron
  Cron expression value for the Cron Trigger. (String, default: <none>)
- trigger.date-format
  Format for the date value. (String, default: <none>)
- trigger.fixed-delay
  Fixed delay for periodic triggers. (Integer, default: 1)
- trigger.initial-delay
  Initial delay for periodic triggers. (Integer, default: 0)
- trigger.max-messages
  Maximum messages per poll, -1 means infinity. (Long, default: -1)
- trigger.time-unit
  The TimeUnit to apply to delay values. (TimeUnit, default: SECONDS, possible values: NANOSECONDS, MICROSECONDS, MILLISECONDS, SECONDS, MINUTES, HOURS, DAYS)
The ref option is useful in some cases in which the file contents are large and it would be more efficient to send the file path.
This source application supports transfer of files using the FTP protocol.
Files are transferred from the remote directory to the local directory where the app is deployed.
Messages emitted by the source are provided as a byte array by default. However, this can be customized using the --mode option:
- ref: Provides a java.io.File reference
- lines: Will split files line-by-line and emit a new message for each line
- contents: The default. Provides the contents of a file as a byte array
When using --mode=lines, you can also provide the additional option --withMarkers=true. If set to true, the underlying FileSplitter will emit additional start-of-file and end-of-file marker messages before and after the actual data. The payload of these two additional marker messages is of type FileSplitter.FileMarker. The option withMarkers defaults to false if not explicitly set.
The ftp source has the following options:
- file.consumer.markers-json
  When 'fileMarkers == true', specify if they should be produced as FileSplitter.FileMarker objects or JSON. (Boolean, default: true)
- file.consumer.mode
  The FileReadingMode to use for file reading sources. Values are 'ref' - The File object, 'lines' - a message per line, or 'contents' - the contents as bytes. (FileReadingMode, default: <none>, possible values: ref, lines, contents)
- file.consumer.with-markers
  Set to true to emit start of file/end of file marker messages before/after the data. Only valid with FileReadingMode 'lines'. (Boolean, default: <none>)
- ftp.auto-create-local-dir
  <documentation missing> (Boolean, default: <none>)
- ftp.delete-remote-files
  <documentation missing> (Boolean, default: <none>)
- ftp.factory.cache-sessions
  <documentation missing> (Boolean, default: <none>)
- ftp.factory.client-mode
  The client mode to use for the FTP session. (ClientMode, default: <none>, possible values: ACTIVE, PASSIVE)
- ftp.factory.host
  <documentation missing> (String, default: <none>)
- ftp.factory.password
  <documentation missing> (String, default: <none>)
- ftp.factory.port
  The port of the server. (Integer, default: 21)
- ftp.factory.username
  <documentation missing> (String, default: <none>)
- ftp.filename-pattern
  <documentation missing> (String, default: <none>)
- ftp.filename-regex
  <documentation missing> (Pattern, default: <none>)
- ftp.local-dir
  <documentation missing> (File, default: <none>)
- ftp.preserve-timestamp
  <documentation missing> (Boolean, default: <none>)
- ftp.remote-dir
  <documentation missing> (String, default: <none>)
- ftp.remote-file-separator
  <documentation missing> (String, default: <none>)
- ftp.tmp-file-suffix
  <documentation missing> (String, default: <none>)
- trigger.cron
  Cron expression value for the Cron Trigger. (String, default: <none>)
- trigger.date-format
  Format for the date value. (String, default: <none>)
- trigger.fixed-delay
  Fixed delay for periodic triggers. (Integer, default: 1)
- trigger.initial-delay
  Initial delay for periodic triggers. (Integer, default: 0)
- trigger.max-messages
  Maximum messages per poll, -1 means infinity. (Long, default: -1)
- trigger.time-unit
  The TimeUnit to apply to delay values. (TimeUnit, default: SECONDS, possible values: NANOSECONDS, MICROSECONDS, MILLISECONDS, SECONDS, MINUTES, HOURS, DAYS)
A source module that listens for HTTP requests and emits the body as a message payload.
If the Content-Type matches text/* or application/json, the payload will be a String, otherwise the payload will be a byte array.
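The Content-Type rule can be illustrated with a short Python sketch. The function name payload_for is hypothetical, and decoding with UTF-8 regardless of any charset parameter is a simplifying assumption; the real source may honor the declared charset.

```python
def payload_for(content_type, body, default_charset="utf-8"):
    """Return a str for text/* or application/json bodies, else the raw bytes."""
    # Drop parameters such as "; charset=utf-8" and normalize case.
    media_type = (content_type or "").split(";", 1)[0].strip().lower()
    if media_type.startswith("text/") or media_type == "application/json":
        # Assumption: decode with a fixed charset for illustration only.
        return body.decode(default_charset)
    # Any other (or missing) Content-Type keeps the body as a byte array.
    return body
```

For example, a request posted with Content-Type application/octet-stream would reach downstream apps as bytes, while the same body posted as text/plain would arrive as a String.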
The http source supports the following configuration properties:
- http.path-pattern
  An Ant-Style pattern to determine which http requests will be captured. (String, default: /)
- server.port
  Server HTTP port. (Integer, default: <none>)
This source polls data from an RDBMS.
This source is fully based on the DataSourceAutoConfiguration, so refer to the Spring Boot JDBC Support for more information.
The jdbc source has the following options:
- jdbc.max-rows-per-poll
  Maximum number of rows to process for each poll. (Integer, default: 0)
- jdbc.query
  The query to use to select data. (String, default: <none>)
- jdbc.split
  Whether to split the SQL result as individual messages. (Boolean, default: true)
- jdbc.update
  An SQL update statement to execute for marking polled messages as 'seen'. (String, default: <none>)
- spring.datasource.driver-class-name
  <documentation missing> (String, default: <none>)
- spring.datasource.init-sql
  <documentation missing> (String, default: <none>)
- spring.datasource.initialize
  Populate the database using 'data.sql'. (Boolean, default: true)
- spring.datasource.password
  <documentation missing> (String, default: <none>)
- spring.datasource.url
  <documentation missing> (String, default: <none>)
- spring.datasource.username
  <documentation missing> (String, default: <none>)
- trigger.cron
  Cron expression value for the Cron Trigger. (String, default: <none>)
- trigger.date-format
  Format for the date value. (String, default: <none>)
- trigger.fixed-delay
  Fixed delay for periodic triggers. (Integer, default: 1)
- trigger.initial-delay
  Initial delay for periodic triggers. (Integer, default: 0)
- trigger.max-messages
  Maximum messages per poll, -1 means infinity. (Long, default: 1)
- trigger.time-unit
  The TimeUnit to apply to delay values. (TimeUnit, default: <none>, possible values: NANOSECONDS, MICROSECONDS, MILLISECONDS, SECONDS, MINUTES, HOURS, DAYS)
Also see the Spring Boot Documentation for additional DataSource properties, and TriggerProperties and MaxMessagesProperties for polling options.
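The interplay of jdbc.query, jdbc.update, jdbc.split, and jdbc.max-rows-per-poll follows a poll-and-mark pattern, sketched here in Python with an in-memory SQLite database. Everything in it is illustrative: the items table, its seen column, the helper names, and the treatment of a max_rows value of 0 as "no limit" are assumptions, not behavior confirmed by this document.

```python
import sqlite3

# Hypothetical statements playing the roles of jdbc.query and jdbc.update.
QUERY = "SELECT id, body FROM items WHERE seen = 0 ORDER BY id"
UPDATE = "UPDATE items SET seen = 1 WHERE id = ?"


def make_demo_db():
    """Create an in-memory table with three unseen rows."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE items (id INTEGER PRIMARY KEY, body TEXT, seen INTEGER DEFAULT 0)"
    )
    conn.executemany("INSERT INTO items (body) VALUES (?)", [("a",), ("b",), ("c",)])
    conn.commit()
    return conn


def poll(conn, query, update, max_rows=0, split=True):
    """Select unseen rows, mark them as seen, and return the resulting messages."""
    rows = conn.execute(query).fetchall()
    if max_rows > 0:
        # Assumption: 0 means "no limit"; a positive value caps rows per poll.
        rows = rows[:max_rows]
    # Mark each polled row so the next poll does not return it again.
    conn.executemany(update, [(row[0],) for row in rows])
    conn.commit()
    if not rows:
        return []
    # split=True: one message per row; split=False: one message with all rows.
    return rows if split else [rows]
```

Calling poll repeatedly on the demo database with max_rows=2 returns two rows, then the remaining row, then nothing, because the update statement hides rows that were already delivered.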
The "jms" source enables receiving messages from JMS.
The jms source has the following options:
- jms.client-id
  Client id for durable subscriptions. (String, default: <none>)
- jms.destination
  The destination from which to receive messages (queue or topic). (String, default: <none>)
- jms.message-selector
  A selector for messages. (String, default: <none>)
- jms.session-transacted
  True to enable transactions and select a DefaultMessageListenerContainer, false to select a SimpleMessageListenerContainer. (Boolean, default: true)
- jms.subscription-durable
  True for a durable subscription. (Boolean, default: <none>)
- jms.subscription-name
  The name of a durable or shared subscription. (String, default: <none>)
- jms.subscription-shared
  True for a shared subscription. (Boolean, default: <none>)
- spring.jms.jndi-name
  Connection factory JNDI name. When set, takes precedence over other connection factory auto-configurations. (String, default: <none>)
- spring.jms.listener.acknowledge-mode
  Acknowledge mode of the container. By default, the listener is transacted with automatic acknowledgment. (AcknowledgeMode, default: <none>, possible values: AUTO, CLIENT, DUPS_OK)
- spring.jms.listener.auto-startup
  Start the container automatically on startup. (Boolean, default: true)
- spring.jms.listener.concurrency
  Minimum number of concurrent consumers. (Integer, default: <none>)
- spring.jms.listener.max-concurrency
  Maximum number of concurrent consumers. (Integer, default: <none>)
- spring.jms.pub-sub-domain
  Specify if the default destination type is topic. (Boolean, default: false)
Note: Spring Boot broker configuration is used; refer to the Spring Boot Documentation for more information. The spring.jms.* properties above are also handled by the Boot JMS support.
2.6 Load Generator Source
A source that generates data and dispatches it to the stream. It gives users a way to measure the performance of Spring Cloud Data Flow in different environments and deployment types.
The load-generator source has the following options:
The "rabbit" source enables receiving messages from RabbitMQ.
The queue(s) must exist before the stream is deployed; they are not created automatically.
You can easily create a Queue using the RabbitMQ web UI.
The rabbit source has the following options:
- rabbit.enable-retry
  True to enable retry. (Boolean, default: false)
- rabbit.initial-retry-interval
  Initial retry interval when retry is enabled. (Integer, default: 1000)
- rabbit.mapped-request-headers
  Headers that will be mapped. (String[], default: [STANDARD_REQUEST_HEADERS])
- rabbit.max-attempts
  The maximum delivery attempts when retry is enabled. (Integer, default: 3)
- rabbit.max-retry-interval
  Max retry interval when retry is enabled. (Integer, default: 30000)
- rabbit.queues
  The queues to which the source will listen for messages. (String[], default: <none>)
- rabbit.requeue
  Whether rejected messages should be requeued. (Boolean, default: true)
- rabbit.retry-multiplier
  Retry backoff multiplier when retry is enabled. (Double, default: 2)
- rabbit.transacted
  Whether the channel is transacted. (Boolean, default: false)
- spring.rabbitmq.addresses
  Comma-separated list of addresses to which the client should connect. (String, default: <none>)
- spring.rabbitmq.host
  RabbitMQ host. (String, default: localhost)
- spring.rabbitmq.password
  Login to authenticate against the broker. (String, default: <none>)
- spring.rabbitmq.port
  RabbitMQ port. (Integer, default: 5672)
- spring.rabbitmq.requested-heartbeat
  Requested heartbeat timeout, in seconds; zero for none. (Integer, default: <none>)
- spring.rabbitmq.username
  Login user to authenticate to the broker. (String, default: <none>)
- spring.rabbitmq.virtual-host
  Virtual host to use when connecting to the broker. (String, default: <none>)
Also see the Spring Boot Documentation for additional broker connection and listener properties.
Note: With the default ackMode (AUTO) and requeue (true) options, failed message deliveries will be retried indefinitely. Since there is not much processing in the rabbit source, the risk of failure in the source itself is small, unless the downstream Binder is not connected for some reason. Setting requeue to false will cause messages to be rejected on the first attempt (and possibly sent to a Dead Letter Exchange/Queue if the broker is so configured).
The enableRetry option allows configuration of retry parameters such that a failed message delivery can be retried and eventually discarded (or dead-lettered) when retries are exhausted. The delivery thread is suspended during the retry interval(s). Retry options are enableRetry, maxAttempts, initialRetryInterval, retryMultiplier, and maxRetryInterval.
Message deliveries failing with a MessageConversionException are never retried; the assumption being that if a message could not be converted on the first attempt, subsequent attempts will also fail. Such messages are discarded (or dead-lettered).
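To get a feel for how the retry options combine, the following Python sketch computes the pauses between delivery attempts. It assumes conventional exponential backoff (each interval is the previous one times the multiplier, capped at the maximum) and that maxAttempts counts the first delivery; neither detail is spelled out here, and the function name retry_intervals is hypothetical.

```python
def retry_intervals(max_attempts=3, initial_interval=1000, multiplier=2.0, max_interval=30000):
    """Return the waits (in ms) between attempts under capped exponential backoff."""
    intervals = []
    interval = float(initial_interval)
    # Assumption: max_attempts includes the first delivery, so there is
    # one fewer wait than there are attempts.
    for _ in range(max_attempts - 1):
        intervals.append(int(min(interval, max_interval)))
        interval *= multiplier
    return intervals
```

Under these assumptions the default option values give two waits of 1000 ms and 2000 ms before the message is discarded or dead-lettered; with more attempts the waits grow until they reach the 30000 ms cap.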
This source app supports transfer of files using the SFTP protocol.
Files are transferred from the remote directory to the local directory where the application is deployed.
Messages emitted by the source are provided as a byte array by default. However, this can be customized using the --mode option:
- ref: Provides a java.io.File reference
- lines: Will split files line-by-line and emit a new message for each line
- contents: The default. Provides the contents of a file as a byte array
When using --mode=lines, you can also provide the additional option --withMarkers=true. If set to true, the underlying FileSplitter will emit additional start-of-file and end-of-file marker messages before and after the actual data. The payload of these two additional marker messages is of type FileSplitter.FileMarker. The option withMarkers defaults to false if not explicitly set.
The sftp source has the following options:
- file.consumer.markers-json
  When 'fileMarkers == true', specify if they should be produced as FileSplitter.FileMarker objects or JSON. (Boolean, default: true)
- file.consumer.mode
  The FileReadingMode to use for file reading sources. Values are 'ref' - The File object, 'lines' - a message per line, or 'contents' - the contents as bytes. (FileReadingMode, default: <none>, possible values: ref, lines, contents)
- file.consumer.with-markers
  Set to true to emit start of file/end of file marker messages before/after the data. Only valid with FileReadingMode 'lines'. (Boolean, default: <none>)
- sftp.auto-create-local-dir
  <documentation missing> (Boolean, default: <none>)
- sftp.delete-remote-files
  <documentation missing> (Boolean, default: <none>)
- sftp.factory.allow-unknown-keys
  True to allow an unknown or changed key. (Boolean, default: false)
- sftp.factory.cache-sessions
  <documentation missing> (Boolean, default: <none>)
- sftp.factory.host
  <documentation missing> (String, default: <none>)
- sftp.factory.known-hosts-expression
  A SpEL expression resolving to the location of the known hosts file. (String, default: <none>)
- sftp.factory.pass-phrase
  Passphrase for user's private key. (String, default: <empty string>)
- sftp.factory.password
  <documentation missing> (String, default: <none>)
- sftp.factory.port
  The port of the server. (Integer, default: 22)
- sftp.factory.private-key
  Resource location of user's private key. (String, default: <empty string>)
- sftp.factory.username
  <documentation missing> (String, default: <none>)
- sftp.filename-pattern
  <documentation missing> (String, default: <none>)
- sftp.filename-regex
  <documentation missing> (Pattern, default: <none>)
- sftp.local-dir
  <documentation missing> (File, default: <none>)
- sftp.preserve-timestamp
  <documentation missing> (Boolean, default: <none>)
- sftp.remote-dir
  <documentation missing> (String, default: <none>)
- sftp.remote-file-separator
  <documentation missing> (String, default: <none>)
- sftp.tmp-file-suffix
  <documentation missing> (String, default: <none>)
- trigger.cron
  Cron expression value for the Cron Trigger. (String, default: <none>)
- trigger.date-format
  Format for the date value. (String, default: <none>)
- trigger.fixed-delay
  Fixed delay for periodic triggers. (Integer, default: 1)
- trigger.initial-delay
  Initial delay for periodic triggers. (Integer, default: 0)
- trigger.max-messages
  Maximum messages per poll, -1 means infinity. (Long, default: -1)
- trigger.time-unit
  The TimeUnit to apply to delay values. (TimeUnit, default: SECONDS, possible values: NANOSECONDS, MICROSECONDS, MILLISECONDS, SECONDS, MINUTES, HOURS, DAYS)
The syslog source receives SYSLOG packets over UDP, TCP, or both.
RFC3164 (BSD) and RFC5424 formats are supported.
The syslog source has the following options:
- syslog.buffer-size
  The buffer size used when decoding messages; larger messages will be rejected. (Integer, default: 2048)
- syslog.nio
  Whether or not to use NIO (when supporting a large number of connections). (Boolean, default: false)
- syslog.port
  The port to listen on. (Integer, default: 1514)
- syslog.protocol
  tcp or udp (String, default: tcp)
- syslog.reverse-lookup
  Whether or not to perform a reverse lookup on the incoming socket. (Boolean, default: false)
- syslog.rfc
  '5424' or '3164' - the syslog format according to the RFC; 3164 is aka 'BSD' format. (String, default: 3164)
- syslog.socket-timeout
  The socket timeout. (Integer, default: 0)
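For testing the syslog source, it can help to hand-build a message in the RFC 3164 (BSD) layout. The Python sketch below shows how the leading priority value is derived (facility times 8 plus severity, as defined by the RFC) and assembles a message line. The helper names are hypothetical, and how the message must be framed on the wire for tcp is not covered here.

```python
def priority(facility, severity):
    """Compute the PRI value: facility * 8 + severity."""
    if not 0 <= facility <= 23:
        raise ValueError("facility must be between 0 and 23")
    if not 0 <= severity <= 7:
        raise ValueError("severity must be between 0 and 7")
    return facility * 8 + severity


def bsd_message(facility, severity, timestamp, host, tag, text):
    """Assemble '<PRI>TIMESTAMP HOST TAG: TEXT' in the RFC 3164 layout."""
    # The timestamp is passed in pre-formatted, e.g. "Oct 11 22:14:15".
    return f"<{priority(facility, severity)}>{timestamp} {host} {tag}: {text}"
```

For instance, facility 4 (security/authorization) with severity 2 (critical) gives a priority of 34, matching the example message in RFC 3164.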
The tcp source acts as a server and allows a remote party to connect to it and submit data over a raw tcp socket.
TCP is a streaming protocol and some mechanism is needed to frame messages on the wire. A number of decoders are available, the default being 'CRLF' which is compatible with Telnet.
Messages produced by the TCP source application have a byte[] payload.
- tcp.buffer-size
  The buffer size used when decoding messages; larger messages will be rejected. (Integer, default: 2048)
- tcp.decoder
  The decoder to use when receiving messages. (Encoding, default: <none>, possible values: CRLF, LF, NULL, STXETX, RAW, L1, L2, L4)
- tcp.nio
  <documentation missing> (Boolean, default: <none>)
- tcp.port
  <documentation missing> (Integer, default: <none>)
- tcp.reverse-lookup
  <documentation missing> (Boolean, default: <none>)
- tcp.socket-timeout
  <documentation missing> (Integer, default: <none>)
- tcp.use-direct-buffers
  <documentation missing> (Boolean, default: <none>)
2.10.2 Available Decoders
Text Data
- CRLF (default)
  text terminated by carriage return (0x0d) followed by line feed (0x0a)
- LF
  text terminated by line feed (0x0a)
- NULL
  text terminated by a null byte (0x00)
- STXETX
  text preceded by an STX (0x02) and terminated by an ETX (0x03)
Text and Binary Data
- RAW
  no structure - the client indicates a complete message by closing the socket
- L1
  data preceded by a one byte (unsigned) length field (supports up to 255 bytes)
- L2
  data preceded by a two byte (unsigned) length field (up to 2^16-1 bytes)
- L4
  data preceded by a four byte (signed) length field (up to 2^31-1 bytes)
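From the client side, each decoder implies a matching way of framing the bytes before they are written to the socket. The following Python sketch shows one way a test client might do that. The function name frame is hypothetical, and big-endian (network) byte order for the length headers is an assumption that this document does not state.

```python
import struct

# Largest payload each length-header decoder can describe.
_LENGTH_FORMATS = {
    "L1": (">B", 2**8 - 1),   # one byte, unsigned
    "L2": (">H", 2**16 - 1),  # two bytes, unsigned
    "L4": (">i", 2**31 - 1),  # four bytes, signed
}


def frame(payload, decoder="CRLF"):
    """Wrap payload bytes so the named decoder can find the message boundary."""
    if decoder == "CRLF":
        return payload + b"\r\n"
    if decoder == "LF":
        return payload + b"\n"
    if decoder == "NULL":
        return payload + b"\x00"
    if decoder == "STXETX":
        return b"\x02" + payload + b"\x03"
    if decoder == "RAW":
        # No framing: the client closes the socket to end the message.
        return payload
    if decoder in _LENGTH_FORMATS:
        fmt, limit = _LENGTH_FORMATS[decoder]
        if len(payload) > limit:
            raise ValueError(f"payload too long for {decoder}: {len(payload)} > {limit}")
        # Assumption: the length header is written in big-endian byte order.
        return struct.pack(fmt, len(payload)) + payload
    raise ValueError(f"unknown decoder: {decoder}")
```

A client would then write frame(data, "L2") to a socket connected to the source's port when the source is configured with the L2 decoder. Note that the text decoders cannot carry payloads containing their own terminator bytes, which is why the length-header variants are listed for binary data.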
The time source will simply emit a String with the current time every so often.
The time source has the following options:
- trigger.cron
  Cron expression value for the Cron Trigger. (String, default: <none>)
- trigger.date-format
  Format for the date value. (String, default: <none>)
- trigger.fixed-delay
  Fixed delay for periodic triggers. (Integer, default: 1)
- trigger.initial-delay
  Initial delay for periodic triggers. (Integer, default: 0)
- trigger.max-messages
  Maximum messages per poll, -1 means infinity. (Long, default: 1)
- trigger.time-unit
  The TimeUnit to apply to delay values. (TimeUnit, default: <none>, possible values: NANOSECONDS, MICROSECONDS, MILLISECONDS, SECONDS, MINUTES, HOURS, DAYS)
This app sends a trigger message based on a fixed delay, a date, or a cron expression. A payload that is evaluated using SpEL can also be sent each time the trigger fires.
The trigger source has the following options:
- trigger.cron
  Cron expression value for the Cron Trigger. (String, default: <none>)
- trigger.date-format
  Format for the date value. (String, default: <none>)
- trigger.fixed-delay
  Fixed delay for periodic triggers. (Integer, default: 1)
- trigger.initial-delay
  Initial delay for periodic triggers. (Integer, default: 0)
- trigger.max-messages
  Maximum messages per poll, -1 means infinity. (Long, default: 1)
- trigger.source.payload
  The expression for the payload of the Source module. (Expression, default: <none>)
- trigger.time-unit
  The TimeUnit to apply to delay values. (TimeUnit, default: <none>, possible values: NANOSECONDS, MICROSECONDS, MILLISECONDS, SECONDS, MINUTES, HOURS, DAYS)
2.13 Twitter Stream Source
This source ingests data from Twitter’s streaming API v1.1. It uses the sample and filter stream endpoints rather than the full "firehose" which needs special access. The endpoint used will depend on the parameters you supply in the stream definition (some are specific to the filter endpoint).
You need to supply all keys and secrets (both consumer and accessToken) to authenticate for this source, so it is easiest if you just add these as the following environment variables: CONSUMER_KEY, CONSUMER_SECRET, ACCESS_TOKEN and ACCESS_TOKEN_SECRET.
The twitterstream source has the following options:
- twitter.credentials.access-token
  Access token (String, default: <none>)
- twitter.credentials.access-token-secret
  Access token secret (String, default: <none>)
- twitter.credentials.consumer-key
  Consumer key (String, default: <none>)
- twitter.credentials.consumer-secret
  Consumer secret (String, default: <none>)
- twitter.stream.language
  The language of the tweet text. (String, default: <none>)
- twitter.stream.stream-type
  Twitter stream type (such as sample, firehose). Default is sample. (TwitterStreamType, default: <none>, possible values: SAMPLE, FIREHOSE)