28. Stream Java DSL

Instead of using the shell to create and deploy streams, you can use the Java-based DSL provided by the spring-cloud-dataflow-rest-client module. The Java DSL is a convenient wrapper around the DataFlowTemplate class that makes it simple to create and deploy streams programmatically.

To get started, you will need to add the following dependency to your project.

<dependency>
	<groupId>org.springframework.cloud</groupId>
	<artifactId>spring-cloud-dataflow-rest-client</artifactId>
	<version>1.3.0.M3</version>
</dependency>

You will also need to add a reference to the Spring Milestone Maven repository.

	<repositories>
		<repository>
			<id>spring-milestones</id>
			<name>Spring Milestones</name>
			<url>http://repo.spring.io/libs-milestone-local</url>
			<snapshots>
				<enabled>false</enabled>
			</snapshots>
		</repository>
	</repositories>
Note

A complete sample can be found in the Spring Cloud Data Flow Samples Repository to simplify getting started.

28.1 Overview

The classes you will encounter using the Java DSL are StreamBuilder, StreamDefinition, Stream, StreamApplication, and DataFlowTemplate. The entry point is a builder method on Stream that takes an instance of a DataFlowTemplate. To create an instance of a DataFlowTemplate you need to provide a URI location of the Data Flow Server.

Note

The DataFlowTemplate does not support a simple way to configure HTTP basic authentication or OAuth. This will be addressed in a future release.

We will now walk through a quick example, using the definition style.

URI dataFlowUri = URI.create("http://localhost:9393");
DataFlowOperations dataFlowOperations = new DataFlowTemplate(dataFlowUri);
dataFlowOperations.appRegistryOperations().importFromResource(
                     "http://bit.ly/Celsius-RC1-stream-applications-rabbit-maven", true);
StreamDefinition streamDefinition = Stream.builder(dataFlowOperations)
                                      .name("ticktock")
                                      .definition("time | log")
                                      .create();

The create method returns an instance of StreamDefinition representing a stream that has been created but not deployed. This is called the definition style because it takes a single string for the stream definition, just like in the shell. If applications have not yet been registered in the Data Flow server, you can use the DataFlowOperations class to register them. With the StreamDefinition instance, you have methods available to deploy or destroy the stream.

Stream stream = streamDefinition.deploy();

The Stream instance has the methods getStatus, destroy and undeploy to control and query the stream. If you are going to immediately deploy the stream, there is no need to create a separate local variable of the type StreamDefinition. You can just chain the calls together.

Stream stream = Stream.builder(dataFlowOperations)
                  .name("ticktock")
                  .definition("time | log")
                  .create()
                  .deploy();

The deploy method is overloaded to take a java.util.Map of deployment properties.

The StreamApplication class is used in the 'fluent' Java DSL style and is discussed in the next section. The StreamBuilder class is what is returned from the method Stream.builder(dataFlowOperations). In larger applications, it is common to create a single instance of the StreamBuilder as a Spring @Bean and share it across the application.

28.2 Java DSL styles

The Java DSL offers two styles to create Streams.

  • The definition style keeps the feel of using the pipes and filters textual DSL in the shell. This style is selected by using the definition method after setting the stream name, e.g. Stream.builder(dataFlowOperations).name("ticktock").definition(<definition goes here>).
  • The fluent style lets you chain together sources, processors and sinks by passing in an instance of a StreamApplication. This style is selected by using the source method after setting the stream name, e.g. Stream.builder(dataFlowOperations).name("ticktock").source(<stream application instance goes here>). You then chain together processor() and sink() methods to create a stream definition.
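One way to see how the two styles relate is to render a fluent-style chain back into the pipe-separated text that the definition style accepts. The following is only an illustrative sketch using plain JDK classes. AppSketch, toDsl, and pipeline are hypothetical names, not part of spring-cloud-dataflow-rest-client, and the sketch makes no claim about how the library builds definitions internally. It also does not quote values that contain spaces or quotes.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

/** Hypothetical stand-in for one application in a stream (not the library's StreamApplication). */
final class AppSketch {

    private final String name;
    // LinkedHashMap keeps properties in insertion order so the rendered text is stable.
    private final Map<String, Object> properties = new LinkedHashMap<>();

    AppSketch(String name) {
        this.name = name;
    }

    /** Adds an application property and returns this instance for chaining. */
    AppSketch addProperty(String key, Object value) {
        properties.put(key, value);
        return this;
    }

    /** Renders "name --key=value ..." in the style of the textual DSL. */
    String toDsl() {
        StringBuilder dsl = new StringBuilder(name);
        properties.forEach((key, value) ->
                dsl.append(" --").append(key).append('=').append(value));
        return dsl.toString();
    }

    /** Joins the rendered applications with the pipe symbol. */
    static String pipeline(AppSketch... apps) {
        StringJoiner joiner = new StringJoiner(" | ");
        for (AppSketch app : apps) {
            joiner.add(app.toDsl());
        }
        return joiner.toString();
    }
}
```

For example, AppSketch.pipeline(new AppSketch("http").addProperty("server.port", 9900), new AppSketch("log")) yields the text http --server.port=9900 | log, which is the kind of string the definition method takes.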

To demonstrate both styles, we will create a simple stream with each approach. A complete sample to get you started can be found in the Spring Cloud Data Flow Samples Repository.

public void definitionStyle() throws Exception{

  DataFlowOperations dataFlowOperations = createDataFlowOperations();
  Map<String, String> deploymentProperties = createDeploymentProperties();

  Stream woodchuck = Stream.builder(dataFlowOperations)
          .name("woodchuck")
          .definition("http --server.port=9900 | splitter --expression=payload.split(' ') | log")
          .create()
          .deploy(deploymentProperties);

  waitAndDestroy(woodchuck);
}

public void fluentStyle() throws Exception {

  DataFlowOperations dataFlowOperations = createDataFlowOperations();

  StreamApplication source = new StreamApplication("http").addProperty("server.port", 9900);

  StreamApplication processor = new StreamApplication("splitter")
                                 .addProperty("producer.partitionKeyExpression", "payload");

  StreamApplication sink = new StreamApplication("log")
                            .addDeploymentProperty("count", 2);

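  // Properties are already set on the StreamApplication instances above,
  // so no deployment properties map is passed to deploy().
  Stream woodchuck = Stream.builder(dataFlowOperations).name("woodchuck")
          .source(source)
          .processor(processor)
          .sink(sink)
          .create()
          .deploy();

  waitAndDestroy(woodchuck);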

}

The waitAndDestroy method uses the getStatus method to poll for the stream’s status.

private void waitAndDestroy(Stream stream) throws InterruptedException {

  while(!stream.getStatus().equals("deployed")){
    System.out.println("Waiting for deployment of stream.");
    Thread.sleep(5000);
  }

  System.out.println("Letting the stream run for 2 minutes.");
  // Let the stream run for 2 minutes
  Thread.sleep(120000);

  System.out.println("Destroying stream");
  stream.destroy();
}
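The loop above waits indefinitely if the stream never reaches the deployed state. A bounded variant might look like the following sketch, which uses only JDK classes. StatusPoller and awaitStatus are hypothetical names, not part of the Data Flow client. The status is supplied through a Supplier<String> so that the helper does not depend on the Stream class.

```java
import java.time.Duration;
import java.util.function.Supplier;

/** Hypothetical polling helper; not part of spring-cloud-dataflow-rest-client. */
final class StatusPoller {

    private StatusPoller() {
    }

    /**
     * Polls the supplier until it returns the expected status or the timeout elapses.
     * Returns true if the expected status was observed, false on timeout or interruption.
     */
    static boolean awaitStatus(Supplier<String> statusSupplier, String expected,
                               Duration timeout, Duration interval) {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            if (expected.equals(statusSupplier.get())) {
                return true;
            }
            // Overflow-safe comparison of nanoTime values.
            if (System.nanoTime() - deadline >= 0) {
                return false;
            }
            try {
                Thread.sleep(interval.toMillis());
            } catch (InterruptedException e) {
                // Restore the interrupt flag and stop waiting.
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }
}
```

Assuming getStatus returns a String, as the comparison with "deployed" above suggests, a call could look like StatusPoller.awaitStatus(stream::getStatus, "deployed", Duration.ofMinutes(5), Duration.ofSeconds(5)). The caller can then decide whether to destroy the stream or report a failure when the result is false.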

When using the definition style, the deployment properties are specified as a java.util.Map in the same manner as using the shell. The method createDeploymentProperties is defined as:

private Map<String, String> createDeploymentProperties() {
  Map<String, String> deploymentProperties = new HashMap<>();
  deploymentProperties.put("app.splitter.producer.partitionKeyExpression", "payload");
  deploymentProperties.put("deployer.log.count", "2");
  return deploymentProperties;
}

In this case, application properties are also overridden at deployment time, in addition to setting the deployer property count for the log application. When using the fluent style, the deployment properties are added using the method addDeploymentProperty, e.g. new StreamApplication("log").addDeploymentProperty("count", 2), and you do not need to prefix the property with deployer.<app_name>.
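The prefix convention used in the map keys can be made explicit with a small helper. This is a sketch under stated assumptions: DeploymentKeys, appKey, deployerKey, and woodchuckProperties are hypothetical names introduced for illustration, and only the key formats app.<app_name>.<key> and deployer.<app_name>.<key> are taken from the createDeploymentProperties method shown earlier.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper for building deployment property keys; not part of the client library. */
final class DeploymentKeys {

    private DeploymentKeys() {
    }

    /** Key for overriding an application property at deployment time: app.<appName>.<key>. */
    static String appKey(String appName, String key) {
        return "app." + appName + "." + key;
    }

    /** Key for a deployer property of one application: deployer.<appName>.<key>. */
    static String deployerKey(String appName, String key) {
        return "deployer." + appName + "." + key;
    }

    /** Builds the same entries as the createDeploymentProperties method shown earlier. */
    static Map<String, String> woodchuckProperties() {
        Map<String, String> properties = new LinkedHashMap<>();
        properties.put(appKey("splitter", "producer.partitionKeyExpression"), "payload");
        // Map values are strings, so the instance count is converted explicitly.
        properties.put(deployerKey("log", "count"), String.valueOf(2));
        return properties;
    }
}
```

The resulting map could be passed to the deploy method in the definition style. In the fluent style the prefixes are not written out, because each property is attached directly to its StreamApplication.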

Note

To create or deploy your streams, you need to make sure that the corresponding apps have been registered in the Data Flow server first. Attempting to create or deploy a stream that contains an unknown app will throw an exception. You can register applications using the DataFlowTemplate, e.g.

dataFlowOperations.appRegistryOperations().importFromResource(
            "http://bit.ly/Celsius-RC1-stream-applications-rabbit-maven", true);

StreamApplication instances can also be beans within your application that are injected into other classes to create streams. There are many ways to structure Spring applications, but one way is to have an @Configuration class define the StreamBuilder and StreamApplications.

@Configuration
public class StreamConfiguration {

  @Bean
  public StreamBuilder builder() {
    return Stream.builder(new DataFlowTemplate(URI.create("http://localhost:9393")));
  }

  @Bean
  public StreamApplication httpSource(){
    return new StreamApplication("http");
  }

  @Bean
  public StreamApplication logSink(){
    return new StreamApplication("log");
  }
}

Then, in another class, you can use @Autowired to inject these beans and deploy a stream.

@Component
public class MyStreamApps {

  @Autowired
  private StreamBuilder streamBuilder;

  @Autowired
  private StreamApplication httpSource;

  @Autowired
  private StreamApplication logSink;


  public void deploySimpleStream() {
    Stream simpleStream = streamBuilder.name("simpleStream")
                            .source(httpSource)
                            .sink(logSink)
                            .create()
                            .deploy();
  }
}

This style allows you to easily share StreamApplications across multiple Streams.