8. Inter-Application Communication

8.1 Connecting Multiple Application Instances

While Spring Cloud Stream makes it easy for individual Spring Boot applications to connect to messaging systems, the typical scenario for Spring Cloud Stream is the creation of multi-application pipelines, where microservice applications send data to each other. You can achieve this scenario by correlating the input and output destinations of adjacent applications.

Supposing that a design calls for the Time Source application to send data to the Log Sink application, you can use a common destination named ticktock for bindings within both applications.

Time Source (which has the channel named output) sets the following property:

spring.cloud.stream.bindings.output.destination=ticktock

Log Sink (which has the channel named input) sets the following property:

spring.cloud.stream.bindings.input.destination=ticktock
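
For example, assuming both applications are packaged as executable jars (the jar names below are hypothetical), the shared destination can also be supplied on the command line at launch time:

java -jar time-source.jar --spring.cloud.stream.bindings.output.destination=ticktock
java -jar log-sink.jar --spring.cloud.stream.bindings.input.destination=ticktock

Both applications then exchange messages through the ticktock destination on the configured middleware.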

8.2 Instance Index and Instance Count

When scaling up Spring Cloud Stream applications, each instance can receive information about how many other instances of the same application exist and what its own instance index is. Spring Cloud Stream does this through the spring.cloud.stream.instanceCount and spring.cloud.stream.instanceIndex properties. For example, if there are three instances of an HDFS sink application, all three instances will have spring.cloud.stream.instanceCount set to 3, and the individual applications will have spring.cloud.stream.instanceIndex set to 0, 1, and 2, respectively.

When Spring Cloud Stream applications are deployed via Spring Cloud Data Flow, these properties are configured automatically; when Spring Cloud Stream applications are launched independently, these properties must be set correctly. By default, spring.cloud.stream.instanceCount is 1, and spring.cloud.stream.instanceIndex is 0.
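
For example, the three HDFS sink instances described above could be launched independently as follows (the jar name is hypothetical; every instance shares the same count but receives a unique index):

java -jar hdfs-sink.jar --spring.cloud.stream.instanceCount=3 --spring.cloud.stream.instanceIndex=0
java -jar hdfs-sink.jar --spring.cloud.stream.instanceCount=3 --spring.cloud.stream.instanceIndex=1
java -jar hdfs-sink.jar --spring.cloud.stream.instanceCount=3 --spring.cloud.stream.instanceIndex=2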

In a scaled-up scenario, correct configuration of these two properties is important for partitioning behavior in general (see below), and they are always required by certain binders (for example, the Kafka binder) in order to ensure that data are split correctly across multiple consumer instances.

8.3 Partitioning

8.3.1 Configuring Output Bindings for Partitioning

An output binding is configured to send partitioned data by setting one and only one of its partitionKeyExpression or partitionKeyExtractorClass properties, as well as its partitionCount property. For example, the following is a valid and typical configuration:

spring.cloud.stream.bindings.output.producer.partitionKeyExpression=payload.id
spring.cloud.stream.bindings.output.producer.partitionCount=5

Based on the above example configuration, data will be sent to the target partition using the following logic.

A partition key’s value is calculated for each message sent to a partitioned output channel based on the partitionKeyExpression. The partitionKeyExpression is a SpEL expression which is evaluated against the outbound message for extracting the partitioning key.
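
As a minimal sketch, assuming an outbound payload type such as the hypothetical Order class below, the expression payload.id resolves to the value returned by the payload's getId() accessor:

public class Order {

    private final String id;

    public Order(String id) {
        this.id = id;
    }

    // 'payload.id' in the partitionKeyExpression resolves to this accessor
    public String getId() {
        return id;
    }
}

All messages whose Order payloads share the same id are therefore assigned the same partition key.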

If a SpEL expression is not sufficient for your needs, you can instead calculate the partition key value by setting the partitionKeyExtractorClass property to a class that implements the org.springframework.cloud.stream.binder.PartitionKeyExtractorStrategy interface. While a SpEL expression should usually suffice, more complex cases may require this custom implementation strategy, configured as follows:

spring.cloud.stream.bindings.output.producer.partitionKeyExtractorClass=com.example.MyKeyExtractor
spring.cloud.stream.bindings.output.producer.partitionCount=5
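
As a minimal sketch (the class name and the 'partitionKey' header below are hypothetical), such an implementation might look like the following:

import org.springframework.cloud.stream.binder.PartitionKeyExtractorStrategy;
import org.springframework.messaging.Message;

public class MyKeyExtractor implements PartitionKeyExtractorStrategy {

    @Override
    public Object extractKey(Message<?> message) {
        // Prefer an explicit header when present; otherwise fall back to the payload.
        // Any non-null Object may serve as the partition key.
        Object key = message.getHeaders().get("partitionKey");
        return (key != null) ? key : message.getPayload();
    }
}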

Once the message key is calculated, the partition selection process will determine the target partition as a value between 0 and partitionCount - 1. The default calculation, applicable in most scenarios, is based on the formula key.hashCode() % partitionCount. This can be customized on the binding, either by setting a SpEL expression to be evaluated against the 'key' (via the partitionSelectorExpression property) or by setting an org.springframework.cloud.stream.binder.PartitionSelectorStrategy implementation (via the partitionSelectorClass property).
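
As a minimal sketch (the class name com.example.MyPartitionSelector is hypothetical), a custom selector registered via spring.cloud.stream.bindings.output.producer.partitionSelectorClass=com.example.MyPartitionSelector might look like the following:

import org.springframework.cloud.stream.binder.PartitionSelectorStrategy;

public class MyPartitionSelector implements PartitionSelectorStrategy {

    @Override
    public int selectPartition(Object key, int partitionCount) {
        // Mirror the default key.hashCode() % partitionCount calculation,
        // guarding against negative hash codes; a real implementation would
        // apply its own routing logic here.
        return Math.abs(key.hashCode() % partitionCount);
    }
}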

The binding-level 'partitionSelectorExpression' and 'partitionSelectorClass' properties can be specified in the same manner as the 'partitionKeyExpression' and 'partitionKeyExtractorClass' properties in the examples above. Additional properties can be configured for more advanced scenarios, as described in the following section.

Spring-managed custom PartitionKeyExtractorClass implementations

In the example above, a custom strategy such as MyKeyExtractor is instantiated by Spring Cloud Stream directly. In some cases, it is necessary for such a custom strategy implementation to be created as a Spring bean, so that it can be managed by Spring and benefit from dependency injection, property binding, and so on. This can be done by registering it as a @Bean in the application context and using the fully qualified class name as the bean's name, as in the following example.

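// The bean name must match the fully qualified class name referenced by the 'partitionKeyExtractorClass' property.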
@Bean(name="com.example.MyKeyExtractor")
public MyKeyExtractor extractor() {
    return new MyKeyExtractor();
}

As a Spring bean, the custom strategy benefits from the full lifecycle of a Spring bean. For example, if the implementation needs access to the application context directly, it can implement 'ApplicationContextAware'.
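
As a minimal sketch, an ApplicationContextAware variant of the extractor might look like the following:

import org.springframework.beans.BeansException;
import org.springframework.cloud.stream.binder.PartitionKeyExtractorStrategy;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.messaging.Message;

public class MyKeyExtractor implements PartitionKeyExtractorStrategy, ApplicationContextAware {

    private ApplicationContext applicationContext;

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        // Spring injects the application context, which can be used to look up collaborating beans.
        this.applicationContext = applicationContext;
    }

    @Override
    public Object extractKey(Message<?> message) {
        return message.getPayload();
    }
}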

8.3.2 Configuring Input Bindings for Partitioning

An input binding (with the channel name input) is configured to receive partitioned data by setting its partitioned property, as well as the instanceIndex and instanceCount properties on the application itself, as in the following example:

spring.cloud.stream.bindings.input.consumer.partitioned=true
spring.cloud.stream.instanceIndex=3
spring.cloud.stream.instanceCount=5

The instanceCount value represents the total number of application instances between which the data need to be partitioned, and the instanceIndex must be a unique value across the multiple instances, between 0 and instanceCount - 1. The instance index helps each application instance to identify the unique partition (or, in the case of Kafka, the partition set) from which it receives data. It is important to set both values correctly in order to ensure that all of the data is consumed and that the application instances receive mutually exclusive datasets.
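
For example, assuming the five instances from the configuration above (the jar name is hypothetical), each instance is launched with the same instanceCount but a distinct instanceIndex:

java -jar log-sink.jar --spring.cloud.stream.bindings.input.consumer.partitioned=true --spring.cloud.stream.instanceCount=5 --spring.cloud.stream.instanceIndex=0

The remaining four instances are launched identically, except with instanceIndex set to 1, 2, 3, and 4, respectively.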

While a scenario that uses multiple instances for partitioned data processing may be complex to set up in a standalone case, Spring Cloud Data Flow can simplify the process significantly by populating both the input and output values correctly and by relying on the runtime infrastructure to provide information about the instance index and instance count.