While Spring Cloud Stream makes it easy for individual Spring Boot applications to connect to messaging systems, the typical scenario for Spring Cloud Stream is the creation of multi-application pipelines, where microservice applications send data to each other. You can achieve this scenario by correlating the input and output destinations of adjacent applications.
Supposing that a design calls for the Time Source application to send data to the Log Sink application, you can use a common destination named ticktock
for bindings within both applications.
Time Source (that has the channel name output
) will set the following property:
spring.cloud.stream.bindings.output.destination=ticktock
Log Sink (that has the channel name input
) will set the following property:
spring.cloud.stream.bindings.input.destination=ticktock
When scaling up Spring Cloud Stream applications, each instance can receive information about how many other instances of the same application exist and what its own instance index is.
Spring Cloud Stream does this through the spring.cloud.stream.instanceCount
and spring.cloud.stream.instanceIndex
properties.
For example, if there are three instances of a HDFS sink application, all three instances will have spring.cloud.stream.instanceCount
set to 3
, and the individual applications will have spring.cloud.stream.instanceIndex
set to 0
, 1
, and 2
, respectively.
When Spring Cloud Stream applications are deployed via Spring Cloud Data Flow, these properties are configured automatically; when Spring Cloud Stream applications are launched independently, these properties must be set correctly.
By default, spring.cloud.stream.instanceCount
is 1
, and spring.cloud.stream.instanceIndex
is 0
.
In a scaled-up scenario, correct configuration of these two properties is important for addressing partitioning behavior (see below) in general, and the two properties are always required by certain binders (e.g., the Kafka binder) in order to ensure that data are split correctly across multiple consumer instances.
An output binding is configured to send partitioned data by setting one and only one of its partitionKeyExpression
or partitionKeyExtractorName
(see next paragraph) properties, as well as its partitionCount
property.
For example, the following is a valid and typical configuration:
spring.cloud.stream.bindings.output.producer.partitionKeyExpression=payload.id spring.cloud.stream.bindings.output.producer.partitionCount=5
Based on the above example configuration, data will be sent to the target partition using the following logic.
A partition key’s value is calculated for each message sent to a partitioned output channel based on the partitionKeyExpression
.
The partitionKeyExpression
is a SpEL expression which is evaluated against the outbound message for extracting the partitioning key.
If a SpEL expression is not sufficient for your needs, you can instead calculate the partition key value by providing implementation of org.springframework.cloud.stream.binder.PartitionKeyExtractorStrategy
and configuring it as a bean (i.e., @Bean
). In the event you have more then one bean of type org.springframework.cloud.stream.binder.PartitionKeyExtractorStrategy
available in the Application Context you can further filter it by specifying its name via partitionKeyExtractorName
property:
--spring.cloud.stream.bindings.output.producer.partitionKeyExtractorName=customPartitionKeyExtractor --spring.cloud.stream.bindings.output.producer.partitionCount=5 . . . @Bean public CustomPartitionKeyExtractorClass customPartitionKeyExtractor() { return new CustomPartitionKeyExtractorClass(); }
![]() | Note |
---|---|
In previous versions of Spring Cloud Stream you could specify the implementation of |
Ones the message key is calculated, the partition selection process will determine the target partition as a value between 0
and partitionCount - 1
.
The default calculation, applicable in most scenarios, is based on the formula key.hashCode() % partitionCount
.
This can be customized on the binding, either by setting a SpEL expression to be evaluated against the 'key' (via the partitionSelectorExpression
property) or by configuring an implementation of org.springframework.cloud.stream.binder.PartitionSelectorStrategy
as a bean (i.e., @Bean). And similarly to the PartitionKeyExtractorStrategy
you can further filter it using spring.cloud.stream.bindings.output.producer.partitionSelectorName
property in the event there are more then one bean of this type is available in the Application Context.
--spring.cloud.stream.bindings.output.producer.partitionSelectorName=customPartitionSelector . . . @Bean public CustomPartitionSelectorClass customPartitionSelector() { return new CustomPartitionSelectorClass(); }
![]() | Note |
---|---|
In previous versions of Spring Cloud Stream you could specify the implementation of |
An input binding (with the channel name input
) is configured to receive partitioned data by setting its partitioned
property, as well as the instanceIndex
and instanceCount
properties on the application itself, as in the following example:
spring.cloud.stream.bindings.input.consumer.partitioned=true spring.cloud.stream.instanceIndex=3 spring.cloud.stream.instanceCount=5
The instanceCount
value represents the total number of application instances between which the data need to be partitioned, and the instanceIndex
must be a unique value across the multiple instances, between 0
and instanceCount - 1
.
The instance index helps each application instance to identify the unique partition(s) from which it receives data.
It is required by binders using technology that doesn’t support partitioning natively, for example, with RabbitMQ, there is a queue for each partition, with the queue name containing the instance index.
With Kafka, if autoRebalanceEnabled
is true
(default), Kafka will take care of distributing partitions across instances and these properties are not required.
If autoRebalanceEnabled
is set to false, the instanceCount
and instanceIndex
are used by the binder to determine which partition(s) the instance will subscribe to (you must have at least as many partitions as there are instances).
The binder will allocate the partitions instead of Kafka.
This might be useful if you want messages for a particular partition to always go to the same instance.
When a binder configuration that requires them, it is important to set both values correctly in order to ensure that all of the data is consumed and that the application instances receive mutually exclusive datasets.
While a scenario which using multiple instances for partitioned data processing may be complex to set up in a standalone case, Spring Cloud Dataflow can simplify the process significantly by populating both the input and output values correctly as well as relying on the runtime infrastructure to provide information about the instance index and instance count.