4. Sinks

4.1 Cassandra Sink

This sink application writes the content of each message it receives into Cassandra.

It expects a JSON String payload and uses its properties to map to table columns.

4.1.1 Input

Headers:

  • Content-Type: application/json

Payload:

A JSON String or byte array representing the entity (or a list of entities) to be persisted

4.1.2 Output

N/A

4.1.3 Options

The cassandra sink has the following options:

cassandra.async
Async mode for CassandraMessageHandler. (Boolean, default: true)
cassandra.cluster.create-keyspace
Flag to create (or not) keyspace on application startup. (Boolean, default: false)
cassandra.cluster.entity-base-packages
Base packages to scan for entities annotated with Table annotations. (String[], default: [])
cassandra.cluster.init-script
Resource with CQL scripts (delimited by ';') to initialize keyspace schema. (Resource, default: <none>)
cassandra.cluster.metrics-enabled
Enable/disable metrics collection for the created cluster. (Boolean, default: <none>)
cassandra.cluster.skip-ssl-validation
Flag to validate the Servers' SSL certs (Boolean, default: false)
cassandra.consistency-level
The consistency level for write operation. (ConsistencyLevel, default: <none>, possible values: ANY,ONE,TWO,THREE,QUORUM,ALL,LOCAL_QUORUM,EACH_QUORUM,SERIAL,LOCAL_SERIAL,LOCAL_ONE)
cassandra.ingest-query
Ingest Cassandra query. (String, default: <none>)
cassandra.query-type
QueryType for Cassandra Sink. (Type, default: <none>, possible values: INSERT,UPDATE,DELETE,STATEMENT)
cassandra.statement-expression
Expression in Cassandra query DSL style. (Expression, default: <none>)
cassandra.ttl
Time-to-live option of WriteOptions. (Integer, default: 0)
spring.data.cassandra.cluster-name
Name of the Cassandra cluster. (String, default: <none>)
spring.data.cassandra.compression
Compression supported by the Cassandra binary protocol. (Compression, default: none, possible values: none,snappy,lz4)
spring.data.cassandra.connect-timeout
Socket option: connection time out. (Duration, default: <none>)
spring.data.cassandra.consistency-level
Queries consistency level. (ConsistencyLevel, default: <none>, possible values: ANY,ONE,TWO,THREE,QUORUM,ALL,LOCAL_QUORUM,EACH_QUORUM,SERIAL,LOCAL_SERIAL,LOCAL_ONE)
spring.data.cassandra.contact-points
Cluster node addresses. (List<String>, default: [localhost])
spring.data.cassandra.fetch-size
Queries default fetch size. (Integer, default: <none>)
spring.data.cassandra.jmx-enabled
Whether to enable JMX reporting. Default to false as Cassandra JMX reporting is not compatible with Dropwizard Metrics. (Boolean, default: false)
spring.data.cassandra.keyspace-name
Keyspace name to use. (String, default: <none>)
spring.data.cassandra.load-balancing-policy
Class name of the load balancing policy. The class must have a default constructor. (Class<LoadBalancingPolicy>, default: <none>)
spring.data.cassandra.password
Login password of the server. (String, default: <none>)
spring.data.cassandra.port
Port of the Cassandra server. (Integer, default: <none>)
spring.data.cassandra.read-timeout
Socket option: read time out. (Duration, default: <none>)
spring.data.cassandra.reconnection-policy
Class name of the reconnection policy. The class must have a default constructor. (Class<ReconnectionPolicy>, default: <none>)
spring.data.cassandra.retry-policy
Class name of the retry policy. The class must have a default constructor. (Class<RetryPolicy>, default: <none>)
spring.data.cassandra.schema-action
Schema action to take at startup. (String, default: none)
spring.data.cassandra.serial-consistency-level
Queries serial consistency level. (ConsistencyLevel, default: <none>, possible values: ANY,ONE,TWO,THREE,QUORUM,ALL,LOCAL_QUORUM,EACH_QUORUM,SERIAL,LOCAL_SERIAL,LOCAL_ONE)
spring.data.cassandra.ssl
Enable SSL support. (Boolean, default: false)
spring.data.cassandra.username
Login user of the server. (String, default: <none>)

4.1.4 Build

$ ./mvnw clean install -PgenerateApps
$ cd apps

You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:

$ ./mvnw clean package

4.1.5 Examples

The following example assumes a JSON payload is sent to a default destination called input; the sink parses some of its properties (id, time, customer_id, value) and persists them into a table called orders.

java -jar cassandra_sink.jar --spring.data.cassandra.keyspace-name=test
--cassandra.ingest-query="insert into orders(id,time,customer_id,value) values (?,?,?,?)"
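
For reference, a payload such as the following (field values are purely illustrative) supplies the four parameters of the ingest query above:

{"id":1,"time":"2018-03-01 12:00:00","customer_id":42,"value":100.0}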

4.2 Counter Sink

A sink that computes multiple counters from the received messages. It leverages the Micrometer library and can use various popular time-series databases (TSDBs) to persist the counter values.

By default, the Counter Sink increments the message.name counter on every received message. The counter.message-counter-enabled property allows you to disable this counter when required.

If tag expressions are provided (via the counter.tag.expression.<tagKey>=<tagValue SpEL expression> property), then the `name` counter is incremented. Note that each SpEL expression can evaluate to multiple values, resulting in multiple counter increments (one for every resolved value).

If fixed tags are provided, they are included in all message and expression counter increment measurements.
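
For illustration (the counter name, tag key, and payload field here are assumptions, not defaults), a configuration such as the following increments an orders counter tagged with each message's status plus a fixed region tag:

counter.name=orders
counter.tag.expression.status=#jsonPath(payload,'$.status')
counter.tag.fixed.region=us-east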

Counter’s implementation is based on the Micrometer library, a vendor-neutral application metrics facade that supports the most popular monitoring systems. See the Micrometer documentation for the list of supported monitoring systems. Starting with Spring Boot 2.0, Micrometer is the instrumentation library powering the delivery of application metrics from Spring Boot.

All Spring Cloud Stream App Starters are configured to support two of the most popular monitoring systems, Prometheus and InfluxDB. You can declaratively select which monitoring system to use. If you are not using Prometheus or InfluxDB, you can customise the App starters to use a different monitoring system, as well as include your preferred Micrometer monitoring system library in your own custom applications.

Grafana is a popular platform for building visualization dashboards.

To enable Micrometer’s Prometheus meter registry for Spring Cloud Stream application starters, set the following properties.

management.metrics.export.prometheus.enabled=true
management.endpoints.web.exposure.include=prometheus

and disable the application’s security (which allows a simple Prometheus configuration to scrape counter information) by setting the following property.

spring.cloud.streamapp.security.enabled=false

To enable Micrometer’s Influx meter registry for Spring Cloud Stream application starters, set the following properties.

management.metrics.export.influx.enabled=true
management.metrics.export.influx.uri={influxdb-server-url}
Note

If the Data Flow Server’s metrics support is enabled, the Counter will reuse the existing configurations.

The following diagram illustrates the Counter’s information collection and processing flow.

Counter Architecture

4.2.1 Options

counter.amount-expression
A SpEL expression (against the incoming Message) to derive the amount to add to the counter. If not set the counter is incremented by 1.0 (Expression, default: <none>)
counter.message-counter-enabled
Enables counting the number of messages processed. Uses the 'message.' counter name prefix to distinguish it from the expression-based counter. The message counter includes the fixed tags when provided. (Boolean, default: true)
counter.name
The name of the counter to increment. The 'name' and 'nameExpression' are mutually exclusive. Only one can be set. (String, default: <none>)
counter.name-expression
A SpEL expression (against the incoming Message) to derive the name of the counter to increment. The 'name' and 'nameExpression' are mutually exclusive. Only one can be set. (Expression, default: <none>)
counter.tag.expression
Computes tags from SpEL expression. Single SpEL expression can produce an array of values, which in turn means distinct name/value tags. Every name/value tag will produce a separate counter increment. Tag expression format is: counter.tag.expression.[tag-name]=[SpEL expression] (Map<String, Expression>, default: <none>)
counter.tag.fixed
Custom tags assigned to every counter increment measurements. This is a map so the property convention fixed tags is: counter.tag.fixed.[tag-name]=[tag-value] (Map<String, String>, default: <none>)

4.3 File Sink

This module writes each message it receives to a file.

4.3.1 Input

Headers

N/A

Payload

  • java.io.File
  • java.io.InputStream
  • byte[]
  • String

4.3.2 Output

N/A (writes to the file system).

4.3.3 Options

The file sink has the following options:

file.binary
A flag to indicate whether adding a newline after the write should be suppressed. (Boolean, default: false)
file.charset
The charset to use when writing text content. (String, default: UTF-8)
file.directory
The parent directory of the target file. (String, default: <none>)
file.directory-expression
The expression to evaluate for the parent directory of the target file. (Expression, default: <none>)
file.mode
The FileExistsMode to use if the target file already exists. (FileExistsMode, default: <none>, possible values: APPEND,APPEND_NO_FLUSH,FAIL,IGNORE,REPLACE,REPLACE_IF_MODIFIED)
file.name
The name of the target file. (String, default: file-sink)
file.name-expression
The expression to evaluate for the name of the target file. (String, default: <none>)
file.suffix
The suffix to append to file name. (String, default: <empty string>)

4.3.4 Build

$ ./mvnw clean install -PgenerateApps
$ cd apps

You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:

$ ./mvnw clean package

4.3.5 Examples

java -jar file_sink.jar --file.directory=/tmp/bar
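
The expression-based options let you compute the target location per message. For example (the customer header here is an assumption), the following writes each message into a directory derived from a header, using the file_name header for the file name:

java -jar file_sink.jar --file.directory-expression="'/tmp/out/'+headers['customer']" \
         --file.name-expression="headers['file_name']"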

4.4 FTP Sink

FTP sink is a simple option to push files to an FTP server from incoming messages.

It uses an ftp-outbound-adapter; therefore, incoming messages can be either a java.io.File object, a String (content of the file) or an array of bytes (file content as well).

To use this sink, you need a username and a password to log in.

Note

By default Spring Integration will use o.s.i.file.DefaultFileNameGenerator if none is specified. DefaultFileNameGenerator will determine the file name based on the value of the file_name header (if it exists) in the MessageHeaders, or if the payload of the Message is already a java.io.File, then it will use the original name of that file.

4.4.1 Input

Headers

  • file_name (See note above)

Payload

  • java.io.File
  • java.io.InputStream
  • byte[]
  • String

4.4.2 Output

N/A (writes to the FTP server).

4.4.3 Options

The ftp sink has the following options:

ftp.auto-create-dir
Whether or not to create the remote directory. (Boolean, default: true)
ftp.factory.cache-sessions
<documentation missing> (Boolean, default: <none>)
ftp.factory.client-mode
The client mode to use for the FTP session. (ClientMode, default: <none>, possible values: ACTIVE,PASSIVE)
ftp.factory.host
<documentation missing> (String, default: <none>)
ftp.factory.password
<documentation missing> (String, default: <none>)
ftp.factory.port
The port of the server. (Integer, default: 21)
ftp.factory.username
<documentation missing> (String, default: <none>)
ftp.filename-expression
A SpEL expression to generate the remote file name. (Expression, default: <none>)
ftp.mode
Action to take if the remote file already exists. (FileExistsMode, default: <none>, possible values: APPEND,APPEND_NO_FLUSH,FAIL,IGNORE,REPLACE,REPLACE_IF_MODIFIED)
ftp.remote-dir
The remote FTP directory. (String, default: /)
ftp.remote-file-separator
The remote file separator. (String, default: /)
ftp.temporary-remote-dir
A temporary directory where the file will be written if '#isUseTemporaryFilename()' is true. (String, default: /)
ftp.tmp-file-suffix
The suffix to use while the transfer is in progress. (String, default: .tmp)
ftp.use-temporary-filename
Whether or not to write to a temporary file and rename. (Boolean, default: true)

4.4.4 Build

$ ./mvnw clean install -PgenerateApps
$ cd apps

You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:

$ ./mvnw clean package

4.4.5 Examples

java -jar ftp_sink.jar --ftp.remote-dir=bar --ftp.factory.host=ftpserver \
         --ftp.factory.username=user --ftp.factory.password=pw

4.5 Gemfire Sink

The Gemfire sink allows one to write message payloads to a Gemfire server.

To enable SSL communication between the Geode Sink and the Geode cluster, you need to provide the URIs of the Keystore and Truststore files using the gemfire.security.ssl.keystore-uri and gemfire.security.ssl.truststore-uri properties. (If a single file is used for both stores, point both URIs to it.)
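
For example (the file path and passwords are placeholders), a single JKS file serving as both keystore and truststore could be configured as:

--gemfire.security.ssl.keystore-uri=file:/path/to/client.jks \
--gemfire.security.ssl.truststore-uri=file:/path/to/client.jks \
--gemfire.security.ssl.ssl-keystore-password=secret \
--gemfire.security.ssl.ssl-truststore-password=secret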

4.5.1 Input

Headers

  • content-type: text/plain

Payload

  • String

Headers

  • content-type: application/x-java-serialized-object

Payload

  • java.io.Serializable

4.5.2 Output

N/A

4.5.3 Options

The gemfire sink has the following options:

gemfire.pool.connect-type
Specifies connection type: 'server' or 'locator'. (ConnectType, default: <none>, possible values: locator,server)
gemfire.pool.host-addresses
Specifies one or more Gemfire locator or server addresses formatted as [host]:[port]. (InetSocketAddress[], default: <none>)
gemfire.pool.subscription-enabled
Set to true to enable subscriptions for the client pool. Required to sync updates to the client cache. (Boolean, default: false)
gemfire.region.region-name
The region name. (String, default: <none>)
gemfire.security.password
The cache password. (String, default: <none>)
gemfire.security.ssl.ciphers
Configures the SSL ciphers used for secure Socket connections as an array of valid cipher names. (String, default: any)
gemfire.security.ssl.keystore-type
Identifies the type of Keystore used for SSL communications (e.g. JKS, PKCS11, etc.). (String, default: JKS)
gemfire.security.ssl.keystore-uri
Location of the pre-created Keystore URI to be used for connecting to the Geode cluster. (Resource, default: <none>)
gemfire.security.ssl.ssl-keystore-password
Password for accessing the keystore. (String, default: <none>)
gemfire.security.ssl.ssl-truststore-password
Password for accessing the trust store. (String, default: <none>)
gemfire.security.ssl.truststore-type
Identifies the type of truststore used for SSL communications (e.g. JKS, PKCS11, etc.). (String, default: JKS)
gemfire.security.ssl.truststore-uri
Location of the pre-created truststore URI to be used for connecting to the Geode cluster. (Resource, default: <none>)
gemfire.security.ssl.user-home-directory
Local directory to cache the truststore and keystore files downloaded from the truststoreUri and keystoreUri locations. (String, default: user.home)
gemfire.security.username
The cache username. (String, default: <none>)
gemfire.sink.json
Indicates if the Gemfire region stores json objects as native Gemfire PdxInstance (Boolean, default: false)
gemfire.sink.key-expression
SpEL expression to use as a cache key (String, default: <none>)

4.5.4 Build

$ ./mvnw clean install -PgenerateApps
$ cd apps

You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:

$ ./mvnw clean package

4.5.5 Examples

java -jar gemfire-sink.jar --gemfire.sink.keyExpression=
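
A fuller invocation might look like the following sketch (the locator address, region name, and id header are assumptions for illustration):

java -jar gemfire-sink.jar --gemfire.pool.connect-type=locator \
         --gemfire.pool.host-addresses=localhost:10334 \
         --gemfire.region.region-name=Orders \
         --gemfire.sink.key-expression="headers['id']"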

4.6 Gpfdist Sink

A sink module that routes messages into GPDB/HAWQ segments via the gpfdist protocol. Internally, this sink creates a custom http listener that supports the gpfdist protocol and schedules a task that orchestrates a gpload session in the same way it is done natively in Greenplum.

No data is written into temporary files and all data is kept in stream buffers waiting to get inserted into Greenplum DB or HAWQ. If there are no existing load sessions from Greenplum, the sink will block until such sessions are established.

4.6.1 Input

Headers:

  • Content-Type: text/plain

Payload:

  • String

4.6.2 Output

N/A

4.6.3 Options

The gpfdist sink has the following options:

gpfdist.batch-count
Number of windowed batches each segment takes. (Integer, default: 100)
gpfdist.batch-period
Time in seconds for each load operation to sleep in between operations. (Integer, default: 10)
gpfdist.batch-timeout
Timeout in seconds for segment inactivity. (Integer, default: 4)
gpfdist.column-delimiter
Data record column delimiter. (Character, default: <none>)
gpfdist.control-file
Path to the YAML control file. (Resource, default: <none>)
gpfdist.db-host
Database host. (String, default: localhost)
gpfdist.db-name
Database name. (String, default: gpadmin)
gpfdist.db-password
Database password. (String, default: gpadmin)
gpfdist.db-port
Database port. (Integer, default: 5432)
gpfdist.db-user
Database user. (String, default: gpadmin)
gpfdist.delimiter
Data line delimiter. (String, default: newline character)
gpfdist.flush-count
Flush item count. (Integer, default: 100)
gpfdist.flush-time
Flush item time. (Integer, default: 2)
gpfdist.gpfdist-port
Port of the gpfdist server. The default port 0 indicates that a random port is chosen. (Integer, default: 0)
gpfdist.log-errors
Enable log errors. (Boolean, default: false)
gpfdist.match-columns
Match columns with update. (String, default: <none>)
gpfdist.mode
Mode, either insert or update. (String, default: <none>)
gpfdist.null-string
Null string definition. (String, default: <none>)
gpfdist.rate-interval
Enable transfer rate interval. (Integer, default: 0)
gpfdist.segment-reject-limit
Error reject limit. (String, default: <none>)
gpfdist.segment-reject-type
Error reject type, either rows or percent. (SegmentRejectType, default: <none>, possible values: ROWS,PERCENT)
gpfdist.sql-after
SQL to run after load. (String, default: <none>)
gpfdist.sql-before
SQL to run before load. (String, default: <none>)
gpfdist.table
Target database table. (String, default: <none>)
gpfdist.update-columns
Update columns with update. (String, default: <none>)
spring.net.hostdiscovery.loopback
The loopback flag. (Boolean, default: false)
spring.net.hostdiscovery.match-interface
The match interface regex pattern. (String, default: <none>)
spring.net.hostdiscovery.match-ipv4
Used to match an IP address from a network using CIDR notation. (String, default: <none>)
spring.net.hostdiscovery.point-to-point
The point to point flag. (Boolean, default: false)
spring.net.hostdiscovery.prefer-interface
The preferred interface list. (List<String>, default: <none>)

4.6.4 Implementation Notes

Within the gpfdist sink we have a Reactor-based stream where data is published from the incoming SI channel. This channel receives data from the Message Bus. The Reactor stream is then connected to Netty-based http channel adapters so that when a new http connection is established, the Reactor stream is flushed and balanced among the existing http clients. When Greenplum does a load from an external table, each segment will initiate an http connection and start loading data. The net effect is that incoming data is automatically spread among the Greenplum segments.

4.6.5 Detailed Option Descriptions

The gpfdist sink supports the following configuration properties:

table

Database table to work with. (String, default: ``, required)

This option denotes the table where data will be inserted or updated. The external table structure is also derived from the structure of this table.

Currently, table is the only way to define the structure of an external table. Effectively, it replaces other_table in the clause segment below.

CREATE READABLE EXTERNAL TABLE table_name LIKE other_table
mode

Gpfdist mode, either `insert` or `update`. (String, default: insert)

Currently, only the insert and update gpfdist modes are supported. The merge mode familiar from the native gpfdist loader is not yet supported.

For the update mode, the matchColumns and updateColumns options are required.

columnDelimiter

Data record column delimiter. (Character, default: ``)

Defines the delimiter character used in the clause segment below, which is part of the FORMAT 'TEXT' or FORMAT 'CSV' sections.

[DELIMITER AS 'delimiter']
segmentRejectLimit

Error reject limit. (String, default: ``)

Defines the count value in the clause segment below.

[ [LOG ERRORS] SEGMENT REJECT LIMIT count
  [ROWS | PERCENT] ]

As a convenience, this reject limit also recognizes a percentage format such as 2%; if used, segmentRejectType is automatically set to percent.

segmentRejectType

Error reject type, either `rows` or `percent`. (String, default: ``)

Defines ROWS or PERCENT in the clause segment below.

[ [LOG ERRORS] SEGMENT REJECT LIMIT count
  [ROWS | PERCENT] ]
logErrors

Enable or disable log errors. (Boolean, default: false)

As error logging is optional with SEGMENT REJECT LIMIT, it is only used if both segmentRejectLimit and segmentRejectType are set. Enables the error log in the clause segment below.

[ [LOG ERRORS] SEGMENT REJECT LIMIT count
  [ROWS | PERCENT] ]
nullString

Null string definition. (String, default: ``)

Defines the null string used in the clause segment below, which is part of the FORMAT 'TEXT' or FORMAT 'CSV' sections.

[NULL AS 'null string']
delimiter

Data record delimiter for incoming messages. (String, default: \n)

By default, the delimiter in this option is appended as a postfix to every message sent into this sink. Currently NEWLINE is not a supported config option; line termination for data comes from the default functionality.

If not specified, a Greenplum Database segment will detect the newline type by looking at the first row of data it receives and using the first newline type encountered.

 -- External Table Docs
matchColumns

Comma delimited list of columns to match. (String, default: ``)

Note

See more from examples below.

updateColumns

Comma delimited list of columns to update. (String, default: ``)

Note

See more from examples below.

sqlBefore
SQL clause to run before each load operation. (String, default: ``)
sqlAfter
SQL clause to run after each load operation. (String, default: ``)
rateInterval

Debug rate of data transfer. (Integer, default: 0)

If set to a non-zero value, the sink logs the rate of messages passing through it after the number of messages denoted by this setting has been processed. A value of 0 disables this rate calculation and logging.

flushCount

Max collected size per windowed data. (Integer, default: 100)

Note

For more info on flush and batch settings, see above.

4.6.6 How Data Is Sent Into Segments

There are a few important concepts involving how data passes into a sink, through it, and finally lands in a database.

  • The sink has its normal message handler for incoming data from a source module, a Netty-based gpfdist protocol listener that segments connect to, and, in between those two, a Reactor-based stream controlling load balancing into the different segment connections.
  • Incoming data is first sent into a Reactor stream, which constructs a window. This window is released downstream when it gets full (flushCount) or times out (flushTime) if it doesn’t fill up. One window is then ready to be sent into a segment.
  • Segments that connect to this stream see a stream of windowed data, not a stream of individual messages. We can also call this a stream of batches.
  • When a segment makes a connection to the protocol listener, it subscribes itself to this stream, takes the number of batches denoted by batchCount, and completes the stream when it has received enough batches or when batchTimeout occurs due to inactivity.
  • It doesn’t matter how many simultaneous connections there are from a database cluster at any given time, as the Reactor stream load-balances batches across all subscribers.
  • The database cluster initiates a loading session when a select is done from an external table that points to this sink. These load operations run in the background, in a loop, one after another. The batchPeriod option is then used as the sleep time between these load sessions.

Let’s take a closer look at how the flushCount, flushTime, batchCount, batchTimeout and batchPeriod options work.

At the highest level, where incoming data into a sink is windowed, flushCount and flushTime control when a batch of messages is sent downstream. If there are a lot of simultaneous segment connections, flushing less often keeps more segments inactive, as there is more demand for batches than what flushing produces.

When an existing segment connection is active and has subscribed itself to the stream of batches, data keeps flowing until either batchCount is met or batchTimeout occurs due to inactivity of data from upstream. The higher the batchCount, the more data each segment reads. The higher the batchTimeout, the longer a segment waits in case more data is coming.

As gpfdist load operations run in a loop, batchPeriod simply keeps things from running in a busy loop. A busy loop would be fine with a constant stream of incoming data, but if incoming data arrives in bursts, a busy loop would be unnecessary.

Note

Data loaded via gpfdist will not become visible in a database until the whole distributed load session has finished successfully.

Reactor also handles backpressure, meaning that if the existing load operations do not produce enough demand for data, message passing into the sink eventually blocks. This happens when Reactor’s internal ring buffer (32 items in size) gets full. Data really flows through the sink only when it is pulled from it by the segments.

4.6.7 Example Usage

In this first example we’re just creating a simple stream which inserts data from a time source. Let’s create a table with two text columns.

gpadmin=# create table ticktock (date text, time text);

Create a simple stream named gpstream1.

dataflow:>stream create --name gpstream1 --definition "time | gpfdist
--dbHost=mdw --table=ticktock --batchTimeout=1 --batchPeriod=1
--flushCount=2 --flushTime=2 --columnDelimiter=' '" --deploy

Let it run and see the results in the database.

gpadmin=# select count(*) from ticktock;
 count
-------
    14
(1 row)

In the previous example we did simple inserts into a table. Let’s see how we can update data in a table. Create a simple table httpdata with three text columns and insert some data.

gpadmin=# create table httpdata (col1 text, col2 text, col3 text);
gpadmin=# insert into httpdata values ('DATA1', 'DATA', 'DATA');
gpadmin=# insert into httpdata values ('DATA2', 'DATA', 'DATA');
gpadmin=# insert into httpdata values ('DATA3', 'DATA', 'DATA');

Now the table looks like this.

gpadmin=# select * from httpdata;
 col1  | col2 | col3
-------+------+------
 DATA3 | DATA | DATA
 DATA2 | DATA | DATA
 DATA1 | DATA | DATA
(3 rows)

Let’s create a stream that updates the httpdata table by matching column col1 and updating columns col2 and col3.

dataflow:>stream create --name gpfdiststream2 --definition "http
--server.port=8081|gpfdist --mode=update --table=httpdata
--dbHost=mdw --columnDelimiter=',' --matchColumns=col1
--updateColumns=col2,col3" --deploy

Post some data into the stream; it is passed into the gpfdist sink via the http source.

curl --data "DATA1,DATA1,DATA1" -H "Content-Type:text/plain" http://localhost:8081/

If you query the table again, you’ll see that the row for DATA1 has been updated.

gpadmin=# select * from httpdata;
 col1  | col2  | col3
-------+-------+-------
 DATA3 | DATA  | DATA
 DATA2 | DATA  | DATA
 DATA1 | DATA1 | DATA1
(3 rows)

4.6.8 Tuning Transfer Rate

The default values for the flushCount, flushTime, batchCount, batchTimeout and batchPeriod options are relatively conservative and need to be tuned for every use case for optimal performance. In order to decide how to tune the sink behaviour to suit your needs, a few things need to be considered:

  • What is the average size of the messages ingested by the sink?
  • How fast do you want data to become visible in the database?
  • Is the incoming data a constant flow or bursts of data?

Everything that flows through the sink is kept in memory, and because the sink handles backpressure, memory consumption is relatively low. However, because the sink cannot predict the average size of the incoming data, and this data is windowed later downstream, you should not allow the window size to become too large if the average data size is large, as every batch of data is kept in memory.

Generally speaking, if you have a lot of segments in a load operation, it is advisable to keep the flushed window size relatively small, which allows more segments to stay active. This, however, also depends on how much data is flowing into the sink itself.

The longer a load session for each segment is active, the higher the overall transfer rate is going to be. The batchCount option naturally controls this. The batchTimeout option then controls how fast each segment completes its stream due to inactivity from upstream, stepping away from the load session so that the distributed session can finish and the data become visible in the database.

4.6.9 Build

$ ./mvnw clean install -PgenerateApps
$ cd apps

You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:

$ ./mvnw clean package

4.6.10 Examples

See above.

4.7 HDFS Sink

This module writes each message it receives to HDFS.

4.7.1 Input

Headers

Payload

Any

4.7.2 Output

N/A

4.7.3 Options

The hdfs sink has the following options:

hdfs.close-timeout
Timeout in ms, regardless of activity, after which file will be automatically closed. (Long, default: 0)
hdfs.codec
Compression codec alias name (gzip, snappy, bzip2, lzo, or slzo). (String, default: <none>)
hdfs.directory
Base path to write files to. (String, default: <none>)
hdfs.enable-sync
Whether writer will sync to datanode when flush is called, setting this to 'true' could impact throughput. (Boolean, default: false)
hdfs.file-extension
The base filename extension to use for the created files. (String, default: txt)
hdfs.file-name
The base filename to use for the created files. (String, default: <none>)
hdfs.file-open-attempts
Maximum number of file open attempts to find a path. (Integer, default: 10)
hdfs.file-uuid
Whether file name should contain uuid. (Boolean, default: false)
hdfs.flush-timeout
Timeout in ms, regardless of activity, after which data written to file will be flushed. (Long, default: 0)
hdfs.fs-uri
URL for HDFS Namenode. (String, default: <none>)
hdfs.idle-timeout
Inactivity timeout in ms after which file will be automatically closed. (Long, default: 0)
hdfs.in-use-prefix
Prefix for files currently being written. (String, default: <none>)
hdfs.in-use-suffix
Suffix for files currently being written. (String, default: <none>)
hdfs.overwrite
Whether writer is allowed to overwrite files in Hadoop FileSystem. (Boolean, default: false)
hdfs.partition-path
A SpEL expression defining the partition path. (String, default: <none>)
hdfs.rollover
Threshold in bytes when file will be automatically rolled over. (Integer, default: 1000000000)
Note

This module can have its runtime dependencies provided during startup if you would like to use a Hadoop distribution other than the default one.

4.7.4 Build

$ ./mvnw clean install -PgenerateApps
$ cd apps

You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:

$ ./mvnw clean package

4.7.5 Examples

java -jar hdfs-sink.jar --fsUri
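
A fuller invocation might look like the following sketch (the Namenode URL, directory, and file name are placeholders):

java -jar hdfs-sink.jar --hdfs.fs-uri=hdfs://namenode:8020 \
         --hdfs.directory=/data/hdfs-sink --hdfs.file-name=messages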

4.8 Jdbc Sink

A module that writes its incoming payload to an RDBMS using JDBC.

4.8.1 Input

Headers

Payload

  • Any

The column expressions are evaluated against the message, and an expression is usually compatible with only one payload type (such as a Map or a bean).

4.8.2 Output

N/A

4.8.3 Options

The jdbc sink has the following options:

jdbc.columns
The comma separated colon-based pairs of column names and SpEL expressions for values to insert/update. Names are used at initialization time to issue the DDL. (String, default: payload:payload.toString())
jdbc.initialize
'true', 'false' or the location of a custom initialization script for the table. (String, default: false)
jdbc.table-name
The name of the table to write into. (String, default: messages)
spring.datasource.data
Data (DML) script resource references. (List<String>, default: <none>)
spring.datasource.driver-class-name
Fully qualified name of the JDBC driver. Auto-detected based on the URL by default. (String, default: <none>)
spring.datasource.initialization-mode
Initialize the datasource using available DDL and DML scripts. (DataSourceInitializationMode, default: embedded, possible values: ALWAYS,EMBEDDED,NEVER)
spring.datasource.password
Login password of the database. (String, default: <none>)
spring.datasource.schema
Schema (DDL) script resource references. (List<String>, default: <none>)
spring.datasource.url
JDBC url of the database. (String, default: <none>)
spring.datasource.username
Login username of the database. (String, default: <none>)

The jdbc.columns property represents pairs of COLUMN_NAME[:EXPRESSION_FOR_VALUE], where EXPRESSION_FOR_VALUE (together with the colon) is optional. In this case the value is evaluated via a generated expression like payload.COLUMN_NAME, giving a direct mapping from object properties to table columns. For example, suppose we have a JSON payload like:

{
  "name": "My Name",
  "address": {
     "city": "Big City",
     "street": "Narrow Alley"
  }
}

So, we can insert it into the table with name, city and street structure using the configuration:

--jdbc.columns=name,city:address.city,street:address.street

This sink supports batch inserts, as far as supported by the underlying JDBC driver. Batch inserts are configured via the batch-size and idle-timeout properties: Incoming messages are aggregated until batch-size messages are present, then inserted as a batch. If idle-timeout milliseconds pass with no new messages, the aggregated batch is inserted even if it is smaller than batch-size, capping maximum latency.
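
Assuming these properties live under the jdbc prefix like the other sink options, a configuration that batches up to 500 rows and caps latency at five seconds might look like:

--jdbc.batch-size=500 --jdbc.idle-timeout=5000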

Note

The module also uses Spring Boot’s DataSource support for configuring the database connection, so properties like spring.datasource.url etc. apply.

4.8.4 Build

$ ./mvnw clean install -PgenerateApps
$ cd apps

You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:

$ ./mvnw clean package

4.8.5 Examples

java -jar jdbc-sink.jar --jdbc.tableName=names --jdbc.columns=name --spring.datasource.driver-class-name=org.mariadb.jdbc.Driver \
--spring.datasource.url='jdbc:mysql://localhost:3306/test'

4.9 Log Sink

The log sink uses the application logger to output the data for inspection.

Please note that the log sink uses a type-less handler, which affects how the actual logging is performed. This means that if the content type is textual, the raw payload bytes are converted to a String; otherwise, the raw bytes are logged. Please see more info in the user guide.

4.9.1 Input

Headers

Payload

Any

4.9.2 Output

N/A

4.9.3 Options

The log sink has the following options:

log.expression
A SpEL expression (against the incoming message) to evaluate as the logged message. (String, default: payload)
log.level
The level at which to log messages. (Level, default: <none>, possible values: FATAL,ERROR,WARN,INFO,DEBUG,TRACE)
log.name
The name of the logger to use. (String, default: <none>)

4.9.4 Build

$ ./mvnw clean install -PgenerateApps
$ cd apps

You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:

$ ./mvnw clean package

4.9.5 Examples

java -jar log-sink.jar
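
A fuller invocation might look like the following sketch (the logger name and expression are illustrative):

java -jar log-sink.jar --log.name=my-sink --log.level=WARN \
         --log.expression="'Received: ' + payload"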

4.10 RabbitMQ Sink

This module sends messages to RabbitMQ.

4.10.1 Input

Headers

  • propagated headers from the binder, including an original contentType

Payload

  • byte[]

4.10.2 Output

N/A

4.10.3 Options

The rabbit sink has the following options:

(See the Spring Boot documentation for RabbitMQ connection properties)

rabbit.exchange
Exchange name - overridden by exchangeExpression, if supplied. (String, default: <empty string>)
rabbit.exchange-expression
A SpEL expression that evaluates to an exchange name. (Expression, default: <none>)
rabbit.mapped-request-headers
Headers that will be mapped. (String[], default: [*])
rabbit.own-connection
When true, use a separate connection based on the boot properties. (Boolean, default: false)
rabbit.persistent-delivery-mode
Default delivery mode when 'amqp_deliveryMode' header is not present, true for PERSISTENT. (Boolean, default: false)
rabbit.routing-key
Routing key - overridden by routingKeyExpression, if supplied. (String, default: <none>)
rabbit.routing-key-expression
A SpEL expression that evaluates to a routing key. (Expression, default: <none>)
spring.rabbitmq.addresses
Comma-separated list of addresses to which the client should connect. (String, default: <none>)
spring.rabbitmq.connection-timeout
Connection timeout. Set it to zero to wait forever. (Duration, default: <none>)
spring.rabbitmq.host
RabbitMQ host. (String, default: localhost)
spring.rabbitmq.password
Login to authenticate against the broker. (String, default: guest)
spring.rabbitmq.port
RabbitMQ port. (Integer, default: 5672)
spring.rabbitmq.publisher-confirms
Whether to enable publisher confirms. (Boolean, default: false)
spring.rabbitmq.publisher-returns
Whether to enable publisher returns. (Boolean, default: false)
spring.rabbitmq.requested-heartbeat
Requested heartbeat timeout; zero for none. If a duration suffix is not specified, seconds will be used. (Duration, default: <none>)
spring.rabbitmq.username
Login user to authenticate to the broker. (String, default: guest)
spring.rabbitmq.virtual-host
Virtual host to use when connecting to the broker. (String, default: <none>)

4.10.4 Build

$ ./mvnw clean install -PgenerateApps
$ cd apps

You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:

$ ./mvnw clean package

4.10.5 Examples

java -jar rabbit-sink.jar --rabbit.routingKey=
java -jar rabbit-sink.jar --rabbit.routingKeyExpression=
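
A fuller invocation might look like the following sketch (exchange, routing key, and broker coordinates are placeholders):

java -jar rabbit-sink.jar --rabbit.exchange=myExchange --rabbit.routing-key=events \
         --spring.rabbitmq.host=rabbithost --spring.rabbitmq.username=user \
         --spring.rabbitmq.password=pw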

4.11 MongoDB Sink

This sink application ingests incoming data into MongoDB. This application is fully based on MongoDataAutoConfiguration, so refer to the Spring Boot MongoDB support for more information.

4.11.1 Input

Headers

Payload

  • Any POJO
  • String
  • byte[]

4.11.2 Output

N/A

4.11.3 Options

The mongodb sink has the following options:

mongodb.collection
The MongoDB collection to store data (String, default: <none>)
mongodb.collection-expression
The SpEL expression to evaluate MongoDB collection (Expression, default: <none>)
spring.data.mongodb.authentication-database
Authentication database name. (String, default: <none>)
spring.data.mongodb.database
Database name. (String, default: <none>)
spring.data.mongodb.field-naming-strategy
Fully qualified name of the FieldNamingStrategy to use. (Class<?>, default: <none>)
spring.data.mongodb.grid-fs-database
GridFS database name. (String, default: <none>)
spring.data.mongodb.host
Mongo server host. Cannot be set with URI. (String, default: <none>)
spring.data.mongodb.password
Login password of the mongo server. Cannot be set with URI. (Character[], default: <none>)
spring.data.mongodb.port
Mongo server port. Cannot be set with URI. (Integer, default: <none>)
spring.data.mongodb.uri
Mongo database URI. Cannot be set with host, port and credentials. (String, default: mongodb://localhost/test)
spring.data.mongodb.username
Login user of the mongo server. Cannot be set with URI. (String, default: <none>)

Also see the Spring Boot Documentation for additional MongoProperties properties.

4.11.4 Build

$ ./mvnw clean install -PgenerateApps
$ cd apps

You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:

$ ./mvnw clean package

4.11.5 Examples

java -jar mongodb-sink.jar --mongodb.collection=
java -jar mongodb-sink.jar --mongodb.collectionExpression=
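
A fuller invocation might look like the following sketch (collection and database names are placeholders):

java -jar mongodb-sink.jar --mongodb.collection=orders \
         --spring.data.mongodb.host=localhost --spring.data.mongodb.port=27017 \
         --spring.data.mongodb.database=test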

4.12 MQTT Sink

This module sends messages to MQTT.

4.12.1 Input

Headers:

Payload:

  • byte[]
  • String

4.12.2 Output

N/A

4.12.3 Options

The mqtt sink has the following options:

mqtt.async
whether or not to use async sends (Boolean, default: false)
mqtt.charset
the charset used to convert a String payload to byte[] (String, default: UTF-8)
mqtt.clean-session
whether the client and server should remember state across restarts and reconnects (Boolean, default: true)
mqtt.client-id
identifies the client (String, default: stream.client.id.sink)
mqtt.connection-timeout
the connection timeout in seconds (Integer, default: 30)
mqtt.keep-alive-interval
the ping interval in seconds (Integer, default: 60)
mqtt.password
the password to use when connecting to the broker (String, default: guest)
mqtt.persistence
'memory' or 'file' (String, default: memory)
mqtt.persistence-directory
Persistence directory (String, default: /tmp/paho)
mqtt.qos
the quality of service to use (Integer, default: 1)
mqtt.retained
whether to set the 'retained' flag (Boolean, default: false)
mqtt.topic
the topic to which the sink will publish (String, default: stream.mqtt)
mqtt.url
location of the mqtt broker(s) (comma-delimited list) (String[], default: [tcp://localhost:1883])
mqtt.username
the username to use when connecting to the broker (String, default: guest)

4.12.4 Build

$ ./mvnw clean install -PgenerateApps
$ cd apps

You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:

$ ./mvnw clean package

4.12.5 Examples

java -jar mqtt-sink.jar --mqtt.clientid= --mqtt.topic=
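
A fuller invocation might look like the following sketch (broker URL, client id, and topic are placeholders):

java -jar mqtt-sink.jar --mqtt.url=tcp://localhost:1883 --mqtt.client-id=mqtt-sink-1 \
         --mqtt.topic=sensors --mqtt.qos=1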

4.13 Pgcopy Sink

A module that writes its incoming payload to an RDBMS using the PostgreSQL COPY command.

4.13.1 Input

Headers

Payload

  • Any

The column expressions are evaluated against the message, and an expression is usually compatible with only one payload type (such as a Map or a bean).

4.13.2 Output

N/A

4.13.3 Options

The pgcopy sink has the following options:

pgcopy.batch-size
Threshold in number of messages when data will be flushed to database table. (Integer, default: 10000)
pgcopy.columns
The names of the columns that shall receive data. Also used at initialization time to issue the DDL. (List<String>, default: payload)
pgcopy.delimiter
Specifies the character that separates columns within each row (line) of the file. The default is a tab character in text format, a comma in CSV format. This must be a single one-byte character. Using an escaped value like '\t' is allowed. (String, default: <none>)
pgcopy.error-table
The name of the error table used for writing rows causing errors. The error table should have three columns named "table_name", "error_message" and "payload" large enough to hold potential data values. You can use the following DDL to create this table: 'CREATE TABLE ERRORS (TABLE_NAME VARCHAR(255), ERROR_MESSAGE TEXT,PAYLOAD TEXT)' (String, default: <none>)
pgcopy.escape
Specifies the character that should appear before a data character that matches the QUOTE value. The default is the same as the QUOTE value (so that the quoting character is doubled if it appears in the data). This must be a single one-byte character. This option is allowed only when using CSV format. (Character, default: <none>)
pgcopy.format
Format to use for the copy command. (Format, default: <none>, possible values: TEXT,CSV)
pgcopy.idle-timeout
Idle timeout in milliseconds when data is automatically flushed to database table. (Long, default: -1)
pgcopy.initialize
'true', 'false' or the location of a custom initialization script for the table. (String, default: false)
pgcopy.null-string
Specifies the string that represents a null value. The default is \N (backslash-N) in text format, and an unquoted empty string in CSV format. (String, default: <none>)
pgcopy.quote
Specifies the quoting character to be used when a data value is quoted. The default is double-quote. This must be a single one-byte character. This option is allowed only when using CSV format. (Character, default: <none>)
pgcopy.table-name
The name of the table to write into. (String, default: <none>)
spring.datasource.driver-class-name
Fully qualified name of the JDBC driver. Auto-detected based on the URL by default. (String, default: <none>)
spring.datasource.password
Login password of the database. (String, default: <none>)
spring.datasource.url
JDBC url of the database. (String, default: <none>)
spring.datasource.username
Login username of the database. (String, default: <none>)
Note

The module also uses Spring Boot’s DataSource support for configuring the database connection, so properties like spring.datasource.url etc. apply.

4.13.4 Build

$ ./mvnw clean install -PgenerateApps
$ cd apps

You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:

$ ./mvnw clean package

For integration tests to run, start a PostgreSQL database on localhost:

    docker run -e POSTGRES_PASSWORD=spring -e POSTGRES_DB=test -p 5432:5432 -d postgres:latest

4.13.5 Examples

java -jar pgcopy-sink.jar --pgcopy.table-name=names --pgcopy.columns=name --spring.datasource.driver-class-name=org.postgresql.Driver \
--spring.datasource.url='jdbc:postgresql://localhost:5432/test'

4.14 Redis Sink

This module sends messages to a Redis store.

4.14.1 Input

Headers

  • content-type: text/plain

Payload

  • String

Headers

  • content-type: application/octet-stream

Payload

  • byte[]

4.14.2 Output

N/A

4.14.3 Options

The redis sink has the following options:

redis.key
A literal key name to use when storing to a key. (String, default: <none>)
redis.key-expression
A SpEL expression to use for storing to a key. (Expression, default: <none>)
redis.queue
A literal queue name to use when storing in a queue. (String, default: <none>)
redis.queue-expression
A SpEL expression to use for queue. (Expression, default: <none>)
redis.topic
A literal topic name to use when publishing to a topic. (String, default: <none>)
redis.topic-expression
A SpEL expression to use for topic. (Expression, default: <none>)
spring.redis.database
Database index used by the connection factory. (Integer, default: 0)
spring.redis.host
Redis server host. (String, default: localhost)
spring.redis.jedis.pool.max-active
Maximum number of connections that can be allocated by the pool at a given time. Use a negative value for no limit. (Integer, default: 8)
spring.redis.jedis.pool.max-idle
Maximum number of "idle" connections in the pool. Use a negative value to indicate an unlimited number of idle connections. (Integer, default: 8)
spring.redis.jedis.pool.max-wait
Maximum amount of time a connection allocation should block before throwing an exception when the pool is exhausted. Use a negative value to block indefinitely. (Duration, default: -1ms)
spring.redis.jedis.pool.min-idle
Target for the minimum number of idle connections to maintain in the pool. This setting only has an effect if both it and time between eviction runs are positive. (Integer, default: 0)
spring.redis.jedis.pool.time-between-eviction-runs
Time between runs of the idle object evictor thread. When positive, the idle object evictor thread starts, otherwise no idle object eviction is performed. (Duration, default: <none>)
spring.redis.lettuce.pool.max-active
Maximum number of connections that can be allocated by the pool at a given time. Use a negative value for no limit. (Integer, default: 8)
spring.redis.lettuce.pool.max-idle
Maximum number of "idle" connections in the pool. Use a negative value to indicate an unlimited number of idle connections. (Integer, default: 8)
spring.redis.lettuce.pool.max-wait
Maximum amount of time a connection allocation should block before throwing an exception when the pool is exhausted. Use a negative value to block indefinitely. (Duration, default: -1ms)
spring.redis.lettuce.pool.min-idle
Target for the minimum number of idle connections to maintain in the pool. This setting only has an effect if both it and time between eviction runs are positive. (Integer, default: 0)
spring.redis.lettuce.pool.time-between-eviction-runs
Time between runs of the idle object evictor thread. When positive, the idle object evictor thread starts, otherwise no idle object eviction is performed. (Duration, default: <none>)
spring.redis.password
Login password of the redis server. (String, default: <none>)
spring.redis.port
Redis server port. (Integer, default: 6379)
spring.redis.sentinel.master
Name of the Redis server. (String, default: <none>)
spring.redis.sentinel.nodes
Comma-separated list of "host:port" pairs. (List<String>, default: <none>)
spring.redis.ssl
Whether to enable SSL support. (Boolean, default: false)
spring.redis.timeout
Connection timeout. (Duration, default: <none>)
spring.redis.url
Connection URL. Overrides host, port, and password. User is ignored. Example: redis://user:password@example.com:6379 (String, default: <none>)

4.14.4 Build

$ ./mvnw clean install -PgenerateApps
$ cd apps

You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:

$ ./mvnw clean package

4.14.5 Examples

java -jar redis-pubsub-sink.jar --redis.queue=
java -jar redis-pubsub-sink.jar --redis.queueExpression=
java -jar redis-pubsub-sink.jar --redis.key=
java -jar redis-pubsub-sink.jar --redis.keyExpression=
java -jar redis-pubsub-sink.jar --redis.topic=
java -jar redis-pubsub-sink.jar --redis.topicExpression=
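
A fuller invocation might look like the following sketch (topic name and host are placeholders):

java -jar redis-pubsub-sink.jar --redis.topic=sensor-data \
         --spring.redis.host=localhost --spring.redis.port=6379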

4.15 Router Sink

This application routes messages to named channels.

4.15.1 Input

Headers

Payload

Any

4.15.2 Output

N/A

4.15.3 Options

The router sink has the following options:

router.default-output-channel
Where to send unroutable messages. (String, default: nullChannel)
router.destination-mappings
Destination mappings as a new line delimited string of name-value pairs, e.g. 'foo=bar\n baz=car'. (Properties, default: <none>)
router.expression
The expression to be applied to the message to determine the channel(s) to route to. Note that the payload wire format for content types such as text, json or xml is byte[], not String! Consult the documentation for how to handle byte array payload content. (Expression, default: <none>)
router.refresh-delay
How often to check for script changes in ms (if present); < 0 means don't refresh. (Integer, default: 60000)
router.resolution-required
Whether or not channel resolution is required. (Boolean, default: false)
router.script
The location of a groovy script that returns channels or channel mapping resolution keys. (Resource, default: <none>)
router.variables
Variable bindings as a new line delimited string of name-value pairs, e.g. 'foo=bar\n baz=car'. (Properties, default: <none>)
router.variables-location
The location of a properties file containing custom script variable bindings. (Resource, default: <none>)
Note

Since this is a dynamic router, destinations are created as needed; therefore, by default the defaultOutputChannel and resolutionRequired will only be used if the Binder has some problem binding to the destination.

You can restrict the creation of dynamic bindings using the spring.cloud.stream.dynamicDestinations property. By default, all resolved destinations will be bound dynamically; if this property has a comma-delimited list of destination names, only those will be bound. Messages that resolve to a destination that is not in this list will be routed to the defaultOutputChannel, which must also appear in the list.

destinationMappings are used to map the evaluation results to an actual destination name.
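
For example (the expression and destination names are assumptions), an expression extracting an order type can be mapped to concrete destination names as follows:

router.expression=#jsonPath(payload, '$.type')
router.destination-mappings=premium=premiumDestination\n standard=standardDestination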

4.15.4 SpEL-based Routing

The expression evaluates against the message and returns either a channel name, or the key to a map of channel names.

For more information, please see the "Routers and the Spring Expression Language (SpEL)" subsection in the Spring Integration Reference manual Configuring (Generic) Router section.

Note

Starting with Spring Cloud Stream 2.0, the message wire format for the json, text and xml content types is byte[], not String! This is a breaking change from SCSt 1.x, which treated those types as Strings. Depending on the content type, different techniques for handling the byte[] payloads are available. For plain text content types, one can convert the octet payload into a String using the new String(payload) SpEL expression. For json types, the #jsonPath() SpEL utility already supports string and byte array content interchangeably. The same applies for the xml content type and the #xpath() SpEL utility.

For example, for the text content type one should use:

 new String(payload).contains('a')

and for the json content type, a SpEL expression like this:

 #jsonPath(payload, '$.person.name')

4.15.5 Groovy-based Routing

Instead of SpEL expressions, Groovy scripts can also be used. Let’s create a Groovy script in the file system at "file:/my/path/router.groovy" or "classpath:/my/path/router.groovy":

println("Groovy processing payload '" + payload + "'");
if (payload.contains('a')) {
    return "foo"
}
else {
    return "bar"
}

If you want to pass variable values to your script, you can statically bind values using the variables option or optionally pass the path to a properties file containing the bindings using the variablesLocation option. All properties in the file will be made available to the script as variables. You may specify both variables and variablesLocation, in which case any duplicate values provided as variables override values provided in variablesLocation. Note that payload and headers are implicitly bound to give you access to the data contained in a message.

For more information, see the Spring Integration Reference manual Groovy Support.

4.15.6 Build

$ ./mvnw clean install -PgenerateApps
$ cd apps

You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:

$ ./mvnw clean package

4.15.7 Examples

java -jar router-sink.jar --expression="new String(payload).contains('a')?':foo':':bar'"
java -jar router-sink.jar --script=" "

4.16 Amazon S3 Sink

This sink app supports transferring files to an Amazon S3 bucket. File payloads (and directories, recursively) are transferred from the local directory where the application is deployed to the remote directory (the S3 bucket).

Messages accepted by this sink must contain one of the following payloads:

  • File, including directories for recursive upload;
  • InputStream;
  • byte[]

4.16.1 Input

Headers

N/A

Payload

  • java.io.File
  • java.io.InputStream
  • byte[]
  • String

4.16.2 Output

N/A

4.16.3 Options

The s3 sink has the following options:

s3.acl
S3 Object access control list. (CannedAccessControlList, default: <none>, possible values: private,public-read,public-read-write,authenticated-read,log-delivery-write,bucket-owner-read,bucket-owner-full-control,aws-exec-read)
s3.acl-expression
Expression to evaluate S3 Object access control list. (Expression, default: <none>)
s3.bucket
AWS bucket for target file(s) to store. (String, default: <none>)
s3.bucket-expression
Expression to evaluate AWS bucket name. (Expression, default: <none>)
s3.key-expression
Expression to evaluate S3 Object key. (Expression, default: <none>)

The target generated application, based on AmazonS3SinkConfiguration, can be enhanced with an S3MessageHandler.UploadMetadataProvider and/or S3ProgressListener, which are injected into the S3MessageHandler bean.

4.16.4 Amazon AWS common options

The Amazon S3 Sink (like all other Amazon AWS applications) is based on the Spring Cloud AWS project as a foundation, and its auto-configuration classes are used automatically by Spring Boot. Consult their documentation regarding required and useful auto-configuration properties.

Some of them are about AWS credentials:

  • cloud.aws.credentials.accessKey
  • cloud.aws.credentials.secretKey
  • cloud.aws.credentials.instanceProfile
  • cloud.aws.credentials.profileName
  • cloud.aws.credentials.profilePath

Others are for the AWS Region definition:

  • cloud.aws.region.auto
  • cloud.aws.region.static

And for AWS Stack:

  • cloud.aws.stack.auto
  • cloud.aws.stack.name
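
Putting these together, a hypothetical standalone run with static credentials and region might look like the following (all values are placeholders; --cloud.aws.stack.auto=false disables CloudFormation stack auto-detection, which typically fails outside of AWS):

java -jar s3-sink.jar --s3.bucket=my-bucket \
         --cloud.aws.credentials.accessKey=<access-key> \
         --cloud.aws.credentials.secretKey=<secret-key> \
         --cloud.aws.region.static=us-east-1 \
         --cloud.aws.stack.auto=false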

4.16.5 Build

$ ./mvnw clean install -PgenerateApps
$ cd apps

You can find the corresponding binder-based projects in the apps directory. You can then cd into one of the folders and build it:

$ ./mvnw clean package

4.16.6 Examples

java -jar s3-sink.jar --s3.bucket=my-bucket

4.17 SFTP Sink

The SFTP sink is a simple option to push files to an SFTP server from incoming messages.

It uses an sftp-outbound-adapter; therefore, incoming messages can be either a java.io.File object, a String (the contents of the file), or an array of bytes (the file contents as well).

To use this sink, you need a username and a password to log in.

[Note]Note

By default Spring Integration will use o.s.i.file.DefaultFileNameGenerator if none is specified. DefaultFileNameGenerator will determine the file name based on the value of the file_name header (if it exists) in the MessageHeaders, or if the payload of the Message is already a java.io.File, then it will use the original name of that file.

When configuring the sftp.factory.known-hosts-expression option, the root object of the evaluation is the application context. An example might be sftp.factory.known-hosts-expression = @systemProperties['user.home'] + '/.ssh/known_hosts'.
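
For example, a hypothetical invocation quoting the expression so the shell passes it through intact (host and credentials are placeholders):

java -jar sftp_sink.jar --sftp.factory.host=sftpserver --sftp.factory.username=user \
         --sftp.factory.password=pw \
         --sftp.factory.known-hosts-expression="@systemProperties['user.home'] + '/.ssh/known_hosts'"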

4.17.1 Input

Headers

  • file_name (See note above)

Payload

  • java.io.File
  • java.io.InputStream
  • byte[]
  • String

4.17.2 Output

N/A (writes to the SFTP server).

4.17.3 Options

The sftp sink has the following options:

sftp.auto-create-dir
Whether or not to create the remote directory. (Boolean, default: true)
sftp.factory.allow-unknown-keys
True to allow an unknown or changed key. (Boolean, default: false)
sftp.factory.cache-sessions
Cache sessions. (Boolean, default: <none>)
sftp.factory.host
The host name of the server. (String, default: localhost)
sftp.factory.known-hosts-expression
A SpEL expression resolving to the location of the known hosts file. (Expression, default: <none>)
sftp.factory.pass-phrase
Passphrase for user's private key. (String, default: <empty string>)
sftp.factory.password
The password to use to connect to the server. (String, default: <none>)
sftp.factory.port
The port of the server. (Integer, default: 22)
sftp.factory.private-key
Resource location of user's private key. (Resource, default: <none>)
sftp.factory.username
The username to use to connect to the server. (String, default: <none>)
sftp.filename-expression
A SpEL expression to generate the remote file name. (Expression, default: <none>)
sftp.mode
Action to take if the remote file already exists. (FileExistsMode, default: <none>, possible values: APPEND,APPEND_NO_FLUSH,FAIL,IGNORE,REPLACE,REPLACE_IF_MODIFIED)
sftp.remote-dir
The remote SFTP directory. (String, default: /)
sftp.remote-file-separator
The remote file separator. (String, default: /)
sftp.temporary-remote-dir
A temporary directory where the file will be written if 'isUseTemporaryFilename()' is true. (String, default: /)
sftp.tmp-file-suffix
The suffix to use while the transfer is in progress. (String, default: .tmp)
sftp.use-temporary-filename
Whether or not to write to a temporary file and rename. (Boolean, default: true)

4.17.4 Build

$ ./mvnw clean install -PgenerateApps
$ cd apps

You can find the corresponding binder-based projects in the apps directory. You can then cd into one of the folders and build it:

$ ./mvnw clean package

4.17.5 Examples

java -jar sftp_sink.jar --sftp.remote-dir=bar --sftp.factory.host=sftpserver \
         --sftp.factory.username=user --sftp.factory.password=pw

4.18 TCP Sink

This sink writes messages to TCP using an Encoder.

TCP is a streaming protocol, and some mechanism is needed to frame messages on the wire. A number of encoders are available; the default is 'CRLF'.

4.18.1 Input

Headers:

  • Content-Type: application/octet-stream

Payload:

  • byte[]

Headers:

  • Content-Type: text/plain

Payload:

  • String

4.18.2 Output

N/A

4.18.3 Options

The tcp sink has the following options:

tcp.charset
The charset used when converting from bytes to String. (String, default: UTF-8)
tcp.close
Whether to close the socket after each message. (Boolean, default: false)
tcp.encoder
The encoder to use when sending messages. (Encoding, default: <none>, possible values: CRLF,LF,NULL,STXETX,RAW,L1,L2,L4)
tcp.host
The host to which this sink will connect. (String, default: <none>)
tcp.nio
Whether or not to use NIO. (Boolean, default: false)
tcp.port
The port on which to listen; 0 for the OS to choose a port. (Integer, default: 1234)
tcp.reverse-lookup
Perform a reverse DNS lookup on the remote IP Address; if false, just the IP address is included in the message headers. (Boolean, default: false)
tcp.socket-timeout
The timeout (ms) before closing the socket when no data is received. (Integer, default: 120000)
tcp.use-direct-buffers
Whether or not to use direct buffers. (Boolean, default: false)

4.18.4 Available Encoders

Text Data

CRLF (default)
text terminated by carriage return (0x0d) followed by line feed (0x0a)
LF
text terminated by line feed (0x0a)
NULL
text terminated by a null byte (0x00)
STXETX
text preceded by an STX (0x02) and terminated by an ETX (0x03)

Text and Binary Data

RAW
no structure - the client indicates a complete message by closing the socket
L1
data preceded by a one byte (unsigned) length field (supports up to 255 bytes)
L2
data preceded by a two byte (unsigned) length field (up to 2^16-1 bytes)
L4
data preceded by a four byte (signed) length field (up to 2^31-1 bytes)
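
To see the framing on the wire, one quick sketch is to run a local listener with netcat and point the sink at it (netcat flags vary by variant; -lk is the BSD-style form that keeps the listener open):

$ nc -lk 1234
$ java -jar tcp_sink.jar --tcp.host=localhost --tcp.port=1234 --tcp.encoder=LF

With the LF encoder, each message arrives terminated by a single line feed (0x0a).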

4.18.5 Build

$ ./mvnw clean install -PgenerateApps
$ cd apps

You can find the corresponding binder-based projects in the apps directory. You can then cd into one of the folders and build it:

$ ./mvnw clean package

4.18.6 Examples

java -jar tcp_sink.jar --tcp.encoder=LF

4.19 Throughput Sink

A simple handler that counts messages and logs the observed throughput at a selected interval.

4.19.1 Input

Headers

Payload

Any

4.19.2 Output

N/A

4.19.3 Options

4.19.4 Build

$ ./mvnw clean install -PgenerateApps
$ cd apps

You can find the corresponding binder-based projects in the apps directory. You can then cd into one of the folders and build it:

$ ./mvnw clean package

4.19.5 Examples

java -jar throughput-sink.jar

4.20 Websocket Sink

A simple Websocket Sink implementation.

4.20.1 Input

Headers

Payload

Any

4.20.2 Output

N/A

4.20.3 Options

The following command line arguments are supported:

websocket.log-level
The log level for Netty channels; WARN is used if none is set. (String, default: <none>)
websocket.path
The path on which a WebsocketSink consumer needs to connect. (String, default: /websocket)
websocket.port
The port on which the Netty server listens. (Integer, default: 9292)
websocket.ssl
Whether or not to create an io.netty.handler.ssl.SslContext. (Boolean, default: false)
websocket.threads
The number of threads for the Netty io.netty.channel.EventLoopGroup. (Integer, default: 1)

4.20.4 Build

$ ./mvnw clean install -PgenerateApps
$ cd apps

You can find the corresponding binder-based projects in the apps directory. You can then cd into one of the folders and build it:

$ ./mvnw clean package

4.20.5 Examples

To verify that the websocket-sink receives messages from other spring-cloud-stream apps, you can use the following simple end-to-end setup.

Step 1: Start Rabbitmq

Step 2: Deploy a time-source
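
For example, the time-source can be bound to the destination that the sink will consume from (the jar placeholder, destination name, and port are illustrative):

java -jar <spring boot application for time-source> --spring.cloud.stream.bindings.output=ticktock \
	--server.port=9191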

Step 3: Deploy a websocket-sink (the app that contains this starter jar)

Finally start a websocket-sink in trace mode so that you see the messages produced by the time-source in the log:

java -jar <spring boot application for websocket-sink> --spring.cloud.stream.bindings.input=ticktock --server.port=9393 \
	--logging.level.org.springframework.cloud.stream.module.websocket=TRACE

You should start seeing log messages in the console where you started the WebsocketSink like this:

Handling message: GenericMessage [payload=2015-10-21 12:52:53, headers={id=09ae31e0-a04e-b811-d211-b4d4e75b6f29, timestamp=1445424778065}]
Handling message: GenericMessage [payload=2015-10-21 12:52:54, headers={id=75eaaf30-e5c6-494f-b007-9d5b5b920001, timestamp=1445424778065}]
Handling message: GenericMessage [payload=2015-10-21 12:52:55, headers={id=18b887db-81fc-c634-7a9a-16b1c72de291, timestamp=1445424778066}]

4.20.6 Actuators

There is an Endpoint that you can use to access the last n messages sent and received. You have to enable it by providing --endpoints.websocketsinktrace.enabled=true. By default it shows the last 100 messages via host:port/websocketsinktrace. Here is a sample output:

 [
   {
    "timestamp": 1445453703508,
    "info": {
      "type": "text",
      "direction": "out",
      "id": "2ff9be50-c9b2-724b-5404-1a6305c033e4",
      "payload": "2015-10-21 20:54:33"
    }
  },
  ...
  {
    "timestamp": 1445453703506,
    "info": {
      "type": "text",
      "direction": "out",
      "id": "2b9dbcaf-c808-084d-a51b-50f617ae6a75",
      "payload": "2015-10-21 20:54:32"
    }
  }
]
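
Assuming the default port 9292, the endpoint can be fetched with curl:

$ curl http://localhost:9292/websocketsinktrace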

There is also a simple HTML page where you can see the forwarded messages in a text area. You can access it directly via host:port in your browser.

[Note]Note

For SSL mode (--ssl=true), a self-signed certificate is used, which might cause trouble with some Websocket clients. In a future release, there will be a --certificate=mycert.cer switch to pass a valid (not self-signed) certificate.

4.21 TaskLauncher Data Flow Sink

This application launches a registered task definition using the Data Flow Server REST API.

4.21.1 Input

A launch request, including:

  • the task name (required; the task must be defined on the target Data Flow Server)
  • deployment properties (optional key-value pairs)
  • program arguments for the task (an optional list)

Headers:

  • Content-Type: application/json

Payload:

A JSON document:

{
  "name":"foo",
  "deploymentProps": {"key1":"val1","key2":"val2"},
  "args":["--debug", "--foo", "bar"]
}

Minimally:

{"name":"foo"}

4.21.2 Output

N/A (launches a task on the SCDF server’s task platform).

4.21.3 Options

The tasklauncher-dataflow sink supports the following configuration properties:

platform-name
The Spring Cloud Data Flow platform to use for launching tasks. (String, default: default)
spring.cloud.dataflow.client.authentication.access-token
OAuth2 Access Token. (String, default: <none>)
spring.cloud.dataflow.client.authentication.basic.password
The login password. (String, default: <none>)
spring.cloud.dataflow.client.authentication.basic.username
The login username. (String, default: <none>)
spring.cloud.dataflow.client.authentication.client-id
OAuth2 Client Id. (String, default: <none>)
spring.cloud.dataflow.client.authentication.client-secret
OAuth2 Client Secret. (String, default: <none>)
spring.cloud.dataflow.client.authentication.scope
OAuth2 Scopes. (Set<String>, default: <none>)
spring.cloud.dataflow.client.authentication.token-uri
OAuth2 Token Uri. (String, default: <none>)
spring.cloud.dataflow.client.enable-dsl
Enable Data Flow DSL access. (Boolean, default: false)
spring.cloud.dataflow.client.server-uri
The Data Flow server URI. (String, default: http://localhost:9393)
spring.cloud.dataflow.client.skip-ssl-validation
Skip SSL validation. (Boolean, default: false)
trigger.initial-delay
The initial delay in milliseconds. (Integer, default: 1000)
trigger.max-period
The maximum polling period in milliseconds. Will be set to period if period > maxPeriod. (Integer, default: 30000)
trigger.period
The polling period in milliseconds. (Integer, default: 1000)
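
For example, a hypothetical standalone run that points the sink at a local Data Flow server and slows the polling down might look like this (the trigger values are illustrative):

java -jar <spring boot application for tasklauncher-dataflow sink> \
	--spring.cloud.dataflow.client.server-uri=http://localhost:9393 \
	--trigger.period=5000 --trigger.max-period=60000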

4.21.4 Using the TaskLauncher

The Data Flow task launcher is a sink that consumes LaunchRequest messages, as described above, and launches a task using the configured Spring Cloud Data Flow server (given by --spring.cloud.dataflow.client.server-uri). The task launcher periodically polls its input source for launch requests but pauses polling when the platform has reached its concurrent task execution limit, given by spring.cloud.dataflow.task.platform.<platform-type>.accounts[<account-name>].maximum-concurrent-tasks. This prevents the SCDF deployer’s deployment platform from exhausting its resources under heavy task load. The poller is scheduled using a DynamicPeriodicTrigger. By default the initial polling rate is 1 second, but it may be configured to any duration. When polling is paused, or if there are no launch requests present, the trigger period increases, applying exponential backoff, up to a configured maximum (30 seconds by default).
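
For example, with the default trigger.period of 1000 ms and trigger.max-period of 30000 ms, and assuming the trigger doubles the period each time polling comes up empty or paused, the interval grows as 1 s, 2 s, 4 s, 8 s, 16 s, and is then capped at 30 s until launch requests can be consumed again.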

[Note]Note

This version of the Data Flow task launcher requires SCDF version 2.4.x.

The SCDF 2.4.x server may be configured to launch tasks on multiple platforms. Each task launcher instance is configured for a single platform, given by the platformName property (default if not specified). This limitation is enforced because, if the server has multiple task platforms configured, some of its task platforms may be at the limit while others are not. In this situation, the next launch request can be consumed only if we know which task platform it targets. For this reason, if the SCDF server is configured for multiple task platforms (or a single non-default platform), it is assumed that all launch requests target that platform. The task launcher sets the required deployment property spring.cloud.dataflow.task.platformName if the request does not provide it.
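
For example, a launch request that pins the target platform explicitly might look like this (the platform name is illustrative):

{
  "name":"foo",
  "deploymentProps": {"spring.cloud.dataflow.task.platformName":"my-platform"}
}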

[Note]Note

If the request includes the deployment property spring.cloud.dataflow.task.platformName, and the value is not the same as the tasklauncher’s platformName, the task launcher will throw an exception.

To launch tasks on multiple platforms, you must configure a task launcher instance per platform and use a router sink, or partitioning strategy, to route requests to the correct instance.

[Note]Note

When polling is paused, messages back up on the message broker, so some tuning may be necessary in extreme cases to balance resource utilization.

Client Authentication

If the SCDF server requires authentication, the client must pass credentials that are authorized to launch a task. The Data Flow client supports both basic and OAuth2 authentication.

For basic authentication set the username and password:

--spring.cloud.dataflow.client.authentication.basic.username=<username> \
--spring.cloud.dataflow.client.authentication.basic.password=<password>

For OAuth2 authentication, set the client-id, client-secret, and token-uri at a minimum. These values correspond to values set in the SCDF server’s OAuth2 configuration. For more details, see the Security section in the Data Flow reference.

--spring.cloud.dataflow.client.authentication.client-id=<client-id> \
--spring.cloud.dataflow.client.authentication.client-secret=<client-secret> \
--spring.cloud.dataflow.client.authentication.token-uri=<token-uri>

4.21.5 Build

[Note]Note

This version supports SCDF server 2.4.x which is based on Spring Boot 2.2.4. This required some customization to override standard dependency versions set by the app starters tooling. For this reason, the runtime apps for Kafka and Rabbit MQ are not generated by the tools. Instead they are included explicitly in the build.

$ ./mvnw clean install

4.21.6 Examples

Register a task app, create a task definition, and deploy a stream that accepts launch requests over HTTP; the timestamp sample provides a simple demonstration. (The stream name below is illustrative.)

dataflow:>app register --name timestamp --type task --uri ...
dataflow:>task create timestamp --definition timestamp
dataflow:>stream create --name launch-requests --definition "http --server.port=9000 | task-launcher-dataflow-sink" --deploy

Send a launch request:

$ curl http://localhost:9000 -H "Content-Type: application/json" -d '{"name":"timestamp"}'
dataflow:>task execution list
╔═════════╤══╤════════════════════════════╤════════════════════════════╤═════════╗
║Task Name│ID│         Start Time         │          End Time          │Exit Code║
╠═════════╪══╪════════════════════════════╪════════════════════════════╪═════════╣
║timestamp│1 │Fri Aug 10 08:48:05 EDT 2018│Fri Aug 10 08:48:05 EDT 2018│0        ║
╚═════════╧══╧════════════════════════════╧════════════════════════════╧═════════╝