9.1 FTP (ftp)
This source module supports transfer of files using the FTP protocol. Files are transferred from the remote directory to the local directory where the module is deployed.
Messages emitted by the source are provided as a byte array by default. However, this can be customized using the --mode option:
- ref - Provides a java.io.File reference
- lines - Will split files line-by-line and emit a new message for each line
- contents - The default. Provides the contents of a file as a byte array
When using --mode=lines, you can also provide the additional option --withMarkers=true. If set to true, the underlying FileSplitter will emit additional start-of-file and end-of-file marker messages before and after the actual data. The payload of these two additional marker messages is of type FileSplitter.FileMarker. The option withMarkers defaults to false if not explicitly set.
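For example, a stream definition along the following lines (the stream name, host, and credentials are placeholder values) would emit one message per line, framed by the marker messages:
dataflow:> stream create --name ftplines --definition "ftp --host=ftp.example.com --username=user --password=secret --mode=lines --withMarkers=true | log" --deploy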
The ftp source has the following options:
- autoCreateLocalDir - local directory must be auto created if it does not exist (boolean, default: true)
- clientMode - client mode to use: 2 for passive mode and 0 for active mode (int, default: 0)
- deleteRemoteFiles - delete remote files after transfer (boolean, default: false)
- filenamePattern - simple filename pattern to apply to the filter (String, default: *)
- fixedDelay - the rate at which to poll the remote directory (int, default: 1)
- host - the host name for the FTP server (String, default: localhost)
- initialDelay - an initial delay when using a fixed delay trigger, expressed in TimeUnits (seconds by default) (int, default: 0)
- localDir - set the local directory the remote files are transferred to (String, default: /tmp/xd/ftp)
- maxMessages - the maximum messages per poll; -1 for unlimited (long, default: -1)
- mode - specifies how the file is being read; by default the content of a file is provided as a byte array (FileReadingMode, default: contents, possible values: ref,lines,contents)
- password - the password for the FTP connection (Password, no default)
- port - the port for the FTP server (int, default: 21)
- preserveTimestamp - whether to preserve the timestamp of files retrieved (boolean, default: true)
- remoteDir - the remote directory to transfer the files from (String, default: /)
- remoteFileSeparator - file separator to use on the remote side (String, default: /)
- timeUnit - the time unit for the fixed and initial delays (String, default: SECONDS)
- tmpFileSuffix - extension to use when downloading files (String, default: .tmp)
- username - the username for the FTP connection (String, no default)
- withMarkers - if true, emits start-of-file/end-of-file marker messages before/after the data; only valid with FileReadingMode 'lines' (Boolean, no default)
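As an illustration only (the host, credentials, and directories below are placeholder values), a stream that polls a remote directory every 30 seconds and deletes files after transfer could be defined as:
dataflow:> stream create --name ftptest --definition "ftp --host=ftp.example.com --username=user --password=secret --remoteDir=/incoming --fixedDelay=30 --deleteRemoteFiles=true | log" --deploy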
9.2 HTTP (http)
A source module that listens for HTTP requests and emits the body as a message payload. If the Content-Type matches 'text/*' or 'application/json', the payload will be a String; otherwise the payload will be a byte array.
To create a stream definition in the server using the Data Flow shell:
dataflow:> stream create --name httptest --definition "http | log" --deploy
Post some data to the http server on the default port of 9000:
dataflow:> http post --target http://localhost:9000 --data "hello world"
See if the data ended up in the log.
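If your shell's http post command also supports a --contentType option (an assumption about the shell version in use), JSON can be posted with an explicit content type so that the payload arrives as a String rather than a byte array:
dataflow:> http post --target http://localhost:9000 --contentType application/json --data "{\"greeting\": \"hello\"}"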
9.3 Time (time)
The time source will simply emit a String with the current time every so often.
The time source has the following options:
- fixedDelay - time delay between messages, expressed in TimeUnits (seconds by default) (int, default: 1)
- format - how to render the current time, using SimpleDateFormat (String, default: yyyy-MM-dd HH:mm:ss)
- initialDelay - an initial delay when using a fixed delay trigger, expressed in TimeUnits (seconds by default) (int, default: 0)
- timeUnit - the time unit for the fixed and initial delays (String, default: SECONDS)
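For example (the stream name below is arbitrary), a stream that emits the current time every five seconds could be defined as:
dataflow:> stream create --name timetest --definition "time --fixedDelay=5 | log" --deploy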
9.4 Twitter Stream (twitterstream)
This source ingests data from Twitter’s streaming API v1.1. It uses the sample and filter stream endpoints rather than the full "firehose" which needs special access. The endpoint used will depend on the parameters you supply in the stream definition (some are specific to the filter endpoint).
You need to supply all keys and secrets (both consumer and accessToken) to authenticate for this source, so it is easiest if you just add these as the following environment variables: CONSUMER_KEY, CONSUMER_SECRET, ACCESS_TOKEN and ACCESS_TOKEN_SECRET.
Stream creation is then straightforward:
dataflow:> stream create --name tweets --definition "twitterstream | log" --deploy
The twitterstream source has the following options:
- accessToken - a valid OAuth access token (String, no default)
- accessTokenSecret - an OAuth secret corresponding to the access token (String, no default)
- consumerKey - a consumer key issued by twitter (String, no default)
- consumerSecret - consumer secret corresponding to the consumer key (String, no default)
- language - language code e.g. 'en' (String, default: ``)
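If you prefer not to rely on the environment variables, the same credentials can be supplied as options in the stream definition (the angle-bracket values below are placeholders for your own keys and tokens):
dataflow:> stream create --name tweets --definition "twitterstream --consumerKey=<key> --consumerSecret=<secret> --accessToken=<token> --accessTokenSecret=<token-secret> | log" --deploy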