Version 1.0.0.BUILD-SNAPSHOT

© 2012-2017 Pivotal Software, Inc.

Copies of this document may be made for your own use and for distribution to others, provided that you do not charge any fee for such copies and further provided that each copy contains this Copyright Notice, whether distributed in print or electronically.

1. Overview

This guide contains samples and demonstrations of how to build data pipelines with Spring Cloud Data Flow.

2. Java DSL

2.1. Deploying a stream programmatically

This sample shows the two usage styles of the Java DSL to create and deploy a stream. You should look in the source code to get a feel for the different styles.

1) Build the sample application

./mvnw clean package

With no command line options, the application deploys the stream http --server.port=9900 | splitter --expression=payload.split(' ') | log, using the URI localhost:9393 to connect to the Data Flow server. There is also a command line option --style whose value can be either definition or fluent. This option selects which Java DSL style will execute; both are identical in terms of behavior. The spring-cloud-dataflow-rest-client project provides auto-configuration for DataFlowOperations and StreamBuilder.

The properties in DataFlowClientProperties can be used to configure the connection to the Data Flow server. The most common property to start with is spring.cloud.dataflow.client.uri.

@Autowired
private DataFlowOperations dataFlowOperations;

@Autowired
private StreamBuilder builder;

You can use those beans to build streams as well as work directly with the DataFlowOperations REST client.
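
If you prefer not to rely on the auto-configuration, a DataFlowOperations instance can also be constructed directly. The following is a minimal sketch, assuming only that spring-cloud-dataflow-rest-client is on the classpath and that the server is at the local default URI; exact signatures may vary slightly between releases.

import java.net.URI;

import org.springframework.cloud.dataflow.rest.client.DataFlowOperations;
import org.springframework.cloud.dataflow.rest.client.DataFlowTemplate;
import org.springframework.cloud.dataflow.rest.client.dsl.Stream;
import org.springframework.cloud.dataflow.rest.client.dsl.StreamBuilder;

public class ManualClientSketch {

    public static void main(String[] args) {
        // Point the REST client at the Data Flow server (localhost:9393 is the local default).
        DataFlowOperations dataFlowOperations =
                new DataFlowTemplate(URI.create("http://localhost:9393"));

        // The Java DSL entry point can be created from the same operations instance.
        StreamBuilder builder = Stream.builder(dataFlowOperations);

        // Simple connectivity check: list the names of any existing stream definitions.
        dataFlowOperations.streamOperations().list()
                .forEach(definition -> System.out.println(definition.getName()));
    }
}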

The definition style uses code of the following form:

Stream woodchuck = builder
        .name("woodchuck")
        .definition("http --server.port=9900 | splitter --expression=payload.split(' ') | log")
        .create()
        .deploy(deploymentProperties);

while the fluent style uses code of the following form:

Stream woodchuck = builder.name("woodchuck")
        .source(source)
        .processor(processor)
        .sink(sink)
        .create()
        .deploy(deploymentProperties);

where the source, processor, and sink variables are defined as @Bean methods of type StreamApplication:

@Bean
public StreamApplication source() {
  return new StreamApplication("http").addProperty("server.port", 9900);
}
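
For completeness, the processor and sink used in the fluent-style example above can be defined along the same lines. The following sketch mirrors the splitter and log applications from the stream definition; treat it as illustrative rather than a copy of the sample's exact source.

@Bean
public StreamApplication processor() {
  // Mirrors "splitter --expression=payload.split(' ')" from the stream definition.
  return new StreamApplication("splitter")
      .addProperty("expression", "payload.split(' ')");
}

@Bean
public StreamApplication sink() {
  // The log sink needs no additional application properties here.
  return new StreamApplication("log");
}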

Another useful class is DeploymentPropertiesBuilder, which aids in creating the map of properties required to deploy stream applications.

	private Map<String, String> createDeploymentProperties() {
		DeploymentPropertiesBuilder propertiesBuilder = new DeploymentPropertiesBuilder();
		propertiesBuilder.memory("log", 512);
		propertiesBuilder.count("log",2);
		propertiesBuilder.put("app.splitter.producer.partitionKeyExpression", "payload");
		return propertiesBuilder.build();
	}

2) Run a local Data Flow Server and run the sample application. This sample demonstrates the use of the local Data Flow Server, but you can pass in the option --uri to point to another Data Flow server instance that is running elsewhere.

$ java -jar target/scdfdsl-0.0.1-SNAPSHOT.jar

You will then see the following output.

Deploying stream.
Waiting for deployment of stream.
Waiting for deployment of stream.
Waiting for deployment of stream.
Waiting for deployment of stream.
Waiting for deployment of stream.
Letting the stream run for 2 minutes.

To verify that the application has been deployed successfully, we will tail the logs of one of the log sinks and post some data to the http source. You can find the location for the logs of one of the log sink applications by looking in the Data Flow server’s log file.

3) Post some data to the server

curl http://localhost:9900 -H "Content-Type:text/plain"   -X POST -d "how much wood would a woodchuck chuck if a woodchuck could chuck wood"

4) Verify the output

Tailing the log file of the first instance

cd /tmp/spring-cloud-dataflow-4323595028663837160/woodchuck-1511390696355/woodchuck.log
tail -f stdout_0.log
2017-11-22 18:04:08.631  INFO 26652 --- [r.woodchuck-0-1] log-sink          : how
2017-11-22 18:04:08.632  INFO 26652 --- [r.woodchuck-0-1] log-sink          : chuck
2017-11-22 18:04:08.634  INFO 26652 --- [r.woodchuck-0-1] log-sink          : chuck

Tailing the log file of the second instance

cd /tmp/spring-cloud-dataflow-4323595028663837160/woodchuck-1511390696355/woodchuck.log
tail -f stdout_1.log

You should see the output

$ tail -f stdout_1.log
2017-11-22 18:04:08.636  INFO 26655 --- [r.woodchuck-1-1] log-sink          : much
2017-11-22 18:04:08.638  INFO 26655 --- [r.woodchuck-1-1] log-sink          : wood
2017-11-22 18:04:08.639  INFO 26655 --- [r.woodchuck-1-1] log-sink          : would
2017-11-22 18:04:08.640  INFO 26655 --- [r.woodchuck-1-1] log-sink          : a
2017-11-22 18:04:08.641  INFO 26655 --- [r.woodchuck-1-1] log-sink          : woodchuck
2017-11-22 18:04:08.642  INFO 26655 --- [r.woodchuck-1-1] log-sink          : if
2017-11-22 18:04:08.644  INFO 26655 --- [r.woodchuck-1-1] log-sink          : a
2017-11-22 18:04:08.645  INFO 26655 --- [r.woodchuck-1-1] log-sink          : woodchuck
2017-11-22 18:04:08.646  INFO 26655 --- [r.woodchuck-1-1] log-sink          : could
2017-11-22 18:04:08.647  INFO 26655 --- [r.woodchuck-1-1] log-sink          : wood

Note that the partitioning is done based on the hash of the java.lang.String object.
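
As a rough illustration of why some words land on the first log instance and the rest on the second, the snippet below sketches hash-based partition selection with two instances (matching count("log", 2) above). This is a simplification for intuition only; the actual Spring Cloud Stream partition-selection algorithm is not guaranteed to be exactly this.

public class PartitionSketch {

    public static void main(String[] args) {
        String payload = "how much wood would a woodchuck chuck if a woodchuck could chuck wood";
        int instanceCount = 2; // count("log", 2) in the deployment properties

        for (String word : payload.split(" ")) {
            // Simplified: pick the target instance from the String hash code.
            int partition = Math.abs(word.hashCode()) % instanceCount;
            System.out.println(word + " -> log instance " + partition);
        }
    }
}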

3. Streaming

3.1. HTTP to Cassandra Demo

In this demonstration, you will learn how to build a data pipeline using Spring Cloud Data Flow to consume data from an HTTP endpoint and write the payload to a Cassandra database.

We will take you through the steps to configure and run a Spring Cloud Data Flow server in either a local or Cloud Foundry environment.

3.1.1. Prerequisites

  • A Running Data Flow Shell

The Spring Cloud Data Flow Shell is available for download or you can build it yourself.

The Spring Cloud Data Flow Shell and Local server implementation are in the same repository and are both built by running ./mvnw install from the project root directory. If you have already run the build, use the jar in spring-cloud-dataflow-shell/target.

To run the Shell, open a new terminal session:

$ cd <PATH/TO/SPRING-CLOUD-DATAFLOW-SHELL-JAR>
$ java -jar spring-cloud-dataflow-shell-<VERSION>.jar
  ____                              ____ _                __
 / ___| _ __  _ __(_)_ __   __ _   / ___| | ___  _   _  __| |
 \___ \| '_ \| '__| | '_ \ / _` | | |   | |/ _ \| | | |/ _` |
  ___) | |_) | |  | | | | | (_| | | |___| | (_) | |_| | (_| |
 |____/| .__/|_|  |_|_| |_|\__, |  \____|_|\___/ \__,_|\__,_|
  ____ |_|    _          __|___/                 __________
 |  _ \  __ _| |_ __ _  |  ___| | _____      __  \ \ \ \ \ \
 | | | |/ _` | __/ _` | | |_  | |/ _ \ \ /\ / /   \ \ \ \ \ \
 | |_| | (_| | || (_| | |  _| | | (_) \ V  V /    / / / / / /
 |____/ \__,_|\__\__,_| |_|   |_|\___/ \_/\_/    /_/_/_/_/_/


Welcome to the Spring Cloud Data Flow shell. For assistance hit TAB or type "help".
dataflow:>
The Spring Cloud Data Flow Shell is a Spring Boot application that connects to the Data Flow Server’s REST API and supports a DSL that simplifies the process of defining a stream or task and managing its lifecycle. Most of these samples use the shell. If you prefer, you can use the Data Flow UI at localhost:9393/dashboard (or wherever the server is hosted) to perform equivalent operations.

3.1.2. Using the Local Server

Additional Prerequisites
  • A running local Data Flow Server

The Local Data Flow Server is a Spring Boot application that is available for download, or you can build it yourself. If you build it yourself, the executable jar will be in spring-cloud-dataflow-server-local/target.

To run the Local Data Flow Server, open a new terminal session:

$ cd <PATH/TO/SPRING-CLOUD-DATAFLOW-LOCAL-JAR>
$ java -jar spring-cloud-dataflow-server-local-<VERSION>.jar
  • Running instance of Kafka

  • Running instance of Apache Cassandra

  • A database utility tool such as DBeaver to connect to the Cassandra instance. You might have to provide host, port, username and password depending on the Cassandra configuration you are using.

  • Create a keyspace and a book table in Cassandra using:

CREATE KEYSPACE clouddata WITH REPLICATION = { 'class' : 'org.apache.cassandra.locator.SimpleStrategy', 'replication_factor': '1' } AND DURABLE_WRITES = true;
USE clouddata;
CREATE TABLE book  (
    id          uuid PRIMARY KEY,
    isbn        text,
    author      text,
    title       text
);
Building and Running the Demo
  1. Register the out-of-the-box applications for the Kafka binder

    These samples assume that the Data Flow Server can access a remote Maven repository, repo.spring.io/libs-release by default. If your Data Flow server is running behind a firewall, or you are using a Maven proxy that prevents access to public repositories, you will need to install the sample apps in your internal Maven repository and configure the server accordingly. The sample applications are typically registered using Data Flow’s bulk import facility, for example with the Shell command dataflow:>app import --uri bit.ly/Celsius-SR1-stream-applications-rabbit-maven (the actual URI is release and binder specific, so refer to the sample instructions for the actual URL). The bulk import URI references a plain text file containing entries for all of the publicly available Spring Cloud Stream and Task applications published to repo.spring.io. For example, source.http=maven://org.springframework.cloud.stream.app:http-source-rabbit:1.3.1.RELEASE registers the http source app at the corresponding Maven address, relative to the remote repository (or repositories) configured for the Data Flow server. The format is maven://<groupId>:<artifactId>:<version>. If the server cannot reach the public repository, download the required apps or build them yourself, install them in your Maven repository using whatever group, artifact, and version you choose, and then register each app individually with dataflow:>app register… using the maven:// resource URI format corresponding to your installed app.
    dataflow:>app import --uri http://bit.ly/Celsius-SR1-stream-applications-kafka-10-maven
  2. Create the stream

    dataflow:>stream create cassandrastream --definition "http --server.port=8888 --spring.cloud.stream.bindings.output.contentType='application/json' | cassandra --ingestQuery='insert into book (id, isbn, title, author) values (uuid(), ?, ?, ?)' --keyspace=clouddata" --deploy
    
    Created and deployed new stream 'cassandrastream'
    If Cassandra isn’t running on the default port on localhost, or if you need a username and password to connect, use the following options as needed to specify the connection parameters: --username='<USERNAME>' --password='<PASSWORD>' --port=<PORT> --contact-points=<LIST-OF-HOSTS>
  3. Verify the stream is successfully deployed

    dataflow:>stream list
  4. Notice that cassandrastream-http and cassandrastream-cassandra Spring Cloud Stream applications are running as Spring Boot applications within the server as collocated processes.

    2015-12-15 15:52:31.576  INFO 18337 --- [nio-9393-exec-1] o.s.c.d.a.s.l.OutOfProcessModuleDeployer : deploying module org.springframework.cloud.stream.module:cassandra-sink:jar:exec:1.0.0.BUILD-SNAPSHOT instance 0
       Logs will be in /var/folders/c3/ctx7_rns6x30tq7rb76wzqwr0000gp/T/spring-cloud-data-flow-284240942697761420/cassandrastream.cassandra
    2015-12-15 15:52:31.583  INFO 18337 --- [nio-9393-exec-1] o.s.c.d.a.s.l.OutOfProcessModuleDeployer : deploying module org.springframework.cloud.stream.module:http-source:jar:exec:1.0.0.BUILD-SNAPSHOT instance 0
       Logs will be in /var/folders/c3/ctx7_rns6x30tq7rb76wzqwr0000gp/T/spring-cloud-data-flow-284240942697761420/cassandrastream.http
  5. Post sample data pointing to the http endpoint: localhost:8888 (8888 is the server.port we specified for the http source in this case)

    dataflow:>http post --contentType 'application/json' --data '{"isbn": "1599869772", "title": "The Art of War", "author": "Sun Tzu"}' --target http://localhost:8888
    > POST (application/json;charset=UTF-8) http://localhost:8888 {"isbn": "1599869772", "title": "The Art of War", "author": "Sun Tzu"}
    > 202 ACCEPTED
  6. Connect to the Cassandra instance and query the table clouddata.book to list the persisted records (a small Java sketch using the DataStax driver follows this list)

    select * from clouddata.book;
  7. You’re done!
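
If you would rather verify the data from code than from a CQL shell, the following sketch lists the rows using the DataStax Java driver. It assumes driver 3.x on the classpath and Cassandra reachable on localhost with its default port and no authentication; adjust the contact point and credentials for your setup.

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;

public class VerifyBooks {

    public static void main(String[] args) {
        // Connect to the local Cassandra instance used by the cassandrastream demo.
        try (Cluster cluster = Cluster.builder().addContactPoint("127.0.0.1").build();
             Session session = cluster.connect("clouddata")) {
            ResultSet rows = session.execute("SELECT id, isbn, title, author FROM book");
            for (Row row : rows) {
                System.out.println(row.getUUID("id") + " | " + row.getString("isbn")
                        + " | " + row.getString("title") + " | " + row.getString("author"));
            }
        }
    }
}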

3.1.3. Using the Cloud Foundry Server

Additional Prerequisites
  • Cloud Foundry instance

  • A rabbit service instance

  • A running instance of Cassandra in Cloud Foundry or from another cloud provider

  • A database utility tool such as DBeaver to connect to the Cassandra instance. You might have to provide host, port, username and password depending on the Cassandra configuration you are using.

  • Create a book table in your Cassandra keyspace using:

    CREATE TABLE book  (
        id          uuid PRIMARY KEY,
        isbn        text,
        author      text,
        title       text
    );
  • The Spring Cloud Data Flow Cloud Foundry Server

The Cloud Foundry Data Flow Server is a Spring Boot application that is available for download, or you can build it yourself. If you build it yourself, the executable jar will be in spring-cloud-dataflow-server-cloudfoundry/target.

Although you can run the Data Flow Cloud Foundry Server locally and configure it to deploy to any Cloud Foundry instance, we will deploy the server to Cloud Foundry as recommended.
  1. Verify that the CF instance is reachable (your endpoint urls will be different from what is shown here).

    $ cf api
    API endpoint: https://api.system.io (API version: ...)
    
    $ cf apps
    Getting apps in org [your-org] / space [your-space] as user...
    OK
    
    No apps found
  2. Follow the instructions to deploy the Spring Cloud Data Flow Cloud Foundry server. Don’t worry about creating a Redis service. We won’t need it. If you are familiar with Cloud Foundry application manifests, we recommend creating a manifest for the Data Flow server as shown here.

    As of this writing, there is a typo on the SPRING_APPLICATION_JSON entry in the sample manifest. SPRING_APPLICATION_JSON must be followed by : and the JSON string must be wrapped in single quotes. Alternatively, you can replace that line with MAVEN_REMOTE_REPOSITORIES_REPO1_URL: repo.spring.io/libs-snapshot. If your Cloud Foundry installation is behind a firewall, you may need to install the stream apps used in this sample in your internal Maven repository and configure the server to access that repository.
  3. Once you have successfully executed cf push, verify the dataflow server is running

    $ cf apps
    Getting apps in org [your-org] / space [your-space] as user...
    OK
    
    name                 requested state   instances   memory   disk   urls
    dataflow-server      started           1/1         1G       1G     dataflow-server.app.io
  4. Notice that the dataflow-server application is started and ready for interaction via the url endpoint

  5. Connect the shell with server running on Cloud Foundry, e.g., dataflow-server.app.io

    $ cd <PATH/TO/SPRING-CLOUD-DATAFLOW-SHELL-JAR>
    $ java -jar spring-cloud-dataflow-shell-<VERSION>.jar
    
      ____                              ____ _                __
     / ___| _ __  _ __(_)_ __   __ _   / ___| | ___  _   _  __| |
     \___ \| '_ \| '__| | '_ \ / _` | | |   | |/ _ \| | | |/ _` |
      ___) | |_) | |  | | | | | (_| | | |___| | (_) | |_| | (_| |
     |____/| .__/|_|  |_|_| |_|\__, |  \____|_|\___/ \__,_|\__,_|
      ____ |_|    _          __|___/                 __________
     |  _ \  __ _| |_ __ _  |  ___| | _____      __  \ \ \ \ \ \
     | | | |/ _` | __/ _` | | |_  | |/ _ \ \ /\ / /   \ \ \ \ \ \
     | |_| | (_| | || (_| | |  _| | | (_) \ V  V /    / / / / / /
     |____/ \__,_|\__\__,_| |_|   |_|\___/ \_/\_/    /_/_/_/_/_/
    
    
    Welcome to the Spring Cloud Data Flow shell. For assistance hit TAB or type "help".
    server-unknown:>
    server-unknown:>dataflow config server http://dataflow-server.app.io
    Successfully targeted http://dataflow-server.app.io
    dataflow:>
Building and Running the Demo
  1. Register the out-of-the-box applications for the Rabbit binder

    These samples assume that the Data Flow Server can access a remote Maven repository, repo.spring.io/libs-release by default. If your Data Flow server is running behind a firewall, or you are using a Maven proxy that prevents access to public repositories, you will need to install the sample apps in your internal Maven repository and configure the server accordingly. The sample applications are typically registered using Data Flow’s bulk import facility, for example with the Shell command dataflow:>app import --uri bit.ly/Celsius-SR1-stream-applications-rabbit-maven (the actual URI is release and binder specific, so refer to the sample instructions for the actual URL). The bulk import URI references a plain text file containing entries for all of the publicly available Spring Cloud Stream and Task applications published to repo.spring.io. For example, source.http=maven://org.springframework.cloud.stream.app:http-source-rabbit:1.3.1.RELEASE registers the http source app at the corresponding Maven address, relative to the remote repository (or repositories) configured for the Data Flow server. The format is maven://<groupId>:<artifactId>:<version>. If the server cannot reach the public repository, download the required apps or build them yourself, install them in your Maven repository using whatever group, artifact, and version you choose, and then register each app individually with dataflow:>app register… using the maven:// resource URI format corresponding to your installed app.
    dataflow:>app import --uri http://bit.ly/Celsius-SR1-stream-applications-rabbit-maven
  2. Create the stream

    dataflow:>stream create cassandrastream --definition "http --spring.cloud.stream.bindings.output.contentType='application/json' | cassandra --ingestQuery='insert into book (id, isbn, title, author) values (uuid(), ?, ?, ?)' --username='<USERNAME>' --password='<PASSWORD>' --port=<PORT> --contact-points=<HOST> --keyspace='<KEYSPACE>'" --deploy
    
    Created and deployed new stream 'cassandrastream'

If you have enabled random application name prefixes in PCF, you may want to shorten the cassandrastream name; otherwise you could run into issues with the route name being too long.

  3. Verify the stream is successfully deployed

    dataflow:>stream list
  4. Notice that cassandrastream-http and cassandrastream-cassandra Spring Cloud Stream applications are running as cloud-native (microservice) applications in Cloud Foundry

    $ cf apps
    Getting apps in org [your-org] / space [your-space] as user...
    OK

    name                        requested state   instances   memory   disk   urls
    cassandrastream-cassandra   started           1/1         1G       1G     cassandrastream-cassandra.app.io
    cassandrastream-http        started           1/1         1G       1G     cassandrastream-http.app.io
    dataflow-server             started           1/1         1G       1G     dataflow-server.app.io
  5. Look up the url for the cassandrastream-http application from the list above. Post sample data pointing to the http endpoint: <YOUR-cassandrastream-http-APP-URL>

    http post --contentType 'application/json' --data '{"isbn": "1599869772", "title": "The Art of War", "author": "Sun Tzu"}' --target http://<YOUR-cassandrastream-http-APP-URL>
    > POST (application/json;charset=UTF-8) http://cassandrastream-http.app.io {"isbn": "1599869772", "title": "The Art of War", "author": "Sun Tzu"}
    > 202 ACCEPTED
  6. Connect to the Cassandra instance and query the table book to list the data inserted

    select * from book;
  7. Now, let’s take advantage of Pivotal Cloud Foundry’s platform capability. Let’s scale the cassandrastream-http application from 1 to 3 instances

    $ cf scale cassandrastream-http -i 3
    Scaling app cassandrastream-http in org user-dataflow / space development as user...
    OK
  8. Verify App instances (3/3) running successfully

    $ cf apps
    Getting apps in org user-dataflow / space development as user...
    OK

    name                        requested state   instances   memory   disk   urls
    cassandrastream-cassandra   started           1/1         1G       1G     cassandrastream-cassandra.app.io
    cassandrastream-http        started           3/3         1G       1G     cassandrastream-http.app.io
    dataflow-server             started           1/1         1G       1G     dataflow-server.app.io
  9. You’re done!

3.1.4. Summary

In this sample, you have learned:

  • How to use Spring Cloud Data Flow’s Local and Cloud Foundry servers

  • How to use Spring Cloud Data Flow’s shell

  • How to create a streaming data pipeline to connect and write to Cassandra

  • How to scale applications on Pivotal Cloud Foundry

3.2. HTTP to MySQL Demo

In this demonstration, you will learn how to build a data pipeline using Spring Cloud Data Flow to consume data from an http endpoint and write to a MySQL database using the jdbc sink.

We will take you through the steps to configure and run a Spring Cloud Data Flow server in either a local or Cloud Foundry environment.

3.2.1. Prerequisites

  • A Running Data Flow Shell

The Spring Cloud Data Flow Shell is available for download or you can build it yourself.

The Spring Cloud Data Flow Shell and Local server implementation are in the same repository and are both built by running ./mvnw install from the project root directory. If you have already run the build, use the jar in spring-cloud-dataflow-shell/target.

To run the Shell, open a new terminal session:

$ cd <PATH/TO/SPRING-CLOUD-DATAFLOW-SHELL-JAR>
$ java -jar spring-cloud-dataflow-shell-<VERSION>.jar
  ____                              ____ _                __
 / ___| _ __  _ __(_)_ __   __ _   / ___| | ___  _   _  __| |
 \___ \| '_ \| '__| | '_ \ / _` | | |   | |/ _ \| | | |/ _` |
  ___) | |_) | |  | | | | | (_| | | |___| | (_) | |_| | (_| |
 |____/| .__/|_|  |_|_| |_|\__, |  \____|_|\___/ \__,_|\__,_|
  ____ |_|    _          __|___/                 __________
 |  _ \  __ _| |_ __ _  |  ___| | _____      __  \ \ \ \ \ \
 | | | |/ _` | __/ _` | | |_  | |/ _ \ \ /\ / /   \ \ \ \ \ \
 | |_| | (_| | || (_| | |  _| | | (_) \ V  V /    / / / / / /
 |____/ \__,_|\__\__,_| |_|   |_|\___/ \_/\_/    /_/_/_/_/_/


Welcome to the Spring Cloud Data Flow shell. For assistance hit TAB or type "help".
dataflow:>
The Spring Cloud Data Flow Shell is a Spring Boot application that connects to the Data Flow Server’s REST API and supports a DSL that simplifies the process of defining a stream or task and managing its lifecycle. Most of these samples use the shell. If you prefer, you can use the Data Flow UI at localhost:9393/dashboard (or wherever the server is hosted) to perform equivalent operations.

3.2.2. Using the Local Server

Additional Prerequisites
  • A running local Data Flow Server

The Local Data Flow Server is a Spring Boot application that is available for download, or you can build it yourself. If you build it yourself, the executable jar will be in spring-cloud-dataflow-server-local/target.

To run the Local Data Flow Server, open a new terminal session:

$ cd <PATH/TO/SPRING-CLOUD-DATAFLOW-LOCAL-JAR>
$ java -jar spring-cloud-dataflow-server-local-<VERSION>.jar
  • Running instance of Kafka

  • Running instance of MySQL

  • A database utility tool such as DBeaver or DbVisualizer

  • Create the test database with a names table (in MySQL) using:

    CREATE DATABASE test;
    USE test;
    CREATE TABLE names
    (
    	name varchar(255)
    );
Building and Running the Demo
  1. Register the out-of-the-box applications for the Kafka binder

    These samples assume that the Data Flow Server can access a remote Maven repository, repo.spring.io/libs-release by default. If your Data Flow server is running behind a firewall, or you are using a Maven proxy that prevents access to public repositories, you will need to install the sample apps in your internal Maven repository and configure the server accordingly. The sample applications are typically registered using Data Flow’s bulk import facility, for example with the Shell command dataflow:>app import --uri bit.ly/Celsius-SR1-stream-applications-rabbit-maven (the actual URI is release and binder specific, so refer to the sample instructions for the actual URL). The bulk import URI references a plain text file containing entries for all of the publicly available Spring Cloud Stream and Task applications published to repo.spring.io. For example, source.http=maven://org.springframework.cloud.stream.app:http-source-rabbit:1.3.1.RELEASE registers the http source app at the corresponding Maven address, relative to the remote repository (or repositories) configured for the Data Flow server. The format is maven://<groupId>:<artifactId>:<version>. If the server cannot reach the public repository, download the required apps or build them yourself, install them in your Maven repository using whatever group, artifact, and version you choose, and then register each app individually with dataflow:>app register… using the maven:// resource URI format corresponding to your installed app.
    dataflow:>app import --uri http://bit.ly/Celsius-SR1-stream-applications-kafka-10-maven
  2. Create the stream

    dataflow:>stream create --name mysqlstream --definition "http --server.port=8787 | jdbc --tableName=names --columns=name --spring.datasource.driver-class-name=org.mariadb.jdbc.Driver --spring.datasource.url='jdbc:mysql://localhost:3306/test'" --deploy
    
    Created and deployed new stream 'mysqlstream'
    If MySQL isn’t running on the default port on localhost, or if you need a username and password to connect, use the following options as needed to specify the connection parameters: --spring.datasource.url='jdbc:mysql://<HOST>:<PORT>/<NAME>' --spring.datasource.username=<USERNAME> --spring.datasource.password=<PASSWORD>
  3. Verify the stream is successfully deployed

    dataflow:>stream list
  4. Notice that mysqlstream-http and mysqlstream-jdbc Spring Cloud Stream applications are running as Spring Boot applications within the Local server as collocated processes.

    2016-05-03 09:29:55.918  INFO 65162 --- [nio-9393-exec-3] o.s.c.d.spi.local.LocalAppDeployer       : deploying app mysqlstream.jdbc instance 0
       Logs will be in /var/folders/c3/ctx7_rns6x30tq7rb76wzqwr0000gp/T/spring-cloud-dataflow-6850863945840320040/mysqlstream1-1462292995903/mysqlstream.jdbc
    2016-05-03 09:29:55.939  INFO 65162 --- [nio-9393-exec-3] o.s.c.d.spi.local.LocalAppDeployer       : deploying app mysqlstream.http instance 0
       Logs will be in /var/folders/c3/ctx7_rns6x30tq7rb76wzqwr0000gp/T/spring-cloud-dataflow-6850863945840320040/mysqlstream-1462292995934/mysqlstream.http
  5. Post sample data pointing to the http endpoint: localhost:8787 [8787 is the server.port we specified for the http source in this case]

    dataflow:>http post --contentType 'application/json' --target http://localhost:8787 --data "{\"name\": \"Foo\"}"
    > POST (application/json;charset=UTF-8) http://localhost:8787 {"name": "Foo"}
    > 202 ACCEPTED
  6. Connect to the MySQL instance and query the table test.names to list the new rows (a plain JDBC sketch follows this list):

    select * from test.names;
  7. You’re done!
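
As an alternative to a database utility tool, the rows can be listed with plain JDBC. This is a sketch only: it assumes MySQL Connector/J on the classpath, and the username and password shown are placeholders to replace with your own.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class VerifyNames {

    public static void main(String[] args) throws Exception {
        // Placeholder connection details; adjust URL, user and password for your MySQL instance.
        String url = "jdbc:mysql://localhost:3306/test";
        try (Connection connection = DriverManager.getConnection(url, "root", "<PASSWORD>");
             Statement statement = connection.createStatement();
             ResultSet rs = statement.executeQuery("SELECT name FROM names")) {
            while (rs.next()) {
                System.out.println(rs.getString("name"));
            }
        }
    }
}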

3.2.3. Using the Cloud Foundry Server

Additional Prerequisites
  • Cloud Foundry instance

  • Running instance of rabbit in Cloud Foundry

  • Running instance of mysql in Cloud Foundry

  • A database utility tool such as DBeaver or DbVisualizer

  • Create the names table (in MySQL) using:

    CREATE TABLE names
    (
    	name varchar(255)
    );
  • The Spring Cloud Data Flow Cloud Foundry Server

The Cloud Foundry Data Flow Server is a Spring Boot application that is available for download, or you can build it yourself. If you build it yourself, the executable jar will be in spring-cloud-dataflow-server-cloudfoundry/target.

Although you can run the Data Flow Cloud Foundry Server locally and configure it to deploy to any Cloud Foundry instance, we will deploy the server to Cloud Foundry as recommended.
  1. Verify that the CF instance is reachable (your endpoint urls will be different from what is shown here).

    $ cf api
    API endpoint: https://api.system.io (API version: ...)
    
    $ cf apps
    Getting apps in org [your-org] / space [your-space] as user...
    OK
    
    No apps found
  2. Follow the instructions to deploy the Spring Cloud Data Flow Cloud Foundry server. Don’t worry about creating a Redis service. We won’t need it. If you are familiar with Cloud Foundry application manifests, we recommend creating a manifest for the Data Flow server as shown here.

    As of this writing, there is a typo on the SPRING_APPLICATION_JSON entry in the sample manifest. SPRING_APPLICATION_JSON must be followed by : and the JSON string must be wrapped in single quotes. Alternatively, you can replace that line with MAVEN_REMOTE_REPOSITORIES_REPO1_URL: repo.spring.io/libs-snapshot. If your Cloud Foundry installation is behind a firewall, you may need to install the stream apps used in this sample in your internal Maven repository and configure the server to access that repository.
  3. Once you have successfully executed cf push, verify the dataflow server is running

    $ cf apps
    Getting apps in org [your-org] / space [your-space] as user...
    OK
    
    name                 requested state   instances   memory   disk   urls
    dataflow-server      started           1/1         1G       1G     dataflow-server.app.io
  4. Notice that the dataflow-server application is started and ready for interaction via the url endpoint

  5. Connect the shell with server running on Cloud Foundry, e.g., dataflow-server.app.io

    $ cd <PATH/TO/SPRING-CLOUD-DATAFLOW-SHELL-JAR>
    $ java -jar spring-cloud-dataflow-shell-<VERSION>.jar
    
      ____                              ____ _                __
     / ___| _ __  _ __(_)_ __   __ _   / ___| | ___  _   _  __| |
     \___ \| '_ \| '__| | '_ \ / _` | | |   | |/ _ \| | | |/ _` |
      ___) | |_) | |  | | | | | (_| | | |___| | (_) | |_| | (_| |
     |____/| .__/|_|  |_|_| |_|\__, |  \____|_|\___/ \__,_|\__,_|
      ____ |_|    _          __|___/                 __________
     |  _ \  __ _| |_ __ _  |  ___| | _____      __  \ \ \ \ \ \
     | | | |/ _` | __/ _` | | |_  | |/ _ \ \ /\ / /   \ \ \ \ \ \
     | |_| | (_| | || (_| | |  _| | | (_) \ V  V /    / / / / / /
     |____/ \__,_|\__\__,_| |_|   |_|\___/ \_/\_/    /_/_/_/_/_/
    
    
    Welcome to the Spring Cloud Data Flow shell. For assistance hit TAB or type "help".
    server-unknown:>
    server-unknown:>dataflow config server http://dataflow-server.app.io
    Successfully targeted http://dataflow-server.app.io
    dataflow:>
Building and Running the Demo
  1. Register the out-of-the-box applications for the Rabbit binder

    These samples assume that the Data Flow Server can access a remote Maven repository, repo.spring.io/libs-release by default. If your Data Flow server is running behind a firewall, or you are using a Maven proxy that prevents access to public repositories, you will need to install the sample apps in your internal Maven repository and configure the server accordingly. The sample applications are typically registered using Data Flow’s bulk import facility, for example with the Shell command dataflow:>app import --uri bit.ly/Celsius-SR1-stream-applications-rabbit-maven (the actual URI is release and binder specific, so refer to the sample instructions for the actual URL). The bulk import URI references a plain text file containing entries for all of the publicly available Spring Cloud Stream and Task applications published to repo.spring.io. For example, source.http=maven://org.springframework.cloud.stream.app:http-source-rabbit:1.3.1.RELEASE registers the http source app at the corresponding Maven address, relative to the remote repository (or repositories) configured for the Data Flow server. The format is maven://<groupId>:<artifactId>:<version>. If the server cannot reach the public repository, download the required apps or build them yourself, install them in your Maven repository using whatever group, artifact, and version you choose, and then register each app individually with dataflow:>app register… using the maven:// resource URI format corresponding to your installed app.
    dataflow:>app import --uri http://bit.ly/Celsius-SR1-stream-applications-rabbit-maven
  2. Create the stream

    dataflow:>stream create --name mysqlstream --definition "http | jdbc --tableName=names --columns=name"
    Created new stream 'mysqlstream'
    
    dataflow:>stream deploy --name mysqlstream --properties "deployer.jdbc.cloudfoundry.services=mysql"
    Deployed stream 'mysqlstream'
    By supplying the deployer.jdbc.cloudfoundry.services=mysql property, we deploy the stream with the jdbc sink automatically bound to the mysql service; only this application in the stream gets the service binding. This also eliminates the need to supply datasource credentials in the stream definition.
  3. Verify the stream is successfully deployed

    dataflow:>stream list
  4. Notice that mysqlstream-http and mysqlstream-jdbc Spring Cloud Stream applications are running as cloud-native (microservice) applications in Cloud Foundry

    $ cf apps
    Getting apps in org user-dataflow / space development as user...
    OK
    
    name                        requested state   instances   memory   disk   urls
    mysqlstream-http            started           1/1         1G       1G     mysqlstream-http.app.io
    mysqlstream-jdbc            started           1/1         1G       1G     mysqlstream-jdbc.app.io
    dataflow-server             started           1/1         1G       1G     dataflow-server.app.io
  5. Look up the url for the mysqlstream-http application from the list above. Post sample data pointing to the http endpoint: <YOUR-mysqlstream-http-APP-URL>

    http post --contentType 'application/json' --data "{\"name\": \"Bar\"}" --target http://mysqlstream-http.app.io
    > POST (application/json;charset=UTF-8) http://mysqlstream-http.app.io {"name": "Bar"}
    > 202 ACCEPTED
  6. Connect to the MySQL instance and query the table names to list the new rows:

    select * from names;
  7. Now, let’s take advantage of Pivotal Cloud Foundry’s platform capability. Let’s scale the mysqlstream-http application from 1 to 3 instances

    $ cf scale mysqlstream-http -i 3
    Scaling app mysqlstream-http in org user-dataflow / space development as user...
    OK
  8. Verify App instances (3/3) running successfully

    $ cf apps
    Getting apps in org user-dataflow / space development as user...
    OK
    
    name                        requested state   instances   memory   disk   urls
    mysqlstream-http            started           3/3         1G       1G     mysqlstream-http.app.io
    mysqlstream-jdbc            started           1/1         1G       1G     mysqlstream-jdbc.app.io
    dataflow-server             started           1/1         1G       1G     dataflow-server.app.io
  9. You’re done!

3.2.4. Summary

In this sample, you have learned:

  • How to use Spring Cloud Data Flow’s Local and Cloud Foundry servers

  • How to use Spring Cloud Data Flow’s shell

  • How to create a streaming data pipeline to connect and write to MySQL

  • How to scale applications on Pivotal Cloud Foundry

3.3. HTTP to Gemfire Demo

In this demonstration, you will learn how to build a data pipeline using Spring Cloud Data Flow to consume data from an http endpoint and write to Gemfire using the gemfire sink.

We will take you through the steps to configure and run Spring Cloud Data Flow server in either a local or Cloud Foundry environment.

For legacy reasons the gemfire Spring Cloud Stream Apps are named after Pivotal GemFire. The code base for the commercial product has since been open sourced as Apache Geode. These samples should work with compatible versions of Pivotal GemFire or Apache Geode. Herein we will refer to the installed IMDG simply as Geode.

3.3.1. Prerequisites

  • A Running Data Flow Shell

The Spring Cloud Data Flow Shell is available for download or you can build it yourself.

The Spring Cloud Data Flow Shell and Local server implementation are in the same repository and are both built by running ./mvnw install from the project root directory. If you have already run the build, use the jar in spring-cloud-dataflow-shell/target.

To run the Shell, open a new terminal session:

$ cd <PATH/TO/SPRING-CLOUD-DATAFLOW-SHELL-JAR>
$ java -jar spring-cloud-dataflow-shell-<VERSION>.jar
  ____                              ____ _                __
 / ___| _ __  _ __(_)_ __   __ _   / ___| | ___  _   _  __| |
 \___ \| '_ \| '__| | '_ \ / _` | | |   | |/ _ \| | | |/ _` |
  ___) | |_) | |  | | | | | (_| | | |___| | (_) | |_| | (_| |
 |____/| .__/|_|  |_|_| |_|\__, |  \____|_|\___/ \__,_|\__,_|
  ____ |_|    _          __|___/                 __________
 |  _ \  __ _| |_ __ _  |  ___| | _____      __  \ \ \ \ \ \
 | | | |/ _` | __/ _` | | |_  | |/ _ \ \ /\ / /   \ \ \ \ \ \
 | |_| | (_| | || (_| | |  _| | | (_) \ V  V /    / / / / / /
 |____/ \__,_|\__\__,_| |_|   |_|\___/ \_/\_/    /_/_/_/_/_/


Welcome to the Spring Cloud Data Flow shell. For assistance hit TAB or type "help".
dataflow:>
The Spring Cloud Data Flow Shell is a Spring Boot application that connects to the Data Flow Server’s REST API and supports a DSL that simplifies the process of defining a stream or task and managing its lifecycle. Most of these samples use the shell. If you prefer, you can use the Data Flow UI at localhost:9393/dashboard (or wherever the server is hosted) to perform equivalent operations.
  • A Geode installation with a locator and cache server running

If you do not have access to an existing Geode installation, install Apache Geode or Pivotal GemFire and start the gfsh CLI in a separate terminal.

    _________________________     __
   / _____/ ______/ ______/ /____/ /
  / /  __/ /___  /_____  / _____  /
 / /__/ / ____/  _____/ / /    / /
/______/_/      /______/_/    /_/    1.2.1

Monitor and Manage Apache Geode
gfsh>

3.3.2. Using the Local Server

Additional Prerequisites
  • A running local Data Flow Server

The Local Data Flow Server is a Spring Boot application that is available for download, or you can build it yourself. If you build it yourself, the executable jar will be in spring-cloud-dataflow-server-local/target.

To run the Local Data Flow Server, open a new terminal session:

$ cd <PATH/TO/SPRING-CLOUD-DATAFLOW-LOCAL-JAR>
$ java -jar spring-cloud-dataflow-server-local-<VERSION>.jar
Building and Running the Demo
  1. Use gfsh to start a locator and server

    gfsh>start locator --name=locator1
    gfsh>start server --name=server1
  2. Create a region called Stocks

    gfsh>create region --name Stocks --type=REPLICATE

    Use the Shell to create the sample stream

  3. Register the out-of-the-box applications for the Rabbit binder

    These samples assume that the Data Flow Server can access a remote Maven repository, repo.spring.io/libs-release by default. If your Data Flow server is running behind a firewall, or you are using a Maven proxy that prevents access to public repositories, you will need to install the sample apps in your internal Maven repository and configure the server accordingly. The sample applications are typically registered using Data Flow’s bulk import facility, for example with the Shell command dataflow:>app import --uri bit.ly/Celsius-SR1-stream-applications-rabbit-maven (the actual URI is release and binder specific, so refer to the sample instructions for the actual URL). The bulk import URI references a plain text file containing entries for all of the publicly available Spring Cloud Stream and Task applications published to repo.spring.io. For example, source.http=maven://org.springframework.cloud.stream.app:http-source-rabbit:1.3.1.RELEASE registers the http source app at the corresponding Maven address, relative to the remote repository (or repositories) configured for the Data Flow server. The format is maven://<groupId>:<artifactId>:<version>. If the server cannot reach the public repository, download the required apps or build them yourself, install them in your Maven repository using whatever group, artifact, and version you choose, and then register each app individually with dataflow:>app register… using the maven:// resource URI format corresponding to your installed app.
    dataflow:>app import --uri http://bit.ly/Celsius-SR1-stream-applications-rabbit-maven
  4. Create the stream

    This example creates an http endpoint to which we will post stock prices as a JSON document containing symbol and price fields. The --json=true property enables Geode’s JSON support and configures the sink to convert JSON String payloads to PdxInstance, the recommended way to store JSON documents in Geode. The keyExpression property is a SpEL expression used to extract the symbol value from the PdxInstance to use as the entry key (a small client-side sketch of this conversion follows this list).

    PDX serialization is very efficient and supports OQL queries without requiring a custom domain class. Use of custom domain types requires these classes to be in the class path of both the stream apps and the cache server. For this reason, the use of custom payload types is generally discouraged.
    dataflow:>stream create --name stocks --definition "http --port=9090 | gemfire --json=true --regionName=Stocks --keyExpression=payload.getField('symbol')" --deploy
    Created and deployed new stream 'stocks'
    If the Geode locator isn’t running on the default port on localhost, add the options --connect-type=locator --host-addresses=<host>:<port>. If there are multiple locators, you can provide a comma-separated list of locator addresses. This is not necessary for the sample but is typical for production environments to enable fail-over.
  5. Verify the stream is successfully deployed

    dataflow:>stream list
  6. Post sample data pointing to the http endpoint: localhost:9090 (9090 is the port we specified for the http source)

    dataflow:>http post --target http://localhost:9090 --contentType application/json --data '{"symbol":"VMW","price":117.06}'
    > POST (application/json) http://localhost:9090 {"symbol":"VMW","price":117.06}
    > 202 ACCEPTED
  7. Using gfsh, connect to the locator if not already connected, and verify the cache entry was created.

    gfsh>get --key='VMW' --region=/Stocks
    Result      : true
    Key Class   : java.lang.String
    Key         : VMW
    Value Class : org.apache.geode.pdx.internal.PdxInstanceImpl
    
    symbol | price
    ------ | ------
    VMW    | 117.06
  8. You’re done!
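
To make the JSON-to-PdxInstance conversion and the keyExpression more concrete, here is a small client-side sketch of the same idea using the Apache Geode client API. It assumes geode-core on the classpath and the gfsh-started locator on the default port, and it only illustrates what the sink does for each payload; it is not the sink's actual implementation.

import org.apache.geode.cache.Region;
import org.apache.geode.cache.client.ClientCache;
import org.apache.geode.cache.client.ClientCacheFactory;
import org.apache.geode.cache.client.ClientRegionShortcut;
import org.apache.geode.pdx.JSONFormatter;
import org.apache.geode.pdx.PdxInstance;

public class StocksPutSketch {

    public static void main(String[] args) {
        // Connect to the locator started with gfsh (10334 is the default locator port).
        ClientCache cache = new ClientCacheFactory()
                .addPoolLocator("localhost", 10334)
                .create();
        Region<String, PdxInstance> stocks = cache
                .<String, PdxInstance>createClientRegionFactory(ClientRegionShortcut.PROXY)
                .create("Stocks");

        String json = "{\"symbol\":\"VMW\",\"price\":117.06}";
        // --json=true: the sink converts the JSON String payload into a PdxInstance.
        PdxInstance value = JSONFormatter.fromJSON(json);
        // keyExpression=payload.getField('symbol'): the entry key comes from the PdxInstance.
        String key = (String) value.getField("symbol");
        stocks.put(key, value);

        cache.close();
    }
}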

3.3.3. Using the Cloud Foundry Server

Additional Prerequisites
  • A Cloud Foundry instance

  • Running instance of a rabbit service in Cloud Foundry

  • Running instance of the Pivotal Cloud Cache for PCF (PCC) service cloudcache in Cloud Foundry.

  • The Spring Cloud Data Flow Cloud Foundry Server

The Cloud Foundry Data Flow Server is a Spring Boot application that is available for download, or you can build it yourself. If you build it yourself, the executable jar will be in spring-cloud-dataflow-server-cloudfoundry/target.

Although you can run the Data Flow Cloud Foundry Server locally and configure it to deploy to any Cloud Foundry instance, we will deploy the server to Cloud Foundry as recommended.
  1. Verify that the CF instance is reachable (your endpoint urls will be different from what is shown here).

    $ cf api
    API endpoint: https://api.system.io (API version: ...)
    
    $ cf apps
    Getting apps in org [your-org] / space [your-space] as user...
    OK
    
    No apps found
  2. Follow the instructions to deploy the Spring Cloud Data Flow Cloud Foundry server. Don’t worry about creating a Redis service. We won’t need it. If you are familiar with Cloud Foundry application manifests, we recommend creating a manifest for the Data Flow server as shown here.

    As of this writing, there is a typo on the SPRING_APPLICATION_JSON entry in the sample manifest. SPRING_APPLICATION_JSON must be followed by : and the JSON string must be wrapped in single quotes. Alternatively, you can replace that line with MAVEN_REMOTE_REPOSITORIES_REPO1_URL: repo.spring.io/libs-snapshot. If your Cloud Foundry installation is behind a firewall, you may need to install the stream apps used in this sample in your internal Maven repository and configure the server to access that repository.
  3. Once you have successfully executed cf push, verify the dataflow server is running

    $ cf apps
    Getting apps in org [your-org] / space [your-space] as user...
    OK
    
    name                 requested state   instances   memory   disk   urls
    dataflow-server      started           1/1         1G       1G     dataflow-server.app.io
  4. Notice that the dataflow-server application is started and ready for interaction via the url endpoint

  5. Connect the shell with server running on Cloud Foundry, e.g., dataflow-server.app.io

    $ cd <PATH/TO/SPRING-CLOUD-DATAFLOW-SHELL-JAR>
    $ java -jar spring-cloud-dataflow-shell-<VERSION>.jar
    
      ____                              ____ _                __
     / ___| _ __  _ __(_)_ __   __ _   / ___| | ___  _   _  __| |
     \___ \| '_ \| '__| | '_ \ / _` | | |   | |/ _ \| | | |/ _` |
      ___) | |_) | |  | | | | | (_| | | |___| | (_) | |_| | (_| |
     |____/| .__/|_|  |_|_| |_|\__, |  \____|_|\___/ \__,_|\__,_|
      ____ |_|    _          __|___/                 __________
     |  _ \  __ _| |_ __ _  |  ___| | _____      __  \ \ \ \ \ \
     | | | |/ _` | __/ _` | | |_  | |/ _ \ \ /\ / /   \ \ \ \ \ \
     | |_| | (_| | || (_| | |  _| | | (_) \ V  V /    / / / / / /
     |____/ \__,_|\__\__,_| |_|   |_|\___/ \_/\_/    /_/_/_/_/_/
    
    
    Welcome to the Spring Cloud Data Flow shell. For assistance hit TAB or type "help".
    server-unknown:>
    server-unknown:>dataflow config server http://dataflow-server.app.io
    Successfully targeted http://dataflow-server.app.io
    dataflow:>
Building and Running the Demo
  1. Register the out-of-the-box applications for the Rabbit binder

    These samples assume that the Data Flow Server can access a remote Maven repository, repo.spring.io/libs-release by default. If your Data Flow server is running behind a firewall, or you are using a Maven proxy that prevents access to public repositories, you will need to install the sample apps in your internal Maven repository and configure the server accordingly. The sample applications are typically registered using Data Flow’s bulk import facility, for example with the Shell command dataflow:>app import --uri bit.ly/Celsius-SR1-stream-applications-rabbit-maven (the actual URI is release and binder specific, so refer to the sample instructions for the actual URL). The bulk import URI references a plain text file containing entries for all of the publicly available Spring Cloud Stream and Task applications published to repo.spring.io. For example, source.http=maven://org.springframework.cloud.stream.app:http-source-rabbit:1.3.1.RELEASE registers the http source app at the corresponding Maven address, relative to the remote repository (or repositories) configured for the Data Flow server. The format is maven://<groupId>:<artifactId>:<version>. If the server cannot reach the public repository, download the required apps or build them yourself, install them in your Maven repository using whatever group, artifact, and version you choose, and then register each app individually with dataflow:>app register… using the maven:// resource URI format corresponding to your installed app.
    dataflow:>app import --uri http://bit.ly/Celsius-SR1-stream-applications-rabbit-maven
  2. Get the PCC connection information

    $ cf service-key cloudcache my-service-key
    Getting key my-service-key for service instance cloudcache as <user>...
    
    {
     "locators": [
      "10.0.16.9[55221]",
      "10.0.16.11[55221]",
      "10.0.16.10[55221]"
     ],
     "urls": {
      "gfsh": "http://...",
      "pulse": "http://.../pulse"
     },
     "users": [
      {
       "password": <password>,
       "username": "cluster_operator"
      },
      {
       "password": <password>,
       "username": "developer"
      }
     ]
    }
  3. Using gfsh, connect to the PCC instance as cluster_operator using the service key values and create the Stocks region.

    gfsh>connect --use-http --url=<gfsh-url> --user=cluster_operator --password=<cluster_operator_password>
    gfsh>create region --name Stocks --type=REPLICATE
  4. Create the stream, connecting to the PCC instance as developer

    This example creates an http endpoint to which we will post stock prices as a JSON document containing symbol and price fields. The --json=true property enables Geode’s JSON support and configures the sink to convert JSON String payloads to PdxInstance, the recommended way to store JSON documents in Geode. The keyExpression property is a SpEL expression used to extract the symbol value from the PdxInstance to use as the entry key.

    PDX serialization is very efficient and supports OQL queries without requiring a custom domain class. Use of custom domain types requires these classes to be in the class path of both the stream apps and the cache server. For this reason, the use of custom payload types is generally discouraged.
    dataflow:>stream create --name stocks --definition "http --security.basic.enabled=false | gemfire --username=developer --password=<developer-password> --connect-type=locator --host-addresses=10.0.16.9:55221 --json=true --regionName=Stocks --keyExpression=payload.getField('symbol')" --deploy
  5. Verify the stream is successfully deployed

    dataflow:>stream list
  6. Post sample data pointing to the http endpoint

    Get the url of the http source using cf apps

    dataflow:>http post --target http://<http source url> --contentType application/json --data '{"symbol":"VMW","price":117.06}'
    > POST (application/json) http://... {"symbol":"VMW","price":117.06}
    > 202 ACCEPTED
  7. Using gfsh, connect to the PCC instance as cluster_operator using the service key values.

    gfsh>connect --use-http --url=<gfsh-url> --user=cluster_operator --password=<cluster_operator_password>
    gfsh>get --key='VMW' --region=/Stocks
    Result      : true
    Key Class   : java.lang.String
    Key         : VMW
    Value Class : org.apache.geode.pdx.internal.PdxInstanceImpl
    
    symbol | price
    ------ | ------
    VMW    | 117.06
  8. You’re done!

3.3.4. Summary

In this sample, you have learned:

  • How to use Spring Cloud Data Flow’s Local and Cloud Foundry servers

  • How to use Spring Cloud Data Flow’s shell

  • How to create a streaming data pipeline to connect and write to Gemfire

3.4. Gemfire CQ to Log Demo

In this demonstration, you will learn how to build a data pipeline using Spring Cloud Data Flow to consume data from a gemfire-cq (Continuous Query) endpoint and write to a log using the log sink. The gemfire-cq source creates a Continuous Query to monitor events for a region that match the query’s result set and publish a message whenever such an event is emitted. In this example, we simulate monitoring orders to trigger a process whenever the quantity ordered is above a defined limit.

We will take you through the steps to configure and run Spring Cloud Data Flow server in either a local or Cloud Foundry environment.

For legacy reasons the gemfire Spring Cloud Stream Apps are named after Pivotal GemFire. The code base for the commercial product has since been open sourced as Apache Geode. These samples should work with compatible versions of Pivotal GemFire or Apache Geode. Herein we will refer to the installed IMDG simply as Geode.

3.4.1. Prerequisites

  • A Running Data Flow Shell

The Spring Cloud Data Flow Shell is available for download or you can build it yourself.

The Spring Cloud Data Flow Shell and Local server implementation are in the same repository and are both built by running ./mvnw install from the project root directory. If you have already run the build, use the jar in spring-cloud-dataflow-shell/target.

To run the Shell, open a new terminal session:

$ cd <PATH/TO/SPRING-CLOUD-DATAFLOW-SHELL-JAR>
$ java -jar spring-cloud-dataflow-shell-<VERSION>.jar
  ____                              ____ _                __
 / ___| _ __  _ __(_)_ __   __ _   / ___| | ___  _   _  __| |
 \___ \| '_ \| '__| | '_ \ / _` | | |   | |/ _ \| | | |/ _` |
  ___) | |_) | |  | | | | | (_| | | |___| | (_) | |_| | (_| |
 |____/| .__/|_|  |_|_| |_|\__, |  \____|_|\___/ \__,_|\__,_|
  ____ |_|    _          __|___/                 __________
 |  _ \  __ _| |_ __ _  |  ___| | _____      __  \ \ \ \ \ \
 | | | |/ _` | __/ _` | | |_  | |/ _ \ \ /\ / /   \ \ \ \ \ \
 | |_| | (_| | || (_| | |  _| | | (_) \ V  V /    / / / / / /
 |____/ \__,_|\__\__,_| |_|   |_|\___/ \_/\_/    /_/_/_/_/_/


Welcome to the Spring Cloud Data Flow shell. For assistance hit TAB or type "help".
dataflow:>
The Spring Cloud Data Flow Shell is a Spring Boot application that connects to the Data Flow Server’s REST API and supports a DSL that simplifies the process of defining a stream or task and managing its lifecycle. Most of these samples use the shell. If you prefer, you can use the Data Flow UI at localhost:9393/dashboard (or wherever the server is hosted) to perform equivalent operations.
  • A Geode installation with a locator and cache server running

If you do not have access to an existing Geode installation, install Apache Geode or Pivotal GemFire and start the gfsh CLI in a separate terminal.

    _________________________     __
   / _____/ ______/ ______/ /____/ /
  / /  __/ /___  /_____  / _____  /
 / /__/ / ____/  _____/ / /    / /
/______/_/      /______/_/    /_/    1.2.1

Monitor and Manage Apache Geode
gfsh>

3.4.2. Using the Local Server

Additional Prerequisites
  • A Running Data Flow Server

The Local Data Flow Server is a Spring Boot application that is available for download, or you can build it yourself. If you build it yourself, the executable jar will be in spring-cloud-dataflow-server-local/target.

To run the Local Data Flow Server, open a new terminal session:

$ cd <PATH/TO/SPRING-CLOUD-DATAFLOW-LOCAL-JAR>
$ java -jar spring-cloud-dataflow-server-local-<VERSION>.jar
Building and Running the Demo
  1. Use gfsh to start a locator and server

    gfsh>start locator --name=locator1
    gfsh>start server --name=server1
  2. Create a region called Orders

    gfsh>create region --name Orders --type=REPLICATE

    Use the Shell to create the sample stream

  3. Register the out-of-the-box applications for the Rabbit binder

    These samples assume that the Data Flow Server can access a remote Maven repository, repo.spring.io/libs-release by default. If your Data Flow server is running behind a firewall, or you are using a Maven proxy that prevents access to public repositories, you will need to install the sample apps in your internal Maven repository and configure the server accordingly. The sample applications are typically registered using Data Flow’s bulk import facility, for example with the Shell command dataflow:>app import --uri bit.ly/Celsius-SR1-stream-applications-rabbit-maven (the actual URI is release and binder specific, so refer to the sample instructions for the actual URL). The bulk import URI references a plain text file containing entries for all of the publicly available Spring Cloud Stream and Task applications published to repo.spring.io. For example, source.http=maven://org.springframework.cloud.stream.app:http-source-rabbit:1.3.1.RELEASE registers the http source app at the corresponding Maven address, relative to the remote repository (or repositories) configured for the Data Flow server. The format is maven://<groupId>:<artifactId>:<version>. If the server cannot reach the public repository, download the required apps or build them yourself, install them in your Maven repository using whatever group, artifact, and version you choose, and then register each app individually with dataflow:>app register… using the maven:// resource URI format corresponding to your installed app.
    dataflow:>app import --uri http://bit.ly/Celsius-SR1-stream-applications-rabbit-maven
  4. Create the stream

    This example creates a gemfire-cq source that publishes events matching a query on a region. In this case we will monitor the Orders region. For simplicity, we will avoid creating a data structure for the order. Each cache entry contains an integer value representing the quantity of the ordered item. This stream will fire a message whenever the value > 999. By default, the source emits only the value. Here we will override that using the cq-event-expression property, which accepts a SpEL expression bound to a CQEvent. To reference the entire CQEvent instance, we use #this. To display the contents in the log, we invoke toString() on the instance (a client-side sketch of the equivalent continuous query follows this list).

    dataflow:>stream create --name orders --definition " gemfire-cq --query='SELECT * from /Orders o where o > 999' --cq-event-expression=#this.toString() | log" --deploy
    If the Geode locator isn’t running on the default port on localhost, add the options --connect-type=locator --host-addresses=<host>:<port>. If there are multiple locators, you can provide a comma-separated list of locator addresses. This is not necessary for the sample but is typical for production environments to enable fail-over.
  5. Verify the stream is successfully deployed

    dataflow:>stream list
  6. Monitor stdout for the log sink. When you deploy the stream, you will see log messages like this in the Data Flow server console:

    2017-10-30 09:39:36.283  INFO 8167 --- [nio-9393-exec-5] o.s.c.d.spi.local.LocalAppDeployer       : Deploying app with deploymentId orders.log instance 0.
       Logs will be in /var/folders/hd/5yqz2v2d3sxd3n879f4sg4gr0000gn/T/spring-cloud-dataflow-5375107584795488581/orders-1509370775940/orders.log

    Copy the location of the log sink logs. This is a directory that ends in orders.log. The log files will be in stdout_0.log under this directory. You can monitor the output of the log sink using tail, or something similar:

    $tail -f /var/folders/hd/5yqz2v2d3sxd3n879f4sg4gr0000gn/T/spring-cloud-dataflow-5375107584795488581/orders-1509370775940/orders.log/stdout_0.log
  7. Using gfsh, create and update some cache entries

    gfsh>put --region Orders  --value-class java.lang.Integer --key 01234 --value 1000
    gfsh>put --region Orders  --value-class java.lang.Integer --key 11234 --value 1005
    gfsh>put --region Orders  --value-class java.lang.Integer --key 21234 --value 100
    gfsh>put --region Orders  --value-class java.lang.Integer --key 31234 --value 999
    gfsh>put --region Orders  --value-class java.lang.Integer --key 21234 --value 1000
  8. Observe the log output. You should see messages like:

    2017-10-30 09:53:02.231  INFO 8563 --- [ire-cq.orders-1] log-sink                                 : CqEvent [CqName=GfCq1; base operation=CREATE; cq operation=CREATE; key=01234; value=1000]
    2017-10-30 09:53:19.732  INFO 8563 --- [ire-cq.orders-1] log-sink                                 : CqEvent [CqName=GfCq1; base operation=CREATE; cq operation=CREATE; key=11234; value=1005]
    2017-10-30 09:53:53.242  INFO 8563 --- [ire-cq.orders-1] log-sink                                 : CqEvent [CqName=GfCq1; base operation=UPDATE; cq operation=CREATE; key=21234; value=1000]
  9. Another interesting demonstration combines gemfire-cq with the http-gemfire example.

dataflow:> stream create --name stocks --definition "http --port=9090 | gemfire-json-server --regionName=Stocks --keyExpression=payload.getField('symbol')" --deploy
dataflow:> stream create --name stock_watch --definition "gemfire-cq --query='Select * from /Stocks where symbol=''VMW''' | log" --deploy
  10. You’re done!
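
To get a feel for how a SpEL expression such as #this.toString() (see step 4) is evaluated against a root object, the following standalone sketch uses Spring’s SpelExpressionParser. It is an illustration only, not the gemfire-cq source’s internal code; the class name is made up and it assumes spring-expression is on the classpath.

    import org.springframework.expression.ExpressionParser;
    import org.springframework.expression.spel.standard.SpelExpressionParser;

    public class CqEventExpressionDemo {

        public static void main(String[] args) {
            ExpressionParser parser = new SpelExpressionParser();
            // '#this' refers to the root object of the evaluation, just as the
            // cq-event-expression is bound to the incoming CQEvent instance.
            Integer quantity = 1000;
            String rendered = parser.parseExpression("#this.toString()")
                    .getValue(quantity, String.class);
            System.out.println(rendered); // prints "1000"
        }
    }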

3.4.3. Using the Cloud Foundry Server

Additional Prerequisites
  • A Cloud Foundry instance

  • Running instance of a rabbit service in Cloud Foundry

  • Running instance of the Pivotal Cloud Cache for PCF (PCC) service cloudcache in Cloud Foundry.

  • The Spring Cloud Data Flow Cloud Foundry Server

The Cloud Foundry Data Flow Server is a Spring Boot application available for download, or you can build it yourself. If you build it yourself, the executable jar will be in spring-cloud-dataflow-server-cloudfoundry/target.

Although you can run the Data Flow Cloud Foundry Server locally and configure it to deploy to any Cloud Foundry instance, we will deploy the server to Cloud Foundry as recommended.
  1. Verify that the CF instance is reachable (your endpoint URLs will be different from what is shown here).

    $ cf api
    API endpoint: https://api.system.io (API version: ...)
    
    $ cf apps
    Getting apps in org [your-org] / space [your-space] as user...
    OK
    
    No apps found
  2. Follow the instructions to deploy the Spring Cloud Data Flow Cloud Foundry server. Don’t worry about creating a Redis service; we won’t need it. If you are familiar with Cloud Foundry application manifests, we recommend creating a manifest for the Data Flow server as shown here.

    As of this writing, there is a typo in the SPRING_APPLICATION_JSON entry in the sample manifest. SPRING_APPLICATION_JSON must be followed by : and the JSON string must be wrapped in single quotes. Alternatively, you can replace that line with MAVEN_REMOTE_REPOSITORIES_REPO1_URL: repo.spring.io/libs-snapshot. If your Cloud Foundry installation is behind a firewall, you may need to install the stream apps used in this sample in your internal Maven repository and configure the server to access that repository.
  3. Once you have successfully executed cf push, verify that the Data Flow server is running

    $ cf apps
    Getting apps in org [your-org] / space [your-space] as user...
    OK
    
    name                 requested state   instances   memory   disk   urls
    dataflow-server      started           1/1         1G       1G     dataflow-server.app.io
  4. Notice that the dataflow-server application is started and ready for interaction via the URL endpoint

  5. Connect the shell to the server running on Cloud Foundry, e.g., dataflow-server.app.io

    $ cd <PATH/TO/SPRING-CLOUD-DATAFLOW-SHELL-JAR>
    $ java -jar spring-cloud-dataflow-shell-<VERSION>.jar
    
      ____                              ____ _                __
     / ___| _ __  _ __(_)_ __   __ _   / ___| | ___  _   _  __| |
     \___ \| '_ \| '__| | '_ \ / _` | | |   | |/ _ \| | | |/ _` |
      ___) | |_) | |  | | | | | (_| | | |___| | (_) | |_| | (_| |
     |____/| .__/|_|  |_|_| |_|\__, |  \____|_|\___/ \__,_|\__,_|
      ____ |_|    _          __|___/                 __________
     |  _ \  __ _| |_ __ _  |  ___| | _____      __  \ \ \ \ \ \
     | | | |/ _` | __/ _` | | |_  | |/ _ \ \ /\ / /   \ \ \ \ \ \
     | |_| | (_| | || (_| | |  _| | | (_) \ V  V /    / / / / / /
     |____/ \__,_|\__\__,_| |_|   |_|\___/ \_/\_/    /_/_/_/_/_/
    
    
    Welcome to the Spring Cloud Data Flow shell. For assistance hit TAB or type "help".
    server-unknown:>
    server-unknown:>dataflow config server http://dataflow-server.app.io
    Successfully targeted http://dataflow-server.app.io
    dataflow:>
Building and Running the Demo
  1. Register the out-of-the-box applications for the Rabbit binder

    These samples assume that the Data Flow Server can access a remote Maven repository, repo.spring.io/libs-release by default. If your Data Flow server is running behind a firewall, or you are using a Maven proxy that prevents access to public repositories, you will need to install the sample apps in your internal Maven repository and configure the server accordingly. The sample applications are typically registered using Data Flow’s bulk import facility, for example the Shell command dataflow:>app import --uri bit.ly/Celsius-SR1-stream-applications-rabbit-maven (the actual URI is release- and binder-specific, so refer to the sample instructions for the actual URL). The bulk import URI references a plain text file containing entries for all of the publicly available Spring Cloud Stream and Task applications published to repo.spring.io. For example, source.http=maven://org.springframework.cloud.stream.app:http-source-rabbit:1.3.1.RELEASE registers the http source app at the corresponding Maven address, relative to the remote repository (or repositories) configured for the Data Flow server. The format is maven://<groupId>:<artifactId>:<version>. You will need to download the required apps, or build them, and then install them in your Maven repository, using whatever group, artifact, and version you choose. If you do this, register the individual apps with dataflow:>app register… using the maven:// resource URI format corresponding to your installed app.
    dataflow:>app import --uri http://bit.ly/Celsius-SR1-stream-applications-rabbit-maven
  2. Get the PCC connection information

    $ cf service-key cloudcache my-service-key
    Getting key my-service-key for service instance cloudcache as <user>...
    
    {
     "locators": [
      "10.0.16.9[55221]",
      "10.0.16.11[55221]",
      "10.0.16.10[55221]"
     ],
     "urls": {
      "gfsh": "http://...",
      "pulse": "http://.../pulse"
     },
     "users": [
      {
       "password": <password>,
       "username": "cluster_operator"
      },
      {
       "password": <password>,
       "username": "developer"
      }
     ]
    }
  3. Using gfsh, connect to the PCC instance as cluster_operator using the service key values and create the Orders region.

    gfsh>connect --use-http --url=<gfsh-url> --user=cluster_operator --password=<cluster_operator_password>
    gfsh>create region --name Orders --type=REPLICATE
  4. Create the stream using the Data Flow Shell

    This example creates a gemfire-cq source which will publish events matching query criteria on a region. In this case we will monitor the Orders region. For simplicity, we will avoid creating a data structure for the order. Each cache entry contains an integer value representing the quantity of the ordered item. This stream will fire a message whenever the value exceeds 999. By default, the source emits only the value. Here we will override that using the cq-event-expression property. This accepts a SpEL expression bound to a CQEvent. To reference the entire CQEvent instance, we use #this. In order to display the contents in the log, we will invoke toString() on the instance.

    dataflow:>stream create --name orders --definition " gemfire-cq  --username=developer --password=<developer-password> --connect-type=locator --host-addresses=10.0.16.9:55221 --query='SELECT * from /Orders o where o > 999' --cq-event-expression=#this.toString()  | log" --deploy
    Created and deployed new stream 'orders'
  5. Verify the stream is successfully deployed

    dataflow:>stream list
  6. Monitor stdout for the log sink

    cf logs <log-sink-app-name>
  7. Using gfsh, create and update some cache entries

    gfsh>connect --use-http --url=<gfsh-url> --user=cluster_operator --password=<cluster_operator_password>
    gfsh>put --region Orders  --value-class java.lang.Integer --key 01234 --value 1000
    gfsh>put --region Orders  --value-class java.lang.Integer --key 11234 --value 1005
    gfsh>put --region Orders  --value-class java.lang.Integer --key 21234 --value 100
    gfsh>put --region Orders  --value-class java.lang.Integer --key 31234 --value 999
    gfsh>put --region Orders  --value-class java.lang.Integer --key 21234 --value 1000
  8. Observe the log output. You should see messages like:

    2017-10-30 09:53:02.231  INFO 8563 --- [ire-cq.orders-1] log-sink                                 : CqEvent [CqName=GfCq1; base operation=CREATE; cq operation=CREATE; key=01234; value=1000]
    2017-10-30 09:53:19.732  INFO 8563 --- [ire-cq.orders-1] log-sink                                 : CqEvent [CqName=GfCq1; base operation=CREATE; cq operation=CREATE; key=11234; value=1005]
    2017-10-30 09:53:53.242  INFO 8563 --- [ire-cq.orders-1] log-sink                                 : CqEvent [CqName=GfCq1; base operation=UPDATE; cq operation=CREATE; key=21234; value=1000]
  9. Another interesting demonstration combines gemfire-cq with the http-gemfire example.

    dataflow:>stream create --name stocks --definition "http --security.basic.enabled=false | gemfire --json=true --username=developer --password=<developer-password> --connect-type=locator --host-addresses=10.0.16.25:55221 --regionName=Stocks --keyExpression=payload.getField('symbol')" --deploy
    
    dataflow:>stream create --name stock_watch --definition "gemfire-cq  --username=developer --password=<developer-password> --connect-type=locator --host-addresses=10.0.16.25:55221 --query='SELECT * from /Stocks where symbol=''VMW''' --cq-event-expression=#this.toString()  | log" --deploy
    
    dataflow:>http post --target http://data-flow-server-dpvuo77-stocks-http.apps.scdf-gcp.springapps.io/ --contentType application/json --data '{"symbol":"VMW","price":117.06}'
  10. You’re done!

3.4.4. Summary

In this sample, you have learned:

  • How to use Spring Cloud Data Flow’s Local and Cloud Foundry servers

  • How to use Spring Cloud Data Flow’s shell

  • How to create a streaming data pipeline to connect and publish CQ events from gemfire

3.5. Gemfire to Log Demo

In this demonstration, you will learn how to build a data pipeline using Spring Cloud Data Flow to consume data from a gemfire endpoint and write to a log using the log sink. The gemfire source creates a CacheListener to monitor events for a region and publishes a message whenever an entry is changed, as sketched below.
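
The following sketch illustrates the callback model the source relies on. It is not the gemfire source’s actual implementation; the class name is made up, and it assumes the org.apache.geode dependency is available.

    import org.apache.geode.cache.EntryEvent;
    import org.apache.geode.cache.util.CacheListenerAdapter;

    // Illustration only: a listener that reacts to entry changes on a region,
    // similar in spirit to what the gemfire source registers internally.
    public class LoggingCacheListener extends CacheListenerAdapter<String, Object> {

        @Override
        public void afterCreate(EntryEvent<String, Object> event) {
            System.out.println("created " + event.getKey() + " -> " + event.getNewValue());
        }

        @Override
        public void afterUpdate(EntryEvent<String, Object> event) {
            System.out.println("updated " + event.getKey() + " -> " + event.getNewValue());
        }
    }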

We will take you through the steps to configure and run Spring Cloud Data Flow server in either a local or Cloud Foundry environment.

For legacy reasons the gemfire Spring Cloud Stream Apps are named after Pivotal GemFire. The code base for the commercial product has since been open sourced as Apache Geode. These samples should work with compatible versions of Pivotal GemFire or Apache Geode. Herein we will refer to the installed IMDG simply as Geode.

3.5.1. Prerequisites

  • A Running Data Flow Shell

The Spring Cloud Data Flow Shell is available for download or you can build it yourself.

The Spring Cloud Data Flow Shell and Local server implementations are in the same repository and are both built by running ./mvnw install from the project root directory. If you have already run the build, use the jar in spring-cloud-dataflow-shell/target.

To run the Shell, open a new terminal session:

$ cd <PATH/TO/SPRING-CLOUD-DATAFLOW-SHELL-JAR>
$ java -jar spring-cloud-dataflow-shell-<VERSION>.jar
  ____                              ____ _                __
 / ___| _ __  _ __(_)_ __   __ _   / ___| | ___  _   _  __| |
 \___ \| '_ \| '__| | '_ \ / _` | | |   | |/ _ \| | | |/ _` |
  ___) | |_) | |  | | | | | (_| | | |___| | (_) | |_| | (_| |
 |____/| .__/|_|  |_|_| |_|\__, |  \____|_|\___/ \__,_|\__,_|
  ____ |_|    _          __|___/                 __________
 |  _ \  __ _| |_ __ _  |  ___| | _____      __  \ \ \ \ \ \
 | | | |/ _` | __/ _` | | |_  | |/ _ \ \ /\ / /   \ \ \ \ \ \
 | |_| | (_| | || (_| | |  _| | | (_) \ V  V /    / / / / / /
 |____/ \__,_|\__\__,_| |_|   |_|\___/ \_/\_/    /_/_/_/_/_/


Welcome to the Spring Cloud Data Flow shell. For assistance hit TAB or type "help".
dataflow:>
The Spring Cloud Data Flow Shell is a Spring Boot application that connects to the Data Flow Server’s REST API and supports a DSL that simplifies the process of defining a stream or task and managing its lifecycle. Most of these samples use the shell. If you prefer, you can use the Data Flow UI at localhost:9393/dashboard (or wherever the server is hosted) to perform equivalent operations.
  • A Geode installation with a locator and cache server running

If you do not have access to an existing Geode installation, install Apache Geode or Pivotal GemFire and start the gfsh CLI in a separate terminal.

    _________________________     __
   / _____/ ______/ ______/ /____/ /
  / /  __/ /___  /_____  / _____  /
 / /__/ / ____/  _____/ / /    / /
/______/_/      /______/_/    /_/    1.2.1

Monitor and Manage Apache Geode
gfsh>

3.5.2. Using the Local Server

Additional Prerequisites
  • A Running Data Flow Server

The Local Data Flow Server is a Spring Boot application available for download, or you can build it yourself. If you build it yourself, the executable jar will be in spring-cloud-dataflow-server-local/target.

To run the Local Data Flow server, open a new terminal session:

$ cd <PATH/TO/SPRING-CLOUD-DATAFLOW-LOCAL-JAR>
$ java -jar spring-cloud-dataflow-server-local-<VERSION>.jar
Building and Running the Demo
  1. Use gfsh to start a locator and server

    gfsh>start locator --name=locator1
    gfsh>start server --name=server1
  2. Create a region called Test

    gfsh>create region --name Test --type=REPLICATE

    Use the Shell to create the sample stream

  3. Register the out-of-the-box applications for the Rabbit binder

    These samples assume that the Data Flow Server can access a remote Maven repository, repo.spring.io/libs-release by default. If your Data Flow server is running behind a firewall, or you are using a Maven proxy that prevents access to public repositories, you will need to install the sample apps in your internal Maven repository and configure the server accordingly. The sample applications are typically registered using Data Flow’s bulk import facility, for example the Shell command dataflow:>app import --uri bit.ly/Celsius-SR1-stream-applications-rabbit-maven (the actual URI is release- and binder-specific, so refer to the sample instructions for the actual URL). The bulk import URI references a plain text file containing entries for all of the publicly available Spring Cloud Stream and Task applications published to repo.spring.io. For example, source.http=maven://org.springframework.cloud.stream.app:http-source-rabbit:1.3.1.RELEASE registers the http source app at the corresponding Maven address, relative to the remote repository (or repositories) configured for the Data Flow server. The format is maven://<groupId>:<artifactId>:<version>. You will need to download the required apps, or build them, and then install them in your Maven repository, using whatever group, artifact, and version you choose. If you do this, register the individual apps with dataflow:>app register… using the maven:// resource URI format corresponding to your installed app.
    dataflow:>app import --uri http://bit.ly/Celsius-SR1-stream-applications-rabbit-maven
  4. Create the stream

    This example creates a gemfire source which will publish events on a region

    dataflow:>stream create --name events --definition " gemfire --regionName=Test | log" --deploy
    Created and deployed new stream 'events'
    If the Geode locator isn’t running on the default port on localhost, add the options --connect-type=locator --host-addresses=<host>:<port>. If there are multiple locators, you can provide a comma-separated list of locator addresses. This is not necessary for the sample but is typical for production environments to enable fail-over.
  5. Verify the stream is successfully deployed

    dataflow:>stream list
  6. Monitor stdout for the log sink. When you deploy the stream, you will see log messages like this in the Data Flow server console:

    2017-10-28 17:28:23.275  INFO 15603 --- [nio-9393-exec-2] o.s.c.d.spi.local.LocalAppDeployer       : Deploying app with deploymentId events.log instance 0.
       Logs will be in /var/folders/hd/5yqz2v2d3sxd3n879f4sg4gr0000gn/T/spring-cloud-dataflow-4093992067314402881/events-1509226103269/events.log
    2017-10-28 17:28:23.277  INFO 15603 --- [nio-9393-exec-2] o.s.c.d.s.c.StreamDeploymentController   : Downloading resource URI [maven://org.springframework.cloud.stream.app:gemfire-source-rabbit:1.2.0.RELEASE]
    2017-10-28 17:28:23.311  INFO 15603 --- [nio-9393-exec-2] o.s.c.d.s.c.StreamDeploymentController   : Deploying application named [gemfire] as part of stream named [events] with resource URI [maven://org.springframework.cloud.stream.app:gemfire-source-rabbit:1.2.0.RELEASE]
    2017-10-28 17:28:23.318  INFO 15603 --- [nio-9393-exec-2] o.s.c.d.spi.local.LocalAppDeployer       : Deploying app with deploymentId events.gemfire instance 0.
       Logs will be in /var/folders/hd/5yqz2v2d3sxd3n879f4sg4gr0000gn/T/spring-cloud-dataflow-4093992067314402881/events-1509226103311/events.gemfire

    Copy the location of the log sink logs. This is a directory that ends in events.log. The log files will be in stdout_0.log under this directory. You can monitor the output of the log sink using tail, or something similar:

    $tail -f /var/folders/hd/5yqz2v2d3sxd3n879f4sg4gr0000gn/T/spring-cloud-dataflow-4093992067314402881/events-1509226103269/events.log/stdout_0.log
  7. Using gfsh, create and update some cache entries

    gfsh>put --region /Test --key 1  --value "value 1"
    gfsh>put --region /Test --key 2  --value "value 2"
    gfsh>put --region /Test --key 3  --value "value 3"
    gfsh>put --region /Test --key 1  --value "new value 1"
  8. Observe the log output. You should see messages like:

    2017-10-28 17:28:52.893  INFO 18986 --- [emfire.events-1] log sink                               : value 1"
    2017-10-28 17:28:52.893  INFO 18986 --- [emfire.events-1] log sink                               : value 2"
    2017-10-28 17:28:52.893  INFO 18986 --- [emfire.events-1] log sink                               : value 3"
    2017-10-28 17:28:52.893  INFO 18986 --- [emfire.events-1] log sink                               : new value 1"

    By default, the message payload contains the updated value. Depending on your application, you may need additional information. The data comes from EntryEvent. You can access any of its fields using the source’s cache-event-expression property. This takes a SpEL expression bound to the EntryEvent. Try something like --cache-event-expression='{key:'+key+',new_value:'+newValue+'}' (HINT: You will need to destroy the stream and recreate it to add this property, an exercise left to the reader). A standalone sketch of how this expression is evaluated appears after this list. Now you should see log messages like:

    2017-10-28 17:28:52.893  INFO 18986 --- [emfire.events-1] log-sink                                 : {key:1,new_value:value 1}
    2017-10-28 17:41:24.466  INFO 18986 --- [emfire.events-1] log-sink                                 : {key:2,new_value:value 2}
  9. You’re done!
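
As promised in step 8, here is a standalone sketch of how a cache-event-expression such as '{key:'+key+',new_value:'+newValue+'}' resolves against an object exposing key and newValue properties. It uses Spring’s SpelExpressionParser for illustration only; the FakeEntryEvent class is a made-up stand-in for the real Geode EntryEvent.

    import org.springframework.expression.ExpressionParser;
    import org.springframework.expression.spel.standard.SpelExpressionParser;

    public class CacheEventExpressionDemo {

        // Hypothetical stand-in for the Geode EntryEvent the expression is bound to.
        public static class FakeEntryEvent {
            public Object getKey() { return 1; }
            public Object getNewValue() { return "value 1"; }
        }

        public static void main(String[] args) {
            ExpressionParser parser = new SpelExpressionParser();
            String expression = "'{key:'+key+',new_value:'+newValue+'}'";
            // SpEL resolves 'key' and 'newValue' against getKey() and getNewValue()
            // on the root object, then concatenates the pieces into one String.
            String rendered = parser.parseExpression(expression)
                    .getValue(new FakeEntryEvent(), String.class);
            System.out.println(rendered); // {key:1,new_value:value 1}
        }
    }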

3.5.3. Using the Cloud Foundry Server

Additional Prerequisites
  • A Cloud Foundry instance

  • Running instance of a rabbit service in Cloud Foundry

  • Running instance of the Pivotal Cloud Cache for PCF (PCC) service cloudcache in Cloud Foundry.

  • The Spring Cloud Data Flow Cloud Foundry Server

The Cloud Foundry Data Flow Server is a Spring Boot application available for download, or you can build it yourself. If you build it yourself, the executable jar will be in spring-cloud-dataflow-server-cloudfoundry/target.

Although you can run the Data Flow Cloud Foundry Server locally and configure it to deploy to any Cloud Foundry instance, we will deploy the server to Cloud Foundry as recommended.
  1. Verify that the CF instance is reachable (your endpoint URLs will be different from what is shown here).

    $ cf api
    API endpoint: https://api.system.io (API version: ...)
    
    $ cf apps
    Getting apps in org [your-org] / space [your-space] as user...
    OK
    
    No apps found
  2. Follow the instructions to deploy the Spring Cloud Data Flow Cloud Foundry server. Don’t worry about creating a Redis service; we won’t need it. If you are familiar with Cloud Foundry application manifests, we recommend creating a manifest for the Data Flow server as shown here.

    As of this writing, there is a typo in the SPRING_APPLICATION_JSON entry in the sample manifest. SPRING_APPLICATION_JSON must be followed by : and the JSON string must be wrapped in single quotes. Alternatively, you can replace that line with MAVEN_REMOTE_REPOSITORIES_REPO1_URL: repo.spring.io/libs-snapshot. If your Cloud Foundry installation is behind a firewall, you may need to install the stream apps used in this sample in your internal Maven repository and configure the server to access that repository.
  3. Once you have successfully executed cf push, verify that the Data Flow server is running

    $ cf apps
    Getting apps in org [your-org] / space [your-space] as user...
    OK
    
    name                 requested state   instances   memory   disk   urls
    dataflow-server      started           1/1         1G       1G     dataflow-server.app.io
  4. Notice that the dataflow-server application is started and ready for interaction via the URL endpoint

  5. Connect the shell to the server running on Cloud Foundry, e.g., dataflow-server.app.io

    $ cd <PATH/TO/SPRING-CLOUD-DATAFLOW-SHELL-JAR>
    $ java -jar spring-cloud-dataflow-shell-<VERSION>.jar
    
      ____                              ____ _                __
     / ___| _ __  _ __(_)_ __   __ _   / ___| | ___  _   _  __| |
     \___ \| '_ \| '__| | '_ \ / _` | | |   | |/ _ \| | | |/ _` |
      ___) | |_) | |  | | | | | (_| | | |___| | (_) | |_| | (_| |
     |____/| .__/|_|  |_|_| |_|\__, |  \____|_|\___/ \__,_|\__,_|
      ____ |_|    _          __|___/                 __________
     |  _ \  __ _| |_ __ _  |  ___| | _____      __  \ \ \ \ \ \
     | | | |/ _` | __/ _` | | |_  | |/ _ \ \ /\ / /   \ \ \ \ \ \
     | |_| | (_| | || (_| | |  _| | | (_) \ V  V /    / / / / / /
     |____/ \__,_|\__\__,_| |_|   |_|\___/ \_/\_/    /_/_/_/_/_/
    
    
    Welcome to the Spring Cloud Data Flow shell. For assistance hit TAB or type "help".
    server-unknown:>
    server-unknown:>dataflow config server http://dataflow-server.app.io
    Successfully targeted http://dataflow-server.app.io
    dataflow:>
Building and Running the Demo
  1. Register the out-of-the-box applications for the Rabbit binder

    These samples assume that the Data Flow Server can access a remote Maven repository, repo.spring.io/libs-release by default. If your Data Flow server is running behind a firewall, or you are using a Maven proxy that prevents access to public repositories, you will need to install the sample apps in your internal Maven repository and configure the server accordingly. The sample applications are typically registered using Data Flow’s bulk import facility, for example the Shell command dataflow:>app import --uri bit.ly/Celsius-SR1-stream-applications-rabbit-maven (the actual URI is release- and binder-specific, so refer to the sample instructions for the actual URL). The bulk import URI references a plain text file containing entries for all of the publicly available Spring Cloud Stream and Task applications published to repo.spring.io. For example, source.http=maven://org.springframework.cloud.stream.app:http-source-rabbit:1.3.1.RELEASE registers the http source app at the corresponding Maven address, relative to the remote repository (or repositories) configured for the Data Flow server. The format is maven://<groupId>:<artifactId>:<version>. You will need to download the required apps, or build them, and then install them in your Maven repository, using whatever group, artifact, and version you choose. If you do this, register the individual apps with dataflow:>app register… using the maven:// resource URI format corresponding to your installed app.
    dataflow:>app import --uri http://bit.ly/Celsius-SR1-stream-applications-rabbit-maven
  2. Get the PCC connection information

    $ cf service-key cloudcache my-service-key
    Getting key my-service-key for service instance cloudcache as <user>...
    
    {
     "locators": [
      "10.0.16.9[55221]",
      "10.0.16.11[55221]",
      "10.0.16.10[55221]"
     ],
     "urls": {
      "gfsh": "http://...",
      "pulse": "http://.../pulse"
     },
     "users": [
      {
       "password": <password>,
       "username": "cluster_operator"
      },
      {
       "password": <password>,
       "username": "developer"
      }
     ]
    }
  3. Using gfsh, connect to the PCC instance as cluster_operator using the service key values and create the Test region.

    gfsh>connect --use-http --url=<gfsh-url> --user=cluster_operator --password=<cluster_operator_password>
    gfsh>create region --name Test --type=REPLICATE
  4. Create the stream, connecting to the PCC instance as developer. This example creates a gemfire source which will publish events on a region

    dataflow:>stream create --name events --definition " gemfire --username=developer --password=<developer-password> --connect-type=locator --host-addresses=10.0.16.9:55221 --regionName=Test | log" --deploy
  5. Verify the stream is successfully deployed

    dataflow:>stream list
  6. Monitor stdout for the log sink

    cf logs <log-sink-app-name>
  7. Using gfsh, create and update some cache entries

    gfsh>connect --use-http --url=<gfsh-url> --user=cluster_operator --password=<cluster_operator_password>
    gfsh>put --region /Test --key 1  --value "value 1"
    gfsh>put --region /Test --key 2  --value "value 2"
    gfsh>put --region /Test --key 3  --value "value 3"
    gfsh>put --region /Test --key 1  --value "new value 1"
  8. Observe the log output

    You should see messages like:

    2017-10-28 17:28:52.893  INFO 18986 --- [emfire.events-1] log sink                               : value 1"
    2017-10-28 17:28:52.893  INFO 18986 --- [emfire.events-1] log sink                               : value 2"
    2017-10-28 17:28:52.893  INFO 18986 --- [emfire.events-1] log sink                               : value 3"
    2017-10-28 17:28:52.893  INFO 18986 --- [emfire.events-1] log sink                               : new value 1"

    By default, the message payload contains the updated value. Depending on your application, you may need additional information. The data comes from EntryEvent. You can access any of its fields using the source’s cache-event-expression property. This takes a SpEL expression bound to the EntryEvent. Try something like --cache-event-expression='{key:'+key+',new_value:'+newValue+'}' (HINT: You will need to destroy the stream and recreate it to add this property, an exercise left to the reader). Now you should see log messages like:

    2017-10-28 17:28:52.893  INFO 18986 --- [emfire.events-1] log-sink                                 : {key:1,new_value:value 1}
    2017-10-28 17:41:24.466  INFO 18986 --- [emfire.events-1] log-sink                                 : {key:2,new_value:value 2}
  9. You’re done!

3.5.4. Summary

In this sample, you have learned:

  • How to use Spring Cloud Data Flow’s Local and Cloud Foundry servers

  • How to use Spring Cloud Data Flow’s shell

  • How to create a streaming data pipeline to connect and publish events from gemfire

3.6. Custom Spring Cloud Stream Processor

3.6.1. Prerequisites

  • A Running Data Flow Shell

The Spring Cloud Data Flow Shell is available for download or you can build it yourself.

The Spring Cloud Data Flow Shell and Local server implementations are in the same repository and are both built by running ./mvnw install from the project root directory. If you have already run the build, use the jar in spring-cloud-dataflow-shell/target.

To run the Shell, open a new terminal session:

$ cd <PATH/TO/SPRING-CLOUD-DATAFLOW-SHELL-JAR>
$ java -jar spring-cloud-dataflow-shell-<VERSION>.jar
  ____                              ____ _                __
 / ___| _ __  _ __(_)_ __   __ _   / ___| | ___  _   _  __| |
 \___ \| '_ \| '__| | '_ \ / _` | | |   | |/ _ \| | | |/ _` |
  ___) | |_) | |  | | | | | (_| | | |___| | (_) | |_| | (_| |
 |____/| .__/|_|  |_|_| |_|\__, |  \____|_|\___/ \__,_|\__,_|
  ____ |_|    _          __|___/                 __________
 |  _ \  __ _| |_ __ _  |  ___| | _____      __  \ \ \ \ \ \
 | | | |/ _` | __/ _` | | |_  | |/ _ \ \ /\ / /   \ \ \ \ \ \
 | |_| | (_| | || (_| | |  _| | | (_) \ V  V /    / / / / / /
 |____/ \__,_|\__\__,_| |_|   |_|\___/ \_/\_/    /_/_/_/_/_/


Welcome to the Spring Cloud Data Flow shell. For assistance hit TAB or type "help".
dataflow:>
The Spring Cloud Data Flow Shell is a Spring Boot application that connects to the Data Flow Server’s REST API and supports a DSL that simplifies the process of defining a stream or task and managing its lifecycle. Most of these samples use the shell. If you prefer, you can use the Data Flow UI at localhost:9393/dashboard (or wherever the server is hosted) to perform equivalent operations.
  • A running local Data Flow Server

The Local Data Flow Server is a Spring Boot application available for download, or you can build it yourself. If you build it yourself, the executable jar will be in spring-cloud-dataflow-server-local/target.

To run the Local Data Flow server, open a new terminal session:

$ cd <PATH/TO/SPRING-CLOUD-DATAFLOW-LOCAL-JAR>
$ java -jar spring-cloud-dataflow-server-local-<VERSION>.jar

3.6.2. Creating the Custom Stream App

We will create a custom Spring Cloud Stream application and run it on Spring Cloud Data Flow. We’ll go through the steps to make a simple processor that converts temperature from Fahrenheit to Celsius. We will be running the demo locally, but all the steps will work in a Cloud Foundry environment as well.

  1. Create a new Spring Cloud Stream project

    • Create a Spring Initializr project

    • Set the group to demo.celsius.converter and the artifact name as celsius-converter-processor

    • Choose a message transport binding as a dependency for the custom app. There are options for choosing RabbitMQ or Kafka as the message transport. For this demo, we will use RabbitMQ. Type rabbit in the search bar under Search for dependencies and select Stream Rabbit.

    • Hit the Generate Project button and open the new project in an IDE of your choice

  2. Develop the app

    We can now create our custom app. Our Spring Cloud Stream application is a Spring Boot application that runs as an executable jar. The application will include two Java classes:

    • CelsiusConverterProcessorApplication.java - the main Spring Boot application class, generated by Spring Initializr

    • CelsiusConverterProcessorConfiguration.java - the Spring Cloud Stream code that we will write

      We are creating a transformer that takes a Fahrenheit input and converts it to Celsius. Following the same naming convention as the application file, create a new Java class in the same package called CelsiusConverterProcessorConfiguration.java.

      CelsiusConverterProcessorConfiguration.java
      @EnableBinding(Processor.class)
      public class CelsiusConverterProcessorConfiguration {
      
          @Transformer(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)
          public int convertToCelsius(String payload) {
              int fahrenheitTemperature = Integer.parseInt(payload);
              return (fahrenheitTemperature - 32) * 5 / 9;
          }
      }

      Here we introduced two important Spring annotations. First we annotated the class with @EnableBinding(Processor.class). Second we created a method and annotated it with @Transformer(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT). By adding these two annotations we have configured this stream app as a Processor (as opposed to a Source or a Sink). This means that the application receives input from an upstream application via the Processor.INPUT channel and sends its output to a downstream application via the Processor.OUTPUT channel.

      The convertToCelsius method takes a String as input for Fahrenheit and then returns the converted Celsius as an integer. This method is very simple, but that is also the beauty of this programming style. We can add as much logic as we want to this method to enrich this processor. As long as we annotate it properly and return valid output, it works as a Spring Cloud Stream Processor. Also note that it is straightforward to unit test this code, as shown in the sketch after this list.

  3. Build the Spring Boot application with Maven

    $ cd <PROJECT_DIR>
    $ ./mvnw clean package
  4. Run the Application standalone

    java -jar target/celsius-converter-processor-0.0.1-SNAPSHOT.jar

    If all goes well, we should have a running standalone Spring Boot Application. Once we verify that the app is started and running without any errors, we can stop it.
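
As a quick illustration of the unit-testing point from step 2, a minimal JUnit 4 test might look like the sketch below. It assumes the generated project’s standard test dependencies; the test class name is an assumption.

    import static org.junit.Assert.assertEquals;

    import org.junit.Test;

    public class CelsiusConverterProcessorConfigurationTests {

        private final CelsiusConverterProcessorConfiguration processor =
                new CelsiusConverterProcessorConfiguration();

        @Test
        public void convertsFahrenheitToCelsius() {
            // 32F is 0C and 212F is 100C with the integer formula (f - 32) * 5 / 9
            assertEquals(0, processor.convertToCelsius("32"));
            assertEquals(100, processor.convertToCelsius("212"));
        }
    }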

3.6.3. Deploying the App to Spring Cloud Data Flow

  1. Register the out-of-the-box applications for the Rabbit binder

    These samples assume that the Data Flow Server can access a remote Maven repository, repo.spring.io/libs-release by default. If your Data Flow server is running behind a firewall, or you are using a Maven proxy that prevents access to public repositories, you will need to install the sample apps in your internal Maven repository and configure the server accordingly. The sample applications are typically registered using Data Flow’s bulk import facility, for example the Shell command dataflow:>app import --uri bit.ly/Celsius-SR1-stream-applications-rabbit-maven (the actual URI is release- and binder-specific, so refer to the sample instructions for the actual URL). The bulk import URI references a plain text file containing entries for all of the publicly available Spring Cloud Stream and Task applications published to repo.spring.io. For example, source.http=maven://org.springframework.cloud.stream.app:http-source-rabbit:1.3.1.RELEASE registers the http source app at the corresponding Maven address, relative to the remote repository (or repositories) configured for the Data Flow server. The format is maven://<groupId>:<artifactId>:<version>. You will need to download the required apps, or build them, and then install them in your Maven repository, using whatever group, artifact, and version you choose. If you do this, register the individual apps with dataflow:>app register… using the maven:// resource URI format corresponding to your installed app.
    dataflow:>app import --uri http://bit.ly/Celsius-SR1-stream-applications-rabbit-maven
  2. Register the custom processor

    app register --type processor --name convertToCelsius --uri <File URL of the jar file on the local filesystem where you built the project above> --force
  3. Create the stream

    We will create a stream that uses the out of the box http source and log sink and our custom transformer.

    dataflow:>stream create --name convertToCelsiusStream --definition "http  --port=9090 | convertToCelsius | log" --deploy
    
    Created and deployed new stream 'convertToCelsiusStream'
  4. Verify the stream is successfully deployed

    dataflow:>stream list
  5. Verify that the apps have successfully deployed

    dataflow:>runtime apps
    2016-09-27 10:03:11.988  INFO 95234 --- [nio-9393-exec-9] o.s.c.d.spi.local.LocalAppDeployer       : deploying app convertToCelsiusStream.log instance 0
       Logs will be in /var/folders/2q/krqwcbhj2d58csmthyq_n1nw0000gp/T/spring-cloud-dataflow-3236898888473815319/convertToCelsiusStream-1474984991968/convertToCelsiusStream.log
    2016-09-27 10:03:12.397  INFO 95234 --- [nio-9393-exec-9] o.s.c.d.spi.local.LocalAppDeployer       : deploying app convertToCelsiusStream.convertToCelsius instance 0
       Logs will be in /var/folders/2q/krqwcbhj2d58csmthyq_n1nw0000gp/T/spring-cloud-dataflow-3236898888473815319/convertToCelsiusStream-1474984992392/convertToCelsiusStream.convertToCelsius
    2016-09-27 10:03:14.445  INFO 95234 --- [nio-9393-exec-9] o.s.c.d.spi.local.LocalAppDeployer       : deploying app convertToCelsiusStream.http instance 0
       Logs will be in /var/folders/2q/krqwcbhj2d58csmthyq_n1nw0000gp/T/spring-cloud-dataflow-3236898888473815319/convertToCelsiusStream-1474984994440/convertToCelsiusStream.http
  6. Post sample data to the http endpoint: localhost:9090 (9090 is the port we specified for the http source in this case)

    dataflow:>http post --target http://localhost:9090 --data 76
    > POST (text/plain;Charset=UTF-8) http://localhost:9090 76
    > 202 ACCEPTED
  7. Open the log file for the convertToCelsiusStream.log app to see the output of our stream

    tail -f /var/folders/2q/krqwcbhj2d58csmthyq_n1nw0000gp/T/spring-cloud-dataflow-7563139704229890655/convertToCelsiusStream-1474990317406/convertToCelsiusStream.log/stdout_0.log

    You should see the temperature you posted converted to Celsius!

2016-09-27 10:05:34.933  INFO 95616 --- [CelsiusStream-1] log.sink                                 : 24

3.6.4. Summary

In this sample, you have learned:

  • How to write a custom Processor stream application

  • How to use Spring Cloud Data Flow’s Local server

  • How to use Spring Cloud Data Flow’s shell application

4. Task / Batch

4.1. Batch Job on Cloud Foundry

In this demonstration, you will learn how to orchestrate short-lived data processing applications (e.g., Spring Batch jobs) using Spring Cloud Task and Spring Cloud Data Flow on Cloud Foundry.

4.1.1. Prerequisites

  • Local PCFDev instance

  • Local install of the cf CLI command line tool

  • Running instance of mysql in PCFDev

  • A Running Data Flow Shell

The Spring Cloud Data Flow Shell is available for download or you can build it yourself.

The Spring Cloud Data Flow Shell and Local server implementations are in the same repository and are both built by running ./mvnw install from the project root directory. If you have already run the build, use the jar in spring-cloud-dataflow-shell/target.

To run the Shell, open a new terminal session:

$ cd <PATH/TO/SPRING-CLOUD-DATAFLOW-SHELL-JAR>
$ java -jar spring-cloud-dataflow-shell-<VERSION>.jar
  ____                              ____ _                __
 / ___| _ __  _ __(_)_ __   __ _   / ___| | ___  _   _  __| |
 \___ \| '_ \| '__| | '_ \ / _` | | |   | |/ _ \| | | |/ _` |
  ___) | |_) | |  | | | | | (_| | | |___| | (_) | |_| | (_| |
 |____/| .__/|_|  |_|_| |_|\__, |  \____|_|\___/ \__,_|\__,_|
  ____ |_|    _          __|___/                 __________
 |  _ \  __ _| |_ __ _  |  ___| | _____      __  \ \ \ \ \ \
 | | | |/ _` | __/ _` | | |_  | |/ _ \ \ /\ / /   \ \ \ \ \ \
 | |_| | (_| | || (_| | |  _| | | (_) \ V  V /    / / / / / /
 |____/ \__,_|\__\__,_| |_|   |_|\___/ \_/\_/    /_/_/_/_/_/


Welcome to the Spring Cloud Data Flow shell. For assistance hit TAB or type "help".
dataflow:>
The Spring Cloud Data Flow Shell is a Spring Boot application that connects to the Data Flow Server’s REST API and supports a DSL that simplifies the process of defining a stream or task and managing its lifecycle. Most of these samples use the shell. If you prefer, you can use the Data Flow UI at localhost:9393/dashboard (or wherever the server is hosted) to perform equivalent operations.
  • The Spring Cloud Data Flow Cloud Foundry Server running in PCFDev

The Cloud Foundry Data Flow Server is a Spring Boot application available for download, or you can build it yourself. If you build it yourself, the executable jar will be in spring-cloud-dataflow-server-cloudfoundry/target.

Although you can run the Data Flow Cloud Foundry Server locally and configure it to deploy to any Cloud Foundry instance, we will deploy the server to Cloud Foundry as recommended.
  1. Verify that the CF instance is reachable (your endpoint URLs will be different from what is shown here).

    $ cf api
    API endpoint: https://api.system.io (API version: ...)
    
    $ cf apps
    Getting apps in org [your-org] / space [your-space] as user...
    OK
    
    No apps found
  2. Follow the instructions to deploy the Spring Cloud Data Flow Cloud Foundry server. Don’t worry about creating a Redis service; we won’t need it. If you are familiar with Cloud Foundry application manifests, we recommend creating a manifest for the Data Flow server as shown here.

    As of this writing, there is a typo in the SPRING_APPLICATION_JSON entry in the sample manifest. SPRING_APPLICATION_JSON must be followed by : and the JSON string must be wrapped in single quotes. Alternatively, you can replace that line with MAVEN_REMOTE_REPOSITORIES_REPO1_URL: repo.spring.io/libs-snapshot. If your Cloud Foundry installation is behind a firewall, you may need to install the stream apps used in this sample in your internal Maven repository and configure the server to access that repository.
  3. Once you have successfully executed cf push, verify that the Data Flow server is running

    $ cf apps
    Getting apps in org [your-org] / space [your-space] as user...
    OK
    
    name                 requested state   instances   memory   disk   urls
    dataflow-server      started           1/1         1G       1G     dataflow-server.app.io
  4. Notice that the dataflow-server application is started and ready for interaction via the URL endpoint

  5. Connect the shell to the server running on Cloud Foundry, e.g., dataflow-server.app.io

    $ cd <PATH/TO/SPRING-CLOUD-DATAFLOW-SHELL-JAR>
    $ java -jar spring-cloud-dataflow-shell-<VERSION>.jar
    
      ____                              ____ _                __
     / ___| _ __  _ __(_)_ __   __ _   / ___| | ___  _   _  __| |
     \___ \| '_ \| '__| | '_ \ / _` | | |   | |/ _ \| | | |/ _` |
      ___) | |_) | |  | | | | | (_| | | |___| | (_) | |_| | (_| |
     |____/| .__/|_|  |_|_| |_|\__, |  \____|_|\___/ \__,_|\__,_|
      ____ |_|    _          __|___/                 __________
     |  _ \  __ _| |_ __ _  |  ___| | _____      __  \ \ \ \ \ \
     | | | |/ _` | __/ _` | | |_  | |/ _ \ \ /\ / /   \ \ \ \ \ \
     | |_| | (_| | || (_| | |  _| | | (_) \ V  V /    / / / / / /
     |____/ \__,_|\__\__,_| |_|   |_|\___/ \_/\_/    /_/_/_/_/_/
    
    
    Welcome to the Spring Cloud Data Flow shell. For assistance hit TAB or type "help".
    server-unknown:>
    server-unknown:>dataflow config server http://dataflow-server.app.io
    Successfully targeted http://dataflow-server.app.io
    dataflow:>

4.1.2. Building and Running the Demo

PCF 1.7.12 or greater is required to run Tasks on Spring Cloud Data Flow. As of this writing, PCFDev and PWS are built upon this version.
  1. Task support needs to be enabled on pcf-dev. While logged in as admin, issue the following command:

    cf enable-feature-flag task_creation
    Setting status of task_creation as admin...
    
    OK
    
    Feature task_creation Enabled.
    For this sample, all you need is the mysql service, and in PCFDev the mysql service comes with a different plan. From the CF CLI, create the service with cf create-service p-mysql 512mb mysql and bind this service to dataflow-server with cf bind-service dataflow-server mysql.
    All the apps deployed to PCFDev start with low memory by default. It is recommended to change it to at least 768MB for dataflow-server (the same applies to every app spawned by Spring Cloud Data Flow). Change the memory with cf set-env dataflow-server SPRING_CLOUD_DEPLOYER_CLOUDFOUNDRY_MEMORY 512. Likewise, we would have to skip SSL validation with cf set-env dataflow-server SPRING_CLOUD_DEPLOYER_CLOUDFOUNDRY_SKIP_SSL_VALIDATION true.
  2. Tasks in Spring Cloud Data Flow require an RDBMS to host the task repository (see here for more details), so let’s instruct the Spring Cloud Data Flow server to bind the mysql service to each deployed task:

    $ cf set-env dataflow-server SPRING_CLOUD_DEPLOYER_CLOUDFOUNDRY_TASK_SERVICES mysql
    $ cf restage dataflow-server
    We only need the mysql service for this sample.
  3. As a recap, here is what you should see as configuration for the Spring Cloud Data Flow server:

    cf env dataflow-server
    
    ....
    User-Provided:
    SPRING_CLOUD_DEPLOYER_CLOUDFOUNDRY_DOMAIN: local.pcfdev.io
    SPRING_CLOUD_DEPLOYER_CLOUDFOUNDRY_MEMORY: 512
    SPRING_CLOUD_DEPLOYER_CLOUDFOUNDRY_ORG: pcfdev-org
    SPRING_CLOUD_DEPLOYER_CLOUDFOUNDRY_PASSWORD: pass
    SPRING_CLOUD_DEPLOYER_CLOUDFOUNDRY_SKIP_SSL_VALIDATION: false
    SPRING_CLOUD_DEPLOYER_CLOUDFOUNDRY_SPACE: pcfdev-space
    SPRING_CLOUD_DEPLOYER_CLOUDFOUNDRY_TASK_SERVICES: mysql
    SPRING_CLOUD_DEPLOYER_CLOUDFOUNDRY_URL: https://api.local.pcfdev.io
    SPRING_CLOUD_DEPLOYER_CLOUDFOUNDRY_USERNAME: user
    
    No running env variables have been set
    
    No staging env variables have been set
  4. Notice that the dataflow-server application is started and ready for interaction via the dataflow-server.local.pcfdev.io endpoint

  5. Build and register the batch-job example from Spring Cloud Task samples. For convenience, the final uber-jar artifact is provided with this sample.

    dataflow:>app register --type task --name simple_batch_job --uri https://github.com/spring-cloud/spring-cloud-dataflow-samples/raw/master/src/main/asciidoc/tasks/simple-batch-job/batch-job-1.3.0.BUILD-SNAPSHOT.jar
  6. Create the task with the simple-batch-job application

    dataflow:>task create foo --definition "simple_batch_job"
    Unlike Streams, the Task definitions don’t require explicit deployment. They can be launched on-demand, scheduled, or triggered by streams.
  7. Verify that there are still no Task applications running on PCFDev - they are listed only after the initial launch/staging attempt on PCF

    $ cf apps
    Getting apps in org pcfdev-org / space pcfdev-space as user...
    OK
    
    name              requested state   instances   memory   disk   urls
    dataflow-server   started           1/1         768M     512M   dataflow-server.local.pcfdev.io
  8. Let’s launch foo

    dataflow:>task launch foo
  9. Verify the execution of foo by tailing the logs

    $ cf logs foo
    Retrieving logs for app foo in org pcfdev-org / space pcfdev-space as user...
    
    2016-08-14T18:48:54.22-0700 [APP/TASK/foo/0]OUT Creating container
    2016-08-14T18:48:55.47-0700 [APP/TASK/foo/0]OUT
    
    2016-08-14T18:49:06.59-0700 [APP/TASK/foo/0]OUT 2016-08-15 01:49:06.598  INFO 14 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=job1]] launched with the following parameters: [{}]
    
    ...
    ...
    
    2016-08-14T18:49:06.78-0700 [APP/TASK/foo/0]OUT 2016-08-15 01:49:06.785  INFO 14 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=job1]] completed with the following parameters: [{}] and the following status: [COMPLETED]
    
    ...
    ...
    
    2016-08-14T18:49:07.36-0700 [APP/TASK/foo/0]OUT 2016-08-15 01:49:07.363  INFO 14 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=job2]] launched with the following parameters: [{}]
    
    ...
    ...
    
    2016-08-14T18:49:07.53-0700 [APP/TASK/foo/0]OUT 2016-08-15 01:49:07.536  INFO 14 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=job2]] completed with the following parameters: [{}] and the following status: [COMPLETED]
    
    ...
    ...
    
    2016-08-14T18:49:07.71-0700 [APP/TASK/foo/0]OUT Exit status 0
    2016-08-14T18:49:07.78-0700 [APP/TASK/foo/0]OUT Destroying container
    2016-08-14T18:49:08.47-0700 [APP/TASK/foo/0]OUT Successfully destroyed container
    Verify that the job1 and job2 operations embedded in the simple-batch-job application are launched independently and that they return with the status COMPLETED.
    Unlike LRPs in Cloud Foundry, tasks are short-lived, so the logs aren’t always available. They are generated only when the Task application runs; at the end of the Task operation, the container that ran the Task application is destroyed to free up resources.
  10. List Tasks in Cloud Foundry

    $ cf apps
    Getting apps in org pcfdev-org / space pcfdev-space as user...
    OK
    
    name              requested state   instances   memory   disk   urls
    dataflow-server   started           1/1         768M     512M   dataflow-server.local.pcfdev.io
    foo               stopped           0/1         1G       1G
  11. Verify Task execution details

    dataflow:>task execution list
    ╔══════════════════════════╤══╤════════════════════════════╤════════════════════════════╤═════════╗
    ║        Task Name         │ID│         Start Time         │          End Time          │Exit Code║
    ╠══════════════════════════╪══╪════════════════════════════╪════════════════════════════╪═════════╣
    ║foo                       │1 │Sun Aug 14 18:49:05 PDT 2016│Sun Aug 14 18:49:07 PDT 2016│0        ║
    ╚══════════════════════════╧══╧════════════════════════════╧════════════════════════════╧═════════╝
  12. Verify Job execution details

    dataflow:>job execution list
    ╔═══╤═══════╤═════════╤════════════════════════════╤═════════════════════╤══════════════════╗
    ║ID │Task ID│Job Name │         Start Time         │Step Execution Count │Definition Status ║
    ╠═══╪═══════╪═════════╪════════════════════════════╪═════════════════════╪══════════════════╣
    ║2  │1      │job2     │Sun Aug 14 18:49:07 PDT 2016│1                    │Destroyed         ║
    ║1  │1      │job1     │Sun Aug 14 18:49:06 PDT 2016│1                    │Destroyed         ║
    ╚═══╧═══════╧═════════╧════════════════════════════╧═════════════════════╧══════════════════╝

4.1.3. Summary

In this sample, you have learned:

  • How to register and orchestrate Spring Batch jobs in Spring Cloud Data Flow

  • How to use the cf CLI in the context of Task applications orchestrated by Spring Cloud Data Flow

  • How to verify task executions and the task repository

4.2. Batch File Ingest

In this demonstration, you will learn how to create a data processing application using Spring Batch which will then be run within Spring Cloud Data Flow.

4.2.1. Prerequisites

  • A Running Data Flow Server

The Local Data Flow Server is a Spring Boot application available for download, or you can build it yourself. If you build it yourself, the executable jar will be in spring-cloud-dataflow-server-local/target.

To run the Local Data Flow server, open a new terminal session:

$ cd <PATH/TO/SPRING-CLOUD-DATAFLOW-LOCAL-JAR>
$ java -jar spring-cloud-dataflow-server-local-<VERSION>.jar
  • A Running Data Flow Shell

The Spring Cloud Data Flow Shell is available for download or you can build it yourself.

The Spring Cloud Data Flow Shell and Local server implementations are in the same repository and are both built by running ./mvnw install from the project root directory. If you have already run the build, use the jar in spring-cloud-dataflow-shell/target.

To run the Shell, open a new terminal session:

$ cd <PATH/TO/SPRING-CLOUD-DATAFLOW-SHELL-JAR>
$ java -jar spring-cloud-dataflow-shell-<VERSION>.jar
  ____                              ____ _                __
 / ___| _ __  _ __(_)_ __   __ _   / ___| | ___  _   _  __| |
 \___ \| '_ \| '__| | '_ \ / _` | | |   | |/ _ \| | | |/ _` |
  ___) | |_) | |  | | | | | (_| | | |___| | (_) | |_| | (_| |
 |____/| .__/|_|  |_|_| |_|\__, |  \____|_|\___/ \__,_|\__,_|
  ____ |_|    _          __|___/                 __________
 |  _ \  __ _| |_ __ _  |  ___| | _____      __  \ \ \ \ \ \
 | | | |/ _` | __/ _` | | |_  | |/ _ \ \ /\ / /   \ \ \ \ \ \
 | |_| | (_| | || (_| | |  _| | | (_) \ V  V /    / / / / / /
 |____/ \__,_|\__\__,_| |_|   |_|\___/ \_/\_/    /_/_/_/_/_/


Welcome to the Spring Cloud Data Flow shell. For assistance hit TAB or type "help".
dataflow:>
The Spring Cloud Data Flow Shell is a Spring Boot application that connects to the Data Flow Server’s REST API and supports a DSL that simplifies the process of defining a stream or task and managing its lifecycle. Most of these samples use the shell. If you prefer, you can use the Data Flow UI at localhost:9393/dashboard (or wherever the server is hosted) to perform equivalent operations.

4.2.2. Batch File Ingest Demo Overview

The source for the demo project is located here. The sample is a Spring Boot application that demonstrates how to read data from a flat file, perform processing on the records, and store the transformed data in a database using Spring Batch.

The key classes for creating the batch job are:

  • BatchConfiguration.java - this is where we define our batch job, the step, and the components that are used to read, process, and write our data. In the sample we use a FlatFileItemReader which reads a delimited file, a custom PersonItemProcessor to transform the data, and a JdbcBatchItemWriter to write our data to a database.

  • Person.java - the domain object representing the data we are reading and processing in our batch job. The sample data contains records made up of a person’s first and last name.

  • PersonItemProcessor.java - this class is an ItemProcessor implementation which receives records after they have been read and before they are written. This allows us to transform the data between these two steps. In our sample ItemProcessor implementation, we simply transform the first and last name of each Person to uppercase characters, as sketched after this list.

  • Application.java - the main entry point into the Spring Boot application which is used to launch the batch job
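
The processing step described above boils down to an ItemProcessor that uppercases two fields. The sketch below shows the shape of such a processor; it is not the sample’s exact source, the class name and the simplified nested Person type are assumptions, and it only requires spring-batch-core on the classpath.

    import org.springframework.batch.item.ItemProcessor;

    public class UppercasePersonProcessor
            implements ItemProcessor<UppercasePersonProcessor.Person, UppercasePersonProcessor.Person> {

        // Minimal stand-in for the sample's Person domain object (first and last name).
        public static class Person {
            private final String firstName;
            private final String lastName;

            public Person(String firstName, String lastName) {
                this.firstName = firstName;
                this.lastName = lastName;
            }

            public String getFirstName() { return firstName; }
            public String getLastName() { return lastName; }
        }

        @Override
        public Person process(Person person) {
            // Transform the record between the read and write steps.
            return new Person(person.getFirstName().toUpperCase(),
                    person.getLastName().toUpperCase());
        }
    }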

Resource files are included to set up the database and provide sample data:

  • schema-all.sql - this is the database schema that will be created when the application starts up. In this sample, an in-memory database is created on start up and destroyed when the application exits.

  • data.csv - sample data file containing person records used in the demo

This example expects to use the Spring Cloud Data Flow Server’s embedded H2 database. If you wish to use another repository, be sure to add the correct dependencies to the pom.xml and update the schema-all.sql.

4.2.3. Building and Running the Demo

  1. Build the demo JAR

    $ mvn clean package
  2. Register the task

    dataflow:>app register --name fileIngest --type task --uri file:///path/to/target/ingest-X.X.X.jar
    Successfully registered application 'task:fileIngest'
    dataflow:>
  3. Create the task

    dataflow:>task create fileIngestTask --definition fileIngest
    Created new task 'fileIngestTask'
    dataflow:>
  4. Launch the task

    dataflow:>task launch fileIngestTask --arguments "filePath=classpath:data.csv --spring.cloud.task.closecontext_enable=false"
    Launched task 'fileIngestTask'
    dataflow:>
  5. Inspect logs

    The log file path for the launched task can be found in the local server output, for example:

    2017-10-27 14:58:18.112  INFO 19485 --- [nio-9393-exec-6] o.s.c.d.spi.local.LocalTaskLauncher      : launching task fileIngestTask-8932f73d-f17a-4bba-b44d-3fd9df042ac0
       Logs will be in /var/folders/6x/tgtx9xbn0x16xq2sx1j2rld80000gn/T/spring-cloud-dataflow-983191515779755562/fileIngestTask-1509130698071/fileIngestTask-8932f73d-f17a-4bba-b44d-3fd9df042ac0
  6. Verify Task execution details

    dataflow:>task execution list
    ╔══════════════╤══╤════════════════════════════╤════════════════════════════╤═════════╗
    ║  Task Name   │ID│         Start Time         │          End Time          │Exit Code║
    ╠══════════════╪══╪════════════════════════════╪════════════════════════════╪═════════╣
    ║fileIngestTask│1 │Fri Oct 27 14:58:20 EDT 2017│Fri Oct 27 14:58:20 EDT 2017│0        ║
    ╚══════════════╧══╧════════════════════════════╧════════════════════════════╧═════════╝
  7. Verify Job execution details

    dataflow:>job execution list
    ╔═══╤═══════╤═════════╤════════════════════════════╤═════════════════════╤══════════════════╗
    ║ID │Task ID│Job Name │         Start Time         │Step Execution Count │Definition Status ║
    ╠═══╪═══════╪═════════╪════════════════════════════╪═════════════════════╪══════════════════╣
    ║1  │1      │ingestJob│Fri Oct 27 14:58:20 EDT 2017│1                    │Created           ║
    ╚═══╧═══════╧═════════╧════════════════════════════╧═════════════════════╧══════════════════╝

4.2.4. Summary

In this sample, you have learned:

  • How to create a data processing batch job application

  • How to register and orchestrate Spring Batch jobs in Spring Cloud Data Flow

  • How to verify status via logs and shell commands

5. Stream Launching Batch Job

5.1. Batch File Ingest - SFTP

In the Batch File Ingest demonstration we built a Spring Batch application that would deploy into Spring Cloud Data Flow as a task and process a file embedded in the batch job JAR. This time we will build upon that sample but rather than deploying as a task in Spring Cloud Data Flow, we will create a Stream. This stream will poll an SFTP server and for each new file that it finds it launches the batch job to download the file and process it.

5.1.1. Prerequisites

  • Running instance of Kafka

  • Running instance of Redis

  • A Running Data Flow Server

The Local Data Flow Server is a Spring Boot application available for download, or you can build it yourself. If you build it yourself, the executable jar will be in spring-cloud-dataflow-server-local/target.

To run the Local Data Flow server, open a new terminal session:

$ cd <PATH/TO/SPRING-CLOUD-DATAFLOW-LOCAL-JAR>
$ java -jar spring-cloud-dataflow-server-local-<VERSION>.jar
  • A Running Data Flow Shell

The Spring Cloud Data Flow Shell is available for download or you can build it yourself.

The Spring Cloud Data Flow Shell and Local server implementation are in the same repository and are both built by running ./mvnw install from the project root directory. If you have already run the build, use the jar in spring-cloud-dataflow-shell/target.

To run the Shell open a new terminal session:

$ cd <PATH/TO/SPRING-CLOUD-DATAFLOW-SHELL-JAR>
$ java -jar spring-cloud-dataflow-shell-<VERSION>.jar
  ____                              ____ _                __
 / ___| _ __  _ __(_)_ __   __ _   / ___| | ___  _   _  __| |
 \___ \| '_ \| '__| | '_ \ / _` | | |   | |/ _ \| | | |/ _` |
  ___) | |_) | |  | | | | | (_| | | |___| | (_) | |_| | (_| |
 |____/| .__/|_|  |_|_| |_|\__, |  \____|_|\___/ \__,_|\__,_|
  ____ |_|    _          __|___/                 __________
 |  _ \  __ _| |_ __ _  |  ___| | _____      __  \ \ \ \ \ \
 | | | |/ _` | __/ _` | | |_  | |/ _ \ \ /\ / /   \ \ \ \ \ \
 | |_| | (_| | || (_| | |  _| | | (_) \ V  V /    / / / / / /
 |____/ \__,_|\__\__,_| |_|   |_|\___/ \_/\_/    /_/_/_/_/_/


Welcome to the Spring Cloud Data Flow shell. For assistance hit TAB or type "help".
dataflow:>
The Spring Cloud Data Flow Shell is a Spring Boot application that connects to the Data Flow Server’s REST API and supports a DSL that simplifies the process of defining a stream or task and managing its lifecycle. Most of these samples use the shell. If you prefer, you can use the Data Flow UI at localhost:9393/dashboard (or wherever the server is hosted) to perform equivalent operations.
  • Either a remote or local host accepting SFTP connections.

  • A database tool such as DBeaver to inspect the database contents

To simplify the dependencies and configuration in this example, we will use our local machine acting as an SFTP server.

5.1.2. Batch File Ingest SFTP Demo Overview

The source for the demo project is located in the batch/file-ingest-sftp directory at the top-level of this repository. The code in this directory is built upon the same code found in Batch File Ingest.

The key modifications from the Batch File Ingest sample are:

  • BatchConfiguration - The main change to this class is the addition of a StepExecutionListener. This listener is set on step1 so that, when the step executes, the listener fetches the provided file. Since we are now fetching a file from a remote resource and downloading it for processing, we obtain the remote file location from a JobParameter named remoteFilePath and the path where the downloaded file will be stored from the JobParameter named localFilePath (a sketch of such a listener appears after this list).

  • SftpRemoteResource - To obtain the file from SFTP, this class was created utilizing the SftpRemoteFileTemplate class from Spring Integration. We create a new bean from this class in BatchConfiguration and the StepExecutionListener uses it to fetch files.
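
As a rough sketch of the listener idea described above (hypothetical class and names, not the sample's exact code), a StepExecutionListener could use an SftpRemoteFileTemplate bean to download the remote file before the step runs:

import java.io.FileOutputStream;
import java.io.OutputStream;

import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.StepExecutionListener;
import org.springframework.integration.sftp.session.SftpRemoteFileTemplate;

public class SftpDownloadingStepExecutionListener implements StepExecutionListener {

    private final SftpRemoteFileTemplate template;

    public SftpDownloadingStepExecutionListener(SftpRemoteFileTemplate template) {
        this.template = template;
    }

    @Override
    public void beforeStep(StepExecution stepExecution) {
        // Resolve the job parameters described above.
        String remotePath = stepExecution.getJobParameters().getString("remoteFilePath");
        String localPath = stepExecution.getJobParameters().getString("localFilePath");

        // Copy the remote file to the local path so the step can process it.
        this.template.execute(session -> {
            try (OutputStream out = new FileOutputStream(localPath)) {
                session.read(remotePath, out);
            }
            return null;
        });
    }

    @Override
    public ExitStatus afterStep(StepExecution stepExecution) {
        return stepExecution.getExitStatus();
    }
}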

Additionally we use Redis to persist the paths of files we have seen on the SFTP server. We persist this data rather than storing it in memory so that already-seen files are not sent for processing again in the event of a failure or restart.
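
For the seen-files bookkeeping, one possible approach (a sketch only, not necessarily the mechanism the sample uses) is to back a Spring Integration metadata store with Redis, keyed under the sftpSource name that appears in the Redis commands later in this demo:

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.integration.metadata.ConcurrentMetadataStore;
import org.springframework.integration.redis.metadata.RedisMetadataStore;

@Configuration
public class SeenFilesConfiguration {

    // Stores already-seen remote file paths in Redis under the "sftpSource" key,
    // so a restart of the stream does not re-process files that were already handled.
    @Bean
    public ConcurrentMetadataStore seenFilesMetadataStore(RedisConnectionFactory connectionFactory) {
        return new RedisMetadataStore(connectionFactory, "sftpSource");
    }
}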

5.1.3. Building and Running the Demo

  1. Build the demo JAR

    From the root of this project:

    $ cd batch/file-ingest-sftp
    $ mvn clean package
  2. Create the data directories

    Now we create the directories where the batch job expects to find files on the remote SFTP server, as well as where they will be transferred locally. These paths must exist prior to running the batch job.

    If you are using a non-local SFTP server, the /tmp/remote-files directory would be created on the SFTP server and /tmp/local-files would be created on your local machine.
    $ mkdir -p /tmp/remote-files /tmp/local-files
  3. Register the SFTP source and the Task Launcher Local sink

    With our Spring Cloud Data Flow server running, we register the SFTP source and task-launcher-local sink. The SFTP source application does the work of polling for new files; when a new file is found, it sends a message to the task-launcher-local sink to launch the batch job for that file.

    In the Spring Cloud Data Flow shell:

    dataflow:>app register --name sftp --type source --uri maven://org.springframework.cloud.stream.app:sftp-source-kafka:2.0.0.BUILD-SNAPSHOT
    Successfully registered application 'source:sftp'
    dataflow:>app register --name task-launcher-local --type sink --uri maven://org.springframework.cloud.stream.app:task-launcher-local-sink-kafka:2.0.0.M1
    Successfully registered application 'sink:task-launcher-local'
  4. Create and deploy the stream

    Now let's create and deploy the stream, which will start polling the SFTP server and launch the batch job when new files arrive.

    You must replace --username=user, --password=pass and --batch-resource-uri=file:////path/to/sftp-ingest.jar below with their respective values. The --username= and --password= parameters are the credentials for your local (or remote) user, and --batch-resource-uri= is the fully qualified path to the sample ingest JAR we built above. If you are not using the local system as an SFTP server, specify the host with the --host= parameter and optionally --port=. If not defined, --host defaults to 127.0.0.1 and --port defaults to 22.
    dataflow:>stream create --name inboundSftp --definition "sftp --username=user --password=pass --allow-unknown-keys=true --task-launcher-output=true --remote-dir=/tmp/remote-files/ --batch-resource-uri=file:////path/to/sftp-ingest.jar --data-source-url=jdbc:h2:tcp://localhost:19092/mem:dataflow --data-source-user-name=sa --local-file-path-job-parameter-value=/tmp/local-files/ | task-launcher-local" --deploy
    Created new stream 'inboundSftp'
    Deployment request has been sent
  5. Verify Stream deployment

    We can see the status of the streams to be deployed with stream list, for example:

    dataflow:>stream list
    ╔═══════════╤═══════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════╤═════════════╗
    ║Stream Name│                                                                     Stream Definition                                                                     │   Status    ║
    ╠═══════════╪═══════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════╪═════════════╣
    ║inboundSftp│sftp --username=user --password=****** --allow-unknown-keys=true --task-launcher-output=true --remote-dir=/tmp/remote-files/                               │The stream   ║
    ║           │--batch-resource-uri=file:///path/to/spring-cloud-dataflow-samples/batch/file-ingest-sftp/target/ingest-sftp-1.0.0.jar                                     │has been     ║
    ║           │--data-source-url=jdbc:h2:tcp://localhost:19092/mem:dataflow --data-source-user-name=sa --local-file-path-job-parameter-value=/tmp/local-files/ |          │successfully ║
    ║           │task-launcher-local                                                                                                                                        │deployed     ║
    ╚═══════════╧═══════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════╧═════════════╝
  6. Inspecting logs

    In the event the stream failed to deploy, or you would like to inspect the logs for any reason, the log paths of the applications created within the inboundSftp stream are printed to the console where the Spring Cloud Data Flow server was launched, for example:

    2018-04-27 11:13:50.361  INFO 46308 --- [nio-9393-exec-8] o.s.c.d.spi.local.LocalAppDeployer       : Deploying app with deploymentId inboundSftp.task-launcher-local instance 0.
       Logs will be in /var/folders/6x/tgtx9xbn0x16xq2sx1j2rld80000gn/T/spring-cloud-deployer-1671726770179703111/inboundSftp-1524842030314/inboundSftp.task-launcher-local
    ...
    ...
    2018-04-27 11:13:50.369  INFO 46308 --- [nio-9393-exec-8] o.s.c.d.spi.local.LocalAppDeployer       : Deploying app with deploymentId inboundSftp.sftp instance 0.
       Logs will be in /var/folders/6x/tgtx9xbn0x16xq2sx1j2rld80000gn/T/spring-cloud-deployer-1671726770179703111/inboundSftp-1524842030363/inboundSftp.sftp

    In this example, the logs for the SFTP application would be in:

    /var/folders/6x/tgtx9xbn0x16xq2sx1j2rld80000gn/T/spring-cloud-deployer-1671726770179703111/inboundSftp-1524842030363/inboundSftp.sftp

    The log files contained in this directory would be useful to debug issues such as SFTP connection failures.

    Additionally, the logs for the task-launcher-local application would be in:

    /var/folders/6x/tgtx9xbn0x16xq2sx1j2rld80000gn/T/spring-cloud-deployer-1671726770179703111/inboundSftp-1524842030314/inboundSftp.task-launcher-local

    Since we utilize the task-launcher-local application to launch batch jobs upon receiving new files, this file would contain the start-up logs of the task-launcher-local application and also the log paths of all applications launched from it. The log files for each launched task can also be inspected as needed for debugging or verification.

  7. Add data

    Normally data would be arriving on an SFTP server, but since we are running this sample locally we will simulate that by adding data into the path specified by --remote-dir. A sample data file can be found in the data/ directory of the sample project.

    Let's copy data/people.csv into the /tmp/remote-files directory, which is acting as the remote SFTP server directory. This file will be detected by the SFTP application polling the remote directory, and a batch job will be launched to process it.

    $ cp data/people.csv /tmp/remote-files
  8. Inspect Job Executions

    After data is received and the batch job runs, it will be recorded as a Job Execution. We can view job executions by, for example, issuing the following command in the Spring Cloud Data Flow shell:

    dataflow:>job execution list
    ╔═══╤═══════╤═════════╤════════════════════════════╤═════════════════════╤══════════════════╗
    ║ID │Task ID│Job Name │         Start Time         │Step Execution Count │Definition Status ║
    ╠═══╪═══════╪═════════╪════════════════════════════╪═════════════════════╪══════════════════╣
    ║1  │1      │ingestJob│Tue May 01 23:34:05 EDT 2018│1                    │Destroyed         ║
    ╚═══╧═══════╧═════════╧════════════════════════════╧═════════════════════╧══════════════════╝

    As well as list more details about that specific job execution:

    dataflow:>job execution display --id 1
    ╔═══════════════════════╤════════════════════════════╗
    ║          Key          │           Value            ║
    ╠═══════════════════════╪════════════════════════════╣
    ║Job Execution Id       │1                           ║
    ║Task Execution Id      │1                           ║
    ║Task Instance Id       │1                           ║
    ║Job Name               │ingestJob                   ║
    ║Create Time            │Tue May 01 23:34:05 EDT 2018║
    ║Start Time             │Tue May 01 23:34:05 EDT 2018║
    ║End Time               │Tue May 01 23:34:06 EDT 2018║
    ║Running                │false                       ║
    ║Stopping               │false                       ║
    ║Step Execution Count   │1                           ║
    ║Execution Status       │COMPLETED                   ║
    ║Exit Status            │COMPLETED                   ║
    ║Exit Message           │                            ║
    ║Definition Status      │Destroyed                   ║
    ║Job Parameters         │                            ║
    ║run.id(LONG)           │1                           ║
    ║remoteFilePath(STRING) │/tmp/remote-files/people.csv║
    ║localFilePath(STRING)  │/tmp/local-files/people.csv ║
    ╚═══════════════════════╧════════════════════════════╝
  9. Verify data

    When the batch job runs, it downloads the file to the local directory /tmp/local-files, transforms the data into uppercase names, and stores it in the database.

    You may use any database tool that supports the H2 database to inspect the data. In this example we use the database tool DBeaver. Let's inspect the table to ensure our data was processed correctly (a small JDBC sketch that performs the same check appears after this list).

    Within DBeaver, create a connection to the database using the JDBC URL of jdbc:h2:tcp://localhost:19092/mem:dataflow. Upon connection, expand the PUBLIC schema, then expand Tables, and then double click the table PEOPLE. When the table data loads, click the "Data" tab and the transformed data from the CSV file will appear, with the records from the file uppercased.

  10. Seen file caching

    Since we are storing file paths that have been seen on the SFTP server, updating or adding to /tmp/remote-files/people.csv will not cause a new batch job to run. If you are using the example data file above, simply copy it to a new name, for example:

    $ cp data/people.csv /tmp/remote-files/people2.csv

    Refreshing the contents of the database table will show the new data that was transformed and stored. The job execution list, job execution display --id X, and database inspection commands above will let you view details about subsequent runs triggered by newly arriving files.

    Alternatively, you can delete a single seen file from Redis, for example:

    127.0.0.1:6379> DEL sftpSource "/tmp/remote-files/people.csv"
    (integer) 1
    127.0.0.1:6379>

    Or delete all seen files, for example:

    127.0.0.1:6379> DEL sftpSource
    (integer) 1
    127.0.0.1:6379>

    These files should be deleted from /tmp/remote-files prior to deleting them from Redis, otherwise they will be seen again and re-processed.
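
As an alternative to a GUI tool, a short JDBC check like the following can confirm the ingested rows. This is a sketch only: it assumes the H2 driver is on the classpath, reuses the JDBC URL and sa user from the DBeaver step above, and prints every column rather than assuming column names.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.Statement;

public class PeopleTableCheck {

    public static void main(String[] args) throws Exception {
        // Same JDBC URL and user as in the DBeaver step above; empty password.
        try (Connection connection = DriverManager.getConnection(
                     "jdbc:h2:tcp://localhost:19092/mem:dataflow", "sa", "");
             Statement statement = connection.createStatement();
             ResultSet resultSet = statement.executeQuery("SELECT * FROM PEOPLE")) {
            ResultSetMetaData metaData = resultSet.getMetaData();
            while (resultSet.next()) {
                StringBuilder row = new StringBuilder();
                for (int column = 1; column <= metaData.getColumnCount(); column++) {
                    row.append(resultSet.getString(column)).append(' ');
                }
                // Each printed row should contain the uppercased names from the CSV file.
                System.out.println(row.toString().trim());
            }
        }
    }
}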

5.1.4. Using the Cloud Foundry Server

Additional Prerequisites
  • Cloud Foundry instance

  • A mysql service instance

  • A rabbit service instance

  • A redis service instance

  • The Spring Cloud Data Flow Cloud Foundry Server

  • An SFTP server accessible from the Cloud Foundry instance

The Cloud Foundry Data Flow Server is a Spring Boot application available for download, or you can build it yourself. If you build it yourself, the executable jar will be in spring-cloud-dataflow-server-cloudfoundry/target.

Although you can run the Data Flow Cloud Foundry Server locally and configure it to deploy to any Cloud Foundry instance, we will deploy the server to Cloud Foundry as recommended.
  1. Verify that the CF instance is reachable (your endpoint URLs will be different from what is shown here).

    $ cf api
    API endpoint: https://api.system.io (API version: ...)
    
    $ cf apps
    Getting apps in org [your-org] / space [your-space] as user...
    OK
    
    No apps found
  2. Follow the instructions to deploy the Spring Cloud Data Flow Cloud Foundry server. The following manifest file can be used, replacing values as needed:

    ---
    applications:
    - name: dataflow-server
      host: dataflow-server
      memory: 2G
      disk_quota: 2G
      instances: 1
      path: /PATH/TO/SPRING-CLOUD-DATAFLOW-SERVER-CLOUDFOUNDRY-JAR
      env:
        SPRING_APPLICATION_NAME: dataflow-server
        SPRING_CLOUD_DEPLOYER_CLOUDFOUNDRY_URL: YOUR_CF_URL
        SPRING_CLOUD_DEPLOYER_CLOUDFOUNDRY_ORG: YOUR_CF_ORG
        SPRING_CLOUD_DEPLOYER_CLOUDFOUNDRY_SPACE: YOUR_CF_SPACE
        SPRING_CLOUD_DEPLOYER_CLOUDFOUNDRY_DOMAIN: YOUR_CF_DOMAIN
        SPRING_CLOUD_DEPLOYER_CLOUDFOUNDRY_USERNAME: YOUR_CF_USER
        SPRING_CLOUD_DEPLOYER_CLOUDFOUNDRY_PASSWORD: YOUR_CF_PASSWORD
        SPRING_CLOUD_DEPLOYER_CLOUDFOUNDRY_STREAM_SERVICES: rabbit
        SPRING_CLOUD_DEPLOYER_CLOUDFOUNDRY_TASK_SERVICES: mysql
        SPRING_CLOUD_DEPLOYER_CLOUDFOUNDRY_SKIP_SSL_VALIDATION: true
        SPRING_APPLICATION_JSON: '{"maven": { "remote-repositories": { "repo1": { "url": "https://repo.spring.io/libs-release"}, "repo2": { "url": "https://repo.spring.io/libs-snapshot"}, "repo3": { "url": "https://repo.spring.io/libs-milestone"} } } }'
    services:
      - mysql
      - redis

    If your Cloud Foundry installation is behind a firewall, you may need to install the stream apps used in this sample in your internal Maven repository and configure the server to access that repository.

  3. Once you have successfully executed cf push, verify the dataflow server is running

    $ cf apps
    Getting apps in org [your-org] / space [your-space] as user...
    OK
    
    name                 requested state   instances   memory   disk   urls
    dataflow-server      started           1/1         1G       1G     dataflow-server.app.io
  4. Notice that the dataflow-server application is started and ready for interaction via the URL endpoint

  5. Connect the shell to the server running on Cloud Foundry, e.g., dataflow-server.app.io

    $ cd <PATH/TO/SPRING-CLOUD-DATAFLOW-SHELL-JAR>
    $ java -jar spring-cloud-dataflow-shell-<VERSION>.jar
    
      ____                              ____ _                __
     / ___| _ __  _ __(_)_ __   __ _   / ___| | ___  _   _  __| |
     \___ \| '_ \| '__| | '_ \ / _` | | |   | |/ _ \| | | |/ _` |
      ___) | |_) | |  | | | | | (_| | | |___| | (_) | |_| | (_| |
     |____/| .__/|_|  |_|_| |_|\__, |  \____|_|\___/ \__,_|\__,_|
      ____ |_|    _          __|___/                 __________
     |  _ \  __ _| |_ __ _  |  ___| | _____      __  \ \ \ \ \ \
     | | | |/ _` | __/ _` | | |_  | |/ _ \ \ /\ / /   \ \ \ \ \ \
     | |_| | (_| | || (_| | |  _| | | (_) \ V  V /    / / / / / /
     |____/ \__,_|\__\__,_| |_|   |_|\___/ \_/\_/    /_/_/_/_/_/
    
    
    Welcome to the Spring Cloud Data Flow shell. For assistance hit TAB or type "help".
    server-unknown:>
    server-unknown:>dataflow config server http://dataflow-server.app.io
    Successfully targeted http://dataflow-server.app.io
    dataflow:>
Building and Running the Demo
  1. Build the demo JAR

    Building upon the code in batch/file-ingest-sftp, in this demo we utilize Spring Cloud Connectors to automatically bind Cloud Foundry services such as MySQL and Redis.

    From the root of this project:

    $ cd batch/file-ingest-sftp-cf
    $ mvn clean package

    The resulting target/ingest-sftp-cf-1.0.0.jar artifact must be uploaded to a remote location such as an HTTP server or Maven repository that is accessible to your Cloud Foundry installation. For convenience, a pre-built demo artifact can be found at: https://raw.githubusercontent.com/spring-cloud/spring-cloud-dataflow-samples/master/batch/file-ingest-sftp-cf/artifacts/ingest-sftp-cf-1.0.0.jar

  2. Create the data directory

    A directory must be created on the SFTP server where the batch job will find files and download them for processing. This path must exist prior to running the batch job and can be any location that is accessible by the configured SFTP user. On the SFTP server create a directory, for example:

    $ mkdir /tmp/remote-files
  3. Register the SFTP source and the Task Launcher Cloud Foundry sink

    With the Spring Cloud Data Flow server running, the SFTP source and task-launcher-cloudfoundry sink need to be registered. The SFTP source application does the work of polling for new files; when a new file is found, it sends a message to the task-launcher-cloudfoundry sink to launch the batch job for that file.

    In the Spring Cloud Data Flow shell:

    dataflow:>app register --name sftp --type source --uri maven://org.springframework.cloud.stream.app:sftp-source-rabbit:2.0.0.BUILD-SNAPSHOT
    Successfully registered application 'source:sftp'
    dataflow:>app register --name task-launcher-cloudfoundry --type sink --uri maven://org.springframework.cloud.stream.app:task-launcher-cloudfoundry-sink-rabbit:2.0.0.BUILD-SNAPSHOT
    Successfully registered application 'sink:task-launcher-cloudfoundry'
  4. Create and deploy the stream

    Now a stream needs to be created that will poll the SFTP server, launching the batch job when new files arrive.

    Create the stream:

    You must replace --username=user, --password=pass and --host=1.1.1.1 below with their respective values. The --username= and --password= parameters are the credentials for your remote SFTP user. The --batch-resource-uri= parameter is the path to the batch artifact to use. In this Stream definition, the published sample batch artifact JAR is used. If you would like to use a custom built artifact, replace this value with the artifact location.
    dataflow:>stream create --name inboundSftp --definition "sftp --username=user --password=pass --host=1.1.1.1 --allow-unknown-keys=true --task-launcher-output=true --remote-dir=/tmp/remote-files --batch-resource-uri=https://raw.githubusercontent.com/spring-cloud/spring-cloud-dataflow-samples/master/batch/file-ingest-sftp-cf/artifacts/ingest-sftp-cf-1.0.0.jar --local-file-path-job-parameter-value=/tmp/ | task-launcher-cloudfoundry --spring.cloud.deployer.cloudfoundry.services=mysql"
    Created new stream 'inboundSftp'
    dataflow:>

    Deploy the stream:

    You must replace CF_USER, CF_PASSWORD, CF_ORG, CF_SPACE, and CF_URL below with the appropriate values for your setup. The values will be used by the task launcher to launch tasks.
    dataflow:>stream deploy inboundSftp --properties "app.task-launcher-cloudfoundry.spring.cloud.deployer.cloudfoundry.username=CF_USER,app.task-launcher-cloudfoundry.spring.cloud.deployer.cloudfoundry.password=CF_PASSWORD,app.task-launcher-cloudfoundry.spring.cloud.deployer.cloudfoundry.org=CF_ORG,app.task-launcher-cloudfoundry.spring.cloud.deployer.cloudfoundry.space=CF_SPACE,app.task-launcher-cloudfoundry.spring.cloud.deployer.cloudfoundry.url=CF_URL,app.task-launcher-cloudfoundry.spring.cloud.deployer.cloudfoundry.skip-ssl-validation=true,app.task-launcher-cloudfoundry.spring.cloud.deployer.cloudfoundry.apiTimeout=30000,deployer.sftp.cloudfoundry.services=redis"
    Deployment request has been sent for stream 'inboundSftp'
    
    dataflow:>
  5. Verify Stream deployment

    The status of the stream to be deployed can be queried with stream list, for example:

    dataflow:>stream list
    ╔═══════════╤═════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════╗
    ║Stream Name│                                                                     Stream Definition                                                                                │      Status      ║
    ╠═══════════╪═════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════╣
    ║inboundSftp│sftp --password='******' --local-file-path-job-parameter-value=/tmp/ --host=1.1.1.1 --remote-dir=/tmp/remote-files --allow-unknown-keys=true                          │The stream has    ║
    ║           │--task-launcher-output=true                                                                                                                                           |been successfully ║
    ║           |--batch-resource-uri=https://raw.githubusercontent.com/spring-cloud/spring-cloud-dataflow-samples/master/batch/file-ingest-sftp-cf/artifacts/ingest-sftp-cf-1.0.0.jar |deployed          ║
    ║           |--username=user | task-launcher-cloudfoundry --spring.cloud.deployer.cloudfoundry.services=mysql                                                                      |                  ║
    ╚═══════════╧═════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════╝
  6. Inspecting logs

    In the event the stream failed to deploy, or you would like to inspect the logs for any reason, the logs can be obtained from individual applications. First list the deployed apps:

    $ cf apps
    Getting apps in org cf_org / space cf_space as cf_user...
    OK
    
    name                                                             requested state   instances   memory   disk   urls
    dataflow-server                                                  started           1/1         2G       2G     dataflow-server.app.io
    dataflow-server-N5RYLDj-inboundSftp-sftp                         started           1/1         1G       1G     dataflow-server-N5RYLDj-inboundSftp-sftp.dataflow-server.app.io
    dataflow-server-N5RYLDj-inboundSftp-task-launcher-cloudfoundry   started           1/1         1G       1G     dataflow-server-N5RYLDj-inboundSftp-task-launcher-cloudfoundry.dataflow-server.app.io

    In this example, the logs for the SFTP application can be viewed by:

    cf logs dataflow-server-N5RYLDj-inboundSftp-sftp --recent

    The log files of this application would be useful to debug issues such as SFTP connection failures.

    Additionally, the logs for the task-launcher-cloudfoundry application can be viewed by:

    cf logs dataflow-server-N5RYLDj-inboundSftp-task-launcher-cloudfoundry --recent

    Since the task-launcher-cloudfoundry application is used to launch batch jobs upon receiving new files, this log would contain the start-up logs of the task-launcher-cloudfoundry application and also the name and other information of all applications launched from it. The application log file for each launched task can also be inspected as needed for debugging or verification.

  7. Add data

    A sample data file can be found in the data/ directory of the sample project. Copy data/people.csv into the /tmp/remote-files directory on the remote SFTP server. This file will be detected by the SFTP application polling the remote directory, and a batch job will be launched to process it.

  8. Inspect Job Executions

    After data is received and the batch job runs, it will be recorded as a Job Execution. We can view job executions by, for example, issuing the following command in the Spring Cloud Data Flow shell:

    dataflow:>job execution list
    ╔═══╤═══════╤═════════╤════════════════════════════╤═════════════════════╤══════════════════╗
    ║ID │Task ID│Job Name │         Start Time         │Step Execution Count │Definition Status ║
    ╠═══╪═══════╪═════════╪════════════════════════════╪═════════════════════╪══════════════════╣
    ║1  │1      │ingestJob│Thu Jun 07 13:46:42 EDT 2018│1                    │Destroyed         ║
    ╚═══╧═══════╧═════════╧════════════════════════════╧═════════════════════╧══════════════════╝

    As well as list more details about that specific job execution:

    dataflow:>job execution display --id 1
    ╔═══════════════════════╤════════════════════════════╗
    ║          Key          │           Value            ║
    ╠═══════════════════════╪════════════════════════════╣
    ║Job Execution Id       │1                           ║
    ║Task Execution Id      │1                           ║
    ║Task Instance Id       │1                           ║
    ║Job Name               │ingestJob                   ║
    ║Create Time            │Thu Jun 07 13:46:42 EDT 2018║
    ║Start Time             │Thu Jun 07 13:46:42 EDT 2018║
    ║End Time               │Thu Jun 07 13:46:44 EDT 2018║
    ║Running                │false                       ║
    ║Stopping               │false                       ║
    ║Step Execution Count   │1                           ║
    ║Execution Status       │COMPLETED                   ║
    ║Exit Status            │COMPLETED                   ║
    ║Exit Message           │                            ║
    ║Definition Status      │Destroyed                   ║
    ║Job Parameters         │                            ║
    ║run.id(LONG)           │1                           ║
    ║remoteFilePath(STRING) │/tmp/remote-files/1012.csv  ║
    ║localFilePath(STRING)  │/tmp/1012.csv               ║
    ╚═══════════════════════╧════════════════════════════╝
  9. Verification of Data and Seen Files

    Verification of data loaded by the batch job and seen file tracking can be accomplished in the same way as with the Local Server, using the appropriate tools. Consult the documentation for the service broker on your platform (PWS, PCF, etc.) for information on how to connect to the backing service.

5.1.5. Summary

In this sample, you have learned:

  • How to integrate SFTP file fetching into your batch job

  • How to create and launch a stream to poll files on an SFTP server and launch a batch job

  • How to verify status via logs and shell commands

  • How to run the SFTP file ingest batch job on Cloud Foundry

6. Analytics

6.1. Twitter Analytics

In this demonstration, you will learn how to build a data pipeline using Spring Cloud Data Flow to consume data from TwitterStream and compute simple analytics over data-in-transit using Field-Value-Counter.

We will take you through the steps to configure Spring Cloud Data Flow’s Local server.

6.1.1. Prerequisites

  • A Running Data Flow Shell

The Spring Cloud Data Flow Shell is available for download or you can build it yourself.

The Spring Cloud Data Flow Shell and Local server implementation are in the same repository and are both built by running ./mvnw install from the project root directory. If you have already run the build, use the jar in spring-cloud-dataflow-shell/target.

To run the Shell open a new terminal session:

$ cd <PATH/TO/SPRING-CLOUD-DATAFLOW-SHELL-JAR>
$ java -jar spring-cloud-dataflow-shell-<VERSION>.jar
  ____                              ____ _                __
 / ___| _ __  _ __(_)_ __   __ _   / ___| | ___  _   _  __| |
 \___ \| '_ \| '__| | '_ \ / _` | | |   | |/ _ \| | | |/ _` |
  ___) | |_) | |  | | | | | (_| | | |___| | (_) | |_| | (_| |
 |____/| .__/|_|  |_|_| |_|\__, |  \____|_|\___/ \__,_|\__,_|
  ____ |_|    _          __|___/                 __________
 |  _ \  __ _| |_ __ _  |  ___| | _____      __  \ \ \ \ \ \
 | | | |/ _` | __/ _` | | |_  | |/ _ \ \ /\ / /   \ \ \ \ \ \
 | |_| | (_| | || (_| | |  _| | | (_) \ V  V /    / / / / / /
 |____/ \__,_|\__\__,_| |_|   |_|\___/ \_/\_/    /_/_/_/_/_/


Welcome to the Spring Cloud Data Flow shell. For assistance hit TAB or type "help".
dataflow:>
The Spring Cloud Data Flow Shell is a Spring Boot application that connects to the Data Flow Server’s REST API and supports a DSL that simplifies the process of defining a stream or task and managing its lifecycle. Most of these samples use the shell. If you prefer, you can use the Data Flow UI at localhost:9393/dashboard (or wherever the server is hosted) to perform equivalent operations.
  • A running local Data Flow Server

The Local Data Flow Server is a Spring Boot application available for download, or you can build it yourself. If you build it yourself, the executable jar will be in spring-cloud-dataflow-server-local/target.

To run the Local Data Flow server, open a new terminal session:

$ cd <PATH/TO/SPRING-CLOUD-DATAFLOW-LOCAL-JAR>
$ java -jar spring-cloud-dataflow-server-local-<VERSION>.jar

6.1.2. Building and Running the Demo

  1. Register the out-of-the-box applications for the Kafka binder

    These samples assume that the Data Flow Server can access a remote Maven repository, repo.spring.io/libs-release by default. If your Data Flow server is running behind a firewall, or you are using a Maven proxy preventing access to public repositories, you will need to install the sample apps in your internal Maven repository and configure the server accordingly. The sample applications are typically registered using Data Flow’s bulk import facility; for example, the Shell command dataflow:>app import --uri bit.ly/Celsius-SR1-stream-applications-rabbit-maven (the actual URI is release and binder specific, so refer to the sample instructions for the actual URL). The bulk import URI references a plain text file containing entries for all of the publicly available Spring Cloud Stream and Task applications published to repo.spring.io. For example, source.http=maven://org.springframework.cloud.stream.app:http-source-rabbit:1.3.1.RELEASE registers the http source app at the corresponding Maven address, relative to the remote repository(ies) configured for the Data Flow server. The format is maven://<groupId>:<artifactId>:<version>. You will need to download the required apps or build them and then install them in your Maven repository, using whatever group, artifact, and version you choose. If you do this, register individual apps using dataflow:>app register… with the maven:// resource URI format corresponding to your installed app.
    dataflow:>app import --uri http://bit.ly/Celsius-SR1-stream-applications-kafka-10-maven
  2. Create and deploy the following streams

    dataflow:>stream create tweets --definition "twitterstream --consumerKey=<CONSUMER_KEY> --consumerSecret=<CONSUMER_SECRET> --accessToken=<ACCESS_TOKEN> --accessTokenSecret=<ACCESS_TOKEN_SECRET> | log"
    Created new stream 'tweets'
    dataflow:>stream create tweetlang  --definition ":tweets.twitterstream > field-value-counter --fieldName=lang --name=language" --deploy
    Created and deployed new stream 'tweetlang'
    dataflow:>stream create tagcount --definition ":tweets.twitterstream > field-value-counter --fieldName=entities.hashtags.text --name=hashtags" --deploy
    Created and deployed new stream 'tagcount'
    dataflow:>stream deploy tweets
    Deployed stream 'tweets'
    To get a consumerKey and consumerSecret you need to register a Twitter application. If you don’t already have one set up, you can create an app at the Twitter Developers site to get these credentials. The tokens <CONSUMER_KEY>, <CONSUMER_SECRET>, <ACCESS_TOKEN>, and <ACCESS_TOKEN_SECRET> must be replaced with your account credentials.
  3. Verify the streams are successfully deployed. Where: (1) is the primary pipeline; (2) and (3) are tapping the primary pipeline with the DSL syntax <stream-name>.<label/app name> [e.g. :tweets.twitterstream]; and (4) is the final deployment of the primary pipeline

    dataflow:>stream list
  4. Notice that tweetlang.field-value-counter, tagcount.field-value-counter, tweets.log and tweets.twitterstream Spring Cloud Stream applications are running as Spring Boot applications within the local-server.

    2016-02-16 11:43:26.174  INFO 10189 --- [nio-9393-exec-2] o.s.c.d.d.l.OutOfProcessModuleDeployer   : deploying module org.springframework.cloud.stream.module:field-value-counter-sink:jar:exec:1.0.0.BUILD-SNAPSHOT instance 0
       Logs will be in /var/folders/c3/ctx7_rns6x30tq7rb76wzqwr0000gp/T/spring-cloud-data-flow-6990537012958280418/tweetlang-1455651806160/tweetlang.field-value-counter
    2016-02-16 11:43:26.206  INFO 10189 --- [nio-9393-exec-3] o.s.c.d.d.l.OutOfProcessModuleDeployer   : deploying module org.springframework.cloud.stream.module:field-value-counter-sink:jar:exec:1.0.0.BUILD-SNAPSHOT instance 0
       Logs will be in /var/folders/c3/ctx7_rns6x30tq7rb76wzqwr0000gp/T/spring-cloud-data-flow-6990537012958280418/tagcount-1455651806202/tagcount.field-value-counter
    2016-02-16 11:43:26.806  INFO 10189 --- [nio-9393-exec-4] o.s.c.d.d.l.OutOfProcessModuleDeployer   : deploying module org.springframework.cloud.stream.module:log-sink:jar:exec:1.0.0.BUILD-SNAPSHOT instance 0
       Logs will be in /var/folders/c3/ctx7_rns6x30tq7rb76wzqwr0000gp/T/spring-cloud-data-flow-6990537012958280418/tweets-1455651806800/tweets.log
    2016-02-16 11:43:26.813  INFO 10189 --- [nio-9393-exec-4] o.s.c.d.d.l.OutOfProcessModuleDeployer   : deploying module org.springframework.cloud.stream.module:twitterstream-source:jar:exec:1.0.0.BUILD-SNAPSHOT instance 0
       Logs will be in /var/folders/c3/ctx7_rns6x30tq7rb76wzqwr0000gp/T/spring-cloud-data-flow-6990537012958280418/tweets-1455651806800/tweets.twitterstream
  5. Verify that the two field-value-counters named hashtags and language are listed successfully

    dataflow:>field-value-counter list
    ╔════════════════════════╗
    ║Field Value Counter name║
    ╠════════════════════════╣
    ║hashtags                ║
    ║language                ║
    ╚════════════════════════╝
  6. Verify you can query individual field-value-counter results successfully

    dataflow:>field-value-counter display hashtags
    Displaying values for field value counter 'hashtags'
    ╔══════════════════════════════════════╤═════╗
    ║                Value                 │Count║
    ╠══════════════════════════════════════╪═════╣
    ║KCA                                   │   40║
    ║PENNYSTOCKS                           │   17║
    ║TEAMBILLIONAIRE                       │   17║
    ║UCL                                   │   11║
    ║...                                   │   ..║
    ║...                                   │   ..║
    ║...                                   │   ..║
    ╚══════════════════════════════════════╧═════╝
    
    dataflow:>field-value-counter display language
    Displaying values for field value counter 'language'
    ╔═════╤═════╗
    ║Value│Count║
    ╠═════╪═════╣
    ║en   │1,171║
    ║es   │  337║
    ║ar   │  296║
    ║und  │  251║
    ║pt   │  175║
    ║ja   │  137║
    ║..   │  ...║
    ║..   │  ...║
    ║..   │  ...║
    ╚═════╧═════╝
  7. Go to the Dashboard, accessible at localhost:9393/dashboard, and open the Analytics tab. From the default Dashboard menu, select the following combinations to visualize real-time updates on field-value-counter.

    • For real-time updates on language tags, select:

      1. Metric Type as Field-Value-Counters

      2. Stream as language

      3. Visualization as Bubble-Chart or Pie-Chart

    • For real-time updates on hashtags tags, select:

      1. Metric Type as Field-Value-Counters

      2. Stream as hashtags

      3. Visualization as Bubble-Chart or Pie-Chart

Twitter Analytics Visualization

6.1.3. Summary

In this sample, you have learned:

  • How to use Spring Cloud Data Flow’s Local server

  • How to use Spring Cloud Data Flow’s shell application

  • How to create a streaming data pipeline to compute simple analytics using the Twitter Stream and Field Value Counter applications

7. Data Science

7.1. Species Prediction

In this demonstration, you will learn how to use a PMML model in the context of a streaming data pipeline orchestrated by Spring Cloud Data Flow.

We will present the steps to prepare, configure, and run Spring Cloud Data Flow’s Local server, a Spring Boot application.

7.1.1. Prerequisites

  • A Running Data Flow Shell

The Spring Cloud Data Flow Shell is available for download or you can build it yourself.

The Spring Cloud Data Flow Shell and Local server implementation are in the same repository and are both built by running ./mvnw install from the project root directory. If you have already run the build, use the jar in spring-cloud-dataflow-shell/target.

To run the Shell open a new terminal session:

$ cd <PATH/TO/SPRING-CLOUD-DATAFLOW-SHELL-JAR>
$ java -jar spring-cloud-dataflow-shell-<VERSION>.jar
  ____                              ____ _                __
 / ___| _ __  _ __(_)_ __   __ _   / ___| | ___  _   _  __| |
 \___ \| '_ \| '__| | '_ \ / _` | | |   | |/ _ \| | | |/ _` |
  ___) | |_) | |  | | | | | (_| | | |___| | (_) | |_| | (_| |
 |____/| .__/|_|  |_|_| |_|\__, |  \____|_|\___/ \__,_|\__,_|
  ____ |_|    _          __|___/                 __________
 |  _ \  __ _| |_ __ _  |  ___| | _____      __  \ \ \ \ \ \
 | | | |/ _` | __/ _` | | |_  | |/ _ \ \ /\ / /   \ \ \ \ \ \
 | |_| | (_| | || (_| | |  _| | | (_) \ V  V /    / / / / / /
 |____/ \__,_|\__\__,_| |_|   |_|\___/ \_/\_/    /_/_/_/_/_/


Welcome to the Spring Cloud Data Flow shell. For assistance hit TAB or type "help".
dataflow:>
The Spring Cloud Data Flow Shell is a Spring Boot application that connects to the Data Flow Server’s REST API and supports a DSL that simplifies the process of defining a stream or task and managing its lifecycle. Most of these samples use the shell. If you prefer, you can use the Data Flow UI at localhost:9393/dashboard (or wherever the server is hosted) to perform equivalent operations.
  • A running local Data Flow Server

The Local Data Flow Server is a Spring Boot application available for download, or you can build it yourself. If you build it yourself, the executable jar will be in spring-cloud-dataflow-server-local/target.

To run the Local Data Flow server, open a new terminal session:

$ cd <PATH/TO/SPRING-CLOUD-DATAFLOW-LOCAL-JAR>
$ java -jar spring-cloud-dataflow-server-local-<VERSION>.jar
  • Running instance of Kafka

7.1.2. Building and Running the Demo

  1. Register the out-of-the-box applications for the Kafka binder

    These samples assume that the Data Flow Server can access a remote Maven repository, repo.spring.io/libs-release by default. If your Data Flow server is running behind a firewall, or you are using a Maven proxy preventing access to public repositories, you will need to install the sample apps in your internal Maven repository and configure the server accordingly. The sample applications are typically registered using Data Flow’s bulk import facility; for example, the Shell command dataflow:>app import --uri bit.ly/Celsius-SR1-stream-applications-rabbit-maven (the actual URI is release and binder specific, so refer to the sample instructions for the actual URL). The bulk import URI references a plain text file containing entries for all of the publicly available Spring Cloud Stream and Task applications published to repo.spring.io. For example, source.http=maven://org.springframework.cloud.stream.app:http-source-rabbit:1.3.1.RELEASE registers the http source app at the corresponding Maven address, relative to the remote repository(ies) configured for the Data Flow server. The format is maven://<groupId>:<artifactId>:<version>. You will need to download the required apps or build them and then install them in your Maven repository, using whatever group, artifact, and version you choose. If you do this, register individual apps using dataflow:>app register… with the maven:// resource URI format corresponding to your installed app.
    dataflow:>app import --uri http://bit.ly/Celsius-SR1-stream-applications-kafka-10-maven
  2. Create and deploy the following stream

    dataflow:>stream create --name pmmlTest --definition "http --server.port=9001 | pmml --modelLocation=https://raw.githubusercontent.com/spring-cloud/spring-cloud-stream-modules/master/pmml-processor/src/test/resources/iris-flower-classification-naive-bayes-1.pmml.xml --inputs='Sepal.Length=payload.sepalLength,Sepal.Width=payload.sepalWidth,Petal.Length=payload.petalLength,Petal.Width=payload.petalWidth' --outputs='Predicted_Species=payload.predictedSpecies' --inputType='application/x-spring-tuple' --outputType='application/json'| log" --deploy
    Created and deployed new stream 'pmmlTest'
    The built-in pmml processor will load the given PMML model definition and create an internal object representation that can be evaluated quickly. When the stream receives data, it is used as the input for evaluating the analytical model iris-flower-classifier-1 contained in the PMML document. The result of this evaluation is a new field, predictedSpecies, created by the pmml processor by applying a classifier that uses the naiveBayes algorithm.
  3. Verify the stream is successfully deployed

    dataflow:>stream list
  4. Notice that pmmlTest.http, pmmlTest.pmml, and pmmlTest.log Spring Cloud Stream applications are running within the local-server.

    2016-02-18 06:36:45.396  INFO 31194 --- [nio-9393-exec-1] o.s.c.d.d.l.OutOfProcessModuleDeployer   : deploying module org.springframework.cloud.stream.module:log-sink:jar:exec:1.0.0.BUILD-SNAPSHOT instance 0
       Logs will be in /var/folders/c3/ctx7_rns6x30tq7rb76wzqwr0000gp/T/spring-cloud-data-flow-3038434123335455382/pmmlTest-1455806205386/pmmlTest.log
    2016-02-18 06:36:45.402  INFO 31194 --- [nio-9393-exec-1] o.s.c.d.d.l.OutOfProcessModuleDeployer   : deploying module org.springframework.cloud.stream.module:pmml-processor:jar:exec:1.0.0.BUILD-SNAPSHOT instance 0
       Logs will be in /var/folders/c3/ctx7_rns6x30tq7rb76wzqwr0000gp/T/spring-cloud-data-flow-3038434123335455382/pmmlTest-1455806205386/pmmlTest.pmml
    2016-02-18 06:36:45.407  INFO 31194 --- [nio-9393-exec-1] o.s.c.d.d.l.OutOfProcessModuleDeployer   : deploying module org.springframework.cloud.stream.module:http-source:jar:exec:1.0.0.BUILD-SNAPSHOT instance 0
       Logs will be in /var/folders/c3/ctx7_rns6x30tq7rb76wzqwr0000gp/T/spring-cloud-data-flow-3038434123335455382/pmmlTest-1455806205386/pmmlTest.http
  5. Post sample data to the http endpoint: localhost:9001 (9001 is the port we specified for the http source in this case). A programmatic alternative using RestTemplate is sketched after this list.

    dataflow:>http post --target http://localhost:9001 --contentType application/json --data "{ \"sepalLength\": 6.4, \"sepalWidth\": 3.2, \"petalLength\":4.5, \"petalWidth\":1.5 }"
    > POST (application/json;charset=UTF-8) http://localhost:9001 { "sepalLength": 6.4, "sepalWidth": 3.2, "petalLength":4.5, "petalWidth":1.5 }
    > 202 ACCEPTED
  6. Verify the predicted outcome by tailing the <PATH/TO/LOGAPP>/pmmlTest.log/stdout_0.log file. The predictedSpecies in this case is versicolor.

    {
      "sepalLength": 6.4,
      "sepalWidth": 3.2,
      "petalLength": 4.5,
      "petalWidth": 1.5,
      "Species": {
        "result": "versicolor",
        "type": "PROBABILITY",
        "categoryValues": [
          "setosa",
          "versicolor",
          "virginica"
        ]
      },
      "predictedSpecies": "versicolor",
      "Probability_setosa": 4.728207706362856E-9,
      "Probability_versicolor": 0.9133639504608079,
      "Probability_virginica": 0.0866360448109845
    }
  7. Let’s post with a slight variation in data.

    dataflow:>http post --target http://localhost:9001 --contentType application/json --data "{ \"sepalLength\": 6.4, \"sepalWidth\": 3.2, \"petalLength\":4.5, \"petalWidth\":1.8 }"
    > POST (application/json;charset=UTF-8) http://localhost:9001 { "sepalLength": 6.4, "sepalWidth": 3.2, "petalLength":4.5, "petalWidth":1.8 }
    > 202 ACCEPTED
    petalWidth value changed from 1.5 to 1.8
  8. The predictedSpecies will now be listed as virginica.

    {
      "sepalLength": 6.4,
      "sepalWidth": 3.2,
      "petalLength": 4.5,
      "petalWidth": 1.8,
      "Species": {
        "result": "virginica",
        "type": "PROBABILITY",
        "categoryValues": [
          "setosa",
          "versicolor",
          "virginica"
        ]
      },
      "predictedSpecies": "virginica",
      "Probability_setosa": 1.0443898084700813E-8,
      "Probability_versicolor": 0.1750120333571921,
      "Probability_virginica": 0.8249879561989097
    }
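
For reference, the http post from step 5 can also be issued programmatically. The following is a minimal sketch using Spring's RestTemplate, assuming spring-web is on the classpath:

import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.web.client.RestTemplate;

public class PmmlTestClient {

    public static void main(String[] args) {
        RestTemplate restTemplate = new RestTemplate();

        HttpHeaders headers = new HttpHeaders();
        headers.setContentType(MediaType.APPLICATION_JSON);
        String body = "{ \"sepalLength\": 6.4, \"sepalWidth\": 3.2, \"petalLength\": 4.5, \"petalWidth\": 1.5 }";

        // Equivalent to the shell's "http post" against the http source on port 9001.
        ResponseEntity<String> response = restTemplate.postForEntity(
                "http://localhost:9001", new HttpEntity<>(body, headers), String.class);
        System.out.println(response.getStatusCode()); // expect 202 ACCEPTED
    }
}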

7.1.3. Summary

In this sample, you have learned:

  • How to use Spring Cloud Data Flow’s Local server

  • How to use Spring Cloud Data Flow’s shell application

  • How to use the pmml processor to compute real-time predictions

8. Functions

8.1. Functions in Spring Cloud Data Flow

This is an experiment to run a Spring Cloud Function workload in Spring Cloud Data Flow. The function-runner used in this sample is at the 1.0.0.M1 release and is not recommended for use in production.

In this sample, you will learn how to use Spring Cloud Function based streaming applications in Spring Cloud Data Flow. To learn more about Spring Cloud Function, check out the project page.

8.1.1. Prerequisites

  • A Running Data Flow Shell

The Spring Cloud Data Flow Shell is available for download or you can build it yourself.

The Spring Cloud Data Flow Shell and Local server implementation are in the same repository and are both built by running ./mvnw install from the project root directory. If you have already run the build, use the jar in spring-cloud-dataflow-shell/target.

To run the Shell open a new terminal session:

$ cd <PATH/TO/SPRING-CLOUD-DATAFLOW-SHELL-JAR>
$ java -jar spring-cloud-dataflow-shell-<VERSION>.jar
  ____                              ____ _                __
 / ___| _ __  _ __(_)_ __   __ _   / ___| | ___  _   _  __| |
 \___ \| '_ \| '__| | '_ \ / _` | | |   | |/ _ \| | | |/ _` |
  ___) | |_) | |  | | | | | (_| | | |___| | (_) | |_| | (_| |
 |____/| .__/|_|  |_|_| |_|\__, |  \____|_|\___/ \__,_|\__,_|
  ____ |_|    _          __|___/                 __________
 |  _ \  __ _| |_ __ _  |  ___| | _____      __  \ \ \ \ \ \
 | | | |/ _` | __/ _` | | |_  | |/ _ \ \ /\ / /   \ \ \ \ \ \
 | |_| | (_| | || (_| | |  _| | | (_) \ V  V /    / / / / / /
 |____/ \__,_|\__\__,_| |_|   |_|\___/ \_/\_/    /_/_/_/_/_/


Welcome to the Spring Cloud Data Flow shell. For assistance hit TAB or type "help".
dataflow:>
The Spring Cloud Data Flow Shell is a Spring Boot application that connects to the Data Flow Server’s REST API and supports a DSL that simplifies the process of defining a stream or task and managing its lifecycle. Most of these samples use the shell. If you prefer, you can use the Data Flow UI at localhost:9393/dashboard (or wherever the server is hosted) to perform equivalent operations.
  • A running local Data Flow Server

The Local Data Flow Server is a Spring Boot application available for download, or you can build it yourself. If you build it yourself, the executable jar will be in spring-cloud-dataflow-server-local/target.

To run the Local Data Flow server, open a new terminal session:

$ cd <PATH/TO/SPRING-CLOUD-DATAFLOW-LOCAL-JAR>
$ java -jar spring-cloud-dataflow-server-local-<VERSION>.jar
This sample requires access to both Spring’s snapshot and milestone repos. Please follow the how-to guides on how to set repo.spring.io/libs-release and repo.spring.io/libs-milestone as remote repositories in SCDF.

8.1.2. Building and Running the Demo

  1. Register the out-of-the-box applications for the Rabbit binder

    These samples assume that the Data Flow Server can access a remote Maven repository, repo.spring.io/libs-release by default. If your Data Flow server is running behind a firewall, or you are using a Maven proxy preventing access to public repositories, you will need to install the sample apps in your internal Maven repository and configure the server accordingly. The sample applications are typically registered using Data Flow’s bulk import facility; for example, the Shell command dataflow:>app import --uri bit.ly/Celsius-SR1-stream-applications-rabbit-maven (the actual URI is release and binder specific, so refer to the sample instructions for the actual URL). The bulk import URI references a plain text file containing entries for all of the publicly available Spring Cloud Stream and Task applications published to repo.spring.io. For example, source.http=maven://org.springframework.cloud.stream.app:http-source-rabbit:1.3.1.RELEASE registers the http source app at the corresponding Maven address, relative to the remote repository(ies) configured for the Data Flow server. The format is maven://<groupId>:<artifactId>:<version>. You will need to download the required apps or build them and then install them in your Maven repository, using whatever group, artifact, and version you choose. If you do this, register individual apps using dataflow:>app register… with the maven:// resource URI format corresponding to your installed app.
    dataflow:>app import --uri http://bit.ly/Celsius-SR1-stream-applications-rabbit-maven
  2. Register the out-of-the-box function-runner application (current release is at 1.0.0.M1)

    dataflow:>app register --name function-runner --type processor --uri maven://org.springframework.cloud.stream.app:function-app-rabbit:1.0.0.M1 --metadata-uri maven://org.springframework.cloud.stream.app:function-app-rabbit:jar:metadata:1.0.0.M1
  3. Create and deploy the following stream

    dataflow:>stream create foo --definition "http --server.port=9001 | function-runner --function.className=com.example.functions.CharCounter --function.location=file:///<PATH/TO/SPRING-CLOUD-FUNCTION>/spring-cloud-function-samples/function-sample/target/spring-cloud-function-sample-1.0.0.BUILD-SNAPSHOT.jar | log" --deploy
    Replace the <PATH/TO/SPRING-CLOUD-FUNCTION> with the correct path.
    The source code of the CharCounter function is in Spring Cloud Function’s samples repo; an illustrative sketch of such a function appears after this list.
  4. Verify the stream is successfully deployed.

    dataflow:>stream list
  5. Notice that foo-http, foo-function-runner, and foo-log Spring Cloud Stream applications are running as Spring Boot applications and the log locations will be printed in the Local-server console.

    ....
    ....
    2017-10-17 11:43:03.714  INFO 18409 --- [nio-9393-exec-7] o.s.c.d.s.s.AppDeployerStreamDeployer    : Deploying application named [log] as part of stream named [foo] with resource URI [maven://org.springframework.cloud.stream.app:log-sink-rabbit:jar:1.2.0.RELEASE]
    2017-10-17 11:43:04.379  INFO 18409 --- [nio-9393-exec-7] o.s.c.d.spi.local.LocalAppDeployer       : Deploying app with deploymentId foo.log instance 0.
       Logs will be in /var/folders/c3/ctx7_rns6x30tq7rb76wzqwr0000gs/T/spring-cloud-dataflow-6549025456609489200/foo-1508265783715/foo.log
    2017-10-17 11:43:04.380  INFO 18409 --- [nio-9393-exec-7] o.s.c.d.s.s.AppDeployerStreamDeployer    : Deploying application named [function-runner] as part of stream named [foo] with resource URI [file:/var/folders/c3/ctx7_rns6x30tq7rb76wzqwr0000gs/T/deployer-resource-cache8941581850579153886/http-c73a62adae0abd7ec0dee91d891575709f02f8c9]
    2017-10-17 11:43:04.384  INFO 18409 --- [nio-9393-exec-7] o.s.c.d.spi.local.LocalAppDeployer       : Deploying app with deploymentId foo.function-runner instance 0.
       Logs will be in /var/folders/c3/ctx7_rns6x30tq7rb76wzqwr0000gs/T/spring-cloud-dataflow-6549025456609489200/foo-1508265784380/foo.function-runner
    2017-10-17 11:43:04.385  INFO 18409 --- [nio-9393-exec-7] o.s.c.d.s.s.AppDeployerStreamDeployer    : Deploying application named [http] as part of stream named [foo] with resource URI [maven://org.springframework.cloud.stream.app:http-source-rabbit:jar:1.2.0.RELEASE]
    2017-10-17 11:43:04.391  INFO 18409 --- [nio-9393-exec-7] o.s.c.d.spi.local.LocalAppDeployer       : Deploying app with deploymentId foo.http instance 0.
       Logs will be in /var/folders/c3/ctx7_rns6x30tq7rb76wzqwr0000gs/T/spring-cloud-dataflow-6549025456609489200/foo-1508265784385/foo.http
    ....
    ....
  6. Post sample data to the http endpoint: localhost:9001 (9001 is the port we specified for the http source in this case)

    dataflow:>http post --target http://localhost:9001 --data "hello world"
    > POST (text/plain) http://localhost:9001 hello world
    > 202 ACCEPTED
    
    
    dataflow:>http post --target http://localhost:9001 --data "hmm, yeah, it works now!"
    > POST (text/plain) http://localhost:9001 hmm, yeah, it works now!
    > 202 ACCEPTED
  7. Tail the log-sink’s standard-out logs to see the character counts

    $ tail -f /var/folders/c3/ctx7_rns6x30tq7rb76wzqwr0000gs/T/spring-cloud-dataflow-6549025456609489200/foo-1508265783715/foo.log/stdout_0.log
    
    ....
    ....
    ....
    ....
    2017-10-17 11:45:39.363  INFO 19193 --- [on-runner.foo-1] log-sink       : 11
    2017-10-17 11:46:40.997  INFO 19193 --- [on-runner.foo-1] log-sink       : 24
    ....
    ....

8.1.3. Summary

In this sample, you have learned:

  • How to use Spring Cloud Data Flow’s Local server

  • How to use Spring Cloud Data Flow’s shell application

  • How to use the out-of-the-box function-runner application in Spring Cloud Data Flow

9. Micrometer

9.1. SCDF metrics with InfluxDB and Grafana

In this demonstration, you will learn how Micrometer can help to monitor your Spring Cloud Data Flow (SCDF) streams using InfluxDB and Grafana.

InfluxDB is a real-time store for time-series data, such as SCDF metrics. It supports downsampling, automatic expiration and deletion of unwanted data, as well as backup and restore. Data is analyzed via a SQL-like query language.

Grafana is an open source metrics dashboard platform. It supports multiple backend time-series databases, including InfluxDB.

The architecture (Fig. 1) builds on the Spring Boot Micrometer functionality. When the micrometer-registry-influx dependency is found on the classpath, Spring Boot auto-configures the metrics export for InfluxDB.
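A minimal sketch of that dependency, assuming a Maven build (the version is normally supplied by the Spring Boot dependency management, so it is omitted here):

<!-- Micrometer registry for InfluxDB; version assumed to be managed by the Spring Boot BOM -->
<dependency>
    <groupId>io.micrometer</groupId>
    <artifactId>micrometer-registry-influx</artifactId>
</dependency>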

The Spring Cloud Stream (SCSt) applications inherit the Micrometer functionality, allowing them to compute and send various application metrics to the configured time-series database.

Figure 1. SCDF metrics analysis with InfluxDB and Grafana

Out of the box, SCSt sends core metrics such as CPU, memory, MVC, and health, to name a few. Among those, the Spring Integration metrics allow computing the rate and the latency of the messages in the SCDF streams.

Unlike with the Spring Cloud Data Flow Metrics Collector, the metrics here are sent synchronously over HTTP, not through a Binder channel topic.

All Spring Cloud Stream App Starters enrich the standard dimensional tags with the following SCDF-specific tags:

tag name             SCDF property                              default value

stream.name          spring.cloud.dataflow.stream.name          unknown
application.name     spring.cloud.dataflow.stream.app.label     unknown
instance.index       instance.index                             0
application.guid     spring.cloud.application.guid              unknown
application.type     spring.cloud.dataflow.stream.app.type      unknown

For custom app starters that do not extend from the core parent, you should add the org.springframework.cloud.stream.app:app-starters-common dependency to enable the SCDF tags, as sketched below.
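A minimal sketch of that dependency declaration, again assuming a Maven build (the version is an assumption and should come from your app-starters dependency management):

<!-- Enables the SCDF-specific Micrometer tags; version assumed to be provided by dependency management -->
<dependency>
    <groupId>org.springframework.cloud.stream.app</groupId>
    <artifactId>app-starters-common</artifactId>
</dependency>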

Below we present the steps to prepare and configure the demo of Spring Cloud Data Flow’s Local server integration with InfluxDB. For other deployment environments, such as Cloud Foundry or Kubernetes, additional configuration might be required.

9.1.1. Prerequisites

  • A Running Data Flow Shell

The Spring Cloud Data Flow Shell is available for download or you can build it yourself.

The Spring Cloud Data Flow Shell and Local server implementation are in the same repository and are both built by running ./mvnw install from the project root directory. If you have already run the build, use the jar in spring-cloud-dataflow-shell/target.

To run the Shell open a new terminal session:

$ cd <PATH/TO/SPRING-CLOUD-DATAFLOW-SHELL-JAR>
$ java -jar spring-cloud-dataflow-shell-<VERSION>.jar
  ____                              ____ _                __
 / ___| _ __  _ __(_)_ __   __ _   / ___| | ___  _   _  __| |
 \___ \| '_ \| '__| | '_ \ / _` | | |   | |/ _ \| | | |/ _` |
  ___) | |_) | |  | | | | | (_| | | |___| | (_) | |_| | (_| |
 |____/| .__/|_|  |_|_| |_|\__, |  \____|_|\___/ \__,_|\__,_|
  ____ |_|    _          __|___/                 __________
 |  _ \  __ _| |_ __ _  |  ___| | _____      __  \ \ \ \ \ \
 | | | |/ _` | __/ _` | | |_  | |/ _ \ \ /\ / /   \ \ \ \ \ \
 | |_| | (_| | || (_| | |  _| | | (_) \ V  V /    / / / / / /
 |____/ \__,_|\__\__,_| |_|   |_|\___/ \_/\_/    /_/_/_/_/_/


Welcome to the Spring Cloud Data Flow shell. For assistance hit TAB or type "help".
dataflow:>
The Spring Cloud Data Flow Shell is a Spring Boot application that connects to the Data Flow Server’s REST API and supports a DSL that simplifies the process of defining a stream or task and managing its lifecycle. Most of these samples use the shell. If you prefer, you can use the Data Flow UI at localhost:9393/dashboard (or wherever the server is hosted) to perform equivalent operations.
  • A running local Data Flow Server

The Local Data Flow Server is a Spring Boot application available for download, or you can build it yourself. If you build it yourself, the executable jar will be in spring-cloud-dataflow-server-local/target.

To run the Local Data Flow Server, open a new terminal session:

$ cd <PATH/TO/SPRING-CLOUD-DATAFLOW-LOCAL-JAR>
$ java -jar spring-cloud-dataflow-server-local-<VERSION>.jar
  • Running instance of Kafka

  • Spring Cloud Stream 2.x based Time and Log application starters, pre-built with the io.micrometer:micrometer-registry-influx dependency.

    Future versions of the SCSt App Initializr utility will add support for Micrometer dependencies, to facilitate the injection of Micrometer registries into SCSt apps.

9.1.2. Building and Running the Demo

  1. Register time and log applications that are pre-built with io.micrometer:micrometer-registry-influx. The next version of SCSt App Initializr allows adding Micrometer registry dependencies as well.

    app register --name time2 --type source --uri file://<path-to-your-time-app>/time-source-kafka-2.0.0.BUILD-SNAPSHOT.jar --metadata-uri file://<path-to-your-time-app>/time-source-kafka-2.0.0.BUILD-SNAPSHOT-metadata.jar
    
    app register --name log2 --type sink --uri file://<path-to-your-log-app>/log-sink-kafka-2.0.0.BUILD-SNAPSHOT.jar --metadata-uri file://<path-to-your-log-app>/log-sink-kafka-2.0.0.BUILD-SNAPSHOT-metadata.jar
  2. Create InfluxDB and Grafana Docker containers

    docker run -d --name grafana -p 3000:3000 grafana/grafana:5.1.0
    
    docker run -d --name influxdb -p 8086:8086 influxdb:1.5.2-alpine
  3. Create and deploy the following stream

    dataflow:>stream create --name t2 --definition "time2 | log2"
    
    dataflow:>stream deploy --name t2 --properties "app.*.management.metrics.export.influx.db=myinfluxdb"

    The app.*.management.metrics.export.influx.db=myinfluxdb instructs the time2 and log2 apps to use the myinfluxdb database (created automatically).

    By default, the InfluxDB server runs on localhost:8086. You can add the app.*.management.metrics.export.influx.uri={influxdb-server-url} property to alter the default location; a combined example is sketched at the end of this step.

    You can connect to the InfluxDB container and explore the measurements:

    docker exec -it influxdb /bin/bash
    root:/# influx
    > show databases
    > use myinfluxdb
    > show measurements
    > select * from spring_integration_send limit 10
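    As a variation of the deploy command above, a sketch that also overrides the InfluxDB location (the URI shown is simply the default, for illustration) would be:

    dataflow:>stream deploy --name t2 --properties "app.*.management.metrics.export.influx.db=myinfluxdb,app.*.management.metrics.export.influx.uri=http://localhost:8086"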
  4. Configure Grafana

    • Open the Grafana UI (localhost:3000) and log in (user: admin, password: admin).

    • Create an InfluxDB data source called influx_auto_DataFlowMetricsCollector that connects to the myinfluxdb InfluxDB database, using the properties in Table 1.

      Table 1. DataSource Properties

      Name           influx_auto_DataFlowMetricsCollector
      Type           InfluxDB
      Host           localhost:8086
      Access         Browser
      Database       myinfluxdb
      User (DB)      admin
      Password (DB)  admin

      For the older Grafana 4.x, set the Access property to direct instead.
    • Import the scdf-influxdb-dashboard.json dashboard


9.1.3. Summary

In this sample, you have learned:

  • How to use Spring Cloud Data Flow’s Local server

  • How to use Spring Cloud Data Flow’s shell application

  • How to use InfluxDB and Grafana to monitor and visualize Spring Cloud Stream application metrics.

9.2. SCDF metrics with Prometheus and Grafana

In this demonstration, you will learn how Micrometer can help to monitor your Spring Cloud Data Flow Streams using Prometheus and Grafana.

Prometheus is a time-series database used for monitoring highly dynamic service-oriented architectures. In a world of microservices, its support for multi-dimensional data collection and querying is a particular strength.

Grafana is an open source metrics dashboard platform. It supports multiple backend time-series databases, including Prometheus.

The architecture (Fig. 2) builds on the Spring Boot Micrometer functionality. When the micrometer-registry-prometheus dependency is found on the classpath, Spring Boot auto-configures the metrics export for Prometheus.

The Spring Cloud Stream (SCSt) applications inherit the Micrometer functionality, allowing them to compute and send various application metrics to the configured time-series database.

Figure 2. SCDF metrics analysis with Prometheus and Grafana

Out of the box, SCSt sends core metrics such as CPU, memory, MVC, and health, to name a few. Among those, the Spring Integration metrics allow computing the rate and the latency of the messages in the SCDF streams.

Unlike with the Spring Cloud Data Flow Metrics Collector, the metrics here are sent synchronously over HTTP, not through a Binder channel topic.

All Spring Cloud Stream App Starters enrich the standard dimensional tags with the following SCDF-specific tags:

tag name             SCDF property                              default value

stream.name          spring.cloud.dataflow.stream.name          unknown
application.name     spring.cloud.dataflow.stream.app.label     unknown
instance.index       instance.index                             0
application.guid     spring.cloud.application.guid              unknown
application.type     spring.cloud.dataflow.stream.app.type      unknown

For custom app starters that do not extend from the core parent, you should add the org.springframework.cloud.stream.app:app-starters-common dependency to enable the SCDF tags (see the dependency sketch in the InfluxDB section above).

Prometheus employs a pull-based metrics model, called metrics scraping. Spring Boot provides an actuator endpoint at /actuator/prometheus that presents the metrics in the Prometheus scrape format.
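As a quick sanity check (the port below is illustrative; the local deployer assigns each app instance its own port, which is printed in the server log), you can fetch the scrape endpoint of a running app directly:

$ curl http://localhost:20080/actuator/prometheus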

Furthermore, Prometheus requires a mechanism to discover the target applications to be monitored (for example, the URLs of the SCSt app instances). Targets may be statically configured via the static_configs parameter or dynamically discovered using one of the supported service-discovery mechanisms.

The SCDF Prometheus Service Discovery is a standalone (Spring Boot) service that uses the runtime/apps endpoint to retrieve the URLs of the running SCDF applications and to generate a targets.json file. The targets.json file is compliant with the <file_sd_config> Prometheus discovery format.
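As a minimal sketch of that format (the host, port, and label values are illustrative; labels are optional in <file_sd_config>), a generated targets.json has the following shape:

[
  {
    "targets": [ "localhost:20080", "localhost:20081" ],
    "labels": { "job": "scdf" }
  }
]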

Below we present the steps to prepare and configure the demo of Spring Cloud Data Flow’s Local server integration with Prometheus. For other deployment environments, such as Cloud Foundry or Kubernetes, additional configuration might be required.

9.2.1. Prerequisites

  • A Running Data Flow Shell

The Spring Cloud Data Flow Shell is available for download or you can build it yourself.

The Spring Cloud Data Flow Shell and Local server implementation are in the same repository and are both built by running ./mvnw install from the project root directory. If you have already run the build, use the jar in spring-cloud-dataflow-shell/target.

To run the Shell open a new terminal session:

$ cd <PATH/TO/SPRING-CLOUD-DATAFLOW-SHELL-JAR>
$ java -jar spring-cloud-dataflow-shell-<VERSION>.jar
  ____                              ____ _                __
 / ___| _ __  _ __(_)_ __   __ _   / ___| | ___  _   _  __| |
 \___ \| '_ \| '__| | '_ \ / _` | | |   | |/ _ \| | | |/ _` |
  ___) | |_) | |  | | | | | (_| | | |___| | (_) | |_| | (_| |
 |____/| .__/|_|  |_|_| |_|\__, |  \____|_|\___/ \__,_|\__,_|
  ____ |_|    _          __|___/                 __________
 |  _ \  __ _| |_ __ _  |  ___| | _____      __  \ \ \ \ \ \
 | | | |/ _` | __/ _` | | |_  | |/ _ \ \ /\ / /   \ \ \ \ \ \
 | |_| | (_| | || (_| | |  _| | | (_) \ V  V /    / / / / / /
 |____/ \__,_|\__\__,_| |_|   |_|\___/ \_/\_/    /_/_/_/_/_/


Welcome to the Spring Cloud Data Flow shell. For assistance hit TAB or type "help".
dataflow:>
The Spring Cloud Data Flow Shell is a Spring Boot application that connects to the Data Flow Server’s REST API and supports a DSL that simplifies the process of defining a stream or task and managing its lifecycle. Most of these samples use the shell. If you prefer, you can use the Data Flow UI at localhost:9393/dashboard (or wherever the server is hosted) to perform equivalent operations.
  • A running local Data Flow Server

The Local Data Flow Server is a Spring Boot application available for download, or you can build it yourself. If you build it yourself, the executable jar will be in spring-cloud-dataflow-server-local/target.

To run the Local Data Flow Server, open a new terminal session:

$ cd <PATH/TO/SPRING-CLOUD-DATAFLOW-LOCAL-JAR>
$ java -jar spring-cloud-dataflow-server-local-<VERSION>.jar
  • Running instance of Kafka

  • Spring Cloud Stream 2.x based Time and Log application starters, pre-built with the io.micrometer:micrometer-registry-prometheus dependency.

    Future versions of the SCSt App Initializr utility will add support for Micrometer dependencies, to facilitate the injection of Micrometer registries into SCSt apps.

9.2.2. Building and Running the Demo

  1. Register time and log applications that are pre-built with io.micrometer:micrometer-registry-prometheus. The next version of SCSt App Initializr allows adding Micrometer registry dependencies as well.

    app register --name time2 --type source --uri file://<path-to-your-time-app>/time-source-kafka-2.0.0.BUILD-SNAPSHOT.jar --metadata-uri file://<path-to-your-time-app>/time-source-kafka-2.0.0.BUILD-SNAPSHOT-metadata.jar
    
    app register --name log2 --type sink --uri file://<path-to-your-log-app>/log-sink-kafka-2.0.0.BUILD-SNAPSHOT.jar --metadata-uri file://<path-to-your-log-app>/log-sink-kafka-2.0.0.BUILD-SNAPSHOT-metadata.jar
  2. Create and deploy the following stream

    dataflow:>stream create --name t2 --definition "time2 | log2"
    
    dataflow:>stream deploy --name t2 --properties "app.*.management.endpoints.web.exposure.include=prometheus,app.*.spring.autoconfigure.exclude=org.springframework.boot.autoconfigure.security.servlet.SecurityAutoConfiguration"

    The deployment properties ensure that the Prometheus actuator endpoint is exposed and that Spring Boot security is disabled.

  3. Build and start the SCDF Prometheus Service Discovery application

    cd ./spring-cloud-dataflow-samples/micrometer/spring-cloud-dataflow-prometheus-service-discovery
    ./mvnw clean install

    For convenience, the final spring-cloud-dataflow-prometheus-service-discovery-0.0.1-SNAPSHOT.jar artifact is provided with this sample.

    Start the service discovery application:

    java -jar ./target/spring-cloud-dataflow-prometheus-service-discovery-0.0.1-SNAPSHOT.jar \
    --metrics.prometheus.target.discovery.url=http://localhost:9393/runtime/apps \
    --metrics.prometheus.target.file.path=/tmp/targets.json \
    --metrics.prometheus.target.refresh.rate=10000 \
    --metrics.prometheus.target.mode=local

    It connects to the SCDF runtime/apps URL and regenerates the /tmp/targets.json file every 10 seconds.

  4. Create Prometheus configuration file (prometheus-local-file.yml)

    global:
      scrape_interval:     15s # Set the scrape interval to every 15 seconds. Default is every 1 minute.
      evaluation_interval: 15s # Evaluate rules every 15 seconds. The default is every 1 minute.
      # scrape_timeout is set to the global default (10s).
    
    # A scrape configuration containing exactly one endpoint to scrape:
    scrape_configs:
      # The job name is added as a label `job=<job_name>` to any timeseries scraped from this config.
      - job_name: 'scdf'
        metrics_path: '/actuator/prometheus'
        file_sd_configs:
          - files:
            - targets.json
            refresh_interval: 30s

    The file_sd_configs section above configures the file-based discovery mechanism to use the generated targets.json.

  5. Start Prometheus

    docker run -d --name prometheus \
     -p 9090:9090 \
     -v <full-path-to>/prometheus-local-file.yml:/etc/prometheus/prometheus.yml \
     -v /tmp/targets.json:/etc/prometheus/targets.json \
     prom/prometheus:v2.2.1

    This mounts prometheus-local-file.yml as the Prometheus configuration and maps /tmp/targets.json into /etc/prometheus/targets.json.

    Use the Prometheus management UI at localhost:9090/graph to verify that the SCDF app metrics have been collected:

    # Throughput
    rate(spring_integration_send_seconds_count{type="channel"}[60s])
    
    # Latency
    rate(spring_integration_send_seconds_sum{type="channel"}[60s])/rate(spring_integration_send_seconds_count{type="channel"}[60s])
  6. Start the Grafana Docker container

    docker run -d --name grafana -p 3000:3000 grafana/grafana:5.1.0
  7. Configure Grafana

    • Open the Grafana UI (localhost:3000) and log in (user: admin, password: admin).

    • Create a Prometheus data source called ScdfPrometheus, using the properties in Table 2.

      Table 2. DataSource Properties

      Name     ScdfPrometheus
      Type     Prometheus
      Host     localhost:9090
      Access   Browser

      For the older Grafana 4.x, set the Access property to direct instead.
    • Import the scdf-prometheus-grafana-dashboard.json dashboard


9.2.3. Summary

In this sample, you have learned:

  • How to use Spring Cloud Data Flow’s Local server

  • How to use Spring Cloud Data Flow’s shell application

  • How to use Prometheus and Grafana to monitor and visualize Spring Cloud Stream application metrics.