The file source provides the contents of a File as a byte array by default. However, this can be customized using the --mode option:
- ref: Provides a java.io.File reference
- lines: Splits files line-by-line and emits a new message for each line
- contents: The default. Provides the contents of a file as a byte array
When using --mode=lines, you can also provide the additional option --withMarkers=true. If set to true, the underlying FileSplitter will emit additional start-of-file and end-of-file marker messages before and after the actual data. The payload of these two additional marker messages is of type FileSplitter.FileMarker. The option withMarkers defaults to false if not explicitly set.
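The message sequence produced by --mode=lines with --withMarkers=true can be pictured with a small Python sketch. This is only an illustration of the behaviour described above, not the FileSplitter implementation: the `FileMarker` dataclass and the `split_lines` function are hypothetical stand-ins, and the marker fields are assumptions.

```python
from dataclasses import dataclass
from typing import Iterator, Union


@dataclass(frozen=True)
class FileMarker:
    """Hypothetical stand-in for FileSplitter.FileMarker (not the real class)."""
    mark: str        # "START" or "END"
    file_path: str


def split_lines(path: str, with_markers: bool = False) -> Iterator[Union[str, FileMarker]]:
    """Yield one message per line, optionally wrapped in START/END markers."""
    if with_markers:
        yield FileMarker("START", path)
    with open(path, encoding="utf-8") as handle:
        for line in handle:
            yield line.rstrip("\r\n")   # each line becomes its own message
    if with_markers:
        yield FileMarker("END", path)
```

With markers enabled, a two-line file yields four messages: a start marker, the two lines, and an end marker. A downstream consumer can use the markers to tell where one file stops and the next begins.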
The file source has the following options:
- dir: the absolute path to the directory to monitor for files (String, default: ``)
- fixedDelay: the fixed delay polling interval specified in seconds (int, default: 5)
- initialDelay: an initial delay when using a fixed delay trigger, expressed in TimeUnits (seconds by default) (int, default: 0)
- maxMessages: the maximum messages per poll; -1 for unlimited (long, default: -1)
- mode: specifies how the file is being read. By default the content of a file is provided as byte array (FileReadingMode, default: contents, possible values: ref,lines,contents)
- pattern: a filter expression (Ant style) to accept only files that match the pattern (String, default: *)
- preventDuplicates: whether to prevent the same file from being processed twice (boolean, default: true)
- timeUnit: the time unit for the fixed and initial delays (String, default: SECONDS)
- withMarkers: if true emits start of file/end of file marker messages before/after the data. Only valid with FileReadingMode 'lines' (Boolean, no default)
The ref option is useful when the file contents are large and it would be more efficient to send the file path.
This source module supports transfer of files using the FTP protocol. Files are transferred from the remote directory to the local directory where the module is deployed. Messages emitted by the source are provided as a byte array by default. However, this can be customized using the --mode option:
- ref: Provides a java.io.File reference
- lines: Splits files line-by-line and emits a new message for each line
- contents: The default. Provides the contents of a file as a byte array
When using --mode=lines, you can also provide the additional option --withMarkers=true. If set to true, the underlying FileSplitter will emit additional start-of-file and end-of-file marker messages before and after the actual data. The payload of these two additional marker messages is of type FileSplitter.FileMarker. The option withMarkers defaults to false if not explicitly set.
The ftp source has the following options:
- autoCreateLocalDir: local directory must be auto created if it does not exist (boolean, default: true)
- clientMode: client mode to use: 2 for passive mode and 0 for active mode (int, default: 0)
- deleteRemoteFiles: delete remote files after transfer (boolean, default: false)
- filenamePattern: simple filename pattern to apply to the filter (String, default: *)
- fixedDelay: the rate at which to poll the remote directory (int, default: 1)
- host: the host name for the FTP server (String, default: localhost)
- initialDelay: an initial delay when using a fixed delay trigger, expressed in TimeUnits (seconds by default) (int, default: 0)
- localDir: set the local directory the remote files are transferred to (String, default: ``)
- maxMessages: the maximum messages per poll; -1 for unlimited (long, default: -1)
- mode: specifies how the file is being read. By default the content of a file is provided as byte array (FileReadingMode, default: contents, possible values: ref,lines,contents)
- password: the password for the FTP connection (Password, no default)
- port: the port for the FTP server (int, default: 21)
- preserveTimestamp: whether to preserve the timestamp of files retrieved (boolean, default: true)
- remoteDir: the remote directory to transfer the files from (String, default: /)
- remoteFileSeparator: file separator to use on the remote side (String, default: /)
- timeUnit: the time unit for the fixed and initial delays (String, default: SECONDS)
- tmpFileSuffix: extension to use when downloading files (String, default: .tmp)
- username: the username for the FTP connection (String, no default)
- withMarkers: if true emits start of file/end of file marker messages before/after the data. Only valid with FileReadingMode 'lines' (Boolean, no default)
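The purpose of tmpFileSuffix can be illustrated with a short Python sketch. It assumes the suffix is applied to the local file while a download is in progress and removed by a rename once the transfer completes; the text above only describes it as the extension used when downloading, so treat that behaviour as an assumption. The `transfer` function is hypothetical, and a plain file copy stands in for the FTP download.

```python
import os
import shutil


def transfer(remote_path: str, local_dir: str, tmp_suffix: str = ".tmp") -> str:
    """Copy a file into local_dir under a temporary name, then rename it."""
    os.makedirs(local_dir, exist_ok=True)       # comparable to autoCreateLocalDir=true
    final_path = os.path.join(local_dir, os.path.basename(remote_path))
    tmp_path = final_path + tmp_suffix
    shutil.copyfile(remote_path, tmp_path)      # stand-in for the actual FTP download
    os.replace(tmp_path, final_path)            # the final name appears only once the copy is complete
    return final_path
```

Writing under a temporary name first means anything watching the local directory never picks up a half-written file under its real name.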
A source module that listens for HTTP requests and emits the body as a message payload.
If the Content-Type matches 'text/*' or 'application/json', the payload will be a String,
otherwise the payload will be a byte array.
To create a stream definition in the server using the Spring Cloud Data Flow shell:
dataflow:> stream create --name httptest --definition "http --server.port=9000 | log" --deploy
Post some data to the http server on port 9000:
dataflow:> http post --target http://localhost:9000 --data "hello world"
Then check that the data ended up in the log.
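The Content-Type rule for the http source payload can be written down as a small Python sketch. The function name `payload_kind` is hypothetical, and stripping parameters such as a charset before matching is an assumption about how the header is compared.

```python
def payload_kind(content_type: str) -> str:
    """Return "String" or "byte[]" for a given Content-Type header value."""
    # Drop parameters such as "; charset=UTF-8" before comparing (assumption).
    media_type = content_type.split(";", 1)[0].strip().lower()
    if media_type.startswith("text/") or media_type == "application/json":
        return "String"
    return "byte[]"


print(payload_kind("text/plain; charset=UTF-8"))   # String
print(payload_kind("application/octet-stream"))    # byte[]
```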
The "jms" source enables receiving messages from JMS.
The jms source has the following options:
- spring.jms.listener.acknowledgeMode: the session acknowledge mode (String, default: AUTO)
- clientId: an identifier for the client, to be associated with a durable or shared topic subscription (String, no default)
- destination: the destination name from which messages will be received (String, no default)
- messageSelector: a message selector to be applied to messages (String, no default)
- subscriptionDurable: when true, indicates the subscription to a topic is durable (boolean, default: false)
- subscriptionShared: when true, indicates the subscription to a topic is shared (JMS 2.0) (boolean, default: false)
- spring.jms.pubSubDomain: when true, indicates that the destination is a topic (boolean, default: false)
- subscriptionName: a name that will be assigned to the topic subscription (String, no default)
- sessionTransacted: true to enable transactions and use a `DefaultMessageListenerContainer`, false to select a `SimpleMessageListenerContainer` (String, default: true)
- spring.jms.listener.concurrency: the minimum number of consumer threads (Integer, default: 1)
- spring.jms.listener.maxConcurrency: the maximum number of consumer threads. Only supported when `sessionTransacted` is true (Integer, default: 1)
| Note |
|---|
| Spring Boot broker configuration is used; refer to the Spring Boot Documentation for more information. The spring.jms.* properties above are also handled by the boot JMS support. |
7.5 Load Generator (load-generator)
A source that generates data and dispatches it to the stream. It gives users a way to measure the performance of Spring Cloud Data Flow in different environments and deployment types.
The load-generator source has the following options:
- messageCount: the number of messages to send (Integer, default: 100)
- messageSize: the size of message to send (Integer, default: 1000)
- producers: the number of producers (Integer, default: 1)
- outputType: how this module should emit messages it produces (MimeType, no default)
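As a rough picture of what messageCount and messageSize control, the following Python sketch generates the same volume of data as the defaults. It is not the module's implementation: the `generate_messages` function is hypothetical, the filler byte is an assumption, and measuring messageSize in bytes is also an assumption since the option description does not give a unit.

```python
def generate_messages(message_count: int = 100, message_size: int = 1000):
    """Yield message_count payloads, each message_size bytes long."""
    payload = b"x" * message_size    # filler content is an assumption
    for _ in range(message_count):
        yield payload


total_bytes = sum(len(message) for message in generate_messages())
print(total_bytes)  # 100000
```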
The "rabbit" source enables receiving messages from RabbitMQ.
The queue(s) must exist before the stream is deployed; they are not created automatically.
You can easily create a Queue using the RabbitMQ web UI.
The rabbit source has the following options:
- enableRetry: enable retry; when retries are exhausted the message will be rejected; message disposition will depend on dead letter configuration (boolean, default: false)
- initialRetryInterval: initial interval between retries (int, default: 1000)
- mappedRequestHeaders: request message header names to be propagated to/from the adapter/gateway (String, default: STANDARD_REQUEST_HEADERS)
- maxAttempts: maximum delivery attempts (int, default: 3)
- maxConcurrency: the maximum number of consumers (int, default: 1)
- maxRetryInterval: maximum retry interval (int, default: 30000)
- queues: the queue(s) from which messages will be received (String, no default)
- requeue: whether rejected messages will be requeued by default (boolean, default: true)
- retryMultiplier: retry interval multiplier (double, default: 2.0)
- transacted: true if the channel is to be transacted (boolean, default: false)
Also see the Spring Boot Documentation for additional properties for the broker connections and listener properties.
| Note |
|---|
| With the default ackMode (AUTO) and requeue (true) options, failed message deliveries will be retried indefinitely. Since there is not much processing in the rabbit source, the risk of failure in the source itself is small, unless the downstream Binder is not connected for some reason. Setting requeue to false will cause messages to be rejected on the first attempt (and possibly sent to a Dead Letter Exchange/Queue if the broker is so configured). The enableRetry option allows configuration of retry parameters such that a failed message delivery can be retried and eventually discarded (or dead-lettered) when retries are exhausted. The delivery thread is suspended during the retry interval(s). Retry options are enableRetry, maxAttempts, initialRetryInterval, retryMultiplier, and maxRetryInterval. Message deliveries failing with a MessageConversionException are never retried; the assumption being that if a message could not be converted on the first attempt, subsequent attempts will also fail. Such messages are discarded (or dead-lettered). |
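To see how the retry options interact, the sketch below computes the waits between delivery attempts. It assumes an exponential backoff in which each interval is the previous one multiplied by retryMultiplier and capped at maxRetryInterval, with intervals in milliseconds; the option list names the parameters but does not state the formula or the unit, so both are assumptions. The `retry_intervals` function is hypothetical.

```python
def retry_intervals(max_attempts=3, initial_interval=1000, multiplier=2.0, max_interval=30000):
    """Return the waits (assumed milliseconds) between successive delivery attempts."""
    intervals = []
    interval = float(initial_interval)
    for _ in range(max_attempts - 1):        # no wait follows the final attempt
        intervals.append(int(min(interval, max_interval)))
        interval *= multiplier
    return intervals


print(retry_intervals())                # [1000, 2000]
print(retry_intervals(max_attempts=8))  # [1000, 2000, 4000, 8000, 16000, 30000, 30000]
```

Under these assumptions the default settings suspend the delivery thread for about three seconds in total before the message is rejected.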
This source module supports transfer of files using the SFTP protocol. Files are transferred from the remote directory to the local directory where the module is deployed. Messages emitted by the source are provided as a byte array by default. However, this can be customized using the --mode option:
- ref: Provides a java.io.File reference
- lines: Splits files line-by-line and emits a new message for each line
- contents: The default. Provides the contents of a file as a byte array
When using --mode=lines, you can also provide the additional option --withMarkers=true. If set to true, the underlying FileSplitter will emit additional start-of-file and end-of-file marker messages before and after the actual data. The payload of these two additional marker messages is of type FileSplitter.FileMarker. The option withMarkers defaults to false if not explicitly set.
The sftp source has the following options:
- allowUnknownKeys: true to allow connecting to a host with an unknown or changed key (boolean, default: false)
- autoCreateLocalDir: whether the local directory is created automatically if it does not exist (boolean, default: true)
- deleteRemoteFiles: delete remote files after transfer (boolean, default: false)
- fixedDelay: fixed delay in SECONDS to poll the remote directory (int, default: 1)
- host: the remote host to connect to (String, default: localhost)
- initialDelay: an initial delay when using a fixed delay trigger, expressed in TimeUnits (seconds by default) (int, default: 0)
- knownHostsExpression: a SpEL expression for the location of the known hosts file; required if 'allowUnknownKeys' is false; examples: systemProperties["user.home"]+"/.ssh/known_hosts", "/foo/bar/known_hosts" (String, no default)
- localDir: set the local directory the remote files are transferred to (String, default: ``)
- maxMessages: the maximum messages per poll; -1 for unlimited (long, default: -1)
- mode: specifies how the file is being read. By default the content of a file is provided as byte array (FileReadingMode, default: contents, possible values: ref,lines,contents)
- passPhrase: the passphrase to use (String, default: ``)
- password: the password for the provided user (String, default: ``)
- pattern: simple filename pattern to apply to the filter (String, no default)
- port: the remote port to connect to (int, default: 22)
- privateKey: the private key location (a valid Spring Resource URL) (String, default: ``)
- regexPattern: filename regex pattern to apply to the filter (String, no default)
- remoteDir: the remote directory to transfer the files from (String, no default)
- timeUnit: the time unit for the fixed and initial delays (String, default: SECONDS)
- tmpFileSuffix: extension to use when downloading files (String, default: .tmp)
- user: the username to use (String, no default)
- withMarkers: if true emits start of file/end of file marker messages before/after the data. Only valid with FileReadingMode 'lines' (Boolean, no default)
The tcp source acts as a server and allows a remote party to connect to Spring Cloud Data Flow and submit data over a raw TCP socket.
TCP is a streaming protocol and some mechanism is needed to frame messages on the wire. A number of decoders are available, the default being 'CRLF' which is compatible with Telnet.
Messages produced by the TCP source module have a byte[] payload.
The tcp source has the following options:
- bufferSize: the size of the buffer (bytes) to use when decoding (int, default: 2048)
- decoder: the decoder to use when receiving messages (Encoding, default: CRLF, possible values: CRLF,LF,NULL,STXETX,RAW,L1,L2,L4)
- nio: whether or not to use NIO (boolean, default: false)
- port: the port on which to listen (int, default: 1234)
- reverseLookup: perform a reverse DNS lookup on the remote IP Address (boolean, default: false)
- socketTimeout: the timeout (ms) before closing the socket when no data is received (int, default: 120000)
- useDirectBuffers: whether or not to use direct buffers (boolean, default: false)
Text Data
- CRLF (default): text terminated by carriage return (0x0d) followed by line feed (0x0a)
- LF: text terminated by line feed (0x0a)
- NULL: text terminated by a null byte (0x00)
- STXETX: text preceded by an STX (0x02) and terminated by an ETX (0x03)
Text and Binary Data
- RAW: no structure - the client indicates a complete message by closing the socket
- L1: data preceded by a one byte (unsigned) length field (supports up to 255 bytes)
- L2: data preceded by a two byte (unsigned) length field (up to 2^16-1 bytes)
- L4: data preceded by a four byte (signed) length field (up to 2^31-1 bytes)
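From the client side, the framing formats listed above could be produced with a sketch like the following. The `frame` function is hypothetical, and big-endian (network) byte order for the L1, L2, and L4 length fields is an assumption, since the list above does not state the byte order.

```python
import struct


def frame(payload: bytes, decoder: str = "CRLF") -> bytes:
    """Wrap payload so a server using the named decoder can find its boundaries."""
    if decoder == "CRLF":
        return payload + b"\r\n"
    if decoder == "LF":
        return payload + b"\n"
    if decoder == "NULL":
        return payload + b"\x00"
    if decoder == "STXETX":
        return b"\x02" + payload + b"\x03"
    if decoder == "RAW":
        return payload                   # the sender closes the socket to end the message
    # Length-header formats; big-endian byte order is an assumption.
    header_formats = {"L1": ">B", "L2": ">H", "L4": ">i"}
    return struct.pack(header_formats[decoder], len(payload)) + payload


print(frame(b"hi"))          # b'hi\r\n'
print(frame(b"hi", "L2"))    # b'\x00\x02hi'
```

The framed bytes would then be written to a socket connected to the configured port. With L1, a payload longer than 255 bytes cannot be represented, and `struct.pack` raises an error in that case.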
The time source emits a String with the current time at a fixed interval.
The time source has the following options:
- fixedDelay: time delay between messages, expressed in TimeUnits (seconds by default) (int, default: 1)
- dateFormat: how to render the current time, using SimpleDateFormat (String, default: yyyy-MM-dd HH:mm:ss)
- initialDelay: an initial delay when using a fixed delay trigger, expressed in TimeUnits (seconds by default) (int, default: 0)
- timeUnit: the time unit for the fixed and initial delays (String, default: SECONDS)
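For readers who consume the time source output outside the JVM, the default dateFormat pattern corresponds roughly to the strftime format in the sketch below. The mapping covers only the default pattern, and the `render` helper is hypothetical; other SimpleDateFormat patterns would need their own translation.

```python
from datetime import datetime

# Rough strftime equivalent of the SimpleDateFormat pattern "yyyy-MM-dd HH:mm:ss".
DEFAULT_FORMAT = "%Y-%m-%d %H:%M:%S"


def render(moment: datetime, fmt: str = DEFAULT_FORMAT) -> str:
    """Format a timestamp the way the default dateFormat would."""
    return moment.strftime(fmt)


print(render(datetime(2016, 3, 1, 9, 5, 7)))  # 2016-03-01 09:05:07
```

The same format string can be passed to `datetime.strptime` to parse a payload emitted with the default settings.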
7.10 Twitter Stream (twitterstream)
This source ingests data from Twitter’s streaming API v1.1. It uses the sample and filter stream endpoints rather than the full "firehose" which needs special access. The endpoint used will depend on the parameters you supply in the stream definition (some are specific to the filter endpoint).
You need to supply all keys and secrets (both consumer and accessToken) to authenticate for this source, so it is easiest if you just add these as the following environment variables: CONSUMER_KEY, CONSUMER_SECRET, ACCESS_TOKEN and ACCESS_TOKEN_SECRET.
Stream creation is then straightforward:
dataflow:> stream create --name tweets --definition "twitterstream | log" --deploy
The twitterstream source has the following options:
- accessToken: a valid OAuth access token (String, no default)
- accessTokenSecret: an OAuth secret corresponding to the access token (String, no default)
- consumerKey: a consumer key issued by twitter (String, no default)
- consumerSecret: consumer secret corresponding to the consumer key (String, no default)
- language: language code e.g. 'en' (String, default: ``)
This source polls data from an RDBMS.
This source is fully based on the DataSourceAutoConfiguration, so refer to the Spring Boot JDBC Support for more information.
Stream creation is then straightforward:
dataflow:> stream create --name jdbcSource --definition "jdbc --spring.datasource.url=jdbc:oracle:thin:@localhost:1521:test --query='select foo from bar' --fixedDelay=5 | log" --deploy
The jdbc source has the following options:
- query: the query to use to select data (String, no default, required)
- update: an SQL update statement to execute for marking polled messages as 'seen' (String, no default)
- split: whether to split the SQL result as individual messages (boolean, default: true)
- maxRowsPerPoll: max numbers of rows to process for each poll (int, default: 0)
Also see the Spring Boot Documentation for additional DataSource properties, and TriggerProperties and MaxMessagesProperties for polling options.
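How query, update, and split work together during one poll can be sketched in Python with an in-memory SQLite database. This is a conceptual illustration rather than the module's implementation: the `poll` function, the `id` and `seen` columns, and the way the update is parameterized by the first column of each polled row are all assumptions.

```python
import sqlite3


def poll(conn, query, update=None, split=True):
    """Run one poll: select rows, optionally mark them seen, return the messages."""
    rows = conn.execute(query).fetchall()
    if update and rows:
        # Assumption: the update is keyed by the first column of each polled row.
        conn.executemany(update, [(row[0],) for row in rows])
        conn.commit()
    # split=True: one message per row; split=False: one message holding all rows.
    return rows if split else ([rows] if rows else [])


conn = sqlite3.connect(":memory:")
conn.execute("create table bar (id integer primary key, foo text, seen integer default 0)")
conn.executemany("insert into bar (foo) values (?)", [("a",), ("b",)])

query = "select id, foo from bar where seen = 0"
update = "update bar set seen = 1 where id = ?"
print(poll(conn, query, update))  # [(1, 'a'), (2, 'b')]
print(poll(conn, query, update))  # []
```

Without the update statement, every poll would return the same rows again, which is why a statement that marks rows as seen is usually paired with the query.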