2. Sources

2.1 File

The file source provides the contents of a File as a byte array by default. However, this can be customized using the --mode option:

  • ref Provides a java.io.File reference
  • lines Will split files line-by-line and emit a new message for each line
  • contents The default. Provides the contents of a file as a byte array

When using --mode=lines, you can also provide the additional option --withMarkers=true. If set to true, the underlying FileSplitter will emit additional start-of-file and end-of-file marker messages before and after the actual data. The payload of these 2 additional marker messages is of type FileSplitter.FileMarker. The option withMarkers defaults to false if not explicitly set.

2.1.1 Options

The file source has the following options:

dir
the absolute path to the directory to monitor for files (String, default: ``)
fixedDelay
the fixed delay polling interval specified in seconds (int, default: 5)
initialDelay
an initial delay when using a fixed delay trigger, expressed in TimeUnits (seconds by default) (int, default: 0)
maxMessages
the maximum messages per poll; -1 for unlimited (long, default: -1)
mode
specifies how the file is being read. By default the content of a file is provided as byte array (FileReadingMode, default: contents, possible values: ref,lines,contents)
pattern
a filter expression (Ant style) to accept only files that match the pattern (String, default: * )
preventDuplicates
whether to prevent the same file from being processed twice (boolean, default: true)
timeUnit
the time unit for the fixed and initial delays (String, default: SECONDS)
withMarkers
if true emits start of file/end of file marker messages before/after the data. Only valid with FileReadingMode 'lines' (Boolean, no default)

The ref option is useful in some cases in which the file contents are large and it would be more efficient to send the file path.

2.2 FTP (ftp)

This source module supports transfer of files using the FTP protocol. Files are transferred from the remote directory to the local directory where the module is deployed. Messages emitted by the source are provided as a byte array by default. However, this can be customized using the --mode option:

  • ref Provides a java.io.File reference
  • lines Will split files line-by-line and emit a new message for each line
  • contents The default. Provides the contents of a file as a byte array

When using --mode=lines, you can also provide the additional option --withMarkers=true. If set to true, the underlying FileSplitter will emit additional start-of-file and end-of-file marker messages before and after the actual data. The payload of these 2 additional marker messages is of type FileSplitter.FileMarker. The option withMarkers defaults to false if not explicitly set.

2.2.1 Options

The ftp source has the following options:

autoCreateLocalDir
local directory must be auto created if it does not exist (boolean, default: true)
clientMode
client mode to use : 2 for passive mode and 0 for active mode (int, default: 0)
deleteRemoteFiles
delete remote files after transfer (boolean, default: false)
filenamePattern
simple filename pattern to apply to the filter (String, default: *)
fixedDelay
the rate at which to poll the remote directory (int, default: 1)
host
the host name for the FTP server (String, default: localhost)
initialDelay
an initial delay when using a fixed delay trigger, expressed in TimeUnits (seconds by default) (int, default: 0)
localDir
set the local directory the remote files are transferred to (String, default: ``)
maxMessages
the maximum messages per poll; -1 for unlimited (long, default: -1)
mode
specifies how the file is being read. By default the content of a file is provided as byte array (FileReadingMode, default: contents, possible values: ref,lines,contents)
password
the password for the FTP connection (Password, no default)
port
the port for the FTP server (int, default: 21)
preserveTimestamp
whether to preserve the timestamp of files retrieved (boolean, default: true)
remoteDir
the remote directory to transfer the files from (String, default: /)
remoteFileSeparator
file separator to use on the remote side (String, default: /)
timeUnit
the time unit for the fixed and initial delays (String, default: SECONDS)
tmpFileSuffix
extension to use when downloading files (String, default: .tmp)
username
the username for the FTP connection (String, no default)
withMarkers
if true emits start of file/end of file marker messages before/after the data. Only valid with FileReadingMode 'lines' (Boolean, no default)

2.3 HTTP (http)

A source module that listens for HTTP requests and emits the body as a message payload. If the Content-Type matches 'text/*' or 'application/json', the payload will be a String, otherwise the payload will be a byte array.

To create a stream definition in the server using the Spring Cloud Data Flow shell

dataflow:> stream create --name httptest --definition "http --server.port=9000 | log" --deploy

Post some data to the http server on port 9000

dataflow:> http post --target http://localhost:9000 --data "hello world"

See if the data ended up in the log.

2.4 Load Generator (load-generator)

A source that sends generated data and dispatches it to the stream. This is to provide a method for users to identify the performance of Spring Cloud Data Flow in different environments and deployment types.

2.4.1 Options

The load-generator source has the following options:

messageCount
the number of messages to send (Integer, default: 100)
messageSize
the size of message to send (Integer, 1000)
producers
the number of producers (Integer, 1)
outputType
how this module should emit messages it produces (MimeType, default: no default)

2.5 SFTP (sftp)

This source module supports transfer of files using the SFTP protocol. Files are transferred from the remote directory to the local directory where the module is deployed.

Messages emitted by the source are provided as a byte array by default. However, this can be customized using the --mode option:

  • ref Provides a java.io.File reference
  • lines Will split files line-by-line and emit a new message for each line
  • contents The default. Provides the contents of a file as a byte array

