4. Sinks

4.1 Cassandra Sink

This module writes the content of each message it receives to Cassandra.

4.1.1 Options

The cassandra sink has the following options:

compressionType
the compression to use for the transport (CompressionType, default: NONE, possible values: NONE,SNAPPY)
consistencyLevel
the consistencyLevel option of WriteOptions (ConsistencyLevel, no default, possible values: ANY,ONE,TWO,THREE,QUORUM,LOCAL_QUORUM,EACH_QUORUM,ALL,LOCAL_ONE,SERIAL,LOCAL_SERIAL)
spring.cassandra.contactPoints
the comma-delimited string of the hosts to connect to Cassandra (String, default: localhost)
entityBasePackages
the base packages to scan for entities annotated with Table annotations (String[], default: [])
ingestQuery
the ingest Cassandra query (String, no default)
spring.cassandra.initScript
the path to file with CQL scripts (delimited by ';') to initialize keyspace schema (String, no default)
spring.cassandra.keyspace
the keyspace name to connect to (String, default: <stream name>)
metricsEnabled
enable/disable metrics collection for the created cluster (boolean, default: true)
spring.cassandra.password
the password for connection (String, no default)
spring.cassandra.port
the port to use to connect to the Cassandra host (int, default: 9042)
queryType
the queryType for Cassandra Sink (Type, default: INSERT, possible values: INSERT,UPDATE,DELETE,STATEMENT)
retryPolicy
the retryPolicy option of WriteOptions (RetryPolicy, no default, possible values: DEFAULT,DOWNGRADING_CONSISTENCY,FALLTHROUGH,LOGGING)
statementExpression
the expression in Cassandra query DSL style (String, no default)
spring.cassandra.schemaAction
schema action to perform (SchemaAction, default: NONE, possible values: CREATE,NONE,RECREATE,RECREATE_DROP_UNUSED)
ttl
the time-to-live option of WriteOptions (int, default: 0)
spring.cassandra.username
the username for connection (String, no default)

4.2 Counter Sink

The counter sink simply counts the number of messages it receives, optionally storing counts in a separate store such as Redis.

4.2.1 Options

The counter sink has the following options:

name
The name of the counter to increment. (String, default: counts)
nameExpression
A SpEL expression (against the incoming Message) to derive the name of the counter to increment. (String, default: ``)
store
The name of a store used to store the counter. (String, default: redis, possible values: memory, redis)

4.3 Field Value Counter Sink

A field value counter is a Metric used for counting occurrences of unique values for a named field in a message payload. This sink supports the following payload types out of the box:

  • POJO (Java bean)
  • Tuple
  • JSON String

For example, suppose a message source produces a payload with a field named user:

class Foo {
   String user;
   public Foo(String user) {
       this.user = user;
   }
}

If the stream source produces messages with the following objects:

   new Foo("fred")
   new Foo("sue")
   new Foo("dave")
   new Foo("sue")

The field value counter on the field user will contain:

fred:1, sue:2, dave:1

Multi-value fields are also supported. For example, if a field contains a list, each value will be counted once:

users:["dave","fred","sue"]
users:["sue","jon"]

The field value counter on the field users will contain:

dave:1, fred:1, sue:2, jon:1
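
The counting rule above can be sketched in plain Java. This is only an illustration, not the sink's implementation: the class FieldValueCounterSketch and its methods are hypothetical names, and an in-memory map stands in for the configured store.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical in-memory sketch of a field value counter.
class FieldValueCounterSketch {

    // Insertion-ordered map so the counts print in first-seen order.
    private final Map<String, Long> counts = new LinkedHashMap<>();

    // Counts a single value, or each element of a multi-value field once.
    void record(Object fieldValue) {
        if (fieldValue instanceof Collection<?>) {
            for (Object element : (Collection<?>) fieldValue) {
                increment(String.valueOf(element));
            }
        } else if (fieldValue != null) {
            increment(String.valueOf(fieldValue));
        }
    }

    private void increment(String value) {
        counts.merge(value, 1L, Long::sum);
    }

    Map<String, Long> counts() {
        return counts;
    }

    public static void main(String[] args) {
        FieldValueCounterSketch users = new FieldValueCounterSketch();
        users.record(Arrays.asList("dave", "fred", "sue"));
        users.record(Arrays.asList("sue", "jon"));
        System.out.println(users.counts()); // {dave=1, fred=1, sue=2, jon=1}
    }
}
```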

4.3.1 Options

The field-value-counter sink has the following options:

fieldName
the name of the field for which values are counted (String, no default)
name
the name of the metric to contribute to (will be created if necessary) (String, default: <stream name>)
nameExpression
a SpEL expression to compute the name of the metric to contribute to (String, no default)
store
The name of a store used to store the counter. (String, default: redis, possible values: memory, redis)

4.4 File Sink

This module writes each message it receives to a file.

4.4.1 Options

The file sink has the following options:

binary
if false, will append a newline character at the end of each line (boolean, default: false)
charset
the charset to use when writing a String payload (String, default: UTF-8)
dir
the directory in which files will be created (String, default: /tmp/xd/output/)
dirExpression
spring expression used to define directory name (String, no default)
mode
what to do if the file already exists (Mode, default: APPEND, possible values: APPEND,REPLACE,FAIL,IGNORE)
name
filename pattern to use (String, default: <stream name>)
nameExpression
spring expression used to define filename (String, no default)
suffix
filename extension to use (String, no default)

4.5 FTP Sink

The FTP sink is a simple option for pushing files to an FTP server from incoming messages.

It uses an ftp-outbound-adapter, so incoming messages can be a java.io.File object, a String (the content of the file) or an array of bytes (also the content of the file).

To use this sink, you need a username and a password to log in.

[Note]Note

By default Spring Integration will use o.s.i.file.DefaultFileNameGenerator if none is specified. DefaultFileNameGenerator will determine the file name based on the value of the file_name header (if it exists) in the MessageHeaders, or if the payload of the Message is already a java.io.File, then it will use the original name of that file.
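
The naming rules in the note can be sketched as follows. This is a simplified stand-in, not Spring Integration code: FileNameSketch is a hypothetical class, a plain Map replaces MessageHeaders, and the final fallback (a name derived from an id header) is an assumption, since the note does not describe that case.

```java
import java.io.File;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the file name resolution order described above.
class FileNameSketch {

    static String fileName(Map<String, Object> headers, Object payload) {
        // 1. Prefer the file_name header when it holds a non-empty String.
        Object header = headers.get("file_name");
        if (header instanceof String && !((String) header).isEmpty()) {
            return (String) header;
        }
        // 2. Otherwise reuse the original name of a java.io.File payload.
        if (payload instanceof File) {
            return ((File) payload).getName();
        }
        // 3. ASSUMPTION: not covered by the note; this sketch falls back to
        //    a name built from the "id" header.
        return headers.get("id") + ".msg";
    }

    public static void main(String[] args) {
        Map<String, Object> headers = new HashMap<>();
        headers.put("id", "1234");
        System.out.println(fileName(headers, new File("/tmp/report.csv")));
        headers.put("file_name", "renamed.csv");
        System.out.println(fileName(headers, "file content as a String"));
    }
}
```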

4.5.1 Options

The ftp sink has the following options:

autoCreateDir
remote directory must be auto created if it does not exist (boolean, default: true)
clientMode
client mode to use: 2 for passive mode and 0 for active mode (int, default: 0)
host
the host name for the FTP server (String, default: localhost)
mode
what to do if the file already exists (Mode, default: REPLACE, possible values: APPEND,REPLACE,FAIL,IGNORE)
password
the password for the FTP connection (Password, no default)
port
the port for the FTP server (int, default: 21)
remoteDir
the remote directory to transfer the files to (String, default: /)
remoteFileSeparator
file separator to use on the remote side (String, default: /)
temporaryRemoteDir
temporary remote directory that should be used (String, default: /)
tmpFileSuffix
extension to use on server side when uploading files (String, default: .tmp)
useTemporaryFilename
use a temporary filename while transferring the file and rename it to its final name once it's fully transferred (boolean, default: true)
username
the username for the FTP connection (String, no default)

4.6 Gemfire Sink

The Gemfire sink allows one to write message payloads to a Gemfire server.

4.6.1 Options

The gemfire sink has the following options:

hostAddresses
a comma separated list of [host]:[port] specifying either locator or server addresses for the client connection pool (String, default: localhost:10334)
keyExpression
a SpEL expression which is evaluated to create a cache key (String, default: the value is currently the message payload)
port
port of the cache server or locator (if useLocator=true). May be a comma delimited list (String, no default)
regionName
name of the region to use when storing data (String, default: ${spring.application.name})
connectType
'server' or 'locator' (String, default: locator)

4.7 Gpfdist Sink

A sink module that routes messages into GPDB/HAWQ segments via the gpfdist protocol. Internally, this sink creates a custom HTTP listener that supports the gpfdist protocol and schedules a task that orchestrates a gpload session in the same way it is done natively in Greenplum.

No data is written into temporary files and all data is kept in stream buffers waiting to get inserted into Greenplum DB or HAWQ. If there are no existing load sessions from Greenplum, the sink will block until such sessions are established.

4.7.1 Options

The gpfdist sink has the following options:

batchCount
Number of windowed batches each segment takes (int, default: 100)
batchPeriod
Time in seconds for each load operation to sleep in between operations (int, default: 10)
batchTimeout
Timeout in seconds for segment inactivity. (Integer, default: 4)
columnDelimiter
Data record column delimiter. (Character, no default)
controlFile
path to yaml control file (String, no default)
dbHost
database host (String, default: localhost)
dbName
database name (String, default: gpadmin)
dbPassword
database password (String, default: gpadmin)
dbPort
database port (int, default: 5432)
dbUser
database user (String, default: gpadmin)
delimiter
data line delimiter (String, default: newline character)
errorTable
Tablename to log errors. (String, default: ``)
flushCount
flush item count (int, default: 100)
flushTime
flush item time (int, default: 2)
gpfdistPort
Port of gpfdist server. Default port `0` indicates that a random port is chosen. (Integer, default: 0)
matchColumns
match columns with update (String, no default)
mode
mode, either insert or update (String, no default)
nullString
Null string definition. (String, default: ``)
port
gpfdist listen port (int, default: 0)
rateInterval
enable transfer rate interval (int, default: 0)
segmentRejectLimit
Error reject limit. (String, default: ``)
segmentRejectType
Error reject type, either `rows` or `percent`. (String, default: ``)
sqlAfter
sql to run after load (String, no default)
sqlBefore
sql to run before load (String, no default)
table
target database table (String, no default)
updateColumns
update columns with update (String, no default)

4.7.2 Implementation Notes

Within a gpfdist sink we have a Reactor based stream where data is published from the incoming SI channel. This channel receives data from the Message Bus. The Reactor stream is then connected to Netty based HTTP channel adapters so that when a new HTTP connection is established, the Reactor stream is flushed and balanced among existing HTTP clients. When Greenplum does a load from an external table, each segment initiates an HTTP connection and starts loading data. The net effect is that incoming data is automatically spread among the Greenplum segments.

4.7.3 Detailed Option Descriptions

The gpfdist sink supports the following configuration properties:

table

Database table to work with. (String, default: ``, required)

This option denotes the table into which data will be inserted or updated. The external table structure is also derived from the structure of this table.

Currently, table is the only way to define the structure of an external table. Effectively, it replaces other_table in the clause segment below.

CREATE READABLE EXTERNAL TABLE table_name LIKE other_table
mode

Gpfdist mode, either `insert` or `update`. (String, default: insert)

Currently only the insert and update gpfdist modes are supported. The merge mode familiar from the native gpfdist loader is not yet supported.

In update mode, the options matchColumns and updateColumns are required.

columnDelimiter

Data record column delimiter. (Character, default: ``)

Defines the delimiter character used in the clause segment below, which would be part of a FORMAT 'TEXT' or FORMAT 'CSV' section.

[DELIMITER AS 'delimiter']
segmentRejectLimit

Error reject limit. (String, default: ``)

Defines the count value in the clause segment below.

[ [LOG ERRORS INTO error_table] SEGMENT REJECT LIMIT count
  [ROWS | PERCENT] ]

As a convenience, this reject limit also recognizes a percentage format such as 2%; if it is used, segmentRejectType is automatically set to percent.
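
The percentage convenience can be sketched as below. RejectLimitSketch is a hypothetical class, not part of the sink. Falling back to rows when no type is given is an assumption made only to keep the example complete.

```java
import java.util.Locale;

// Hypothetical sketch: derive count and type for SEGMENT REJECT LIMIT.
class RejectLimitSketch {
    final int count;
    final String type; // "rows" or "percent"

    RejectLimitSketch(int count, String type) {
        this.count = count;
        this.type = type;
    }

    static RejectLimitSketch parse(String segmentRejectLimit, String segmentRejectType) {
        String limit = segmentRejectLimit.trim();
        if (limit.endsWith("%")) {
            // A value such as "2%" implies the percent type.
            String digits = limit.substring(0, limit.length() - 1).trim();
            return new RejectLimitSketch(Integer.parseInt(digits), "percent");
        }
        // ASSUMPTION: use rows when no explicit type is configured.
        String type = (segmentRejectType == null || segmentRejectType.isEmpty())
                ? "rows" : segmentRejectType;
        return new RejectLimitSketch(Integer.parseInt(limit), type);
    }

    // Renders the mandatory part of the clause segment shown above.
    String toClause() {
        return "SEGMENT REJECT LIMIT " + count + " " + type.toUpperCase(Locale.ROOT);
    }

    public static void main(String[] args) {
        System.out.println(parse("2%", "").toClause());    // SEGMENT REJECT LIMIT 2 PERCENT
        System.out.println(parse("10", "rows").toClause()); // SEGMENT REJECT LIMIT 10 ROWS
    }
}
```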

segmentRejectType

Error reject type, either `rows` or `percent`. (String, default: ``)

Defines ROWS or PERCENT in the clause segment below.

[ [LOG ERRORS INTO error_table] SEGMENT REJECT LIMIT count
  [ROWS | PERCENT] ]
errorTable

Tablename to log errors. (String, default: ``)

As the error table is optional with SEGMENT REJECT LIMIT, it’s only used if both segmentRejectLimit and segmentRejectType are set. Sets error_table in the clause segment below.

[ [LOG ERRORS INTO error_table] SEGMENT REJECT LIMIT count
  [ROWS | PERCENT] ]
nullString

Null string definition. (String, default: ``)

Defines the null string used in the clause segment below, which would be part of a FORMAT 'TEXT' or FORMAT 'CSV' section.

[NULL AS 'null string']
delimiter

Data record delimiter for incoming messages. (String, default: \n)

By default, the delimiter in this option is added as a postfix to every message sent into this sink. NEWLINE is currently not a supported config option, so line termination for data comes from default functionality. The external table documentation describes that default as follows:

If not specified, a Greenplum Database segment will detect the newline type by looking at the first row of data it receives and using the first newline type encountered.

 -- External Table Docs
matchColumns

Comma delimited list of columns to match. (String, default: ``)

[Note]Note

See the examples below.

updateColumns

Comma delimited list of columns to update. (String, default: ``)

[Note]Note

See the examples below.

sqlBefore
Sql clause to run before each load operation. (String, default: ``)
sqlAfter
Sql clause to run after each load operation. (String, default: ``)
rateInterval

Debug rate of data transfer. (Integer, default: 0)

If set to a non-zero value, the sink logs the rate of messages passing through it after the number of messages denoted by this setting has been processed. A value of 0 means that rate calculation and logging are disabled.

flushCount

Max collected size per windowed data. (Integer, default: 100)

[Note]Note

For more info on flush and batch settings, see the section How Data Is Sent Into Segments.

4.7.4 How Data Is Sent Into Segments

There are a few important concepts in how data passes into the sink, through it, and finally lands in a database.

  • The sink has its normal message handler for incoming data from a source module, a Netty based gpfdist protocol listener that segments connect to, and, in between those two, Reactor based streams that control load balancing across the segment connections.
  • Incoming data is first sent into a reactor, which constructs a window. This window is released downstream when it gets full (flushCount) or when it times out (flushTime) before getting full. One window is then ready to be sent to a segment.
  • Segments that connect to this stream see a stream of windowed data, not a stream of individual messages. We can also call this a stream of batches.
  • When a segment connects to the protocol listener, it subscribes itself to this stream, takes the number of batches denoted by batchCount, and completes the stream once it has received enough batches or once batchTimeout has elapsed due to inactivity.
  • It doesn’t matter how many simultaneous connections there are from a database cluster at any given time, as the reactor load balances batches across all subscribers.
  • The database cluster initiates a loading session when a select is done from an external table that points to this sink. These loading operations run in the background in a loop, one after another. The batchPeriod option is then used as a sleep time between load sessions.

Let’s take a closer look at how the options flushCount, flushTime, batchCount, batchTimeout and batchPeriod work.

At the highest level, where incoming data into the sink is windowed, flushCount and flushTime control when a batch of messages is sent downstream. If there are many simultaneous segment connections, flushing less often keeps more segments inactive, because there is more demand for batches than flushing produces.
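
The windowing rule can be illustrated without Reactor. The following is a minimal sketch under stated assumptions: WindowSketch is a hypothetical class, time is passed in explicitly as milliseconds (the unit is an assumption for this sketch), and released windows are simply collected in a list instead of being pushed to segments.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of count-or-time based windowing.
class WindowSketch {
    private final int flushCount;
    private final long flushTimeMillis;
    private final List<String> window = new ArrayList<>();
    private final List<List<String>> released = new ArrayList<>();
    private long windowOpenedAt;

    WindowSketch(int flushCount, long flushTimeMillis) {
        this.flushCount = flushCount;
        this.flushTimeMillis = flushTimeMillis;
    }

    // Adds a message; releases the window as soon as it is full.
    void onMessage(String message, long nowMillis) {
        onTick(nowMillis); // release a timed-out window first
        if (window.isEmpty()) {
            windowOpenedAt = nowMillis;
        }
        window.add(message);
        if (window.size() >= flushCount) {
            release();
        }
    }

    // Called periodically; releases a non-empty window that has timed out.
    void onTick(long nowMillis) {
        if (!window.isEmpty() && nowMillis - windowOpenedAt >= flushTimeMillis) {
            release();
        }
    }

    private void release() {
        released.add(new ArrayList<>(window));
        window.clear();
    }

    List<List<String>> released() {
        return released;
    }

    public static void main(String[] args) {
        WindowSketch sketch = new WindowSketch(2, 2000);
        sketch.onMessage("a", 0);
        sketch.onMessage("b", 100);  // window full, released as [a, b]
        sketch.onMessage("c", 200);
        sketch.onTick(2200);         // window timed out, released as [c]
        System.out.println(sketch.released()); // [[a, b], [c]]
    }
}
```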

When an existing segment connection is active and has subscribed itself to the stream of batches, data keeps flowing until either batchCount is met or batchTimeout occurs due to inactivity of data from upstream. The higher batchCount is, the more data each segment reads. The higher batchTimeout is, the longer a segment waits in case there is more data to come.
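
One segment session can be pictured as a consumer that stops on whichever limit it reaches first. This is a sketch only: SegmentSessionSketch is a hypothetical class, a BlockingQueue stands in for the stream of batches, and the timeout is given in milliseconds here for convenience.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of a segment taking batches from the sink.
class SegmentSessionSketch {

    static List<String> takeBatches(BlockingQueue<String> batches,
                                    int batchCount, long batchTimeoutMillis) {
        List<String> taken = new ArrayList<>();
        try {
            while (taken.size() < batchCount) {
                // Wait for the next batch, but give up after a period of inactivity.
                String batch = batches.poll(batchTimeoutMillis, TimeUnit.MILLISECONDS);
                if (batch == null) {
                    break; // the inactivity timeout ends the session early
                }
                taken.add(batch);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt flag set
        }
        return taken;
    }

    public static void main(String[] args) {
        BlockingQueue<String> batches = new LinkedBlockingQueue<>();
        batches.add("batch-1");
        batches.add("batch-2");
        batches.add("batch-3");
        System.out.println(takeBatches(batches, 2, 50)); // stops because batchCount is met
        System.out.println(takeBatches(batches, 5, 50)); // stops because of inactivity
    }
}
```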

As gpfdist load operations are done in a loop, batchPeriod simply prevents them from running in a busy loop. A busy loop would be fine if there were a constant stream of incoming data, but if incoming data arrives in bursts, a busy loop would be unnecessary.

[Note]Note

Data loaded via gpfdist will not become visible in the database until the whole distributed loading session has finished successfully.

Reactor also handles backpressure: if existing load operations do not produce enough demand for data, message passing into the sink eventually blocks. This happens when Reactor’s internal ring buffer (32 items in size) gets full. Data really flows through the sink only when segments pull it.
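
The effect of a bounded buffer can be shown with the JDK alone. In this sketch, BackpressureSketch is a hypothetical class and an ArrayBlockingQueue stands in for Reactor's ring buffer. A non-blocking offer is used so the example terminates: where it returns false, a blocking publisher would have to wait.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical sketch of a bounded buffer between a publisher and segments.
class BackpressureSketch {
    private final BlockingQueue<String> buffer;

    BackpressureSketch(int capacity) {
        this.buffer = new ArrayBlockingQueue<>(capacity);
    }

    // Returns false when the buffer is full, i.e. when a blocking
    // publisher would stall until a segment pulls data.
    boolean tryPublish(String message) {
        return buffer.offer(message);
    }

    // A segment pulling one item frees one slot; null means nothing buffered.
    String pull() {
        return buffer.poll();
    }

    public static void main(String[] args) {
        BackpressureSketch sink = new BackpressureSketch(32);
        int accepted = 0;
        for (int i = 0; i < 40; i++) {
            if (sink.tryPublish("message-" + i)) {
                accepted++;
            }
        }
        System.out.println("accepted before full: " + accepted);         // 32
        sink.pull();                                                      // demand from a segment
        System.out.println("after pull: " + sink.tryPublish("message")); // true
    }
}
```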

4.7.5 Example Usage

In this first example we’re just creating a simple stream which inserts data from a time source. Let’s create a table with two text columns.

gpadmin=# create table ticktock (date text, time text);

Create a simple stream named gpstream1.

dataflow:>stream create --name gpstream1 --definition "time | gpfdist
--dbHost=mdw --table=ticktock --batchTimeout=1 --batchPeriod=1
--flushCount=2 --flushTime=2 --columnDelimiter=' '" --deploy

Let it run, then check the results in the database.

gpadmin=# select count(*) from ticktock;
 count
-------
    14
(1 row)

In the previous example we did simple inserts into a table. Let’s see how we can update data in a table. Create a simple table httpdata with three text columns and insert some data.

gpadmin=# create table httpdata (col1 text, col2 text, col3 text);
gpadmin=# insert into httpdata values ('DATA1', 'DATA', 'DATA');
gpadmin=# insert into httpdata values ('DATA2', 'DATA', 'DATA');
gpadmin=# insert into httpdata values ('DATA3', 'DATA', 'DATA');

The table now looks like this.

gpadmin=# select * from httpdata;
 col1  | col2 | col3
-------+------+------
 DATA3 | DATA | DATA
 DATA2 | DATA | DATA
 DATA1 | DATA | DATA
(3 rows)

Let’s create a stream that updates the table httpdata by matching on column col1 and updating columns col2 and col3.

dataflow:>stream create --name gpfdiststream2 --definition "http
--server.port=8081|gpfdist --mode=update --table=httpdata
--dbHost=mdw --columnDelimiter=',' --matchColumns=col1
--updateColumns=col2,col3" --deploy

Post some data into the stream; it will be passed into the gpfdist sink via the http source.

curl --data "DATA1,DATA1,DATA1" -H "Content-Type:text/plain" http://localhost:8081/

If you query the table again, you’ll see that the row for DATA1 has been updated.

gpadmin=# select * from httpdata;
 col1  | col2  | col3
-------+-------+-------
 DATA3 | DATA  | DATA
 DATA2 | DATA  | DATA
 DATA1 | DATA1 | DATA1
(3 rows)

4.7.6 Tuning Transfer Rate

The default values for the options flushCount, flushTime, batchCount, batchTimeout and batchPeriod are relatively conservative and need to be tuned for every use case for optimal performance. To decide how to tune the sink behaviour to suit your needs, a few things need to be considered.

  • What is the average size of messages ingested by the sink?
  • How fast do you want data to become visible in the database?
  • Is incoming data a constant flow or does it arrive in bursts?

Everything that flows through the sink is kept in memory, and because the sink handles backpressure, memory consumption is relatively low. However, the sink cannot predict the average size of incoming data, and this data is windowed later downstream anyway, so you should not allow the window size to become too large if the average data size is large, as every batch of data is kept in memory.

Generally speaking, if you have a lot of segments in a load operation, it’s advised to keep the flushed window size relatively small, which allows more segments to stay active. This, however, also depends on how much data is flowing into the sink itself.

The longer a load session stays active for each segment, the higher the overall transfer rate is going to be. The batchCount option naturally controls this. The batchTimeout option, however, controls how fast each segment completes a stream due to inactivity from upstream and steps away from a loading session, allowing the distributed session to finish and the data to become visible in the database.

4.8 HDFS Sink

This module writes each message it receives to HDFS.

4.8.1 Options

The hdfs sink has the following options:

closeTimeout
timeout in ms, regardless of activity, after which file will be automatically closed (long, default: 0)
codec
compression codec alias name (gzip, snappy, bzip2, lzo, or slzo) (String, default: ``)
directory
where to output the files in the Hadoop FileSystem (String, default: ``)
enableSync
whether writer will sync to datanode when flush is called, setting this to 'true' could impact throughput (boolean, default: false)
fileExtension
the base filename extension to use for the created files (String, default: txt)
fileName
the base filename to use for the created files (String, default: <stream name>)
fileOpenAttempts
maximum number of file open attempts to find a path (int, default: 10)
fileUuid
whether file name should contain uuid (boolean, default: false)
flushTimeout
timeout in ms, regardless of activity, after which data written to file will be flushed (long, default: 0)
fsUri
the URI to use to access the Hadoop FileSystem (String, default: ${spring.hadoop.fsUri})
idleTimeout
inactivity timeout in ms after which file will be automatically closed (long, default: 0)
inUsePrefix
prefix for files currently being written (String, default: ``)
inUseSuffix
suffix for files currently being written (String, default: .tmp)
overwrite
whether writer is allowed to overwrite files in Hadoop FileSystem (boolean, default: false)
partitionPath
a SpEL expression defining the partition path (String, default: ``)
rollover
threshold in bytes when file will be automatically rolled over (String, default: 1G)

[Note]Note

This module can have its runtime dependencies provided during startup if you would like to use a Hadoop distribution other than the default one.

4.9 Jdbc Sink

A module that writes its incoming payload to an RDBMS using JDBC.

4.9.1 Options

The jdbc sink has the following options:

expression
a SpEL expression used to transform messages (String, default: ``)
tableName
the name of the table to write to (String, default: <stream name>)
columns
the names of the columns that shall receive data, as a set of column[:SpEL] mappings, also used at initialization time to issue the DDL (String, default: payload)
initialize
whether to issue the DDL for the target table at initialization time (Boolean, default: false)
batchSize
String (long, default: 10000)

[Note]Note

The module also uses Spring Boot’s DataSource support for configuring the database connection, so properties like spring.datasource.url etc. apply.

4.10 Log Sink

The log sink uses the application logger to output the data for inspection.

4.10.1 Options

The log sink has the following options:

expression
the expression to be evaluated for the log content; use '#root' to log the full message (String, default: payload)
level
the log level (String, default: INFO)
name
the name of the log category to log to (String, default: ``)

4.11 RabbitMQ Sink

This module sends messages to RabbitMQ.

4.11.1 Options

The rabbit sink has the following options:

(See the Spring Boot documentation for RabbitMQ connection properties)

converterBeanName
the bean name of the message converter (String, default: none)
persistentDeliveryMode
the default delivery mode, true for persistent (boolean, default: false)
exchange
the Exchange on the RabbitMQ broker to which messages should be sent (String, default: "")
exchangeExpression
a SpEL expression that evaluates to the Exchange on the RabbitMQ broker to which messages should be sent; overrides `exchange` (String, default: ``)
mappedRequestHeaders
request message header names to be propagated to RabbitMQ, to limit to the set of standard headers plus `bar`, use `STANDARD_REQUEST_HEADERS,bar` (String, default: *)
routingKey
the routing key to be passed with the message (String, default: none)
routingKeyExpression
a SpEL expression that evaluates to the routing key to be passed with the message; overrides `routingKey` (String, default: none)

[Note]Note

By default, the message converter is a SimpleMessageConverter which handles byte[], String and java.io.Serializable. A well-known bean name jsonConverter will configure a Jackson2JsonMessageConverter instead. In addition, a custom converter bean can be added to the context and referenced by the converterBeanName property.

4.12 Redis Sink

This module sends messages to a Redis store.

4.12.1 Options

The redis sink has the following options:

topicExpression
a SpEL expression to use for topic (String, no default)
queueExpression
a SpEL expression to use for queue (String, no default)
keyExpression
a SpEL expression to use for key (String, no default)
key
name for the key (String, no default)
queue
name for the queue (String, no default)
topic
name for the topic (String, no default)

4.13 Router Sink

This module routes messages to named channels.

4.13.1 Options

The router sink has the following options:

destinations
comma-delimited destinations mapped from evaluation results (String, no default)
defaultOutputChannel
Where to route messages when the channel cannot be resolved (String, default: nullChannel)
expression
a SpEL expression used to determine the destination (String, default: headers['routeTo'])
propertiesLocation
the path of a properties file containing custom script variable bindings (String, no default)
refreshDelay
how often to check (in milliseconds) whether the script (if present) has changed; -1 for never (long, default: 60000)
script
reference to a script used to process messages (String, no default)
destinationMappings
Destination mappings as a new line delimited string of name-value pairs, e.g. 'foo=bar\n baz=car'. (String, no default)

[Note]Note

Since this is a dynamic router, destinations are created as needed; therefore, by default the defaultOutputChannel and resolutionRequired will only be used if the Binder has some problem binding to the destination.

You can restrict the creation of dynamic bindings using the spring.cloud.stream.dynamicDestinations property. By default, all resolved destinations will be bound dynamically; if this property has a comma-delimited list of destination names, only those will be bound. Messages that resolve to a destination that is not in this list will be routed to the defaultOutputChannel, which must also appear in the list.

destinationMappings are used to map the evaluation results to an actual destination name.
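
How the mappings and the dynamic destination restriction interact can be sketched as follows. This is an illustration under assumptions, not the router's code: RouterSketch is a hypothetical class, java.util.Properties is used here only as a convenient parser for newline delimited name-value pairs, and an unmapped evaluation result is assumed to be used as the destination name itself.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.HashSet;
import java.util.Properties;
import java.util.Set;

// Hypothetical sketch of destination resolution in a dynamic router.
class RouterSketch {
    private final Properties mappings = new Properties();
    private final Set<String> allowed = new HashSet<>(); // empty means no restriction
    private final String defaultOutputChannel;

    RouterSketch(String destinationMappings, String dynamicDestinations,
                 String defaultOutputChannel) {
        try {
            mappings.load(new StringReader(destinationMappings));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        for (String name : dynamicDestinations.split(",")) {
            if (!name.trim().isEmpty()) {
                allowed.add(name.trim());
            }
        }
        this.defaultOutputChannel = defaultOutputChannel;
    }

    String resolve(String evaluationResult) {
        // Map the evaluation result to a destination name, if a mapping exists.
        String destination = mappings.getProperty(evaluationResult, evaluationResult);
        // Destinations outside the allowed list go to the default channel.
        if (!allowed.isEmpty() && !allowed.contains(destination)) {
            return defaultOutputChannel;
        }
        return destination;
    }

    public static void main(String[] args) {
        RouterSketch router = new RouterSketch("foo=bar\n baz=car", "bar,nullChannel", "nullChannel");
        System.out.println(router.resolve("foo")); // bar
        System.out.println(router.resolve("baz")); // nullChannel, because car is not allowed
    }
}
```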

4.13.2 SpEL-based Routing

The expression evaluates against the message and returns either a channel name, or the key to a map of channel names.

For more information, please see the "Routers and the Spring Expression Language (SpEL)" subsection in the Spring Integration Reference manual Configuring (Generic) Router section.

4.13.3 Groovy-based Routing

Instead of SpEL expressions, Groovy scripts can also be used. Let’s create a Groovy script in the file system at "file:/my/path/router.groovy", or "classpath:/my/path/router.groovy" :

println("Groovy processing payload '" + payload + "'");
if (payload.contains('a')) {
    return "foo"
}
else {
    return "bar"
}

If you want to pass variable values to your script, you can statically bind values using the variables option or optionally pass the path to a properties file containing the bindings using the propertiesLocation option. All properties in the file will be made available to the script as variables. You may specify both variables and propertiesLocation, in which case any duplicate values provided as variables override values provided in propertiesLocation. Note that payload and headers are implicitly bound to give you access to the data contained in a message.

For more information, see the Spring Integration Reference manual Groovy Support.

4.14 TCP Sink

This module writes messages to TCP using an Encoder.

TCP is a streaming protocol and some mechanism is needed to frame messages on the wire. A number of encoders are available, the default being 'CRLF'.

4.14.1 Options

The tcp sink has the following options:

charset
the charset used when converting from String to bytes (String, default: UTF-8)
close
whether to close the socket after each message (boolean, default: false)
encoder
the encoder to use when sending messages (Encoding, default: CRLF, possible values: CRLF,LF,NULL,STXETX,RAW,L1,L2,L4)
host
the remote host to connect to (String, default: localhost)
nio
whether or not to use NIO (boolean, default: false)
port
the port on the remote host to connect to (int, default: 1234)
reverseLookup
perform a reverse DNS lookup on the remote IP Address (boolean, default: false)
socketTimeout
the timeout (ms) before closing the socket when no data is received (int, default: 120000)
useDirectBuffers
whether or not to use direct buffers (boolean, default: false)

4.14.2 Available Encoders

Text Data

CRLF (default)
text terminated by carriage return (0x0d) followed by line feed (0x0a)
LF
text terminated by line feed (0x0a)
NULL
text terminated by a null byte (0x00)
STXETX
text preceded by an STX (0x02) and terminated by an ETX (0x03)

Text and Binary Data

RAW
no structure - the client indicates a complete message by closing the socket
L1
data preceded by a one byte (unsigned) length field (supports up to 255 bytes)
L2
data preceded by a two byte (unsigned) length field (up to 2^16-1 bytes)
L4
data preceded by a four byte (signed) length field (up to 2^31-1 bytes)
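
The framing rules listed above can be sketched with plain byte arrays. TcpFramingSketch is a hypothetical class, not the module's encoder implementation. Writing the length fields in network byte order (big-endian) is an assumption of this sketch.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical sketch of the message framing applied by each encoder.
class TcpFramingSketch {

    static byte[] encode(String encoder, byte[] data) {
        switch (encoder) {
            case "CRLF":
                return concat(data, new byte[] {0x0d, 0x0a});
            case "LF":
                return concat(data, new byte[] {0x0a});
            case "NULL":
                return concat(data, new byte[] {0x00});
            case "STXETX":
                return concat(new byte[] {0x02}, data, new byte[] {0x03});
            case "RAW":
                return data.clone(); // no framing; closing the socket ends the message
            case "L1":
                requireMax(data, 0xFF);
                return concat(new byte[] {(byte) data.length}, data);
            case "L2":
                requireMax(data, 0xFFFF);
                return concat(ByteBuffer.allocate(2).putShort((short) data.length).array(), data);
            case "L4":
                return concat(ByteBuffer.allocate(4).putInt(data.length).array(), data);
            default:
                throw new IllegalArgumentException("unknown encoder: " + encoder);
        }
    }

    private static void requireMax(byte[] data, int max) {
        if (data.length > max) {
            throw new IllegalArgumentException("message longer than " + max + " bytes");
        }
    }

    private static byte[] concat(byte[]... parts) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        for (byte[] part : parts) {
            out.write(part, 0, part.length);
        }
        return out.toByteArray();
    }

    public static void main(String[] args) {
        byte[] payload = "hi".getBytes(StandardCharsets.UTF_8);
        System.out.println(Arrays.toString(encode("CRLF", payload))); // [104, 105, 13, 10]
        System.out.println(Arrays.toString(encode("L2", payload)));   // [0, 2, 104, 105]
    }
}
```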

4.15 Throughput Sink

A simple handler that will count messages and log witnessed throughput at a selected interval.

4.16 Websocket Sink

A simple Websocket Sink implementation.

4.16.1 Options

The following command line arguments are supported:

websocketPort
controls the port onto which the Netty server binds (String, default: 9292)
websocketPath
controls the path where the WebsocketServer expects Websocket connections (String, default: /websocket)
ssl
controls whether the Websocket server should accept SSL connections (i.e. `wss://host:9292/websocket`) (Boolean, default: false)
websocketLoglevel
controls the loglevel of the underlying Netty loghandler (String, default: WARN)
threads
controls the number of worker threads used for the Netty event loop (String, default: 1)

4.16.2 Example

To verify that the websocket-sink receives messages from other spring-cloud-stream modules, you can use the following simple end-to-end setup.

Step 1: Start Redis

The default broker that is used is Redis. Normally you can start Redis via redis-server.

Step 2: Deploy a time-source

Deploy a time-source:

java -jar target/time-source***-exec.jar --spring.cloud.stream.bindings.output=ticktock --server.port=9191

Step 3: Deploy a websocket-sink

Finally start a websocket-sink in trace mode so that you see the messages produced by the time-source in the log:

java -jar target/websocket-sink-***-exec.jar --spring.cloud.stream.bindings.input=ticktock --server.port=9393 \
	--logging.level.org.springframework.cloud.stream.module.websocket=TRACE

You should start seeing log messages in the console where you started the WebsocketSink like this:

Handling message: GenericMessage [payload=2015-10-21 12:52:53, headers={id=09ae31e0-a04e-b811-d211-b4d4e75b6f29, timestamp=1445424778065}]
Handling message: GenericMessage [payload=2015-10-21 12:52:54, headers={id=75eaaf30-e5c6-494f-b007-9d5b5b920001, timestamp=1445424778065}]
Handling message: GenericMessage [payload=2015-10-21 12:52:55, headers={id=18b887db-81fc-c634-7a9a-16b1c72de291, timestamp=1445424778066}]

4.16.3 Actuators

There is an Endpoint that you can use to access the last n messages sent and received. You have to enable it by providing --endpoints.websocketsinktrace.enabled=true. By default it shows the last 100 messages via the host:port/websocketsinktrace. Here is a sample output:

 [
   {
    "timestamp": 1445453703508,
    "info": {
      "type": "text",
      "direction": "out",
      "id": "2ff9be50-c9b2-724b-5404-1a6305c033e4",
      "payload": "2015-10-21 20:54:33"
    }
  },
  ...
  {
    "timestamp": 1445453703506,
    "info": {
      "type": "text",
      "direction": "out",
      "id": "2b9dbcaf-c808-084d-a51b-50f617ae6a75",
      "payload": "2015-10-21 20:54:32"
    }
  }
]
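
Keeping only the last n messages amounts to a bounded buffer that drops its oldest entry. The sketch below is illustrative only: TraceBufferSketch is a hypothetical class, and the newest-first ordering is inferred from the sample output above rather than from the endpoint's source.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical sketch of a "last n messages" trace buffer.
class TraceBufferSketch {
    private final int capacity;
    private final Deque<String> traces = new ArrayDeque<>();

    TraceBufferSketch(int capacity) {
        this.capacity = capacity;
    }

    // Adds the newest payload at the front and evicts the oldest when full.
    synchronized void add(String payload) {
        if (traces.size() == capacity) {
            traces.removeLast();
        }
        traces.addFirst(payload);
    }

    // Returns a copy, newest entry first.
    synchronized List<String> snapshot() {
        return new ArrayList<>(traces);
    }

    public static void main(String[] args) {
        TraceBufferSketch trace = new TraceBufferSketch(2);
        trace.add("2015-10-21 20:54:31");
        trace.add("2015-10-21 20:54:32");
        trace.add("2015-10-21 20:54:33");
        System.out.println(trace.snapshot()); // [2015-10-21 20:54:33, 2015-10-21 20:54:32]
    }
}
```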

There is also a simple HTML page where you can see forwarded messages in a text area. You can access it directly via host:port in your browser.

[Note]Note

For SSL mode (--ssl=true) a self-signed certificate is used, which might cause trouble with some Websocket clients. In a future release, there will be a --certificate=mycert.cer switch to pass a valid (not self-signed) certificate.