Spring Cloud Stream Reference Guide

Sabby Anandan, Artem Bilan, Marius Bogoevici, Eric Bottard, Mark Fisher, Ilayaperumal Gopinathan, Gunnar Hillert, Mark Pollack, Patrick Peralta, Glenn Renfro, Gary Russell, Thomas Risberg, David Turanski, Janne Valkealahti

1.0.0.M1

Copies of this document may be made for your own use and for distribution to others, provided that you do not charge any fee for such copies and further provided that each copy contains this Copyright Notice, whether distributed in print or electronically.


Table of Contents

I. Reference Guide
1. Introduction
1.1. Starters and pre-built applications
1.2. Classification
1.3. Using the artifacts
1.3.1. Maven and Docker access
1.3.2. Building the artifacts
1.4. Creating custom artifacts
1.4.1. Using a different binder
1.4.2. Creating your own applications
Using generic Spring Cloud Stream applications
Using the starters to create custom components
II. Starters
2. Sources
2.1. File Source
2.1.1. Options
2.2. FTP Source
2.2.1. Options
2.3. Http Source
2.3.1. Options
2.4. JDBC Source
2.4.1. Options
2.5. JMS Source
2.5.1. Options
2.6. Load Generator Source
2.6.1. Options
2.7. RabbitMQ Source
2.7.1. Options
A Note About Retry
2.8. SFTP Source
2.8.1. Options
2.9. SYSLOG Source
2.9.1. Options
2.10. TCP
2.10.1. Options
2.10.2. Available Decoders
2.11. Time Source
2.11.1. Options
2.12. Trigger Source
2.12.1. Options
2.13. Twitter Stream Source
2.13.1. Options
3. Processors
3.1. Bridge Processor
3.2. Filter Processor
3.2.1. Options
3.3. Groovy Filter Processor
3.3.1. Options
3.4. Groovy Transform Processor
3.4.1. Options
3.5. Http Client Processor
3.5.1. Options
3.6. PMML Processor
3.6.1. Options
3.7. Scriptable Transform Processor
3.7.1. Options
3.8. Splitter Processor
3.8.1. Options
3.8.2. JSON Example
3.9. Transform Processor
3.9.1. Options
4. Sinks
4.1. Cassandra Sink
4.1.1. Options
4.2. Counter Sink
4.2.1. Options
4.3. Field Value Counter Sink
4.3.1. Options
4.4. File Sink
4.4.1. Options
4.5. FTP Sink
4.5.1. Options
4.6. Gemfire Sink
4.6.1. Options
4.7. Gpfdist Sink
4.7.1. Options
4.7.2. Implementation Notes
4.7.3. Detailed Option Descriptions
4.7.4. How Data Is Sent Into Segments
4.7.5. Example Usage
4.7.6. Tuning Transfer Rate
4.8. HDFS Sink
4.8.1. Options
4.9. Jdbc Sink
4.9.1. Options
4.10. Log Sink
4.10.1. Options
4.11. RabbitMQ Sink
4.11.1. Options
4.12. Redis Sink
4.12.1. Options
4.13. Router Sink
4.13.1. Options
4.13.2. SpEL-based Routing
4.13.3. Groovy-based Routing
4.14. TCP Sink
4.14.1. Options
4.14.2. Available Encoders
4.15. Throughput Sink
4.16. Websocket Sink
4.16.1. Options
4.16.2. Example
Step 1: Start Redis
Step 2: Deploy a time-source
Step 3: Deploy a websocket-sink (the app that contains this starter jar)
4.16.3. Actuators
III. Appendices
A. Building
A.1. Basic Compile and Test
A.2. Documentation
A.3. Working with the code
A.3.1. Importing into eclipse with m2eclipse
A.3.2. Importing into eclipse without m2eclipse
5. Contributing
5.1. Sign the Contributor License Agreement
5.2. Code Conventions and Housekeeping

Part I. Reference Guide

This section will provide you with a detailed overview of Spring Cloud Stream Application Starters, their purpose, and how to use them. It assumes familiarity with general Spring Cloud Stream concepts, which can be found in the Spring Cloud Stream reference documentation.

1. Introduction

Spring Cloud Stream Application Starters provide you with predefined Spring Cloud Stream applications that you can run independently or with Spring Cloud Data Flow. You can also use the starters as a basis for creating your own applications. They include:

  • connectors (sources and sinks) for middleware including message brokers, storage (relational, non-relational, filesystem);
  • adapters for various network protocols;
  • generic processors that can be customized via Spring Expression Language (SpEL) or scripting.

You can find a detailed listing of all the starters and their options in the corresponding section of this guide.

1.1 Starters and pre-built applications

As a user of Spring Cloud Stream Application Starters you have access to two types of artifacts.

Starters are libraries that contain the complete configuration of a Spring Cloud Stream application with a specific role (e.g. an HTTP source that receives HTTP POST requests and forwards the data on its output channel to downstream Spring Cloud Stream applications). Starters are not executable applications, and are intended to be included in other Spring Boot applications, along with a Binder implementation.

Prebuilt applications are Spring Boot applications that include the starters and a Binder implementation. Prebuilt applications are uberjars and include minimal code required to execute standalone. For each starter, the project provides a prebuilt version including the Kafka Binder and a prebuilt version including the Rabbit MQ Binder.

[Note]Note

Only starters are present in the source code of the project. Prebuilt applications are generated according to the Maven plugin configuration.

1.2 Classification

Based on their target application type, starters can be either:

  • a source that connects to an external resource to receive data that is sent on its sole output channel;
  • a processor that receives data from a single input channel and processes it, sending the result on its single output channel;
  • a sink that connects to an external resource to send data that is received on its sole input channel.

You can easily identify the type and functionality of a starter based on its name. All starters are named following the convention spring-cloud-starter-stream-<type>-<functionality>. For example spring-cloud-starter-stream-source-file is a starter for a file source that polls a directory and sends file data on the output channel (read the reference documentation of the source for details). Conversely, spring-cloud-starter-stream-sink-cassandra is a starter for a Cassandra sink that writes the data that it receives on the input channel to Cassandra (read the reference documentation of the sink for details).

The prebuilt applications follow a naming convention too: <functionality>-<type>-<binder>. For example, cassandra-sink-kafka is a Cassandra sink using the Kafka binder.

1.3 Using the artifacts

You can access the artifacts produced by Spring Cloud Stream Application Starters via Maven, via Docker, or by building the artifacts yourself.

1.3.1 Maven and Docker access

Starters are available as Maven artifacts in the Spring repositories. You can add them as dependencies to your application, as follows:

<dependency>
  <groupId>org.springframework.cloud.stream.app</groupId>
  <artifactId>spring-cloud-starter-stream-sink-cassandra</artifactId>
  <version>1.0.0.BUILD-SNAPSHOT</version>
</dependency>

From this, you can infer the coordinates for other starters found in this guide. While the version may vary, the group will always remain org.springframework.cloud.stream.app and the artifact id follows the naming convention spring-cloud-starter-stream-<type>-<functionality> described previously.

Prebuilt applications are available as Maven artifacts too. Using them directly as dependencies is not encouraged; use the starters instead. Following the typical Maven <groupId>:<artifactId>:<version> convention, they can be referenced for example as:

org.springframework.cloud.stream.app:cassandra-sink-rabbit:1.0.0.BUILD-SNAPSHOT

Just as with the starters, you can infer the coordinates for other prebuilt applications found in the guide. The group will be always org.springframework.cloud.stream.app. The version may vary. The artifact id follows the format <functionality>-<type>-<binder> previously described.

The Docker versions of the applications are available in Docker Hub, at hub.docker.com/r/springcloudstream/. Naming and versioning follows the same general conventions as Maven, e.g.

docker pull springcloudstream/cassandra-sink-kafka

will pull the latest Docker image of the Cassandra sink with the Kafka binder.

1.3.2 Building the artifacts

You can also build the project and generate the artifacts (including the prebuilt applications) on your own. This is useful if you want to deploy the artifacts locally, for example for adding a new starter, or if you want to build the entire set of artifacts with a new binder.

First, you need to generate the prebuilt applications. This is done by running the application generation Maven plugin. You can do so by simply invoking the corresponding script in the root of the project.

./generate.sh

For each of the prebuilt applications, the script will generate the following items:

  • a pom.xml file with the required dependencies (starter and binder)
  • a class that contains the main method of the application and imports the predefined configuration
  • generated integration test code that exercises the component against the configured binder.

For example, spring-cloud-starter-stream-sink-cassandra will generate cassandra-sink-rabbit and cassandra-sink-kafka as completely functional applications.

1.4 Creating custom artifacts

Apart from accessing the sources, sinks and processors already provided by the project, in this section we will describe how to:

  • Use a different binder than Kafka or Rabbit
  • Create your own applications
  • Customize dependencies such as Hadoop distributions or JDBC drivers

1.4.1 Using a different binder

If you want to use one of the applications found in Spring Cloud Stream Application Starters and you want to use one of the predefined binders (i.e. Kafka or Rabbit), you can just use the prebuilt versions of the artifacts. But if you want to connect to a different middleware system for which a binder exists, you will need to create new artifacts yourself. To do so, create a new Maven project that declares both the starter and the binder of your choice as dependencies, for example:

<dependencies>
  <!-- other dependencies -->
  <dependency>
    <groupId>org.springframework.cloud.stream.app</groupId>
    <artifactId>spring-cloud-starter-stream-sink-cassandra</artifactId>
    <version>1.0.0.BUILD-SNAPSHOT</version>
  </dependency>
  <dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-gemfire</artifactId>
    <version>1.0.0.BUILD-SNAPSHOT</version>
  </dependency>
</dependencies>

The next step is to create the project’s main class and import the configuration provided by the starter. For example, continuing with the Cassandra sink (this time with the Gemfire binder), it can look like the following:

package org.springframework.cloud.stream.app.cassandra.sink.gemfire;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.app.cassandra.sink.CassandraSinkConfiguration;
import org.springframework.context.annotation.Import;


@SpringBootApplication
@Import(CassandraSinkConfiguration.class)
public class CassandraSinkGemfireApplication {

	public static void main(String[] args) {
		SpringApplication.run(CassandraSinkGemfireApplication.class, args);
	}
}

1.4.2 Creating your own applications

Spring Cloud Stream Application Starters consist of regular Spring Cloud Stream applications with some additional conventions that facilitate generating prebuilt applications with the preconfigured binders. Sometimes, your solution may require additional applications that are not in the scope of Spring Cloud Stream Application Starters, or require additional tweaks and enhancements. In this section we will show you how to create custom applications that can be part of your solution, along with the Spring Cloud Stream application starters. You have the following options:

  • create new Spring Cloud Stream applications;
  • use the starters to create customized versions;

Using generic Spring Cloud Stream applications

If you want to add your own custom applications to your solution, you can simply create a new Spring Cloud Stream project with the binder of your choice and run it the same way as the applications provided by Spring Cloud Stream Application Starters, independently or via Spring Cloud Data Flow. The process is described in the Getting Started Guide of Spring Cloud Stream. One restriction is that the applications must have:

  • a single outbound channel named output for sources - the simplest way to do so is by using the predefined interface org.springframework.cloud.stream.messaging.Source;
  • a single inbound channel named input for sinks - the simplest way to do so is by using the predefined interface org.springframework.cloud.stream.messaging.Sink;
  • both an inbound channel named input and an outbound channel named output for processors - the simplest way to do so is by using the predefined interface org.springframework.cloud.stream.messaging.Processor.

The other restriction is to use the same kind of binder as the rest of your solution.
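
As a minimal sketch of such a custom application (assuming the Spring Cloud Stream 1.0 annotation model; the package and class names are illustrative), the processor below binds the predefined Processor interface, which provides the required input and output channels, and upper-cases each payload:

package com.example.uppercase;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Processor;
import org.springframework.integration.annotation.ServiceActivator;

@SpringBootApplication
@EnableBinding(Processor.class)
public class UppercaseProcessorApplication {

	// receives from the 'input' channel and sends the result to the 'output' channel
	@ServiceActivator(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)
	public String transform(String payload) {
		return payload.toUpperCase();
	}

	public static void main(String[] args) {
		SpringApplication.run(UppercaseProcessorApplication.class, args);
	}
}

Built with the same kind of binder as the rest of your solution, such an application can be deployed alongside the prebuilt ones.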

Using the starters to create custom components

You can also reuse the starters provided by Spring Cloud Stream Application Starters to create custom components, enriching the behavior of the application. For example, you can add a Spring Security layer to your HTTP source, add additional configuration to the ObjectMapper used for JSON transformation wherever that happens, or change the JDBC driver or Hadoop distribution that the application is using. To do so, set up your project following a process similar to customizing a binder. In fact, customizing the binder is the simplest form of creating a custom component.

As a reminder, this involves:

  • adding the starter to your project
  • choosing the binder
  • adding the main class and importing the starter configuration.

After doing so, you can simply add the additional configuration for the extra features of your application.
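
For illustration, here is a sketch of such a customized component, reusing the Cassandra sink example from the previous section and adding an application-specific bean alongside the imported starter configuration; the AuditService class is a hypothetical placeholder for whatever extra feature you need:

package com.example.custom;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.app.cassandra.sink.CassandraSinkConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Import;

@SpringBootApplication
@Import(CassandraSinkConfiguration.class)
public class CustomCassandraSinkApplication {

	// hypothetical extra bean; replace with your security, ObjectMapper or driver customization
	@Bean
	public AuditService auditService() {
		return new AuditService();
	}

	public static void main(String[] args) {
		SpringApplication.run(CustomCassandraSinkApplication.class, args);
	}

	// placeholder type for the additional feature
	public static class AuditService {
	}
}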

Part II. Starters

2. Sources

2.1 File Source

This application polls a directory and sends new files or their contents to the output channel. The file source provides the contents of a File as a byte array by default. However, this can be customized using the --mode option:

  • ref Provides a java.io.File reference
  • lines Will split files line-by-line and emit a new message for each line
  • contents The default. Provides the contents of a file as a byte array

When using --mode=lines, you can also provide the additional option --withMarkers=true. If set to true, the underlying FileSplitter will emit additional start-of-file and end-of-file marker messages before and after the actual data. The payload of these 2 additional marker messages is of type FileSplitter.FileMarker. The option withMarkers defaults to false if not explicitly set.
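
The snippet below is a minimal, hypothetical sketch of a downstream component that distinguishes marker messages from data lines; it assumes spring-integration-file is on the classpath and that the markers arrive as FileSplitter.FileMarker instances rather than in a serialized form (which depends on your binder and content-type settings):

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.file.splitter.FileSplitter;

@SpringBootApplication
@EnableBinding(Sink.class)
public class MarkerAwareSinkApplication {

	@ServiceActivator(inputChannel = Sink.INPUT)
	public void handle(Object payload) {
		if (payload instanceof FileSplitter.FileMarker) {
			// start-of-file or end-of-file marker emitted when withMarkers=true
			System.out.println("Marker: " + payload);
		}
		else {
			// a single line of the file when mode=lines
			System.out.println("Line: " + payload);
		}
	}

	public static void main(String[] args) {
		SpringApplication.run(MarkerAwareSinkApplication.class, args);
	}
}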

2.1.1 Options

The file source has the following options:

dir
the absolute path to the directory to monitor for files (String, default: ``)
fixedDelay
the fixed delay polling interval specified in seconds (int, default: 5)
initialDelay
an initial delay when using a fixed delay trigger, expressed in TimeUnits (seconds by default) (int, default: 0)
maxMessages
the maximum messages per poll; -1 for unlimited (long, default: -1)
mode
specifies how the file is being read. By default the content of a file is provided as byte array (FileReadingMode, default: contents, possible values: ref,lines,contents)
pattern
a filter expression (Ant style) to accept only files that match the pattern (String, default: * )
preventDuplicates
whether to prevent the same file from being processed twice (boolean, default: true)
timeUnit
the time unit for the fixed and initial delays (String, default: SECONDS)
withMarkers
if true emits start of file/end of file marker messages before/after the data. Only valid with FileReadingMode 'lines' (Boolean, no default)

The ref option is useful in some cases in which the file contents are large and it would be more efficient to send the file path.

2.2 FTP Source

This source application supports transfer of files using the FTP protocol. Files are transferred from the remote directory to the local directory where the app is deployed. Messages emitted by the source are provided as a byte array by default. However, this can be customized using the --mode option:

  • ref Provides a java.io.File reference
  • lines Will split files line-by-line and emit a new message for each line
  • contents The default. Provides the contents of a file as a byte array

When using --mode=lines, you can also provide the additional option --withMarkers=true. If set to true, the underlying FileSplitter will emit additional start-of-file and end-of-file marker messages before and after the actual data. The payload of these 2 additional marker messages is of type FileSplitter.FileMarker. The option withMarkers defaults to false if not explicitly set.

2.2.1 Options

The ftp source has the following options:

autoCreateLocalDir
local directory must be auto created if it does not exist (boolean, default: true)
cacheSessions
true to cache ftp sessions (boolean, default: false)
clientMode
client mode to use : 2 for passive mode and 0 for active mode (int, default: 0)
deleteRemoteFiles
delete remote files after transfer (boolean, default: false)
filenamePattern
simple filename pattern to apply to the filter (String, default: *)
fixedDelay
the rate at which to poll the remote directory (int, default: 1)
host
the host name for the FTP server (String, default: localhost)
initialDelay
an initial delay when using a fixed delay trigger, expressed in TimeUnits (seconds by default) (int, default: 0)
localDir
set the local directory the remote files are transferred to (String, default: ``)
maxMessages
the maximum messages per poll; -1 for unlimited (long, default: -1)
mode
specifies how the file is being read. By default the content of a file is provided as byte array (FileReadingMode, default: contents, possible values: ref,lines,contents)
password
the password for the FTP connection (Password, no default)
port
the port for the FTP server (int, default: 21)
preserveTimestamp
whether to preserve the timestamp of files retrieved (boolean, default: true)
remoteDir
the remote directory to transfer the files from (String, default: /)
remoteFileSeparator
file separator to use on the remote side (String, default: /)
timeUnit
the time unit for the fixed and initial delays (String, default: SECONDS)
tmpFileSuffix
extension to use when downloading files (String, default: .tmp)
username
the username for the FTP connection (String, no default)
withMarkers
if true emits start of file/end of file marker messages before/after the data. Only valid with FileReadingMode 'lines' (Boolean, no default)

2.3 Http Source

A source module that listens for HTTP requests and emits the body as a message payload. If the Content-Type matches text/* or application/json, the payload will be a String, otherwise the payload will be a byte array.

2.3.1 Options

The http source supports the following configuration properties:

pathPattern
An Ant-Style pattern to determine which http requests will be captured (String, default: /)

2.4 JDBC Source

This source polls data from an RDBMS. This source is fully based on the DataSourceAutoConfiguration, so refer to the Spring Boot JDBC Support for more information.

2.4.1 Options

The jdbc source has the following options:

query
the query to use to select data (String, no default, required)
update
an SQL update statement to execute for marking polled messages as 'seen' (String, no default)
split
whether to split the SQL result as individual messages (boolean, default: true)
maxRowsPerPoll
max number of rows to process for each poll (int, default: 0)

Also see the Spring Boot documentation for additional DataSource properties, and see TriggerProperties and MaxMessagesProperties for polling options.

2.5 JMS Source

The "jms" source enables receiving messages from JMS.

2.5.1 Options

The jms source has the following options:

spring.jms.listener.acknowledgeMode
the session acknowledge mode (String, default: AUTO)
clientId
an identifier for the client, to be associated with a durable or shared topic subscription (String, no default)
destination
the destination name from which messages will be received (String, no default)
messageSelector
a message selector to be applied to messages (String, no default)
subscriptionDurable
when true, indicates the subscription to a topic is durable (boolean, default: false)
subscriptionShared
when true, indicates the subscription to a topic is shared (JMS 2.0) (boolean, default: false)
spring.jms.pubSubDomain
when true, indicates that the destination is a topic (boolean, default: false)
subscriptionName
a name that will be assigned to the topic subscription (String, no default)
sessionTransacted
True to enable transactions and use a `DefaultMessageListenerContainer`, false to select a `SimpleMessageListenerContainer` (String, default: true)
spring.jms.listener.concurrency
The minimum number of consumer threads. (Integer, default: 1)
spring.jms.listener.maxConcurrency
The maximum number of consumer threads. Only supported when `sessionTransacted` is true. (Integer, default: 1)
[Note]Note

Spring Boot broker configuration is used; refer to the Spring Boot documentation for more information. The spring.jms.* properties above are also handled by the Boot JMS support.

2.6 Load Generator Source

A source that generates data and dispatches it to the stream. It is intended to provide a way for users to gauge the performance of Spring Cloud Data Flow in different environments and deployment types.

2.6.1 Options

The load-generator source has the following options:

messageCount
the number of messages to send (Integer, default: 100)
messageSize
the size of message to send (Integer, default: 1000)
producers
the number of producers (Integer, default: 1)
outputType
how this module should emit messages it produces (MimeType, no default)

2.7 RabbitMQ Source

The "rabbit" source enables receiving messages from RabbitMQ.

The queue(s) must exist before the stream is deployed; they are not created automatically. You can easily create a Queue using the RabbitMQ web UI.

2.7.1 Options

The rabbit source has the following options:

enableRetry
enable retry; when retries are exhausted the message will be rejected; message disposition will depend on dead letter configuration (boolean, default: false)
initialRetryInterval
initial interval between retries (int, default: 1000)
mappedRequestHeaders
request message header names to be mapped from the incoming message (String, default: STANDARD_REQUEST_HEADERS)
maxAttempts
maximum delivery attempts (int, default: 3)
maxConcurrency
the maximum number of consumers (int, default: 1)
maxRetryInterval
maximum retry interval (int, default: 30000)
queues
the queue(s) from which messages will be received (String, no default)
requeue
whether rejected messages will be requeued by default (boolean, default: true)
retryMultiplier
retry interval multiplier (double, default: 2.0)
transacted
true if the channel is to be transacted (boolean, default: false)

Also see the Spring Boot documentation for additional properties for the broker connections and listener properties.

A Note About Retry

[Note]Note

With the default ackMode (AUTO) and requeue (true) options, failed message deliveries will be retried indefinitely. Since there is not much processing in the rabbit source, the risk of failure in the source itself is small, unless the downstream Binder is not connected for some reason. Setting requeue to false will cause messages to be rejected on the first attempt (and possibly sent to a Dead Letter Exchange/Queue if the broker is so configured). The enableRetry option allows configuration of retry parameters such that a failed message delivery can be retried and eventually discarded (or dead-lettered) when retries are exhausted. The delivery thread is suspended during the retry interval(s). Retry options are enableRetry, maxAttempts, initialRetryInterval, retryMultiplier, and maxRetryInterval. Message deliveries failing with a MessageConversionException are never retried; the assumption being that if a message could not be converted on the first attempt, subsequent attempts will also fail. Such messages are discarded (or dead-lettered).

2.8 SFTP Source

This source app supports transfer of files using the SFTP protocol. Files are transferred from the remote directory to the local directory where the application is deployed.

Messages emitted by the source are provided as a byte array by default. However, this can be customized using the --mode option:

  • ref Provides a java.io.File reference
  • lines Will split files line-by-line and emit a new message for each line
  • contents The default. Provides the contents of a file as a byte array

When using --mode=lines, you can also provide the additional option --withMarkers=true. If set to true, the underlying FileSplitter will emit additional start-of-file and end-of-file marker messages before and after the actual data. The payload of these 2 additional marker messages is of type FileSplitter.FileMarker. The option withMarkers defaults to false if not explicitly set.

2.8.1 Options

The sftp source has the following options:

allowUnknownKeys
true to allow connecting to a host with an unknown or changed key (boolean, default: false)
autoCreateLocalDir
whether the local directory must be auto created if it does not exist (boolean, default: true)
deleteRemoteFiles
delete remote files after transfer (boolean, default: false)
fixedDelay
fixed delay in SECONDS to poll the remote directory (int, default: 1)
host
the remote host to connect to (String, default: localhost)
initialDelay
an initial delay when using a fixed delay trigger, expressed in TimeUnits (seconds by default) (int, default: 0)
knownHostsExpression
a SpEL expression location of known hosts file; required if 'allowUnknownKeys' is false; examples: systemProperties["user.home"]+"/.ssh/known_hosts", "/foo/bar/known_hosts" (String, no default)
localDir
set the local directory the remote files are transferred to (String, default: ``)
maxMessages
the maximum messages per poll; -1 for unlimited (long, default: -1)
mode
specifies how the file is being read. By default the content of a file is provided as byte array (FileReadingMode, default: contents, possible values: ref,lines,contents)
passPhrase
the passphrase to use (String, default: ``)
password
the password for the provided user (String, default: ``)
pattern
simple filename pattern to apply to the filter (String, no default)
port
the remote port to connect to (int, default: 22)
privateKey
the private key location (a valid Spring Resource URL) (String, default: ``)
regexPattern
filename regex pattern to apply to the filter (String, no default)
remoteDir
the remote directory to transfer the files from (String, no default)
timeUnit
the time unit for the fixed and initial delays (String, default: SECONDS)
tmpFileSuffix
extension to use when downloading files (String, default: .tmp)
user
the username to use (String, no default)
withMarkers
if true emits start of file/end of file marker messages before/after the data. Only valid with FileReadingMode 'lines' (Boolean, no default)

2.9 SYSLOG Source

The syslog source receives SYSLOG packets over UDP, TCP, or both. RFC3164 (BSD) and RFC5424 formats are supported.

2.9.1 Options

The syslog source has the following options:

protocol
`udp`, `tcp`, or `both` (String, default tcp)
rfc
`3164` or `5424` (String, default 3164)
port
the port on which to listen (String, default 1514)
bufferSize
the maximum size allowed (TCP) (int, default 2048)
nio
`true` to use NIO - only recommended when supporting many connections (Boolean, default false)
reverseLookup
`true` to perform a reverse lookup on the remote IP address (Boolean, default false)
socketTimeout
the socket timeout (long, default none)

2.10 TCP

The tcp source acts as a server and allows a remote party to connect to it and submit data over a raw tcp socket.

TCP is a streaming protocol and some mechanism is needed to frame messages on the wire. A number of decoders are available, the default being 'CRLF' which is compatible with Telnet.

Messages produced by the TCP source application have a byte[] payload.
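
As an illustration (not part of the starter itself), the following minimal Java client sends a single CRLF-framed message to a TCP source listening on the default port 1234; the host, port, and message text are assumptions:

import java.io.OutputStream;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

public class TcpSourceClient {

	public static void main(String[] args) throws Exception {
		// connect to the tcp source and terminate the message with CRLF for the default decoder
		try (Socket socket = new Socket("localhost", 1234)) {
			OutputStream out = socket.getOutputStream();
			out.write("hello from a tcp client\r\n".getBytes(StandardCharsets.UTF_8));
			out.flush();
		}
	}
}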

2.10.1 Options

bufferSize
the size of the buffer (bytes) to use when decoding (int, default: 2048)
decoder
the decoder to use when receiving messages (Encoding, default: CRLF, possible values: CRLF,LF,NULL,STXETX,RAW,L1,L2,L4)
nio
whether or not to use NIO (boolean, default: false)
port
the port on which to listen (int, default: 1234)
reverseLookup
perform a reverse DNS lookup on the remote IP Address (boolean, default: false)
socketTimeout
the timeout (ms) before closing the socket when no data is received (int, default: 120000)
useDirectBuffers
whether or not to use direct buffers (boolean, default: false)

2.10.2 Available Decoders

Text Data

CRLF (default)
text terminated by carriage return (0x0d) followed by line feed (0x0a)
LF
text terminated by line feed (0x0a)
NULL
text terminated by a null byte (0x00)
STXETX
text preceded by an STX (0x02) and terminated by an ETX (0x03)

Text and Binary Data

RAW
no structure - the client indicates a complete message by closing the socket
L1
data preceded by a one byte (unsigned) length field (supports up to 255 bytes)
L2
data preceded by a two byte (unsigned) length field (up to 2^16-1 bytes)
L4
data preceded by a four byte (signed) length field (up to 2^31-1 bytes)

2.11 Time Source

The time source will simply emit a String with the current time every so often.

2.11.1 Options

The time source has the following options:

fixedDelay
time delay between messages, expressed in TimeUnits (seconds by default) (int, default: 1)
dateFormat
how to render the current time, using SimpleDateFormat (String, default: yyyy-MM-dd HH:mm:ss)
initialDelay
an initial delay when using a fixed delay trigger, expressed in TimeUnits (seconds by default) (int, default: 0)
timeUnit
the time unit for the fixed and initial delays (String, default: SECONDS)

2.12 Trigger Source

This app sends a trigger based on a fixed delay, date, or cron expression. A payload, evaluated using SpEL, can also be sent each time the trigger fires.

2.12.1 Options

The trigger source has the following options:

payload
a SpEL expression to evaluate for the payload sent each time the trigger fires (String, default: ``)
fixedDelay
fixed delay for the Periodic trigger, expressed in TimeUnits (seconds by default) (int, default: 1)
initialDelay
initial delay for the Periodic trigger (int, default: 0)
timeUnit
time unit to apply for the delay (String, default: SECONDS)
date
the date value for the trigger (String)
dateFormat
format for the date value (String)
cron
the Cron expression for the Cron Trigger (String, no default)

2.13 Twitter Stream Source

This source ingests data from Twitter’s streaming API v1.1. It uses the sample and filter stream endpoints rather than the full "firehose" which needs special access. The endpoint used will depend on the parameters you supply in the stream definition (some are specific to the filter endpoint).

You need to supply all keys and secrets (both consumer and accessToken) to authenticate for this source, so it is easiest if you just add these as the following environment variables: CONSUMER_KEY, CONSUMER_SECRET, ACCESS_TOKEN and ACCESS_TOKEN_SECRET.

2.13.1 Options

The twitterstream source has the following options:

accessToken
a valid OAuth access token (String, no default)
accessTokenSecret
an OAuth secret corresponding to the access token (String, no default)
consumerKey
a consumer key issued by twitter (String, no default)
consumerSecret
consumer secret corresponding to the consumer key (String, no default)
language
language code e.g. 'en' (String, default: ``)
[Note]Note

The twitterstream source emits JSON in the native Twitter format.

3. Processors

3.1 Bridge Processor

A Processor module that passes messages through unchanged, simply connecting its input and output channels.

3.2 Filter Processor

Use the filter module in a stream to determine whether a Message should be passed to the output channel.

3.2.1 Options

The filter processor has the following options:

expression
a SpEL expression evaluated against each message to determine whether it should be passed to the output channel (String, default: payload.toString())

3.3 Groovy Filter Processor

A Processor module that retains or discards messages according to a predicate, expressed as a Groovy script.

3.3.1 Options

The groovy-filter processor has the following options:

script
The script resource location (String, default: ``)
variables
Variable bindings as a comma delimited string of name-value pairs, e.g. 'foo=bar,baz=car' (String, default: ``)
variablesLocation
The location of a properties file containing custom script variable bindings (String, default: ``)

3.4 Groovy Transform Processor

A Processor module that transforms messages using a Groovy script.

3.4.1 Options

The groovy-transform processor has the following options:

script
The script resource location (String, default: ``)
variables
Variable bindings as a comma delimited string of name-value pairs, e.g. 'foo=bar,baz=car' (String, default: ``)
variablesLocation
The location of a properties file containing custom script variable bindings (String, default: ``)

3.5 Http Client Processor

A processor app that makes requests to an HTTP resource and emits the response body as a message payload. This processor can be combined, e.g., with a time source app to periodically poll results from an HTTP resource.

3.5.1 Options

The httpclient processor has the following options:

url
The URL to issue an http request to, as a static value.
urlExpression
A SpEL expression against incoming message to determine the URL to use.
httpMethod
The kind of http method to use.
body
The (static) body of the request to use.
bodyExpression
A SpEL expression against incoming message to derive the request body to use.
headersExpression
A SpEL expression used to derive the http headers map to use.
expectedResponseType
The type used to interpret the response.
replyExpression
A SpEL expression used to compute the final result, applied against the whole http response.

3.6 PMML Processor

A processor that evaluates a machine learning model stored in PMML format.

3.6.1 Options

The pmml processor has the following options:

modelLocation
The location of the PMML model file.
modelName
If the model file contains multiple models, the name of the one to use.
modelNameExpression
If the model file contains multiple models, the name of the one to use, as a SpEL expression.
inputs
How to compute model active fields from input message properties as modelField→SpEL.
outputs
How to emit evaluation results in the output message as msgProperty→SpEL.

3.7 Scriptable Transform Processor

A Spring Cloud Stream module that transforms messages using a script. The script body is supplied directly as a property value. The language of the script can be specified (groovy/javascript/ruby/python).

3.7.1 Options

The scriptable-transform processor has the following options:

script
The script text (String, default: ``)
lang
The script language (String, default: ``)
variables
Variable bindings as a comma delimited string of name-value pairs, e.g. 'foo=bar,baz=car' (String, default: ``)
variablesLocation
The location of a properties file containing custom script variable bindings (String, default: ``)

3.8 Splitter Processor

The splitter app builds upon the concept of the same name in Spring Integration and allows the splitting of a single message into several distinct messages.

3.8.1 Options

expression
a SpEL expression which would typically evaluate to an array or collection (String, default: null)
delimiters
A list of delimiters to tokenize a String payload ('expression' must be null) (String, default: null)
fileMarkers
when splitting File payloads: if true, START and END marker messages will be emitted; if false, no markers are emitted (String, default: null)
charset
Split File payloads using this charset to convert bytes to String (String, default: null)
applySequence
Add correlation and sequence information to the message headers (String, default: true)

When no expression, fileMarkers, or charset is provided, a DefaultMessageSplitter is configured with (optional) delimiters. When fileMarkers or charset is provided, a FileSplitter is configured (you must provide one or the other if you wish to split File payloads, which must be text-based - they are split into lines). Otherwise, when an expression is provided, an ExpressionEvaluatingMessageSplitter is configured.

When splitting File payloads, the sequenceSize header is zero because the size cannot be determined at the beginning.

[Caution]Caution

Ambiguous properties are not allowed.

3.8.2 JSON Example

As part of the SpEL expression you can make use of the pre-registered JSON Path function. The syntax is #jsonPath(payload, '<json path expression>').

For example, consider the following JSON:

{ "store": {
    "book": [
        {
            "category": "reference",
            "author": "Nigel Rees",
            "title": "Sayings of the Century",
            "price": 8.95
        },
        {
            "category": "fiction",
            "author": "Evelyn Waugh",
            "title": "Sword of Honour",
            "price": 12.99
        },
        {
            "category": "fiction",
            "author": "Herman Melville",
            "title": "Moby Dick",
            "isbn": "0-553-21311-3",
            "price": 8.99
        },
        {
            "category": "fiction",
            "author": "J. R. R. Tolkien",
            "title": "The Lord of the Rings",
            "isbn": "0-395-19395-8",
            "price": 22.99
        }
    ],
    "bicycle": {
        "color": "red",
        "price": 19.95
    }
}}

and an expression #jsonPath(payload, '$.store.book'); the result will be 4 messages, each with a Map payload containing the properties of a single book.

3.9 Transform Processor

Use the transform app in a stream to convert a Message’s content or structure.

The transform processor is used by passing a SpEL expression. The expression should return the modified message or payload. For example, --expression=payload.toUpperCase().

This transform will convert all message payloads to upper case.

As part of the SpEL expression you can make use of the pre-registered JSON Path function. The syntax is #jsonPath(payload,'<json path expression>')

3.9.1 Options

The transform processor has the following options:

expression
a SpEL expression used to transform messages (String, default: payload.toString())

4. Sinks

4.1 Cassandra Sink

This module writes the content of each message it receives to Cassandra.

4.1.1 Options

The cassandra sink has the following options:

compressionType
the compression to use for the transport (CompressionType, default: NONE, possible values: NONE,SNAPPY)
consistencyLevel
the consistencyLevel option of WriteOptions (ConsistencyLevel, no default, possible values: ANY,ONE,TWO,THREE,QUOROM,LOCAL_QUOROM,EACH_QUOROM,ALL,LOCAL_ONE,SERIAL,LOCAL_SERIAL)
spring.cassandra.contactPoints
the comma-delimited string of the hosts to connect to Cassandra (String, default: localhost)
entityBasePackages
the base packages to scan for entities annotated with Table annotations (String[], default: [])
ingestQuery
the ingest Cassandra query (String, no default)
spring.cassandra.initScript
the path to file with CQL scripts (delimited by ';') to initialize keyspace schema (String, no default)
spring.cassandra.keyspace
the keyspace name to connect to (String, default: <stream name>)
metricsEnabled
enable/disable metrics collection for the created cluster (boolean, default: true)
spring.cassandra.password
the password for connection (String, no default)
spring.cassandra.port
the port to use to connect to the Cassandra host (int, default: 9042)
queryType
the queryType for Cassandra Sink (Type, default: INSERT, possible values: INSERT,UPDATE,DELETE,STATEMENT)
retryPolicy
the retryPolicy option of WriteOptions (RetryPolicy, no default, possible values: DEFAULT,DOWNGRADING_CONSISTENCY,FALLTHROUGH,LOGGING)
statementExpression
the expression in Cassandra query DSL style (String, no default)
spring.cassandra.schemaAction
schema action to perform (SchemaAction, default: NONE, possible values: CREATE,NONE,RECREATE,RECREATE_DROP_UNUSED)
ttl
the time-to-live option of WriteOptions (int, default: 0)
spring.cassandra.username
the username for connection (String, no default)

4.2 Counter Sink

The counter sink simply counts the number of messages it receives, optionally storing counts in a separate store such as redis.

4.2.1 Options

The counter sink has the following options:

name
The name of the counter to increment. (String, default: counts)
nameExpression
A SpEL expression (against the incoming Message) to derive the name of the counter to increment. (String, default: ``)
store
The name of a store used to store the counter. (String, default: redis, possible values: memory, redis)

4.3 Field Value Counter Sink

A field value counter is a Metric used for counting occurrences of unique values for a named field in a message payload. This sink supports the following payload types out of the box:

  • POJO (Java bean)
  • Tuple
  • JSON String

For example, suppose a message source produces a payload with a field named user:

class Foo {
   String user;
   public Foo(String user) {
       this.user = user;
   }
}

If the stream source produces messages with the following objects:

   new Foo("fred")
   new Foo("sue")
   new Foo("dave")
   new Foo("sue")

The field value counter on the field user will contain:

fred:1, sue:2, dave:1

Multi-value fields are also supported. For example, if a field contains a list, each value will be counted once:

users:["dave","fred","sue"]
users:["sue","jon"]

The field value counter on the field users will contain:

dave:1, fred:1, sue:2, jon:1

4.3.1 Options

The field-value-counter sink has the following options:

fieldName
the name of the field for which values are counted (String, no default)
name
the name of the metric to contribute to (will be created if necessary) (String, default: <stream name>)
nameExpression
a SpEL expression to compute the name of the metric to contribute to (String, no default)
store
The name of a store used to store the counter. (String, default: redis, possible values: memory, redis)

4.4 File Sink

This module writes each message it receives to a file.

4.4.1 Options

The file sink has the following options:

binary
if false, will append a newline character at the end of each line (boolean, default: false)
charset
the charset to use when writing a String payload (String, default: UTF-8)
dir
the directory in which files will be created (String, default: /tmp/xd/output/)
dirExpression
spring expression used to define directory name (String, no default)
mode
what to do if the file already exists (Mode, default: APPEND, possible values: APPEND,REPLACE,FAIL,IGNORE)
name
filename pattern to use (String, default: <stream name>)
nameExpression
spring expression used to define filename (String, no default)
suffix
filename extension to use (String, no default)

4.5 FTP Sink

The FTP sink is a simple option for pushing files to an FTP server from incoming messages.

It uses an ftp-outbound-adapter, so incoming messages can be either a java.io.File object, a String (the content of the file), or an array of bytes (also the file content).

To use this sink, you need a username and a password to log in.

[Note]Note

By default Spring Integration will use o.s.i.file.DefaultFileNameGenerator if none is specified. DefaultFileNameGenerator will determine the file name based on the value of the file_name header (if it exists) in the MessageHeaders, or if the payload of the Message is already a java.io.File, then it will use the original name of that file.

4.5.1 Options

The ftp sink has the following options:

autoCreateDir
remote directory must be auto created if it does not exist (boolean, default: true)
clientMode
client mode to use: 2 for passive mode and 0 for active mode (int, default: 0)
host
the host name for the FTP server (String, default: localhost)
mode
what to do if the file already exists (Mode, default: REPLACE, possible values: APPEND,REPLACE,FAIL,IGNORE)
password
the password for the FTP connection (Password, no default)
port
the port for the FTP server (int, default: 21)
remoteDir
the remote directory to transfer the files to (String, default: /)
remoteFileSeparator
file separator to use on the remote side (String, default: /)
temporaryRemoteDir
temporary remote directory that should be used (String, default: /)
tmpFileSuffix
extension to use on server side when uploading files (String, default: .tmp)
useTemporaryFilename
use a temporary filename while transferring the file and rename it to its final name once it's fully transferred (boolean, default: true)
username
the username for the FTP connection (String, no default)

4.6 Gemfire Sink

The Gemfire sink allows one to write message payloads to a Gemfire server.

4.6.1 Options

The gemfire sink has the following options:

hostAddresses
a comma separated list of [host]:[port] specifying either locator or server addresses for the client connection pool (String, default: localhost:10334)
keyExpression
a SpEL expression which is evaluated to create a cache key (String, default: the message payload)
port
port of the cache server or locator (if useLocator=true). May be a comma delimited list (String, no default)
regionName
name of the region to use when storing data (String, default: ${spring.application.name})
connectType
'server' or 'locator' (String, default: locator)

4.7 Gpfdist Sink

A sink module that routes messages into GPDB/HAWQ segments via the gpfdist protocol. Internally, this sink creates a custom HTTP listener that supports the gpfdist protocol and schedules a task that orchestrates a load session in the same way the native Greenplum gpload utility does.

No data is written into temporary files; all data is kept in stream buffers waiting to be inserted into Greenplum DB or HAWQ. If there are no existing load sessions from Greenplum, the sink will block until such sessions are established.

4.7.1 Options

The gpfdist sink has the following options:

batchCount
Number of windowed batches each segment takes (int, default: 100)
batchPeriod
Time in seconds for each load operation to sleep in between operations (int, default: 10)
batchTimeout
Timeout in seconds for segment inactivity. (Integer, default: 4)
columnDelimiter
Data record column delimiter. (Character, no default)
controlFile
path to yaml control file (String, no default)
dbHost
database host (String, default: localhost)
dbName
database name (String, default: gpadmin)
dbPassword
database password (String, default: gpadmin)
dbPort
database port (int, default: 5432)
dbUser
database user (String, default: gpadmin)
delimiter
data line delimiter (String, default: newline character)
errorTable
Tablename to log errors. (String, default: ``)
flushCount
flush item count (int, default: 100)
flushTime
flush item time (int, default: 2)
gpfdistPort
Port of gpfdist server. Default port `0` indicates that a random port is chosen. (Integer, default: 0)
matchColumns
match columns with update (String, no default)
mode
mode, either insert or update (String, no default)
nullString
Null string definition. (String, default: ``)
port
gpfdist listen port (int, default: 0)
rateInterval
enable transfer rate interval (int, default: 0)
segmentRejectLimit
Error reject limit. (String, default: ``)
segmentRejectType
Error reject type, either `rows` or `percent`. (String, default: ``)
sqlAfter
sql to run after load (String, no default)
sqlBefore
sql to run before load (String, no default)
table
target database table (String, no default)
updateColumns
update columns with update (String, no default)

4.7.2 Implementation Notes

Within a gpfdist sink we have a Reactor based stream where data is published from the incoming SI channel. This channel receives data from the Message Bus. The Reactor stream is then connected to Netty based http channel adapters so that when a new http connection is established, the Reactor stream is flushed and balanced among existing http clients. When Greenplum does a load from an external table, each segment will initiate a http connection and start loading data. The net effect is that incoming data is automatically spread among the Greenplum segments.

4.7.3 Detailed Option Descriptions

The gpfdist sink supports the following configuration properties:

table

Database table to work with. (String, default: ``, required)

This option denotes the table where data will be inserted or updated. The external table structure will also be derived from the structure of this table.

Currently, table is the only way to define the structure of the external table. Effectively, it replaces other_table in the clause segment below.

CREATE READABLE EXTERNAL TABLE table_name LIKE other_table
mode

Gpfdist mode, either `insert` or `update`. (String, default: insert)

Currently only the insert and update gpfdist modes are supported. The merge mode familiar from the native gpfdist loader is not yet supported.

For update mode, the options matchColumns and updateColumns are required.

columnDelimiter

Data record column delimiter. (Character, default: ``)

Defines the delimiter character used in the clause segment below, which is part of the FORMAT 'TEXT' or FORMAT 'CSV' sections.

[DELIMITER AS 'delimiter']
segmentRejectLimit

Error reject limit. (String, default: ``)

Defines the count value in the clause segment below.

[ [LOG ERRORS INTO error_table] SEGMENT REJECT LIMIT count
  [ROWS | PERCENT] ]

As a convenience, this reject limit also recognizes a percentage format such as 2%; if used, segmentRejectType is automatically set to percent.

segmentRejectType

Error reject type, either `rows` or `percent`. (String, default: ``)

Defines ROWS or PERCENT in the clause segment below.

[ [LOG ERRORS INTO error_table] SEGMENT REJECT LIMIT count
  [ROWS | PERCENT] ]
errorTable

Tablename to log errors. (String, default: ``)

As the error table is optional with SEGMENT REJECT LIMIT, it is only used if both segmentRejectLimit and segmentRejectType are set. Sets error_table in the clause segment below.

[ [LOG ERRORS INTO error_table] SEGMENT REJECT LIMIT count
  [ROWS | PERCENT] ]
nullString

Null string definition. (String, default: ``)

Defines the null string used in the clause segment below, which is part of the FORMAT 'TEXT' or FORMAT 'CSV' sections.

[NULL AS 'null string']
delimiter

Data record delimiter for incoming messages. (String, default: \n)

By default, the delimiter in this option is appended to every message sent into this sink. Currently NEWLINE is not a supported configuration option and line termination for the data comes from the default behavior.

If not specified, a Greenplum Database segment will detect the newline type by looking at the first row of data it receives and using the first newline type encountered.

 -- External Table Docs
matchColumns

Comma delimited list of columns to match. (String, default: ``)

[Note]Note

See more from examples below.

updateColumns

Comma delimited list of columns to update. (String, default: ``)

[Note]Note

See more from examples below.

sqlBefore
Sql clause to run before each load operation. (String, default: ``)
sqlAfter
Sql clause to run after each load operation. (String, default: ``)
rateInterval

Debug rate of data transfer. (Integer, default: 0)

If set to a non-zero value, the sink will log the rate of messages passing through after the number of messages denoted by this setting has been processed. A value of 0 means that rate calculation and logging is disabled.

flushCount

Max collected size per windowed data. (Integer, default: 100)

[Note]Note

For more info on flush and batch settings, see above.

4.7.4 How Data Is Sent Into Segments

There are a few important concepts involving how data passes into the sink, through it, and finally lands in a database.

  • The sink has its normal message handler for incoming data from a source module, a Netty-based gpfdist protocol listener that segments connect to, and, in between those two, a Reactor-based stream that controls load balancing into the different segment connections.
  • Incoming data is first sent into a reactor, which constructs a window. This window is then released downstream when it gets full (flushCount) or times out (flushTime) if it does not get full. One window is then ready to be sent into a segment.
  • Segments that connect to this stream see a stream of windowed data rather than a stream of individual messages; you can also think of it as a stream of batches.
  • When a segment makes a connection to the protocol listener, it subscribes itself to this stream, takes the number of batches denoted by batchCount, and completes the stream either when it has received enough batches or when batchTimeout occurs due to inactivity.
  • It does not matter how many simultaneous connections there are from the database cluster at any given time, as the reactor load-balances batches across all subscribers.
  • The database cluster initiates a loading session when a select is done from an external table that points to this sink. These loading operations run in the background in a loop, one after another. The option batchPeriod is then used as the sleep time in between these load sessions.

Let's take a closer look at how the options flushCount, flushTime, batchCount, batchTimeout and batchPeriod work.

At the highest level, where incoming data into the sink is windowed, flushCount and flushTime control when a batch of messages is sent downstream. If there are a lot of simultaneous segment connections, flushing less often will keep more segments inactive, as there is more demand for batches than what flushing will produce.

When an existing segment connection is active and has subscribed itself to the stream of batches, data will keep flowing until either batchCount is met or batchTimeout occurs due to inactivity of data from upstream. The higher the batchCount, the more data each segment will read. The higher the batchTimeout, the longer a segment will wait in case more data is coming.

As gpfdist load operations are done in a loop, batchPeriod simply prevents running them in a busy loop. A busy loop would be fine if there were a constant stream of incoming data, but if the incoming data arrives in bursts, a busy loop would be unnecessary.

[Note]Note

Data loaded via gpfdist will not become visible in the database until the whole distributed loading session has finished successfully.

The Reactor stream also handles backpressure, meaning that if the existing load operations do not produce enough demand for data, message passing into the sink will eventually block. This happens when Reactor's internal ring buffer (size of 32 items) gets full. Data really flows through the sink only when it is pulled from it by the segments.

4.7.5 Example Usage

In this first example we’re just creating a simple stream which inserts data from a time source. Let’s create a table with two text columns.

gpadmin=# create table ticktock (date text, time text);

Create a simple stream named gpstream1.

dataflow:>stream create --name gpstream1 --definition "time | gpfdist
--dbHost=mdw --table=ticktock --batchTime=1 --batchPeriod=1
--flushCount=2 --flushTime=2 --columnDelimiter=' '" --deploy

Let it run and see results from a database.

gpadmin=# select count(*) from ticktock;
 count
-------
    14
(1 row)

In the previous example we did simple inserts into a table. Let's see how we can update data in a table. Create a simple table httpdata with three text columns and insert some data.

gpadmin=# create table httpdata (col1 text, col2 text, col3 text);
gpadmin=# insert into httpdata values ('DATA1', 'DATA', 'DATA');
gpadmin=# insert into httpdata values ('DATA2', 'DATA', 'DATA');
gpadmin=# insert into httpdata values ('DATA3', 'DATA', 'DATA');

Now the table looks like this.

gpadmin=# select * from httpdata;
 col1  | col2 | col3
-------+------+------
 DATA3 | DATA | DATA
 DATA2 | DATA | DATA
 DATA1 | DATA | DATA
(3 rows)

Let's create a stream which will update the table httpdata by matching on column col1 and updating columns col2 and col3.

dataflow:>stream create --name gpfdiststream2 --definition "http
--server.port=8081|gpfdist --mode=update --table=httpdata
--dbHost=mdw --columnDelimiter=',' --matchColumns=col1
--updateColumns=col2,col3" --deploy

Post some data into the stream; it will be passed into the gpfdist sink via the http source.

curl --data "DATA1,DATA1,DATA1" -H "Content-Type:text/plain" http://localhost:8081/

If you query the table again, you'll see that the row for DATA1 has been updated.

gpadmin=# select * from httpdata;
 col1  | col2  | col3
-------+-------+-------
 DATA3 | DATA  | DATA
 DATA2 | DATA  | DATA
 DATA1 | DATA1 | DATA1
(3 rows)

4.7.6 Tuning Transfer Rate

Default values for the options flushCount, flushTime, batchCount, batchTimeout and batchPeriod are relatively conservative and need to be tuned for every use case for optimal performance. In order to decide how to tune the sink behaviour to suit your needs, a few things need to be considered:

  • What is the average size of messages ingested by the sink?
  • How fast do you want data to become visible in the database?
  • Is the incoming data a constant flow or bursts of data?

Everything what flows throught a sink is kept in-memory and because sink is handling backpressure, memory consumption is relatively low. However because sink cannot predict what is an average size of an incoming data and this data is anyway windowed later in a downstream you should not allow window size to become too large if average data size is large as every batch of data is kept in memory.

Generally speaking if you have a lot of segments in a load operation, it’s adviced to keep flushed window size relatively small which allows more segments to stay active. This however also depends on how much data is flowing in into a sink itself.

Longer a load session for each segment is active higher the overall transfer rate is going to be. Option batchCount naturally controls this. However option batchTimeout then really controls how fast each segment will complete a stream due to inactivity from upstream and to step away from a loading session to allow distributes session to finish and data become visible in a database.
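
As a rough, hypothetical illustration (the option values below are not defaults and would need to be adjusted to your own message sizes and data rates, and quoting follows the earlier examples), a bursty, low-volume stream might use relatively small flush windows and short timeouts so that data becomes visible quickly:

dataflow:>stream create --name gpstream3 --definition "time | gpfdist
--dbHost=mdw --table=ticktock --columnDelimiter=' ' --flushCount=10
--flushTime=2 --batchCount=100 --batchTimeout=4 --batchPeriod=1" --deploy

A high-volume, constant stream would instead favour a larger batchCount, so that each segment keeps its load session open longer and the overall transfer rate increases.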

4.8 HDFS Sink

This module writes each message it receives to HDFS.

4.8.1 Options

The hdfs sink has the following options:

closeTimeout
timeout in ms, regardless of activity, after which file will be automatically closed (long, default: 0)
codec
compression codec alias name (gzip, snappy, bzip2, lzo, or slzo) (String, default: ``)
directory
where to output the files in the Hadoop FileSystem (String, default: ``)
enableSync
whether writer will sync to datanode when flush is called, setting this to 'true' could impact throughput (boolean, default: false)
fileExtension
the base filename extension to use for the created files (String, default: txt)
fileName
the base filename to use for the created files (String, default: <stream name>)
fileOpenAttempts
maximum number of file open attempts to find a path (int, default: 10)
fileUuid
whether file name should contain uuid (boolean, default: false)
flushTimeout
timeout in ms, regardless of activity, after which data written to file will be flushed (long, default: 0)
fsUri
the URI to use to access the Hadoop FileSystem (String, default: ${spring.hadoop.fsUri})
idleTimeout
inactivity timeout in ms after which file will be automatically closed (long, default: 0)
inUsePrefix
prefix for files currently being written (String, default: ``)
inUseSuffix
suffix for files currently being written (String, default: .tmp)
overwrite
whether writer is allowed to overwrite files in Hadoop FileSystem (boolean, default: false)
partitionPath
a SpEL expression defining the partition path (String, default: ``)
rollover
threshold in bytes when file will be automatically rolled over (String, default: 1G)
[Note]Note

This module can have its runtime dependencies provided during startup if you would like to use a Hadoop distribution other than the default one.
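
For example, a hypothetical stream definition in the style of the earlier examples (the fsUri, directory and rollover values are assumptions for your own Hadoop setup) could look like:

dataflow:>stream create --name hdfsstream --definition "time | hdfs
--fsUri=hdfs://namenode:8020 --directory=/tmp/ticktock
--fileName=ticktock --rollover=64M --idleTimeout=60000" --deploy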

4.9 Jdbc Sink

A module that writes its incoming payload to an RDBMS using JDBC.

4.9.1 Options

The jdbc sink has the following options:

expression
a SpEL expression used to transform messages (String, default: ``)
tableName
the name of the table to write into (String, default: <stream name>)
columns
the names of the columns that shall receive data, as a set of column[:SpEL] mappings, also used at initialization time to issue the DDL (String, default: payload)
initialize
whether to initialize the table schema at startup (Boolean, default: false)
batchSize
the number of messages collected before they are written to the database as a single batch (long, default: 10000)
[Note]Note

The module also uses Spring Boot’s DataSource support for configuring the database connection, so properties like spring.datasource.url etc. apply.
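
As a sketch (the datasource URL below is only an example; use the connection settings for your own database), a stream that writes the time payload into a column named time might be defined as:

dataflow:>stream create --name jdbcstream --definition "time | jdbc
--tableName=ticktock --columns=time:payload --initialize=true
--spring.datasource.url=jdbc:h2:tcp://localhost:19092/mem:test" --deploy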

4.10 Log Sink

The log sink uses the application logger to output the data for inspection.

4.10.1 Options

The log sink has the following options:

expression
the expression to be evaluated for the log content; use '#root' to log the full message (String, default: payload)
level
the log level (String, default: INFO)
name
the name of the log category to log to (String, default: ``)
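
For example (the category name and level are arbitrary), a stream that logs each time payload under a custom category at WARN level could be defined as:

dataflow:>stream create --name logstream --definition "time | log --name=ticktock --level=WARN" --deploy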

4.11 RabbitMQ Sink

This module sends messages to RabbitMQ.

4.11.1 Options

The rabbit sink has the following options:

(See the Spring Boot documentation for RabbitMQ connection properties)

converterBeanName
the bean name of the message converter (String, default: none)
persistentDeliveryMode
the default delivery mode, true for persistent (boolean, default: false)
exchange
the Exchange on the RabbitMQ broker to which messages should be sent (String, default: "")
exchangeExpression
a SpEL expression that evaluates to the Exchange on the RabbitMQ broker to which messages should be sent; overrides `exchange` (String, default: ``)
mappedRequestHeaders
request message header names to be propagated to RabbitMQ, to limit to the set of standard headers plus `bar`, use `STANDARD_REQUEST_HEADERS,bar` (String, default: *)
routingKey
the routing key to be passed with the message, as a SpEL expression (String, default: none)
routingKeyExpression
an expression that evaluates to the routing key to be passed with the message, as a SpEL expression; overrides `routingKey` (String, default: none)
[Note]Note

By default, the message converter is a SimpleMessageConverter which handles byte[], String and java.io.Serializable. A well-known bean name jsonConverter will configure a Jackson2JsonMessageConverter instead. In addition, a custom converter bean can be added to the context and referenced by the converterBeanName property.
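
A hypothetical stream definition (the exchange name is an assumption and must exist on the broker; the spring.rabbitmq.* properties come from Spring Boot's RabbitMQ support) might look like:

dataflow:>stream create --name rabbitstream --definition "time | rabbit
--exchange=ticktock-exchange --persistentDeliveryMode=true
--spring.rabbitmq.host=localhost --spring.rabbitmq.port=5672" --deploy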

4.12 Redis Sink

This module sends messages to the Redis store.

4.12.1 Options

The redis sink has the following options:

topicExpression
a SpEL expression to use for the topic (String, no default)
queueExpression
a SpEL expression to use for the queue (String, no default)
keyExpression
a SpEL expression to use for the key (String, no default)
key
name for the key (String, no default)
queue
name for the queue (String, no default)
topic
name for the topic (String, no default)
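
For example, to send each message to a Redis queue named ticktock (the name is arbitrary; alternatively set key, topic or one of the corresponding expressions):

dataflow:>stream create --name redisstream --definition "time | redis --queue=ticktock" --deploy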

4.13 Router Sink

This module routes messages to named channels.

4.13.1 Options

The router sink has the following options:

destinations
comma-delimited destinations mapped from evaluation results (String, no default)
defaultOutputChannel
where to route messages when the destination cannot be resolved (String, default: nullChannel)
expression
a SpEL expression used to determine the destination (String, default: headers['routeTo'])
propertiesLocation
the path of a properties file containing custom script variable bindings (String, no default)
refreshDelay
how often to check (in milliseconds) whether the script (if present) has changed; -1 for never (long, default: 60000)
script
reference to a script used to process messages (String, no default)
destinationMappings
Destination mappings as a new line delimited string of name-value pairs, e.g. 'foo=bar\n baz=car'. (String, no default)
[Note]Note

Since this is a dynamic router, destinations are created as needed; therefore, by default the defaultOutputChannel and resolutionRequired will only be used if the Binder has some problem binding to the destination.

You can restrict the creation of dynamic bindings using the spring.cloud.stream.dynamicDestinations property. By default, all resolved destinations will be bound dynamically; if this property has a comma-delimited list of destination names, only those will be bound. Messages that resolve to a destination that is not in this list will be routed to the defaultOutputChannel, which must also appear in the list.

destinationMappings are used to map the evaluation results to an actual destination name.
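
As a sketch (quoting of the SpEL expression may need adjusting for your shell, and the destination names are arbitrary), a router that sends payloads containing the letter 'a' to one destination and everything else to another could be defined as:

dataflow:>stream create --name routerstream --definition "http --server.port=9000 | router
--expression=payload.contains('a')?'foo-dest':'bar-dest'" --deploy

If the evaluation results are not themselves destination names, destinationMappings can be used to translate them, as described above.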

4.13.2 SpEL-based Routing

The expression evaluates against the message and returns either a channel name, or the key to a map of channel names.

For more information, please see the "Routers and the Spring Expression Language (SpEL)" subsection in the Spring Integration Reference manual Configuring (Generic) Router section.

4.13.3 Groovy-based Routing

Instead of SpEL expressions, Groovy scripts can also be used. Let’s create a Groovy script in the file system at "file:/my/path/router.groovy", or "classpath:/my/path/router.groovy" :

println("Groovy processing payload '" + payload + "'");
if (payload.contains('a')) {
    return "foo"
}
else {
    return "bar"
}

If you want to pass variable values to your script, you can statically bind values using the variables option or optionally pass the path to a properties file containing the bindings using the propertiesLocation option. All properties in the file will be made available to the script as variables. You may specify both variables and propertiesLocation, in which case any duplicate values provided as variables override values provided in propertiesLocation. Note that payload and headers are implicitly bound to give you access to the data contained in a message.
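
A hypothetical stream definition using that script (the file paths match the example above; propertiesLocation is optional) might look like:

dataflow:>stream create --name groovyrouterstream --definition "http --server.port=9001 | router
--script=file:/my/path/router.groovy
--propertiesLocation=file:/my/path/router.properties" --deploy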

For more information, see the Spring Integration Reference manual Groovy Support.

4.14 TCP Sink

This module writes messages to TCP using an Encoder.

TCP is a streaming protocol and some mechanism is needed to frame messages on the wire. A number of encoders are available, the default being 'CRLF'.

4.14.1 Options

The tcp sink has the following options:

charset
the charset used when converting from String to bytes (String, default: UTF-8)
close
whether to close the socket after each message (boolean, default: false)
encoder
the encoder to use when sending messages (Encoding, default: CRLF, possible values: CRLF,LF,NULL,STXETX,RAW,L1,L2,L4)
host
the remote host to connect to (String, default: localhost)
nio
whether or not to use NIO (boolean, default: false)
port
the port on the remote host to connect to (int, default: 1234)
reverseLookup
perform a reverse DNS lookup on the remote IP Address (boolean, default: false)
socketTimeout
the timeout (ms) before closing the socket when no data is received (int, default: 120000)
useDirectBuffers
whether or not to use direct buffers (boolean, default: false)
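
For example (the host and port are assumptions for your own TCP server), to send each payload terminated by a line feed:

dataflow:>stream create --name tcpstream --definition "time | tcp
--host=localhost --port=7777 --encoder=LF" --deploy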

4.14.2 Available Encoders

Text Data

CRLF (default)
text terminated by carriage return (0x0d) followed by line feed (0x0a)
LF
text terminated by line feed (0x0a)
NULL
text terminated by a null byte (0x00)
STXETX
text preceded by an STX (0x02) and terminated by an ETX (0x03)

Text and Binary Data

RAW
no structure - the client indicates a complete message by closing the socket
L1
data preceded by a one byte (unsigned) length field (supports up to 255 bytes)
L2
data preceded by a two byte (unsigned) length field (up to 2^16-1 bytes)
L4
data preceded by a four byte (signed) length field (up to 2^31-1 bytes)

4.15 Throughput Sink

A simple handler that will count messages and log witnessed throughput at a selected interval.

4.16 Websocket Sink

A simple Websocket Sink implementation.

4.16.1 Options

The following command line arguments are supported:

websocketPort
controls the port onto which the Netty server binds (String, default: 9292)
websocketPath
controls the path where the WebsocketServer expects Websocket connections (String, default: /websocket)
ssl
controls whether the Websocket server should accept SSL connections (i.e. `wss://host:9292/websocket`) (Boolean, default: false)
websocketLoglevel
controls the loglevel of the underlying Netty loghandler (String, default: WARN)
threads
controls the number of worker threads used for the Netty event loop (String, default: 1)

4.16.2 Example

To verify that the websocket-sink receives messages from other spring-cloud-stream apps, you can use the following simple end-to-end setup.

Step 1: Start Redis

Redis is the default broker. Normally you can start Redis via redis-server.

Step 2: Deploy a time-source
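
The guide does not spell out the exact command here; as a sketch (assuming the time-source is packaged as a Spring Boot application jar, as the websocket-sink is in Step 3), you would start it with its output bound to the ticktock destination:

java -jar <spring boot application for time-source> --spring.cloud.stream.bindings.output=ticktock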

Step 3: Deploy a websocket-sink (the app that contains this starter jar)

Finally start a websocket-sink in trace mode so that you see the messages produced by the time-source in the log:

java -jar <spring boot application for websocket-sink> --spring.cloud.stream.bindings.input=ticktock --server.port=9393 \
	--logging.level.org.springframework.cloud.stream.module.websocket=TRACE

You should start seeing log messages in the console where you started the WebsocketSink like this:

Handling message: GenericMessage [payload=2015-10-21 12:52:53, headers={id=09ae31e0-a04e-b811-d211-b4d4e75b6f29, timestamp=1445424778065}]
Handling message: GenericMessage [payload=2015-10-21 12:52:54, headers={id=75eaaf30-e5c6-494f-b007-9d5b5b920001, timestamp=1445424778065}]
Handling message: GenericMessage [payload=2015-10-21 12:52:55, headers={id=18b887db-81fc-c634-7a9a-16b1c72de291, timestamp=1445424778066}]

4.16.3 Actuators

There is an Endpoint that you can use to access the last n messages sent and received. You have to enable it by providing --endpoints.websocketsinktrace.enabled=true. By default it shows the last 100 messages via host:port/websocketsinktrace. Here is a sample output:

 [
   {
    "timestamp": 1445453703508,
    "info": {
      "type": "text",
      "direction": "out",
      "id": "2ff9be50-c9b2-724b-5404-1a6305c033e4",
      "payload": "2015-10-21 20:54:33"
    }
  },
  ...
  {
    "timestamp": 1445453703506,
    "info": {
      "type": "text",
      "direction": "out",
      "id": "2b9dbcaf-c808-084d-a51b-50f617ae6a75",
      "payload": "2015-10-21 20:54:32"
    }
  }
]
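
For instance, with the sink from the earlier example running on port 9393 and the endpoint enabled as described above, the trace could be fetched with:

curl http://localhost:9393/websocketsinktrace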

There is also a simple HTML page where you can see the forwarded messages in a text area. You can access it directly via host:port in your browser.

[Note]Note

For SSL mode (--ssl=true) a self-signed certificate is used, which might cause trouble with some Websocket clients. In a future release, there will be a --certificate=mycert.cer switch to pass a valid (not self-signed) certificate.

Part III. Appendices

Appendix A. Building

A.1 Basic Compile and Test

To build the source you will need to install JDK 1.7.

The build uses the Maven wrapper so you don’t have to install a specific version of Maven. To enable the tests for Redis you should run the server before building. See below for more information on how to run Redis.

The main build command is

$ ./mvnw clean install

You can also add '-DskipTests' if you like, to avoid running the tests.

[Note]Note

You can also install Maven (>=3.3.3) yourself and run the mvn command in place of ./mvnw in the examples below. If you do that you also might need to add -P spring if your local Maven settings do not contain repository declarations for spring pre-release artifacts.

[Note]Note

Be aware that you might need to increase the amount of memory available to Maven by setting a MAVEN_OPTS environment variable with a value like -Xmx512m -XX:MaxPermSize=128m. We try to cover this in the .mvn configuration, so if you find you have to do it to make a build succeed, please raise a ticket to get the settings added to source control.

The projects that require middleware generally include a docker-compose.yml, so consider using Docker Compose to run the middleware servers in Docker containers. See the README in the scripts demo repository for specific instructions about the common cases of mongo, rabbit and redis.

A.2 Documentation

There is a "full" profile that will generate documentation. You can build just the documentation by executing

$ ./mvnw package -DskipTests=true -P full -pl spring-cloud-stream-app-starters-docs -am

A.3 Working with the code

If you don’t have an IDE preference we would recommend that you use Spring Tool Suite or Eclipse when working with the code. We use the m2eclipse eclipse plugin for maven support. Other IDEs and tools should also work without issue.

A.3.1 Importing into eclipse with m2eclipse

We recommend the m2eclipse eclipse plugin when working with eclipse. If you don’t already have m2eclipse installed it is available from the "eclipse marketplace".

Unfortunately m2e does not yet support Maven 3.3, so once the projects are imported into Eclipse you will also need to tell m2eclipse to use the .settings.xml file for the projects. If you do not do this you may see many different errors related to the POMs in the projects. Open your Eclipse preferences, expand the Maven preferences, and select User Settings. In the User Settings field click Browse and navigate to the Spring Cloud project you imported, selecting the .settings.xml file in that project. Click Apply and then OK to save the preference changes.

[Note]Note

Alternatively you can copy the repository settings from .settings.xml into your own ~/.m2/settings.xml.

A.3.2 Importing into eclipse without m2eclipse

If you prefer not to use m2eclipse you can generate eclipse project metadata using the following command:

$ ./mvnw eclipse:eclipse

The generated eclipse projects can be imported by selecting import existing projects from the file menu.

5. Contributing

Spring Cloud is released under the non-restrictive Apache 2.0 license, and follows a very standard Github development process, using Github tracker for issues and merging pull requests into master. If you want to contribute even something trivial please do not hesitate, but follow the guidelines below.

5.1 Sign the Contributor License Agreement

Before we accept a non-trivial patch or pull request we will need you to sign the contributor’s agreement. Signing the contributor’s agreement does not grant anyone commit rights to the main repository, but it does mean that we can accept your contributions, and you will get an author credit if we do. Active contributors might be asked to join the core team, and given the ability to merge pull requests.

5.2 Code Conventions and Housekeeping

None of these is essential for a pull request, but they will all help. They can also be added after the original pull request but before a merge.

  • Use the Spring Framework code format conventions. If you use Eclipse you can import formatter settings using the eclipse-code-formatter.xml file from the Spring Cloud Build project. If using IntelliJ, you can use the Eclipse Code Formatter Plugin to import the same file.
  • Make sure all new .java files have a simple Javadoc class comment with at least an @author tag identifying you, and preferably at least a paragraph on what the class is for.
  • Add the ASF license header comment to all new .java files (copy from existing files in the project)
  • Add yourself as an @author to the .java files that you modify substantially (more than cosmetic changes).
  • Add some Javadocs and, if you change the namespace, some XSD doc elements.
  • A few unit tests would help a lot as well — someone has to do it.
  • If no-one else is using your branch, please rebase it against the current master (or other target branch in the main project).
  • When writing a commit message please follow these conventions; if you are fixing an existing issue please add Fixes gh-XXXX at the end of the commit message (where XXXX is the issue number).