This application polls a directory and sends new files or their contents to the output channel.
The file source provides the contents of a File as a byte array by default.
However, this can be customized using the --mode
option:
- ref Provides a
java.io.File
reference - lines Will split files line-by-line and emit a new message for each line
- contents The default. Provides the contents of a file as a byte array
When using --mode=lines
, you can also provide the additional option --withMarkers=true
.
If set to true
, the underlying FileSplitter
will emit additional start-of-file and end-of-file marker messages before and after the actual data.
The payload of these 2 additional marker messages is of type FileSplitter.FileMarker
. The option withMarkers
defaults to false
if not explicitly set.
N/A (Reads from a file system directory).
Content-Type: application/octet-stream
file_originalFile: <java.io.File>
file_name: <file name>
A byte[]
filled with the file contents.
Content-Type: text/plain
file_originalFile: <java.io.File>
file_name: <file name>
correlationId: <UUID>
(same for each line)sequenceNumber: <n>
sequenceSize: 0
(number of lines is not know until the file is read)
A String
for each line.
The first line is optionally preceded by a message with a START
marker payload.
The last line is optionally followed by a message with an END
marker payload.
Marker presence and format are determined by the with-markers
and markers-json
properties.
The file source has the following options:
- file.consumer.markers-json
- When 'fileMarkers == true', specify if they should be produced
as FileSplitter.FileMarker objects or JSON. (Boolean, default:
true
) - file.consumer.mode
- The FileReadingMode to use for file reading sources.
Values are 'ref' - The File object,
'lines' - a message per line, or
'contents' - the contents as bytes. (FileReadingMode, default:
<none>
, possible values: ref
,lines
,contents
) - file.consumer.with-markers
- Set to true to emit start of file/end of file marker messages before/after the data.
Only valid with FileReadingMode 'lines'. (Boolean, default:
<none>
) - file.directory
- The directory to poll for new files. (String, default:
<none>
) - file.filename-pattern
- A simple ant pattern to match files. (String, default:
<none>
) - file.filename-regex
- A regex pattern to match files. (Pattern, default:
<none>
) - file.prevent-duplicates
- Set to true to include an AcceptOnceFileListFilter which prevents duplicates. (Boolean, default:
true
) - trigger.cron
- Cron expression value for the Cron Trigger. (String, default:
<none>
) - trigger.date-format
- Format for the date value. (String, default:
<none>
) - trigger.fixed-delay
- Fixed delay for periodic triggers. (Integer, default:
1
) - trigger.initial-delay
- Initial delay for periodic triggers. (Integer, default:
0
) - trigger.max-messages
- Maximum messages per poll, -1 means infinity. (Long, default:
-1
) - trigger.time-unit
- The TimeUnit to apply to delay values. (TimeUnit, default:
SECONDS
, possible values: NANOSECONDS
,MICROSECONDS
,MILLISECONDS
,SECONDS
,MINUTES
,HOURS
,DAYS
)
The ref
option is useful in some cases in which the file contents are large and it would be more efficient to send the file path.
$ ./mvnw clean install -PgenerateApps
$ cd apps
You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:
$ ./mvnw clean package
java -jar file_source.jar --file.directory=/tmp/foo --file.consumer.mode=lines --trigger.fixed-delay=60
This source application supports transfer of files using the FTP protocol.
Files are transferred from the remote
directory to the local
directory where the app is deployed.
Messages emitted by the source are provided as a byte array by default. However, this can be
customized using the --mode
option:
- ref Provides a
java.io.File
reference - lines Will split files line-by-line and emit a new message for each line
- contents The default. Provides the contents of a file as a byte array
When using --mode=lines
, you can also provide the additional option --withMarkers=true
.
If set to true
, the underlying FileSplitter
will emit additional start-of-file and end-of-file marker messages before and after the actual data.
The payload of these 2 additional marker messages is of type FileSplitter.FileMarker
. The option withMarkers
defaults to false
if not explicitly set.
N/A (Fetches files from an FTP server).
Content-Type: application/octet-stream
file_orginalFile: <java.io.File>
file_name: <file name>
A byte[]
filled with the file contents.
Content-Type: text/plain
file_orginalFile: <java.io.File>
file_name: <file name>
correlationId: <UUID>
(same for each line)sequenceNumber: <n>
sequenceSize: 0
(number of lines is not know until the file is read)
A String
for each line.
The first line is optionally preceded by a message with a START
marker payload.
The last line is optionally followed by a message with an END
marker payload.
Marker presence and format are determined by the with-markers
and markers-json
properties.
The ftp source has the following options:
- file.consumer.markers-json
- When 'fileMarkers == true', specify if they should be produced
as FileSplitter.FileMarker objects or JSON. (Boolean, default:
true
) - file.consumer.mode
- The FileReadingMode to use for file reading sources.
Values are 'ref' - The File object,
'lines' - a message per line, or
'contents' - the contents as bytes. (FileReadingMode, default:
<none>
, possible values: ref
,lines
,contents
) - file.consumer.with-markers
- Set to true to emit start of file/end of file marker messages before/after the data.
Only valid with FileReadingMode 'lines'. (Boolean, default:
<none>
) - ftp.auto-create-local-dir
- Set to true to create the local directory if it does not exist. (Boolean, default:
true
) - ftp.delete-remote-files
- Set to true to delete remote files after successful transfer. (Boolean, default:
false
) - ftp.factory.cache-sessions
- <documentation missing> (Boolean, default:
<none>
) - ftp.factory.client-mode
- The client mode to use for the FTP session. (ClientMode, default:
<none>
, possible values: ACTIVE
,PASSIVE
) - ftp.factory.host
- <documentation missing> (String, default:
<none>
) - ftp.factory.password
- <documentation missing> (String, default:
<none>
) - ftp.factory.port
- The port of the server. (Integer, default:
21
) - ftp.factory.username
- <documentation missing> (String, default:
<none>
) - ftp.filename-pattern
- A filter pattern to match the names of files to transfer. (String, default:
<none>
) - ftp.filename-regex
- A filter regex pattern to match the names of files to transfer. (Pattern, default:
<none>
) - ftp.local-dir
- The local directory to use for file transfers. (File, default:
<none>
) - ftp.preserve-timestamp
- Set to true to preserve the original timestamp. (Boolean, default:
true
) - ftp.remote-dir
- The remote FTP directory. (String, default:
/
) - ftp.remote-file-separator
- The remote file separator. (String, default:
/
) - ftp.tmp-file-suffix
- The suffix to use while the transfer is in progress. (String, default:
.tmp
) - trigger.cron
- Cron expression value for the Cron Trigger. (String, default:
<none>
) - trigger.date-format
- Format for the date value. (String, default:
<none>
) - trigger.fixed-delay
- Fixed delay for periodic triggers. (Integer, default:
1
) - trigger.initial-delay
- Initial delay for periodic triggers. (Integer, default:
0
) - trigger.max-messages
- Maximum messages per poll, -1 means infinity. (Long, default:
-1
) - trigger.time-unit
- The TimeUnit to apply to delay values. (TimeUnit, default:
SECONDS
, possible values: NANOSECONDS
,MICROSECONDS
,MILLISECONDS
,SECONDS
,MINUTES
,HOURS
,DAYS
)
$ ./mvnw clean install -PgenerateApps
$ cd apps
You can find the corresponding binder based projects here.
You can then cd into one one of the folders and build it:
$ ./mvnw clean package
java -jar ftp_source.jar --ftp.remote-dir=foo --file.mode=lines --trigger.fixed-delay=60 --ftp.factory.host=ftpserver \
--ftp.factory.username=user --ftp.factory.password=pw --ftp.local-dir=/foo
This source allows you to subscribe to any creates or updates to a Gemfire region. The application configures a client cache and client region, along with the necessary
subscriptions enabled. By default the payload contains the updated entry value,
but may be controlled by passing in a SpEL expression that uses the EntryEvent as the
evaluation context.
The gemfire source supports the following configuration properties:
- gemfire.cache-event-expression
- SpEL expression to extract fields from a cache event. (Expression, default:
<none>
) - gemfire.pool.connect-type
- Specifies connection type: 'server' or 'locator'. (ConnectType, default:
<none>
, possible values: locator
,server
) - gemfire.pool.host-addresses
- Specifies one or more Gemfire locator or server addresses formatted as [host]:[port]. (InetSocketAddress[], default:
<none>
) - gemfire.pool.subscription-enabled
- Set to true to enable subscriptions for the client pool. Required to sync updates to the client cache. (Boolean, default:
false
) - gemfire.region.region-name
- The region name. (String, default:
<none>
) - gemfire.security.password
- The cache password. (String, default:
<none>
) - gemfire.security.username
- The cache username. (String, default:
<none>
)
$ ./mvnw clean install -PgenerateApps
$ cd apps
You can find the corresponding binder based projects here.
You can then cd into one of the folders and build it:
$ ./mvnw clean package
java -jar ./gemfire-source.jar --gemfire.region.region-name=MyRegion --gemfire.cacheEventExpression="newValue"
Continuous query allows client applications to create a GemFire query using Object Query Language (OQL) and to
register a CQ listener which subscribes to the query and is notified every time the query’s result set changes.
The gemfire-cq source registers a CQ which will post CQEvent messages to the stream.
The gemfire-cq source supports the following configuration properties:
- gemfire.cq-event-expression
- SpEL expression to use to extract data from a cq event. (Expression, default:
<none>
) - gemfire.pool.connect-type
- Specifies connection type: 'server' or 'locator'. (ConnectType, default:
<none>
, possible values: locator
,server
) - gemfire.pool.host-addresses
- Specifies one or more Gemfire locator or server addresses formatted as [host]:[port]. (InetSocketAddress[], default:
<none>
) - gemfire.pool.subscription-enabled
- Set to true to enable subscriptions for the client pool. Required to sync updates to the client cache. (Boolean, default:
false
) - gemfire.query
- The OQL query (String, default:
<none>
) - gemfire.security.password
- The cache password. (String, default:
<none>
) - gemfire.security.username
- The cache username. (String, default:
<none>
)
$ ./mvnw clean install -PgenerateApps
$ cd apps
You can find the corresponding binder based projects here.
You can then cd into one of the folders and build it:
$ ./mvnw clean package
java -jar gemfire-cq-source.jar --gemfire.query=
A source application that listens for HTTP requests and emits the body as a message payload.
If the Content-Type matches text/*
or application/json
, the payload will be a String,
otherwise the payload will be a byte array.
If content type matches text/*
or application/json
If content type does not match text/*
or application/json
The http source supports the following configuration properties:
- http.cors.allow-credentials
- Whether the browser should include any cookies associated with the domain of the request being annotated. (Boolean, default:
<none>
) - http.cors.allowed-headers
- List of request headers that can be used during the actual request. (String[], default:
<none>
) - http.cors.allowed-origins
- List of allowed origins, e.g. "http://domain1.com". (String[], default:
<none>
) - http.enable-csrf
- The security CSRF enabling flag. Makes sense only if 'enableSecurity = true'. (Boolean, default:
false
) - http.enable-security
- The security enabling flag. (Boolean, default:
false
) - http.mapped-request-headers
- Headers that will be mapped. (String[], default:
<none>
) - http.path-pattern
- An Ant-Style pattern to determine which http requests will be captured. (String, default:
/
) - server.port
- Server HTTP port. (Integer, default:
8080
)
| Note |
---|
Security is disabled for this application by default.
To enable it, you should use the mentioned above http.enable-security = true property. |
$ ./mvnw clean install -PgenerateApps
$ cd apps
You can find the corresponding binder based projects here.
You can then cd into one one of the folders and build it:
$ ./mvnw clean package
java -jar http_source.jar
This source polls data from an RDBMS.
This source is fully based on the DataSourceAutoConfiguration
, so refer to the
Spring Boot JDBC Support for more
information.
Content-Type: application/x-java-object
List<Map<String, Object>>
The jdbc source has the following options:
- jdbc.max-rows-per-poll
- Max numbers of rows to process for each poll. (Integer, default:
0
) - jdbc.query
- The query to use to select data. (String, default:
<none>
) - jdbc.split
- Whether to split the SQL result as individual messages. (Boolean, default:
true
) - jdbc.update
- An SQL update statement to execute for marking polled messages as 'seen'. (String, default:
<none>
) - spring.datasource.data
- Data (DML) script resource references. (List<String>, default:
<none>
) - spring.datasource.driver-class-name
- Fully qualified name of the JDBC driver. Auto-detected based on the URL by default. (String, default:
<none>
) - spring.datasource.initialization-mode
- Initialize the datasource using available DDL and DML scripts. (DataSourceInitializationMode, default:
embedded
, possible values: ALWAYS
,EMBEDDED
,NEVER
) - spring.datasource.password
- Login password of the database. (String, default:
<none>
) - spring.datasource.schema
- Schema (DDL) script resource references. (List<String>, default:
<none>
) - spring.datasource.url
- JDBC url of the database. (String, default:
<none>
) - spring.datasource.username
- Login username of the database. (String, default:
<none>
) - trigger.cron
- Cron expression value for the Cron Trigger. (String, default:
<none>
) - trigger.date-format
- Format for the date value. (String, default:
<none>
) - trigger.fixed-delay
- Fixed delay for periodic triggers. (Integer, default:
1
) - trigger.initial-delay
- Initial delay for periodic triggers. (Integer, default:
0
) - trigger.max-messages
- Maximum messages per poll, -1 means infinity. (Long, default:
1
) - trigger.time-unit
- The TimeUnit to apply to delay values. (TimeUnit, default:
<none>
, possible values: NANOSECONDS
,MICROSECONDS
,MILLISECONDS
,SECONDS
,MINUTES
,HOURS
,DAYS
)
Also see the Spring Boot Documentation
for addition DataSource
properties and TriggerProperties
and MaxMessagesProperties
for polling options.
$ ./mvnw clean install -PgenerateApps
$ cd apps
You can find the corresponding binder based projects here.
You can then cd into one one of the folders and build it:
$ ./mvnw clean package
java -jar jdbc_source.jar --query=<QUERY> [datasource credentials]
The "jms" source enables receiving messages from JMS.
content-type: application/octet-stream
content-type: application/x-java-serialized-object
content-type: application/x-java-object
The jms source has the following options:
- jms.client-id
- Client id for durable subscriptions. (String, default:
<none>
) - jms.destination
- The destination from which to receive messages (queue or topic). (String, default:
<none>
) - jms.message-selector
- A selector for messages; (String, default:
<none>
) - jms.session-transacted
- True to enable transactions and select a DefaultMessageListenerContainer, false to
select a SimpleMessageListenerContainer. (Boolean, default:
true
) - jms.subscription-durable
- True for a durable subscription. (Boolean, default:
<none>
) - jms.subscription-name
- The name of a durable or shared subscription. (String, default:
<none>
) - jms.subscription-shared
- True for a shared subscription. (Boolean, default:
<none>
) - spring.jms.jndi-name
- Connection factory JNDI name. When set, takes precedence to others connection
factory auto-configurations. (String, default:
<none>
) - spring.jms.listener.acknowledge-mode
- Acknowledge mode of the container. By default, the listener is transacted with
automatic acknowledgment. (AcknowledgeMode, default:
<none>
, possible values: AUTO
,CLIENT
,DUPS_OK
) - spring.jms.listener.auto-startup
- Start the container automatically on startup. (Boolean, default:
true
) - spring.jms.listener.concurrency
- Minimum number of concurrent consumers. (Integer, default:
<none>
) - spring.jms.listener.max-concurrency
- Maximum number of concurrent consumers. (Integer, default:
<none>
) - spring.jms.pub-sub-domain
- Whether the default destination type is topic. (Boolean, default:
false
)
| Note |
---|
Spring boot broker configuration is used; refer to the
Spring Boot Documentation for more information.
The spring.jms.* properties above are also handled by the boot JMS support. |
$ ./mvnw clean install -PgenerateApps
$ cd apps
You can find the corresponding binder based projects here.
You can then cd into one of the folders and build it:
$ ./mvnw clean package
java -jar jms-source.jar --jms.destination=
2.8 Load Generator Source
A source that sends generated data and dispatches it to the stream. This is to provide a method for users to identify the performance of Spring Cloud Data Flow in different environments and deployment types.
Content-Type: application/octet-stream
The load-generator source has the following options:
- load-generator.generate-timestamp
- <documentation missing> (Boolean, default:
false
) - load-generator.message-count
- <documentation missing> (Integer, default:
1000
) - load-generator.message-size
- <documentation missing> (Integer, default:
1000
) - load-generator.producers
- <documentation missing> (Integer, default:
1
)
$ ./mvnw clean install -PgenerateApps
$ cd apps
You can find the corresponding binder based projects here. You can then cd into one one of the folders and build it:
$ ./mvnw clean package
java -jar load-generator-source.jar
A source module that listens for Emails and emits the message body as a message payload.
The mail source supports the following configuration properties:
- mail.charset
- The charset for byte[] mail-to-string transformation. (String, default:
UTF-8
) - mail.delete
- Set to true to delete email after download. (Boolean, default:
false
) - mail.expression
- Configure a SpEL expression to select messages. (String, default:
true
) - mail.idle-imap
- Set to true to use IdleImap Configuration. (Boolean, default:
false
) - mail.java-mail-properties
- JavaMail properties as a new line delimited string of name-value pairs, e.g.
'foo=bar\n baz=car'. (Properties, default:
<none>
) - mail.mark-as-read
- Set to true to mark email as read. (Boolean, default:
false
) - mail.url
- Mail connection URL for connection to Mail server e.g.
'imaps://username:[email protected]:993/Inbox'. (URLName, default:
<none>
) - mail.user-flag
- The flag to mark messages when the server does not support \Recent (String, default:
<none>
) - trigger.cron
- Cron expression value for the Cron Trigger. (String, default:
<none>
) - trigger.date-format
- Format for the date value. (String, default:
<none>
) - trigger.fixed-delay
- Fixed delay for periodic triggers. (Integer, default:
1
) - trigger.initial-delay
- Initial delay for periodic triggers. (Integer, default:
0
) - trigger.max-messages
- Maximum messages per poll, -1 means infinity. (Long, default:
1
) - trigger.time-unit
- The TimeUnit to apply to delay values. (TimeUnit, default:
<none>
, possible values: NANOSECONDS
,MICROSECONDS
,MILLISECONDS
,SECONDS
,MINUTES
,HOURS
,DAYS
)
$ ./mvnw clean install -PgenerateApps
$ cd apps
You can find the corresponding binder based projects here.
You can then cd into one of the folders and build it:
$ ./mvnw clean package
java -jar mail-source.jar --mail.javaMailProperties= --mail.url= --mail.expression= \
--mail.charset= --mail.userFlag=
This source polls data from MongoDB.
This source is fully based on the MongoDataAutoConfiguration
, so refer to the
Spring Boot MongoDB Support
for more information.
The mongodb source has the following options:
- mongodb.collection
- The MongoDB collection to query (String, default:
<none>
) - mongodb.query
- The MongoDB query (String, default:
{ }
) - mongodb.query-expression
- The SpEL expression in MongoDB query DSL style (Expression, default:
<none>
) - mongodb.split
- Whether to split the query result as individual messages. (Boolean, default:
true
) - spring.data.mongodb.authentication-database
- Authentication database name. (String, default:
<none>
) - spring.data.mongodb.database
- Database name. (String, default:
<none>
) - spring.data.mongodb.field-naming-strategy
- Fully qualified name of the FieldNamingStrategy to use. (Class<?>, default:
<none>
) - spring.data.mongodb.grid-fs-database
- GridFS database name. (String, default:
<none>
) - spring.data.mongodb.host
- Mongo server host. Cannot be set with URI. (String, default:
<none>
) - spring.data.mongodb.password
- Login password of the mongo server. Cannot be set with URI. (Character[], default:
<none>
) - spring.data.mongodb.port
- Mongo server port. Cannot be set with URI. (Integer, default:
<none>
) - spring.data.mongodb.uri
- Mongo database URI. Cannot be set with host, port and credentials. (String, default:
<none>
) - spring.data.mongodb.username
- Login user of the mongo server. Cannot be set with URI. (String, default:
<none>
) - trigger.cron
- Cron expression value for the Cron Trigger. (String, default:
<none>
) - trigger.date-format
- Format for the date value. (String, default:
<none>
) - trigger.fixed-delay
- Fixed delay for periodic triggers. (Integer, default:
1
) - trigger.initial-delay
- Initial delay for periodic triggers. (Integer, default:
0
) - trigger.max-messages
- Maximum messages per poll, -1 means infinity. (Long, default:
-1
) - trigger.time-unit
- The TimeUnit to apply to delay values. (TimeUnit, default:
SECONDS
, possible values: NANOSECONDS
,MICROSECONDS
,MILLISECONDS
,SECONDS
,MINUTES
,HOURS
,DAYS
)
Also see the Spring Boot Documentation for additional MongoProperties
properties.
See and TriggerProperties
for polling options.
$ ./mvnw clean install -PgenerateApps
$ cd apps
You can find the corresponding binder based projects here.
You can then cd into one of the folders and build it:
$ ./mvnw clean package
java -jar mongodb-source.jar --mongodb.query= --mongodb.collection=
The "mqtt" source enables receiving messages from MQTT.
String
if binary setting is false
(default)byte[]
if binary setting is true
The mqtt source has the following options:
- mqtt.binary
- true to leave the payload as bytes (Boolean, default:
false
) - mqtt.charset
- the charset used to convert bytes to String (when binary is false) (String, default:
UTF-8
) - mqtt.clean-session
- whether the client and server should remember state across restarts and reconnects (Boolean, default:
true
) - mqtt.client-id
- identifies the client (String, default:
stream.client.id.source
) - mqtt.connection-timeout
- the connection timeout in seconds (Integer, default:
30
) - mqtt.keep-alive-interval
- the ping interval in seconds (Integer, default:
60
) - mqtt.password
- the password to use when connecting to the broker (String, default:
guest
) - mqtt.persistence
- 'memory' or 'file' (String, default:
memory
) - mqtt.persistence-directory
- Persistence directory (String, default:
/tmp/paho
) - mqtt.qos
- the qos; a single value for all topics or a comma-delimited list to match the topics (int[], default:
[0]
) - mqtt.topics
- the topic(s) (comma-delimited) to which the source will subscribe (String[], default:
[stream.mqtt]
) - mqtt.url
- location of the mqtt broker(s) (comma-delimited list) (String[], default:
[tcp://localhost:1883]
) - mqtt.username
- the username to use when connecting to the broker (String, default:
guest
)
$ ./mvnw clean install -PgenerateApps
$ cd apps
You can find the corresponding binder based projects here. You can then cd into one one of the folders and build it:
$ ./mvnw clean package
java -jar mqtt-source.jar --mqtt.clientId=
The "rabbit" source enables receiving messages from RabbitMQ.
The queue(s) must exist before the stream is deployed; they are not created automatically.
You can easily create a Queue using the RabbitMQ web UI.
content-type: application/octet-stream
content-type: application/x-java-serialized-object
The rabbit source has the following options:
- rabbit.enable-retry
- true to enable retry. (Boolean, default:
false
) - rabbit.initial-retry-interval
- Initial retry interval when retry is enabled. (Integer, default:
1000
) - rabbit.mapped-request-headers
- Headers that will be mapped. (String)[], default:
[STANDARD_REQUEST_HEADERS]
) - rabbit.max-attempts
- The maximum delivery attempts when retry is enabled. (Integer, default:
3
) - rabbit.max-retry-interval
- Max retry interval when retry is enabled. (Integer, default:
30000
) - rabbit.queues
- The queues to which the source will listen for messages. (String)[], default:
<none>
) - rabbit.requeue
- Whether rejected messages should be requeued. (Boolean, default:
true
) - rabbit.retry-multiplier
- Retry backoff multiplier when retry is enabled. (Double, default:
2
) - rabbit.transacted
- Whether the channel is transacted. (Boolean, default:
false
) - spring.rabbitmq.addresses
- Comma-separated list of addresses to which the client should connect. (String, default:
<none>
) - spring.rabbitmq.connection-timeout
- Connection timeout. Set it to zero to wait forever. (Duration, default:
<none>
) - spring.rabbitmq.host
- RabbitMQ host. (String, default:
localhost
) - spring.rabbitmq.password
- Login to authenticate against the broker. (String, default:
guest
) - spring.rabbitmq.port
- RabbitMQ port. (Integer, default:
5672
) - spring.rabbitmq.publisher-confirms
- Whether to enable publisher confirms. (Boolean, default:
false
) - spring.rabbitmq.publisher-returns
- Whether to enable publisher returns. (Boolean, default:
false
) - spring.rabbitmq.requested-heartbeat
- Requested heartbeat timeout; zero for none. If a duration suffix is not specified,
seconds will be used. (Duration, default:
<none>
) - spring.rabbitmq.username
- Login user to authenticate to the broker. (String, default:
guest
) - spring.rabbitmq.virtual-host
- Virtual host to use when connecting to the broker. (String, default:
<none>
)
Also see the Spring Boot Documentation
for addition properties for the broker connections and listener properties.
| Note |
---|
With the default ackMode (AUTO) and requeue (true) options, failed message deliveries will be retried
indefinitely.
Since there is not much processing in the rabbit source, the risk of failure in the source itself is small, unless
the downstream Binder is not connected for some reason.
Setting requeue to false will cause messages to be rejected on the first attempt (and possibly sent to a Dead Letter
Exchange/Queue if the broker is so configured).
The enableRetry option allows configuration of retry parameters such that a failed message delivery can be retried and
eventually discarded (or dead-lettered) when retries are exhausted.
The delivery thread is suspended during the retry interval(s).
Retry options are enableRetry, maxAttempts, initialRetryInterval, retryMultiplier, and maxRetryInterval.
Message deliveries failing with a MessageConversionException are never retried; the assumption being that if a message
could not be converted on the first attempt, subsequent attempts will also fail.
Such messages are discarded (or dead-lettered). |
$ ./mvnw clean install -PgenerateApps
$ cd apps
You can find the corresponding binder based projects here.
You can then cd into one of the folders and build it:
$ ./mvnw clean package
java -jar rabbit-source.jar --rabbit.queues=
This source app supports transfer of files using the Amazon S3 protocol.
Files are transferred from the remote
directory (S3 bucket) to the local
directory where the application is deployed.
Messages emitted by the source are provided as a byte array by default. However, this can be
customized using the --mode
option:
- ref Provides a
java.io.File
reference - lines Will split files line-by-line and emit a new message for each line
- contents The default. Provides the contents of a file as a byte array
When using --mode=lines
, you can also provide the additional option --withMarkers=true
.
If set to true
, the underlying FileSplitter
will emit additional start-of-file and end-of-file marker messages before and after the actual data.
The payload of these 2 additional marker messages is of type FileSplitter.FileMarker
. The option withMarkers
defaults to false
if not explicitly set.
Content-Type: application/octet-stream
file_orginalFile: <java.io.File>
file_name: <file name>
A byte[]
filled with the file contents.
Content-Type: text/plain
file_orginalFile: <java.io.File>
file_name: <file name>
correlationId: <UUID>
(same for each line)sequenceNumber: <n>
sequenceSize: 0
(number of lines is not know until the file is read)
A String
for each line.
The first line is optionally preceded by a message with a START
marker payload.
The last line is optionally followed by a message with an END
marker payload.
Marker presence and format are determined by the with-markers
and markers-json
properties.
The s3 source has the following options:
- file.consumer.markers-json
- When 'fileMarkers == true', specify if they should be produced
as FileSplitter.FileMarker objects or JSON. (Boolean, default:
true
) - file.consumer.mode
- The FileReadingMode to use for file reading sources.
Values are 'ref' - The File object,
'lines' - a message per line, or
'contents' - the contents as bytes. (FileReadingMode, default:
<none>
, possible values: ref
,lines
,contents
) - file.consumer.with-markers
- Set to true to emit start of file/end of file marker messages before/after the data.
Only valid with FileReadingMode 'lines'. (Boolean, default:
<none>
) - s3.auto-create-local-dir
- Create or not the local directory. (Boolean, default:
true
) - s3.delete-remote-files
- Delete or not remote files after processing. (Boolean, default:
false
) - s3.filename-pattern
- The pattern to filter remote files. (String, default:
<none>
) - s3.filename-regex
- The regexp to filter remote files. (Pattern, default:
<none>
) - s3.local-dir
- The local directory to store files. (File, default:
<none>
) - s3.preserve-timestamp
- To transfer or not the timestamp of the remote file to the local one. (Boolean, default:
true
) - s3.remote-dir
- AWS S3 bucket resource. (String, default:
bucket
) - s3.remote-file-separator
- Remote File separator. (String, default:
/
) - s3.tmp-file-suffix
- Temporary file suffix. (String, default:
.tmp
) - trigger.cron
- Cron expression value for the Cron Trigger. (String, default:
<none>
) - trigger.date-format
- Format for the date value. (String, default:
<none>
) - trigger.fixed-delay
- Fixed delay for periodic triggers. (Integer, default:
1
) - trigger.initial-delay
- Initial delay for periodic triggers. (Integer, default:
0
) - trigger.max-messages
- Maximum messages per poll, -1 means infinity. (Long, default:
-1
) - trigger.time-unit
- The TimeUnit to apply to delay values. (TimeUnit, default:
SECONDS
, possible values: NANOSECONDS
,MICROSECONDS
,MILLISECONDS
,SECONDS
,MINUTES
,HOURS
,DAYS
)
2.13.4 Amazon AWS common options
The Amazon S3 Source (as all other Amazon AWS applications) is based on the
Spring Cloud AWS project as a foundation, and its auto-configuration
classes are used automatically by Spring Boot.
Consult their documentation regarding required and useful auto-configuration properties.
Some of them are about AWS credentials:
- cloud.aws.credentials.accessKey
- cloud.aws.credentials.secretKey
- cloud.aws.credentials.instanceProfile
- cloud.aws.credentials.profileName
- cloud.aws.credentials.profilePath
Other are for AWS Region
definition:
- cloud.aws.region.auto
- cloud.aws.region.static
And for AWS Stack
:
- cloud.aws.stack.auto
- cloud.aws.stack.name
$ ./mvnw clean install -PgenerateApps
$ cd apps
You can find the corresponding binder based projects here. You can then cd into one one of the folders and build it:
$ ./mvnw clean package
java -jar s3-source.jar --s3.remoteDir=/tmp/foo --file.consumer.mode=lines --trigger.fixed-delay=60
This source app supports transfer of files using the SFTP protocol.
Files are transferred from the remote
directory to the local
directory where the application is deployed.
Messages emitted by the source are provided as a byte array by default. However, this can be
customized using the --mode
option:
- ref Provides a
java.io.File
reference - lines Will split files line-by-line and emit a new message for each line
- contents The default. Provides the contents of a file as a byte array
When using --mode=lines
, you can also provide the additional option --withMarkers=true
.
If set to true
, the underlying FileSplitter
will emit additional start-of-file and end-of-file marker messages before and after the actual data.
The payload of these 2 additional marker messages is of type FileSplitter.FileMarker
. The option withMarkers
defaults to false
if not explicitly set.
When configuring the sftp.factory.known-hosts-expression
option, the root object of the evaluation is the application context, an example might be sftp.factory.known-hosts-expression = @systemProperties['user.home'] + '/.ssh/known_hosts'
.
N/A (Fetches files from an SFTP server).
Content-Type: application/octet-stream
file_originalFile: <java.io.File>
file_name: <file name>
A byte[]
filled with the file contents.
Content-Type: text/plain
file_originalFile: <java.io.File>
file_name: <file name>
correlationId: <UUID>
(same for each line)sequenceNumber: <n>
sequenceSize: 0
(number of lines is not know until the file is read)
A String
for each line.
The first line is optionally preceded by a message with a START
marker payload.
The last line is optionally followed by a message with an END
marker payload.
Marker presence and format are determined by the with-markers
and markers-json
properties.
The sftp source has the following options:
- file.consumer.markers-json
- When 'fileMarkers == true', specify if they should be produced
as FileSplitter.FileMarker objects or JSON. (Boolean, default:
true
) - file.consumer.mode
- The FileReadingMode to use for file reading sources.
Values are 'ref' - The File object,
'lines' - a message per line, or
'contents' - the contents as bytes. (FileReadingMode, default:
<none>
, possible values: ref
,lines
,contents
) - file.consumer.with-markers
- Set to true to emit start of file/end of file marker messages before/after the data.
Only valid with FileReadingMode 'lines'. (Boolean, default:
<none>
) - sftp.auto-create-local-dir
- Set to true to create the local directory if it does not exist. (Boolean, default:
true
) - sftp.batch.batch-resource-uri
- The URI of the batch artifact to be applied to the TaskLaunchRequest. (String, default:
<empty string>
) - sftp.batch.data-source-password
- The datasource password to be applied to the TaskLaunchRequest. (String, default:
<none>
) - sftp.batch.data-source-url
- The datasource url to be applied to the TaskLaunchRequest. Defaults to h2 in-memory
JDBC datasource url. (String, default:
jdbc:h2:tcp://localhost:19092/mem:dataflow
) - sftp.batch.data-source-user-name
- The datasource user name to be applied to the TaskLaunchRequest. Defaults to "sa" (String, default:
sa
) - sftp.batch.deployment-properties
- Comma delimited list of deployment properties to be applied to the
TaskLaunchRequest. (String, default:
<none>
) - sftp.batch.environment-properties
- Comma delimited list of environment properties to be applied to the
TaskLaunchRequest. (String, default:
<none>
) - sftp.batch.job-parameters
- Comma separated list of optional job parameters in key=value format. (List<String>, default:
<none>
) - sftp.batch.local-file-path-job-parameter-name
- Value to use as the local file job parameter name. Defaults to "localFilePath". (String, default:
localFilePath
) - sftp.batch.local-file-path-job-parameter-value
- The file path to use as the local file job parameter value. Defaults to "java.io.tmpdir". (String, default:
<none>
) - sftp.batch.remote-file-path-job-parameter-name
- Value to use as the remote file job parameter name. Defaults to "remoteFilePath". (String, default:
remoteFilePath
) - sftp.delete-remote-files
- Set to true to delete remote files after successful transfer. (Boolean, default:
false
) - sftp.factory.allow-unknown-keys
- True to allow an unknown or changed key. (Boolean, default:
false
) - sftp.factory.cache-sessions
- Cache sessions (Boolean, default:
<none>
) - sftp.factory.host
- The host name of the server. (String, default:
localhost
) - sftp.factory.known-hosts-expression
- A SpEL expression resolving to the location of the known hosts file. (Expression, default:
<none>
) - sftp.factory.pass-phrase
- Passphrase for user's private key. (String, default:
<empty string>
) - sftp.factory.password
- The password to use to connect to the server. (String, default:
<none>
) - sftp.factory.port
- The port of the server. (Integer, default:
22
) - sftp.factory.private-key
- Resource location of user's private key. (String, default:
<empty string>
) - sftp.factory.username
- The username to use to connect to the server. (String, default:
<none>
) - sftp.filename-pattern
- A filter pattern to match the names of files to transfer. (String, default:
<none>
) - sftp.filename-regex
- A filter regex pattern to match the names of files to transfer. (Pattern, default:
<none>
) - sftp.list-only
- Set to true to return file metadata without the entire payload. (Boolean, default:
false
) - sftp.local-dir
- The local directory to use for file transfers. (File, default:
<none>
) - sftp.metadata.redis.key-name
- The key name to use when storing file metadata. Defaults to "sftpSource". (String, default:
sftpSource
) - sftp.preserve-timestamp
- Set to true to preserve the original timestamp. (Boolean, default:
true
) - sftp.remote-dir
- The remote FTP directory. (String, default:
/
) - sftp.remote-file-separator
- The remote file separator. (String, default:
/
) - sftp.stream
- Set to true to stream the file rather than copy to a local directory. (Boolean, default:
false
) - sftp.task-launcher-output
- Set to true to create output suitable for a task launch request. (Boolean, default:
false
) - sftp.tmp-file-suffix
- The suffix to use while the transfer is in progress. (String, default:
.tmp
) - trigger.cron
- Cron expression value for the Cron Trigger. (String, default:
<none>
) - trigger.date-format
- Format for the date value. (String, default:
<none>
) - trigger.fixed-delay
- Fixed delay for periodic triggers. (Integer, default:
1
) - trigger.initial-delay
- Initial delay for periodic triggers. (Integer, default:
0
) - trigger.max-messages
- Maximum messages per poll, -1 means infinity. (Long, default:
-1
) - trigger.time-unit
- The TimeUnit to apply to delay values. (TimeUnit, default:
SECONDS
, possible values: NANOSECONDS
,MICROSECONDS
,MILLISECONDS
,SECONDS
,MINUTES
,HOURS
,DAYS
)
$ ./mvnw clean install -PgenerateApps
$ cd apps
You can find the corresponding binder based projects here.
You can then cd into one one of the folders and build it:
$ ./mvnw clean package
java -jar sftp_source.jar --sftp.remote-dir=foo --file.consumer.mode=lines --trigger.fixed-delay=60 \
--sftp.factory.host=sftpserver --sftp.factory.username=user --sftp.factory.password=pw --sftp.local-dir=/foo
The syslog source receives SYSLOG packets over UDP, TCP, or both.
RFC3164 (BSD) and RFC5424 formats are supported.
content-type: application/json
The syslog source has the following options:
- syslog.buffer-size
- the buffer size used when decoding messages; larger messages will be rejected. (Integer, default:
2048
) - syslog.nio
- whether or not to use NIO (when supporting a large number of connections). (Boolean, default:
false
) - syslog.port
- The port to listen on. (Integer, default:
1514
) - syslog.protocol
- tcp or udp (String, default:
tcp
) - syslog.reverse-lookup
- whether or not to perform a reverse lookup on the incoming socket. (Boolean, default:
false
) - syslog.rfc
- '5424' or '3164' - the syslog format according the the RFC; 3164 is aka 'BSD' format. (String, default:
3164
) - syslog.socket-timeout
- the socket timeout. (Integer, default:
0
)
$ ./mvnw clean install -PgenerateApps
$ cd apps
You can find the corresponding binder based projects here.
You can then cd into one of the folders and build it:
$ ./mvnw clean package
java -jar syslog-source.jar --syslog.rfc=5424 --syslog.protocol=tcp
The tcp
source acts as a server and allows a remote party to connect to it and submit data over a raw tcp socket.
TCP is a streaming protocol and some mechanism is needed to frame messages on the wire. A number of decoders are
available, the default being 'CRLF' which is compatible with Telnet.
Messages produced by the TCP source application have a byte[]
payload.
Content-Type: application/octet-stream
- tcp.buffer-size
- The buffer size used when decoding messages; larger messages will be rejected. (Integer, default:
2048
) - tcp.decoder
- The decoder to use when receiving messages. (Encoding, default:
<none>
, possible values: CRLF
,LF
,NULL
,STXETX
,RAW
,L1
,L2
,L4
) - tcp.nio
- Whether or not to use NIO. (Boolean, default:
false
) - tcp.port
- The port on which to listen; 0 for the OS to choose a port. (Integer, default:
1234
) - tcp.reverse-lookup
- Perform a reverse DNS lookup on the remote IP Address; if false,
just the IP address is included in the message headers. (Boolean, default:
false
) - tcp.socket-timeout
- The timeout (ms) before closing the socket when no data is received. (Integer, default:
120000
) - tcp.use-direct-buffers
- Whether or not to use direct buffers. (Boolean, default:
false
)
2.16.4 Available Decoders
Text Data
- CRLF (default)
- text terminated by carriage return (0x0d) followed by line feed (0x0a)
- LF
- text terminated by line feed (0x0a)
- NULL
- text terminated by a null byte (0x00)
- STXETX
- text preceded by an STX (0x02) and terminated by an ETX (0x03)
Text and Binary Data
- RAW
- no structure - the client indicates a complete message by closing the socket
- L1
- data preceded by a one byte (unsigned) length field (supports up to 255 bytes)
- L2
- data preceded by a two byte (unsigned) length field (up to 216-1 bytes)
- L4
- data preceded by a four byte (signed) length field (up to 231-1 bytes)
$ ./mvnw clean install -PgenerateApps
$ cd apps
You can find the corresponding binder based projects here. You can then cd into one one of the folders and build it:
$ ./mvnw clean package
java -jar tcp_source.jar --tcp.decoder=LF
The "decoder" property determines the message format (default is termination with CRLF).
2.17 TCP Client as a Source which connects to a TCP server and receives data
Content-Type: application/octet-stream
The tcp-client source has the following options:
- tcp.buffer-size
- The buffer size used when decoding messages; larger messages will be rejected. (Integer, default:
2048
) - tcp.charset
- The charset used when converting from bytes to String. (String, default:
UTF-8
) - tcp.decoder
- The decoder to use when receiving messages. (Encoding, default:
<none>
, possible values: CRLF
,LF
,NULL
,STXETX
,RAW
,L1
,L2
,L4
) - tcp.host
- The host to which this client will connect. (String, default:
localhost
) - tcp.nio
- Whether or not to use NIO. (Boolean, default:
false
) - tcp.port
- The port on which to listen; 0 for the OS to choose a port. (Integer, default:
1234
) - tcp.retry-interval
- Retry interval (in milliseconds) to check the connection and reconnect. (Long, default:
60000
) - tcp.reverse-lookup
- Perform a reverse DNS lookup on the remote IP Address; if false,
just the IP address is included in the message headers. (Boolean, default:
false
) - tcp.socket-timeout
- The timeout (ms) before closing the socket when no data is received. (Integer, default:
120000
) - tcp.use-direct-buffers
- Whether or not to use direct buffers. (Boolean, default:
false
)
$ ./mvnw clean install -PgenerateApps
$ cd apps
You can find the corresponding binder based projects here. You can then cd into one one of the folders and build it:
$ ./mvnw clean package
java -jar tcp_client_source.jar --tcp.decoder=LF
The time source will simply emit a String with the current time every so often.
A String
with the time output.
The time source has the following options:
- trigger.cron
- Cron expression value for the Cron Trigger. (String, default:
<none>
) - trigger.date-format
- Format for the date value. (String, default:
<none>
) - trigger.fixed-delay
- Fixed delay for periodic triggers. (Integer, default:
1
) - trigger.initial-delay
- Initial delay for periodic triggers. (Integer, default:
0
) - trigger.max-messages
- Maximum messages per poll, -1 means infinity. (Long, default:
1
) - trigger.time-unit
- The TimeUnit to apply to delay values. (TimeUnit, default:
<none>
, possible values: NANOSECONDS
,MICROSECONDS
,MILLISECONDS
,SECONDS
,MINUTES
,HOURS
,DAYS
)
$ ./mvnw clean install -PgenerateApps
$ cd apps
You can find the corresponding binder based projects here. You can then cd into one one of the folders and
build it:
$ ./mvnw clean package
java -jar time-source.jar
This app sends trigger based on a fixed delay, date or cron expression. A payload which is evaluated using SpEL can
also be sent each time the trigger fires.
The trigger source has the following options:
- trigger.cron
- Cron expression value for the Cron Trigger. (String, default:
<none>
) - trigger.date-format
- Format for the date value. (String, default:
<none>
) - trigger.fixed-delay
- Fixed delay for periodic triggers. (Integer, default:
1
) - trigger.initial-delay
- Initial delay for periodic triggers. (Integer, default:
0
) - trigger.max-messages
- Maximum messages per poll, -1 means infinity. (Long, default:
1
) - trigger.source.payload
- The expression for the payload of the Source module. (Expression, default:
<none>
) - trigger.time-unit
- The TimeUnit to apply to delay values. (TimeUnit, default:
<none>
, possible values: NANOSECONDS
,MICROSECONDS
,MILLISECONDS
,SECONDS
,MINUTES
,HOURS
,DAYS
)
$ ./mvnw clean install -PgenerateApps
$ cd apps
You can find the corresponding binder based projects here. You can then cd into one one of the folders and build it:
$ ./mvnw clean package
java -jar trigger_source.jar --trigger.source.payload=payload-expression
The TriggerTask app sends a TaskLaunchRequest
based on a fixed delay, date or
cron expression. The TaskLaunchRequest
is used by a tasklauncher-* sink that
will deploy and launch a task. The only required property for the triggertask
is the --uri which specifies the artifact that will be launched by the
tasklauncher-* that you have selected. The user is also allowed to set the
command line arguments as well as the
Spring Boot properties
that are used by the task.
Content-Type: application/octet-stream
A byte array containing the TaskLaunchRequest
The triggertask source has the following options:
- trigger.cron
- Cron expression value for the Cron Trigger. (String, default:
<none>
) - trigger.date-format
- Format for the date value. (String, default:
<none>
) - trigger.fixed-delay
- Fixed delay for periodic triggers. (Integer, default:
1
) - trigger.initial-delay
- Initial delay for periodic triggers. (Integer, default:
0
) - trigger.max-messages
- Maximum messages per poll, -1 means infinity. (Long, default:
1
) - trigger.source.payload
- The expression for the payload of the Source module. (Expression, default:
<none>
) - trigger.time-unit
- The TimeUnit to apply to delay values. (TimeUnit, default:
<none>
, possible values: NANOSECONDS
,MICROSECONDS
,MILLISECONDS
,SECONDS
,MINUTES
,HOURS
,DAYS
) - triggertask.application-name
- The name to be applied to the launched task.. (String, default:
<empty string>
) - triggertask.command-line-args
- Space delimited key=value pairs to be used as commandline variables for the task. (String, default:
<empty string>
) - triggertask.deployment-properties
- Comma delimited key=value pairs to be used as deploymentProperties for the task. (String, default:
<empty string>
) - triggertask.environment-properties
- Comma delimited key=value pairs to be used as environmentProperties for the task. (String, default:
<empty string>
) - triggertask.uri
- The uri to the task artifact. (String, default:
<empty string>
)
$ ./mvnw clean install -PgenerateApps
$ cd apps
You can find the corresponding binder based projects here. You can then cd into one one of the folders and
build it:
$ ./mvnw clean package
java -jar trigger_task.jar --triggertask.uri=maven://org.springframework.cloud.task.app:timestamp-task:1.2.0.RELEASE
2.21 Twitter Stream Source
This source ingests data from Twitter’s streaming API v1.1. It uses the sample and filter stream endpoints rather than the full "firehose" which needs special access. The endpoint used will depend on the parameters you supply in the stream definition (some are specific to the filter endpoint).
You need to supply all keys and secrets (both consumer and accessToken) to authenticate for this source, so it is easiest if you just add these as the following environment variables: CONSUMER_KEY, CONSUMER_SECRET, ACCESS_TOKEN and ACCESS_TOKEN_SECRET.
The twitterstream source has the following options:
- twitter.credentials.access-token
- Access token (String, default:
<none>
) - twitter.credentials.access-token-secret
- Access token secret (String, default:
<none>
) - twitter.credentials.consumer-key
- Consumer key (String, default:
<none>
) - twitter.credentials.consumer-secret
- Consumer secret (String, default:
<none>
) - twitter.stream.follow
- A comma separated list of user IDs, indicating the users to return statuses for in the stream. (String, default:
<none>
) - twitter.stream.language
- The language of the tweet text. (String, default:
<none>
) - twitter.stream.locations
- A set of bounding boxes to track. (String, default:
<none>
) - twitter.stream.stream-type
- Twitter stream type (such as sample, firehose). Default is sample. (TwitterStreamType, default:
<none>
, possible values: SAMPLE
,FIREHOSE
,FILTER
) - twitter.stream.track
- Keywords to track. (String, default:
<none>
)
$ ./mvnw clean install -PgenerateApps
$ cd apps
You can find the corresponding binder based projects here.
You can then cd into one one of the folders and build it:
$ ./mvnw clean package
java -jar twitter_stream_source.jar --twitter.credentials.consumerKey=<CONSUMER_KEY> --twitter.credentials.consumerSecret=<CONSUMER_SECRET> \
--twitter.credentials.accessToken=<ACCESS_TOKEN> --twitter.credentials.accessTokenSecret=<ACCESS_TOKEN_SECRET>