In this demonstration, you will learn how to build a data pipeline using Spring Cloud Data Flow to consume data from an HTTP endpoint and write the payload to a Cassandra database.
We will take you through the steps to configure and run a Spring Cloud Data Flow server in either a local or Cloud Foundry environment.
The Spring Cloud Data Flow Shell is available for download or you can build it yourself.
Note: the Spring Cloud Data Flow Shell and the Local server implementation are in the same repository and are built together by the project's Maven build.
To run the Shell open a new terminal session:
$ cd <PATH/TO/SPRING-CLOUD-DATAFLOW-SHELL-JAR>
$ java -jar spring-cloud-dataflow-shell-<VERSION>.jar

Welcome to the Spring Cloud Data Flow shell. For assistance hit TAB or type "help".
dataflow:>
Note: The Spring Cloud Data Flow Shell is a Spring Boot application that connects to the Data Flow Server’s REST API and supports a DSL that simplifies the process of defining a stream or task and managing its lifecycle. Most of these samples use the shell. If you prefer, you can use the Data Flow UI at localhost:9393/dashboard (or wherever the server is hosted) to perform equivalent operations.
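Because the shell simply drives the server's REST API, you can also script the same operations with any HTTP client. The following is a minimal sketch in plain Java (JDK 11+), assuming the server is running at its default address of localhost:9393 with no authentication; /streams/definitions is the REST endpoint that lists stream definitions.

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class ListStreamDefinitions {
    public static void main(String[] args) throws Exception {
        // Assumes a local Data Flow server on its default port with security disabled.
        HttpClient client = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://localhost:9393/streams/definitions"))
                .GET()
                .build();
        HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
        // The server answers with a JSON (HAL) document describing the registered streams.
        System.out.println(response.statusCode());
        System.out.println(response.body());
    }
}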
You may have to adjust the host, port, username and password depending on the Cassandra configuration you are using.
Create a keyspace and a book table in Cassandra using:

CREATE KEYSPACE clouddata WITH REPLICATION = { 'class' : 'org.apache.cassandra.locator.SimpleStrategy', 'replication_factor': '1' } AND DURABLE_WRITES = true;
USE clouddata;
CREATE TABLE book (
    id uuid PRIMARY KEY,
    isbn text,
    author text,
    title text
);
Register the out-of-the-box applications for the Kafka binder
Note: These samples assume that the Data Flow Server can access a remote Maven repository.
dataflow:>app import --uri https://dataflow.spring.io/kafka-maven-latest
Create the stream
dataflow:>stream create cassandrastream --definition "http --server.port=8888 --spring.cloud.stream.bindings.output.contentType='application/json' | cassandra --ingestQuery='insert into book (id, isbn, title, author) values (uuid(), ?, ?, ?)' --keyspace=clouddata" --deploy
Created and deployed new stream 'cassandrastream'
Note: If Cassandra isn’t running on the default port on localhost, supply the appropriate connection properties (host, port, username, password) in the stream definition.
Verify the stream is successfully deployed
dataflow:>stream list
Notice that the cassandrastream-http and cassandrastream-cassandra Spring Cloud Stream applications are running as Spring Boot applications within the Local server as collocated processes.
2015-12-15 15:52:31.576  INFO 18337 --- [nio-9393-exec-1] o.s.c.d.a.s.l.OutOfProcessModuleDeployer : deploying module org.springframework.cloud.stream.module:cassandra-sink:jar:exec:1.0.0.BUILD-SNAPSHOT instance 0
   Logs will be in /var/folders/c3/ctx7_rns6x30tq7rb76wzqwr0000gp/T/spring-cloud-data-flow-284240942697761420/cassandrastream.cassandra
2015-12-15 15:52:31.583  INFO 18337 --- [nio-9393-exec-1] o.s.c.d.a.s.l.OutOfProcessModuleDeployer : deploying module org.springframework.cloud.stream.module:http-source:jar:exec:1.0.0.BUILD-SNAPSHOT instance 0
   Logs will be in /var/folders/c3/ctx7_rns6x30tq7rb76wzqwr0000gp/T/spring-cloud-data-flow-284240942697761420/cassandrastream.http
Post sample data pointing to the http endpoint: localhost:8888 (8888 is the server.port we specified for the http source in this case)
dataflow:>http post --contentType 'application/json' --data '{"isbn": "1599869772", "title": "The Art of War", "author": "Sun Tzu"}' --target http://localhost:8888
> POST (application/json;charset=UTF-8) http://localhost:8888 {"isbn": "1599869772", "title": "The Art of War", "author": "Sun Tzu"}
> 202 ACCEPTED
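If you prefer to post the sample payload from code rather than the shell, the sketch below sends the same JSON document with Java's built-in HTTP client; the endpoint and payload come from the stream definition above, everything else is plain JDK 11+ code.

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class PostBook {
    public static void main(String[] args) throws Exception {
        String json = "{\"isbn\": \"1599869772\", \"title\": \"The Art of War\", \"author\": \"Sun Tzu\"}";
        HttpClient client = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://localhost:8888"))   // the http source's server.port from the stream definition
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(json))
                .build();
        HttpResponse<Void> response = client.send(request, HttpResponse.BodyHandlers.discarding());
        System.out.println(response.statusCode());           // expect 202, as with the shell's http post
    }
}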
Connect to the Cassandra instance and query the table clouddata.book
to list the persisted records
select * from clouddata.book;
A rabbit service instance
A cassandra service in Cloud Foundry or from another Cloud provider
You may have to adjust the host, port, username and password depending on the Cassandra configuration you are using.
Create a book table in your Cassandra keyspace using:

CREATE TABLE book (
    id uuid PRIMARY KEY,
    isbn text,
    author text,
    title text
);
Follow the installation instructions to run Spring Cloud Data Flow on Cloud Foundry.
Register the out-of-the-box applications for the Rabbit binder
Note: These samples assume that the Data Flow Server can access a remote Maven repository.
dataflow:>app import --uri https://dataflow.spring.io/rabbitmq-maven-latest
Create the stream
dataflow:>stream create cassandrastream --definition "http --spring.cloud.stream.bindings.output.contentType='application/json' | cassandra --ingestQuery='insert into book (id, isbn, title, author) values (uuid(), ?, ?, ?)' --username='<USERNAME>' --password='<PASSWORD>' --port=<PORT> --contact-points=<HOST> --keyspace='<KEYSPACE>'" --deploy
Created and deployed new stream 'cassandrastream'
You may want to change the cassandrastream name when deploying to PCF; if you have enabled random application name prefixes, you could run into issues with the route name being too long.
Verify the stream is successfully deployed
dataflow:>stream list
Notice that the cassandrastream-http and cassandrastream-cassandra Spring Cloud Stream applications are running as cloud-native (microservice) applications in Cloud Foundry
$ cf apps
Getting apps in org [your-org] / space [your-space] as user...
OK

name                        requested state   instances   memory   disk   urls
cassandrastream-cassandra   started           1/1         1G       1G     cassandrastream-cassandra.app.io
cassandrastream-http        started           1/1         1G       1G     cassandrastream-http.app.io
dataflow-server             started           1/1         1G       1G     dataflow-server.app.io
Look up the URL for the cassandrastream-http application from the list above. Post sample data pointing to the http endpoint: <YOUR-cassandrastream-http-APP-URL>
http post --contentType 'application/json' --data '{"isbn": "1599869772", "title": "The Art of War", "author": "Sun Tzu"}' --target http://<YOUR-cassandrastream-http-APP-URL>
> POST (application/json;charset=UTF-8) https://cassandrastream-http.app.io {"isbn": "1599869772", "title": "The Art of War", "author": "Sun Tzu"}
> 202 ACCEPTED
Connect to the Cassandra instance and query the table book to list the data inserted:
select * from book;
Now, let’s take advantage of Pivotal Cloud Foundry’s platform capabilities and scale the cassandrastream-http application from 1 to 3 instances
$ cf scale cassandrastream-http -i 3
Scaling app cassandrastream-http in org user-dataflow / space development as user...
OK
Verify the app instances (3/3) are running successfully
$ cf apps
Getting apps in org user-dataflow / space development as user...
OK

name                        requested state   instances   memory   disk   urls
cassandrastream-cassandra   started           1/1         1G       1G     cassandrastream-cassandra.app.io
cassandrastream-http        started           3/3         1G       1G     cassandrastream-http.app.io
dataflow-server             started           1/1         1G       1G     dataflow-server.app.io
You’re done!
In this sample, you have learned:
How to use Spring Cloud Data Flow’s Local and Cloud Foundry servers
How to use the Spring Cloud Data Flow shell
How to create a streaming data pipeline that writes to Cassandra
How to scale applications on Pivotal Cloud Foundry

In this demonstration, you will learn how to build a data pipeline using Spring Cloud Data Flow to consume data from an http endpoint and write to a MySQL database using the jdbc sink.
We will take you through the steps to configure and run a Spring Cloud Data Flow server in either a local or Cloud Foundry environment.
The Spring Cloud Data Flow Shell is available for download or you can build it yourself.
Note: the Spring Cloud Data Flow Shell and the Local server implementation are in the same repository and are built together by the project's Maven build.
To run the Shell open a new terminal session:
$ cd <PATH/TO/SPRING-CLOUD-DATAFLOW-SHELL-JAR>
$ java -jar spring-cloud-dataflow-shell-<VERSION>.jar

Welcome to the Spring Cloud Data Flow shell. For assistance hit TAB or type "help".
dataflow:>
Note: The Spring Cloud Data Flow Shell is a Spring Boot application that connects to the Data Flow Server’s REST API and supports a DSL that simplifies the process of defining a stream or task and managing its lifecycle. Most of these samples use the shell. If you prefer, you can use the Data Flow UI at localhost:9393/dashboard (or wherever the server is hosted) to perform equivalent operations.
Create the test database with a names table (in MySQL) using:

CREATE DATABASE test;
USE test;
CREATE TABLE names (
    name varchar(255)
);
Register the out-of-the-box applications for the Kafka binder
Note: These samples assume that the Data Flow Server can access a remote Maven repository.
dataflow:>app import --uri https://dataflow.spring.io/kafka-maven-latest
Create the stream
dataflow:>stream create --name mysqlstream --definition "http --server.port=8787 | jdbc --tableName=names --columns=name --spring.datasource.driver-class-name=org.mariadb.jdbc.Driver --spring.datasource.url='jdbc:mysql://localhost:3306/test'" --deploy
Created and deployed new stream 'mysqlstream'
Note: If MySQL isn’t running on the default port on localhost, adjust the spring.datasource.url and credentials in the stream definition accordingly.
Verify the stream is successfully deployed
dataflow:>stream list
Notice that the mysqlstream-http and mysqlstream-jdbc Spring Cloud Stream applications are running as Spring Boot applications within the Local server as collocated processes.
2016-05-03 09:29:55.918  INFO 65162 --- [nio-9393-exec-3] o.s.c.d.spi.local.LocalAppDeployer : deploying app mysqlstream.jdbc instance 0
   Logs will be in /var/folders/c3/ctx7_rns6x30tq7rb76wzqwr0000gp/T/spring-cloud-dataflow-6850863945840320040/mysqlstream1-1462292995903/mysqlstream.jdbc
2016-05-03 09:29:55.939  INFO 65162 --- [nio-9393-exec-3] o.s.c.d.spi.local.LocalAppDeployer : deploying app mysqlstream.http instance 0
   Logs will be in /var/folders/c3/ctx7_rns6x30tq7rb76wzqwr0000gp/T/spring-cloud-dataflow-6850863945840320040/mysqlstream-1462292995934/mysqlstream.http
Post sample data pointing to the http endpoint: localhost:8787 (8787 is the server.port we specified for the http source in this case)
dataflow:>http post --contentType 'application/json' --target http://localhost:8787 --data "{\"name\": \"Foo\"}"
> POST (application/json;charset=UTF-8) http://localhost:8787 {"name": "Foo"}
> 202 ACCEPTED
Connect to the MySQL instance and query the table test.names
to list the new rows:
select * from test.names;
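To check the result from code instead of the mysql client, a plain JDBC query works as well. This is a minimal sketch: the JDBC URL matches the stream definition above, the username and password are placeholders for whatever credentials your local MySQL uses, and the MySQL/MariaDB driver is assumed to be on the classpath.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class ListNames {
    public static void main(String[] args) throws Exception {
        // Same database the jdbc sink writes to; replace the credentials with your own.
        try (Connection connection = DriverManager.getConnection(
                "jdbc:mysql://localhost:3306/test", "<USERNAME>", "<PASSWORD>");
             Statement statement = connection.createStatement();
             ResultSet rows = statement.executeQuery("select * from test.names")) {
            while (rows.next()) {
                System.out.println(rows.getString("name"));   // prints Foo for the sample post above
            }
        }
    }
}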
A rabbit service instance in Cloud Foundry
A mysql service instance in Cloud Foundry
Create the names table (in MySQL) using:

CREATE TABLE names (
    name varchar(255)
);
Follow the installation instructions to run Spring Cloud Data Flow on Cloud Foundry.
Register the out-of-the-box applications for the Rabbit binder
Note: These samples assume that the Data Flow Server can access a remote Maven repository.
dataflow:>app import --uri https://dataflow.spring.io/rabbitmq-maven-latest
Create the stream
dataflow:>stream create --name mysqlstream --definition "http | jdbc --tableName=names --columns=name"
Created new stream 'mysqlstream'
dataflow:>stream deploy --name mysqlstream --properties "deployer.jdbc.cloudfoundry.services=mysql"
Deployed stream 'mysqlstream'
Note: By supplying the deployer.jdbc.cloudfoundry.services=mysql deployment property, we instruct the Data Flow server to bind the jdbc application to the mysql service.
Verify the stream is successfully deployed
dataflow:>stream list
Notice that the mysqlstream-http and mysqlstream-jdbc Spring Cloud Stream applications are running as cloud-native (microservice) applications in Cloud Foundry
$ cf apps
Getting apps in org user-dataflow / space development as user...
OK

name               requested state   instances   memory   disk   urls
mysqlstream-http   started           1/1         1G       1G     mysqlstream-http.app.io
mysqlstream-jdbc   started           1/1         1G       1G     mysqlstream-jdbc.app.io
dataflow-server    started           1/1         1G       1G     dataflow-server.app.io
Look up the URL for the mysqlstream-http application from the list above. Post sample data pointing to the http endpoint: <YOUR-mysqlstream-http-APP-URL>
http post --contentType 'application/json' --data "{\"name\": \"Bar\"}" --target https://mysqlstream-http.app.io
> POST (application/json;charset=UTF-8) https://mysqlstream-http.app.io {"name": "Bar"}
> 202 ACCEPTED
Connect to the MySQL instance and query the table names
to list the new rows:
select * from names;
Now, let’s take advantage of Pivotal Cloud Foundry’s platform capability. Let’s scale the mysqlstream-http
application from 1 to 3 instances
$ cf scale mysqlstream-http -i 3
Scaling app mysqlstream-http in org user-dataflow / space development as user...
OK
Verify App instances (3/3) running successfully
$ cf apps
Getting apps in org user-dataflow / space development as user...
OK

name               requested state   instances   memory   disk   urls
mysqlstream-http   started           3/3         1G       1G     mysqlstream-http.app.io
mysqlstream-jdbc   started           1/1         1G       1G     mysqlstream-jdbc.app.io
dataflow-server    started           1/1         1G       1G     dataflow-server.app.io
In this sample, you have learned:
How to use Spring Cloud Data Flow’s Local and Cloud Foundry servers
How to use the Spring Cloud Data Flow shell
How to create a streaming data pipeline that writes to MySQL
How to scale applications on Pivotal Cloud Foundry

In this demonstration, you will learn how to build a data pipeline using Spring Cloud Data Flow to consume data from an http endpoint and write to Gemfire using the gemfire sink.
We will take you through the steps to configure and run a Spring Cloud Data Flow server in either a local or Cloud Foundry environment.
Note: For legacy reasons, the gemfire apps are named after Pivotal GemFire; the samples work with a compatible Apache Geode installation as well.
The Spring Cloud Data Flow Shell is available for download or you can build it yourself.
Note: the Spring Cloud Data Flow Shell and the Local server implementation are in the same repository and are built together by the project's Maven build.
To run the Shell open a new terminal session:
$ cd <PATH/TO/SPRING-CLOUD-DATAFLOW-SHELL-JAR>
$ java -jar spring-cloud-dataflow-shell-<VERSION>.jar

Welcome to the Spring Cloud Data Flow shell. For assistance hit TAB or type "help".
dataflow:>
Note: The Spring Cloud Data Flow Shell is a Spring Boot application that connects to the Data Flow Server’s REST API and supports a DSL that simplifies the process of defining a stream or task and managing its lifecycle. Most of these samples use the shell. If you prefer, you can use the Data Flow UI at localhost:9393/dashboard (or wherever the server is hosted) to perform equivalent operations.
If you do not have access to an existing Geode installation, install Apache Geode or Pivotal GemFire and start the gfsh CLI in a separate terminal.
1.8.0
Monitor and Manage Apache Geode
gfsh>
Use gfsh to start a locator and server
gfsh>start locator --name=locator1
gfsh>start server --name=server1
Create a region called Stocks
gfsh>create region --name Stocks --type=REPLICATE
Use the Shell to create the sample stream
Register the out-of-the-box applications for the Rabbit binder
Note: These samples assume that the Data Flow Server can access a remote Maven repository.
dataflow:>app import --uri https://dataflow.spring.io/rabbitmq-maven-latest
Create the stream
This example creates an http endpoint to which we will post stock prices as a JSON document containing symbol and price fields.
The property --json=true enables Geode’s JSON support and configures the sink to convert JSON String payloads to PdxInstance, the recommended way to store JSON documents in Geode. The keyExpression property is a SpEL expression used to extract the symbol value from the PdxInstance to use as an entry key.
Note: PDX serialization is very efficient and supports OQL queries without requiring a custom domain class. Use of custom domain types requires these classes to be in the class path of both the stream apps and the cache server. For this reason, the use of custom payload types is generally discouraged.
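To make the keyExpression concrete: conceptually the sink parses the posted JSON and pulls out the symbol field to use as the region key, much like the hypothetical sketch below does with Jackson. The Jackson dependency and class names here are assumptions for illustration only, not part of the gemfire sink itself.

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class KeyExpressionSketch {
    public static void main(String[] args) throws Exception {
        String payload = "{\"symbol\":\"VMW\",\"price\":117.06}";
        // payload.getField('symbol') in the stream definition plays the same role as this lookup:
        JsonNode document = new ObjectMapper().readTree(payload);
        String key = document.get("symbol").asText();
        System.out.println("entry key = " + key);   // VMW
    }
}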
dataflow:>stream create --name stocks --definition "http --port=9090 | gemfire --json=true --regionName=Stocks --keyExpression=payload.getField('symbol')" --deploy
Created and deployed new stream 'stocks'
Note: If the Geode locator isn’t running on the default port on localhost, supply the appropriate connection options (such as --host-addresses) in the stream definition.
Verify the stream is successfully deployed
dataflow:>stream list
Post sample data pointing to the http endpoint: localhost:9090 (9090 is the port we specified for the http source)
dataflow:>http post --target http://localhost:9090 --contentType application/json --data '{"symbol":"VMW","price":117.06}'
> POST (application/json) http://localhost:9090 {"symbol":"VMW","price":117.06}
> 202 ACCEPTED
Using gfsh
, connect to the locator if not already connected, and verify the cache entry was created.
gfsh>get --key='VMW' --region=/Stocks
Result      : true
Key Class   : java.lang.String
Key         : VMW
Value Class : org.apache.geode.pdx.internal.PdxInstanceImpl

symbol | price
------ | ------
VMW    | 117.06
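You can also read the entry from a small Geode client application instead of gfsh. The following is only a sketch under a few assumptions: the locator started by gfsh listens on its default port 10334 on localhost, the Geode client library (org.apache.geode:geode-core) is on the classpath, and PDX read-serialized mode is enabled so the value comes back as a PdxInstance.

import org.apache.geode.cache.Region;
import org.apache.geode.cache.client.ClientCache;
import org.apache.geode.cache.client.ClientCacheFactory;
import org.apache.geode.cache.client.ClientRegionShortcut;
import org.apache.geode.pdx.PdxInstance;

public class StockLookup {
    public static void main(String[] args) {
        // Connect to the locator started earlier with gfsh (adjust host/port if yours differ).
        ClientCache cache = new ClientCacheFactory()
                .addPoolLocator("localhost", 10334)
                .setPdxReadSerialized(true)   // return PdxInstance instead of deserializing a domain class
                .create();
        // PROXY: the data stays on the server, the client only forwards operations.
        Region<String, PdxInstance> stocks = cache
                .<String, PdxInstance>createClientRegionFactory(ClientRegionShortcut.PROXY)
                .create("Stocks");
        PdxInstance entry = stocks.get("VMW");
        System.out.println(entry.getField("symbol") + " = " + entry.getField("price"));
        cache.close();
    }
}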
A rabbit service in Cloud Foundry
A cloudcache service in Cloud Foundry
Register the out-of-the-box applications for the Rabbit binder
Note: These samples assume that the Data Flow Server can access a remote Maven repository.
dataflow:>app import --uri https://dataflow.spring.io/rabbitmq-maven-latest
Get the PCC connection information
$ cf service-key cloudcache my-service-key
Getting key my-service-key for service instance cloudcache as <user>...

{
  "locators": [
    "10.0.16.9[55221]",
    "10.0.16.11[55221]",
    "10.0.16.10[55221]"
  ],
  "urls": {
    "gfsh": "http://...",
    "pulse": "https://.../pulse"
  },
  "users": [
    {
      "password": <password>,
      "username": "cluster_operator"
    },
    {
      "password": <password>,
      "username": "developer"
    }
  ]
}
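Note that the service key lists locators in GemFire's host[port] notation, while the gemfire sink's --host-addresses property used in the stream definition below expects host:port (for example, 10.0.16.9[55221] becomes 10.0.16.9:55221). A tiny sketch of that conversion, purely for illustration:

public class LocatorAddress {
    // Converts "10.0.16.9[55221]" (service-key format) to "10.0.16.9:55221" (--host-addresses format).
    static String toHostAddress(String locator) {
        return locator.replace("[", ":").replace("]", "");
    }

    public static void main(String[] args) {
        System.out.println(toHostAddress("10.0.16.9[55221]"));   // 10.0.16.9:55221
    }
}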
Using gfsh
, connect to the PCC instance as cluster_operator
using the service key values and create the Stocks region.
gfsh>connect --use-http --url=<gfsh-url> --user=cluster_operator --password=<cluster_operator_password>
gfsh>create region --name Stocks --type=REPLICATE
Create the stream, connecting to the PCC instance as developer
This example creates an http endpoint to which we will post stock prices as a JSON document containing symbol and price fields.
The property --json=true enables Geode’s JSON support and configures the sink to convert JSON String payloads to PdxInstance, the recommended way to store JSON documents in Geode. The keyExpression property is a SpEL expression used to extract the symbol value from the PdxInstance to use as an entry key.
Note: PDX serialization is very efficient and supports OQL queries without requiring a custom domain class. Use of custom domain types requires these classes to be in the class path of both the stream apps and the cache server. For this reason, the use of custom payload types is generally discouraged.
dataflow:>stream create --name stocks --definition "http --security.basic.enabled=false | gemfire --username=developer --password=<developer-password> --connect-type=locator --host-addresses=10.0.16.9:55221 --json=true --regionName=Stocks --keyExpression=payload.getField('symbol')" --deploy
Verify the stream is successfully deployed
dataflow:>stream list
Post sample data pointing to the http
endpoint
Get the url of the http source using cf apps
dataflow:>http post --target http://<http source url> --contentType application/json --data '{"symbol":"VMW","price":117.06}'
> POST (application/json) http://... {"symbol":"VMW","price":117.06}
> 202 ACCEPTED
Using gfsh
, connect to the PCC instance as cluster_operator
using the service key values.
gfsh>connect --use-http --url=<gfsh-url> --user=cluster_operator --password=<cluster_operator_password>
gfsh>get --key='VMW' --region=/Stocks
Result      : true
Key Class   : java.lang.String
Key         : VMW
Value Class : org.apache.geode.pdx.internal.PdxInstanceImpl

symbol | price
------ | ------
VMW    | 117.06
In this demonstration, you will learn how to build a data pipeline using Spring Cloud Data Flow to consume data from a gemfire-cq
(Continuous Query) endpoint and write to a log using the log
sink.
The gemfire-cq
source creates a Continuous Query to monitor events for a region that match the query’s result set and publish a message whenever such an event is emitted. In this example, we simulate monitoring orders to trigger a process whenever
the quantity ordered is above a defined limit.
We will take you through the steps to configure and run a Spring Cloud Data Flow server in either a local or Cloud Foundry environment.
Note: For legacy reasons, the gemfire apps are named after Pivotal GemFire; the samples work with a compatible Apache Geode installation as well.
The Spring Cloud Data Flow Shell is available for download or you can build it yourself.
Note: the Spring Cloud Data Flow Shell and the Local server implementation are in the same repository and are built together by the project's Maven build.
To run the Shell open a new terminal session:
$ cd <PATH/TO/SPRING-CLOUD-DATAFLOW-SHELL-JAR>
$ java -jar spring-cloud-dataflow-shell-<VERSION>.jar

Welcome to the Spring Cloud Data Flow shell. For assistance hit TAB or type "help".
dataflow:>
Note: The Spring Cloud Data Flow Shell is a Spring Boot application that connects to the Data Flow Server’s REST API and supports a DSL that simplifies the process of defining a stream or task and managing its lifecycle. Most of these samples use the shell. If you prefer, you can use the Data Flow UI at localhost:9393/dashboard (or wherever the server is hosted) to perform equivalent operations.
If you do not have access to an existing Geode installation, install Apache Geode or Pivotal GemFire and start the gfsh CLI in a separate terminal.
1.8.0
Monitor and Manage Apache Geode
gfsh>
Use gfsh to start a locator and server
gfsh>start locator --name=locator1
gfsh>start server --name=server1
Create a region called Orders
gfsh>create region --name Orders --type=REPLICATE
Use the Shell to create the sample stream
Register the out-of-the-box applications for the Rabbit binder
Note: These samples assume that the Data Flow Server can access a remote Maven repository.
dataflow:>app import --uri https://dataflow.spring.io/rabbitmq-maven-latest
Create the stream
This example creates a gemfire-cq source that will publish events matching query criteria on a region. In this case, we will monitor the Orders region. For simplicity, we will avoid creating a data structure for the order.
Each cache entry contains an integer value representing the quantity of the ordered item. This stream will fire a message whenever the value is greater than 999. By default, the source emits only the value. Here we will override that using the cq-event-expression property. This accepts a SpEL expression bound to a CQEvent. To reference the entire CQEvent instance, we use #this.
In order to display the contents in the log, we will invoke toString() on the instance.
dataflow:>stream create --name orders --definition " gemfire-cq --query='SELECT * from /Orders o where o > 999' --cq-event-expression=#this.toString() | log" --deploy
Note: If the Geode locator isn’t running on the default port on localhost, supply the appropriate connection options (such as --host-addresses) in the stream definition.
Verify the stream is successfully deployed
dataflow:>stream list
Monitor stdout for the log sink. When you deploy the stream, you will see log messages in the Data Flow server console like this
2017-10-30 09:39:36.283 INFO 8167 --- [nio-9393-exec-5] o.s.c.d.spi.local.LocalAppDeployer : Deploying app with deploymentId orders.log instance 0. Logs will be in /var/folders/hd/5yqz2v2d3sxd3n879f4sg4gr0000gn/T/spring-cloud-dataflow-5375107584795488581/orders-1509370775940/orders.log
Copy the location of the log
sink logs. This is a directory that ends in orders.log
. The log files will be in stdout_0.log
under this directory. You can monitor the output of the log sink using tail
, or something similar:
$tail -f /var/folders/hd/5yqz2v2d3sxd3n879f4sg4gr0000gn/T/spring-cloud-dataflow-5375107584795488581/orders-1509370775940/orders.log/stdout_0.log
Using gfsh
, create and update some cache entries
gfsh>put --region Orders --value-class java.lang.Integer --key 01234 --value 1000
gfsh>put --region Orders --value-class java.lang.Integer --key 11234 --value 1005
gfsh>put --region Orders --value-class java.lang.Integer --key 21234 --value 100
gfsh>put --region Orders --value-class java.lang.Integer --key 31234 --value 999
gfsh>put --region Orders --value-class java.lang.Integer --key 21234 --value 1000
Observe the log output. You should see messages like:
2017-10-30 09:53:02.231  INFO 8563 --- [ire-cq.orders-1] log-sink : CqEvent [CqName=GfCq1; base operation=CREATE; cq operation=CREATE; key=01234; value=1000]
2017-10-30 09:53:19.732  INFO 8563 --- [ire-cq.orders-1] log-sink : CqEvent [CqName=GfCq1; base operation=CREATE; cq operation=CREATE; key=11234; value=1005]
2017-10-30 09:53:53.242  INFO 8563 --- [ire-cq.orders-1] log-sink : CqEvent [CqName=GfCq1; base operation=UPDATE; cq operation=CREATE; key=21234; value=1000]
Another interesting demonstration combines gemfire-cq with the http-gemfire example.
dataflow:>stream create --name stocks --definition "http --port=9090 | gemfire-json-server --regionName=Stocks --keyExpression=payload.getField('symbol')" --deploy
dataflow:>stream create --name stock_watch --definition "gemfire-cq --query='Select * from /Stocks where symbol=''VMW''' | log" --deploy
A rabbit service in Cloud Foundry
A cloudcache service in Cloud Foundry
Follow the installation instructions to run Spring Cloud Data Flow on Cloud Foundry.
Register the out-of-the-box applications for the Rabbit binder
Note: These samples assume that the Data Flow Server can access a remote Maven repository.
dataflow:>app import --uri https://dataflow.spring.io/rabbitmq-maven-latest
Get the PCC connection information
$ cf service-key cloudcache my-service-key
Getting key my-service-key for service instance cloudcache as <user>...

{
  "locators": [
    "10.0.16.9[55221]",
    "10.0.16.11[55221]",
    "10.0.16.10[55221]"
  ],
  "urls": {
    "gfsh": "http://...",
    "pulse": "https://.../pulse"
  },
  "users": [
    {
      "password": <password>,
      "username": "cluster_operator"
    },
    {
      "password": <password>,
      "username": "developer"
    }
  ]
}
Using gfsh, connect to the PCC instance as cluster_operator using the service key values and create the Orders region.
gfsh>connect --use-http --url=<gfsh-url> --user=cluster_operator --password=<cluster_operator_password>
gfsh>create region --name Orders --type=REPLICATE
Create the stream using the Data Flow Shell
This example creates a gemfire-cq source that will publish events matching query criteria on a region. In this case, we will monitor the Orders region. For simplicity, we will avoid creating a data structure for the order.
Each cache entry contains an integer value representing the quantity of the ordered item. This stream will fire a message whenever the value is greater than 999. By default, the source emits only the value. Here we will override that using the cq-event-expression property. This accepts a SpEL expression bound to a CQEvent. To reference the entire CQEvent instance, we use #this.
In order to display the contents in the log, we will invoke toString() on the instance.
dataflow:>stream create --name orders --definition " gemfire-cq --username=developer --password=<developer-password> --connect-type=locator --host-addresses=10.0.16.9:55221 --query='SELECT * from /Orders o where o > 999' --cq-event-expression=#this.toString() | log" --deploy
Created and deployed new stream 'orders'
Verify the stream is successfully deployed
dataflow:>stream list
Monitor stdout for the log sink
cf logs <log-sink-app-name>
Using gfsh
, create and update some cache entries
gfsh>connect --use-http --url=<gfsh-url> --user=cluster_operator --password=<cluster_operator_password>
gfsh>put --region Orders --value-class java.lang.Integer --key 01234 --value 1000
gfsh>put --region Orders --value-class java.lang.Integer --key 11234 --value 1005
gfsh>put --region Orders --value-class java.lang.Integer --key 21234 --value 100
gfsh>put --region Orders --value-class java.lang.Integer --key 31234 --value 999
gfsh>put --region Orders --value-class java.lang.Integer --key 21234 --value 1000
Observe the log output. You should see messages like:
2017-10-30 09:53:02.231  INFO 8563 --- [ire-cq.orders-1] log-sink : CqEvent [CqName=GfCq1; base operation=CREATE; cq operation=CREATE; key=01234; value=1000]
2017-10-30 09:53:19.732  INFO 8563 --- [ire-cq.orders-1] log-sink : CqEvent [CqName=GfCq1; base operation=CREATE; cq operation=CREATE; key=11234; value=1005]
2017-10-30 09:53:53.242  INFO 8563 --- [ire-cq.orders-1] log-sink : CqEvent [CqName=GfCq1; base operation=UPDATE; cq operation=CREATE; key=21234; value=1000]
Another interesting demonstration combines gemfire-cq
with the http-gemfire example.
dataflow:>stream create --name stocks --definition "http --security.basic.enabled=false | gemfire --json=true --username=developer --password=<developer-password> --connect-type=locator --host-addresses=10.0.16.25:55221 --regionName=Stocks --keyExpression=payload.getField('symbol')" --deploy
dataflow:>stream create --name stock_watch --definition "gemfire-cq --username=developer --password=<developer-password> --connect-type=locator --host-addresses=10.0.16.25:55221 --query='SELECT * from /Stocks where symbol=''VMW''' --cq-event-expression=#this.toString() | log" --deploy
dataflow:>http post --target http://data-flow-server-dpvuo77-stocks-http.apps.scdf-gcp.springapps.io/ --contentType application/json --data '{"symbol":"VMW","price":117.06}'
In this demonstration, you will learn how to build a data pipeline using Spring Cloud Data Flow to consume data from a gemfire
endpoint and write to a log using the log
sink.
The gemfire
source creates a CacheListener to monitor events for a region and publish a message whenever an entry is changed.
We will take you through the steps to configure and run a Spring Cloud Data Flow server in either a local or Cloud Foundry environment.
Note: For legacy reasons, the gemfire apps are named after Pivotal GemFire; the samples work with a compatible Apache Geode installation as well.
The Spring Cloud Data Flow Shell is available for download or you can build it yourself.
Note: the Spring Cloud Data Flow Shell and the Local server implementation are in the same repository and are built together by the project's Maven build.
To run the Shell open a new terminal session:
$ cd <PATH/TO/SPRING-CLOUD-DATAFLOW-SHELL-JAR>
$ java -jar spring-cloud-dataflow-shell-<VERSION>.jar

Welcome to the Spring Cloud Data Flow shell. For assistance hit TAB or type "help".
dataflow:>
Note: The Spring Cloud Data Flow Shell is a Spring Boot application that connects to the Data Flow Server’s REST API and supports a DSL that simplifies the process of defining a stream or task and managing its lifecycle. Most of these samples use the shell. If you prefer, you can use the Data Flow UI at localhost:9393/dashboard (or wherever the server is hosted) to perform equivalent operations.
If you do not have access to an existing Geode installation, install Apache Geode or Pivotal GemFire and start the gfsh CLI in a separate terminal.
1.8.0
Monitor and Manage Apache Geode
gfsh>
Use gfsh to start a locator and server
gfsh>start locator --name=locator1
gfsh>start server --name=server1
Create a region called Test
gfsh>create region --name Test --type=REPLICATE
Use the Shell to create the sample stream
Register the out-of-the-box applications for the Rabbit binder
Note: These samples assume that the Data Flow Server can access a remote Maven repository.
dataflow:>app import --uri https://dataflow.spring.io/rabbitmq-maven-latest
Create the stream
This example creates a gemfire source that will publish events on a region
dataflow:>stream create --name events --definition " gemfire --regionName=Test | log" --deploy
Created and deployed new stream 'events'
Note: If the Geode locator isn’t running on the default port on localhost, supply the appropriate connection options (such as --host-addresses) in the stream definition.
Verify the stream is successfully deployed
dataflow:>stream list
Monitor stdout for the log sink. When you deploy the stream, you will see log messages in the Data Flow server console like this
2017-10-28 17:28:23.275  INFO 15603 --- [nio-9393-exec-2] o.s.c.d.spi.local.LocalAppDeployer : Deploying app with deploymentId events.log instance 0. Logs will be in /var/folders/hd/5yqz2v2d3sxd3n879f4sg4gr0000gn/T/spring-cloud-dataflow-4093992067314402881/events-1509226103269/events.log
2017-10-28 17:28:23.277  INFO 15603 --- [nio-9393-exec-2] o.s.c.d.s.c.StreamDeploymentController : Downloading resource URI [maven://org.springframework.cloud.stream.app:gemfire-source-rabbit:1.2.0.RELEASE]
2017-10-28 17:28:23.311  INFO 15603 --- [nio-9393-exec-2] o.s.c.d.s.c.StreamDeploymentController : Deploying application named [gemfire] as part of stream named [events] with resource URI [maven://org.springframework.cloud.stream.app:gemfire-source-rabbit:1.2.0.RELEASE]
2017-10-28 17:28:23.318  INFO 15603 --- [nio-9393-exec-2] o.s.c.d.spi.local.LocalAppDeployer : Deploying app with deploymentId events.gemfire instance 0. Logs will be in /var/folders/hd/5yqz2v2d3sxd3n879f4sg4gr0000gn/T/spring-cloud-dataflow-4093992067314402881/events-1509226103311/events.gemfire
Copy the location of the log
sink logs. This is a directory that ends in events.log
. The log files will be in stdout_0.log
under this directory. You can monitor the output of the log sink using tail
, or something similar:
$tail -f /var/folders/hd/5yqz2v2d3sxd3n879f4sg4gr0000gn/T/spring-cloud-dataflow-4093992067314402881/events-1509226103269/events.log/stdout_0.log
Using gfsh
, create and update some cache entries
gfsh>put --region /Test --key 1 --value "value 1"
gfsh>put --region /Test --key 2 --value "value 2"
gfsh>put --region /Test --key 3 --value "value 3"
gfsh>put --region /Test --key 1 --value "new value 1"
Observe the log output. You should see messages like:
2017-10-28 17:28:52.893  INFO 18986 --- [emfire.events-1] log-sink : value 1
2017-10-28 17:28:52.893  INFO 18986 --- [emfire.events-1] log-sink : value 2
2017-10-28 17:28:52.893  INFO 18986 --- [emfire.events-1] log-sink : value 3
2017-10-28 17:28:52.893  INFO 18986 --- [emfire.events-1] log-sink : new value 1
By default, the message payload contains the updated value. Depending on your application, you may need additional information. The data comes from EntryEvent. You
can access any fields using the source’s cache-event-expression
property. This takes a SpEL expression bound to the EntryEvent. Try something like --cache-event-expression='{key:'+key+',new_value:'+newValue+'}'
(HINT: You will need to destroy the stream and recreate it to
add this property, an exercise left to the reader). Now you should see log messages like:
2017-10-28 17:28:52.893  INFO 18986 --- [emfire.events-1] log-sink : {key:1,new_value:value 1}
2017-10-28 17:41:24.466  INFO 18986 --- [emfire.events-1] log-sink : {key:2,new_value:value 2}
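As a hedged illustration of how such a SpEL expression is evaluated, the sketch below runs the same expression against a stand-in object with key and newValue properties; the real source binds the expression to Geode's EntryEvent, and the StandInEvent class here is purely hypothetical. It assumes Spring's spring-expression module is on the classpath.

import org.springframework.expression.Expression;
import org.springframework.expression.spel.standard.SpelExpressionParser;

public class CacheEventExpressionSketch {

    // Stand-in for the EntryEvent properties the expression refers to.
    public static class StandInEvent {
        public Object getKey() { return 1; }
        public Object getNewValue() { return "value 1"; }
    }

    public static void main(String[] args) {
        Expression expression = new SpelExpressionParser()
                .parseExpression("'{key:'+key+',new_value:'+newValue+'}'");
        String message = expression.getValue(new StandInEvent(), String.class);
        System.out.println(message);   // {key:1,new_value:value 1}
    }
}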
A rabbit service in Cloud Foundry
A cloudcache service in Cloud Foundry
Follow the installation instructions to run Spring Cloud Data Flow on Cloud Foundry.
Register the out-of-the-box applications for the Rabbit binder
Note: These samples assume that the Data Flow Server can access a remote Maven repository.
dataflow:>app import --uri https://dataflow.spring.io/rabbitmq-maven-latest
Get the PCC connection information
$ cf service-key cloudcache my-service-key
Getting key my-service-key for service instance cloudcache as <user>...

{
  "locators": [
    "10.0.16.9[55221]",
    "10.0.16.11[55221]",
    "10.0.16.10[55221]"
  ],
  "urls": {
    "gfsh": "http://...",
    "pulse": "https://.../pulse"
  },
  "users": [
    {
      "password": <password>,
      "username": "cluster_operator"
    },
    {
      "password": <password>,
      "username": "developer"
    }
  ]
}
Using gfsh
, connect to the PCC instance as cluster_operator
using the service key values and create the Test region.
gfsh>connect --use-http --url=<gfsh-url> --user=cluster_operator --password=<cluster_operator_password>
gfsh>create region --name Test --type=REPLICATE
Create the stream, connecting to the PCC instance as developer. This example creates a gemfire source that will publish events on a region
dataflow>stream create --name events --definition " gemfire --username=developer --password=<developer-password> --connect-type=locator --host-addresses=10.0.16.9:55221 --regionName=Test | log" --deploy
Verify the stream is successfully deployed
dataflow:>stream list
Monitor stdout for the log sink
cf logs <log-sink-app-name>
Using gfsh
, create and update some cache entries
gfsh>connect --use-http --url=<gfsh-url> --user=cluster_operator --password=<cluster_operator_password>
gfsh>put --region /Test --key 1 --value "value 1"
gfsh>put --region /Test --key 2 --value "value 2"
gfsh>put --region /Test --key 3 --value "value 3"
gfsh>put --region /Test --key 1 --value "new value 1"
Observe the log output
You should see messages like:
2017-10-28 17:28:52.893  INFO 18986 --- [emfire.events-1] log-sink : value 1
2017-10-28 17:28:52.893  INFO 18986 --- [emfire.events-1] log-sink : value 2
2017-10-28 17:28:52.893  INFO 18986 --- [emfire.events-1] log-sink : value 3
2017-10-28 17:28:52.893  INFO 18986 --- [emfire.events-1] log-sink : new value 1
By default, the message payload contains the updated value. Depending on your application, you may need additional information. The data comes from EntryEvent. You
can access any fields using the source’s cache-event-expression
property. This takes a SpEL expression bound to the EntryEvent. Try something like --cache-event-expression='{key:'+key+',new_value:'+newValue+'}'
(HINT: You will need to destroy the stream and recreate it to
add this property, an exercise left to the reader). Now you should see log messages like:
2017-10-28 17:28:52.893  INFO 18986 --- [emfire.events-1] log-sink : {key:1,new_value:value 1}
2017-10-28 17:41:24.466  INFO 18986 --- [emfire.events-1] log-sink : {key:2,new_value:value 2}
The Spring Cloud Data Flow Shell is available for download or you can build it yourself.
Note: the Spring Cloud Data Flow Shell and the Local server implementation are in the same repository and are built together by the project's Maven build.
To run the Shell open a new terminal session:
$ cd <PATH/TO/SPRING-CLOUD-DATAFLOW-SHELL-JAR>
$ java -jar spring-cloud-dataflow-shell-<VERSION>.jar

Welcome to the Spring Cloud Data Flow shell. For assistance hit TAB or type "help".
dataflow:>
Note: The Spring Cloud Data Flow Shell is a Spring Boot application that connects to the Data Flow Server’s REST API and supports a DSL that simplifies the process of defining a stream or task and managing its lifecycle. Most of these samples use the shell. If you prefer, you can use the Data Flow UI at localhost:9393/dashboard (or wherever the server is hosted) to perform equivalent operations.
We will create a custom Spring Cloud Stream application and run it on Spring Cloud Data Flow. We’ll go through the steps to make a simple processor that converts temperature from Fahrenheit to Celsius. We will be running the demo locally, but all the steps will work in a Cloud Foundry environment as well.
Create a new Spring Cloud Stream project
Use demo.celsius.converter as the package name and celsius-converter-processor as the artifact name.
Choose either Rabbit MQ or Kafka as the message transport. For this demo, we will use rabbit. Type rabbit in the search bar under Search for dependencies and select Stream Rabbit.
Develop the app
We can now create our custom app. Our Spring Cloud Stream application is a Spring Boot application that runs as an executable jar. The application will include two Java classes:
CelsiusConverterProcessorAplication.java - the main Spring Boot application class, generated by Spring Initializr
CelsiusConverterProcessorConfiguration.java - the Spring Cloud Stream code that we will write
We are creating a transformer that takes a Fahrenheit input and converts it to Celsius.
Following the same naming convention as the application file, create a new Java class in the same package called CelsiusConverterProcessorConfiguration.java
.
CelsiusConverterProcessorConfiguration.java.
@EnableBinding(Processor.class)
public class CelsiusConverterProcessorConfiguration {

    @Transformer(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)
    public int convertToCelsius(String payload) {
        int fahrenheitTemperature = Integer.parseInt(payload);
        return (fahrenheitTemperature - 32) * 5 / 9;
    }
}
Here we introduced two important Spring annotations.
First we annotated the class with @EnableBinding(Processor.class)
.
Second we created a method and annotated it with @Transformer(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)
.
By adding these two annotations we have configured this stream app as a Processor
(as opposed to a Source
or a Sink
).
This means that the application receives input from an upstream application via the Processor.INPUT
channel and sends its output to a downstream application via the Processor.OUTPUT
channel.
The convertToCelsius
method takes a String
as input for Fahrenheit and then returns the converted Celsius as an integer.
This method is very simple, but that is also the beauty of this programming style.
We can add as much logic as we want to this method to enrich this processor.
As long as we annotate it properly and return valid output, it works as a Spring Cloud Stream Processor. Also note that it is straightforward to unit test this code.
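Since the conversion is plain Java, a unit test needs no Spring machinery at all. The following is a minimal sketch assuming JUnit 4 is on the test classpath (Spring Initializr projects typically include a test starter); with the integer arithmetic used above, 76 Fahrenheit converts to 24 Celsius.

import static org.junit.Assert.assertEquals;

import org.junit.Test;

public class CelsiusConverterProcessorConfigurationTest {

    private final CelsiusConverterProcessorConfiguration processor =
            new CelsiusConverterProcessorConfiguration();

    @Test
    public void convertsFahrenheitToCelsius() {
        assertEquals(0, processor.convertToCelsius("32"));    // freezing point
        assertEquals(100, processor.convertToCelsius("212")); // boiling point
        assertEquals(24, processor.convertToCelsius("76"));   // the value posted later in this demo
    }
}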
Build the Spring Boot application with Maven
$ cd <PROJECT_DIR>
$ ./mvnw clean package
Run the Application standalone
java -jar target/celsius-converter-processor-0.0.1-SNAPSHOT.jar
If all goes well, we should have a running standalone Spring Boot Application. Once we verify that the app is started and running without any errors, we can stop it.
Register the out-of-the-box applications for the Rabbit binder
Note: These samples assume that the Data Flow Server can access a remote Maven repository.
dataflow:>app import --uri https://dataflow.spring.io/rabbitmq-maven-latest
Register the custom processor
app register --type processor --name convertToCelsius --uri <File URL of the jar file on the local filesystem where you built the project above> --force
Create the stream
We will create a stream that uses the out-of-the-box http source and log sink and our custom transformer.
dataflow:>stream create --name convertToCelsiusStream --definition "http --port=9090 | convertToCelsius | log" --deploy
Created and deployed new stream 'convertToCelsiusStream'
Verify the stream is successfully deployed
dataflow:>stream list
Verify that the apps have successfully deployed
dataflow:>runtime apps
2016-09-27 10:03:11.988  INFO 95234 --- [nio-9393-exec-9] o.s.c.d.spi.local.LocalAppDeployer : deploying app convertToCelsiusStream.log instance 0
   Logs will be in /var/folders/2q/krqwcbhj2d58csmthyq_n1nw0000gp/T/spring-cloud-dataflow-3236898888473815319/convertToCelsiusStream-1474984991968/convertToCelsiusStream.log
2016-09-27 10:03:12.397  INFO 95234 --- [nio-9393-exec-9] o.s.c.d.spi.local.LocalAppDeployer : deploying app convertToCelsiusStream.convertToCelsius instance 0
   Logs will be in /var/folders/2q/krqwcbhj2d58csmthyq_n1nw0000gp/T/spring-cloud-dataflow-3236898888473815319/convertToCelsiusStream-1474984992392/convertToCelsiusStream.convertToCelsius
2016-09-27 10:03:14.445  INFO 95234 --- [nio-9393-exec-9] o.s.c.d.spi.local.LocalAppDeployer : deploying app convertToCelsiusStream.http instance 0
   Logs will be in /var/folders/2q/krqwcbhj2d58csmthyq_n1nw0000gp/T/spring-cloud-dataflow-3236898888473815319/convertToCelsiusStream-1474984994440/convertToCelsiusStream.http
Post sample data to the http endpoint: localhost:9090 (9090 is the port we specified for the http source in this case)
dataflow:>http post --target http://localhost:9090 --data 76
> POST (text/plain;Charset=UTF-8) http://localhost:9090 76
> 202 ACCEPTED
Open the log file for the convertToCelsiusStream.log
app to see the output of our stream
tail -f /var/folders/2q/krqwcbhj2d58csmthyq_n1nw0000gp/T/spring-cloud-dataflow-7563139704229890655/convertToCelsiusStream-1474990317406/convertToCelsiusStream.log/stdout_0.log
You should see the temperature you posted converted to Celsius!
2016-09-27 10:05:34.933 INFO 95616 --- [CelsiusStream-1] log.sink : 24