4. Sinks

4.1 Aggregate Counter Sink

The aggregate counter differs from a simple counter in that it not only keeps a total value for the count, but also retains the total count values for each minute, hour, day, and month of the period for which it is run. The data can then be queried by supplying a start and end date and the resolution at which the data should be returned.
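The bucketing idea can be sketched in plain Java. This is only an illustrative in-memory model, not the sink's Redis-backed implementation; the class name AggregateCounterSketch and its methods are hypothetical.

```java
import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;
import java.util.EnumMap;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

// Hypothetical in-memory model of an aggregate counter (not the sink's actual code).
class AggregateCounterSketch {
    private static final ChronoUnit[] RESOLUTIONS = {
        ChronoUnit.MINUTES, ChronoUnit.HOURS, ChronoUnit.DAYS, ChronoUnit.MONTHS
    };

    private long total;
    // One sorted bucket map per resolution, keyed by the start of each bucket.
    private final Map<ChronoUnit, TreeMap<LocalDateTime, Long>> buckets =
            new EnumMap<>(ChronoUnit.class);

    AggregateCounterSketch() {
        for (ChronoUnit unit : RESOLUTIONS) {
            buckets.put(unit, new TreeMap<>());
        }
    }

    // Truncates a timestamp to the start of its bucket for the given resolution.
    static LocalDateTime bucketStart(LocalDateTime time, ChronoUnit unit) {
        if (unit == ChronoUnit.MONTHS) {
            // truncatedTo() does not accept MONTHS, so reset the day of month manually.
            return time.withDayOfMonth(1).truncatedTo(ChronoUnit.DAYS);
        }
        return time.truncatedTo(unit);
    }

    // Adds the amount to the total and to the matching bucket of every resolution.
    void increment(LocalDateTime time, long amount) {
        total += amount;
        for (ChronoUnit unit : RESOLUTIONS) {
            buckets.get(unit).merge(bucketStart(time, unit), amount, Long::sum);
        }
    }

    long total() {
        return total;
    }

    // Returns the non-empty buckets between start and end (inclusive) at the given resolution.
    SortedMap<LocalDateTime, Long> query(LocalDateTime start, LocalDateTime end,
                                         ChronoUnit resolution) {
        TreeMap<LocalDateTime, Long> series = buckets.get(resolution);
        if (series == null) {
            throw new IllegalArgumentException("Unsupported resolution: " + resolution);
        }
        return series.subMap(bucketStart(start, resolution), true,
                bucketStart(end, resolution), true);
    }

    public static void main(String[] args) {
        AggregateCounterSketch counter = new AggregateCounterSketch();
        counter.increment(LocalDateTime.of(2024, 1, 15, 10, 5, 30), 1);
        counter.increment(LocalDateTime.of(2024, 1, 15, 10, 45, 0), 2);
        counter.increment(LocalDateTime.of(2024, 1, 15, 11, 0, 0), 4);
        System.out.println("total = " + counter.total());
        System.out.println(counter.query(LocalDateTime.of(2024, 1, 15, 0, 0),
                LocalDateTime.of(2024, 1, 15, 23, 59), ChronoUnit.HOURS));
    }
}
```

Keeping one map per resolution trades a little memory for cheap range queries: a query only needs to slice the map that matches the requested resolution.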

4.1.1 Input

Headers

N/A

Payload

  • Any

4.1.2 Output

N/A

4.1.3 Options

The aggregate-counter sink has the following options:

aggregate-counter.date-format
<documentation missing> (String, default: yyyy-MM-dd'T'HH:mm:ss.SSS'Z')
aggregate-counter.increment-expression
Increment value for each bucket as a SpEL against the message (Expression, default: <none>)
aggregate-counter.name
The name of the aggregate counter. (String, default: <none>)
aggregate-counter.name-expression
A SpEL expression (against the incoming Message) to derive the name of the aggregate counter. (Expression, default: <none>)
aggregate-counter.time-field
A SpEL expression (against the incoming Message) to derive the timestamp value. (Expression, default: <none>)
spring.redis.database
Database index used by the connection factory. (Integer, default: 0)
spring.redis.host
Redis server host. (String, default: localhost)
spring.redis.password
Login password of the redis server. (String, default: <none>)
spring.redis.port
Redis server port. (Integer, default: 6379)
spring.redis.ssl
Whether to enable SSL support. (Boolean, default: false)
spring.redis.timeout
Connection timeout. (Duration, default: <none>)
spring.redis.url
Connection URL. Overrides host, port, and password. User is ignored. Example: redis://user:password@example.com:6379 (String, default: <none>)

4.1.4 Build

$ ./mvnw clean install -PgenerateApps
$ cd apps

You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:

$ ./mvnw clean package

4.1.5 Examples

java -jar aggregate-counter-sink.jar --name=<name of the counter>

java -jar aggregate-counter-sink.jar --nameExpression=<name expression>

4.2 Cassandra Sink

This sink application writes the content of each message it receives into Cassandra.

It expects a JSON String payload and uses its properties to map to table columns.

4.2.1 Input

Headers:

  • Content-Type: application/json

Payload:

A JSON String or byte array representing the entity (or a list of entities) to be persisted

4.2.2 Output

N/A

4.2.3 Options

The cassandra sink has the following options:

cassandra.cluster.compression-type
The compression to use for the transport. (CompressionType, default: <none>, possible values: NONE,SNAPPY,LZ4)
cassandra.cluster.contact-points
The comma-delimited string of the hosts to connect to Cassandra. (String, default: <none>)
cassandra.cluster.create-keyspace
The flag to create (or not) keyspace on application startup. (Boolean, default: false)
cassandra.cluster.entity-base-packages
The base packages to scan for entities annotated with Table annotations. (String[], default: [])
cassandra.cluster.init-script
The resource with CQL scripts (delimited by ';') to initialize keyspace schema. (Resource, default: <none>)
cassandra.cluster.keyspace
The keyspace name to connect to. (String, default: <none>)
cassandra.cluster.metrics-enabled
Enable/disable metrics collection for the created cluster. (Boolean, default: <none>)
cassandra.cluster.password
The password for connection. (String, default: <none>)
cassandra.cluster.port
The port to use to connect to the Cassandra host. (Integer, default: <none>)
cassandra.cluster.schema-action
The schema action to perform. (SchemaAction, default: <none>, possible values: NONE,CREATE,CREATE_IF_NOT_EXISTS,RECREATE,RECREATE_DROP_UNUSED)
cassandra.cluster.skip-ssl-validation
The flag to validate the Servers' SSL certs (Boolean, default: false)
cassandra.cluster.use-ssl
The flag to use SSL to connect (Boolean, default: false)
cassandra.cluster.username
The username for connection. (String, default: <none>)
cassandra.consistency-level
The consistencyLevel option of WriteOptions. (ConsistencyLevel, default: <none>, possible values: ANY,ONE,TWO,THREE,ALL,LOCAL_ONE,SERIAL,LOCAL_SERIAL,QUORUM,LOCAL_QUORUM,EACH_QUORUM)
cassandra.ingest-query
The ingest Cassandra query. (String, default: <none>)
cassandra.query-type
The queryType for Cassandra Sink. (Type, default: <none>, possible values: INSERT,UPDATE,DELETE,STATEMENT)
cassandra.retry-policy
The retryPolicy option of WriteOptions. (CassandraRetryPolicy, default: <none>, possible values: DEFAULT,DOWNGRADING_CONSISTENCY,FALLTHROUGH,LOGGING)
cassandra.statement-expression
The expression in Cassandra query DSL style. (Expression, default: <none>)
cassandra.ttl
The time-to-live option of WriteOptions. (Integer, default: 0)

4.2.4 Build

$ ./mvnw clean install -PgenerateApps
$ cd apps

You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:

$ ./mvnw clean package

4.2.5 Examples

The following example assumes that a JSON payload is sent to a default destination called input. The sink parses some of its properties (id, time, customer_id, value) and persists them into a table called orders.

java -jar cassandra_sink.jar --cassandra.cluster.keyspace=test \
         --cassandra.ingest-query="insert into orders(id,time,customer_id, value) values (?,?,?,?)"

4.3 Counter Sink

The counter sink simply counts the number of messages it receives, optionally storing counts in a separate store such as Redis.

4.3.1 Input

Headers

N/A

Payload

  • Any

4.3.2 Output

N/A

4.3.3 Options

The counter sink has the following options:

counter.name
The name of the counter to increment. (String, default: <none>)
counter.name-expression
A SpEL expression (against the incoming Message) to derive the name of the counter to increment. (Expression, default: <none>)
spring.redis.database
Database index used by the connection factory. (Integer, default: 0)
spring.redis.host
Redis server host. (String, default: localhost)
spring.redis.password
Login password of the redis server. (String, default: <none>)
spring.redis.port
Redis server port. (Integer, default: 6379)
spring.redis.ssl
Whether to enable SSL support. (Boolean, default: false)
spring.redis.timeout
Connection timeout. (Duration, default: <none>)
spring.redis.url
Connection URL. Overrides host, port, and password. User is ignored. Example: redis://user:password@example.com:6379 (String, default: <none>)

4.3.4 Build

$ ./mvnw clean install -PgenerateApps
$ cd apps

You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:

$ ./mvnw clean package

4.3.5 Examples

java -jar counter-sink.jar --name=<name of the counter>

java -jar counter-sink.jar --nameExpression=<name expression>

4.4 Field Value Counter Sink

A field value counter is a metric that counts occurrences of unique values for a named field in a message payload.

4.4.1 Input

Headers

content-type: application/octet-stream

Payload

  • POJO (Java bean)
  • Map

Headers

content-type: text/plain

Payload

  • JSON String

Headers

content-type: application/x-spring-tuple

Payload

  • Tuple

For example, suppose a message source produces a payload with a field named user:

class Foo {
   String user;
   public Foo(String user) {
       this.user = user;
   }
}

If the stream source produces messages with the following objects:

   new Foo("fred")
   new Foo("sue")
   new Foo("dave")
   new Foo("sue")

The field value counter on the field user will contain:

fred:1, sue:2, dave:1

Multi-value fields are also supported. For example, if a field contains a list, each value will be counted once:

users:["dave","fred","sue"]
users:["sue","jon"]

The field value counter on the field users will contain:

dave:1, fred:1, sue:2, jon:1
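The counting rule for single-value and multi-value fields can be sketched as follows. This is a simplified in-memory illustration that models payloads as Maps; the class FieldValueCounterSketch is hypothetical and is not the sink's implementation.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical in-memory model of a field value counter (payloads modelled as Maps).
class FieldValueCounterSketch {
    private final String fieldName;
    // LinkedHashMap keeps first-seen order so the output is easy to compare by eye.
    private final Map<String, Long> counts = new LinkedHashMap<>();

    FieldValueCounterSketch(String fieldName) {
        this.fieldName = fieldName;
    }

    // Counts the named field; each element of a collection-valued field is counted once.
    void accept(Map<String, ?> payload) {
        Object value = payload.get(fieldName);
        if (value == null) {
            return; // field absent: nothing to count
        }
        if (value instanceof Collection) {
            for (Object element : (Collection<?>) value) {
                increment(element);
            }
        } else {
            increment(value);
        }
    }

    private void increment(Object value) {
        counts.merge(String.valueOf(value), 1L, Long::sum);
    }

    Map<String, Long> counts() {
        return counts;
    }

    public static void main(String[] args) {
        FieldValueCounterSketch counter = new FieldValueCounterSketch("users");
        counter.accept(Collections.singletonMap("users", Arrays.asList("dave", "fred", "sue")));
        counter.accept(Collections.singletonMap("users", Arrays.asList("sue", "jon")));
        System.out.println(counter.counts()); // prints {dave=1, fred=1, sue=2, jon=1}
    }
}
```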

4.4.2 Output

N/A

4.4.3 Options

The field-value-counter sink has the following options:

field-value-counter.field-name
The field name to extract for the counter. (String, default: <none>)
field-value-counter.name
The name of the counter to increment. (String, default: <none>)
field-value-counter.name-expression
A SpEL expression (against the incoming Message) to derive the name of the counter to increment. (Expression, default: <none>)
spring.redis.database
Database index used by the connection factory. (Integer, default: 0)
spring.redis.host
Redis server host. (String, default: localhost)
spring.redis.password
Login password of the redis server. (String, default: <none>)
spring.redis.port
Redis server port. (Integer, default: 6379)
spring.redis.ssl
Whether to enable SSL support. (Boolean, default: false)
spring.redis.timeout
Connection timeout. (Duration, default: <none>)
spring.redis.url
Connection URL. Overrides host, port, and password. User is ignored. Example: redis://user:password@example.com:6379 (String, default: <none>)

4.4.4 Build

$ ./mvnw clean install -PgenerateApps
$ cd apps

You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:

$ ./mvnw clean package

4.4.5 Examples

java -jar field-value-counter-sink.jar --fieldName=<field-name> --name=<name of the counter>

4.5 File Sink

This module writes each message it receives to a file.

4.5.1 Input

Headers

N/A

Payload

  • java.io.File
  • java.io.InputStream
  • byte[]
  • String

4.5.2 Output

N/A (writes to the file system).

4.5.3 Options

The file sink has the following options:

file.binary
A flag to indicate whether adding a newline after the write should be suppressed. (Boolean, default: false)
file.charset
The charset to use when writing text content. (String, default: UTF-8)
file.directory
The parent directory of the target file. (String, default: <none>)
file.directory-expression
The expression to evaluate for the parent directory of the target file. (Expression, default: <none>)
file.mode
The FileExistsMode to use if the target file already exists. (FileExistsMode, default: <none>, possible values: APPEND,APPEND_NO_FLUSH,FAIL,IGNORE,REPLACE,REPLACE_IF_MODIFIED)
file.name
The name of the target file. (String, default: file-sink)
file.name-expression
The expression to evaluate for the name of the target file. (String, default: <none>)
file.suffix
The suffix to append to file name. (String, default: <empty string>)

4.5.4 Build

$ ./mvnw clean install -PgenerateApps
$ cd apps

You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:

$ ./mvnw clean package

4.5.5 Examples

java -jar file_sink.jar --file.directory=/tmp/bar

4.6 FTP Sink

FTP sink is a simple option to push files to an FTP server from incoming messages.

It uses an ftp-outbound-adapter, therefore incoming messages can be either a java.io.File object, a String (content of the file) or an array of bytes (file content as well).

To use this sink, you need a username and a password to log in.

Note

By default Spring Integration will use o.s.i.file.DefaultFileNameGenerator if none is specified. DefaultFileNameGenerator will determine the file name based on the value of the file_name header (if it exists) in the MessageHeaders, or if the payload of the Message is already a java.io.File, then it will use the original name of that file.
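The lookup order described in this note can be sketched in plain Java. This is a simplified illustration rather than Spring Integration's code: the class FileNameResolutionSketch and the explicit fallback parameter are hypothetical, and headers are modelled as a plain Map.

```java
import java.io.File;
import java.util.Collections;
import java.util.Map;

// Hypothetical, simplified model of the file name lookup order described in the note.
class FileNameResolutionSketch {
    // 1. Use the file_name header if present; 2. else use the File payload's own name;
    // 3. else return the caller-supplied fallback (an assumption made for this sketch).
    static String resolve(Map<String, ?> headers, Object payload, String fallback) {
        Object header = headers.get("file_name");
        if (header instanceof String && !((String) header).isEmpty()) {
            return (String) header;
        }
        if (payload instanceof File) {
            return ((File) payload).getName();
        }
        return fallback;
    }

    public static void main(String[] args) {
        Map<String, Object> noHeaders = Collections.emptyMap();
        System.out.println(resolve(Collections.singletonMap("file_name", "report.csv"),
                "some content", "unnamed.msg"));
        System.out.println(resolve(noHeaders, new File("/tmp/data.txt"), "unnamed.msg"));
        System.out.println(resolve(noHeaders, "some content", "unnamed.msg"));
    }
}
```

In practice, setting the file_name header upstream (or the ftp.filename-expression option) is the usual way to control the remote name when the payload is a String or a byte array.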

4.6.1 Input

Headers

  • file_name (See note above)

Payload

  • java.io.File
  • java.io.InputStream
  • byte[]
  • String

4.6.2 Output

N/A (writes to the FTP server).

4.6.3 Options

The ftp sink has the following options:

ftp.auto-create-dir
Whether or not to create the remote directory. (Boolean, default: true)
ftp.factory.cache-sessions
<documentation missing> (Boolean, default: <none>)
ftp.factory.client-mode
The client mode to use for the FTP session. (ClientMode, default: <none>, possible values: ACTIVE,PASSIVE)
ftp.factory.host
<documentation missing> (String, default: <none>)
ftp.factory.password
<documentation missing> (String, default: <none>)
ftp.factory.port
The port of the server. (Integer, default: 21)
ftp.factory.username
<documentation missing> (String, default: <none>)
ftp.filename-expression
A SpEL expression to generate the remote file name. (Expression, default: <none>)
ftp.mode
Action to take if the remote file already exists. (FileExistsMode, default: <none>, possible values: APPEND,APPEND_NO_FLUSH,FAIL,IGNORE,REPLACE,REPLACE_IF_MODIFIED)
ftp.remote-dir
The remote FTP directory. (String, default: /)
ftp.remote-file-separator
The remote file separator. (String, default: /)
ftp.temporary-remote-dir
A temporary directory where the file will be written if '#isUseTemporaryFilename()' is true. (String, default: /)
ftp.tmp-file-suffix
The suffix to use while the transfer is in progress. (String, default: .tmp)
ftp.use-temporary-filename
Whether or not to write to a temporary file and rename. (Boolean, default: true)

4.6.4 Build

$ ./mvnw clean install -PgenerateApps
$ cd apps

You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:

$ ./mvnw clean package

4.6.5 Examples

java -jar ftp_sink.jar --ftp.remote-dir=bar --ftp.factory.host=ftpserver \
         --ftp.factory.username=user --ftp.factory.password=pw

4.7 Gemfire Sink

The Gemfire sink allows one to write message payloads to a Gemfire server.

4.7.1 Input

Headers

  • content-type: text/plain

Payload

  • String

Headers

  • content-type: application/x-java-serialized-object

Payload

  • java.io.Serializable

4.7.2 Output

N/A

4.7.3 Options

The gemfire sink has the following options:

gemfire.json
Indicates if the Gemfire region stores json objects as native Gemfire PdxInstance (Boolean, default: false)
gemfire.key-expression
SpEL expression to use as a cache key (String, default: <none>)
gemfire.pool.connect-type
Specifies connection type: 'server' or 'locator'. (ConnectType, default: <none>, possible values: locator,server)
gemfire.pool.host-addresses
Specifies one or more Gemfire locator or server addresses formatted as [host]:[port]. (InetSocketAddress[], default: <none>)
gemfire.pool.subscription-enabled
Set to true to enable subscriptions for the client pool. Required to sync updates to the client cache. (Boolean, default: false)
gemfire.region.region-name
The region name. (String, default: <none>)
gemfire.security.password
The cache password. (String, default: <none>)
gemfire.security.username
The cache username. (String, default: <none>)

4.7.4 Build

$ ./mvnw clean install -PgenerateApps
$ cd apps

You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:

$ ./mvnw clean package

4.7.5 Examples

java -jar gemfire-sink.jar --gemfire.keyExpression=

4.8 Gpfdist Sink

A sink module that routes messages into GPDB/HAWQ segments via the gpfdist protocol. Internally, this sink creates a custom HTTP listener that supports the gpfdist protocol and schedules a task that orchestrates a gpload session in the same way it is done natively in Greenplum.

No data is written into temporary files and all data is kept in stream buffers waiting to get inserted into Greenplum DB or HAWQ. If there are no existing load sessions from Greenplum, the sink will block until such sessions are established.

4.8.1 Input

Headers:

  • Content-Type: text/plain

Payload:

  • String

4.8.2 Output

N/A

4.8.3 Options

The gpfdist sink has the following options:

gpfdist.batch-count
Number of windowed batches each segment takes. (Integer, default: 100)
gpfdist.batch-period
Time in seconds for each load operation to sleep in between operations. (Integer, default: 10)
gpfdist.batch-timeout
Timeout in seconds for segment inactivity. (Integer, default: 4)
gpfdist.column-delimiter
Data record column delimiter. (Character, default: <none>)
gpfdist.control-file
Path to yaml control file. (Resource, default: <none>)
gpfdist.db-host
Database host. (String, default: localhost)
gpfdist.db-name
Database name. (String, default: gpadmin)
gpfdist.db-password
Database password. (String, default: gpadmin)
gpfdist.db-port
Database port. (Integer, default: 5432)
gpfdist.db-user
Database user. (String, default: gpadmin)
gpfdist.delimiter
Data line delimiter. (String, default: newline character)
gpfdist.flush-count
Flush item count. (Integer, default: 100)
gpfdist.flush-time
Flush item time. (Integer, default: 2)
gpfdist.gpfdist-port
Port of gpfdist server. Default port `0` indicates that a random port is chosen. (Integer, default: 0)
gpfdist.log-errors
Enable log errors. (Boolean, default: false)
gpfdist.match-columns
Match columns with update. (String, default: <none>)
gpfdist.mode
Mode, either insert or update. (String, default: <none>)
gpfdist.null-string
Null string definition. (String, default: <none>)
gpfdist.rate-interval
Enable transfer rate interval. (Integer, default: 0)
gpfdist.segment-reject-limit
Error reject limit. (String, default: <none>)
gpfdist.segment-reject-type
Error reject type, either `rows` or `percent`. (SegmentRejectType, default: <none>, possible values: ROWS,PERCENT)
gpfdist.sql-after
Sql to run after load. (String, default: <none>)
gpfdist.sql-before
Sql to run before load. (String, default: <none>)
gpfdist.table
Target database table. (String, default: <none>)
gpfdist.update-columns
Update columns with update. (String, default: <none>)
spring.net.hostdiscovery.loopback
The new loopback flag. Default value is FALSE (Boolean, default: false)
spring.net.hostdiscovery.match-interface
The new match interface regex pattern. Default value is empty (String, default: <none>)
spring.net.hostdiscovery.match-ipv4
Used to match ip address from a network using a cidr notation (String, default: <none>)
spring.net.hostdiscovery.point-to-point
The new point to point flag. Default value is FALSE (Boolean, default: false)
spring.net.hostdiscovery.prefer-interface
The new preferred interface list (List<String>, default: <none>)

4.8.4 Implementation Notes

Within a gpfdist sink, a Reactor-based stream publishes data from the incoming SI channel. This channel receives data from the Message Bus. The Reactor stream is then connected to Netty-based HTTP channel adapters so that, when a new HTTP connection is established, the Reactor stream is flushed and balanced among existing HTTP clients. When Greenplum performs a load from an external table, each segment initiates an HTTP connection and starts loading data. The net effect is that incoming data is automatically spread among the Greenplum segments.

4.8.5 Detailed Option Descriptions

The gpfdist sink supports the following configuration properties:

table

Database table to work with. (String, default: ``, required)

This option denotes the table into which data is inserted or updated. The structure of the external table is also derived from the structure of this table.

Currently, table is the only way to define the structure of an external table. Effectively, it replaces other_table in the clause segment below.

CREATE READABLE EXTERNAL TABLE table_name LIKE other_table

mode

Gpfdist mode, either `insert` or `update`. (String, default: insert)

Currently, only the insert and update gpfdist modes are supported. The merge mode familiar from the native gpfdist loader is not yet supported.

For update mode, the matchColumns and updateColumns options are required.

columnDelimiter

Data record column delimiter. (Character, default: ``)

Defines the delimiter character used in the clause segment below, which is part of the FORMAT 'TEXT' or FORMAT 'CSV' section.

[DELIMITER AS 'delimiter']

segmentRejectLimit

Error reject limit. (String, default: ``)

Defines the count value in the clause segment below.

[ [LOG ERRORS] SEGMENT REJECT LIMIT count
  [ROWS | PERCENT] ]

As a convenience, this reject limit also recognizes a percentage format such as 2%. If it is used, segmentRejectType is automatically set to percent.
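How the percentage shorthand could translate into the clause segment can be sketched as follows. This is an illustration under stated assumptions, not the sink's code: the class RejectLimitSketch and its clause method are hypothetical, and the rule that LOG ERRORS is emitted only when both a limit and a type are present follows the logErrors description in this section.

```java
import java.util.Locale;

// Hypothetical helper that renders the SEGMENT REJECT LIMIT clause segment.
class RejectLimitSketch {
    // rejectLimit: a count such as "10" or a percentage such as "2%"; may be null.
    // rejectType:  "rows", "percent", or null when not configured.
    static String clause(String rejectLimit, String rejectType, boolean logErrors) {
        if (rejectLimit == null || rejectLimit.trim().isEmpty()) {
            return ""; // no limit configured: the clause is omitted entirely
        }
        String limit = rejectLimit.trim();
        String type = rejectType;
        if (limit.endsWith("%")) {
            // Percentage shorthand: strip the sign and force the type to percent.
            limit = limit.substring(0, limit.length() - 1).trim();
            type = "percent";
        }
        int count = Integer.parseInt(limit); // fails fast on a non-numeric limit
        boolean typeSet = type != null && !type.trim().isEmpty();

        StringBuilder sql = new StringBuilder();
        if (logErrors && typeSet) {
            sql.append("LOG ERRORS ");
        }
        sql.append("SEGMENT REJECT LIMIT ").append(count);
        if (typeSet) {
            sql.append(' ').append(type.trim().toUpperCase(Locale.ROOT));
        }
        return sql.toString();
    }

    public static void main(String[] args) {
        System.out.println(clause("2%", null, true));
        System.out.println(clause("10", "rows", false));
    }
}
```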

segmentRejectType

Error reject type, either `rows` or `percent`. (String, default: ``)

Defines ROWS or PERCENT in the clause segment below.

[ [LOG ERRORS] SEGMENT REJECT LIMIT count
  [ROWS | PERCENT] ]

logErrors

Enable or disable log errors. (Boolean, default: false)

Because error logging is optional with SEGMENT REJECT LIMIT, it is used only if both segmentRejectLimit and segmentRejectType are set. Enables the error log in the clause segment below.

[ [LOG ERRORS] SEGMENT REJECT LIMIT count
  [ROWS | PERCENT] ]

nullString

Null string definition. (String, default: ``)

Defines the null string used in the clause segment below, which is part of the FORMAT 'TEXT' or FORMAT 'CSV' section.

[NULL AS 'null string']

delimiter

Data record delimiter for incoming messages. (String, default: \n)

By default, the delimiter in this option is appended to every message sent into this sink. Currently, NEWLINE is not a supported configuration option, and line termination for data comes from default functionality:

If not specified, a Greenplum Database segment will detect the newline type by looking at the first row of data it receives and using the first newline type encountered.

 -- External Table Docs

matchColumns

Comma delimited list of columns to match. (String, default: ``)

Note

For usage, see the examples in Section 4.8.7.

updateColumns

Comma delimited list of columns to update. (String, default: ``)

Note

For usage, see the examples in Section 4.8.7.

sqlBefore

SQL clause to run before each load operation. (String, default: ``)

sqlAfter

SQL clause to run after each load operation. (String, default: ``)

rateInterval

Debug rate of data transfer. (Integer, default: 0)

If set to a non-zero value, the sink logs the rate of messages passing through it after the number of messages denoted by this setting has been processed. A value of 0 disables this rate calculation and logging.

flushCount

Max collected size per windowed data. (Integer, default: 100)

Note

For more information on the flush and batch settings, see Section 4.8.6.

4.8.6 How Data Is Sent Into Segments

A few concepts are important for understanding how data passes into the sink, through it, and finally lands in the database.

  • The sink has its normal message handler for incoming data from a source module and a Netty-based gpfdist protocol listener to which segments connect. Between those two, Reactor-based streams control load balancing across the segment connections.
  • Incoming data is first sent into a Reactor stream, which constructs windows. A window is released downstream when it becomes full (flushCount) or when it times out (flushTime) before becoming full. A released window is then ready to be sent to a segment.
  • Segments that connect to this stream see a stream of windowed data, not a stream of individual messages. This can also be called a stream of batches.
  • When a segment connects to the protocol listener, it subscribes to this stream and takes the number of batches denoted by batchCount. It completes the stream once it has received enough batches or once batchTimeout occurs due to inactivity.
  • It does not matter how many simultaneous connections the database cluster has open at any given time, because Reactor load balances batches among all subscribers.
  • The database cluster initiates a loading session when a select is run against an external table that points to this sink. These loading operations run in the background in a loop, one after another. The batchPeriod option is used as the sleep time between load sessions.

Let’s take a closer look at how the flushCount, flushTime, batchCount, batchTimeout, and batchPeriod options work.

At the highest level, where data coming into the sink is windowed, flushCount and flushTime control when a batch of messages is sent downstream. If there are many simultaneous segment connections, flushing less often keeps more segments inactive, because there is more demand for batches than flushing produces.
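The count-or-timeout windowing can be illustrated with a small, deterministic simulation. This is only a sketch: the real sink uses Reactor streams, whereas the hypothetical WindowingSketch class below replays explicit arrival timestamps. Measuring the timeout in milliseconds from the first item of a window, and closing a timed-out window only when the next message arrives, are simplifying assumptions made for this example.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation of count-or-timeout windowing (not the sink's Reactor pipeline).
class WindowingSketch {
    // Groups messages into windows that close when they hold flushCount items,
    // or when flushTimeMillis has elapsed since the window's first item arrived.
    static List<List<String>> window(long[] arrivalMillis, String[] messages,
                                     int flushCount, long flushTimeMillis) {
        List<List<String>> windows = new ArrayList<>();
        List<String> current = new ArrayList<>();
        long openedAt = 0;
        for (int i = 0; i < messages.length; i++) {
            // Timeout: release a partially filled window before handling this message.
            if (!current.isEmpty() && arrivalMillis[i] - openedAt >= flushTimeMillis) {
                windows.add(current);
                current = new ArrayList<>();
            }
            if (current.isEmpty()) {
                openedAt = arrivalMillis[i];
            }
            current.add(messages[i]);
            // Full: release the window as soon as it reaches flushCount items.
            if (current.size() == flushCount) {
                windows.add(current);
                current = new ArrayList<>();
            }
        }
        if (!current.isEmpty()) {
            windows.add(current); // flush whatever remains at the end of the replay
        }
        return windows;
    }

    public static void main(String[] args) {
        long[] arrivals = {0, 100, 200, 5000, 5100};
        String[] messages = {"a", "b", "c", "d", "e"};
        // prints [[a, b], [c], [d, e]]
        System.out.println(window(arrivals, messages, 2, 2000));
    }
}
```

In the replay, the window holding only "c" is released by the timeout, while the other two are released because they reached the count limit. Each released window corresponds to one batch that a segment can take.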

When an existing segment connection is active and has subscribed to the stream of batches, data keeps flowing until either batchCount is met or batchTimeout occurs due to a lack of data from upstream. The higher batchCount is, the more data each segment reads. The higher batchTimeout is, the longer a segment waits in case more data arrives.

Because gpfdist load operations run in a loop, batchPeriod simply prevents a busy loop. A busy loop would be fine if a constant stream of data were coming in, but it is unnecessary when incoming data arrives in bursts.

Note

Data loaded via gpfdist does not become visible in the database until the whole distributed loading session has finished successfully.

Reactor also handles backpressure: if existing load operations do not produce enough demand for data, message passing into the sink eventually blocks. This happens when Reactor’s internal ring buffer (32 items in size) becomes full. Data flows through the sink only when segments pull data from it.

4.8.7 Example Usage

In this first example, we create a simple stream that inserts data from a time source. Let’s create a table with two text columns.

gpadmin=# create table ticktock (date text, time text);

Create a simple stream named gpstream1.

dataflow:>stream create --name gpstream1 --definition "time | gpfdist
--dbHost=mdw --table=ticktock --batchTimeout=1 --batchPeriod=1
--flushCount=2 --flushTime=2 --columnDelimiter=' '" --deploy

Let it run and check the results in the database.

gpadmin=# select count(*) from ticktock;
 count
-------
    14
(1 row)

The previous example performed simple inserts into a table. Let’s see how to update data in a table. Create a simple table named httpdata with three text columns and insert some data.

gpadmin=# create table httpdata (col1 text, col2 text, col3 text);
gpadmin=# insert into httpdata values ('DATA1', 'DATA', 'DATA');
gpadmin=# insert into httpdata values ('DATA2', 'DATA', 'DATA');
gpadmin=# insert into httpdata values ('DATA3', 'DATA', 'DATA');

The table now looks like this.

gpadmin=# select * from httpdata;
 col1  | col2 | col3
-------+------+------
 DATA3 | DATA | DATA
 DATA2 | DATA | DATA
 DATA1 | DATA | DATA
(3 rows)

Let’s create a stream that updates the httpdata table by matching on column col1 and updating columns col2 and col3.

dataflow:>stream create --name gpfdiststream2 --definition "http
--server.port=8081|gpfdist --mode=update --table=httpdata
--dbHost=mdw --columnDelimiter=',' --matchColumns=col1
--updateColumns=col2,col3" --deploy

Post some data into the stream; it is passed to the gpfdist sink via the http source.

curl --data "DATA1,DATA1,DATA1" -H "Content-Type:text/plain" http://localhost:8081/

If you query the table again, you’ll see that the row for DATA1 has been updated.

gpadmin=# select * from httpdata;
 col1  | col2  | col3
-------+-------+-------
 DATA3 | DATA  | DATA
 DATA2 | DATA  | DATA
 DATA1 | DATA1 | DATA1
(3 rows)

4.8.8 Tuning Transfer Rate

The default values for the flushCount, flushTime, batchCount, batchTimeout, and batchPeriod options are relatively conservative and need to be tuned for every use case to achieve optimal performance. To decide how to tune the sink’s behavior to suit your needs, consider the following:

  • What is the average size of the messages ingested by the sink?
  • How quickly do you want data to become visible in the database?
  • Is the incoming data a constant flow or does it arrive in bursts?

Everything that flows through the sink is kept in memory, and because the sink handles backpressure, memory consumption is relatively low. However, the sink cannot predict the average size of incoming data, and this data is windowed later downstream. Because every batch of data is kept in memory, you should not allow the window size to become too large if the average data size is large.

Generally speaking, if you have many segments in a load operation, it is advisable to keep the flushed window size relatively small, which allows more segments to stay active. However, this also depends on how much data is flowing into the sink itself.

The longer the load session for each segment stays active, the higher the overall transfer rate will be. The batchCount option naturally controls this. The batchTimeout option, in turn, controls how quickly each segment completes a stream due to inactivity from upstream and steps away from the loading session, which allows the distributed session to finish and the data to become visible in the database.

4.8.9 Build

$ ./mvnw clean install -PgenerateApps
$ cd apps

You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:

$ ./mvnw clean package

4.8.10 Examples

See above.

4.9 HDFS Sink

This module writes each message it receives to HDFS.

4.9.1 Input

Headers

Payload

Any

4.9.2 Output

N/A

4.9.3 Options

The hdfs sink has the following options:

hdfs.close-timeout
Timeout in ms, regardless of activity, after which file will be automatically closed. (Long, default: 0)
hdfs.codec
Compression codec alias name (gzip, snappy, bzip2, lzo, or slzo). (String, default: <none>)
hdfs.directory
Base path to write files to. (String, default: <none>)
hdfs.enable-sync
Whether writer will sync to datanode when flush is called, setting this to 'true' could impact throughput. (Boolean, default: false)
hdfs.file-extension
The base filename extension to use for the created files. (String, default: txt)
hdfs.file-name
The base filename to use for the created files. (String, default: <none>)
hdfs.file-open-attempts
Maximum number of file open attempts to find a path. (Integer, default: 10)
hdfs.file-uuid
Whether file name should contain uuid. (Boolean, default: false)
hdfs.flush-timeout
Timeout in ms, regardless of activity, after which data written to file will be flushed. (Long, default: 0)
hdfs.fs-uri
URL for HDFS Namenode. (String, default: <none>)
hdfs.idle-timeout
Inactivity timeout in ms after which file will be automatically closed. (Long, default: 0)
hdfs.in-use-prefix
Prefix for files currently being written. (String, default: <none>)
hdfs.in-use-suffix
Suffix for files currently being written. (String, default: <none>)
hdfs.overwrite
Whether writer is allowed to overwrite files in Hadoop FileSystem. (Boolean, default: false)
hdfs.partition-path
A SpEL expression defining the partition path. (String, default: <none>)
hdfs.rollover
Threshold in bytes when file will be automatically rolled over. (Integer, default: 1000000000)

Note

This module can have its runtime dependencies provided during startup if you would like to use a Hadoop distribution other than the default one.

4.9.4 Build

$ ./mvnw clean install -PgenerateApps
$ cd apps

You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:

$ ./mvnw clean package

4.9.5 Examples

java -jar hdfs-sink.jar --fsUri

4.10 Jdbc Sink

A module that writes its incoming payload to an RDBMS using JDBC.

4.10.1 Input

Headers

Payload

  • Any

Column expressions are evaluated against the message, and an expression is usually compatible with only one payload type (such as a Map or a bean).

4.10.2 Output

N/A

4.10.3 Options

The jdbc sink has the following options:

jdbc.columns
The comma separated colon-based pairs of column names and SpEL expressions for values to insert/update. Names are used at initialization time to issue the DDL. (String, default: payload:payload.toString())
jdbc.initialize
'true', 'false' or the location of a custom initialization script for the table. (String, default: false)
jdbc.table-name
The name of the table to write into. (String, default: messages)
spring.datasource.data
Data (DML) script resource references. (List<String>, default: <none>)
spring.datasource.driver-class-name
Fully qualified name of the JDBC driver. Auto-detected based on the URL by default. (String, default: <none>)
spring.datasource.initialization-mode
Initialize the datasource using available DDL and DML scripts. (DataSourceInitializationMode, default: embedded, possible values: ALWAYS,EMBEDDED,NEVER)
spring.datasource.password
Login password of the database. (String, default: <none>)
spring.datasource.schema
Schema (DDL) script resource references. (List<String>, default: <none>)
spring.datasource.url
JDBC url of the database. (String, default: <none>)
spring.datasource.username
Login username of the database. (String, default: <none>)

The jdbc.columns property represents pairs of COLUMN_NAME[:EXPRESSION_FOR_VALUE], where EXPRESSION_FOR_VALUE (together with the colon) is optional. When the expression is omitted, the value is evaluated via a generated expression such as payload.COLUMN_NAME, which gives a direct mapping from object properties to table columns. For example, given a JSON payload like:

{
  "name": "My Name",
  "address": {
     "city": "Big City",
     "street": "Narrow Alley"
  }
}

we can insert it into a table with name, city and street columns using the configuration:

--jdbc.columns=name,city:address.city,street:address.street

Note

The module also uses Spring Boot’s DataSource support for configuring the database connection, so properties like spring.datasource.url etc. apply.
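
To make the COLUMN_NAME[:EXPRESSION_FOR_VALUE] format more concrete, the following is a minimal sketch of how such a specification could be split into column/expression pairs. It is not the sink's actual parser: the class and method names are hypothetical, and it assumes that expressions contain no commas (an expression with a comma, such as a multi-argument method call, would need a smarter tokenizer).

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, for illustration only; not part of the jdbc sink.
class JdbcColumnsSketch {

    // Splits "name,city:address.city" into column -> expression pairs.
    // A column without an explicit expression falls back to payload.COLUMN_NAME.
    static Map<String, String> parseColumns(String spec) {
        Map<String, String> columns = new LinkedHashMap<>();
        for (String pair : spec.split(",")) {
            String trimmed = pair.trim();
            if (trimmed.isEmpty()) {
                continue; // tolerate stray commas
            }
            int colon = trimmed.indexOf(':');
            if (colon < 0) {
                columns.put(trimmed, "payload." + trimmed);
            } else {
                columns.put(trimmed.substring(0, colon).trim(),
                        trimmed.substring(colon + 1).trim());
            }
        }
        return columns;
    }

    public static void main(String[] args) {
        // prints {name=payload.name, city=address.city, street=address.street}
        System.out.println(parseColumns("name,city:address.city,street:address.street"));
    }
}
```

The insertion-ordered map keeps the columns in the order they were declared, which is the order an INSERT statement would list them in.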

4.10.4 Build

$ ./mvnw clean install -PgenerateApps
$ cd apps

You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:

$ ./mvnw clean package

4.10.5 Examples

java -jar jdbc-sink.jar --jdbc.tableName=names --jdbc.columns=name --spring.datasource.driver-class-name=org.mariadb.jdbc.Driver \
--spring.datasource.url='jdbc:mysql://localhost:3306/test'

4.11 Log Sink

The log sink uses the application logger to output the data for inspection.

Note that the log sink uses a type-less handler, which affects how the actual logging is performed. If the content-type is textual, the raw payload bytes are converted to a String; otherwise, the raw bytes are logged. See the user guide for more information.

4.11.1 Input

Headers

Payload

  • Any

4.11.2 Output

N/A

4.11.3 Options

The log sink has the following options:

log.expression
A SpEL expression (against the incoming message) to evaluate as the logged message. (String, default: payload)
log.level
The level at which to log messages. (Level, default: <none>, possible values: FATAL,ERROR,WARN,INFO,DEBUG,TRACE)
log.name
The name of the logger to use. (String, default: <none>)

4.11.4 Build

$ ./mvnw clean install -PgenerateApps
$ cd apps

You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:

$ ./mvnw clean package

4.11.5 Examples

java -jar log-sink.jar

4.12 RabbitMQ Sink

This module sends messages to RabbitMQ.

4.12.1 Input

Headers

  • content-type: text/plain

Payload

  • String

Headers

  • content-type: application/octet-stream

Payload

  • byte[]

Headers

  • content-type: application/x-java-serialized-object

Payload

  • java.io.Serializable

Note: With converterBeanName = jsonConverter, the payload can be any object that Jackson can convert to JSON (the content type sent to RabbitMQ will be application/json, with type information in other headers).

With converterBeanName set to something else, the payload can be any object that the converter can handle.

4.12.2 Output

N/A

4.12.3 Options

The rabbit sink has the following options:

(See the Spring Boot documentation for RabbitMQ connection properties)

rabbit.converter-bean-name
The bean name for a custom message converter; if omitted, a SimpleMessageConverter is used. If 'jsonConverter', a Jackson2JsonMessageConverter bean will be created for you. (String, default: <none>)
rabbit.exchange
Exchange name - overridden by exchangeNameExpression, if supplied. (String, default: <empty string>)
rabbit.exchange-expression
A SpEL expression that evaluates to an exchange name. (Expression, default: <none>)
rabbit.mapped-request-headers
Headers that will be mapped. (String[], default: [*])
rabbit.persistent-delivery-mode
Default delivery mode when 'amqp_deliveryMode' header is not present, true for PERSISTENT. (Boolean, default: false)
rabbit.routing-key
Routing key - overridden by routingKeyExpression, if supplied. (String, default: <none>)
rabbit.routing-key-expression
A SpEL expression that evaluates to a routing key. (Expression, default: <none>)
spring.rabbitmq.addresses
Comma-separated list of addresses to which the client should connect. (String, default: <none>)
spring.rabbitmq.connection-timeout
Connection timeout. Set it to zero to wait forever. (Duration, default: <none>)
spring.rabbitmq.host
RabbitMQ host. (String, default: localhost)
spring.rabbitmq.password
Login to authenticate against the broker. (String, default: guest)
spring.rabbitmq.port
RabbitMQ port. (Integer, default: 5672)
spring.rabbitmq.publisher-confirms
Whether to enable publisher confirms. (Boolean, default: false)
spring.rabbitmq.publisher-returns
Whether to enable publisher returns. (Boolean, default: false)
spring.rabbitmq.requested-heartbeat
Requested heartbeat timeout; zero for none. If a duration suffix is not specified, seconds will be used. (Duration, default: <none>)
spring.rabbitmq.username
Login user to authenticate to the broker. (String, default: guest)
spring.rabbitmq.virtual-host
Virtual host to use when connecting to the broker. (String, default: <none>)
Note

By default, the message converter is a SimpleMessageConverter which handles byte[], String and java.io.Serializable. A well-known bean name jsonConverter will configure a Jackson2JsonMessageConverter instead. In addition, a custom converter bean can be added to the context and referenced by the converterBeanName property.

4.12.4 Build

$ ./mvnw clean install -PgenerateApps
$ cd apps

You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:

$ ./mvnw clean package

4.12.5 Examples

java -jar rabbit-sink.jar --rabbit.routingKey=
java -jar rabbit-sink.jar --rabbit.routingKeyExpression=

4.13 MongoDB Sink

This sink application ingests incoming data into MongoDB. It is fully based on the MongoDataAutoConfiguration, so refer to the Spring Boot MongoDB Support for more information.

4.13.1 Input

Headers

Payload

  • Any POJO

4.13.2 Output

N/A

4.13.3 Options

The mongodb sink has the following options:

mongodb.collection
The MongoDB collection to store data (String, default: <none>)
mongodb.collection-expression
The SpEL expression to evaluate MongoDB collection (Expression, default: <none>)
spring.data.mongodb.authentication-database
Authentication database name. (String, default: <none>)
spring.data.mongodb.database
Database name. (String, default: <none>)
spring.data.mongodb.field-naming-strategy
Fully qualified name of the FieldNamingStrategy to use. (Class<?>, default: <none>)
spring.data.mongodb.grid-fs-database
GridFS database name. (String, default: <none>)
spring.data.mongodb.host
Mongo server host. Cannot be set with URI. (String, default: <none>)
spring.data.mongodb.password
Login password of the mongo server. Cannot be set with URI. (Character[], default: <none>)
spring.data.mongodb.port
Mongo server port. Cannot be set with URI. (Integer, default: <none>)
spring.data.mongodb.uri
Mongo database URI. Cannot be set with host, port and credentials. (String, default: <none>)
spring.data.mongodb.username
Login user of the mongo server. Cannot be set with URI. (String, default: <none>)

Also see the Spring Boot Documentation for additional MongoProperties properties.

4.13.4 Build

$ ./mvnw clean install -PgenerateApps
$ cd apps

You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:

$ ./mvnw clean package

4.13.5 Examples

java -jar mongodb-sink.jar --mongodb.collection=
java -jar mongodb-sink.jar --mongodb.collectionExpression=

4.14 MQTT Sink

This module sends messages to MQTT.

4.14.1 Input

Headers:

Payload:

  • byte[]
  • String

4.14.2 Output

N/A

4.14.3 Options

The mqtt sink has the following options:

mqtt.async
whether or not to use async sends (Boolean, default: false)
mqtt.charset
the charset used to convert a String payload to byte[] (String, default: UTF-8)
mqtt.clean-session
whether the client and server should remember state across restarts and reconnects (Boolean, default: true)
mqtt.client-id
identifies the client (String, default: stream.client.id.sink)
mqtt.connection-timeout
the connection timeout in seconds (Integer, default: 30)
mqtt.keep-alive-interval
the ping interval in seconds (Integer, default: 60)
mqtt.password
the password to use when connecting to the broker (String, default: guest)
mqtt.persistence
'memory' or 'file' (String, default: memory)
mqtt.persistence-directory
Persistence directory (String, default: /tmp/paho)
mqtt.qos
the quality of service to use (Integer, default: 1)
mqtt.retained
whether to set the 'retained' flag (Boolean, default: false)
mqtt.topic
the topic to which the sink will publish (String, default: stream.mqtt)
mqtt.url
location of the mqtt broker(s) (comma-delimited list) (String[], default: [tcp://localhost:1883])
mqtt.username
the username to use when connecting to the broker (String, default: guest)

4.14.4 Build

$ ./mvnw clean install -PgenerateApps
$ cd apps

You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:

$ ./mvnw clean package

4.14.5 Examples

java -jar mqtt-sink.jar --mqtt.clientid= --mqtt.topic=

4.15 Pgcopy Sink

A module that writes its incoming payload to an RDBMS using the PostgreSQL COPY command.

4.15.1 Input

Headers

Payload

  • Any

Column expressions are evaluated against the message, and each expression is usually compatible with only one payload type (such as a Map or a bean).

4.15.2 Output

N/A

4.15.3 Options

The pgcopy sink has the following options:

pgcopy.batch-size
Threshold in number of messages when data will be flushed to database table. (Integer, default: 10000)
pgcopy.columns
The names of the columns that shall receive data. Also used at initialization time to issue the DDL. (List<String>, default: payload)
pgcopy.delimiter
Specifies the character that separates columns within each row (line) of the file. The default is a tab character in text format, a comma in CSV format. This must be a single one-byte character. Using an escaped value like '\t' is allowed. (String, default: <none>)
pgcopy.error-table
The name of the error table used for writing rows causing errors. The error table should have three columns named "table_name", "error_message" and "payload" large enough to hold potential data values. You can use the following DDL to create this table: 'CREATE TABLE ERRORS (TABLE_NAME VARCHAR(255), ERROR_MESSAGE TEXT,PAYLOAD TEXT)' (String, default: <none>)
pgcopy.escape
Specifies the character that should appear before a data character that matches the QUOTE value. The default is the same as the QUOTE value (so that the quoting character is doubled if it appears in the data). This must be a single one-byte character. This option is allowed only when using CSV format. (Character, default: <none>)
pgcopy.format
Format to use for the copy command. (Format, default: <none>, possible values: TEXT,CSV)
pgcopy.idle-timeout
Idle timeout in milliseconds when data is automatically flushed to database table. (Long, default: -1)
pgcopy.initialize
'true', 'false' or the location of a custom initialization script for the table. (String, default: false)
pgcopy.null-string
Specifies the string that represents a null value. The default is \N (backslash-N) in text format, and an unquoted empty string in CSV format. (String, default: <none>)
pgcopy.quote
Specifies the quoting character to be used when a data value is quoted. The default is double-quote. This must be a single one-byte character. This option is allowed only when using CSV format. (Character, default: <none>)
pgcopy.table-name
The name of the table to write into. (String, default: <none>)
spring.datasource.driver-class-name
Fully qualified name of the JDBC driver. Auto-detected based on the URL by default. (String, default: <none>)
spring.datasource.password
Login password of the database. (String, default: <none>)
spring.datasource.url
JDBC url of the database. (String, default: <none>)
spring.datasource.username
Login username of the database. (String, default: <none>)
Note

The module also uses Spring Boot’s DataSource support for configuring the database connection, so properties like spring.datasource.url etc. apply.
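
As an illustration of how the pgcopy options relate to the PostgreSQL COPY command, here is a hedged sketch that assembles a COPY ... FROM STDIN statement from a table name, column list, format, delimiter and null string. It only demonstrates PostgreSQL's documented option-list syntax; whether the sink builds its statement this way is an assumption, the class and method names are hypothetical, and identifiers are not quoted for brevity.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, for illustration only; not part of the pgcopy sink.
class PgcopyStatementSketch {

    // Builds "COPY table (cols) FROM STDIN [WITH (...)]".
    // A null argument means "option not set, use the PostgreSQL default".
    static String buildCopy(String table, List<String> columns, String format,
                            String delimiter, String nullString) {
        List<String> options = new ArrayList<>();
        if (format != null) {
            options.add("FORMAT " + format);
        }
        if (delimiter != null) {
            options.add("DELIMITER " + quote(delimiter));
        }
        if (nullString != null) {
            options.add("NULL " + quote(nullString));
        }
        StringBuilder sql = new StringBuilder("COPY ").append(table)
                .append(" (").append(String.join(", ", columns)).append(") FROM STDIN");
        if (!options.isEmpty()) {
            sql.append(" WITH (").append(String.join(", ", options)).append(")");
        }
        return sql.toString();
    }

    // Wraps a value in single quotes, doubling any embedded single quote.
    private static String quote(String value) {
        return "'" + value.replace("'", "''") + "'";
    }

    public static void main(String[] args) {
        // prints COPY names (name) FROM STDIN WITH (FORMAT CSV, DELIMITER ',')
        System.out.println(buildCopy("names", Arrays.asList("name"), "CSV", ",", null));
    }
}
```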

4.15.4 Build

$ ./mvnw clean install -PgenerateApps
$ cd apps

You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:

$ ./mvnw clean package

For integration tests to run, start a PostgreSQL database on localhost:

    docker run -e POSTGRES_PASSWORD=spring -e POSTGRES_DB=test -p 5432:5432 -d postgres:latest

4.15.5 Examples

java -jar pgcopy-sink.jar --pgcopy.tableName=names --pgcopy.columns=name --spring.datasource.driver-class-name=org.postgresql.Driver \
--spring.datasource.url='jdbc:postgresql://localhost:5432/test'

4.16 Redis Sink

This module sends messages to a Redis store.

4.16.1 Input

Headers

  • content-type: text/plain

Payload

  • String

Headers

  • content-type: application/octet-stream

Payload

  • byte[]

4.16.2 Output

N/A

4.16.3 Options

The redis sink has the following options:

redis.key
A literal key name to use when storing to a key. (String, default: <none>)
redis.key-expression
A SpEL expression to use for storing to a key. (Expression, default: <none>)
redis.queue
A literal queue name to use when storing in a queue. (String, default: <none>)
redis.queue-expression
A SpEL expression to use for queue. (Expression, default: <none>)
redis.topic
A literal topic name to use when publishing to a topic. (String, default: <none>)
redis.topic-expression
A SpEL expression to use for topic. (Expression, default: <none>)
spring.redis.database
Database index used by the connection factory. (Integer, default: 0)
spring.redis.host
Redis server host. (String, default: localhost)
spring.redis.jedis.pool.max-active
Maximum number of connections that can be allocated by the pool at a given time. Use a negative value for no limit. (Integer, default: 8)
spring.redis.jedis.pool.max-idle
Maximum number of "idle" connections in the pool. Use a negative value to indicate an unlimited number of idle connections. (Integer, default: 8)
spring.redis.jedis.pool.max-wait
Maximum amount of time a connection allocation should block before throwing an exception when the pool is exhausted. Use a negative value to block indefinitely. (Duration, default: -1ms)
spring.redis.jedis.pool.min-idle
Target for the minimum number of idle connections to maintain in the pool. This setting only has an effect if it is positive. (Integer, default: 0)
spring.redis.lettuce.pool.max-active
Maximum number of connections that can be allocated by the pool at a given time. Use a negative value for no limit. (Integer, default: 8)
spring.redis.lettuce.pool.max-idle
Maximum number of "idle" connections in the pool. Use a negative value to indicate an unlimited number of idle connections. (Integer, default: 8)
spring.redis.lettuce.pool.max-wait
Maximum amount of time a connection allocation should block before throwing an exception when the pool is exhausted. Use a negative value to block indefinitely. (Duration, default: -1ms)
spring.redis.lettuce.pool.min-idle
Target for the minimum number of idle connections to maintain in the pool. This setting only has an effect if it is positive. (Integer, default: 0)
spring.redis.password
Login password of the redis server. (String, default: <none>)
spring.redis.port
Redis server port. (Integer, default: 6379)
spring.redis.sentinel.master
Name of the Redis server. (String, default: <none>)
spring.redis.sentinel.nodes
Comma-separated list of "host:port" pairs. (List<String>, default: <none>)
spring.redis.ssl
Whether to enable SSL support. (Boolean, default: false)
spring.redis.timeout
Connection timeout. (Duration, default: <none>)
spring.redis.url
Connection URL. Overrides host, port, and password. User is ignored. Example: redis://user:[email protected]:6379 (String, default: <none>)

4.16.4 Build

$ ./mvnw clean install -PgenerateApps
$ cd apps

You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:

$ ./mvnw clean package

4.16.5 Examples

java -jar redis-pubsub-sink.jar --redis.queue=
java -jar redis-pubsub-sink.jar --redis.queueExpression=
java -jar redis-pubsub-sink.jar --redis.key=
java -jar redis-pubsub-sink.jar --redis.keyExpression=
java -jar redis-pubsub-sink.jar --redis.topic=
java -jar redis-pubsub-sink.jar --redis.topicExpression=

4.17 Router Sink

This application routes messages to named channels.

4.17.1 Input

Headers

Payload

  • Any

4.17.2 Output

N/A

4.17.3 Options

The router sink has the following options:

router.default-output-channel
Where to send unroutable messages. (String, default: nullChannel)
router.destination-mappings
Destination mappings as a new line delimited string of name-value pairs, e.g. 'foo=bar\n baz=car'. (Properties, default: <none>)
router.expression
The expression to be applied to the message to determine the channel(s) to route to. (Expression, default: <none>)
router.refresh-delay
How often to check for script changes in ms (if present); < 0 means don't refresh. (Integer, default: 60000)
router.resolution-required
Whether or not channel resolution is required. (Boolean, default: false)
router.script
The location of a groovy script that returns channels or channel mapping resolution keys. (Resource, default: <none>)
router.variables
Variable bindings as a new line delimited string of name-value pairs, e.g. 'foo=bar\n baz=car'. (Properties, default: <none>)
router.variables-location
The location of a properties file containing custom script variable bindings. (Resource, default: <none>)
Note

Since this is a dynamic router, destinations are created as needed; therefore, by default the defaultOutputChannel and resolutionRequired will only be used if the Binder has some problem binding to the destination.

You can restrict the creation of dynamic bindings using the spring.cloud.stream.dynamicDestinations property. By default, all resolved destinations will be bound dynamically; if this property has a comma-delimited list of destination names, only those will be bound. Messages that resolve to a destination that is not in this list will be routed to the defaultOutputChannel, which must also appear in the list.

destinationMappings are used to map the evaluation results to an actual destination name.
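
The interplay between destinationMappings, the list of allowed dynamic destinations and the defaultOutputChannel can be sketched roughly as follows. This is a simplified model under stated assumptions, not the router's implementation: the class and method names are hypothetical, an unmapped evaluation result is assumed to be used as the destination name itself, and an empty allow-list stands for "bind everything dynamically".

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;
import java.util.Set;

// Hypothetical model, for illustration only; not part of the router sink.
class RouterResolutionSketch {

    // Loads newline-delimited name=value pairs such as "foo=bar\n baz=car".
    static Properties parseMappings(String text) {
        Properties mappings = new Properties();
        try {
            mappings.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e); // not expected for an in-memory reader
        }
        return mappings;
    }

    // Maps an evaluation result to a destination name, then applies the
    // optional allow-list (an empty set means every destination is allowed).
    static String resolve(String result, Properties mappings,
                          Set<String> allowed, String defaultChannel) {
        String destination = mappings.getProperty(result, result);
        if (allowed.isEmpty() || allowed.contains(destination)) {
            return destination;
        }
        return defaultChannel;
    }

    public static void main(String[] args) {
        Properties mappings = parseMappings("foo=bar\n baz=car");
        // prints bar
        System.out.println(resolve("foo", mappings, java.util.Collections.emptySet(), "nullChannel"));
    }
}
```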

4.17.5 SpEL-based Routing

The expression evaluates against the message and returns either a channel name, or the key to a map of channel names.

For more information, please see the "Routers and the Spring Expression Language (SpEL)" subsection in the Spring Integration Reference manual Configuring (Generic) Router section.

4.17.6 Groovy-based Routing

Instead of SpEL expressions, Groovy scripts can also be used. For example, create a Groovy script on the file system at "file:/my/path/router.groovy" or on the classpath at "classpath:/my/path/router.groovy":

println("Groovy processing payload '" + payload + "'");
if (payload.contains('a')) {
    return "foo"
}
else {
    return "bar"
}

If you want to pass variable values to your script, you can statically bind values using the variables option or optionally pass the path to a properties file containing the bindings using the variablesLocation option. All properties in the file are made available to the script as variables. You may specify both variables and variablesLocation, in which case any duplicate values provided as variables override values provided in the file. Note that payload and headers are implicitly bound to give you access to the data contained in a message.
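
The override order for script variables can be pictured with a small sketch: bindings loaded from the properties file are applied first, and inline variables then replace any entries with the same name. The class, method and variable names below are hypothetical and only model the precedence rule; they are not taken from the router's code.

```java
import java.util.Properties;

// Hypothetical model of the precedence rule, for illustration only.
class ScriptVariablesSketch {

    // Starts from the file-based bindings, then lets inline variables win
    // whenever both sources define the same name.
    static Properties merge(Properties fromFile, Properties inline) {
        Properties merged = new Properties();
        merged.putAll(fromFile);
        merged.putAll(inline);
        return merged;
    }

    public static void main(String[] args) {
        Properties fromFile = new Properties();
        fromFile.setProperty("threshold", "10");
        fromFile.setProperty("region", "eu");

        Properties inline = new Properties();
        inline.setProperty("threshold", "99");

        Properties merged = merge(fromFile, inline);
        // prints 99 eu
        System.out.println(merged.getProperty("threshold") + " " + merged.getProperty("region"));
    }
}
```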

For more information, see the Spring Integration Reference manual Groovy Support.

4.17.7 Build

$ ./mvnw clean install -PgenerateApps
$ cd apps

You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:

$ ./mvnw clean package

4.17.8 Examples

java -jar router-sink.jar --expression=" "
java -jar router-sink.jar --script=" "

4.18 Amazon S3 Sink

This sink app supports transferring files to an Amazon S3 bucket. File payloads (and directories, recursively) are transferred from the local directory where the application is deployed to the remote directory (S3 bucket).

Messages accepted by this sink must contain a payload of one of the following types:

  • File, including directories for recursive upload;
  • InputStream;
  • byte[]

When using --mode=lines, you can also provide the additional option --withMarkers=true. If set to true, the underlying FileSplitter will emit additional start-of-file and end-of-file marker messages before and after the actual data. The payload of these 2 additional marker messages is of type FileSplitter.FileMarker. The option withMarkers defaults to false if not explicitly set.

4.18.1 Input

Headers

N/A

Payload

  • java.io.File
  • java.io.InputStream
  • byte[]
  • String

4.18.2 Output

N/A

4.18.3 Options

The s3 sink has the following options:

s3.acl
S3 Object access control list. (CannedAccessControlList, default: <none>, possible values: private,public-read,public-read-write,authenticated-read,log-delivery-write,bucket-owner-read,bucket-owner-full-control,aws-exec-read)
s3.acl-expression
Expression to evaluate S3 Object access control list. (Expression, default: <none>)
s3.bucket
AWS bucket for target file(s) to store. (String, default: <none>)
s3.bucket-expression
Expression to evaluate AWS bucket name. (Expression, default: <none>)
s3.key-expression
Expression to evaluate S3 Object key. (Expression, default: <none>)

The generated application based on AmazonS3SinkConfiguration can be enhanced with an S3MessageHandler.UploadMetadataProvider and/or an S3ProgressListener, which are injected into the S3MessageHandler bean.

4.18.4 Amazon AWS common options

The Amazon S3 Sink (as all other Amazon AWS applications) is based on the Spring Cloud AWS project as a foundation, and its auto-configuration classes are used automatically by Spring Boot. Consult their documentation regarding required and useful auto-configuration properties.

Some of them are about AWS credentials:

  • cloud.aws.credentials.accessKey
  • cloud.aws.credentials.secretKey
  • cloud.aws.credentials.instanceProfile
  • cloud.aws.credentials.profileName
  • cloud.aws.credentials.profilePath

Others are for the AWS Region definition:

  • cloud.aws.region.auto
  • cloud.aws.region.static

And for AWS Stack:

  • cloud.aws.stack.auto
  • cloud.aws.stack.name

4.18.5 Build

$ ./mvnw clean install -PgenerateApps
$ cd apps

You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:

$ ./mvnw clean package

4.18.6 Examples

java -jar s3-sink.jar --s3.bucket=/tmp/bar

4.19 SFTP Sink

SFTP sink is a simple option to push files to an SFTP server from incoming messages.

It uses an sftp-outbound-adapter, therefore incoming messages can be either a java.io.File object, a String (content of the file) or an array of bytes (file content as well).

To use this sink, you need a username and a password to log in.

Note

By default Spring Integration will use o.s.i.file.DefaultFileNameGenerator if none is specified. DefaultFileNameGenerator will determine the file name based on the value of the file_name header (if it exists) in the MessageHeaders, or if the payload of the Message is already a java.io.File, then it will use the original name of that file.
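
The lookup order described in this note can be sketched in plain Java as follows. This is an approximation rather than the Spring Integration source: the class and method names are hypothetical, headers are modelled as a simple Map, and the final fallback to the message id plus a ".msg" suffix is an assumption that the note above does not state.

```java
import java.io.File;
import java.util.Map;

// Hypothetical approximation, for illustration only.
class FileNameSketch {

    static String fileName(Map<String, Object> headers, Object payload) {
        // 1. A non-empty file_name header wins.
        Object header = headers.get("file_name");
        if (header instanceof String && !((String) header).isEmpty()) {
            return (String) header;
        }
        // 2. Otherwise a File payload contributes its own name.
        if (payload instanceof File) {
            return ((File) payload).getName();
        }
        // 3. Assumed fallback: derive a name from the message id.
        return headers.get("id") + ".msg";
    }

    public static void main(String[] args) {
        Map<String, Object> headers =
                java.util.Collections.<String, Object>singletonMap("file_name", "report.csv");
        // prints report.csv
        System.out.println(fileName(headers, "some,csv,content"));
    }
}
```

When the payload is a String or byte[], setting the file_name header (or the sftp.filename-expression option) is therefore the way to control the remote file name.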

When configuring the sftp.factory.known-hosts-expression option, the root object of the evaluation is the application context. For example: sftp.factory.known-hosts-expression = @systemProperties['user.home'] + '/.ssh/known_hosts'.

4.19.1 Input

Headers

  • file_name (See note above)

Payload

  • java.io.File
  • java.io.InputStream
  • byte[]
  • String

4.19.2 Output

N/A (writes to the SFTP server).

4.19.3 Options

The sftp sink has the following options:

sftp.auto-create-dir
Whether or not to create the remote directory. (Boolean, default: true)
sftp.factory.allow-unknown-keys
True to allow an unknown or changed key. (Boolean, default: false)
sftp.factory.cache-sessions
Cache sessions (Boolean, default: <none>)
sftp.factory.host
The host name of the server. (String, default: localhost)
sftp.factory.known-hosts-expression
A SpEL expression resolving to the location of the known hosts file. (Expression, default: <none>)
sftp.factory.pass-phrase
Passphrase for user's private key. (String, default: <empty string>)
sftp.factory.password
The password to use to connect to the server. (String, default: <none>)
sftp.factory.port
The port of the server. (Integer, default: 22)
sftp.factory.private-key
Resource location of user's private key. (String, default: <empty string>)
sftp.factory.username
The username to use to connect to the server. (String, default: <none>)
sftp.filename-expression
A SpEL expression to generate the remote file name. (Expression, default: <none>)
sftp.mode
Action to take if the remote file already exists. (FileExistsMode, default: <none>, possible values: APPEND,APPEND_NO_FLUSH,FAIL,IGNORE,REPLACE,REPLACE_IF_MODIFIED)
sftp.remote-dir
The remote FTP directory. (String, default: /)
sftp.remote-file-separator
The remote file separator. (String, default: /)
sftp.temporary-remote-dir
A temporary directory where the file will be written if 'isUseTemporaryFilename()' is true. (String, default: /)
sftp.tmp-file-suffix
The suffix to use while the transfer is in progress. (String, default: .tmp)
sftp.use-temporary-filename
Whether or not to write to a temporary file and rename. (Boolean, default: true)

4.19.4 Build

$ ./mvnw clean install -PgenerateApps
$ cd apps

You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:

$ ./mvnw clean package

4.19.5 Examples

java -jar sftp_sink.jar --sftp.remote-dir=bar --sftp.factory.host=sftpserver \
         --sftp.factory.username=user --sftp.factory.password=pw

4.20 TCP Sink

This module writes messages to TCP using an Encoder.

TCP is a streaming protocol and some mechanism is needed to frame messages on the wire. A number of encoders are available, the default being 'CRLF'.

4.20.1 Input

Headers:

  • Content-Type: application/octet-stream

Payload:

  • byte[]

Headers:

  • Content-Type: text/plain

Payload:

  • String

4.20.2 Output

N/A

4.20.3 Options

The tcp sink has the following options:

tcp.charset
The charset used when converting from bytes to String. (String, default: UTF-8)
tcp.close
Whether to close the socket after each message. (Boolean, default: false)
tcp.encoder
The encoder to use when sending messages. (Encoding, default: <none>, possible values: CRLF,LF,NULL,STXETX,RAW,L1,L2,L4)
tcp.host
The host to which this sink will connect. (String, default: <none>)
tcp.nio
Whether or not to use NIO. (Boolean, default: false)
tcp.port
The port on which to listen; 0 for the OS to choose a port. (Integer, default: 1234)
tcp.reverse-lookup
Perform a reverse DNS lookup on the remote IP Address; if false, just the IP address is included in the message headers. (Boolean, default: false)
tcp.socket-timeout
The timeout (ms) before closing the socket when no data is received. (Integer, default: 120000)
tcp.use-direct-buffers
Whether or not to use direct buffers. (Boolean, default: false)

4.20.4 Available Encoders

Text Data

CRLF (default)
text terminated by carriage return (0x0d) followed by line feed (0x0a)
LF
text terminated by line feed (0x0a)
NULL
text terminated by a null byte (0x00)
STXETX
text preceded by an STX (0x02) and terminated by an ETX (0x03)

Text and Binary Data

RAW
no structure - the client indicates a complete message by closing the socket
L1
data preceded by a one byte (unsigned) length field (supports up to 255 bytes)
L2
data preceded by a two byte (unsigned) length field (up to 2^16-1 bytes)
L4
data preceded by a four byte (signed) length field (up to 2^31-1 bytes)
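
To show what these framings look like on the wire, here is a minimal sketch that applies each encoder's framing to a payload. It follows the descriptions in the list above and is not the Spring Integration serializer code: the class and method names are hypothetical, and the big-endian (network) byte order used for the L2 and L4 length fields is an assumption.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical framing helper, for illustration only.
class TcpFramingSketch {

    static byte[] frame(String encoder, byte[] payload) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        switch (encoder) {
            case "CRLF": put(out, payload); out.write(0x0d); out.write(0x0a); break;
            case "LF": put(out, payload); out.write(0x0a); break;
            case "NULL": put(out, payload); out.write(0x00); break;
            case "STXETX": out.write(0x02); put(out, payload); out.write(0x03); break;
            case "RAW": put(out, payload); break; // no framing; closing the socket ends the message
            case "L1":
                requireMax(payload, 0xFF);
                out.write(payload.length); // one unsigned length byte
                put(out, payload);
                break;
            case "L2":
                requireMax(payload, 0xFFFF);
                // assumed big-endian two-byte length
                put(out, ByteBuffer.allocate(2).putShort((short) payload.length).array());
                put(out, payload);
                break;
            case "L4":
                // assumed big-endian four-byte length
                put(out, ByteBuffer.allocate(4).putInt(payload.length).array());
                put(out, payload);
                break;
            default:
                throw new IllegalArgumentException("Unknown encoder: " + encoder);
        }
        return out.toByteArray();
    }

    private static void put(ByteArrayOutputStream out, byte[] bytes) {
        out.write(bytes, 0, bytes.length);
    }

    private static void requireMax(byte[] payload, int max) {
        if (payload.length > max) {
            throw new IllegalArgumentException("Payload exceeds " + max + " bytes");
        }
    }

    public static void main(String[] args) {
        byte[] hi = "hi".getBytes(StandardCharsets.US_ASCII);
        // prints [0, 2, 104, 105]
        System.out.println(Arrays.toString(frame("L2", hi)));
    }
}
```

The receiving side has to use the matching decoder; for example, a payload that itself contains a line feed cannot be framed unambiguously with LF, which is where the length-prefixed encoders are useful.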

4.20.5 Build

$ ./mvnw clean install -PgenerateApps
$ cd apps

You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:

$ ./mvnw clean package

4.20.6 Examples

java -jar tcp_sink.jar --tcp.encoder=LF

4.21 Throughput Sink

A simple handler that will count messages and log witnessed throughput at a selected interval.

4.21.1 Input

Headers

Payload

  • Any

4.21.2 Output

N/A

4.21.3 Options

4.21.4 Build

$ ./mvnw clean install -PgenerateApps
$ cd apps

You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:

$ ./mvnw clean package

4.21.5 Examples

java -jar throughput-sink.jar

4.22 Websocket Sink

A simple Websocket Sink implementation.

4.22.1 Input

Headers

Payload

  • Any

4.22.2 Output

N/A

4.22.3 Options

The following command line arguments are supported:

websocket.log-level
the logLevel for netty channels. Default is WARN (String, default: <none>)
websocket.path
the path on which a WebsocketSink consumer needs to connect. Default is /websocket (String, default: /websocket)
websocket.port
the port on which the Netty server listens. Default is 9292 (Integer, default: 9292)
websocket.ssl
whether or not to create an io.netty.handler.ssl.SslContext (Boolean, default: false)
websocket.threads
the number of threads for the Netty io.netty.channel.EventLoopGroup. Default is 1 (Integer, default: 1)

4.22.4 Build

$ ./mvnw clean install -PgenerateApps
$ cd apps

You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:

$ ./mvnw clean package

4.22.5 Examples

To verify that the websocket-sink receives messages from other spring-cloud-stream apps, you can use the following simple end-to-end setup.

Step 1: Start RabbitMQ

Step 2: Deploy a time-source

Step 3: Deploy a websocket-sink (the app that contains this starter jar)

Finally start a websocket-sink in trace mode so that you see the messages produced by the time-source in the log:

java -jar <spring boot application for websocket-sink> --spring.cloud.stream.bindings.input=ticktock --server.port=9393 \
	--logging.level.org.springframework.cloud.stream.module.websocket=TRACE

You should start seeing log messages in the console where you started the WebsocketSink like this:

Handling message: GenericMessage [payload=2015-10-21 12:52:53, headers={id=09ae31e0-a04e-b811-d211-b4d4e75b6f29, timestamp=1445424778065}]
Handling message: GenericMessage [payload=2015-10-21 12:52:54, headers={id=75eaaf30-e5c6-494f-b007-9d5b5b920001, timestamp=1445424778065}]
Handling message: GenericMessage [payload=2015-10-21 12:52:55, headers={id=18b887db-81fc-c634-7a9a-16b1c72de291, timestamp=1445424778066}]

4.22.6 Actuators

There is an Endpoint that you can use to access the last n messages sent and received. You have to enable it by providing --endpoints.websocketsinktrace.enabled=true. By default it shows the last 100 messages via the host:port/websocketsinktrace. Here is a sample output:

 [
   {
    "timestamp": 1445453703508,
    "info": {
      "type": "text",
      "direction": "out",
      "id": "2ff9be50-c9b2-724b-5404-1a6305c033e4",
      "payload": "2015-10-21 20:54:33"
    }
  },
  ...
  {
    "timestamp": 1445453703506,
    "info": {
      "type": "text",
      "direction": "out",
      "id": "2b9dbcaf-c808-084d-a51b-50f617ae6a75",
      "payload": "2015-10-21 20:54:32"
    }
  }
]

There is also a simple HTML page where you can see forwarded messages in a text area. You can access it directly via host:port in your browser.

Note

For SSL mode (--ssl=true), a self-signed certificate is used, which might cause trouble with some Websocket clients. In a future release, there will be a --certificate=mycert.cer switch to pass a valid (not self-signed) certificate.