3. Streaming

3.1 HTTP to Cassandra Demo

In this demonstration, you will learn how to build a data pipeline using Spring Cloud Data Flow to consume data from an HTTP endpoint and write the payload to a Cassandra database.

We will take you through the steps to configure and run Spring Cloud Data Flow server in either a local or Cloud Foundry environment.

3.1.1 Prerequisites

  • A Running Data Flow Shell

The Spring Cloud Data Flow Shell is available for download or you can build it yourself.

[Note]Note

The Spring Cloud Data Flow Shell and Local server implementation are in the same repository and are both built by running ./mvnw install from the project root directory. If you have already run the build, use the jar in spring-cloud-dataflow-shell/target.

To run the Shell open a new terminal session:

$ cd <PATH/TO/SPRING-CLOUD-DATAFLOW-SHELL-JAR>
$ java -jar spring-cloud-dataflow-shell-<VERSION>.jar
  ____                              ____ _                __
 / ___| _ __  _ __(_)_ __   __ _   / ___| | ___  _   _  __| |
 \___ \| '_ \| '__| | '_ \ / _` | | |   | |/ _ \| | | |/ _` |
  ___) | |_) | |  | | | | | (_| | | |___| | (_) | |_| | (_| |
 |____/| .__/|_|  |_|_| |_|\__, |  \____|_|\___/ \__,_|\__,_|
  ____ |_|    _          __|___/                 __________
 |  _ \  __ _| |_ __ _  |  ___| | _____      __  \ \ \ \ \ \
 | | | |/ _` | __/ _` | | |_  | |/ _ \ \ /\ / /   \ \ \ \ \ \
 | |_| | (_| | || (_| | |  _| | | (_) \ V  V /    / / / / / /
 |____/ \__,_|\__\__,_| |_|   |_|\___/ \_/\_/    /_/_/_/_/_/


Welcome to the Spring Cloud Data Flow shell. For assistance hit TAB or type "help".
dataflow:>
[Note]Note

The Spring Cloud Data Flow Shell is a Spring Boot application that connects to the Data Flow Server’s REST API and supports a DSL that simplifies the process of defining a stream or task and managing its lifecycle. Most of these samples use the shell. If you prefer, you can use the Data Flow UI at localhost:9393/dashboard (or wherever the server is hosted) to perform equivalent operations.
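
Because the shell and the UI both talk to this same REST API, you can also inspect the server with any plain HTTP client. A minimal sketch using curl, assuming the server is running locally on port 9393:

$ curl http://localhost:9393/apps                 # list the registered applications
$ curl http://localhost:9393/streams/definitions  # list the stream definitions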

3.1.2 Running Locally

Additional Prerequisites

  • Spring Cloud Data Flow installed locally. Follow the installation instructions to run Spring Cloud Data Flow on a local host.
  • Running instance of Kafka
  • Running instance of Apache Cassandra
  • A database utility tool such as DBeaver to connect to the Cassandra instance. You might have to provide host, port, username and password depending on the Cassandra configuration you are using.
  • Create a keyspace and a book table in Cassandra using:
CREATE KEYSPACE clouddata WITH REPLICATION = { 'class' : 'org.apache.cassandra.locator.SimpleStrategy', 'replication_factor': '1' } AND DURABLE_WRITES = true;
USE clouddata;
CREATE TABLE book  (
    id          uuid PRIMARY KEY,
    isbn        text,
    author      text,
    title       text
);
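
If you prefer a command-line client to a GUI tool such as DBeaver, the same keyspace and table can be created from cqlsh. This is only a sketch and assumes Cassandra is listening on localhost:9042 with authentication disabled; adjust the host, port, and credentials for your setup:

$ cqlsh localhost 9042
cqlsh> CREATE KEYSPACE clouddata WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor': '1' } AND DURABLE_WRITES = true;
cqlsh> USE clouddata;
cqlsh:clouddata> CREATE TABLE book (id uuid PRIMARY KEY, isbn text, author text, title text);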

Building and Running the Demo

  1. Register the out-of-the-box applications for the Kafka binder

    [Note]Note

    These samples assume that the Data Flow Server can access a remote Maven repository, repo.spring.io/libs-release by default. If your Data Flow server is running behind a firewall, or you are using a Maven proxy that prevents access to public repositories, you will need to install the sample apps in your internal Maven repository and configure the server accordingly. The sample applications are typically registered using Data Flow’s bulk import facility, for example the Shell command dataflow:>app import --uri dataflow.spring.io/rabbitmq-maven-latest (the actual URI is release- and binder-specific, so refer to the sample instructions for the actual URL). The bulk import URI references a plain text file containing entries for all of the publicly available Spring Cloud Stream and Task applications published to repo.spring.io. For example, source.http=maven://org.springframework.cloud.stream.app:http-source-rabbit:2.1.0.RELEASE registers the http source app at the corresponding Maven address, relative to the remote repository(ies) configured for the Data Flow server. The format is maven://<groupId>:<artifactId>:<version>. You will need to download the required apps or build them and then install them in your Maven repository, using whatever group, artifact, and version you choose. If you do this, register individual apps with dataflow:>app register, using the maven:// resource URI format corresponding to your installed app.

    dataflow:>app import --uri https://dataflow.spring.io/kafka-maven-latest
  2. Create the stream

    dataflow:>stream create cassandrastream --definition "http --server.port=8888 --spring.cloud.stream.bindings.output.contentType='application/json' | cassandra --ingestQuery='insert into book (id, isbn, title, author) values (uuid(), ?, ?, ?)' --keyspace=clouddata" --deploy
    
    Created and deployed new stream 'cassandrastream'
    [Note]Note

    If Cassandra isn’t running on the default port on localhost, or if you need a username and password to connect, use any of the following options to specify the necessary connection parameters: --username='<USERNAME>' --password='<PASSWORD>' --port=<PORT> --contact-points=<LIST-OF-HOSTS>

  3. Verify the stream is successfully deployed

    dataflow:>stream list
  4. Notice that the cassandrastream-http and cassandrastream-cassandra Spring Cloud Stream applications are running as Spring Boot applications within the server as collocated processes.

    2015-12-15 15:52:31.576  INFO 18337 --- [nio-9393-exec-1] o.s.c.d.a.s.l.OutOfProcessModuleDeployer : deploying module org.springframework.cloud.stream.module:cassandra-sink:jar:exec:1.0.0.BUILD-SNAPSHOT instance 0
       Logs will be in /var/folders/c3/ctx7_rns6x30tq7rb76wzqwr0000gp/T/spring-cloud-data-flow-284240942697761420/cassandrastream.cassandra
    2015-12-15 15:52:31.583  INFO 18337 --- [nio-9393-exec-1] o.s.c.d.a.s.l.OutOfProcessModuleDeployer : deploying module org.springframework.cloud.stream.module:http-source:jar:exec:1.0.0.BUILD-SNAPSHOT instance 0
       Logs will be in /var/folders/c3/ctx7_rns6x30tq7rb76wzqwr0000gp/T/spring-cloud-data-flow-284240942697761420/cassandrastream.http
  5. Post sample data pointing to the http endpoint: localhost:8888 (8888 is the server.port we specified for the http source in this case)

    dataflow:>http post --contentType 'application/json' --data '{"isbn": "1599869772", "title": "The Art of War", "author": "Sun Tzu"}' --target http://localhost:8888
    > POST (application/json;charset=UTF-8) http://localhost:8888 {"isbn": "1599869772", "title": "The Art of War", "author": "Sun Tzu"}
    > 202 ACCEPTED
  6. Connect to the Cassandra instance and query the table clouddata.book to list the persisted records

    select * from clouddata.book;
  7. You’re done!
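
If you prefer to drive steps 5 and 6 outside the Data Flow shell, the same round trip can be exercised with curl and cqlsh. A sketch, assuming the stream deployed above (http source listening on port 8888) and a local, unauthenticated Cassandra:

$ curl -X POST -H "Content-Type: application/json" \
      -d '{"isbn": "1599869772", "title": "The Art of War", "author": "Sun Tzu"}' \
      http://localhost:8888
$ cqlsh -e "select * from clouddata.book;"    # the posted book should appear as a new row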

3.1.3 Running on Cloud Foundry

Additional Prerequisites

  • Cloud Foundry instance
  • A rabbit service instance
  • A running instance of Cassandra in Cloud Foundry or from another cloud provider
  • A database utility tool such as DBeaver to connect to the Cassandra instance. You might have to provide host, port, username and password depending on the Cassandra configuration you are using.
  • Create a book table in your Cassandra keyspace using:

    CREATE TABLE book  (
        id          uuid PRIMARY KEY,
        isbn        text,
        author      text,
        title       text
    );
  • Spring Cloud Data Flow installed on Cloud Foundry

Follow the installation instructions to run Spring Cloud Data Flow on Cloud Foundry.

Running the Demo

  1. Register the out-of-the-box applications for the Rabbit binder

    [Note]Note

    These samples assume that the Data Flow Server can access a remote Maven repository, repo.spring.io/libs-release by default. If your Data Flow server is running behind a firewall, or you are using a Maven proxy that prevents access to public repositories, you will need to install the sample apps in your internal Maven repository and configure the server accordingly. The sample applications are typically registered using Data Flow’s bulk import facility, for example the Shell command dataflow:>app import --uri dataflow.spring.io/rabbitmq-maven-latest (the actual URI is release- and binder-specific, so refer to the sample instructions for the actual URL). The bulk import URI references a plain text file containing entries for all of the publicly available Spring Cloud Stream and Task applications published to repo.spring.io. For example, source.http=maven://org.springframework.cloud.stream.app:http-source-rabbit:2.1.0.RELEASE registers the http source app at the corresponding Maven address, relative to the remote repository(ies) configured for the Data Flow server. The format is maven://<groupId>:<artifactId>:<version>. You will need to download the required apps or build them and then install them in your Maven repository, using whatever group, artifact, and version you choose. If you do this, register individual apps with dataflow:>app register, using the maven:// resource URI format corresponding to your installed app.

    dataflow:>app import --uri https://dataflow.spring.io/rabbitmq-maven-latest
  2. Create the stream

    dataflow:>stream create cassandrastream --definition "http --spring.cloud.stream.bindings.output.contentType='application/json' | cassandra --ingestQuery='insert into book (id, isbn, title, author) values (uuid(), ?, ?, ?)' --username='<USERNAME>' --password='<PASSWORD>' --port=<PORT> --contact-points=<HOST> --keyspace='<KEYSPACE>'" --deploy
    
    Created and deployed new stream 'cassandrastream'

You may want to change the cassandrastream name in PCF: if you have enabled a random application name prefix, you could run into issues with the route name being too long.

  3. Verify the stream is successfully deployed

    dataflow:>stream list
  4. Notice that the cassandrastream-http and cassandrastream-cassandra Spring Cloud Stream applications are running as cloud-native (microservice) applications in Cloud Foundry

    $ cf apps
    Getting apps in org [your-org] / space [your-space] as user...
    OK

    name                        requested state   instances   memory   disk   urls
    cassandrastream-cassandra   started           1/1         1G       1G     cassandrastream-cassandra.app.io
    cassandrastream-http        started           1/1         1G       1G     cassandrastream-http.app.io
    dataflow-server             started           1/1         1G       1G     dataflow-server.app.io
  5. Look up the URL for the cassandrastream-http application from the list above. Post sample data pointing to the http endpoint: <YOUR-cassandrastream-http-APP-URL>

    http post --contentType 'application/json' --data '{"isbn": "1599869772", "title": "The Art of War", "author": "Sun Tzu"}' --target http://<YOUR-cassandrastream-http-APP-URL>
    > POST (application/json;charset=UTF-8) https://cassandrastream-http.app.io {"isbn": "1599869772", "title": "The Art of War", "author": "Sun Tzu"}
    > 202 ACCEPTED
  6. Connect to the Cassandra instance and query the table book to list the data inserted

    select * from book;
  7. Now, let’s try to take advantage of Pivotal Cloud Foundry’s platform capability. Let’s scale the cassandrastream-http application from 1 to 3 instances

    $ cf scale cassandrastream-http -i 3
    Scaling app cassandrastream-http in org user-dataflow / space development as user...
    OK
  8. Verify App instances (3/3) running successfully

    $ cf apps
    Getting apps in org user-dataflow / space development as user...
    OK

    name                        requested state   instances   memory   disk   urls
    cassandrastream-cassandra   started           1/1         1G       1G     cassandrastream-cassandra.app.io
    cassandrastream-http        started           3/3         1G       1G     cassandrastream-http.app.io
    dataflow-server             started           1/1         1G       1G     dataflow-server.app.io
  9. You’re done!
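
To watch the cassandra sink consume the records posted in step 5, you can tail its logs with the cf CLI. The app name below is the one shown by cf apps above; if you have enabled a random application name prefix in PCF, substitute the prefixed name:

$ cf logs cassandrastream-cassandra --recent   # dump the recently buffered log lines
$ cf logs cassandrastream-cassandra            # or stream the logs while you post more data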

3.1.4 Summary

In this sample, you have learned:

  • How to use Spring Cloud Data Flow’s Local and Cloud Foundry servers
  • How to use Spring Cloud Data Flow’s shell
  • How to create a streaming data pipeline to connect and write to Cassandra
  • How to scale applications on Pivotal Cloud Foundry

3.2 HTTP to MySQL Demo

In this demonstration, you will learn how to build a data pipeline using Spring Cloud Data Flow to consume data from an http endpoint and write to a MySQL database using the jdbc sink.

We will take you through the steps to configure and run Spring Cloud Data Flow server in either a local or Cloud Foundry environment.

3.2.1 Prerequisites

  • A Running Data Flow Shell

The Spring Cloud Data Flow Shell is available for download or you can build it yourself.

[Note]Note

The Spring Cloud Data Flow Shell and Local server implementation are in the same repository and are both built by running ./mvnw install from the project root directory. If you have already run the build, use the jar in spring-cloud-dataflow-shell/target.

To run the Shell open a new terminal session:

$ cd <PATH/TO/SPRING-CLOUD-DATAFLOW-SHELL-JAR>
$ java -jar spring-cloud-dataflow-shell-<VERSION>.jar
  ____                              ____ _                __
 / ___| _ __  _ __(_)_ __   __ _   / ___| | ___  _   _  __| |
 \___ \| '_ \| '__| | '_ \ / _` | | |   | |/ _ \| | | |/ _` |
  ___) | |_) | |  | | | | | (_| | | |___| | (_) | |_| | (_| |
 |____/| .__/|_|  |_|_| |_|\__, |  \____|_|\___/ \__,_|\__,_|
  ____ |_|    _          __|___/                 __________
 |  _ \  __ _| |_ __ _  |  ___| | _____      __  \ \ \ \ \ \
 | | | |/ _` | __/ _` | | |_  | |/ _ \ \ /\ / /   \ \ \ \ \ \
 | |_| | (_| | || (_| | |  _| | | (_) \ V  V /    / / / / / /
 |____/ \__,_|\__\__,_| |_|   |_|\___/ \_/\_/    /_/_/_/_/_/


Welcome to the Spring Cloud Data Flow shell. For assistance hit TAB or type "help".
dataflow:>
[Note]Note

The Spring Cloud Data Flow Shell is a Spring Boot application that connects to the Data Flow Server’s REST API and supports a DSL that simplifies the process of defining a stream or task and managing its lifecycle. Most of these samples use the shell. If you prefer, you can use the Data Flow UI at localhost:9393/dashboard (or wherever the server is hosted) to perform equivalent operations.

3.2.2 Running Locally

Additional Prerequisites

  • Spring Cloud Data Flow installed locally. Follow the installation instructions to run Spring Cloud Data Flow on a local host.
  • Running instance of Kafka
  • Running instance of MySQL
  • A database utility tool such as DBeaver or DbVisualizer
  • Create the test database with a names table (in MySQL) using:

    CREATE DATABASE test;
    USE test;
    CREATE TABLE names
    (
    	name varchar(255)
    );
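
If you prefer the mysql command-line client to a GUI tool, the same database and table can be created interactively. A minimal sketch, assuming MySQL is on localhost:3306 and you connect as root; substitute your own host and credentials:

$ mysql -h 127.0.0.1 -P 3306 -u root -p
mysql> CREATE DATABASE test;
mysql> USE test;
mysql> CREATE TABLE names (name varchar(255));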

Building and Running the Demo

  1. Register the out-of-the-box applications for the Kafka binder

    [Note]Note

    These samples assume that the Data Flow Server can access a remote Maven repository, repo.spring.io/libs-release by default. If your Data Flow server is running behind a firewall, or you are using a Maven proxy that prevents access to public repositories, you will need to install the sample apps in your internal Maven repository and configure the server accordingly. The sample applications are typically registered using Data Flow’s bulk import facility, for example the Shell command dataflow:>app import --uri dataflow.spring.io/rabbitmq-maven-latest (the actual URI is release- and binder-specific, so refer to the sample instructions for the actual URL). The bulk import URI references a plain text file containing entries for all of the publicly available Spring Cloud Stream and Task applications published to repo.spring.io. For example, source.http=maven://org.springframework.cloud.stream.app:http-source-rabbit:2.1.0.RELEASE registers the http source app at the corresponding Maven address, relative to the remote repository(ies) configured for the Data Flow server. The format is maven://<groupId>:<artifactId>:<version>. You will need to download the required apps or build them and then install them in your Maven repository, using whatever group, artifact, and version you choose. If you do this, register individual apps with dataflow:>app register, using the maven:// resource URI format corresponding to your installed app.

    dataflow:>app import --uri https://dataflow.spring.io/kafka-maven-latest
  2. Create the stream

    dataflow:>stream create --name mysqlstream --definition "http --server.port=8787 | jdbc --tableName=names --columns=name --spring.datasource.driver-class-name=org.mariadb.jdbc.Driver --spring.datasource.url='jdbc:mysql://localhost:3306/test'" --deploy
    
    Created and deployed new stream 'mysqlstream'
    [Note]Note

    If MySQL isn’t running on the default port on localhost, or if you need a username and password to connect, use any of the following options to specify the necessary connection parameters: --spring.datasource.url='jdbc:mysql://<HOST>:<PORT>/<NAME>' --spring.datasource.username=<USERNAME> --spring.datasource.password=<PASSWORD>

  3. Verify the stream is successfully deployed

    dataflow:>stream list
  4. Notice that mysqlstream-http and mysqlstream-jdbc Spring Cloud Stream applications are running as Spring Boot applications within the Local server as collocated processes.

    2016-05-03 09:29:55.918  INFO 65162 --- [nio-9393-exec-3] o.s.c.d.spi.local.LocalAppDeployer       : deploying app mysqlstream.jdbc instance 0
       Logs will be in /var/folders/c3/ctx7_rns6x30tq7rb76wzqwr0000gp/T/spring-cloud-dataflow-6850863945840320040/mysqlstream1-1462292995903/mysqlstream.jdbc
    2016-05-03 09:29:55.939  INFO 65162 --- [nio-9393-exec-3] o.s.c.d.spi.local.LocalAppDeployer       : deploying app mysqlstream.http instance 0
       Logs will be in /var/folders/c3/ctx7_rns6x30tq7rb76wzqwr0000gp/T/spring-cloud-dataflow-6850863945840320040/mysqlstream-1462292995934/mysqlstream.http
  5. Post sample data pointing to the http endpoint: localhost:8787 [8787 is the server.port we specified for the http source in this case]

    dataflow:>http post --contentType 'application/json' --target http://localhost:8787 --data "{\"name\": \"Foo\"}"
    > POST (application/json;charset=UTF-8) http://localhost:8787 {"name": "Foo"}
    > 202 ACCEPTED
  6. Connect to the MySQL instance and query the table test.names to list the new rows:

    select * from test.names;
  7. You’re done!
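
Steps 5 and 6 can also be exercised without the Data Flow shell, posting with curl and verifying with the mysql client. A sketch, assuming the stream deployed above (http source on port 8787) and a local MySQL instance:

$ curl -X POST -H "Content-Type: application/json" -d '{"name": "Foo"}' http://localhost:8787
$ mysql -u root -p -e "select * from test.names;"   # the posted name should appear as a new row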

3.2.3 Running on Cloud Foundry

Additional Prerequisites

  • Cloud Foundry instance
  • Running instance of rabbit in Cloud Foundry
  • Running instance of mysql in Cloud Foundry
  • A database utility tool such as DBeaver or DbVisualizer
  • Create the names table (in MySQL) using:

    CREATE TABLE names
    (
    	name varchar(255)
    );
  • Spring Cloud Data Flow installed on Cloud Foundry

Follow the installation instructions to run Spring Cloud Data Flow on Cloud Foundry.

Building and Running the Demo

  1. Register the out-of-the-box applications for the Rabbit binder

    [Note]Note

    These samples assume that the Data Flow Server can access a remote Maven repository, repo.spring.io/libs-release by default. If your Data Flow server is running behind a firewall, or you are using a Maven proxy that prevents access to public repositories, you will need to install the sample apps in your internal Maven repository and configure the server accordingly. The sample applications are typically registered using Data Flow’s bulk import facility, for example the Shell command dataflow:>app import --uri dataflow.spring.io/rabbitmq-maven-latest (the actual URI is release- and binder-specific, so refer to the sample instructions for the actual URL). The bulk import URI references a plain text file containing entries for all of the publicly available Spring Cloud Stream and Task applications published to repo.spring.io. For example, source.http=maven://org.springframework.cloud.stream.app:http-source-rabbit:2.1.0.RELEASE registers the http source app at the corresponding Maven address, relative to the remote repository(ies) configured for the Data Flow server. The format is maven://<groupId>:<artifactId>:<version>. You will need to download the required apps or build them and then install them in your Maven repository, using whatever group, artifact, and version you choose. If you do this, register individual apps with dataflow:>app register, using the maven:// resource URI format corresponding to your installed app.

    dataflow:>app import --uri https://dataflow.spring.io/rabbitmq-maven-latest
  2. Create the stream

    dataflow:>stream create --name mysqlstream --definition "http | jdbc --tableName=names --columns=name"
    Created new stream 'mysqlstream'
    
    dataflow:>stream deploy --name mysqlstream --properties "deployer.jdbc.cloudfoundry.services=mysql"
    Deployed stream 'mysqlstream'
    [Note]Note

    By supplying the deployer.jdbc.cloudfoundry.services=mysql property, we deploy the stream so that the jdbc sink is automatically bound to the mysql service; only this application in the stream gets the service binding. This also eliminates the need to supply datasource credentials in the stream definition.

  3. Verify the stream is successfully deployed

    dataflow:>stream list
  4. Notice that mysqlstream-http and mysqlstream-jdbc Spring Cloud Stream applications are running as cloud-native (microservice) applications in Cloud Foundry

    $ cf apps
    Getting apps in org user-dataflow / space development as user...
    OK
    
    name                        requested state   instances   memory   disk   urls
    mysqlstream-http            started           1/1         1G       1G     mysqlstream-http.app.io
    mysqlstream-jdbc            started           1/1         1G       1G     mysqlstream-jdbc.app.io
    dataflow-server             started           1/1         1G       1G     dataflow-server.app.io
  5. Look up the URL for the mysqlstream-http application from the list above. Post sample data pointing to the http endpoint: <YOUR-mysqlstream-http-APP-URL>

    http post --contentType 'application/json' --data "{\"name\": \"Bar\"}" --target https://mysqlstream-http.app.io
    > POST (application/json;charset=UTF-8) https://mysqlstream-http.app.io {"name": "Bar"}
    > 202 ACCEPTED
  6. Connect to the MySQL instance and query the table names to list the new rows:

    select * from names;
  7. Now, let’s take advantage of Pivotal Cloud Foundry’s platform capability. Let’s scale the mysqlstream-http application from 1 to 3 instances

    $ cf scale mysqlstream-http -i 3
    Scaling app mysqlstream-http in org user-dataflow / space development as user...
    OK
  8. Verify App instances (3/3) running successfully

    $ cf apps
    Getting apps in org user-dataflow / space development as user...
    OK
    
    name                        requested state   instances   memory   disk   urls
    mysqlstream-http            started           3/3         1G       1G     mysqlstream-http.app.io
    mysqlstream-jdbc            started           1/1         1G       1G     mysqlstream-jdbc.app.io
    dataflow-server             started           1/1         1G       1G     dataflow-server.app.io
  9. You’re done!
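
You can also confirm the behavior described in the note under step 2, namely that only the jdbc sink received the mysql service binding, by inspecting the apps with the cf CLI (app names may differ if a random prefix is enabled):

$ cf services                 # the mysql service instance should list mysqlstream-jdbc as its only bound app
$ cf env mysqlstream-jdbc     # VCAP_SERVICES should contain the injected mysql credentials
$ cf env mysqlstream-http     # no mysql service entry should appear here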

3.2.4 Summary

In this sample, you have learned:

  • How to use Spring Cloud Data Flow’s Local and Cloud Foundry servers
  • How to use Spring Cloud Data Flow’s shell
  • How to create a streaming data pipeline to connect and write to MySQL
  • How to scale applications on Pivotal Cloud Foundry

3.3 HTTP to Gemfire Demo

In this demonstration, you will learn how to build a data pipeline using Spring Cloud Data Flow to consume data from an http endpoint and write to Gemfire using the gemfire sink.

We will take you through the steps to configure and run Spring Cloud Data Flow server in either a local or Cloud Foundry environment.

[Note]Note

For legacy reasons the gemfire Spring Cloud Stream Apps are named after Pivotal GemFire. The code base for the commercial product has since been open sourced as Apache Geode. These samples should work with compatible versions of Pivotal GemFire or Apache Geode. Herein we will refer to the installed IMDG simply as Geode.

3.3.1 Prerequisites

  • A Running Data Flow Shell

The Spring Cloud Data Flow Shell is available for download or you can build it yourself.

[Note]Note

The Spring Cloud Data Flow Shell and Local server implementation are in the same repository and are both built by running ./mvnw install from the project root directory. If you have already run the build, use the jar in spring-cloud-dataflow-shell/target.

To run the Shell open a new terminal session:

$ cd <PATH/TO/SPRING-CLOUD-DATAFLOW-SHELL-JAR>
$ java -jar spring-cloud-dataflow-shell-<VERSION>.jar
  ____                              ____ _                __
 / ___| _ __  _ __(_)_ __   __ _   / ___| | ___  _   _  __| |
 \___ \| '_ \| '__| | '_ \ / _` | | |   | |/ _ \| | | |/ _` |
  ___) | |_) | |  | | | | | (_| | | |___| | (_) | |_| | (_| |
 |____/| .__/|_|  |_|_| |_|\__, |  \____|_|\___/ \__,_|\__,_|
  ____ |_|    _          __|___/                 __________
 |  _ \  __ _| |_ __ _  |  ___| | _____      __  \ \ \ \ \ \
 | | | |/ _` | __/ _` | | |_  | |/ _ \ \ /\ / /   \ \ \ \ \ \
 | |_| | (_| | || (_| | |  _| | | (_) \ V  V /    / / / / / /
 |____/ \__,_|\__\__,_| |_|   |_|\___/ \_/\_/    /_/_/_/_/_/


Welcome to the Spring Cloud Data Flow shell. For assistance hit TAB or type "help".
dataflow:>
[Note]Note

The Spring Cloud Data Flow Shell is a Spring Boot application that connects to the Data Flow Server’s REST API and supports a DSL that simplifies the process of defining a stream or task and managing its lifecycle. Most of these samples use the shell. If you prefer, you can use the Data Flow UI at localhost:9393/dashboard (or wherever the server is hosted) to perform equivalent operations.

  • A Geode installation with a locator and cache server running

If you do not have access to an existing Geode installation, install Apache Geode or Pivotal GemFire and start the gfsh CLI in a separate terminal.

    _________________________     __
   / _____/ ______/ ______/ /____/ /
  / /  __/ /___  /_____  / _____  /
 / /__/ / ____/  _____/ / /    / /
/______/_/      /______/_/    /_/    1.8.0

Monitor and Manage Apache Geode
gfsh>

3.3.2 Running Locally

Additional Prerequisites

  • Spring Cloud Data Flow installed locally. Follow the installation instructions to run Spring Cloud Data Flow on a local host.
  • A running instance of Rabbit MQ

Building and Running the Demo

  1. Use gfsh to start a locator and server

    gfsh>start locator --name=locator1
    gfsh>start server --name=server1
  2. Create a region called Stocks

    gfsh>create region --name Stocks --type=REPLICATE

    Use the Shell to create the sample stream

  3. Register the out-of-the-box applications for the Rabbit binder

    [Note]Note

    These samples assume that the Data Flow Server can access a remote Maven repository, repo.spring.io/libs-release by default. If your Data Flow server is running behind a firewall, or you are using a Maven proxy that prevents access to public repositories, you will need to install the sample apps in your internal Maven repository and configure the server accordingly. The sample applications are typically registered using Data Flow’s bulk import facility, for example the Shell command dataflow:>app import --uri dataflow.spring.io/rabbitmq-maven-latest (the actual URI is release- and binder-specific, so refer to the sample instructions for the actual URL). The bulk import URI references a plain text file containing entries for all of the publicly available Spring Cloud Stream and Task applications published to repo.spring.io. For example, source.http=maven://org.springframework.cloud.stream.app:http-source-rabbit:2.1.0.RELEASE registers the http source app at the corresponding Maven address, relative to the remote repository(ies) configured for the Data Flow server. The format is maven://<groupId>:<artifactId>:<version>. You will need to download the required apps or build them and then install them in your Maven repository, using whatever group, artifact, and version you choose. If you do this, register individual apps with dataflow:>app register, using the maven:// resource URI format corresponding to your installed app.

    dataflow:>app import --uri https://dataflow.spring.io/rabbitmq-maven-latest
  4. Create the stream

    This example creates an http endpoint to which we will post stock prices as a JSON document containing symbol and price fields. The property --json=true enables Geode’s JSON support and configures the sink to convert JSON String payloads to PdxInstance, the recommended way to store JSON documents in Geode. The keyExpression property is a SpEL expression used to extract the symbol value from the PdxInstance to use as the entry key.

    [Note]Note

    PDX serialization is very efficient and supports OQL queries without requiring a custom domain class. Use of custom domain types requires these classes to be in the class path of both the stream apps and the cache server. For this reason, the use of custom payload types is generally discouraged.

    dataflow:>stream create --name stocks --definition "http --port=9090 | gemfire --json=true --regionName=Stocks --keyExpression=payload.getField('symbol')" --deploy
    Created and deployed new stream 'stocks'
    [Note]Note

    If the Geode locator isn’t running on the default port on localhost, add the options --connect-type=locator --host-addresses=<host>:<port>. If there are multiple locators, you can provide a comma-separated list of locator addresses. This is not necessary for the sample, but is typical for production environments to enable fail-over.

  5. Verify the stream is successfully deployed

    dataflow:>stream list
  6. Post sample data pointing to the http endpoint: localhost:9090 (9090 is the port we specified for the http source)

    dataflow:>http post --target http://localhost:9090 --contentType application/json --data '{"symbol":"VMW","price":117.06}'
    > POST (application/json) http://localhost:9090 {"symbol":"VMW","price":117.06}
    > 202 ACCEPTED
  7. Using gfsh, connect to the locator if not already connected, and verify the cache entry was created.

    gfsh>get --key='VMW' --region=/Stocks
    Result      : true
    Key Class   : java.lang.String
    Key         : VMW
    Value Class : org.apache.geode.pdx.internal.PdxInstanceImpl
    
    symbol | price
    ------ | ------
    VMW    | 117.06
  8. You’re done!
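
Beyond fetching a single key as in step 7, you can run an OQL query from gfsh to inspect everything the sink has written to the region, which is handy after posting a few more quotes (the extra symbol below is just sample data):

dataflow:>http post --target http://localhost:9090 --contentType application/json --data '{"symbol":"PVTL","price":21.50}'
gfsh>query --query="SELECT s.symbol, s.price FROM /Stocks s"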

3.3.3 Running on Cloud Foundry

Additional Prerequisites

  • A Cloud Foundry instance
  • Running instance of a rabbit service in Cloud Foundry
  • Running instance of the Pivotal Cloud Cache for PCF (PCC) service cloudcache in Cloud Foundry.
  • Spring Cloud Data Flow installed on Cloud Foundry. Follow the installation instructions to run Spring Cloud Data Flow on Cloud Foundry.

Building and Running the Demo

  1. Register the out-of-the-box applications for the Rabbit binder

    [Note]Note

    These samples assume that the Data Flow Server can access a remote Maven repository, repo.spring.io/libs-release by default. If your Data Flow server is running behind a firewall, or you are using a Maven proxy that prevents access to public repositories, you will need to install the sample apps in your internal Maven repository and configure the server accordingly. The sample applications are typically registered using Data Flow’s bulk import facility, for example the Shell command dataflow:>app import --uri dataflow.spring.io/rabbitmq-maven-latest (the actual URI is release- and binder-specific, so refer to the sample instructions for the actual URL). The bulk import URI references a plain text file containing entries for all of the publicly available Spring Cloud Stream and Task applications published to repo.spring.io. For example, source.http=maven://org.springframework.cloud.stream.app:http-source-rabbit:2.1.0.RELEASE registers the http source app at the corresponding Maven address, relative to the remote repository(ies) configured for the Data Flow server. The format is maven://<groupId>:<artifactId>:<version>. You will need to download the required apps or build them and then install them in your Maven repository, using whatever group, artifact, and version you choose. If you do this, register individual apps with dataflow:>app register, using the maven:// resource URI format corresponding to your installed app.

    dataflow:>app import --uri https://dataflow.spring.io/rabbitmq-maven-latest
  2. Get the PCC connection information

    $ cf service-key cloudcache my-service-key
    Getting key my-service-key for service instance cloudcache as <user>...
    
    {
     "locators": [
      "10.0.16.9[55221]",
      "10.0.16.11[55221]",
      "10.0.16.10[55221]"
     ],
     "urls": {
      "gfsh": "http://...",
      "pulse": "https://.../pulse"
     },
     "users": [
      {
       "password": <password>,
       "username": "cluster_operator"
      },
      {
       "password": <password>,
       "username": "developer"
      }
     ]
    }
  3. Using gfsh, connect to the PCC instance as cluster_operator using the service key values and create the Stocks region.

    gfsh>connect --use-http --url=<gfsh-url> --user=cluster_operator --password=<cluster_operator_password>
    gfsh>create region --name Stocks --type=REPLICATE
  4. Create the stream, connecting to the PCC instance as developer

    This example creates an http endpoint to which we will post stock prices as a JSON document containing symbol and price fields. The property --json=true enables Geode’s JSON support and configures the sink to convert JSON String payloads to PdxInstance, the recommended way to store JSON documents in Geode. The keyExpression property is a SpEL expression used to extract the symbol value from the PdxInstance to use as the entry key.

    [Note]Note

    PDX serialization is very efficient and supports OQL queries without requiring a custom domain class. Use of custom domain types requires these classes to be in the class path of both the stream apps and the cache server. For this reason, the use of custom payload types is generally discouraged.

    dataflow:>stream create --name stocks --definition "http --security.basic.enabled=false | gemfire --username=developer --password=<developer-password> --connect-type=locator --host-addresses=10.0.16.9:55221 --json=true --regionName=Stocks --keyExpression=payload.getField('symbol')" --deploy
  5. Verify the stream is successfully deployed

    dataflow:>stream list
  6. Post sample data pointing to the http endpoint

    Get the url of the http source using cf apps

    dataflow:>http post --target http://<http source url> --contentType application/json --data '{"symbol":"VMW","price":117.06}'
    > POST (application/json) http://... {"symbol":"VMW","price":117.06}
    > 202 ACCEPTED
  7. Using gfsh, connect to the PCC instance as cluster_operator using the service key values.

    gfsh>connect --use-http --url=<gfsh-url> --user=cluster_operator --password=<cluster_operator_password>
    gfsh>get --key='VMW' --region=/Stocks
    Result      : true
    Key Class   : java.lang.String
    Key         : VMW
    Value Class : org.apache.geode.pdx.internal.PdxInstanceImpl
    
    symbol | price
    ------ | ------
    VMW    | 117.06
  8. You’re done!

3.3.4 Summary

In this sample, you have learned:

  • How to use Spring Cloud Data Flow’s Local and Cloud Foundry servers
  • How to use Spring Cloud Data Flow’s shell
  • How to create a streaming data pipeline to connect and write to gemfire

3.4 Gemfire CQ to Log Demo

In this demonstration, you will learn how to build a data pipeline using Spring Cloud Data Flow to consume data from a gemfire-cq (Continuous Query) endpoint and write to a log using the log sink. The gemfire-cq source creates a Continuous Query to monitor events for a region that match the query’s result set and publish a message whenever such an event is emitted. In this example, we simulate monitoring orders to trigger a process whenever the quantity ordered is above a defined limit.

We will take you through the steps to configure and run Spring Cloud Data Flow server in either a local or Cloud Foundry environment.

[Note]Note

For legacy reasons the gemfire Spring Cloud Stream Apps are named after Pivotal GemFire. The code base for the commercial product has since been open sourced as Apache Geode. These samples should work with compatible versions of Pivotal GemFire or Apache Geode. Herein we will refer to the installed IMDG simply as Geode.

3.4.1 Prerequisites

  • A Running Data Flow Shell

The Spring Cloud Data Flow Shell is available for download or you can build it yourself.

[Note]Note

The Spring Cloud Data Flow Shell and Local server implementation are in the same repository and are both built by running ./mvnw install from the project root directory. If you have already run the build, use the jar in spring-cloud-dataflow-shell/target.

To run the Shell open a new terminal session:

$ cd <PATH/TO/SPRING-CLOUD-DATAFLOW-SHELL-JAR>
$ java -jar spring-cloud-dataflow-shell-<VERSION>.jar
  ____                              ____ _                __
 / ___| _ __  _ __(_)_ __   __ _   / ___| | ___  _   _  __| |
 \___ \| '_ \| '__| | '_ \ / _` | | |   | |/ _ \| | | |/ _` |
  ___) | |_) | |  | | | | | (_| | | |___| | (_) | |_| | (_| |
 |____/| .__/|_|  |_|_| |_|\__, |  \____|_|\___/ \__,_|\__,_|
  ____ |_|    _          __|___/                 __________
 |  _ \  __ _| |_ __ _  |  ___| | _____      __  \ \ \ \ \ \
 | | | |/ _` | __/ _` | | |_  | |/ _ \ \ /\ / /   \ \ \ \ \ \
 | |_| | (_| | || (_| | |  _| | | (_) \ V  V /    / / / / / /
 |____/ \__,_|\__\__,_| |_|   |_|\___/ \_/\_/    /_/_/_/_/_/


Welcome to the Spring Cloud Data Flow shell. For assistance hit TAB or type "help".
dataflow:>
[Note]Note

The Spring Cloud Data Flow Shell is a Spring Boot application that connects to the Data Flow Server’s REST API and supports a DSL that simplifies the process of defining a stream or task and managing its lifecycle. Most of these samples use the shell. If you prefer, you can use the Data Flow UI at localhost:9393/dashboard (or wherever the server is hosted) to perform equivalent operations.

  • A Geode installation with a locator and cache server running

If you do not have access to an existing Geode installation, install Apache Geode or Pivotal GemFire and start the gfsh CLI in a separate terminal.

    _________________________     __
   / _____/ ______/ ______/ /____/ /
  / /  __/ /___  /_____  / _____  /
 / /__/ / ____/  _____/ / /    / /
/______/_/      /______/_/    /_/    1.8.0

Monitor and Manage Apache Geode
gfsh>

3.4.2 Running Locally

Additional Prerequisites

  • Spring Cloud Data Flow installed locally. Follow the installation instructions to run Spring Cloud Data Flow on a local host.
  • A running instance of Rabbit MQ

Building and Running the Demo

  1. Use gfsh to start a locator and server

    gfsh>start locator --name=locator1
    gfsh>start server --name=server1
  2. Create a region called Orders

    gfsh>create region --name Orders --type=REPLICATE

    Use the Shell to create the sample stream

  3. Register the out-of-the-box applications for the Rabbit binder

    [Note]Note

    These samples assume that the Data Flow Server can access a remote Maven repository, repo.spring.io/libs-release by default. If your Data Flow server is running behind a firewall, or you are using a Maven proxy that prevents access to public repositories, you will need to install the sample apps in your internal Maven repository and configure the server accordingly. The sample applications are typically registered using Data Flow’s bulk import facility, for example the Shell command dataflow:>app import --uri dataflow.spring.io/rabbitmq-maven-latest (the actual URI is release- and binder-specific, so refer to the sample instructions for the actual URL). The bulk import URI references a plain text file containing entries for all of the publicly available Spring Cloud Stream and Task applications published to repo.spring.io. For example, source.http=maven://org.springframework.cloud.stream.app:http-source-rabbit:2.1.0.RELEASE registers the http source app at the corresponding Maven address, relative to the remote repository(ies) configured for the Data Flow server. The format is maven://<groupId>:<artifactId>:<version>. You will need to download the required apps or build them and then install them in your Maven repository, using whatever group, artifact, and version you choose. If you do this, register individual apps with dataflow:>app register, using the maven:// resource URI format corresponding to your installed app.

    dataflow:>app import --uri https://dataflow.spring.io/rabbitmq-maven-latest
  4. Create the stream

    This example creates a gemfire-cq source that publishes events matching a query on a region. In this case we will monitor the Orders region. For simplicity, we will avoid creating a data structure for the order. Each cache entry contains an integer value representing the quantity of the ordered item. This stream will fire a message whenever the value > 999. By default, the source emits only the value. Here we will override that using the cq-event-expression property, which accepts a SpEL expression bound to a CQEvent. To reference the entire CQEvent instance, we use #this. In order to display the contents in the log, we will invoke toString() on the instance.

    dataflow:>stream create --name orders --definition " gemfire-cq --query='SELECT * from /Orders o where o > 999' --cq-event-expression=#this.toString() | log" --deploy
    [Note]Note

    If the Geode locator isn’t running on the default port on localhost, add the options --connect-type=locator --host-addresses=<host>:<port>. If there are multiple locators, you can provide a comma-separated list of locator addresses. This is not necessary for the sample, but is typical for production environments to enable fail-over.

  5. Verify the stream is successfully deployed

    dataflow:>stream list
  6. Monitor stdout for the log sink. When you deploy the stream, you will see log messages in the Data Flow server console like this

    2017-10-30 09:39:36.283  INFO 8167 --- [nio-9393-exec-5] o.s.c.d.spi.local.LocalAppDeployer       : Deploying app with deploymentId orders.log instance 0.
       Logs will be in /var/folders/hd/5yqz2v2d3sxd3n879f4sg4gr0000gn/T/spring-cloud-dataflow-5375107584795488581/orders-1509370775940/orders.log

    Copy the location of the log sink logs. This is a directory that ends in orders.log. The log files will be in stdout_0.log under this directory. You can monitor the output of the log sink using tail, or something similar:

    $tail -f /var/folders/hd/5yqz2v2d3sxd3n879f4sg4gr0000gn/T/spring-cloud-dataflow-5375107584795488581/orders-1509370775940/orders.log/stdout_0.log
  7. Using gfsh, create and update some cache entries

    gfsh>put --region Orders  --value-class java.lang.Integer --key 01234 --value 1000
    gfsh>put --region Orders  --value-class java.lang.Integer --key 11234 --value 1005
    gfsh>put --region Orders  --value-class java.lang.Integer --key 21234 --value 100
    gfsh>put --region Orders  --value-class java.lang.Integer --key 31234 --value 999
    gfsh>put --region Orders  --value-class java.lang.Integer --key 21234 --value 1000
  8. Observe the log output. You should see messages like:

    2017-10-30 09:53:02.231  INFO 8563 --- [ire-cq.orders-1] log-sink                                 : CqEvent [CqName=GfCq1; base operation=CREATE; cq operation=CREATE; key=01234; value=1000]
    2017-10-30 09:53:19.732  INFO 8563 --- [ire-cq.orders-1] log-sink                                 : CqEvent [CqName=GfCq1; base operation=CREATE; cq operation=CREATE; key=11234; value=1005]
    2017-10-30 09:53:53.242  INFO 8563 --- [ire-cq.orders-1] log-sink                                 : CqEvent [CqName=GfCq1; base operation=UPDATE; cq operation=CREATE; key=21234; value=1000]
  9. Another interesting demonstration combines gemfire-cq with the http-gemfire example.

    dataflow:> stream create --name stocks --definition "http --port=9090 | gemfire-json-server --regionName=Stocks --keyExpression=payload.getField('symbol')" --deploy
    dataflow:> stream create --name stock_watch --definition "gemfire-cq --query='Select * from /Stocks where symbol=''VMW''' | log" --deploy
  10. You’re done!
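
If you would rather script the cache updates from step 7 than type them into an interactive session, gfsh can also execute commands non-interactively with the -e option. A sketch, assuming a locator on the default localhost port (the key and value below are just extra sample data):

$ # each -e command runs in order; a value above 999 should produce another CqEvent in the log sink
$ gfsh -e "connect" -e "put --region /Orders --value-class java.lang.Integer --key 91234 --value 1500"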

3.4.3 Running on Cloud Foundry

Additional Prerequisites

  • A Cloud Foundry instance
  • Running instance of a rabbit service in Cloud Foundry
  • Running instance of the Pivotal Cloud Cache for PCF (PCC) service cloudcache in Cloud Foundry.
  • Spring Cloud Data Flow installed on Cloud Foundry

Follow the installation instructions to run Spring Cloud Data Flow on Cloud Foundry.

Building and Running the Demo

  1. Register the out-of-the-box applications for the Rabbit binder

    [Note]Note

    These samples assume that the Data Flow Server can access a remote Maven repository, repo.spring.io/libs-release by default. If your Data Flow server is running behind a firewall, or you are using a Maven proxy that prevents access to public repositories, you will need to install the sample apps in your internal Maven repository and configure the server accordingly. The sample applications are typically registered using Data Flow’s bulk import facility, for example the Shell command dataflow:>app import --uri dataflow.spring.io/rabbitmq-maven-latest (the actual URI is release- and binder-specific, so refer to the sample instructions for the actual URL). The bulk import URI references a plain text file containing entries for all of the publicly available Spring Cloud Stream and Task applications published to repo.spring.io. For example, source.http=maven://org.springframework.cloud.stream.app:http-source-rabbit:2.1.0.RELEASE registers the http source app at the corresponding Maven address, relative to the remote repository(ies) configured for the Data Flow server. The format is maven://<groupId>:<artifactId>:<version>. You will need to download the required apps or build them and then install them in your Maven repository, using whatever group, artifact, and version you choose. If you do this, register individual apps with dataflow:>app register, using the maven:// resource URI format corresponding to your installed app.

    dataflow:>app import --uri https://dataflow.spring.io/rabbitmq-maven-latest
  2. Get the PCC connection information

    $ cf service-key cloudcache my-service-key
    Getting key my-service-key for service instance cloudcache as <user>...
    
    {
     "locators": [
      "10.0.16.9[55221]",
      "10.0.16.11[55221]",
      "10.0.16.10[55221]"
     ],
     "urls": {
      "gfsh": "http://...",
      "pulse": "https://.../pulse"
     },
     "users": [
      {
       "password": <password>,
       "username": "cluster_operator"
      },
      {
       "password": <password>,
       "username": "developer"
      }
     ]
    }
  3. Using gfsh, connect to the PCC instance as cluster_operator using the service key values and create the Orders region.

    gfsh>connect --use-http --url=<gfsh-url> --user=cluster_operator --password=<cluster_operator_password>
    gfsh>create region --name Orders --type=REPLICATE
  4. Create the stream using the Data Flow Shell

    This example creates a gemfire-cq source that publishes events matching a query on a region. In this case we will monitor the Orders region. For simplicity, we will avoid creating a data structure for the order. Each cache entry contains an integer value representing the quantity of the ordered item. This stream will fire a message whenever the value > 999. By default, the source emits only the value. Here we will override that using the cq-event-expression property, which accepts a SpEL expression bound to a CQEvent. To reference the entire CQEvent instance, we use #this. In order to display the contents in the log, we will invoke toString() on the instance.

    dataflow:>stream create --name orders --definition " gemfire-cq  --username=developer --password=<developer-password> --connect-type=locator --host-addresses=10.0.16.9:55221 --query='SELECT * from /Orders o where o > 999' --cq-event-expression=#this.toString()  | log" --deploy
    Created and deployed new stream 'orders'
  5. Verify the stream is successfully deployed

    dataflow:>stream list
  6. Monitor stdout for the log sink

    cf logs <log-sink-app-name>
  7. Using gfsh, create and update some cache entries

    gfsh>connect --use-http --url=<gfsh-url> --user=cluster_operator --password=<cluster_operator_password>
    gfsh>put --region Orders  --value-class java.lang.Integer --key 01234 --value 1000
    gfsh>put --region Orders  --value-class java.lang.Integer --key 11234 --value 1005
    gfsh>put --region Orders  --value-class java.lang.Integer --key 21234 --value 100
    gfsh>put --region Orders  --value-class java.lang.Integer --key 31234 --value 999
    gfsh>put --region Orders  --value-class java.lang.Integer --key 21234 --value 1000
  8. Observe the log output. You should see messages like:

    2017-10-30 09:53:02.231  INFO 8563 --- [ire-cq.orders-1] log-sink                                 : CqEvent [CqName=GfCq1; base operation=CREATE; cq operation=CREATE; key=01234; value=1000]
    2017-10-30 09:53:19.732  INFO 8563 --- [ire-cq.orders-1] log-sink                                 : CqEvent [CqName=GfCq1; base operation=CREATE; cq operation=CREATE; key=11234; value=1005]
    2017-10-30 09:53:53.242  INFO 8563 --- [ire-cq.orders-1] log-sink                                 : CqEvent [CqName=GfCq1; base operation=UPDATE; cq operation=CREATE; key=21234; value=1000]
  9. Another interesting demonstration combines gemfire-cq with the http-gemfire example.

    dataflow:>stream create --name stocks --definition "http --security.basic.enabled=false | gemfire --json=true --username=developer --password=<developer-password> --connect-type=locator --host-addresses=10.0.16.25:55221 --regionName=Stocks --keyExpression=payload.getField('symbol')" --deploy
    
    dataflow:>stream create --name stock_watch --definition "gemfire-cq  --username=developer --password=<developer-password> --connect-type=locator --host-addresses=10.0.16.25:55221 --query='SELECT * from /Stocks where symbol=''VMW''' --cq-event-expression=#this.toString()  | log" --deploy
    
    dataflow:>http post --target http://data-flow-server-dpvuo77-stocks-http.apps.scdf-gcp.springapps.io/ --contentType application/json --data '{"symbol":"VMW","price":117.06}'
  10. You’re done!

3.4.4 Summary

In this sample, you have learned:

  • How to use Spring Cloud Data Flow’s Local and Cloud Foundry servers
  • How to use Spring Cloud Data Flow’s shell
  • How to create a streaming data pipeline to connect and publish CQ events from gemfire

3.5 Gemfire to Log Demo

In this demonstration, you will learn how to build a data pipeline using Spring Cloud Data Flow to consume data from a gemfire endpoint and write to a log using the log sink. The gemfire source creates a CacheListener to monitor events for a region and publish a message whenever an entry is changed.

We will take you through the steps to configure and run Spring Cloud Data Flow server in either a local or Cloud Foundry environment.

[Note]Note

For legacy reasons the gemfire Spring Cloud Stream Apps are named after Pivotal GemFire. The code base for the commercial product has since been open sourced as Apache Geode. These samples should work with compatible versions of Pivotal GemFire or Apache Geode. Herein we will refer to the installed IMDG simply as Geode.

3.5.1 Prerequisites

  • A Running Data Flow Shell

The Spring Cloud Data Flow Shell is available for download or you can build it yourself.

[Note]Note

The Spring Cloud Data Flow Shell and Local server implementation are in the same repository and are both built by running ./mvnw install from the project root directory. If you have already run the build, use the jar in spring-cloud-dataflow-shell/target.

To run the Shell open a new terminal session:

$ cd <PATH/TO/SPRING-CLOUD-DATAFLOW-SHELL-JAR>
$ java -jar spring-cloud-dataflow-shell-<VERSION>.jar
  ____                              ____ _                __
 / ___| _ __  _ __(_)_ __   __ _   / ___| | ___  _   _  __| |
 \___ \| '_ \| '__| | '_ \ / _` | | |   | |/ _ \| | | |/ _` |
  ___) | |_) | |  | | | | | (_| | | |___| | (_) | |_| | (_| |
 |____/| .__/|_|  |_|_| |_|\__, |  \____|_|\___/ \__,_|\__,_|
  ____ |_|    _          __|___/                 __________
 |  _ \  __ _| |_ __ _  |  ___| | _____      __  \ \ \ \ \ \
 | | | |/ _` | __/ _` | | |_  | |/ _ \ \ /\ / /   \ \ \ \ \ \
 | |_| | (_| | || (_| | |  _| | | (_) \ V  V /    / / / / / /
 |____/ \__,_|\__\__,_| |_|   |_|\___/ \_/\_/    /_/_/_/_/_/


Welcome to the Spring Cloud Data Flow shell. For assistance hit TAB or type "help".
dataflow:>
[Note]Note

The Spring Cloud Data Flow Shell is a Spring Boot application that connects to the Data Flow Server’s REST API and supports a DSL that simplifies the process of defining a stream or task and managing its lifecycle. Most of these samples use the shell. If you prefer, you can use the Data Flow UI at localhost:9393/dashboard (or wherever the server is hosted) to perform equivalent operations.

  • A Geode installation with a locator and cache server running

If you do not have access to an existing Geode installation, install Apache Geode or Pivotal GemFire and start the gfsh CLI in a separate terminal.

    _________________________     __
   / _____/ ______/ ______/ /____/ /
  / /  __/ /___  /_____  / _____  /
 / /__/ / ____/  _____/ / /    / /
/______/_/      /______/_/    /_/    1.8.0

Monitor and Manage Apache Geode
gfsh>

3.5.2 Running Locally

Additional Prerequisites

  • Spring Cloud Data Flow installed locally. Follow the installation instructions to run Spring Cloud Data Flow on a local host.
  • A running instance of Rabbit MQ

Building and Running the Demo

  1. Use gfsh to start a locator and server

    gfsh>start locator --name=locator1
    gfsh>start server --name=server1
  2. Create a region called Test

    gfsh>create region --name Test --type=REPLICATE

    Use the Shell to create the sample stream

  3. Register the out-of-the-box applications for the Rabbit binder

    [Note]Note

    These samples assume that the Data Flow Server can access a remote Maven repository, repo.spring.io/libs-release by default. If your Data Flow server is running behind a firewall, or you are using a Maven proxy that prevents access to public repositories, you will need to install the sample apps in your internal Maven repository and configure the server accordingly. The sample applications are typically registered using Data Flow’s bulk import facility, for example the Shell command dataflow:>app import --uri dataflow.spring.io/rabbitmq-maven-latest (the actual URI is release- and binder-specific, so refer to the sample instructions for the actual URL). The bulk import URI references a plain text file containing entries for all of the publicly available Spring Cloud Stream and Task applications published to repo.spring.io. For example, source.http=maven://org.springframework.cloud.stream.app:http-source-rabbit:2.1.0.RELEASE registers the http source app at the corresponding Maven address, relative to the remote repository(ies) configured for the Data Flow server. The format is maven://<groupId>:<artifactId>:<version>. You will need to download the required apps or build them and then install them in your Maven repository, using whatever group, artifact, and version you choose. If you do this, register individual apps with dataflow:>app register, using the maven:// resource URI format corresponding to your installed app.

    dataflow:>app import --uri https://dataflow.spring.io/rabbitmq-maven-latest
  4. Create the stream

    This example creates a gemfire source that publishes events on a region

    dataflow:>stream create --name events --definition " gemfire --regionName=Test | log" --deploy
    Created and deployed new stream 'events'
    [Note]Note

    If the Geode locator isn’t running on the default port on localhost, add the options --connect-type=locator --host-addresses=<host>:<port>. If there are multiple locators, you can provide a comma-separated list of locator addresses. This is not necessary for the sample, but is typical for production environments to enable fail-over.

  5. Verify the stream is successfully deployed

    dataflow:>stream list
  6. Monitor stdout for the log sink. When you deploy the stream, you will see log messages in the Data Flow server console like this

    2017-10-28 17:28:23.275  INFO 15603 --- [nio-9393-exec-2] o.s.c.d.spi.local.LocalAppDeployer       : Deploying app with deploymentId events.log instance 0.
       Logs will be in /var/folders/hd/5yqz2v2d3sxd3n879f4sg4gr0000gn/T/spring-cloud-dataflow-4093992067314402881/events-1509226103269/events.log
    2017-10-28 17:28:23.277  INFO 15603 --- [nio-9393-exec-2] o.s.c.d.s.c.StreamDeploymentController   : Downloading resource URI [maven://org.springframework.cloud.stream.app:gemfire-source-rabbit:1.2.0.RELEASE]
    2017-10-28 17:28:23.311  INFO 15603 --- [nio-9393-exec-2] o.s.c.d.s.c.StreamDeploymentController   : Deploying application named [gemfire] as part of stream named [events] with resource URI [maven://org.springframework.cloud.stream.app:gemfire-source-rabbit:1.2.0.RELEASE]
    2017-10-28 17:28:23.318  INFO 15603 --- [nio-9393-exec-2] o.s.c.d.spi.local.LocalAppDeployer       : Deploying app with deploymentId events.gemfire instance 0.
       Logs will be in /var/folders/hd/5yqz2v2d3sxd3n879f4sg4gr0000gn/T/spring-cloud-dataflow-4093992067314402881/events-1509226103311/events.gemfire

    Copy the location of the log sink logs. This is a directory that ends in events.log. The log files will be in stdout_0.log under this directory. You can monitor the output of the log sink using tail, or something similar:

    $ tail -f /var/folders/hd/5yqz2v2d3sxd3n879f4sg4gr0000gn/T/spring-cloud-dataflow-4093992067314402881/events-1509226103269/events.log/stdout_0.log
  7. Using gfsh, create and update some cache entries

    gfsh>put --region /Test --key 1  --value "value 1"
    gfsh>put --region /Test --key 2  --value "value 2"
    gfsh>put --region /Test --key 3  --value "value 3"
    gfsh>put --region /Test --key 1  --value "new value 1"
  8. Observe the log output. You should see messages like:

    2017-10-28 17:28:52.893  INFO 18986 --- [emfire.events-1] log sink                               : value 1"
    2017-10-28 17:28:52.893  INFO 18986 --- [emfire.events-1] log sink                               : value 2"
    2017-10-28 17:28:52.893  INFO 18986 --- [emfire.events-1] log sink                               : value 3"
    2017-10-28 17:28:52.893  INFO 18986 --- [emfire.events-1] log sink                               : new value 1"

    By default, the message payload contains the updated value. Depending on your application, you may need additional information. The data comes from EntryEvent. You can access any fields using the source’s cache-event-expression property. This takes a SpEL expression bound to the EntryEvent. Try something like --cache-event-expression='{key:'+key+',new_value:'+newValue+'}' (HINT: You will need to destroy the stream and recreate it to add this property, an exercise left to the reader; a sketch of the commands follows the log output below). Now you should see log messages like:

    2017-10-28 17:28:52.893  INFO 18986 --- [emfire.events-1] log-sink                                 : {key:1,new_value:value 1}
    2017-10-28 17:41:24.466  INFO 18986 --- [emfire.events-1] log-sink                                 : {key:2,new_value:value 2}
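
    For reference, recreating the stream with this property might look like the following (a sketch; the quoting around the SpEL expression may need adjusting for the shell):

    dataflow:>stream destroy --name events
    dataflow:>stream create --name events --definition " gemfire --regionName=Test --cache-event-expression='{key:'+key+',new_value:'+newValue+'}' | log" --deploy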
  9. You’re done!

3.5.3 Running on Cloud Foundry

Additional Prerequisites

  • A Cloud Foundry instance
  • Running instance of a rabbit service in Cloud Foundry
  • Running instance of the Pivotal Cloud Cache for PCF (PCC) service cloudcache in Cloud Foundry.
  • Spring Cloud Data Flow installed on Cloud Foundry

Follow the installation instructions to run Spring Cloud Data Flow on Cloud Foundry.

Building and Running the Demo

  1. Register the out-of-the-box applications for the Rabbit binder

    [Note]Note

    These samples assume that the Data Flow Server can access a remote Maven repository, repo.spring.io/libs-release by default. If your Data Flow server is running behind a firewall, or you are using a maven proxy preventing access to public repositories, you will need to install the sample apps in your internal Maven repository and configure the server accordingly. The sample applications are typically registered using Data Flow’s bulk import facility. For example, the Shell command dataflow:>app import --uri dataflow.spring.io/rabbitmq-maven-latest (The actual URI is release and binder specific so refer to the sample instructions for the actual URL). The bulk import URI references a plain text file containing entries for all of the publicly available Spring Cloud Stream and Task applications published to repo.spring.io. For example, source.http=maven://org.springframework.cloud.stream.app:http-source-rabbit:2.1.0.RELEASE registers the http source app at the corresponding Maven address, relative to the remote repository(ies) configured for the Data Flow server. The format is maven://<groupId>:<artifactId>:<version> You will need to download the required apps or build them and then install them in your Maven repository, using whatever group, artifact, and version you choose. If you do this, register individual apps using dataflow:>app register…​ using the maven:// resource URI format corresponding to your installed app.

    dataflow:>app import --uri https://dataflow.spring.io/rabbitmq-maven-latest
  2. Get the PCC connection information

    $ cf service-key cloudcache my-service-key
    Getting key my-service-key for service instance cloudcache as <user>...
    
    {
     "locators": [
      "10.0.16.9[55221]",
      "10.0.16.11[55221]",
      "10.0.16.10[55221]"
     ],
     "urls": {
      "gfsh": "http://...",
      "pulse": "https://.../pulse"
     },
     "users": [
      {
       "password": <password>,
       "username": "cluster_operator"
      },
      {
       "password": <password>,
       "username": "developer"
      }
     ]
    }
  3. Using gfsh, connect to the PCC instance as cluster_operator using the service key values and create the Test region.

    gfsh>connect --use-http --url=<gfsh-url> --user=cluster_operator --password=<cluster_operator_password>
    gfsh>create region --name Test --type=REPLICATE
  4. Create the stream, connecting to the PCC instance as developer. This example creates a gemfire source, which will publish events on a region.

    dataflow>stream create --name events --definition " gemfire --username=developer --password=<developer-password> --connect-type=locator --host-addresses=10.0.16.9:55221 --regionName=Test | log" --deploy
  5. Verify the stream is successfully deployed

    dataflow:>stream list
  6. Monitor stdout for the log sink

    cf logs <log-sink-app-name>
  7. Using gfsh, create and update some cache entries

    gfsh>connect --use-http --url=<gfsh-url> --user=cluster_operator --password=<cluster_operator_password>
    gfsh>put --region /Test --key 1  --value "value 1"
    gfsh>put --region /Test --key 2  --value "value 2"
    gfsh>put --region /Test --key 3  --value "value 3"
    gfsh>put --region /Test --key 1  --value "new value 1"
  8. Observe the log output

    You should see messages like:

    2017-10-28 17:28:52.893  INFO 18986 --- [emfire.events-1] log sink                               : value 1"
    2017-10-28 17:28:52.893  INFO 18986 --- [emfire.events-1] log sink                               : value 2"
    2017-10-28 17:28:52.893  INFO 18986 --- [emfire.events-1] log sink                               : value 3"
    2017-10-28 17:28:52.893  INFO 18986 --- [emfire.events-1] log sink                               : new value 1"

    By default, the message payload contains the updated value. Depending on your application, you may need additional information. The data comes from EntryEvent. You can access any fields using the source’s cache-event-expression property. This takes a SpEL expression bound to the EntryEvent. Try something like --cache-event-expression='{key:'+key+',new_value:'+newValue+'}' (HINT: You will need to destroy the stream and recreate it to add this property, an exercise left to the reader). Now you should see log messages like:

    2017-10-28 17:28:52.893  INFO 18986 --- [emfire.events-1] log-sink                                 : {key:1,new_value:value 1}
    2017-10-28 17:41:24.466  INFO 18986 --- [emfire.events-1] log-sink                                 : {key:2,new_value:value 2}
  9. You’re done!

3.5.4 Summary

In this sample, you have learned:

  • How to use Spring Cloud Data Flow’s Local and Cloud Foundry servers
  • How to use Spring Cloud Data Flow’s shell
  • How to create a streaming data pipeline to connect to and publish events from gemfire

3.6 Custom Spring Cloud Stream Processor

3.6.1 Prerequisites

  • A Running Data Flow Shell

The Spring Cloud Data Flow Shell is available for download or you can build it yourself.

[Note]Note

the Spring Cloud Data Flow Shell and Local server implementation are in the same repository and are both built by running ./mvnw install from the project root directory. If you have already run the build, use the jar in spring-cloud-dataflow-shell/target

To run the Shell open a new terminal session:

$ cd <PATH/TO/SPRING-CLOUD-DATAFLOW-SHELL-JAR>
$ java -jar spring-cloud-dataflow-shell-<VERSION>.jar
  ____                              ____ _                __
 / ___| _ __  _ __(_)_ __   __ _   / ___| | ___  _   _  __| |
 \___ \| '_ \| '__| | '_ \ / _` | | |   | |/ _ \| | | |/ _` |
  ___) | |_) | |  | | | | | (_| | | |___| | (_) | |_| | (_| |
 |____/| .__/|_|  |_|_| |_|\__, |  \____|_|\___/ \__,_|\__,_|
  ____ |_|    _          __|___/                 __________
 |  _ \  __ _| |_ __ _  |  ___| | _____      __  \ \ \ \ \ \
 | | | |/ _` | __/ _` | | |_  | |/ _ \ \ /\ / /   \ \ \ \ \ \
 | |_| | (_| | || (_| | |  _| | | (_) \ V  V /    / / / / / /
 |____/ \__,_|\__\__,_| |_|   |_|\___/ \_/\_/    /_/_/_/_/_/


Welcome to the Spring Cloud Data Flow shell. For assistance hit TAB or type "help".
dataflow:>
[Note]Note

The Spring Cloud Data Flow Shell is a Spring Boot application that connects to the Data Flow Server’s REST API and supports a DSL that simplifies the process of defining a stream or task and managing its lifecycle. Most of these samples use the shell. If you prefer, you can use the Data Flow UI at localhost:9393/dashboard (or wherever the server is hosted) to perform equivalent operations.

  • A running local Data Flow Server. Follow the installation instructions to run Spring Cloud Data Flow on a local host.
  • A Java IDE
  • Maven Installed
  • A running instance of RabbitMQ

3.6.2 Creating the Custom Stream App

We will create a custom Spring Cloud Stream application and run it on Spring Cloud Data Flow. We’ll go through the steps to make a simple processor that converts temperature from Fahrenheit to Celsius. We will be running the demo locally, but all the steps will work in a Cloud Foundry environment as well.

  1. Create a new Spring Cloud Stream project

    • Create a Spring Initializr project
    • Set the group to demo.celsius.converter and the artifact name as celsius-converter-processor
    • Choose a message transport binding as a dependency for the custom app. There are options for choosing RabbitMQ or Kafka as the message transport. For this demo, we will use RabbitMQ. Type rabbit in the search bar under Search for dependencies and select Stream Rabbit.
    • Hit the Generate Project button and open the new project in an IDE of your choice
  2. Develop the app

    We can now create our custom app. Our Spring Cloud Stream application is a Spring Boot application that runs as an executable jar. The application will include two Java classes:

    • CelsiusConverterProcessorApplication.java - the main Spring Boot application class, generated by Spring Initializr
    • CelsiusConverterProcessorConfiguration.java - the Spring Cloud Stream code that we will write

      We are creating a transformer that takes a Fahrenheit input and converts it to Celsius. Following the same naming convention as the application file, create a new Java class in the same package called CelsiusConverterProcessorConfiguration.java.

      CelsiusConverterProcessorConfiguration.java. 

      import org.springframework.cloud.stream.annotation.EnableBinding;
      import org.springframework.cloud.stream.messaging.Processor;
      import org.springframework.integration.annotation.Transformer;

      // Binds this app to the Processor interface's input and output channels.
      @EnableBinding(Processor.class)
      public class CelsiusConverterProcessorConfiguration {

          // Receives the Fahrenheit value as a String payload and returns it converted to Celsius.
          @Transformer(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)
          public int convertToCelsius(String payload) {
              int fahrenheitTemperature = Integer.parseInt(payload);
              return (fahrenheitTemperature - 32) * 5 / 9;
          }
      }

      Here we introduced two important Spring annotations. First we annotated the class with @EnableBinding(Processor.class). Second we created a method and annotated it with @Transformer(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT). By adding these two annotations we have configured this stream app as a Processor (as opposed to a Source or a Sink). This means that the application receives input from an upstream application via the Processor.INPUT channel and sends its output to a downstream application via the Processor.OUTPUT channel.

      The convertToCelsius method takes a String as input for Fahrenheit and then returns the converted Celsius as an integer. This method is very simple, but that is also the beauty of this programming style. We can add as much logic as we want to this method to enrich this processor. As long as we annotate it properly and return valid output, it works as a Spring Cloud Stream Processor. Also note that it is straightforward to unit test this code.
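
      Because the conversion logic is plain Java, it can be unit tested without starting a Spring context. A minimal sketch, assuming JUnit 4 is on the test classpath (the test class and method names below are hypothetical):

      import static org.junit.Assert.assertEquals;

      import org.junit.Test;

      public class CelsiusConverterProcessorConfigurationTests {

          @Test
          public void convertsFahrenheitToCelsius() {
              CelsiusConverterProcessorConfiguration processor = new CelsiusConverterProcessorConfiguration();
              // 76°F should yield 24°C with the processor's integer arithmetic.
              assertEquals(24, processor.convertToCelsius("76"));
          }
      }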

  3. Build the Spring Boot application with Maven

    $ cd <PROJECT_DIR>
    $ ./mvnw clean package
  4. Run the Application standalone

    java -jar target/celsius-converter-processor-0.0.1-SNAPSHOT.jar

    If all goes well, we should have a running standalone Spring Boot Application. Once we verify that the app is started and running without any errors, we can stop it.

3.6.3 Deploying the App to Spring Cloud Data Flow

  1. Register the out-of-the-box applications for the Rabbit binder

    [Note]Note

    These samples assume that the Data Flow Server can access a remote Maven repository, repo.spring.io/libs-release by default. If your Data Flow server is running behind a firewall, or you are using a maven proxy preventing access to public repositories, you will need to install the sample apps in your internal Maven repository and configure the server accordingly. The sample applications are typically registered using Data Flow’s bulk import facility. For example, the Shell command dataflow:>app import --uri dataflow.spring.io/rabbitmq-maven-latest (The actual URI is release and binder specific so refer to the sample instructions for the actual URL). The bulk import URI references a plain text file containing entries for all of the publicly available Spring Cloud Stream and Task applications published to repo.spring.io. For example, source.http=maven://org.springframework.cloud.stream.app:http-source-rabbit:2.1.0.RELEASE registers the http source app at the corresponding Maven address, relative to the remote repository(ies) configured for the Data Flow server. The format is maven://<groupId>:<artifactId>:<version> You will need to download the required apps or build them and then install them in your Maven repository, using whatever group, artifact, and version you choose. If you do this, register individual apps using dataflow:>app register…​ using the maven:// resource URI format corresponding to your installed app.

    dataflow:>app import --uri https://dataflow.spring.io/rabbitmq-maven-latest
  2. Register the custom processor

    app register --type processor --name convertToCelsius --uri <File URL of the jar file on the local filesystem where you built the project above> --force
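
    For example, if you built the jar in the previous section, the registration might look like this (a sketch; substitute the actual location of your project for the placeholder):

    dataflow:>app register --type processor --name convertToCelsius --uri file://<PROJECT_DIR>/target/celsius-converter-processor-0.0.1-SNAPSHOT.jar --force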
  3. Create the stream

    We will create a stream that uses the out of the box http source and log sink and our custom transformer.

    dataflow:>stream create --name convertToCelsiusStream --definition "http  --port=9090 | convertToCelsius | log" --deploy
    
    Created and deployed new stream 'convertToCelsiusStream'
  4. Verify the stream is successfully deployed

    dataflow:>stream list
  5. Verify that the apps have successfully deployed

    dataflow:>runtime apps
    2016-09-27 10:03:11.988  INFO 95234 --- [nio-9393-exec-9] o.s.c.d.spi.local.LocalAppDeployer       : deploying app convertToCelsiusStream.log instance 0
       Logs will be in /var/folders/2q/krqwcbhj2d58csmthyq_n1nw0000gp/T/spring-cloud-dataflow-3236898888473815319/convertToCelsiusStream-1474984991968/convertToCelsiusStream.log
    2016-09-27 10:03:12.397  INFO 95234 --- [nio-9393-exec-9] o.s.c.d.spi.local.LocalAppDeployer       : deploying app convertToCelsiusStream.convertToCelsius instance 0
       Logs will be in /var/folders/2q/krqwcbhj2d58csmthyq_n1nw0000gp/T/spring-cloud-dataflow-3236898888473815319/convertToCelsiusStream-1474984992392/convertToCelsiusStream.convertToCelsius
    2016-09-27 10:03:14.445  INFO 95234 --- [nio-9393-exec-9] o.s.c.d.spi.local.LocalAppDeployer       : deploying app convertToCelsiusStream.http instance 0
       Logs will be in /var/folders/2q/krqwcbhj2d58csmthyq_n1nw0000gp/T/spring-cloud-dataflow-3236898888473815319/convertToCelsiusStream-1474984994440/convertToCelsiusStream.http
  6. Post sample data to the http endpoint: localhost:9090 (9090 is the port we specified for the http source in this case)

    dataflow:>http post --target http://localhost:9090 --data 76
    > POST (text/plain;Charset=UTF-8) http://localhost:9090 76
    > 202 ACCEPTED
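
    Alternatively, you could post the same payload from another terminal with curl (a sketch, assuming the http source is listening on localhost:9090):

    $ curl -X POST -H "Content-Type: text/plain" -d 76 http://localhost:9090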
  7. Open the log file for the convertToCelsiusStream.log app to see the output of our stream

    tail -f /var/folders/2q/krqwcbhj2d58csmthyq_n1nw0000gp/T/spring-cloud-dataflow-7563139704229890655/convertToCelsiusStream-1474990317406/convertToCelsiusStream.log/stdout_0.log

    You should see the temperature you posted converted to Celsius: with integer arithmetic, (76 - 32) * 5 / 9 = 24.

2016-09-27 10:05:34.933  INFO 95616 --- [CelsiusStream-1] log.sink                                 : 24

3.6.4 Summary

In this sample, you have learned:

  • How to write a custom Processor stream application
  • How to use Spring Cloud Data Flow’s Local server
  • How to use Spring Cloud Data Flow’s shell application