Instead of using the shell to create and deploy streams, you can use the Java-based DSL provided by the `spring-cloud-dataflow-rest-client` module. The Java DSL is a convenient wrapper around the `DataFlowTemplate` class that makes it simple to create and deploy streams programmatically.
To get started, you need to add the following dependency to your project:

```xml
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-dataflow-rest-client</artifactId>
    <version>1.3.0.M3</version>
</dependency>
```
You also need to add a reference to the Spring Milestone Maven repository:

```xml
<repositories>
    <repository>
        <id>spring-milestones</id>
        <name>Spring Milestones</name>
        <url>http://repo.spring.io/libs-milestone-local</url>
        <snapshots>
            <enabled>false</enabled>
        </snapshots>
    </repository>
</repositories>
```
> **Note:** A complete sample can be found in the Spring Cloud Data Flow Samples Repository to simplify getting started.
The classes you will encounter when using the Java DSL are `StreamBuilder`, `StreamDefinition`, `Stream`, `StreamApplication`, and `DataFlowTemplate`. The entry point is the `builder` method on `Stream`, which takes an instance of a `DataFlowTemplate`. To create an instance of a `DataFlowTemplate`, you need to provide a `URI` location of the Data Flow Server.
We will now walk through a quick example, using the definition style.
```java
URI dataFlowUri = URI.create("http://localhost:9393");
DataFlowOperations dataFlowOperations = new DataFlowTemplate(dataFlowUri);
dataFlowOperations.appRegistryOperations().importFromResource(
        "http://bit.ly/Celsius-RC1-stream-applications-rabbit-maven", true);
StreamDefinition streamDefinition = Stream.builder(dataFlowOperations)
        .name("ticktock")
        .definition("time | log")
        .create();
```
The `create` method returns an instance of a `StreamDefinition`, representing a stream that has been created but not deployed. This is called the definition style, since it takes a single string for the stream definition, just as in the shell. If applications have not yet been registered in the Data Flow server, you can use the `DataFlowOperations` class to register them.
With the `StreamDefinition` instance, you have methods available to `deploy` or `destroy` the stream.
```java
Stream stream = streamDefinition.deploy();
```
The `Stream` instance has the `getStatus`, `destroy`, and `undeploy` methods to control and query the stream.
If you are going to deploy the stream immediately, there is no need to create a separate local variable of type `StreamDefinition`; you can chain the calls together:
```java
Stream stream = Stream.builder(dataFlowOperations)
        .name("ticktock")
        .definition("time | log")
        .create()
        .deploy();
```
The `deploy` method is overloaded to take a `java.util.Map` of deployment properties.
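The keys in that map follow the same conventions as the shell: `app.<app name>.<property>` overrides an application property, while `deployer.<app name>.<property>` sets a deployer property such as the instance count. A minimal sketch of building such a map (plain Java, no server required; the `log` app and property values are illustrative):

```java
import java.util.HashMap;
import java.util.Map;

public class DeploymentProperties {

    // Builds a deployment-property map using the shell key conventions:
    //   "app.<app name>.<property>"      - overrides an application property
    //   "deployer.<app name>.<property>" - sets a deployer property (e.g. instance count)
    static Map<String, String> example() {
        Map<String, String> props = new HashMap<>();
        props.put("deployer.log.count", "2");          // run two instances of the log app
        props.put("app.log.log.expression", "payload"); // override the log app's expression
        return props;
    }

    public static void main(String[] args) {
        System.out.println(example().get("deployer.log.count")); // prints "2"
    }
}
```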
The `StreamApplication` class is used in the 'fluent' Java DSL style, which is discussed in the next section. The `StreamBuilder` class is what is returned from the method `Stream.builder(dataFlowOperations)`. In larger applications, it is common to create a single instance of the `StreamBuilder` as a Spring `@Bean` and share it across the application.
The Java DSL offers two styles to create streams:

* The `definition` style keeps the feel of using the pipes-and-filters textual DSL in the shell. This style is selected by using the `definition` method after setting the stream name, for example `Stream.builder(dataFlowOperations).name("ticktock").definition(<definition goes here>)`.
* The `fluent` style lets you chain together sources, processors, and sinks by passing in an instance of a `StreamApplication`. This style is selected by using the `source` method after setting the stream name, for example `Stream.builder(dataFlowOperations).name("ticktock").source(<stream application instance goes here>)`. You then chain together the `processor()` and `sink()` methods to create a stream definition.

To demonstrate both styles, we will create a simple stream using both approaches. A complete sample for you to get started can be found in the Spring Cloud Data Flow Samples Repository.
```java
public void definitionStyle() throws Exception {
    DataFlowOperations dataFlowOperations = createDataFlowOperations();
    Map<String, String> deploymentProperties = createDeploymentProperties();

    Stream woodchuck = Stream.builder(dataFlowOperations)
            .name("woodchuck")
            .definition("http --server.port=9900 | splitter --expression=payload.split(' ') | log")
            .create()
            .deploy(deploymentProperties);

    waitAndDestroy(woodchuck);
}

public void fluentStyle() throws Exception {
    DataFlowOperations dataFlowOperations = createDataFlowOperations();

    StreamApplication source = new StreamApplication("http").addProperty("server.port", 9900);

    StreamApplication processor = new StreamApplication("splitter")
            .addProperty("producer.partitionKeyExpression", "payload");

    StreamApplication sink = new StreamApplication("log")
            .addDeploymentProperty("count", 2);

    Stream woodchuck = Stream.builder(dataFlowOperations)
            .name("woodchuck")
            .source(source)
            .processor(processor)
            .sink(sink)
            .create()
            .deploy();

    waitAndDestroy(woodchuck);
}
```
The `waitAndDestroy` method uses the `getStatus` method to poll for the stream's status:
```java
private void waitAndDestroy(Stream stream) throws InterruptedException {
    while (!stream.getStatus().equals("deployed")) {
        System.out.println("Waiting for deployment of stream.");
        Thread.sleep(5000);
    }

    System.out.println("Letting the stream run for 2 minutes.");
    // Let the stream run for 2 minutes
    Thread.sleep(120000);

    System.out.println("Destroying stream");
    stream.destroy();
}
```
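The loop above polls forever if the stream never reaches the `deployed` state. A bounded variant of the same polling pattern is sketched below; a plain `Supplier<String>` stands in for `stream::getStatus` so the sketch can be exercised without a running Data Flow server (the method name and attempt budget are this sketch's own, not part of the DSL):

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.function.Supplier;

public class DeployPoller {

    // Polls a status supplier until it reports "deployed" or the attempt
    // budget is exhausted. Returns true if the stream reached "deployed".
    // The supplier stands in for stream::getStatus; intervalMillis would
    // normally be a few seconds.
    static boolean waitForDeployed(Supplier<String> status, int maxAttempts, long intervalMillis)
            throws InterruptedException {
        for (int i = 0; i < maxAttempts; i++) {
            if ("deployed".equals(status.get())) {
                return true;
            }
            Thread.sleep(intervalMillis);
        }
        return false;
    }

    public static void main(String[] args) throws InterruptedException {
        // Simulated status sequence, as a stand-in for a real Stream.
        Iterator<String> fake = Arrays.asList("deploying", "deploying", "deployed").iterator();
        System.out.println(waitForDeployed(fake::next, 5, 10)); // prints "true"
    }
}
```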
When using the definition style, the deployment properties are specified as a `java.util.Map` in the same manner as when using the shell. The `createDeploymentProperties` method is defined as follows:
```java
private Map<String, String> createDeploymentProperties() {
    Map<String, String> deploymentProperties = new HashMap<>();
    deploymentProperties.put("app.splitter.producer.partitionKeyExpression", "payload");
    deploymentProperties.put("deployer.log.count", "2");
    return deploymentProperties;
}
```
In this case, application properties are also overridden at deployment time, in addition to setting the deployer property `count` for the log application.
When using the fluent style, the deployment properties are added using the `addDeploymentProperty` method (for example, `new StreamApplication("log").addDeploymentProperty("count", 2)`), and you do not need to prefix the property with `deployer.<app_name>`.
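To make the correspondence between the two styles concrete, the prefixing that the fluent style performs for you can be sketched as a small helper. This helper is purely illustrative (the real logic lives inside the DSL classes), but it shows how a short property set on an app maps onto the fully qualified key used in the definition style:

```java
import java.util.AbstractMap;
import java.util.Map;

public class DeployerPrefix {

    // Illustrative only: expands a short deployment property set on an app
    // (fluent style) into the fully qualified "deployer.<app name>.<property>"
    // key that the definition-style deployment map uses.
    static Map.Entry<String, String> qualify(String appName, String property, Object value) {
        return new AbstractMap.SimpleEntry<>(
                "deployer." + appName + "." + property, String.valueOf(value));
    }

    public static void main(String[] args) {
        Map.Entry<String, String> entry = qualify("log", "count", 2);
        System.out.println(entry.getKey() + "=" + entry.getValue()); // prints "deployer.log.count=2"
    }
}
```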
> **Note:** In order to create and deploy your streams, you need to make sure that the corresponding apps have been registered in the Data Flow server first. Attempting to create or deploy a stream that contains an unknown app throws an exception. You can register your applications as follows:

```java
dataFlowOperations.appRegistryOperations().importFromResource(
        "http://bit.ly/Celsius-RC1-stream-applications-rabbit-maven", true);
```
The stream applications can also be beans within your application that are injected into other classes to create streams. There are many ways to structure Spring applications, but one way is to have an `@Configuration` class define the `StreamBuilder` and `StreamApplication` beans.
```java
@Configuration
public class StreamConfiguration {

    @Bean
    public StreamBuilder builder() {
        return Stream.builder(new DataFlowTemplate(URI.create("http://localhost:9393")));
    }

    @Bean
    public StreamApplication httpSource() {
        return new StreamApplication("http");
    }

    @Bean
    public StreamApplication logSink() {
        return new StreamApplication("log");
    }
}
```
Then, in another class, you can autowire these beans and deploy a stream:
```java
@Component
public class MyStreamApps {

    @Autowired
    private StreamBuilder streamBuilder;

    @Autowired
    private StreamApplication httpSource;

    @Autowired
    private StreamApplication logSink;

    public void deploySimpleStream() {
        Stream simpleStream = streamBuilder.name("simpleStream")
                .source(httpSource)
                .sink(logSink)
                .create()
                .deploy();
    }
}
```
This style allows you to easily share `StreamApplication` beans across multiple streams.