Version Fahrenheit.M2

© 2012-2020 Pivotal Software, Inc.

Copies of this document may be made for your own use and for distribution to others, provided that you do not charge any fee for such copies and further provided that each copy contains this Copyright Notice, whether distributed in print or electronically.

Reference Guide

This section provides you with a detailed overview of the out-of-the-box Spring Cloud Stream Applications. It assumes familiarity with general Spring Cloud Stream concepts, which you can find in the Spring Cloud Stream reference documentation.

These Spring Cloud Stream Applications provide you with out-of-the-box Spring Cloud Stream utility applications that you can run independently or with Spring Cloud Data Flow. They include:

  • Connectors (sources, processors, and sinks) for a variety of middleware technologies, including message brokers, storage (relational, non-relational, filesystem).

  • Adapters for various network protocols.

  • Generic processors that you can customize with Spring Expression Language (SpEL) or by scripting.

You can find a detailed listing of all the applications and their options in the corresponding section of this guide.

1. Pre-built Applications

Out-of-the-box applications are Spring Boot applications that include a Binder implementation on top of the basic logic of the app (a function, for example), making each one a fully functional uber-jar. These uber-jars include the minimal code required for standalone execution. For each function application, the project provides a prebuilt version for the Apache Kafka and RabbitMQ Binders.

Prebuilt applications are generated by the stream applications generator Maven plugin.

2. Classification

Based on their target application type, the prebuilt applications fall into one of the following categories:

  • A source that connects to an external resource to poll and receive data that is published to the default “output” channel;

  • A processor that receives data from an “input” channel and processes it, sending the result on the default “output” channel;

  • A sink that connects to an external resource to which it sends the data received on the default “input” channel.

The prebuilt applications follow a naming convention: <functionality>-<type>-<binder>. For example, rabbit-sink-kafka is a Rabbit sink that uses the Kafka binder (that is, it runs with Kafka as the middleware).

2.1. Maven and Docker Access

The core functionality of the applications is available as functions. See the Java Functions repository for more details. Prebuilt applications are available as Maven artifacts. You can download the executable jar artifacts from the Spring Maven repositories. The root directory of the Maven repository that hosts release versions is repo.spring.io/release/org/springframework/cloud/stream/app/. From there, you can navigate to the latest released version of a specific app — for example, log-sink-rabbit-2.0.2.RELEASE.jar. You need to use the Milestone and Snapshot repository locations for Milestone and Snapshot executable jar artifacts.
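
For example, to try the log sink with the RabbitMQ binder, you could download the released jar (the path follows the standard Maven repository layout) and run it against a locally running RabbitMQ broker. The version shown here is only illustrative:

wget https://repo.spring.io/release/org/springframework/cloud/stream/app/log-sink-rabbit/2.0.2.RELEASE/log-sink-rabbit-2.0.2.RELEASE.jar
java -jar log-sink-rabbit-2.0.2.RELEASE.jar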

The Docker versions of the applications are available in Docker Hub, at hub.docker.com/r/springcloudstream/. Naming and versioning follow the same general conventions as Maven. For example:

docker pull springcloudstream/cassandra-sink-kafka

The preceding command pulls the latest Docker image of the Cassandra sink with the Kafka binder.

2.2. Building the Artifacts

You can build the project and generate the artifacts (including the prebuilt applications) on your own. This is useful if you want to deploy the artifacts locally or add additional features. A Maven build from the root of the stream-applications repository generates all of the binder-based apps. If you are interested in only a certain application, cd into the corresponding module and invoke the build from there by running the following Maven command:

mvn clean package

This command generates the applications. By default, the generated projects are placed under a directory called apps. There, you can find the binder-based applications, which you can then build and run.
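
As a sketch, building a single sink module might look like the following (the module path is illustrative; check the repository layout for the actual location of the application you are interested in):

cd applications/sink/log-sink
mvn clean package

The generated binder-based projects (for example, log-sink-kafka and log-sink-rabbit) then appear under the apps directory of that module.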

3. Patching Pre-built Applications

If you are looking to patch the pre-built applications to add new dependencies, you can use the following example as a reference. To add a MySQL driver to the jdbc-sink application:

  1. Clone the GitHub repository at github.com/spring-cloud-stream-app-starters/stream-applications

  2. Open it in an IDE and make the necessary changes in the right generator project. The repository is organized as source-apps-generator, sink-apps-generator, and processor-apps-generator.

    Find the module that you want to patch and make the changes. For example, you can add the following to the generator plugin’s configuration in the pom.xml:

    <dependencies>
      <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.37</version>
      </dependency>
      <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
      </dependency>
    </dependencies>
  3. Generate the binder-based apps as described above and build them.

Starters

4. Sources

4.1. Http Source

A source application that listens for HTTP requests and emits the body as a message payload. If the Content-Type matches text/* or application/json, the payload is a String; otherwise, the payload is a byte array.

Payload:

If content type matches text/* or application/json

  • String

If content type does not match text/* or application/json

  • byte array

4.1.2. Options

The http source supports the following configuration properties:

http.cors.allow-credentials

Whether the browser should include any cookies associated with the domain of the request being annotated. (Boolean, default: <none>)

http.cors.allowed-headers

List of request headers that can be used during the actual request. (String[], default: <none>)

http.cors.allowed-origins

List of allowed origins, e.g. "http://domain1.com". (String[], default: <none>)

http.mapped-request-headers

Headers that will be mapped. (String[], default: <none>)

http.path-pattern

HTTP endpoint path mapping. (String, default: /)

server.port

Server HTTP port. (Integer, default: 8080)
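
As a quick check, the following sketch runs the Kafka variant with the default settings and posts a JSON message to it (it assumes a Kafka broker is reachable with its default configuration):

java -jar http-source-kafka-<version>.jar --server.port=8080
curl -X POST -H "Content-Type: application/json" -d '{"name":"test"}' http://localhost:8080/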

4.2. JDBC Source

This source polls data from an RDBMS. This source is fully based on the DataSourceAutoConfiguration, so refer to the Spring Boot JDBC Support for more information.

Payload
  • Map<String, Object> when jdbc.supplier.split == true (default) and List<Map<String, Object>> otherwise

4.2.2. Options

The jdbc source has the following options:

jdbc.supplier.max-rows

Max number of rows to process for the query. (Integer, default: 0)

jdbc.supplier.query

The query to use to select data. (String, default: <none>)

jdbc.supplier.split

Whether to split the SQL result as individual messages. (Boolean, default: true)

jdbc.supplier.update

An SQL update statement to execute for marking polled messages as 'seen'. (String, default: <none>)

spring.cloud.stream.poller.cron

Cron expression value for the Cron Trigger. (String, default: <none>)

spring.cloud.stream.poller.fixed-delay

Fixed delay for default poller. (Long, default: 1000)

spring.cloud.stream.poller.initial-delay

Initial delay for periodic triggers. (Integer, default: 0)

spring.cloud.stream.poller.max-messages-per-poll

Maximum messages per poll for the default poller. (Long, default: 1)

spring.datasource.data

Data (DML) script resource references. (List<String>, default: <none>)

spring.datasource.driver-class-name

Fully qualified name of the JDBC driver. Auto-detected based on the URL by default. (String, default: <none>)

spring.datasource.initialization-mode

Initialize the datasource with available DDL and DML scripts. (DataSourceInitializationMode, default: embedded, possible values: ALWAYS,EMBEDDED,NEVER)

spring.datasource.password

Login password of the database. (String, default: <none>)

spring.datasource.schema

Schema (DDL) script resource references. (List<String>, default: <none>)

spring.datasource.url

JDBC URL of the database. (String, default: <none>)

spring.datasource.username

Login username of the database. (String, default: <none>)

Also see the Spring Boot documentation for additional DataSource properties, and see TriggerProperties and MaxMessagesProperties for polling options.
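
As an illustrative sketch, the following polls a table every five seconds and marks the rows it has seen (the table, columns, credentials, and the :id placeholder convention in the update statement are assumptions to adapt to your schema):

java -jar jdbc-source-kafka-<version>.jar \
  --spring.datasource.url='jdbc:mysql://localhost:3306/test' \
  --spring.datasource.username=root \
  --spring.datasource.password=secret \
  --jdbc.supplier.query='SELECT id, name FROM people WHERE imported = false' \
  --jdbc.supplier.update='UPDATE people SET imported = true WHERE id IN (:id)' \
  --spring.cloud.stream.poller.fixed-delay=5000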

4.3. MongoDB Source

This source polls data from MongoDB. This source is fully based on the MongoDataAutoConfiguration, so refer to the Spring Boot MongoDB Support for more information.

4.3.1. Options

The mongodb source has the following options:

mongodb.supplier.collection

The MongoDB collection to query (String, default: <none>)

mongodb.supplier.query

The MongoDB query (String, default: { })

mongodb.supplier.query-expression

The SpEL expression in MongoDB query DSL style (Expression, default: <none>)

mongodb.supplier.split

Whether to split the query result as individual messages. (Boolean, default: true)

Also see the Spring Boot documentation for additional MongoProperties properties, and see TriggerProperties for polling options.
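
A minimal sketch of running the Kafka variant against a local MongoDB instance follows (the database, collection, and query are placeholders, and spring.data.mongodb.uri is the standard Spring Boot connection property):

java -jar mongodb-source-kafka-<version>.jar \
  --spring.data.mongodb.uri=mongodb://localhost:27017/test \
  --mongodb.supplier.collection=orders \
  --mongodb.supplier.query='{ "status": "NEW" }' \
  --spring.cloud.stream.poller.fixed-delay=5000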

4.4. Time Source

The time source simply emits a String with the current time at a regular interval.

4.4.1. Options

The time source has the following options:

spring.cloud.stream.poller.cron

Cron expression value for the Cron Trigger. (String, default: <none>)

spring.cloud.stream.poller.fixed-delay

Fixed delay for default poller. (Long, default: 1000)

spring.cloud.stream.poller.initial-delay

Initial delay for periodic triggers. (Integer, default: 0)

spring.cloud.stream.poller.max-messages-per-poll

Maximum messages per poll for the default poller. (Long, default: 1)

time.date-format

Format for the date value. (String, default: MM/dd/yy HH:mm:ss)
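
For example, the following sketch emits the current time every ten seconds using a custom format (the values are illustrative):

java -jar time-source-kafka-<version>.jar \
  --time.date-format='yyyy-MM-dd HH:mm:ss' \
  --spring.cloud.stream.poller.fixed-delay=10000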

5. Processors

5.1. Filter Processor

The filter processor lets an application examine the incoming payload and apply a predicate against it that decides whether the record should continue downstream. For example, if the incoming payload is of type String and you want to filter out anything that has fewer than five characters, you can run the filter processor as follows:

java -jar filter-processor-kafka-<version>.jar --spel.function.expression="payload.length() > 4"

Change kafka to rabbit if you want to run it against RabbitMQ.

Payload

You can pass any type as payload and then apply SpEL expressions against it to filter. If the incoming type is byte[] and the content type is set to text/plain or application/json, then the application converts the byte[] into String.

5.1.2. Options

spel.function.expression

A SpEL expression to apply. (String, default: <none>)

5.2. Splitter Processor

The splitter app builds upon the concept of the same name in Spring Integration and allows the splitting of a single message into several distinct messages. The processor uses a function that takes a Message<?> as input and produces a List<Message<?>> as output, based on various properties (see below). You can use a SpEL expression or a delimiter to specify how you want to split the incoming message.

Payload
  • Incoming payload - Message<?>

If the incoming type is byte[] and the content type is set to text/plain or application/json, then the application converts the byte[] into String.

  • Outgoing payload - List<Message<?>>

5.2.2. Options

splitter.apply-sequence

Add correlation/sequence information in headers to facilitate later aggregation. (Boolean, default: true)

splitter.charset

The charset to use when converting bytes in text-based files to String. (String, default: <none>)

splitter.delimiters

When expression is null, delimiters to use when tokenizing String payloads. (String, default: <none>)

splitter.expression

A SpEL expression for splitting payloads. (String, default: <none>)

splitter.file-markers

Set to true or false to use a FileSplitter (to split text-based files by line) that includes (or not) beginning/end of file markers. (Boolean, default: <none>)

splitter.markers-json

When 'fileMarkers == true', specify if they should be produced as FileSplitter.FileMarker objects or JSON. (Boolean, default: true)
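
For example, the following sketch splits a comma-delimited String payload into individual messages (quoting may vary by shell):

java -jar splitter-processor-kafka-<version>.jar --splitter.delimiters=","

Change kafka to rabbit if you want to run it against RabbitMQ.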

5.3. Transform Processor

The transform processor allows you to convert the message payload structure based on a SpEL expression.

Here is an example of how you can run this application.

java -jar transform-processor-kafka-<version>.jar --spel.function.expression='payload.toUpperCase()'

Change kafka to rabbit if you want to run it against RabbitMQ.

Payload

Incoming message can contain any type of payload.

5.3.2. Options

spel.function.expression

A SpEL expression to apply. (String, default: <none>)

6. Sinks

6.1. Cassandra Sink

This sink application writes the content of each message it receives into Cassandra.

It expects a JSON String payload and uses its properties to map to table columns.

Payload

A JSON String or byte array representing the entity (or a list of entities) to be persisted.

6.1.2. Options

The cassandra sink has the following options:

cassandra.cluster.create-keyspace

Flag to create (or not) keyspace on application startup. (Boolean, default: false)

cassandra.cluster.entity-base-packages

Base packages to scan for entities annotated with Table annotations. (String[], default: [])

cassandra.cluster.init-script

Resource with CQL scripts (delimited by ';') to initialize keyspace schema. (Resource, default: <none>)

cassandra.cluster.skip-ssl-validation

Flag to validate the Servers' SSL certs. (Boolean, default: false)

cassandra.consistency-level

The consistency level for write operation. (ConsistencyLevel, default: <none>)

cassandra.ingest-query

Ingest Cassandra query. (String, default: <none>)

cassandra.query-type

QueryType for Cassandra Sink. (Type, default: <none>, possible values: INSERT,UPDATE,DELETE,STATEMENT)

cassandra.statement-expression

Expression in Cassandra query DSL style. (Expression, default: <none>)

cassandra.ttl

Time-to-live option of WriteOptions. (Integer, default: 0)

spring.data.cassandra.cluster-name

<documentation missing> (String, default: <none>)

spring.data.cassandra.compression

Compression supported by the Cassandra binary protocol. (Compression, default: none, possible values: LZ4,SNAPPY,NONE)

spring.data.cassandra.connect-timeout

Socket option: connection time out. (Duration, default: <none>)

spring.data.cassandra.consistency-level

Queries consistency level. (DefaultConsistencyLevel, default: <none>, possible values: ANY,ONE,TWO,THREE,QUORUM,ALL,LOCAL_ONE,LOCAL_QUORUM,EACH_QUORUM,SERIAL,LOCAL_SERIAL)

spring.data.cassandra.contact-points

Cluster node addresses in the form 'host:port'. (List<String>, default: [127.0.0.1:9042])

spring.data.cassandra.fetch-size

<documentation missing> (Integer, default: <none>)

spring.data.cassandra.keyspace-name

Keyspace name to use. (String, default: <none>)

spring.data.cassandra.local-datacenter

Datacenter that is considered "local". Contact points should be from this datacenter. (String, default: <none>)

spring.data.cassandra.page-size

Queries default page size. (Integer, default: 5000)

spring.data.cassandra.password

Login password of the server. (String, default: <none>)

spring.data.cassandra.read-timeout

Socket option: read time out. (Duration, default: <none>)

spring.data.cassandra.schema-action

Schema action to take at startup. (String, default: none)

spring.data.cassandra.serial-consistency-level

Queries serial consistency level. (DefaultConsistencyLevel, default: <none>, possible values: ANY,ONE,TWO,THREE,QUORUM,ALL,LOCAL_ONE,LOCAL_QUORUM,EACH_QUORUM,SERIAL,LOCAL_SERIAL)

spring.data.cassandra.session-name

Name of the Cassandra session. (String, default: <none>)

spring.data.cassandra.ssl

Enable SSL support. (Boolean, default: false)

spring.data.cassandra.username

Login user of the server. (String, default: <none>)
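
A minimal sketch of running the Kafka variant against a local Cassandra node follows (the keyspace, datacenter, table, and ingest query are placeholders to adapt to your schema):

java -jar cassandra-sink-kafka-<version>.jar \
  --spring.data.cassandra.contact-points=127.0.0.1:9042 \
  --spring.data.cassandra.local-datacenter=datacenter1 \
  --spring.data.cassandra.keyspace-name=demo \
  --cassandra.ingest-query='INSERT INTO book (isbn, title, author) VALUES (?, ?, ?)'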

6.2. Counter Sink

The counter sink computes multiple counters from the received messages. It leverages the Micrometer library and can use various popular time-series databases (TSDBs) to persist the counter values.

By default, the counter sink increments the message.name counter on every received message. The counter.message-counter-enabled property allows you to disable this counter when required.

If tag expressions are provided (via the counter.tag.expression.<tagKey>=<tagValue SpEL expression> property), then the counter named by counter.name is incremented. Note that each SpEL expression can evaluate to multiple values, resulting in multiple counter increments (one for every resolved value).

If fixed tags are provided, they are included in all message and expression counter increment measurements.

The counter's implementation is based on the Micrometer library, a vendor-neutral application metrics facade that supports the most popular monitoring systems. See the Micrometer documentation for the list of supported monitoring systems. Starting with Spring Boot 2.0, Micrometer is the instrumentation library powering the delivery of application metrics from Spring Boot.

All Spring Cloud Stream App Starters are configured to support two of the most popular monitoring systems, Prometheus and InfluxDB. You can declaratively select which monitoring system to use. If you are not using Prometheus or InfluxDB, you can customize the app starters to use a different monitoring system and include your preferred Micrometer monitoring system library in your own custom applications.

Grafana is a popular platform for building visualization dashboards.

To enable Micrometer’s Prometheus meter registry for Spring Cloud Stream application starters, set the following properties.

management.metrics.export.prometheus.enabled=true
management.endpoints.web.exposure.include=prometheus

Also disable the application's security, which allows a simple Prometheus configuration to scrape counter information, by setting the following property:

spring.cloud.streamapp.security.enabled=false

To enable Micrometer’s Influx meter registry for Spring Cloud Stream application starters, set the following properties:

management.metrics.export.influx.enabled=true
management.metrics.export.influx.uri={influxdb-server-url}

If Data Flow Server metrics are enabled, the counter reuses the existing configuration.

The following diagram illustrates the counter's information collection and processing flow.

Counter Architecture

6.2.2. Options

counter.amount-expression

A SpEL expression (against the incoming Message) to derive the amount to add to the counter. If not set the counter is incremented by 1.0 (Expression, default: <none>)

counter.message-counter-enabled

Enables counting the number of messages processed. Uses the 'message.' counter name prefix to distinguish it from the expression-based counter. The message counter includes the fixed tags when provided. (Boolean, default: true)

counter.name

The name of the counter to increment. The 'name' and 'nameExpression' are mutually exclusive. Only one can be set. (String, default: <none>)

counter.name-expression

A SpEL expression (against the incoming Message) to derive the name of the counter to increment. The 'name' and 'nameExpression' are mutually exclusive. Only one can be set. (Expression, default: <none>)

counter.tag.expression

Computes tags from SpEL expressions. A single SpEL expression can produce an array of values, which in turn means distinct name/value tags. Every name/value tag produces a separate counter increment. The tag expression format is: counter.tag.expression.[tag-name]=[SpEL expression] (Map<String, Expression>, default: <none>)

counter.tag.fixed

Custom tags assigned to every counter increment measurement. This is a map, so the property convention for fixed tags is: counter.tag.fixed.[tag-name]=[tag-value] (Map<String, String>, default: <none>)
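
For example, the following sketch counts incoming messages under a fixed counter name, tags each increment with a value derived from the payload (it assumes the payload exposes a region property), and exposes the metrics to Prometheus:

java -jar counter-sink-kafka-<version>.jar \
  --counter.name=orders \
  --counter.tag.expression.region=payload.region \
  --management.metrics.export.prometheus.enabled=true \
  --management.endpoints.web.exposure.include=prometheus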

6.3. JDBC Sink

The JDBC sink allows you to persist the incoming payload into an RDBMS.

The jdbc.consumer.columns property represents pairs of COLUMN_NAME[:EXPRESSION_FOR_VALUE], where EXPRESSION_FOR_VALUE (together with the colon) is optional. When the expression is omitted, the value is evaluated via a generated expression of the form payload.COLUMN_NAME, giving a direct mapping from object properties to table columns. For example, suppose we have a JSON payload like the following:

{
  "name": "My Name"
  "address": {
     "city": "Big City",
     "street": "Narrow Alley"
  }
}

We can insert it into a table with name, city, and street columns by using the following configuration:

--jdbc.consumer.columns=name,city:address.city,street:address.street

This sink supports batch inserts, as far as supported by the underlying JDBC driver. Batch inserts are configured via the batch-size and idle-timeout properties: Incoming messages are aggregated until batch-size messages are present, then inserted as a batch. If idle-timeout milliseconds pass with no new messages, the aggregated batch is inserted even if it is smaller than batch-size, capping maximum latency.
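
For example, the following properties (values are illustrative) flush a batch once 100 messages have accumulated, or after two seconds of inactivity, whichever comes first:

--jdbc.consumer.batch-size=100 --jdbc.consumer.idle-timeout=2000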

The module also uses Spring Boot’s DataSource support for configuring the database connection, so properties like spring.datasource.url etc. apply.

6.3.1. Examples

java -jar jdbc-sink.jar --jdbc.consumer.tableName=names --jdbc.consumer.columns=name --spring.datasource.driver-class-name=org.mariadb.jdbc.Driver \
--spring.datasource.url='jdbc:mysql://localhost:3306/test'

6.3.2. Options

The jdbc sink has the following options:

jdbc.consumer.batch-size

Threshold in number of messages when data will be flushed to database table. (Integer, default: 1)

jdbc.consumer.columns

The comma separated colon-based pairs of column names and SpEL expressions for values to insert/update. Names are used at initialization time to issue the DDL. (String, default: payload:payload.toString())

jdbc.consumer.idle-timeout

Idle timeout in milliseconds when data is automatically flushed to database table. (Long, default: -1)

jdbc.consumer.initialize

'true', 'false' or the location of a custom initialization script for the table. (String, default: false)

jdbc.consumer.table-name

The name of the table to write into. (String, default: messages)

spring.datasource.data

Data (DML) script resource references. (List<String>, default: <none>)

spring.datasource.driver-class-name

Fully qualified name of the JDBC driver. Auto-detected based on the URL by default. (String, default: <none>)

spring.datasource.initialization-mode

Initialize the datasource with available DDL and DML scripts. (DataSourceInitializationMode, default: embedded, possible values: ALWAYS,EMBEDDED,NEVER)

spring.datasource.password

Login password of the database. (String, default: <none>)

spring.datasource.schema

Schema (DDL) script resource references. (List<String>, default: <none>)

spring.datasource.url

JDBC URL of the database. (String, default: <none>)

spring.datasource.username

Login username of the database. (String, default: <none>)

6.4. Log Sink

The log sink uses the application logger to output the data for inspection.

Note that the log sink uses a type-less handler, which affects how the actual logging is performed. If the content type is textual, the raw payload bytes are converted to a String and logged; otherwise, the raw bytes are logged. See the user guide for more information.

6.4.1. Options

The log sink has the following options:

log.expression

A SpEL expression (against the incoming message) to evaluate as the logged message. (String, default: payload)

log.level

The level at which to log messages. (Level, default: <none>, possible values: FATAL,ERROR,WARN,INFO,DEBUG,TRACE)

log.name

The name of the logger to use. (String, default: <none>)
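
For example, the following sketch logs an upper-cased version of the payload at WARN level under a custom logger name (it assumes a textual payload):

java -jar log-sink-kafka-<version>.jar --log.expression='payload.toUpperCase()' --log.level=WARN --log.name=my-logger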

6.5. MongoDB Sink

This sink application ingests incoming data into MongoDB. It is fully based on the MongoDataAutoConfiguration, so refer to the Spring Boot MongoDB Support for more information.

6.5.1. Input

Payload
  • Any POJO

  • String

  • byte[]

6.5.2. Options

The mongodb sink has the following options:

mongodb.consumer.collection

The MongoDB collection to store data (String, default: <none>)

mongodb.consumer.collection-expression

The SpEL expression to evaluate MongoDB collection (Expression, default: <none>)
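
A minimal sketch of running the Kafka variant follows (the database and collection are placeholders, and spring.data.mongodb.uri is the standard Spring Boot connection property):

java -jar mongodb-sink-kafka-<version>.jar \
  --spring.data.mongodb.uri=mongodb://localhost:27017/test \
  --mongodb.consumer.collection=orders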

6.6. RabbitMQ Sink

This module sends messages to RabbitMQ.

6.6.1. Options

The rabbit sink has the following options:

(See the Spring Boot documentation for RabbitMQ connection properties)

rabbit.converter-bean-name

The bean name for a custom message converter; if omitted, a SimpleMessageConverter is used. If 'jsonConverter', a Jackson2JsonMessageConverter bean will be created for you. (String, default: <none>)

rabbit.exchange

Exchange name - overridden by exchangeNameExpression, if supplied. (String, default: <empty string>)

rabbit.exchange-expression

A SpEL expression that evaluates to an exchange name. (Expression, default: <none>)

rabbit.mapped-request-headers

Headers that will be mapped. (String[], default: [*])

rabbit.own-connection

When true, use a separate connection based on the boot properties. (Boolean, default: false)

rabbit.persistent-delivery-mode

Default delivery mode when 'amqp_deliveryMode' header is not present, true for PERSISTENT. (Boolean, default: false)

rabbit.routing-key

Routing key - overridden by routingKeyExpression, if supplied. (String, default: <none>)

rabbit.routing-key-expression

A SpEL expression that evaluates to a routing key. (Expression, default: <none>)

spring.rabbitmq.addresses

Comma-separated list of addresses to which the client should connect. (String, default: <none>)

spring.rabbitmq.connection-timeout

Connection timeout. Set it to zero to wait forever. (Duration, default: <none>)

spring.rabbitmq.host

RabbitMQ host. (String, default: localhost)

spring.rabbitmq.password

Login to authenticate against the broker. (String, default: guest)

spring.rabbitmq.port

RabbitMQ port. (Integer, default: 5672)

spring.rabbitmq.publisher-confirm-type

Type of publisher confirms to use. (ConfirmType, default: <none>, possible values: SIMPLE,CORRELATED,NONE)

spring.rabbitmq.publisher-returns

Whether to enable publisher returns. (Boolean, default: false)

spring.rabbitmq.requested-channel-max

Number of channels per connection requested by the client. Use 0 for unlimited. (Integer, default: 2047)

spring.rabbitmq.requested-heartbeat

Requested heartbeat timeout; zero for none. If a duration suffix is not specified, seconds will be used. (Duration, default: <none>)

spring.rabbitmq.username

Login user to authenticate to the broker. (String, default: guest)

spring.rabbitmq.virtual-host

Virtual host to use when connecting to the broker. (String, default: <none>)
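
For example, the following sketch publishes incoming messages to a named exchange with a routing key taken from a message header (the exchange and header names are placeholders):

java -jar rabbit-sink-kafka-<version>.jar \
  --spring.rabbitmq.host=localhost \
  --rabbit.exchange=my-exchange \
  --rabbit.routing-key-expression="headers['routeTo']"

Note that, following the naming convention described earlier, this variant consumes from Kafka and writes to RabbitMQ.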

Appendices

Appendix A: Contributing

Spring Cloud is released under the non-restrictive Apache 2.0 license and follows a very standard GitHub development process, using GitHub's issue tracker for issues and merging pull requests into master. If you want to contribute, even something trivial, please do not hesitate, but follow the guidelines below.

A.1. Sign the Contributor License Agreement

Before we accept a non-trivial patch or pull request we will need you to sign the contributor’s agreement. Signing the contributor’s agreement does not grant anyone commit rights to the main repository, but it does mean that we can accept your contributions, and you will get an author credit if we do. Active contributors might be asked to join the core team, and given the ability to merge pull requests.

A.2. Code Conventions and Housekeeping

None of these is essential for a pull request, but they will all help. They can also be added after the original pull request but before a merge.

  • Use the Spring Framework code format conventions. If you use Eclipse you can import formatter settings using the eclipse-code-formatter.xml file from the Spring Cloud Build project. If using IntelliJ, you can use the Eclipse Code Formatter Plugin to import the same file.

  • Make sure all new .java files have a simple Javadoc class comment with at least an @author tag identifying you, and preferably at least a paragraph on what the class is for.

  • Add the ASF license header comment to all new .java files (copy from existing files in the project)

  • Add yourself as an @author to the .java files that you modify substantially (more than cosmetic changes).

  • Add some Javadocs and, if you change the namespace, some XSD doc elements.

  • A few unit tests would help a lot as well — someone has to do it.

  • If no one else is using your branch, please rebase it against the current master (or other target branch in the main project).

  • When writing a commit message, please follow these conventions. If you are fixing an existing issue, please add Fixes gh-XXXX at the end of the commit message (where XXXX is the issue number).