When using --mode=lines, you can also provide the additional option --withMarkers=true. If set to true, the underlying FileSplitter will emit additional start-of-file and end-of-file marker messages before and after the actual data. The payload of these 2 additional marker messages is of type FileSplitter.FileMarker. The option withMarkers defaults to false if not explicitly set.

2.5.1 Options

The sftp source has the following options:

allowUnknownKeys
true to allow connecting to a host with an unknown or changed key (boolean, default: false)
autoCreateLocalDir
if local directory must be auto created if it does not exist (boolean, default: true)
deleteRemoteFiles
delete remote files after transfer (boolean, default: false)
fixedDelay
fixed delay in SECONDS to poll the remote directory (int, default: 1)
host
the remote host to connect to (String, default: localhost)
initialDelay
an initial delay when using a fixed delay trigger, expressed in TimeUnits (seconds by default) (int, default: 0)
knownHostsExpression
a SpEL expresssion location of known hosts file; required if 'allowUnknownKeys' is false; examples: systemProperties["user.home"]+"/.ssh/known_hosts", "/foo/bar/known_hosts" (String, no default)
localDir
set the local directory the remote files are transferred to (String, default: ``)
maxMessages
the maximum messages per poll; -1 for unlimited (long, default: -1)
mode
specifies how the file is being read. By default the content of a file is provided as byte array (FileReadingMode, default: contents, possible values: ref,lines,contents)
passPhrase
the passphrase to use (String, default: ``)
password
the password for the provided user (String, default: ``)
pattern
simple filename pattern to apply to the filter (String, no default)
port
the remote port to connect to (int, default: 22)
privateKey
the private key location (a valid Spring Resource URL) (String, default: ``)
regexPattern
filename regex pattern to apply to the filter (String, no default)
remoteDir
the remote directory to transfer the files from (String, no default)
timeUnit
the time unit for the fixed and initial delays (String, default: SECONDS)
tmpFileSuffix
extension to use when downloading files (String, default: .tmp)
user
the username to use (String, no default)
withMarkers
if true emits start of file/end of file marker messages before/after the data. Only valid with FileReadingMode 'lines' (Boolean, no default)

2.6 TCP

The tcp source acts as a server and allows a remote party to connect to Spring Cloud Data Flow and submit data over a raw tcp socket.

TCP is a streaming protocol and some mechanism is needed to frame messages on the wire. A number of decoders are available, the default being 'CRLF' which is compatible with Telnet.

Messages produced by the TCP source module have a byte[] payload.

2.6.1 Options

bufferSize
the size of the buffer (bytes) to use when decoding (int, default: 2048)
decoder
the decoder to use when receiving messages (Encoding, default: CRLF, possible values: CRLF,LF,NULL,STXETX,RAW,L1,L2,L4)
nio
whether or not to use NIO (boolean, default: false)
port
the port on which to listen (int, default: 1234)
reverseLookup
perform a reverse DNS lookup on the remote IP Address (boolean, default: false)
socketTimeout
the timeout (ms) before closing the socket when no data is received (int, default: 120000)
useDirectBuffers
whether or not to use direct buffers (boolean, default: false)

2.6.2 Available Decoders

Text Data

CRLF (default)
text terminated by carriage return (0x0d) followed by line feed (0x0a)
LF
text terminated by line feed (0x0a)
NULL
text terminated by a null byte (0x00)
STXETX
text preceded by an STX (0x02) and terminated by an ETX (0x03)

Text and Binary Data

RAW
no structure - the client indicates a complete message by closing the socket
L1
data preceded by a one byte (unsigned) length field (supports up to 255 bytes)
L2
data preceded by a two byte (unsigned) length field (up to 216-1 bytes)
L4
data preceded by a four byte (signed) length field (up to 231-1 bytes)

2.7 Time (time)

The time source will simply emit a String with the current time every so often.

2.7.1 Options

The time source has the following options:

fixedDelay
time delay between messages, expressed in TimeUnits (seconds by default) (int, default: 1)
format
how to render the current time, using SimpleDateFormat (String, default: yyyy-MM-dd HH:mm:ss)
initialDelay
an initial delay when using a fixed delay trigger, expressed in TimeUnits (seconds by default) (int, default: 0)
timeUnit
the time unit for the fixed and initial delays (String, default: SECONDS)

2.8 Twitter Stream (twitterstream)

This source ingests data from Twitter’s streaming API v1.1. It uses the sample and filter stream endpoints rather than the full "firehose" which needs special access. The endpoint used will depend on the parameters you supply in the stream definition (some are specific to the filter endpoint).

You need to supply all keys and secrets (both consumer and accessToken) to authenticate for this source, so it is easiest if you just add these as the following environment variables: CONSUMER_KEY, CONSUMER_SECRET, ACCESS_TOKEN and ACCESS_TOKEN_SECRET.

Stream creation is then straightforward:

dataflow:> stream create --name tweets --definition "twitterstream | log" --deploy

2.8.1 Options

The twitterstream source has the following options:

accessToken
a valid OAuth access token (String, no default)
accessTokenSecret
an OAuth secret corresponding to the access token (String, no default)
consumerKey
a consumer key issued by twitter (String, no default)
consumerSecret
consumer secret corresponding to the consumer key (String, no default)
language
language code e.g. 'en' (String, default: ``)
[Note]Note

twitterstream emit JSON in the native Twitter format.