11. Sinks

11.1 Counter (counter)

A simple module that counts the messages it receives, using the Spring Boot metrics abstraction.

The counter sink has the following options:

name
The name of the counter to increment. (String, default: counts)
nameExpression
A SpEL expression (against the incoming Message) to derive the name of the counter to increment. (String, default: ``)
store
The name of a store used to store the counter. (String, default: memory, possible values: memory, redis)
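
The counting behaviour these options describe can be sketched in a few lines of Python. This is an illustration under stated assumptions, not the module's implementation: the InMemoryCounterSink class is hypothetical, a plain dictionary stands in for the memory store, and a Python callable stands in for the SpEL nameExpression.

```python
from collections import defaultdict
from typing import Callable, Optional


class InMemoryCounterSink:
    """Hypothetical stand-in for the counter sink with store=memory."""

    def __init__(self, name: str = "counts",
                 name_expression: Optional[Callable[[dict], str]] = None):
        self.name = name
        # Assumption: a callable plays the role of the SpEL nameExpression.
        self.name_expression = name_expression
        self.store = defaultdict(int)

    def receive(self, message: dict) -> None:
        # Derive the counter name from the message when an expression is set,
        # otherwise fall back to the fixed name.
        if self.name_expression is not None:
            counter = self.name_expression(message)
        else:
            counter = self.name
        self.store[counter] += 1


# Fixed counter name (the default, "counts").
sink = InMemoryCounterSink()
for text in ("a", "b", "c"):
    sink.receive({"headers": {}, "payload": text})

# Counter name derived from each message's payload.
by_payload = InMemoryCounterSink(name_expression=lambda m: m["payload"])
for text in ("x", "y", "x"):
    by_payload.receive({"headers": {}, "payload": text})
```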

11.2 Hadoop (HDFS) (hdfs)

If you do not have Hadoop installed, you can install it as described in our separate guide. Spring XD supports 4 Hadoop distributions; see using Hadoop for more information on how to start Spring XD to target a specific distribution.

Once Hadoop is up and running, you can use the hdfs sink when creating a stream:

dataflow:> stream create --name myhdfsstream1 --definition "time | hdfs" --deploy

In the above example, we’ve scheduled the time source to send a tick to hdfs once every second. If you wait a little while for data to accumulate, you can then list the files in the Hadoop filesystem using the shell’s built-in hadoop fs commands. Before accessing HDFS from the shell, you first need to configure the shell to point to your name node. This is done using the hadoop config command.

dataflow:>hadoop config fs --namenode hdfs://localhost:8020

In this example the hdfs protocol is used but you may also use the webhdfs protocol. Listing the contents in the output directory (named by default after the stream name) is done by issuing the following command.

dataflow:>hadoop fs ls /xd/myhdfsstream1
Found 1 items
-rw-r--r--   3 jvalkealahti supergroup          0 2013-12-18 18:10 /xd/myhdfsstream1/myhdfsstream1-0.txt.tmp

While the file is being written to, it has the tmp suffix. When the data written exceeds the rollover size (default 1GB), it is renamed to remove the tmp suffix. The naming of in-use files is controlled by two options: --inUsePrefix and --inUseSuffix set the file name prefix and suffix respectively.

When you destroy a stream

dataflow:>stream destroy --name myhdfsstream1

and list the stream directory again, the in-use file suffix is no longer present.

dataflow:>hadoop fs ls /xd/myhdfsstream1
Found 1 items
-rw-r--r--   3 jvalkealahti supergroup        380 2013-12-18 18:10 /xd/myhdfsstream1/myhdfsstream1-0.txt

To list the contents of a file directly from the shell, execute the hadoop fs cat command.

dataflow:> hadoop fs cat /xd/myhdfsstream1/myhdfsstream1-0.txt
2013-12-18 18:10:07
2013-12-18 18:10:08
2013-12-18 18:10:09
...

The examples above did not explain why the file was written to a specific directory or why it was named the way it was. The default location of a file is defined as /xd/<stream name>/<stream name>-<rolling part>.txt. The directory and base file name can be changed using the --directory and --fileName options respectively. An example is shown below.

dataflow:>stream create --name myhdfsstream2 --definition "time | hdfs --directory=/xd/tmp --fileName=data" --deploy
dataflow:>stream destroy --name myhdfsstream2
dataflow:>hadoop fs ls /xd/tmp
Found 1 items
-rw-r--r--   3 jvalkealahti supergroup        120 2013-12-18 18:31 /xd/tmp/data-0.txt
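
The naming scheme described above can be sketched as a small Python helper. This is a minimal sketch for illustration only; the function names hdfs_file_path and default_path are hypothetical, and the helper only reproduces the file names seen in the listings of this section.

```python
def hdfs_file_path(directory: str, file_name: str, rolling_part: int,
                   extension: str = "txt", in_use_suffix: str = "") -> str:
    """Compose <directory>/<fileName>-<rolling part>.<extension>[<in-use suffix>]."""
    return f"{directory}/{file_name}-{rolling_part}.{extension}{in_use_suffix}"


def default_path(stream_name: str, rolling_part: int, in_use: bool = False) -> str:
    # Defaults as described in the text: /xd/<stream name>/<stream name>-<n>.txt,
    # with the .tmp suffix appended while the file is still being written.
    suffix = ".tmp" if in_use else ""
    return hdfs_file_path(f"/xd/{stream_name}", stream_name, rolling_part,
                          in_use_suffix=suffix)


in_use_name = default_path("myhdfsstream1", 0, in_use=True)
custom_name = hdfs_file_path("/xd/tmp", "data", 0)
```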

It is also possible to control the size of the files written into HDFS. The --rollover option controls when the file currently being written is rolled over and a new file is opened. The rollover size can be given in bytes, kilobytes, megabytes, gigabytes, or terabytes.

dataflow:>stream create --name myhdfsstream3 --definition "time | hdfs --rollover=100" --deploy
dataflow:>stream destroy --name myhdfsstream3
dataflow:>hadoop fs ls /xd/myhdfsstream3
Found 3 items
-rw-r--r--   3 jvalkealahti supergroup        100 2013-12-18 18:41 /xd/myhdfsstream3/myhdfsstream3-0.txt
-rw-r--r--   3 jvalkealahti supergroup        100 2013-12-18 18:41 /xd/myhdfsstream3/myhdfsstream3-1.txt
-rw-r--r--   3 jvalkealahti supergroup        100 2013-12-18 18:41 /xd/myhdfsstream3/myhdfsstream3-2.txt

Shortcuts to specify sizes other than bytes are written as --rollover=64M, --rollover=512G or --rollover=1T.
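
To make the shortcut notation concrete, the following Python sketch converts such a value into a byte count. It is not the sink's own parser: the function name parse_rollover is hypothetical, and treating the suffixes as binary multiples (1K = 1024 bytes) is an assumption.

```python
import re

# Assumption: suffixes are binary multiples (1K = 1024 bytes).
_UNITS = {"": 1, "K": 1024, "M": 1024 ** 2, "G": 1024 ** 3, "T": 1024 ** 4}


def parse_rollover(value: str) -> int:
    """Convert strings such as '100', '64M' or '1T' into a byte count."""
    match = re.fullmatch(r"(\d+)([KMGT]?)", value.strip().upper())
    if match is None:
        raise ValueError(f"unsupported rollover size: {value!r}")
    number, unit = match.groups()
    return int(number) * _UNITS[unit]
```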

The stream can also be compressed during the write operation. An example of this is shown below.

dataflow:>stream create --name myhdfsstream4 --definition "time | hdfs --codec=gzip" --deploy
dataflow:>stream destroy --name myhdfsstream4
dataflow:>hadoop fs ls /xd/myhdfsstream4
Found 1 items
-rw-r--r--   3 jvalkealahti supergroup         80 2013-12-18 18:48 /xd/myhdfsstream4/myhdfsstream4-0.txt.gzip

From a native OS shell we can use Hadoop’s fs commands and pipe data into gunzip.

# bin/hadoop fs -cat /xd/myhdfsstream4/myhdfsstream4-0.txt.gzip | gunzip
2013-12-18 18:48:10
2013-12-18 18:48:11
...
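
The same decompression step can be sketched in Python with the standard gzip module. The bytes below are produced in memory as a stand-in for the file contents; fetching the file from HDFS is outside the scope of this sketch.

```python
import gzip

# Stand-in for the bytes that `hadoop fs -cat` would emit for the .gzip file
# (assumption: the codec produces a regular gzip stream).
compressed = gzip.compress(b"2013-12-18 18:48:10\n2013-12-18 18:48:11\n")

# Equivalent of piping the stream into gunzip.
lines = gzip.decompress(compressed).decode("utf-8").splitlines()
```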

Often a stream of data may not have a high enough rate to roll over files frequently, leaving the file in an open state. This prevents users from reading a consistent set of data when running MapReduce jobs. While one can alleviate this problem by using a small rollover value, a better way is to use the idleTimeout option, which automatically closes the file if there were no writes during the specified period of time. This feature is also useful when a burst of data is written into a stream and you’d like that data to become visible in HDFS.

Note

The idleTimeout value should not exceed the timeout values set on the Hadoop cluster. These are typically configured using the dfs.socket.timeout and/or dfs.datanode.socket.write.timeout properties in the hdfs-site.xml configuration file.

dataflow:> stream create --name myhdfsstream5 --definition "http --port=8000 | hdfs --rollover=20 --idleTimeout=10000" --deploy

In the above example we changed the source to http in order to control what we write into the hdfs sink. We defined a small rollover size and a timeout of 10 seconds. Now we can post data into this stream via the source endpoint using the command below.

dataflow:> http post --target http://localhost:8000 --data "hello"

If we repeat the command very quickly and then wait for the timeout, we should see that some files were closed before the rollover size was reached and others were rolled over because they reached the rollover size.

dataflow:>hadoop fs ls /xd/myhdfsstream5
Found 4 items
-rw-r--r--   3 jvalkealahti supergroup         12 2013-12-18 19:02 /xd/myhdfsstream5/myhdfsstream5-0.txt
-rw-r--r--   3 jvalkealahti supergroup         24 2013-12-18 19:03 /xd/myhdfsstream5/myhdfsstream5-1.txt
-rw-r--r--   3 jvalkealahti supergroup         24 2013-12-18 19:03 /xd/myhdfsstream5/myhdfsstream5-2.txt
-rw-r--r--   3 jvalkealahti supergroup         18 2013-12-18 19:03 /xd/myhdfsstream5/myhdfsstream5-3.txt
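
The interplay between rollover and idleTimeout can be illustrated with a simplified simulation. This is a sketch under assumptions, not the HDFS writer itself: the function simulate_writer is hypothetical, each post is assumed to add 6 bytes ("hello" plus a line terminator), and a file is assumed to roll over as soon as its size reaches the threshold.

```python
def simulate_writer(events, rollover, idle_timeout_ms):
    """Return (size, reason) for each file closed while replaying events.

    events is a list of (time_ms, payload_bytes) tuples in time order.
    """
    closed = []
    size = 0
    last_write = None
    for time_ms, payload in events:
        # Idle timeout: the open file is closed once no write has arrived
        # for idle_timeout_ms.
        if size > 0 and time_ms - last_write >= idle_timeout_ms:
            closed.append((size, "idle"))
            size = 0
        size += len(payload)
        last_write = time_ms
        # Rollover: close as soon as the threshold is reached (assumption).
        if size >= rollover:
            closed.append((size, "rollover"))
            size = 0
    if size > 0:
        # Whatever remains is closed by the idle timeout after the last write.
        closed.append((size, "idle"))
    return closed


post = b"hello\n"  # assumed 6 bytes per post, including the line terminator
events = [(0, post), (100, post)]                        # two quick posts
events += [(30_000 + i * 100, post) for i in range(11)]  # a later burst of 11
result = simulate_writer(events, rollover=20, idle_timeout_ms=10_000)
```

Under these assumptions the simulated file sizes are 12, 24, 24 and 18 bytes, the same sizes as in the listing above: the first and last files are closed by the idle timeout, the two in between by rollover.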

Files can be automatically partitioned using a partitionPath expression. If we create a stream with idleTimeout and a partitionPath with the simple format yyyy/MM/dd/HH/mm, we should see the writes for each minute ending up in their own files.

dataflow:>stream create --name myhdfsstream6 --definition "time|hdfs --idleTimeout=10000 --partitionPath=dateFormat('yyyy/MM/dd/HH/mm')" --deploy

Let the stream run for a short period of time and list the files.

dataflow:>hadoop fs ls --recursive true --dir /xd/myhdfsstream6
drwxr-xr-x   - jvalkealahti supergroup          0 2014-05-28 09:42 /xd/myhdfsstream6/2014
drwxr-xr-x   - jvalkealahti supergroup          0 2014-05-28 09:42 /xd/myhdfsstream6/2014/05
drwxr-xr-x   - jvalkealahti supergroup          0 2014-05-28 09:42 /xd/myhdfsstream6/2014/05/28
drwxr-xr-x   - jvalkealahti supergroup          0 2014-05-28 09:45 /xd/myhdfsstream6/2014/05/28/09
drwxr-xr-x   - jvalkealahti supergroup          0 2014-05-28 09:43 /xd/myhdfsstream6/2014/05/28/09/42
-rw-r--r--   3 jvalkealahti supergroup        140 2014-05-28 09:43 /xd/myhdfsstream6/2014/05/28/09/42/myhdfsstream6-0.txt
drwxr-xr-x   - jvalkealahti supergroup          0 2014-05-28 09:44 /xd/myhdfsstream6/2014/05/28/09/43
-rw-r--r--   3 jvalkealahti supergroup       1200 2014-05-28 09:44 /xd/myhdfsstream6/2014/05/28/09/43/myhdfsstream6-0.txt
drwxr-xr-x   - jvalkealahti supergroup          0 2014-05-28 09:45 /xd/myhdfsstream6/2014/05/28/09/44
-rw-r--r--   3 jvalkealahti supergroup       1200 2014-05-28 09:45 /xd/myhdfsstream6/2014/05/28/09/44/myhdfsstream6-0.txt

Partitioning can also be based on defined lists. In the example below we simulate feeding data by using time and transform elements. The data passed to the hdfs sink has the content APP0:foobar, APP1:foobar, APP2:foobar or APP3:foobar.

dataflow:>stream create --name myhdfsstream7 --definition "time | transform --expression=\"'APP'+T(Math).round(T(Math).random()*3)+':foobar'\" | hdfs --idleTimeout=10000 --partitionPath=path(dateFormat('yyyy/MM/dd/HH'),list(payload.split(':')[0],{{'0TO1','APP0','APP1'},{'2TO3','APP2','APP3'}}))" --deploy

Let the stream run for a few seconds, destroy it, and check what was written to the partitioned files.

dataflow:>stream destroy --name myhdfsstream7
Destroyed stream 'myhdfsstream7'
dataflow:>hadoop fs ls --recursive true --dir /xd
drwxr-xr-x   - jvalkealahti supergroup          0 2014-05-28 19:24 /xd/myhdfsstream7
drwxr-xr-x   - jvalkealahti supergroup          0 2014-05-28 19:24 /xd/myhdfsstream7/2014
drwxr-xr-x   - jvalkealahti supergroup          0 2014-05-28 19:24 /xd/myhdfsstream7/2014/05
drwxr-xr-x   - jvalkealahti supergroup          0 2014-05-28 19:24 /xd/myhdfsstream7/2014/05/28
drwxr-xr-x   - jvalkealahti supergroup          0 2014-05-28 19:24 /xd/myhdfsstream7/2014/05/28/19
drwxr-xr-x   - jvalkealahti supergroup          0 2014-05-28 19:24 /xd/myhdfsstream7/2014/05/28/19/0TO1_list
-rw-r--r--   3 jvalkealahti supergroup        108 2014-05-28 19:24 /xd/myhdfsstream7/2014/05/28/19/0TO1_list/myhdfsstream7-0.txt
drwxr-xr-x   - jvalkealahti supergroup          0 2014-05-28 19:24 /xd/myhdfsstream7/2014/05/28/19/2TO3_list
-rw-r--r--   3 jvalkealahti supergroup        180 2014-05-28 19:24 /xd/myhdfsstream7/2014/05/28/19/2TO3_list/myhdfsstream7-0.txt
dataflow:>hadoop fs cat /xd/myhdfsstream7/2014/05/28/19/0TO1_list/myhdfsstream7-0.txt
APP1:foobar
APP1:foobar
APP0:foobar
APP0:foobar
APP1:foobar

Partitioning can also be based on defined ranges. In the example below we simulate feeding data by using time and transform elements. The data passed to the hdfs sink has content ranging from APP0 to APP15. We simply parse the number part and use it to partition with the ranges {3,5,10}.

dataflow:>stream create --name myhdfsstream8 --definition "time | transform --expression=\"'APP'+T(Math).round(T(Math).random()*15)\" | hdfs --idleTimeout=10000 --partitionPath=path(dateFormat('yyyy/MM/dd/HH'),range(T(Integer).parseInt(payload.substring(3)),{3,5,10}))" --deploy

Let the stream run for a few seconds, destroy it, and check what was written to the partitioned files.

dataflow:>stream destroy --name myhdfsstream8
Destroyed stream 'myhdfsstream8'
dataflow:>hadoop fs ls --recursive true --dir /xd
drwxr-xr-x   - jvalkealahti supergroup          0 2014-05-28 19:34 /xd/myhdfsstream8
drwxr-xr-x   - jvalkealahti supergroup          0 2014-05-28 19:34 /xd/myhdfsstream8/2014
drwxr-xr-x   - jvalkealahti supergroup          0 2014-05-28 19:34 /xd/myhdfsstream8/2014/05
drwxr-xr-x   - jvalkealahti supergroup          0 2014-05-28 19:34 /xd/myhdfsstream8/2014/05/28
drwxr-xr-x   - jvalkealahti supergroup          0 2014-05-28 19:34 /xd/myhdfsstream8/2014/05/28/19
drwxr-xr-x   - jvalkealahti supergroup          0 2014-05-28 19:34 /xd/myhdfsstream8/2014/05/28/19/10_range
-rw-r--r--   3 jvalkealahti supergroup         16 2014-05-28 19:34 /xd/myhdfsstream8/2014/05/28/19/10_range/myhdfsstream8-0.txt
drwxr-xr-x   - jvalkealahti supergroup          0 2014-05-28 19:34 /xd/myhdfsstream8/2014/05/28/19/3_range
-rw-r--r--   3 jvalkealahti supergroup         35 2014-05-28 19:34 /xd/myhdfsstream8/2014/05/28/19/3_range/myhdfsstream8-0.txt
drwxr-xr-x   - jvalkealahti supergroup          0 2014-05-28 19:34 /xd/myhdfsstream8/2014/05/28/19/5_range
-rw-r--r--   3 jvalkealahti supergroup          5 2014-05-28 19:34 /xd/myhdfsstream8/2014/05/28/19/5_range/myhdfsstream8-0.txt
dataflow:>hadoop fs cat /xd/myhdfsstream8/2014/05/28/19/3_range/myhdfsstream8-0.txt
APP3
APP3
APP1
APP0
APP1
dataflow:>hadoop fs cat /xd/myhdfsstream8/2014/05/28/19/5_range/myhdfsstream8-0.txt
APP4
dataflow:>hadoop fs cat /xd/myhdfsstream8/2014/05/28/19/10_range/myhdfsstream8-0.txt
APP6
APP15
APP7

Partitioning with dateFormat can also be based on the content itself. This is a good fit when old log files need to be processed and partitioning should follow the timestamp of each log entry. We create fake log data with a simple date string ranging from 1970-01-10 to 1970-01-13.

dataflow:>stream create --name myhdfsstream9 --definition "time | transform --expression=\"'1970-01-'+1+T(Math).round(T(Math).random()*3)\" | hdfs --idleTimeout=10000 --partitionPath=path(dateFormat('yyyy/MM/dd/HH',payload,'yyyy-MM-dd'))" --deploy

Let the stream run for a few seconds, destroy it, and check what was written to the partitioned files. Note that the partition paths are based on the year 1970, not the present year.

dataflow:>stream destroy --name myhdfsstream9
Destroyed stream 'myhdfsstream9'
dataflow:>hadoop fs ls --recursive true --dir /xd
drwxr-xr-x   - jvalkealahti supergroup          0 2014-05-28 19:56 /xd/myhdfsstream9
drwxr-xr-x   - jvalkealahti supergroup          0 2014-05-28 19:56 /xd/myhdfsstream9/1970
drwxr-xr-x   - jvalkealahti supergroup          0 2014-05-28 19:56 /xd/myhdfsstream9/1970/01
drwxr-xr-x   - jvalkealahti supergroup          0 2014-05-28 19:56 /xd/myhdfsstream9/1970/01/10
drwxr-xr-x   - jvalkealahti supergroup          0 2014-05-28 19:57 /xd/myhdfsstream9/1970/01/10/00
-rw-r--r--   3 jvalkealahti supergroup         44 2014-05-28 19:57 /xd/myhdfsstream9/1970/01/10/00/myhdfsstream9-0.txt
drwxr-xr-x   - jvalkealahti supergroup          0 2014-05-28 19:56 /xd/myhdfsstream9/1970/01/11
drwxr-xr-x   - jvalkealahti supergroup          0 2014-05-28 19:57 /xd/myhdfsstream9/1970/01/11/00
-rw-r--r--   3 jvalkealahti supergroup         99 2014-05-28 19:57 /xd/myhdfsstream9/1970/01/11/00/myhdfsstream9-0.txt
drwxr-xr-x   - jvalkealahti supergroup          0 2014-05-28 19:56 /xd/myhdfsstream9/1970/01/12
drwxr-xr-x   - jvalkealahti supergroup          0 2014-05-28 19:57 /xd/myhdfsstream9/1970/01/12/00
-rw-r--r--   3 jvalkealahti supergroup         44 2014-05-28 19:57 /xd/myhdfsstream9/1970/01/12/00/myhdfsstream9-0.txt
drwxr-xr-x   - jvalkealahti supergroup          0 2014-05-28 19:56 /xd/myhdfsstream9/1970/01/13
drwxr-xr-x   - jvalkealahti supergroup          0 2014-05-28 19:57 /xd/myhdfsstream9/1970/01/13/00
-rw-r--r--   3 jvalkealahti supergroup         55 2014-05-28 19:57 /xd/myhdfsstream9/1970/01/13/00/myhdfsstream9-0.txt
dataflow:>hadoop fs cat /xd/myhdfsstream9/1970/01/10/00/myhdfsstream9-0.txt
1970-01-10
1970-01-10
1970-01-10
1970-01-10

11.2.1 Options

The hdfs sink has the following options:

closeTimeout
timeout in ms, regardless of activity, after which file will be automatically closed (long, default: 0)
codec
compression codec alias name (gzip, snappy, bzip2, lzo, or slzo) (String, default: ``)
directory
where to output the files in the Hadoop FileSystem (String, default: /tmp/hdfs-sink)
fileExtension
the base filename extension to use for the created files (String, default: txt)
fileName
the base filename to use for the created files (String, default: <stream name>)
fileOpenAttempts
maximum number of file open attempts to find a path (int, default: 10)
fileUuid
whether file name should contain uuid (boolean, default: false)
fsUri
the URI to use to access the Hadoop FileSystem (String, default: ${spring.hadoop.fsUri})
idleTimeout
inactivity timeout in ms after which file will be automatically closed (long, default: 0)
inUsePrefix
prefix for files currently being written (String, default: ``)
inUseSuffix
suffix for files currently being written (String, default: .tmp)
overwrite
whether writer is allowed to overwrite files in Hadoop FileSystem (boolean, default: false)
partitionPath
a SpEL expression defining the partition path (String, default: ``)
rollover
threshold in bytes when file will be automatically rolled over (String, default: 1G)

Note

In the context of the fileOpenAttempts option, an attempt is either one rollover request or one failed stream open request for a path (if another writer came up with the same path and had already opened it).

11.2.2 Partition Path Expression

The SpEL expression is evaluated against a Spring Messaging Message passed internally into an HDFS writer. This allows the expression to use the headers and payload of that message. While you could do custom processing within a stream and add custom headers, the timestamp header is always present. The data to be written is available in the payload.

Accessing Properties

Using payload simply returns whatever is currently being written. Headers are accessed via the headers property. Any other property is automatically resolved from the headers if found. For example, headers.timestamp is equivalent to timestamp.
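
The resolution rule can be sketched in Python as follows. This only models the behaviour described in the paragraph above; the function resolve_property and the dictionary representation of a message are hypothetical, and returning None for an unknown property is an assumption.

```python
def resolve_property(message: dict, name: str):
    """Resolve a property name the way the text describes (simplified)."""
    if name == "payload":
        return message["payload"]
    if name == "headers":
        return message["headers"]
    # Any other name is looked up in the headers.
    # Assumption: an unknown name yields None rather than an error.
    return message["headers"].get(name)


message = {"headers": {"timestamp": 1401301440000, "appid": "APP1"},
           "payload": "APP1:foobar"}
```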

Custom Methods

In addition to normal SpEL functionality, a few custom methods have been added to make it easier to build partition paths. These custom methods can be used to work with common partitioning concepts such as date formatting, lists, ranges and hashes.

path
path(String... paths)

Concatenates paths together with the delimiter /. This method can be used to make the expression less verbose than combining path parts with native SpEL functionality. To create the path part1/part2, the expression 'part1' + '/' + 'part2' is equivalent to path('part1','part2').

Parameters

paths
Any number of path parts

Return Value. Concatenated value of paths delimited with /.
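
As an illustration, the equivalent operation in Python is a join on the delimiter. This mirrors the described behaviour only; it is not the SpEL method itself.

```python
def path(*parts: str) -> str:
    """Join any number of path parts with '/' as the delimiter."""
    return "/".join(parts)
```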

dateFormat
dateFormat(String pattern)
dateFormat(String pattern, Long epoch)
dateFormat(String pattern, Date date)
dateFormat(String pattern, String datestring)
dateFormat(String pattern, String datestring, String dateformat)

Creates a path using date formatting. Internally this method delegates to SimpleDateFormat and needs a Date and a pattern. By default, if no parameter for the conversion is given, the timestamp header is used. Effectively dateFormat('yyyy') is equivalent to dateFormat('yyyy', timestamp) or dateFormat('yyyy', headers.timestamp).

The method signature with three parameters first parses datestring into a Date using the dateformat pattern; that Date is then formatted with pattern. This is useful when the partition should be based on a date or time string found in the payload content itself. If omitted, the default dateformat pattern is yyyy-MM-dd.

Parameters

pattern
Pattern compatible with SimpleDateFormat to produce a final output.
epoch
Timestamp as Long which is converted into a Date.
date
A Date to be formatted.
dateformat
Secondary pattern to convert datestring into a Date.
datestring
Date as a String

Return Value. A path part representation which can be a simple file or directory name or a directory structure.
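
The different call forms can be sketched in Python. This is an analogy rather than the actual method: the function date_format is hypothetical, it uses Python strftime patterns (%Y/%m/%d) in place of SimpleDateFormat patterns (yyyy/MM/dd), and it interprets epoch values in UTC, which is an assumption.

```python
from datetime import datetime, timezone


def date_format(pattern: str, value, source_pattern: str = "%Y-%m-%d") -> str:
    """Format an epoch (ms), a datetime or a date string into a path part."""
    if isinstance(value, (int, float)):
        # Epoch milliseconds, as carried by the timestamp header.
        # Assumption: interpreted in UTC.
        moment = datetime.fromtimestamp(value / 1000, tz=timezone.utc)
    elif isinstance(value, datetime):
        moment = value
    else:
        # A date string is first parsed with the secondary pattern.
        moment = datetime.strptime(value, source_pattern)
    return moment.strftime(pattern)


from_payload = date_format("%Y/%m/%d/%H", "1970-01-10")
from_epoch = date_format("%Y/%m/%d/%H/%M", 0)
```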

list
list(Object source, List<List<Object>> lists)

Creates a partition path part by matching a source against the lists denoted by lists.

Let's assume that data is being written and it’s possible to extract an appid either from the headers or the payload. We can automatically do list-based partitioning by using the partition method list(headers.appid,{{'1TO3','APP1','APP2','APP3'},{'4TO6','APP4','APP5','APP6'}}). This method would create three partitions: 1TO3_list, 4TO6_list and list. The last is used if no match is found in the partition lists passed to lists.

Parameters

source
An Object to be matched against lists.
lists
A definition of list of lists.

Return Value. A path part prefixed with a matched key i.e. XXX_list or list if no match.
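
The matching rule can be sketched in Python. The function list_partition is a hypothetical model of the described behaviour, treating the first element of each inner list as the partition key and the remaining elements as the values that map to it.

```python
def list_partition(source, lists) -> str:
    """Return '<key>_list' for the first list containing source, else 'list'."""
    for entry in lists:
        key, *members = entry
        if source in members:
            return f"{key}_list"
    # No list matched: fall back to the catch-all partition.
    return "list"


LISTS = [["1TO3", "APP1", "APP2", "APP3"], ["4TO6", "APP4", "APP5", "APP6"]]
```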

range
range(Object source, List<Object> list)

Creates a partition path part by matching a source against a list denoted by list using a simple binary search.

The partition method takes a source as its first argument and a list as its second argument. Behind the scenes this uses the JVM’s binarySearch, which works at the Object level, so we can pass in anything. Remember that a meaningful range match only works if the passed-in Object and the entries in the list are of the same type, such as Integer. The range is defined by binarySearch itself, so a value is mostly matched against an upper bound, except for the last range in the list. Having a list of {1000,3000,5000} means that everything above 3000 will be matched with 5000. If that is an issue, simply adding Integer.MAX_VALUE as the last range would move everything above 5000 into a new partition. With the list {1000,3000,5000}, the created partitions are 1000_range, 3000_range and 5000_range.

Parameters

source
An Object to be matched against list.
list
A definition of list.

Return Value. A path part prefixed with a matched key i.e. XXX_range.
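
The upper-bound matching can be sketched in Python with the bisect module. This is a model of the behaviour described above and seen in the earlier {3,5,10} listing, not the actual implementation; the function name range_partition is hypothetical.

```python
from bisect import bisect_left


def range_partition(source, bounds) -> str:
    """Map source to the smallest bound >= source, clamped to the last bound."""
    index = bisect_left(bounds, source)   # bounds must be sorted ascending
    index = min(index, len(bounds) - 1)   # values above the last bound stay in it
    return f"{bounds[index]}_range"


partitions = [range_partition(n, [3, 5, 10]) for n in (0, 3, 4, 6, 15)]
```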

hash
hash(Object source, int bucketcount)

Creates a partition path part by calculating a hash key from the source’s hashCode and bucketcount. Using the partition method hash(timestamp,2) would create partitions named 0_hash and 1_hash. The number prefixed to _hash is simply calculated using Object.hashCode() % bucketcount.

Parameters

source
An Object whose hashCode will be used.
bucketcount
A number of buckets

Return Value. A path part prefixed with a hash key i.e. XXX_hash.
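
The bucket calculation can be sketched in Python. The helper java_long_hash reproduces Java's Long.hashCode() so that a timestamp hashes as it would on the JVM; hash_partition is a hypothetical model, and taking the absolute value before the modulo (so that a negative hash code cannot yield a negative bucket) is an assumption not stated in the text.

```python
def java_long_hash(value: int) -> int:
    """Reproduce java.lang.Long.hashCode() for a 64-bit value."""
    value &= 0xFFFFFFFFFFFFFFFF
    folded = (value ^ (value >> 32)) & 0xFFFFFFFF
    # Reinterpret the low 32 bits as a signed int.
    return folded - 0x100000000 if folded >= 0x80000000 else folded


def hash_partition(hash_code: int, bucket_count: int) -> str:
    # Assumption: the sign is dropped before the modulo so the bucket
    # number is never negative.
    return f"{abs(hash_code) % bucket_count}_hash"


# With two buckets only 0_hash and 1_hash can occur.
buckets = {hash_partition(java_long_hash(ts), 2)
           for ts in range(1401301440000, 1401301440100)}
```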

11.3 Log (log)

Probably the simplest option for a sink is just to log the data. The log sink uses the application logger to output the data for inspection. The log level is set to WARN and the logger name is created from the stream name. To create a stream using a log sink you would use a command like

dataflow:> stream create --name mylogstream --definition "http --port=8000 | log" --deploy

You can then try adding some data. We’ve used the http source on port 8000 here, so run the following command to send a message

dataflow:> http post --target http://localhost:8000 --data "hello"

and you should see the following output in the XD container console.

13/06/07 16:12:18 INFO Received: hello

11.4 Redis (redis)

The Redis sink can be used to ingest data into a Redis store. You can choose queue, topic or key with the selected collection type to point to a specific data store.

For example,

dataflow:>stream create store-into-redis --definition "http | redis --queue=myList" --deploy
dataflow:>Created and deployed new stream 'store-into-redis'

11.4.1 Options

The redis sink has the following options:

topicExpression
a SpEL expression to use for topic (String, no default)
queueExpression
a SpEL expression to use for queue (String, no default)
keyExpression
a SpEL expression to use for key (String, no default)
key
name for the key (String, no default)
queue
name for the queue (String, no default)
topic
name for the topic (String, no default)