51. Frequently asked questions

In this section, we will review the frequently discussed questions in Spring Cloud Data Flow.

51.1 Advanced SpEL expressions

One of the powerful features of SpEL expressions is functions. Spring Integration provides jsonPath() and xpath() out-of-the-box SpEL-functions, if appropriate libraries are in the classpath. All the provided Spring Cloud Stream application starters are supplied with the json-path and spring-integration-xml jars, thus we can use those SpEL-functions in Spring Cloud Data Flow streams whenever expressions are possible. For example we can transform JSON-aware payload from the HTTP request using some jsonPath() expression:

dataflow:>stream create jsonPathTransform --definition "http | transform --expression=#jsonPath(payload,'$.price') | log" --deploy
...
dataflow:> http post --target http://localhost:8080 --data {"symbol":"SCDF","price":72.04}
dataflow:> http post --target http://localhost:8080 --data {"symbol":"SCDF","price":72.06}
dataflow:> http post --target http://localhost:8080 --data {"symbol":"SCDF","price":72.08}

In this sample we apply jsonPath for the incoming payload to extract just only the price field value. Similar syntax can be used with splitter or filter expression options. Actually any available SpEL-based option has access to the built-in SpEL-functions. For example we can extract some value from JSON data to calculate the partitionKey before sending output to the Binder:

dataflow:>stream deploy foo --properties "deployer.transform.count=2,app.transform.producer.partitionKeyExpression=#jsonPath(payload,'$.symbol')"

The same syntax can be applied for xpath() SpEL-function when you deal with XML data. Any other custom SpEL-function can also be used, but for this purpose you should build a library with the @Configuration class containing an appropriate SpelFunctionFactoryBean @Bean definition. The target Spring Cloud Stream application starter should be re-packaged to supply such a custom extension via built-in Spring Boot @ComponentScan mechanism or auto-configuration hook.

51.2 How to use JDBC-sink?

The JDBC-sink can be used to insert message payload data into a relational database table. By default, it inserts the entire payload into a table named after the jdbc.table-name property, and if it is not set, by default the application expects to use a table with the name messages. To alter this behavior, the JDBC sink accepts several options that you can pass using the --foo=bar notation in the stream, or change globally. The JDBC sink has a jdbc.initialize property that if set to true will result in the sink creating a table based on the specified configuration when the it starts up. If that initialize property is false, which is the default, you will have to make sure that the table to use is already available.

A stream definition using jdbc sink relying on all defaults with MySQL as the backing database looks like the following. In this example, the system time is persisted in MySQL for every second.

dataflow:>stream create --name mydata --definition "time | jdbc --spring.datasource.url=jdbc:mysql://localhost:3306/test --spring.datasource.username=root --spring.datasource.password=root --spring.datasource.driver-class-name=org.mariadb.jdbc.Driver" --deploy

For this to work, you’d have to have the following table in the MySQL database.

CREATE TABLE test.messages
(
  payload varchar(255)
);
mysql> desc test.messages;
+---------+--------------+------+-----+---------+-------+
| Field   | Type         | Null | Key | Default | Extra |
+---------+--------------+------+-----+---------+-------+
| payload | varchar(255) | YES  |     | NULL    |       |
+---------+--------------+------+-----+---------+-------+
1 row in set (0.00 sec)
mysql> select * from test.messages;
+-------------------+
| payload           |
+-------------------+
| 04/25/17 09:10:04 |
| 04/25/17 09:10:06 |
| 04/25/17 09:10:07 |
| 04/25/17 09:10:08 |
| 04/25/17 09:10:09 |
.............
.............
.............

51.3 How to use multiple message-binders?

For situations where the data is consumed and processed between two different message brokers, Spring Cloud Data Flow provides easy to override global configurations, out-of-the-box bridge-processor, and DSL primitives to build these type of topologies.

Let’s assume we have data queueing up in RabbitMQ (e.g., queue = fooRabbit) and the requirement is to consume all the payloads and publish them to Apache Kafka (e.g., topic = barKafka), as the destination for downstream processing.

Follow the global application of configurations to define multiple binder configurations.

# Apache Kafka Global Configurations (i.e., identified by "kafka1")
spring.cloud.dataflow.applicationProperties.stream.spring.cloud.stream.binders.kafka1.type=kafka
spring.cloud.dataflow.applicationProperties.stream.spring.cloud.stream.binders.kafka1.environment.spring.cloud.stream.kafka.binder.brokers=localhost:9092
spring.cloud.dataflow.applicationProperties.stream.spring.cloud.stream.binders.kafka1.environment.spring.cloud.stream.kafka.binder.zkNodes=localhost:2181

# RabbitMQ Global Configurations (i.e., identified by "rabbit1")
spring.cloud.dataflow.applicationProperties.stream.spring.cloud.stream.binders.rabbit1.type=rabbit
spring.cloud.dataflow.applicationProperties.stream.spring.cloud.stream.binders.rabbit1.environment.spring.rabbitmq.host=localhost
spring.cloud.dataflow.applicationProperties.stream.spring.cloud.stream.binders.rabbit1.environment.spring.rabbitmq.port=5672
[Note]Note

In this example, both the message brokers are running locally and reachable at localhost with respective ports.

These properties can be supplied in a ".properties" file that is accessible to the server directly or via config-server.

java -jar spring-cloud-dataflow-server-local/target/spring-cloud-dataflow-server-local-1.1.4.RELEASE.jar --spring.config.location=<PATH-TO-FILE>/foo.properties

Spring Cloud Data Flow internally uses bridge-processor to directly connect different named channel destinations. Since we are publishing and subscribing from two different messaging systems, you’d have to build the bridge-processor with both RabbitMQ and Apache Kafka binders in the classpath. To do that, head over to start-scs.cfapps.io/ and select Bridge Processor, Kafka binder starter, and Rabbit binder starter as the dependencies and follow the patching procedure described in the reference guide. Specifically, for the bridge-processor, you’d have to import the BridgeProcessorConfiguration provided by the starter.

Once you have the necessary adjustments, you can build the application. Let’s register the name of the application as multiBinderBridge.

dataflow:>app register --type processor --name multiBinderBridge --uri file:///<PATH-TO-FILE>/multipleBinderBridge-0.0.1-SNAPSHOT.jar

It is time to create a stream definition with the newly registered processor application.

dataflow:>stream create fooRabbitToBarKafka --definition ":fooRabbit > multiBinderBridge --spring.cloud.stream.bindings.input.binder=rabbit1 --spring.cloud.stream.bindings.output.binder=kafka1 > :barKafka" --deploy
[Note]Note

Since we are to consume messages from RabbitMQ (i.e., identified by rabbit1) and then publish the payload to Apache Kafka (i.e., identified by kafka1), we are supplying them as input and output channel settings respectively.

[Note]Note

The queue fooRabbit in RabbitMQ is where the stream is consuming events from and the topic barKafka in Apache Kafka is where the data is finally landing.