You can set the Maven properties, such as the local Maven repository location, remote Maven repositories with their authentication credentials, and proxy server properties, either as command-line properties when starting the Data Flow server or through the SPRING_APPLICATION_JSON environment property for the Data Flow server.
If apps are resolved from a Maven repository, the remote Maven repositories need to be configured explicitly for all Data Flow server implementations except the local one. The other Data Flow server implementations (which use Maven resources for app artifact resolution) have no default value for remote repositories, whereas the local server uses repo.spring.io/libs-snapshot as its default remote repository.
To pass the properties as command-line options:
$ java -jar <dataflow-server>.jar --maven.localRepository=mylocal
--maven.remote-repositories.repo1.url=https://repo1
--maven.remote-repositories.repo1.auth.username=repo1user
--maven.remote-repositories.repo1.auth.password=repo1pass
--maven.remote-repositories.repo2.url=https://repo2 --maven.proxy.host=proxyhost
--maven.proxy.port=9018 --maven.proxy.auth.username=proxyuser
--maven.proxy.auth.password=proxypass
or, using the SPRING_APPLICATION_JSON
environment property:
export SPRING_APPLICATION_JSON='{ "maven": { "local-repository": "local","remote-repositories": { "repo1": { "url": "https://repo1", "auth": { "username": "repo1user", "password": "repo1pass" } }, "repo2": { "url": "https://repo2" } }, "proxy": { "host": "proxyhost", "port": 9018, "auth": { "username": "proxyuser", "password": "proxypass" } } } }'
Formatted JSON:
SPRING_APPLICATION_JSON='{
  "maven": {
    "local-repository": "local",
    "remote-repositories": {
      "repo1": {
        "url": "https://repo1",
        "auth": {
          "username": "repo1user",
          "password": "repo1pass"
        }
      },
      "repo2": {
        "url": "https://repo2"
      }
    },
    "proxy": {
      "host": "proxyhost",
      "port": 9018,
      "auth": {
        "username": "proxyuser",
        "password": "proxypass"
      }
    }
  }
}'
Note: Depending on the Spring Cloud Data Flow server implementation, you may have to pass the environment properties using the platform-specific environment-setting capabilities. For instance, in Cloud Foundry, you would pass them with cf set-env.
Spring Cloud Data Flow is built upon several Spring projects, but ultimately the dataflow-server is a Spring Boot app, so the logging techniques that apply to any Spring Boot application are applicable here as well.
When troubleshooting, the following are the two primary areas where enabling DEBUG logs can be useful.
Spring Cloud Data Flow builds upon the Spring Cloud Deployer SPI, and each platform-specific dataflow-server uses the respective SPI implementation. To troubleshoot deployment-specific issues, such as network errors, it is useful to enable DEBUG logs for the underlying deployer and the libraries it uses.
For instance, to enable DEBUG logs for the local-deployer, start the server as follows.
$ java -jar <dataflow-server>.jar --logging.level.org.springframework.cloud.deployer.spi.local=DEBUG
(where, org.springframework.cloud.deployer.spi.local
is the global package for everything local-deployer
related)
Similarly, to enable DEBUG logs for the cloudfoundry-deployer, set the following environment variable and restage the dataflow-server; you will then see more logs around requests, responses, and detailed stack traces (upon failures). The cloudfoundry-deployer uses the cf-java-client, so DEBUG logs must also be enabled for that library.
$ cf set-env dataflow-server JAVA_OPTS '-Dlogging.level.cloudfoundry-client=DEBUG'
$ cf restage dataflow-server
(where, cloudfoundry-client
is the global package for everything cf-java-client
related)
If you need to review the Reactor logs (Reactor is used by the cf-java-client), the following is helpful.
$ cf set-env dataflow-server JAVA_OPTS '-Dlogging.level.cloudfoundry-client=DEBUG -Dlogging.level.reactor.ipc.netty=DEBUG'
$ cf restage dataflow-server
(where, reactor.ipc.netty
is the global package for everything reactor-netty
related)
Note: Similar to the …
The streaming applications in Spring Cloud Data Flow are Spring Boot applications, and their logging configurations can be set up independently.
For instance, if you need to troubleshoot the header and payload specifics being passed between the source, processor, and sink channels, deploy the stream with the following options.
dataflow:>stream create foo --definition "http --logging.level.org.springframework.integration=DEBUG | transform --logging.level.org.springframework.integration=DEBUG | log --logging.level.org.springframework.integration=DEBUG" --deploy
(where, org.springframework.integration
is the global package for everything Spring Integration related,
which is responsible for messaging channels)
These properties can also be specified via deployment
properties when deploying the stream.
dataflow:>stream deploy foo --properties "app.*.logging.level.org.springframework.integration=DEBUG"
Given that each application is a separate process that maintains its own set of logs, accessing individual logs can be a bit inconvenient, especially in the early stages of development when logs are accessed more often (i.e., while debugging and troubleshooting). Since it is also a common pattern to rely on the local SCDF server, which deploys each application as a local JVM process using the local-deployer, the framework provides support for redirecting such logs to the parent process's streams (both stdout and stderr). So, with the local SCDF server, the application logs appear in the logs of the running server.
Typically when you deploy the stream you will see something like this in the server logs:
2017-06-28 09:50:16.372  INFO 41161 --- [nio-9393-exec-7] o.s.c.d.spi.local.LocalAppDeployer : Deploying app with deploymentId mystream.myapp instance 0. Logs will be in /var/folders/l2/63gcnd9d7g5dxxpjbgr0trpw0000gn/T/spring-cloud-dataflow-5939494818997196225/mystream-1498661416369/mystream.myapp
However, by providing local.inheritLogging=true
as a deployment property you will see the following:
2017-06-28 09:50:16.372  INFO 41161 --- [nio-9393-exec-7] o.s.c.d.spi.local.LocalAppDeployer : Deploying app with deploymentId mystream.myapp instance 0. Logs will be inherited.
After that, the application logs appear alongside the server logs.
For example:
stream deploy --name mystream --properties "deployer.*.local.inheritLogging=true”
The above enables log redirection for each application in the stream.
stream deploy --name mystream --properties "deployer.myapp.local.inheritLogging=true”
The above enables log redirection only for the application named 'myapp'.
Note: Log redirect is only supported with the local-deployer.
In this section, we review the frequently asked questions about Spring Cloud Data Flow.
One of the powerful features of SpEL expressions is functions.
Spring Integration provides the jsonPath() and xpath() SpEL functions out of the box, provided the appropriate libraries are on the classpath.
All the provided Spring Cloud Stream application starters ship with the json-path and spring-integration-xml jars, so we can use those SpEL functions in Spring Cloud Data Flow streams wherever expressions are possible.
For example, we can transform a JSON payload from an HTTP request using a jsonPath() expression:
dataflow:>stream create jsonPathTransform --definition "http | transform --expression=#jsonPath(payload,'$.price') | log" --deploy
...
dataflow:> http post --target http://localhost:8080 --data {"symbol":"SCDF","price":72.04}
dataflow:> http post --target http://localhost:8080 --data {"symbol":"SCDF","price":72.06}
dataflow:> http post --target http://localhost:8080 --data {"symbol":"SCDF","price":72.08}
In this sample, we apply jsonPath() to the incoming payload to extract only the price field value.
Similar syntax can be used with the splitter or filter expression options.
In fact, any SpEL-based option has access to the built-in SpEL functions.
For example, we can extract a value from the JSON data to calculate the partitionKey before sending output to the binder:
dataflow:>stream deploy foo --properties "deployer.transform.count=2,app.transform.producer.partitionKeyExpression=#jsonPath(payload,'$.symbol')"
The same syntax can be applied with the xpath() SpEL function when you deal with XML data.
Any other custom SpEL function can be used as well, but for that you should build a library with a @Configuration class containing an appropriate SpelFunctionFactoryBean @Bean definition.
The target Spring Cloud Stream application starter should then be re-packaged to supply such a custom extension via the built-in Spring Boot @ComponentScan mechanism or an auto-configuration hook.
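As a minimal sketch (not taken from the reference documentation), such a configuration class could look like the following. The helper class and function name are hypothetical, and it assumes that Spring Integration's SpelFunctionFactoryBean registers the given static method as a SpEL function under the bean name.

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.config.SpelFunctionFactoryBean;

@Configuration
public class CustomSpelFunctionConfiguration {

    // Registers MyFunctions.reverse(String) so it can be referenced as #reverse(...) in SpEL expressions
    @Bean
    public SpelFunctionFactoryBean reverse() {
        return new SpelFunctionFactoryBean(MyFunctions.class, "reverse");
    }

    // Hypothetical helper class holding the static method that backs the SpEL function
    public static class MyFunctions {

        public static String reverse(String input) {
            return new StringBuilder(input).reverse().toString();
        }
    }
}

Once such a jar is on the application classpath and picked up via @ComponentScan or auto-configuration, an option such as --expression=#reverse(payload) would become available in stream definitions.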
The JDBC sink can be used to insert message payload data into a relational database table. By default, it inserts the entire payload into a table whose name is set by the jdbc.table-name property; if that property is not set, the application expects to use a table named messages. To alter this behavior, the JDBC sink accepts several options that you can pass using the --foo=bar notation in the stream, or change globally.
The JDBC sink has a jdbc.initialize property that, if set to true, results in the sink creating a table based on the specified configuration when it starts up. If that initialize property is false, which is the default, you have to make sure that the table to use is already available.
A stream definition that uses the jdbc sink, relying on all defaults, with MySQL as the backing database looks like the following. In this example, the system time is persisted in MySQL every second.
dataflow:>stream create --name mydata --definition "time | jdbc --spring.datasource.url=jdbc:mysql://localhost:3306/test --spring.datasource.username=root --spring.datasource.password=root --spring.datasource.driver-class-name=org.mariadb.jdbc.Driver" --deploy
For this to work, you’d have to have the following table in the MySQL database.
CREATE TABLE test.messages ( payload varchar(255) );
mysql> desc test.messages;
+---------+--------------+------+-----+---------+-------+
| Field   | Type         | Null | Key | Default | Extra |
+---------+--------------+------+-----+---------+-------+
| payload | varchar(255) | YES  |     | NULL    |       |
+---------+--------------+------+-----+---------+-------+
1 row in set (0.00 sec)

mysql> select * from test.messages;
+-------------------+
| payload           |
+-------------------+
| 04/25/17 09:10:04 |
| 04/25/17 09:10:06 |
| 04/25/17 09:10:07 |
| 04/25/17 09:10:08 |
| 04/25/17 09:10:09 |
.............
.............
.............
For situations where data is consumed from one message broker and published to another, Spring Cloud Data Flow provides easy-to-override global configurations, an out-of-the-box bridge-processor, and DSL primitives to build these types of topologies.
Let’s assume we have data queueing up in RabbitMQ (e.g., queue = fooRabbit
) and the requirement
is to consume all the payloads and publish them to Apache Kafka (e.g., topic = barKafka
), as the
destination for downstream processing.
Using the global application configuration properties, define the multiple binder configurations as follows.
# Apache Kafka Global Configurations (i.e., identified by "kafka1")
spring.cloud.dataflow.applicationProperties.stream.spring.cloud.stream.binders.kafka1.type=kafka
spring.cloud.dataflow.applicationProperties.stream.spring.cloud.stream.binders.kafka1.environment.spring.cloud.stream.kafka.binder.brokers=localhost:9092
spring.cloud.dataflow.applicationProperties.stream.spring.cloud.stream.binders.kafka1.environment.spring.cloud.stream.kafka.binder.zkNodes=localhost:2181

# RabbitMQ Global Configurations (i.e., identified by "rabbit1")
spring.cloud.dataflow.applicationProperties.stream.spring.cloud.stream.binders.rabbit1.type=rabbit
spring.cloud.dataflow.applicationProperties.stream.spring.cloud.stream.binders.rabbit1.environment.spring.rabbitmq.host=localhost
spring.cloud.dataflow.applicationProperties.stream.spring.cloud.stream.binders.rabbit1.environment.spring.rabbitmq.port=5672
Note: In this example, both message brokers are running locally and are reachable at localhost on their respective ports.
These properties can be supplied in a ".properties" file that is accessible to the server directly or via
config-server
.
java -jar spring-cloud-dataflow-server-local/target/spring-cloud-dataflow-server-local-1.1.4.RELEASE.jar --spring.config.location=<PATH-TO-FILE>/foo.properties
Spring Cloud Data Flow internally uses bridge-processor
to directly connect different named channel
destinations. Since we are publishing to and subscribing from two different messaging systems, you’d have
to build the bridge-processor
with both RabbitMQ and Apache Kafka binders in the classpath. To do that,
head over to start-scs.cfapps.io/ and select Bridge Processor
, Kafka binder starter
, and
Rabbit binder starter
as the dependencies and follow the patching procedure described in the
reference guide.
Specifically, for the bridge-processor
, you’d have to import the BridgeProcessorConfiguration
provided by the starter.
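As a rough sketch (the package of BridgeProcessorConfiguration and the application class name below are assumptions, not taken from the reference guide), the patched application could look like this:

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
// Assumed package; adjust to wherever the starter places BridgeProcessorConfiguration
import org.springframework.cloud.stream.app.bridge.processor.BridgeProcessorConfiguration;
import org.springframework.context.annotation.Import;

// Boots the processor with the starter's bridge logic; both the Kafka and
// Rabbit binder dependencies must be on the classpath of this project.
@SpringBootApplication
@Import(BridgeProcessorConfiguration.class)
public class MultiBinderBridgeApplication {

    public static void main(String[] args) {
        SpringApplication.run(MultiBinderBridgeApplication.class, args);
    }
}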
Once you have made the necessary adjustments, you can build the application. Let's register the application under the name multiBinderBridge.
dataflow:>app register --type processor --name multiBinderBridge --uri file:///<PATH-TO-FILE>/multipleBinderBridge-0.0.1-SNAPSHOT.jar
It is time to create a stream definition with the newly registered processor application.
dataflow:>stream create fooRabbitToBarKafka --definition ":fooRabbit > multiBinderBridge --spring.cloud.stream.bindings.input.binder=rabbit1 --spring.cloud.stream.bindings.output.binder=kafka1 > :barKafka" --deploy
Note: Since we consume messages from RabbitMQ (i.e., identified by rabbit1) and publish them to Apache Kafka (i.e., identified by kafka1), the stream definition sets the processor's input binding to the rabbit1 binder and its output binding to the kafka1 binder.
Note: The queue fooRabbit in RabbitMQ and the topic barKafka in Apache Kafka are the named destinations that the stream consumes from and publishes to, respectively.