In this section, we review frequently asked questions about Spring Cloud Data Flow.
One of the powerful features of SpEL expressions is functions. Spring Integration provides the jsonPath() and xpath() SpEL functions out of the box, provided the appropriate libraries are on the classpath. All the provided Spring Cloud Stream application starters ship with the json-path and spring-integration-xml jars, so these SpEL functions can be used in Spring Cloud Data Flow streams wherever expressions are supported.
For example, we can transform a JSON payload from an HTTP request using a jsonPath() expression:
dataflow:>stream create jsonPathTransform --definition "http | transform --expression=#jsonPath(payload,'$.price') | log" --deploy
...
dataflow:> http post --target http://localhost:8080 --data {"symbol":"SCDF","price":72.04}
dataflow:> http post --target http://localhost:8080 --data {"symbol":"SCDF","price":72.06}
dataflow:> http post --target http://localhost:8080 --data {"symbol":"SCDF","price":72.08}
In this sample, we apply jsonPath to the incoming payload to extract just the price field value.
Similar syntax can be used with the splitter or filter expression options.
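For instance, a minimal sketch of a filter that uses the same jsonPath() function could look like the following (the stream name jsonPathFilter and the symbol value used for the comparison are hypothetical):

dataflow:>stream create jsonPathFilter --definition "http | filter --expression=#jsonPath(payload,'$.symbol')=='SCDF' | log" --deploy

Only the messages whose symbol field matches the expression are passed on to the log sink.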
In fact, any available SpEL-based option has access to the built-in SpEL functions. For example, we can extract a value from the JSON data to calculate the partitionKey before sending output to the Binder:
dataflow:>stream deploy foo --properties "deployer.transform.count=2,app.transform.producer.partitionKeyExpression=#jsonPath(payload,'$.symbol')"
The same syntax can be applied with the xpath() SpEL function when dealing with XML data.
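As a hedged sketch, assuming an incoming XML payload that contains an /order/price element (both the stream name and the XML structure are hypothetical), a transform could look like:

dataflow:>stream create xpathTransform --definition "http | transform --expression=#xpath(payload,'/order/price') | log" --deploy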
Any other custom SpEL function can also be used, but for that purpose you should build a library with a @Configuration class containing an appropriate SpelFunctionFactoryBean @Bean definition. The target Spring Cloud Stream application starter should then be re-packaged to supply such a custom extension via the built-in Spring Boot @ComponentScan mechanism or an auto-configuration hook.
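The following is a minimal sketch of such a configuration, assuming a custom #reverse() function backed by a static method (the class and method names are hypothetical):

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.config.SpelFunctionFactoryBean;

@Configuration
public class CustomSpelFunctionConfiguration {

    // Registers the static reverseString() method below as a SpEL function;
    // the bean name ("reverse") becomes the function name, i.e. #reverse(...).
    @Bean
    public SpelFunctionFactoryBean reverse() {
        return new SpelFunctionFactoryBean(CustomSpelFunctionConfiguration.class, "reverseString");
    }

    public static String reverseString(String input) {
        return new StringBuilder(input).reverse().toString();
    }
}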
The JDBC sink can be used to insert message payload data into a relational database table. By default, it inserts the entire payload into a table whose name is taken from the jdbc.table-name property; if that property is not set, the application expects to use a table named messages. To alter this behavior, the JDBC sink accepts several options that you can pass using the --foo=bar notation in the stream definition or change globally.
The JDBC sink has a jdbc.initialize property that, if set to true, causes the sink to create a table based on the specified configuration when it starts up. If the initialize property is false, which is the default, you have to make sure that the table to use is already available.
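For instance, a minimal sketch of overriding these options in a stream definition might look like the following (the stream name and the table name ticktock are hypothetical; the datasource properties are omitted for brevity):

dataflow:>stream create mytimedata --definition "time | jdbc --jdbc.table-name=ticktock --jdbc.initialize=true" --deploy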
A stream definition using the jdbc sink and relying on all defaults, with MySQL as the backing database, looks like the following. In this example, the system time is persisted to MySQL every second.
dataflow:>stream create --name mydata --definition "time | jdbc --spring.datasource.url=jdbc:mysql://localhost:3306/test --spring.datasource.username=root --spring.datasource.password=root --spring.datasource.driver-class-name=org.mariadb.jdbc.Driver" --deploy
For this to work, you’d have to have the following table in the MySQL database.
CREATE TABLE test.messages ( payload varchar(255) );
mysql> desc test.messages;
+---------+--------------+------+-----+---------+-------+
| Field   | Type         | Null | Key | Default | Extra |
+---------+--------------+------+-----+---------+-------+
| payload | varchar(255) | YES  |     | NULL    |       |
+---------+--------------+------+-----+---------+-------+
1 row in set (0.00 sec)
mysql> select * from test.messages;
+-------------------+
| payload           |
+-------------------+
| 04/25/17 09:10:04 |
| 04/25/17 09:10:06 |
| 04/25/17 09:10:07 |
| 04/25/17 09:10:08 |
| 04/25/17 09:10:09 |
.............
For situations where data is consumed and processed between two different message brokers, Spring Cloud Data Flow provides easy-to-override global configurations, an out-of-the-box bridge-processor, and DSL primitives to build these types of topologies.
Let’s assume we have data queueing up in RabbitMQ (e.g., queue = fooRabbit) and the requirement is to consume all the payloads and publish them to Apache Kafka (e.g., topic = barKafka) as the destination for downstream processing. In this case, use the global application configuration properties to define multiple binder configurations:
# Apache Kafka Global Configurations (i.e., identified by "kafka1")
spring.cloud.dataflow.applicationProperties.stream.spring.cloud.stream.binders.kafka1.type=kafka
spring.cloud.dataflow.applicationProperties.stream.spring.cloud.stream.binders.kafka1.environment.spring.cloud.stream.kafka.binder.brokers=localhost:9092
spring.cloud.dataflow.applicationProperties.stream.spring.cloud.stream.binders.kafka1.environment.spring.cloud.stream.kafka.binder.zkNodes=localhost:2181

# RabbitMQ Global Configurations (i.e., identified by "rabbit1")
spring.cloud.dataflow.applicationProperties.stream.spring.cloud.stream.binders.rabbit1.type=rabbit
spring.cloud.dataflow.applicationProperties.stream.spring.cloud.stream.binders.rabbit1.environment.spring.rabbitmq.host=localhost
spring.cloud.dataflow.applicationProperties.stream.spring.cloud.stream.binders.rabbit1.environment.spring.rabbitmq.port=5672
Note: In this example, both message brokers are running locally and are reachable at localhost on their respective ports.
These properties can be supplied in a .properties file that is accessible to the server directly or via config-server.
java -jar spring-cloud-dataflow-server-local/target/spring-cloud-dataflow-server-local-1.1.4.RELEASE.jar --spring.config.location=<PATH-TO-FILE>/foo.properties
Spring Cloud Data Flow internally uses the bridge-processor to directly connect different named channel destinations. Since we are publishing to and subscribing from two different messaging systems, you have to build the bridge-processor with both the RabbitMQ and Apache Kafka binders on the classpath. To do that, head over to start-scs.cfapps.io/ and select Bridge Processor, Kafka binder starter, and Rabbit binder starter as the dependencies, and follow the patching procedure described in the reference guide.
Specifically, for the bridge-processor, you have to import the BridgeProcessorConfiguration provided by the starter.
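A minimal sketch of the patched application class might look like the following (the application class name is hypothetical, and the package of BridgeProcessorConfiguration is assumed to follow the app-starters convention):

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Import;
// Assumed package for the configuration class supplied by the bridge-processor starter.
import org.springframework.cloud.stream.app.bridge.processor.BridgeProcessorConfiguration;

@SpringBootApplication
@Import(BridgeProcessorConfiguration.class)
public class MultiBinderBridgeApplication {

    public static void main(String[] args) {
        SpringApplication.run(MultiBinderBridgeApplication.class, args);
    }
}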
Once you have made the necessary adjustments, you can build the application. Let’s register the application under the name multiBinderBridge.
dataflow:>app register --type processor --name multiBinderBridge --uri file:///<PATH-TO-FILE>/multipleBinderBridge-0.0.1-SNAPSHOT.jar
It is time to create a stream definition with the newly registered processor application.
dataflow:>stream create fooRabbitToBarKafka --definition ":fooRabbit > multiBinderBridge --spring.cloud.stream.bindings.input.binder=rabbit1 --spring.cloud.stream.bindings.output.binder=kafka1 > :barKafka" --deploy
Note: Since we consume messages from RabbitMQ (i.e., identified by rabbit1) and publish the processed payload to Apache Kafka (i.e., identified by kafka1), we supply them as the input and output binders, respectively.
Note: The queue fooRabbit in RabbitMQ is where the data is queued up and consumed from, and the topic barKafka in Apache Kafka is the named destination to which the bridged payloads are published.