4. Sinks

4.1 Aggregate Counter Sink

The aggregate counter differs from a simple counter in that it not only keeps a total count, but also retains counts for each minute, hour, day and month of the period for which it is run. The data can then be queried by supplying a start and end date and the resolution at which the data should be returned.
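
As an illustrative sketch only (the stream and counter names are made up, and the option name follows the properties listed below), counting time-source messages into the per-minute, hour, day and month buckets could look like:

dataflow:>stream create --name aggcounterstream --definition "time | aggregate-counter --aggregate-counter.name=ticks" --deploy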

4.1.1 Options

The aggregate-counter sink has the following options:

aggregate-counter.date-format
The DateFormat pattern used when parsing a String-based time-field value. (String, default: yyyy-MM-dd'T'HH:mm:ss.SSS'Z')
aggregate-counter.increment-expression
Increment value for each bucket as a SpEL against the message (Expression, default: <none>)
aggregate-counter.name
The name of the aggregate counter. (String, default: <none>)
aggregate-counter.name-expression
A SpEL expression (against the incoming Message) to derive the name of the aggregate counter. (Expression, default: <none>)
aggregate-counter.time-field
A SpEL expression (against the incoming Message) to derive the timestamp value. (Expression, default: <none>)
spring.redis.database
Database index used by the connection factory. (Integer, default: 0)
spring.redis.host
Redis server host. (String, default: localhost)
spring.redis.password
Login password of the redis server. (String, default: <none>)
spring.redis.port
Redis server port. (Integer, default: 6379)
spring.redis.timeout
Connection timeout in milliseconds. (Integer, default: 0)

4.2 Cassandra Sink

This sink application writes the content of each message it receives into Cassandra.
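
For example, a hedged sketch of a stream that ingests HTTP payloads into Cassandra might look like the following; the keyspace, table and ingest query are placeholders, and the options map to the properties listed below:

dataflow:>stream create --name cassandrastream --definition "http --server.port=8787 | cassandra --cassandra.cluster.keyspace=demo --cassandra.ingest-query='insert into book (id, title) values (uuid(), ?)'" --deploy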

4.2.1 Options

The cassandra sink has the following options:

cassandra.cluster.compression-type
The compression to use for the transport. (CompressionType, default: <none>, possible values: NONE,SNAPPY)
cassandra.cluster.contact-points
The comma-delimited string of the hosts to connect to Cassandra. (String, default: <none>)
cassandra.cluster.create-keyspace
The flag to create (or not) keyspace on application startup. (Boolean, default: false)
cassandra.cluster.entity-base-packages
The base packages to scan for entities annotated with Table annotations. (String[], default: [])
cassandra.cluster.init-script
The resource with CQL scripts (delimited by ';') to initialize keyspace schema. (Resource, default: <none>)
cassandra.cluster.keyspace
The keyspace name to connect to. (String, default: <none>)
cassandra.cluster.metrics-enabled
Enable/disable metrics collection for the created cluster. (Boolean, default: <none>)
cassandra.cluster.password
The password for connection. (String, default: <none>)
cassandra.cluster.port
The port to use to connect to the Cassandra host. (Integer, default: <none>)
cassandra.cluster.schema-action
The schema action to perform. (SchemaAction, default: <none>, possible values: NONE,CREATE,RECREATE,RECREATE_DROP_UNUSED)
cassandra.cluster.username
The username for connection. (String, default: <none>)
cassandra.consistency-level
The consistencyLevel option of WriteOptions. (ConsistencyLevel, default: <none>, possible values: ANY,ONE,TWO,THREE,QUOROM,LOCAL_QUOROM,EACH_QUOROM,ALL,LOCAL_ONE,SERIAL,LOCAL_SERIAL)
cassandra.ingest-query
The ingest Cassandra query. (String, default: <none>)
cassandra.query-type
The queryType for Cassandra Sink. (org.springframework.integration.cassandra.outbound.CassandraMessageHandler<T>$Type, default: <none>)
cassandra.retry-policy
The retryPolicy option of WriteOptions. (RetryPolicy, default: <none>, possible values: DEFAULT,DOWNGRADING_CONSISTENCY,FALLTHROUGH,LOGGING)
cassandra.statement-expression
The expression in Cassandra query DSL style. (Expression, default: <none>)
cassandra.ttl
The time-to-live option of WriteOptions. (Integer, default: 0)

4.3 Counter Sink

The counter sink simply counts the number of messages it receives, optionally storing the counts in a separate store such as Redis.

4.3.1 Options

The counter sink has the following options:

counter.name
The name of the counter to increment. (String, default: <none>)
counter.name-expression
A SpEL expression (against the incoming Message) to derive the name of the counter to increment. (Expression, default: <none>)
spring.redis.database
Database index used by the connection factory. (Integer, default: 0)
spring.redis.host
Redis server host. (String, default: localhost)
spring.redis.password
Login password of the redis server. (String, default: <none>)
spring.redis.port
Redis server port. (Integer, default: 6379)
spring.redis.timeout
Connection timeout in milliseconds. (Integer, default: 0)

4.4 Field Value Counter Sink

A field value counter is a Metric used for counting occurrences of unique values for a named field in a message payload. This sink supports the following payload types out of the box:

  • POJO (Java bean)
  • Tuple
  • JSON String

For example, suppose a message source produces a payload with a field named user:

class Foo {
   String user;
   public Foo(String user) {
       this.user = user;
   }
}

If the stream source produces messages with the following objects:

   new Foo("fred")
   new Foo("sue")
   new Foo("dave")
   new Foo("sue")

The field value counter on the field user will contain:

fred:1, sue:2, dave:1

Multi-value fields are also supported. For example, if a field contains a list, each value will be counted once:

users:["dave","fred","sue"]
users:["sue","jon"]

The field value counter on the field users will contain:

dave:1, fred:1, sue:2, jon:1
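
A minimal sketch of a stream using this sink (the source, stream name and counter name are hypothetical; option names follow the properties listed below):

dataflow:>stream create --name fvcstream --definition "http --server.port=9000 | field-value-counter --field-value-counter.field-name=user --field-value-counter.name=users" --deploy

Posting JSON payloads such as {"user":"sue"} to the http source should then increment the sue value of the users counter.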

4.4.1 Options

The field-value-counter sink has the following options:

field-value-counter.field-name
The name of the field for which values are counted. (String, default: <none>)
field-value-counter.name
The name of the counter to increment. (String, default: <none>)
field-value-counter.name-expression
A SpEL expression (against the incoming Message) to derive the name of the counter to increment. (Expression, default: <none>)
spring.redis.database
Database index used by the connection factory. (Integer, default: 0)
spring.redis.host
Redis server host. (String, default: localhost)
spring.redis.password
Login password of the redis server. (String, default: <none>)
spring.redis.port
Redis server port. (Integer, default: 6379)
spring.redis.timeout
Connection timeout in milliseconds. (Integer, default: 0)

4.5 File Sink

This module writes each message it receives to a file.
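
A minimal usage sketch (the directory and file name are placeholders; option names follow the properties listed below):

dataflow:>stream create --name filestream --definition "time | file --file.directory=/tmp/dataflow --file.name=ticktock.txt" --deploy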

4.5.1 Options

The file sink has the following options:

file.binary
A flag to indicate whether content should be written as bytes. (Boolean, default: false)
file.charset
The charset to use when writing text content. (String, default: UTF-8)
file.directory
The parent directory of the target file. (String, default: <none>)
file.directory-expression
The expression to evaluate for the parent directory of the target file. (Expression, default: <none>)
file.mode
The FileExistsMode to use if the target file already exists. (FileExistsMode, default: <none>, possible values: APPEND,APPEND_NO_FLUSH,FAIL,IGNORE,REPLACE)
file.name
The name of the target file. (String, default: file-sink)
file.name-expression
The expression to evaluate for the name of the target file. (String, default: <none>)
file.suffix
The suffix to append to file name. (String, default: <empty string>)

4.6 FTP Sink

The FTP sink is a simple option to push files to an FTP server from incoming messages.

It uses an ftp-outbound-adapter, so incoming messages can be either a java.io.File object, a String (the file content) or an array of bytes (also the file content).

To use this sink, you need a username and a password to log in.

[Note]Note

By default Spring Integration will use o.s.i.file.DefaultFileNameGenerator if none is specified. DefaultFileNameGenerator will determine the file name based on the value of the file_name header (if it exists) in the MessageHeaders, or if the payload of the Message is already a java.io.File, then it will use the original name of that file.
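
A minimal usage sketch (the host, credentials and remote directory are placeholders; option names follow the properties listed below):

dataflow:>stream create --name ftpstream --definition "time | ftp --ftp.factory.host=ftp.example.com --ftp.factory.username=user --ftp.factory.password=secret --ftp.remote-dir=/upload" --deploy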

4.6.1 Options

The ftp sink has the following options:

ftp.auto-create-dir
Whether or not to create the remote directory. (Boolean, default: <none>)
ftp.factory.cache-sessions
Cache sessions. (Boolean, default: <none>)
ftp.factory.client-mode
The client mode to use for the FTP session. (ClientMode, default: <none>, possible values: ACTIVE,PASSIVE)
ftp.factory.host
The host name of the server. (String, default: <none>)
ftp.factory.password
The password to use to connect to the server. (String, default: <none>)
ftp.factory.port
The port of the server. (Integer, default: 21)
ftp.factory.username
The username to use to connect to the server. (String, default: <none>)
ftp.filename-expression
A SpEL expression to generate the remote file name. (Expression, default: <none>)
ftp.mode
Action to take if the remote file already exists. (FileExistsMode, default: <none>, possible values: APPEND,APPEND_NO_FLUSH,FAIL,IGNORE,REPLACE)
ftp.remote-dir
The remote FTP directory. (String, default: <none>)
ftp.remote-file-separator
The remote file separator. (String, default: <none>)
ftp.temporary-remote-dir
A temporary directory where the file will be written if 'use-temporary-filename' is true. (String, default: <none>)
ftp.tmp-file-suffix
The suffix to use while the file is being transferred. (String, default: <none>)
ftp.use-temporary-filename
Whether or not to write to a temporary file and rename. (Boolean, default: <none>)

4.7 Gemfire Sink

The Gemfire sink allows one to write message payloads to a Gemfire server.

4.7.1 Options

The gemfire sink has the following options:

gemfire.json
Indicates if the Gemfire region stores json objects as native Gemfire PdxInstance (Boolean, default: false)
gemfire.key-expression
SpEL expression to use as a cache key (String, default: <none>)
gemfire.pool.connect-type
Specifies connection type: 'server' or 'locator'. (ConnectType, default: <none>, possible values: locator,server)
gemfire.pool.host-addresses
Specifies one or more Gemfire locator or server addresses formatted as [host]:[port]. (InetSocketAddress[], default: <none>)
gemfire.pool.subscription-enabled
Set to true to enable subscriptions for the client pool. Required to sync updates to the client cache. (Boolean, default: false)
gemfire.region.region-name
The region name. (String, default: <none>)

4.8 Gpfdist Sink

A sink module that routes messages into GPDB/HAWQ segments via the gpfdist protocol. Internally, this sink creates a custom HTTP listener that supports the gpfdist protocol and schedules a task that orchestrates a gpload session in the same way it is done natively in Greenplum.

No data is written into temporary files and all data is kept in stream buffers waiting to get inserted into Greenplum DB or HAWQ. If there are no existing load sessions from Greenplum, the sink will block until such sessions are established.

4.8.1 Options

The gpfdist sink has the following options:

gpfdist.batch-count
Number of windowed batches each segment takes. (Integer, default: 100)
gpfdist.batch-period
Time in seconds for each load operation to sleep in between operations. (Integer, default: 10)
gpfdist.batch-timeout
Timeout in seconds for segment inactivity. (Integer, default: 4)
gpfdist.column-delimiter
Data record column delimiter. (Character, default: <none>)
gpfdist.control-file
Path to yaml control file. (Resource, default: <none>)
gpfdist.db-host
Database host. (String, default: localhost)
gpfdist.db-name
Database name. (String, default: gpadmin)
gpfdist.db-password
Database password. (String, default: gpadmin)
gpfdist.db-port
Database port. (Integer, default: 5432)
gpfdist.db-user
Database user. (String, default: gpadmin)
gpfdist.delimiter
Data line delimiter. (String, default: newline character)
gpfdist.error-table
Table name to log errors into. (String, default: <none>)
gpfdist.flush-count
Flush item count. (Integer, default: 100)
gpfdist.flush-time
Flush item time. (Integer, default: 2)
gpfdist.gpfdist-port
Port of the gpfdist server; the default port 0 indicates that a random port is chosen. (Integer, default: 0)
gpfdist.match-columns
Match columns with update. (String, default: <none>)
gpfdist.mode
Mode, either insert or update. (String, default: <none>)
gpfdist.null-string
Null string definition. (String, default: <none>)
gpfdist.rate-interval
Enable transfer rate interval. (Integer, default: 0)
gpfdist.segment-reject-limit
Error reject limit. (String, default: <none>)
gpfdist.segment-reject-type
Error reject type, either rows or percent. (SegmentRejectType, default: <none>, possible values: ROWS,PERCENT)
gpfdist.sql-after
Sql to run after load. (String, default: <none>)
gpfdist.sql-before
Sql to run before load. (String, default: <none>)
gpfdist.table
Target database table. (String, default: <none>)
gpfdist.update-columns
Update columns with update. (String, default: <none>)
spring.net.hostdiscovery.loopback
The loopback flag. (Boolean, default: false)
spring.net.hostdiscovery.match-interface
The match interface regex pattern. (String, default: <none>)
spring.net.hostdiscovery.match-ipv4
Used to match an IP address from a network using CIDR notation. (String, default: <none>)
spring.net.hostdiscovery.point-to-point
The point-to-point flag. (Boolean, default: false)
spring.net.hostdiscovery.prefer-interface
The preferred interface list. (java.util.List<java.lang.String>, default: <none>)

4.8.2 Implementation Notes

Within a gpfdist sink we have a Reactor-based stream where data is published from the incoming SI channel. This channel receives data from the Message Bus. The Reactor stream is then connected to Netty-based HTTP channel adapters so that when a new HTTP connection is established, the Reactor stream is flushed and balanced among existing HTTP clients. When Greenplum does a load from an external table, each segment initiates an HTTP connection and starts loading data. The net effect is that incoming data is automatically spread among the Greenplum segments.

4.8.3 Detailed Option Descriptions

The gpfdist sink supports the following configuration properties:

table

Database table to work with. (String, default: ``, required)

This option denotes the table where data will be inserted or updated. The external table structure will also be derived from the structure of this table.

Currently, table is the only way to define the structure of an external table. Effectively it replaces other_table in the clause below.

CREATE READABLE EXTERNAL TABLE table_name LIKE other_table
mode

Gpfdist mode, either `insert` or `update`. (String, default: insert)

Currently only the insert and update gpfdist modes are supported. The merge mode familiar from the native gpfdist loader is not yet supported.

For the update mode, the options matchColumns and updateColumns are required.

columnDelimiter

Data record column delimiter. (Character, default: ``)

Defines the delimiter character used in the clause below, which is part of a FORMAT 'TEXT' or FORMAT 'CSV' section.

[DELIMITER AS 'delimiter']
segmentRejectLimit

Error reject limit. (String, default: ``)

Defines the count value in the clause below.

[ [LOG ERRORS INTO error_table] SEGMENT REJECT LIMIT count
  [ROWS | PERCENT] ]

As a convenience, this reject limit also recognizes a percentage format such as 2% and, if used, segmentRejectType is automatically set to percent.

segmentRejectType

Error reject type, either `rows` or `percent`. (String, default: ``)

Defines ROWS or PERCENT in the clause below.

[ [LOG ERRORS INTO error_table] SEGMENT REJECT LIMIT count
  [ROWS | PERCENT] ]
errorTable

Tablename to log errors. (String, default: ``)

As the error table is optional with SEGMENT REJECT LIMIT, it is only used if both segmentRejectLimit and segmentRejectType are set. Sets error_table in the clause below.

[ [LOG ERRORS INTO error_table] SEGMENT REJECT LIMIT count
  [ROWS | PERCENT] ]
nullString

Null string definition. (String, default: ``)

Defines the null string used in the clause below, which is part of a FORMAT 'TEXT' or FORMAT 'CSV' section.

[NULL AS 'null string']
delimiter

Data record delimiter for incoming messages. (String, default: \n)

By default, the delimiter in this option is appended as a postfix to every message sent into this sink. Currently NEWLINE is not a supported configuration option, so line termination for the data comes from this default behaviour.

"If not specified, a Greenplum Database segment will detect the newline type by looking at the first row of data it receives and using the first newline type encountered."
 -- External Table Docs
matchColumns

Comma delimited list of columns to match. (String, default: ``)

[Note]Note

See the examples below for more information.

updateColumns

Comma delimited list of columns to update. (String, default: ``)

[Note]Note

See the examples below for more information.

sqlBefore
Sql clause to run before each load operation. (String, default: ``)
sqlAfter
Sql clause to run after each load operation. (String, default: ``)
rateInterval

Debug rate of data transfer. (Integer, default: 0)

If set to a non-zero value, the sink will log the rate of messages passing through it after the number of messages denoted by this setting has been processed. A value of 0 disables this rate calculation and logging.

flushCount

Maximum number of items collected per window of data. (Integer, default: 100)

[Note]Note

For more info on flush and batch settings, see above.

4.8.4 How Data Is Sent Into Segments

There are a few important concepts involving how data passes into the sink, through it, and finally lands in the database.

  • The sink has its normal message handler for incoming data from a source module, a Netty-based gpfdist protocol listener that segments connect to and, in between those two, a Reactor-based stream controlling load balancing across the different segment connections.
  • Incoming data is first sent into Reactor, which constructs a window. This window is released downstream when it fills up (flushCount) or times out (flushTime) if it does not fill. One window is then ready to be sent into a segment.
  • Segments that connect to this stream see a stream of window data, not a stream of individual messages. This can also be thought of as a stream of batches.
  • When a segment makes a connection to the protocol listener, it subscribes itself to this stream, takes the number of batches denoted by batchCount, and completes the stream once it has received enough batches or once batchTimeout occurs due to inactivity.
  • It does not matter how many simultaneous connections there are from the database cluster at any given time, as Reactor load balances batches across all subscribers.
  • The database cluster initiates a loading session when a select is done from an external table that points to this sink. These load operations run in the background, in a loop, one after another. The batchPeriod option is then used as the sleep time between these load sessions.

Let's take a closer look at how the options flushCount, flushTime, batchCount, batchTimeout and batchPeriod work.

At the highest level, where incoming data into the sink is windowed, flushCount and flushTime control when a batch of messages is sent downstream. If there are a lot of simultaneous segment connections, flushing less often will keep more segments inactive, as there is more demand for batches than flushing produces.

When an existing segment connection is active and has subscribed itself to the stream of batches, data keeps flowing until either batchCount is met or batchTimeout occurs due to inactivity of data from upstream. The higher batchCount is, the more data each segment will read. The higher batchTimeout is, the longer a segment will wait in case there is more data to come.

As gpfdist load operations are done in a loop, batchPeriod simply prevents things from running in a busy loop. A busy loop would be fine if there were a constant stream of incoming data, but if the data arrives in bursts a busy loop is unnecessary.

[Note]Note

Data loaded via gpfdist will not become visible in the database until the whole distributed loading session has finished successfully.

Reactor also handles backpressure, meaning that if the existing load operations do not produce enough demand for data, message passing into the sink will eventually block. This happens when Reactor's internal ring buffer (size of 32 items) gets full. Data really flows through the sink only when it is pulled from it by the segments.

4.8.5 Example Usage

In this first example we’re just creating a simple stream which inserts data from a time source. Let’s create a table with two text columns.

gpadmin=# create table ticktock (date text, time text);

Create a simple stream named gpstream1.

dataflow:>stream create --name gpstream1 --definition "time | gpfdist
--dbHost=mdw --table=ticktock --batchTime=1 --batchPeriod=1
--flushCount=2 --flushTime=2 --columnDelimiter=' '" --deploy

Let it run and see the results in the database.

gpadmin=# select count(*) from ticktock;
 count
-------
    14
(1 row)

In the previous example we did simple inserts into a table. Let's see how we can update data in a table. Create a simple table httpdata with three text columns and insert some data.

gpadmin=# create table httpdata (col1 text, col2 text, col3 text);
gpadmin=# insert into httpdata values ('DATA1', 'DATA', 'DATA');
gpadmin=# insert into httpdata values ('DATA2', 'DATA', 'DATA');
gpadmin=# insert into httpdata values ('DATA3', 'DATA', 'DATA');

Now the table looks like this.

gpadmin=# select * from httpdata;
 col1  | col2 | col3
-------+------+------
 DATA3 | DATA | DATA
 DATA2 | DATA | DATA
 DATA1 | DATA | DATA
(3 rows)

Let's create a stream that updates the table httpdata by matching on column col1 and updating columns col2 and col3.

dataflow:>stream create --name gpfdiststream2 --definition "http
--server.port=8081|gpfdist --mode=update --table=httpdata
--dbHost=mdw --columnDelimiter=',' --matchColumns=col1
--updateColumns=col2,col3" --deploy

Post some data into the stream; it will be passed to the gpfdist sink via the http source.

curl --data "DATA1,DATA1,DATA1" -H "Content-Type:text/plain" http://localhost:8081/

If you query the table again, you'll see that the row for DATA1 has been updated.

gpadmin=# select * from httpdata;
 col1  | col2  | col3
-------+-------+-------
 DATA3 | DATA  | DATA
 DATA2 | DATA  | DATA
 DATA1 | DATA1 | DATA1
(3 rows)

4.8.6 Tuning Transfer Rate

Default values for the options flushCount, flushTime, batchCount, batchTimeout and batchPeriod are relatively conservative and need to be tuned for every use case for optimal performance. In order to decide how to tune the sink behaviour to suit your needs, a few things need to be considered:

  • What is the average size of the messages ingested by the sink?
  • How fast do you want data to become visible in the database?
  • Does the data arrive as a constant flow or in bursts?

Everything that flows through the sink is kept in memory, and because the sink handles backpressure, memory consumption is relatively low. However, because the sink cannot predict the average size of the incoming data, and this data is windowed downstream anyway, you should not allow the window size to become too large if the average data size is large, as every batch of data is kept in memory.

Generally speaking, if you have a lot of segments in a load operation, it is advisable to keep the flushed window size relatively small, which allows more segments to stay active. This, however, also depends on how much data is flowing into the sink itself.

The longer a load session for each segment is active, the higher the overall transfer rate is going to be. The batchCount option naturally controls this. The batchTimeout option then controls how fast each segment completes its stream due to inactivity from upstream and steps away from the loading session, allowing the distributed session to finish and the data to become visible in the database.
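
As a purely illustrative sketch (all values are hypothetical and must be validated against your own message sizes and rates), a bursty, low-latency setup might use smaller windows and shorter timeouts than the defaults:

dataflow:>stream create --name gpstream3 --definition "time | gpfdist --dbHost=mdw --table=ticktock --flushCount=10 --flushTime=2 --batchCount=10 --batchTimeout=2 --batchPeriod=1" --deploy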

4.9 HDFS Sink

This module writes each message it receives to HDFS.
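
A minimal usage sketch (the Namenode URI and directory are placeholders; option names follow the properties listed below):

dataflow:>stream create --name hdfsstream --definition "time | hdfs --hdfs.fs-uri=hdfs://namenode:8020 --hdfs.directory=/tmp/ticktock --hdfs.file-name=ticks" --deploy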

4.9.1 Options

The hdfs sink has the following options:

hdfs.close-timeout
Timeout in ms, regardless of activity, after which file will be automatically closed. (Long, default: 0)
hdfs.codec
Compression codec alias name (gzip, snappy, bzip2, lzo, or slzo). (String, default: <none>)
hdfs.directory
Base path to write files to. (String, default: <none>)
hdfs.enable-sync
Whether writer will sync to datanode when flush is called, setting this to 'true' could impact throughput. (Boolean, default: false)
hdfs.file-extension
The base filename extension to use for the created files. (String, default: txt)
hdfs.file-name
The base filename to use for the created files. (String, default: <none>)
hdfs.file-open-attempts
Maximum number of file open attempts to find a path. (Integer, default: 10)
hdfs.file-uuid
Whether file name should contain uuid. (Boolean, default: false)
hdfs.flush-timeout
Timeout in ms, regardless of activity, after which data written to file will be flushed. (Long, default: 0)
hdfs.fs-uri
URL for HDFS Namenode. (String, default: <none>)
hdfs.idle-timeout
Inactivity timeout in ms after which file will be automatically closed. (Long, default: 0)
hdfs.in-use-prefix
Prefix for files currently being written. (String, default: <none>)
hdfs.in-use-suffix
Suffix for files currently being written. (String, default: <none>)
hdfs.overwrite
Whether writer is allowed to overwrite files in Hadoop FileSystem. (Boolean, default: false)
hdfs.partition-path
A SpEL expression defining the partition path. (String, default: <none>)
hdfs.rollover
Threshold in bytes when file will be automatically rolled over. (Integer, default: 1000000000)
[Note]Note

This module can have its runtime dependencies provided during startup if you would like to use a Hadoop distribution other than the default one.

4.10 HDFS Dataset Sink

This module writes each message it receives to HDFS as part of a Kite SDK Dataset.

4.10.1 Options

The hdfs-dataset sink has the following options:

hdfs.dataset.allow-null-values
Whether null property values are allowed, if set to true then schema will use UNION for each field. (Boolean, default: false)
hdfs.dataset.batch-size
Threshold in number of messages when file will be automatically flushed and rolled over. (Integer, default: 10000)
hdfs.dataset.compression-type
Compression type name (snappy, deflate, bzip2 (avro only) or uncompressed) (String, default: snappy)
hdfs.dataset.directory
The base directory path where the files will be written in the Hadoop FileSystem. (String, default: /tmp/hdfs-dataset-sink)
hdfs.dataset.format
The format to use, valid options are avro and parquet. (String, default: avro)
hdfs.dataset.fs-uri
The URI to use to access the Hadoop FileSystem. (String, default: <none>)
hdfs.dataset.idle-timeout
Idle timeout in milliseconds when Hadoop file resource is automatically closed. (Long, default: -1)
hdfs.dataset.namespace
The sub-directory under the base directory where files will be written. (String, default: <none>)
hdfs.dataset.partition-path
The partition path strategy to use, a list of KiteSDK partition expressions separated by a '/' symbol. (String, default: <none>)
hdfs.dataset.writer-cache-size
The size of the cache to be used for partition writers (10 if omitted). (Integer, default: -1)
[Note]Note

This module can have its runtime dependencies provided during startup if you would like to use a Hadoop distribution other than the default one.

4.11 Jdbc Sink

A module that writes its incoming payload to an RDBMS using JDBC.
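
A minimal usage sketch (the table name and the in-memory H2 datasource URL are placeholders; option names follow the properties listed below, and the standard spring.datasource.* properties configure the connection):

dataflow:>stream create --name jdbcstream --definition "time | jdbc --jdbc.table-name=ticks --jdbc.initialize=true --spring.datasource.url=jdbc:h2:mem:dataflow" --deploy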

4.11.1 Options

The jdbc sink has the following options:

jdbc.columns
The names of the columns that shall receive data, as a set of column[:SpEL] mappings. Also used at initialization time to issue the DDL. (java.util.Map<java.lang.String,java.lang.String>, default: <none>)
jdbc.initialize
'true', 'false' or the location of a custom initialization script for the table. (String, default: false)
jdbc.table-name
The name of the table to write into. (String, default: <none>)
spring.datasource.driver-class-name
Fully qualified name of the JDBC driver. (String, default: <none>)
spring.datasource.init-sql
<documentation missing> (String, default: <none>)
spring.datasource.initialize
Populate the database using 'data.sql'. (Boolean, default: true)
spring.datasource.password
Login password of the database. (String, default: <none>)
spring.datasource.url
JDBC url of the database. (String, default: <none>)
spring.datasource.username
Login username of the database. (String, default: <none>)
[Note]Note

The module also uses Spring Boot’s DataSource support for configuring the database connection, so properties like spring.datasource.url etc. apply.

4.12 Log Sink

The log sink uses the application logger to output the data for inspection.
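
For example, a small sketch (the expression and level are arbitrary choices; option names follow the properties listed below):

dataflow:>stream create --name logstream --definition "time | log --log.expression=payload.toUpperCase() --log.level=WARN" --deploy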

4.12.1 Options

The log sink has the following options:

log.expression
A SpEL expression (against the incoming message) to evaluate as the logged message. (String, default: payload)
log.level
The level at which to log messages. (Level, default: <none>, possible values: FATAL,ERROR,WARN,INFO,DEBUG,TRACE)
log.name
The name of the logger to use. (String, default: <none>)

4.13 RabbitMQ Sink

This module sends messages to RabbitMQ.
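
A minimal usage sketch (the exchange and routing key are placeholders and are assumed to already exist on the broker; option names follow the properties listed below):

dataflow:>stream create --name rabbitstream --definition "time | rabbit --rabbit.exchange=myExchange --rabbit.routing-key=ticktock" --deploy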

4.13.1 Options

The rabbit sink has the following options:

(See the Spring Boot documentation for RabbitMQ connection properties)

rabbit.converter-bean-name
The bean name for a custom message converter; if omitted, a SimpleMessageConverter is used. If 'jsonConverter', a Jackson2JsonMessageConverter bean will be created for you. (String, default: <none>)
rabbit.exchange
Exchange name - overridden by exchangeNameExpression, if supplied. (String, default: <empty string>)
rabbit.exchange-expression
A SpEL expression that evaluates to an exchange name. (Expression, default: <none>)
rabbit.mapped-request-headers
Headers that will be mapped. (String[], default: [*])
rabbit.persistent-delivery-mode
Default delivery mode when 'amqp_deliveryMode' header is not present, true for PERSISTENT. (Boolean, default: false)
rabbit.routing-key
Routing key - overridden by routingKeyExpression, if supplied. (String, default: <none>)
rabbit.routing-key-expression
A SpEL expression that evaluates to a routing key. (Expression, default: <none>)
spring.rabbitmq.addresses
Comma-separated list of addresses to which the client should connect. (String, default: <none>)
spring.rabbitmq.host
RabbitMQ host. (String, default: localhost)
spring.rabbitmq.password
Login to authenticate against the broker. (String, default: <none>)
spring.rabbitmq.port
RabbitMQ port. (Integer, default: 5672)
spring.rabbitmq.requested-heartbeat
Requested heartbeat timeout, in seconds; zero for none. (Integer, default: <none>)
spring.rabbitmq.username
Login user to authenticate to the broker. (String, default: <none>)
spring.rabbitmq.virtual-host
Virtual host to use when connecting to the broker. (String, default: <none>)
[Note]Note

By default, the message converter is a SimpleMessageConverter which handles byte[], String and java.io.Serializable. A well-known bean name jsonConverter will configure a Jackson2JsonMessageConverter instead. In addition, a custom converter bean can be added to the context and referenced by the converterBeanName property.

4.14 Redis Sink

This module sends messages to a Redis store.
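
A minimal usage sketch (the queue name is a placeholder; option names follow the properties listed below):

dataflow:>stream create --name redisstream --definition "time | redis --redis.queue=ticktock" --deploy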

4.14.1 Options

The redis sink has the following options:

redis.key
A literal key name to use when storing to a key. (String, default: <none>)
redis.key-expression
A SpEL expression to use for storing to a key. (Expression, default: <none>)
redis.queue
A literal queue name to use when storing in a queue. (String, default: <none>)
redis.queue-expression
A SpEL expression to use for queue. (Expression, default: <none>)
redis.topic
A literal topic name to use when publishing to a topic. (String, default: <none>)
redis.topic-expression
A SpEL expression to use for topic. (Expression, default: <none>)
spring.redis.database
Database index used by the connection factory. (Integer, default: 0)
spring.redis.host
Redis server host. (String, default: localhost)
spring.redis.password
Login password of the redis server. (String, default: <none>)
spring.redis.pool.max-active
Max number of connections that can be allocated by the pool at a given time. Use a negative value for no limit. (Integer, default: 8)
spring.redis.pool.max-idle
Max number of "idle" connections in the pool. Use a negative value to indicate an unlimited number of idle connections. (Integer, default: 8)
spring.redis.pool.max-wait
Maximum amount of time (in milliseconds) a connection allocation should block before throwing an exception when the pool is exhausted. Use a negative value to block indefinitely. (Integer, default: -1)
spring.redis.pool.min-idle
Target for the minimum number of idle connections to maintain in the pool. This setting only has an effect if it is positive. (Integer, default: 0)
spring.redis.port
Redis server port. (Integer, default: 6379)
spring.redis.sentinel.master
Name of Redis server. (String, default: <none>)
spring.redis.sentinel.nodes
Comma-separated list of host:port pairs. (String, default: <none>)
spring.redis.timeout
Connection timeout in milliseconds. (Integer, default: 0)

4.15 Router Sink

This application routes messages to named channels.

4.15.1 Options

The router sink has the following options:

router.default-output-channel
Where to send unroutable messages. (String, default: nullChannel)
router.destination-mappings
Destination mappings as a new line delimited string of name-value pairs, e.g. 'foo=bar\n baz=car'. (Properties, default: <none>)
router.expression
The expression to be applied to the message to determine the channel(s) to route to. (Expression, default: <none>)
router.refresh-delay
How often to check for script changes in ms (if present); < 0 means don't refresh. (Integer, default: 60000)
router.resolution-required
Whether or not channel resolution is required. (Boolean, default: false)
router.script
The location of a groovy script that returns channels or channel mapping resolution keys. (Resource, default: <none>)
router.variables
Variable bindings as a new line delimited string of name-value pairs, e.g. 'foo=bar\n baz=car'. (Properties, default: <none>)
router.variables-location
The location of a properties file containing custom script variable bindings. (Resource, default: <none>)
[Note]Note

Since this is a dynamic router, destinations are created as needed; therefore, by default the defaultOutputChannel and resolutionRequired will only be used if the Binder has some problem binding to the destination.

You can restrict the creation of dynamic bindings using the spring.cloud.stream.dynamicDestinations property. By default, all resolved destinations will be bound dynamically; if this property has a comma-delimited list of destination names, only those will be bound. Messages that resolve to a destination that is not in this list will be routed to the defaultOutputChannel, which must also appear in the list.

destinationMappings are used to map the evaluation results to an actual destination name.

4.15.2 SpEL-based Routing

The expression evaluates against the message and returns either a channel name, or the key to a map of channel names.
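
As an illustrative sketch, mirroring the Groovy example in the next section and assuming a String payload, such an expression could be:

payload.contains('a') ? 'foo' : 'bar'

Messages whose payload contains 'a' would then be routed to the foo destination, and all others to bar.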

For more information, please see the "Routers and the Spring Expression Language (SpEL)" subsection in the Spring Integration Reference manual Configuring (Generic) Router section.

4.15.3 Groovy-based Routing

Instead of SpEL expressions, Groovy scripts can also be used. Let's create a Groovy script in the file system at "file:/my/path/router.groovy", or "classpath:/my/path/router.groovy":

println("Groovy processing payload '" + payload + "'");
if (payload.contains('a')) {
    return "foo"
}
else {
    return "bar"
}

If you want to pass variable values to your script, you can statically bind values using the variables option or optionally pass the path to a properties file containing the bindings using the variablesLocation option. All properties in the file will be made available to the script as variables. You may specify both variables and variablesLocation, in which case any duplicate values provided as variables override values provided in variablesLocation. Note that payload and headers are implicitly bound to give you access to the data contained in a message.

For more information, see the Spring Integration Reference manual Groovy Support.

4.16 Amazon S3 Sink

This sink application supports transferring files to an Amazon S3 bucket. File payloads (and directories, recursively) are transferred from the local directory where the application is deployed to the remote directory (S3 bucket).

Messages accepted by this sink must contain one of the following payload types:

  • File, including directories for recursive upload;
  • InputStream;
  • byte[]

When using --mode=lines, you can also provide the additional option --withMarkers=true. If set to true, the underlying FileSplitter will emit additional start-of-file and end-of-file marker messages before and after the actual data. The payload of these 2 additional marker messages is of type FileSplitter.FileMarker. The option withMarkers defaults to false if not explicitly set.
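
A minimal usage sketch; the file source, its --mode=ref option and the bucket name are assumptions for illustration, relying on the keyExpression default (the file name) documented below for java.io.File payloads:

dataflow:>stream create --name s3stream --definition "file --directory=/tmp/outbox --mode=ref | s3 --bucket=my-bucket" --deploy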

4.16.1 Options

The s3 sink has the following options:

bucket
AWS S3 bucket to store files (String, no default)
bucketExpression
SpEL Expression to evaluate S3 bucket against request `Message` (Expression, no default)
keyExpression
SpEL Expression to evaluate S3 Object `key` against request `Message` (Expression, default: File.getName() if any)
acl
Access control list for S3 Object (CannedAccessControlList, no default)
aclExpression
SpEL Expression to evaluate access control list for S3 Object against request `Message` (Expression, no default)

The target generated application based on the AmazonS3SinkConfiguration can be enhanced with the S3MessageHandler.UploadMetadataProvider and/or S3ProgressListener, which are injected into the S3MessageHandler bean.

4.16.2 Amazon AWS common options

The Amazon S3 Sink (as all other Amazon AWS applications) is based on the Spring Cloud AWS project as a foundation, and its auto-configuration classes are used automatically by Spring Boot. Consult their documentation regarding required and useful auto-configuration properties.

Some of them are about AWS credentials:

  • cloud.aws.credentials.accessKey
  • cloud.aws.credentials.secretKey
  • cloud.aws.credentials.instanceProfile
  • cloud.aws.credentials.profileName
  • cloud.aws.credentials.profilePath

Others are for the AWS Region definition:

  • cloud.aws.region.auto
  • cloud.aws.region.static

And for AWS Stack:

  • cloud.aws.stack.auto
  • cloud.aws.stack.name

4.17 SFTP Sink

The SFTP sink is a simple option to push files to an SFTP server from incoming messages.

It uses an sftp-outbound-adapter, so incoming messages can be either a java.io.File object, a String (the file content) or an array of bytes (also the file content).

To use this sink, you need a username and a password to log in.

[Note]Note

By default Spring Integration will use o.s.i.file.DefaultFileNameGenerator if none is specified. DefaultFileNameGenerator will determine the file name based on the value of the file_name header (if it exists) in the MessageHeaders, or if the payload of the Message is already a java.io.File, then it will use the original name of that file.
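
A minimal usage sketch (the host, credentials and remote directory are placeholders; option names follow the properties listed below):

dataflow:>stream create --name sftpstream --definition "time | sftp --sftp.factory.host=sftp.example.com --sftp.factory.username=user --sftp.factory.password=secret --sftp.remote-dir=/upload" --deploy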

4.17.1 Options

The sftp sink has the following options:

sftp.auto-create-dir
Whether or not to create the remote directory. (Boolean, default: <none>)
sftp.factory.allow-unknown-keys
True to allow an unknown or changed key. (Boolean, default: false)
sftp.factory.cache-sessions
Cache sessions. (Boolean, default: <none>)
sftp.factory.host
The host name of the server. (String, default: <none>)
sftp.factory.known-hosts-expression
A SpEL expression resolving to the location of the known hosts file. (String, default: <none>)
sftp.factory.pass-phrase
Passphrase for user's private key. (String, default: <empty string>)
sftp.factory.password
The password to use to connect to the server. (String, default: <none>)
sftp.factory.port
The port of the server. (Integer, default: 22)
sftp.factory.private-key
Resource location of user's private key. (String, default: <empty string>)
sftp.factory.username
The username to use to connect to the server. (String, default: <none>)
sftp.filename-expression
A SpEL expression to generate the remote file name. (Expression, default: <none>)
sftp.mode
Action to take if the remote file already exists. (FileExistsMode, default: <none>, possible values: APPEND,APPEND_NO_FLUSH,FAIL,IGNORE,REPLACE)
sftp.remote-dir
The remote SFTP directory. (String, default: <none>)
sftp.remote-file-separator
The remote file separator. (String, default: <none>)
sftp.temporary-remote-dir
A temporary directory where the file will be written if 'use-temporary-filename' is true. (String, default: <none>)
sftp.tmp-file-suffix
The suffix to use while the file is being transferred. (String, default: <none>)
sftp.use-temporary-filename
Whether or not to write to a temporary file and rename. (Boolean, default: <none>)

4.18 TCP Sink

This module writes messages to TCP using an Encoder.

TCP is a streaming protocol and some mechanism is needed to frame messages on the wire. A number of encoders are available, the default being 'CRLF'.
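
A minimal usage sketch (the host, port and encoder are placeholders; option names follow the properties listed below):

dataflow:>stream create --name tcpstream --definition "time | tcp --tcp.host=localhost --tcp.port=1234 --tcp.encoder=LF" --deploy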

4.18.1 Options

The tcp sink has the following options:

tcp.charset
The charset used when converting from bytes to String. (String, default: UTF-8)
tcp.close
Whether to close the socket after each message. (Boolean, default: false)
tcp.encoder
The encoder to use when sending messages. (Encoding, default: <none>, possible values: CRLF,LF,NULL,STXETX,RAW,L1,L2,L4)
tcp.host
The host to which this sink will connect. (String, default: <none>)
tcp.nio
Whether or not to use NIO. (Boolean, default: <none>)
tcp.port
The port to which this sink will connect. (Integer, default: <none>)
tcp.reverse-lookup
Whether or not to perform a reverse DNS lookup on the remote IP address. (Boolean, default: <none>)
tcp.socket-timeout
The timeout (ms) before closing the socket when no data is received. (Integer, default: <none>)
tcp.use-direct-buffers
Whether or not to use direct buffers. (Boolean, default: <none>)

4.18.2 Available Encoders

Text Data

CRLF (default)
text terminated by carriage return (0x0d) followed by line feed (0x0a)
LF
text terminated by line feed (0x0a)
NULL
text terminated by a null byte (0x00)
STXETX
text preceded by an STX (0x02) and terminated by an ETX (0x03)

Text and Binary Data

RAW
no structure - the client indicates a complete message by closing the socket
L1
data preceded by a one byte (unsigned) length field (supports up to 255 bytes)
L2
data preceded by a two byte (unsigned) length field (up to 2^16-1 bytes)
L4
data preceded by a four byte (signed) length field (up to 2^31-1 bytes)

4.19 Throughput Sink

A simple handler that will count messages and log witnessed throughput at a selected interval.

4.20 Websocket Sink

A simple Websocket Sink implementation.

4.20.1 Options

The following command line arguments are supported:

websocket.log-level
The logLevel for netty channels; default is WARN. (String, default: <none>)
websocket.path
The path on which a WebsocketSink consumer needs to connect. (String, default: /websocket)
websocket.port
The port on which the Netty server listens. (Integer, default: 9292)
websocket.ssl
Whether or not to create an io.netty.handler.ssl.SslContext. (Boolean, default: false)
websocket.threads
The number of threads for the Netty io.netty.channel.EventLoopGroup. (Integer, default: 1)

4.20.2 Example

To verify that the websocket-sink receives messages from other spring-cloud-stream apps, you can use the following simple end-to-end setup.

Step 1: Start Redis

The default broker used is Redis. Normally you can start Redis via redis-server.

Step 2: Deploy a time-source

Step 3: Deploy a websocket-sink (the app that contains this starter jar)

Finally start a websocket-sink in trace mode so that you see the messages produced by the time-source in the log:

java -jar <spring boot application for websocket-sink> --spring.cloud.stream.bindings.input=ticktock --server.port=9393 \
	--logging.level.org.springframework.cloud.stream.module.websocket=TRACE

You should start seeing log messages in the console where you started the WebsocketSink like this:

Handling message: GenericMessage [payload=2015-10-21 12:52:53, headers={id=09ae31e0-a04e-b811-d211-b4d4e75b6f29, timestamp=1445424778065}]
Handling message: GenericMessage [payload=2015-10-21 12:52:54, headers={id=75eaaf30-e5c6-494f-b007-9d5b5b920001, timestamp=1445424778065}]
Handling message: GenericMessage [payload=2015-10-21 12:52:55, headers={id=18b887db-81fc-c634-7a9a-16b1c72de291, timestamp=1445424778066}]

4.20.3 Actuators

There is an Endpoint that you can use to access the last n messages sent and received. You have to enable it by providing --endpoints.websocketsinktrace.enabled=true. By default it shows the last 100 messages at host:port/websocketsinktrace. Here is a sample output:

 [
   {
    "timestamp": 1445453703508,
    "info": {
      "type": "text",
      "direction": "out",
      "id": "2ff9be50-c9b2-724b-5404-1a6305c033e4",
      "payload": "2015-10-21 20:54:33"
    }
  },
  ...
  {
    "timestamp": 1445453703506,
    "info": {
      "type": "text",
      "direction": "out",
      "id": "2b9dbcaf-c808-084d-a51b-50f617ae6a75",
      "payload": "2015-10-21 20:54:32"
    }
  }
]

There is also a simple HTML page where you can see forwarded messages in a text area. You can access it directly via host:port in your browser.

[Note]Note

For SSL mode (--ssl=true) a self-signed certificate is used, which might cause trouble with some Websocket clients. In a future release, there will be a --certificate=mycert.cer switch to pass a valid (not self-signed) certificate.