While Spring Cloud Stream makes it easy for individual boot apps to connect to messaging systems, the typical scenario for Spring Cloud Stream is the creation of multi-app pipelines, where microservice apps are sending data to each other. This can be achieved by correlating the input and output destinations of adjacent apps, as in the following example.
Supposing that the design calls for the time-source app to send data to the log-sink app, we will use a common destination named ticktock for the bindings within both apps. time-source will set spring.cloud.stream.bindings.output.destination=ticktock, and log-sink will set spring.cloud.stream.bindings.input.destination=ticktock.
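For example, the two apps could be launched independently with the matching destination settings (the jar names here are illustrative; only the destination properties come from the example above):

java -jar time-source.jar --spring.cloud.stream.bindings.output.destination=ticktock
java -jar log-sink.jar --spring.cloud.stream.bindings.input.destination=ticktock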
When scaling up Spring Cloud Stream applications, each instance can receive information about how many instances of the same application exist and what its own instance index is. This is done through the spring.cloud.stream.instanceCount and spring.cloud.stream.instanceIndex properties. For example, if there are 3 instances of the HDFS sink application, all three will have spring.cloud.stream.instanceCount set to 3, and the applications will have spring.cloud.stream.instanceIndex set to 0, 1 and 2, respectively. When Spring Cloud Stream applications are deployed via Spring Cloud Data Flow, these properties are configured automatically, but when Spring Cloud Stream applications are launched independently, these properties must be set correctly. By default, spring.cloud.stream.instanceCount is 1, and spring.cloud.stream.instanceIndex is 0.
Setting these two properties correctly in scale-up scenarios is important for addressing partitioning behavior in general (see below), and they are always required by certain types of binders (e.g. the Kafka binder) in order to ensure that data is split correctly across multiple consumer instances.
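For example, the three HDFS sink instances mentioned above could be launched standalone with the matching values set explicitly (the jar name is illustrative; the property values follow the example in the text):

java -jar hdfs-sink.jar --spring.cloud.stream.instanceCount=3 --spring.cloud.stream.instanceIndex=0
java -jar hdfs-sink.jar --spring.cloud.stream.instanceCount=3 --spring.cloud.stream.instanceIndex=1
java -jar hdfs-sink.jar --spring.cloud.stream.instanceCount=3 --spring.cloud.stream.instanceIndex=2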
An output binding is configured to send partitioned data by setting one and only one of its partitionKeyExpression or partitionKeyExtractorClass properties, as well as its partitionCount property. For example, setting spring.cloud.stream.bindings.output.partitionKeyExpression=payload.id and spring.cloud.stream.bindings.output.partitionCount=5 is a valid and typical configuration.
Based on this configuration, the data will be sent to the target partition using the following logic. A partition key's value is calculated for each message sent to a partitioned output channel based on the partitionKeyExpression. The partitionKeyExpression is a SpEL expression that is evaluated against the outbound message for extracting the partitioning key. If a SpEL expression is not sufficient for your needs, you can instead calculate the partition key value by setting the property partitionKeyExtractorClass. This class must implement the interface org.springframework.cloud.stream.binder.PartitionKeyExtractorStrategy. While, in general, the SpEL expression should suffice, more complex cases may use the custom implementation strategy.
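As a rough sketch of the custom strategy approach, a key extractor could look like the following; the class name and the customerId header are purely illustrative, and the class would be referenced by its fully qualified name in the partitionKeyExtractorClass property:

import org.springframework.cloud.stream.binder.PartitionKeyExtractorStrategy;
import org.springframework.messaging.Message;

// Hypothetical extractor that uses a "customerId" message header as the partition key.
public class CustomerIdKeyExtractor implements PartitionKeyExtractorStrategy {

    @Override
    public Object extractKey(Message<?> message) {
        // Any non-null value derived from the message can serve as the key.
        return message.getHeaders().get("customerId");
    }
}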
Once the message key is calculated, the partition selection process will determine the target partition as a value between 0 and partitionCount - 1. The default calculation, applicable in most scenarios, is based on the formula key.hashCode() % partitionCount. This can be customized on the binding, either by setting a SpEL expression to be evaluated against the key via the partitionSelectorExpression property, or by setting an org.springframework.cloud.stream.binder.PartitionSelectorStrategy implementation via the partitionSelectorClass property.
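As an illustrative sketch, a custom selector that mirrors the default modulo-based behavior (but guards against negative hash codes) might look like this; the class name is hypothetical, and the class would be referenced via the partitionSelectorClass property:

import org.springframework.cloud.stream.binder.PartitionSelectorStrategy;

// Hypothetical selector: non-negative variant of key.hashCode() % partitionCount.
public class HashModuloPartitionSelector implements PartitionSelectorStrategy {

    @Override
    public int selectPartition(Object key, int partitionCount) {
        // The result must fall between 0 and partitionCount - 1.
        return Math.abs(key.hashCode() % partitionCount);
    }
}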
Additional properties can be configured for more advanced scenarios, as described in the following section.
An input binding is configured to receive partitioned data by setting its partitioned property, as well as the instance index and instance count properties on the app itself, as follows: spring.cloud.stream.bindings.input.partitioned=true, spring.cloud.stream.instanceIndex=3, spring.cloud.stream.instanceCount=5.
The instance count value represents the total number of app instances between which the data needs to be partitioned, whereas the instance index must be a unique value across the multiple instances, between 0 and instanceCount - 1. The instance index helps each app instance to identify the unique partition (or, in the case of Kafka, the partition set) from which it receives data. It is important that both values are set correctly in order to ensure that all the data is consumed and that the app instances receive mutually exclusive datasets.
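Putting the producer and consumer sides together, a partitioned pair could be configured as follows, reusing the values from the examples above (the destination name is illustrative; on the consumer side only instanceIndex varies per instance):

Producer (same configuration on every instance):
spring.cloud.stream.bindings.output.destination=partitioned.destination
spring.cloud.stream.bindings.output.partitionKeyExpression=payload.id
spring.cloud.stream.bindings.output.partitionCount=5

Consumer (instance 3 of 5):
spring.cloud.stream.bindings.input.destination=partitioned.destination
spring.cloud.stream.bindings.input.partitioned=true
spring.cloud.stream.instanceCount=5
spring.cloud.stream.instanceIndex=3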
While setting up multiple instances for partitioned data processing may be complex in the standalone case, Spring Cloud Data Flow can simplify the process significantly by populating both the input and output values correctly and by relying on the runtime infrastructure to provide information about the instance index and instance count.