Spring Cloud Stream supports general configuration options as well as configuration for bindings and binders. Some binders allow additional binding properties to support middleware-specific features.
Configuration options can be provided to Spring Cloud Stream applications via any mechanism supported by Spring Boot. This includes application arguments, environment variables, and YAML or .properties files.
spring.cloud.stream.instanceCount
The number of deployed instances of an application. Must be set for partitioning and if using Kafka.
Default: 1.
spring.cloud.stream.instanceIndex
The instance index of the application: a number from 0 to instanceCount - 1.
Used for partitioning and with Kafka.
Automatically set in Cloud Foundry to match the application’s instance index.
spring.cloud.stream.dynamicDestinations
A list of destinations that can be bound dynamically (for example, in a dynamic routing scenario). If set, only listed destinations can be bound.
Default: empty (allowing any destination to be bound).
spring.cloud.stream.defaultBinder
The default binder to use, if multiple binders are configured. See Multiple Binders on the Classpath.
Default: empty.
spring.cloud.stream.overrideCloudConnectors
This property is only applicable when the cloud profile is active and Spring Cloud Connectors are provided with the application.
If the property is false (the default), the binder detects a suitable bound service (for example, a RabbitMQ service bound in Cloud Foundry for the RabbitMQ binder) and uses it for creating connections (usually via Spring Cloud Connectors).
When set to true, this property instructs binders to completely ignore the bound services and rely on Spring Boot properties (for example, the spring.rabbitmq.* properties provided in the environment for the RabbitMQ binder).
The typical usage of this property is to be nested in a customized environment when connecting to multiple systems.
Default: false.
Binding properties are supplied using the format spring.cloud.stream.bindings.<channelName>.<property>=<value>.
The <channelName> represents the name of the channel being configured (for example, output for a Source).
To avoid repetition, Spring Cloud Stream supports setting values for all channels, in the format spring.cloud.stream.default.<property>=<value>.
In what follows, we indicate where we have omitted the spring.cloud.stream.bindings.<channelName>. prefix and focus just on the property name, with the understanding that the prefix will be included at runtime.
The following binding properties are available for both input and output bindings and must be prefixed with spring.cloud.stream.bindings.<channelName>. (for example, spring.cloud.stream.bindings.input.destination=ticktock).
Default values can be set by using the prefix spring.cloud.stream.default (for example, spring.cloud.stream.default.contentType=application/json).
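To make the relationship between the two prefixes concrete, here is a minimal sketch of how a channel-specific value could take precedence over a value set under spring.cloud.stream.default. The BindingPropertyLookup class and its methods are hypothetical and exist only for illustration. They are not part of Spring Cloud Stream, and the precedence shown is an assumption about how defaults are applied; the real resolution is done by Spring Boot's property binding.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not part of Spring Cloud Stream: it only illustrates the two key formats.
final class BindingPropertyLookup {

    static final String BINDINGS_PREFIX = "spring.cloud.stream.bindings.";
    static final String DEFAULT_PREFIX = "spring.cloud.stream.default.";

    // Builds spring.cloud.stream.bindings.<channelName>.<property>.
    static String bindingKey(String channelName, String property) {
        return BINDINGS_PREFIX + channelName + "." + property;
    }

    // Builds spring.cloud.stream.default.<property>.
    static String defaultKey(String property) {
        return DEFAULT_PREFIX + property;
    }

    // Assumption: a channel-specific value wins; otherwise the shared default (or null) is returned.
    static String resolve(Map<String, String> env, String channelName, String property) {
        String specific = env.get(bindingKey(channelName, property));
        return specific != null ? specific : env.get(defaultKey(property));
    }

    public static void main(String[] args) {
        Map<String, String> env = new HashMap<>();
        env.put("spring.cloud.stream.bindings.input.destination", "ticktock");
        env.put("spring.cloud.stream.default.contentType", "application/json");
        System.out.println(resolve(env, "input", "destination"));
        System.out.println(resolve(env, "input", "contentType"));
    }
}
```

In this sketch the input channel has its own destination, while contentType is not set for the channel and therefore falls back to the value under the default prefix.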
group
The consumer group of the channel. Applies only to inbound bindings. See Consumer Groups.
Default: null (indicating an anonymous consumer).
contentType
The content type of the channel.
Default: null (so that no type coercion is performed).
binder
The binder used by this binding. See Section 4.4, “Multiple Binders on the Classpath” for details.
Default: null (the default binder will be used, if one exists).
The following binding properties are available for input bindings only and must be prefixed with spring.cloud.stream.bindings.<channelName>.consumer. (for example, spring.cloud.stream.bindings.input.consumer.concurrency=3).
Default values can be set by using the prefix spring.cloud.stream.default.consumer (for example, spring.cloud.stream.default.consumer.headerMode=raw).
concurrency
The concurrency of the inbound consumer.
Default: 1.
partitioned
Whether the consumer receives data from a partitioned producer.
Default: false.
headerMode
When set to raw, disables header parsing on input.
Effective only for messaging middleware that does not support message headers natively and requires header embedding.
Useful when inbound data is coming from outside Spring Cloud Stream applications.
Default: embeddedHeaders.
maxAttempts
If processing fails, the number of attempts to process the message (including the first). Set to 1 to disable retry.
Default: 3.
backOffInitialInterval
The backoff initial interval on retry.
Default: 1000.
backOffMaxInterval
The maximum backoff interval.
Default: 10000.
backOffMultiplier
The backoff multiplier.
Default: 2.0.
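The retry settings above interact, so a small worked example may help. The following sketch computes the waits between attempts, assuming a plain exponential backoff in which each wait is the previous one times the multiplier, capped at the maximum interval. The RetryBackOffSketch class is hypothetical, and both the exact policy used by a given binder and the unit of the intervals (presumably milliseconds) are assumptions not stated in this section.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: lists the waits between processing attempts under exponential backoff.
final class RetryBackOffSketch {

    static List<Long> intervals(int maxAttempts, long initialInterval, long maxInterval, double multiplier) {
        List<Long> waits = new ArrayList<>();
        double next = initialInterval;
        // maxAttempts includes the first attempt, so there are at most maxAttempts - 1 waits.
        for (int attempt = 1; attempt < maxAttempts; attempt++) {
            waits.add((long) Math.min(next, maxInterval)); // cap each wait at the maximum interval
            next = next * multiplier;                       // grow the wait for the following retry
        }
        return waits;
    }

    public static void main(String[] args) {
        // Values taken from the defaults listed above: 3 attempts, 1000, 10000, 2.0.
        System.out.println(intervals(3, 1000, 10000, 2.0));
        // More attempts, to show the cap taking effect.
        System.out.println(intervals(6, 1000, 10000, 2.0));
    }
}
```

With the default values there are two waits after the first failed attempt. Setting the number of attempts to 1 yields no waits at all, which matches the note that a value of 1 disables retry.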
instanceIndex
When set to a value greater than or equal to zero, allows customizing the instance index of this consumer (if different from spring.cloud.stream.instanceIndex).
When set to a negative value, it defaults to spring.cloud.stream.instanceIndex.
Default: -1.
instanceCount
When set to a value greater than or equal to zero, allows customizing the instance count of this consumer (if different from spring.cloud.stream.instanceCount).
When set to a negative value, it defaults to spring.cloud.stream.instanceCount.
Default: -1.
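The fallback rule for these two consumer-level settings can be sketched as follows. The ConsumerInstanceSettings class is hypothetical and only mirrors the rule described above: a value of zero or more is used as given, and a negative value defers to the application-wide setting.

```java
// Hypothetical helper, not part of Spring Cloud Stream: mirrors the fallback rule described above.
final class ConsumerInstanceSettings {

    // A consumer-level value >= 0 is used as-is; a negative value falls back to the application-wide value.
    static int effective(int consumerValue, int applicationValue) {
        return consumerValue >= 0 ? consumerValue : applicationValue;
    }

    // An instance index is expected to lie between 0 and instanceCount - 1.
    static boolean isValidIndex(int instanceIndex, int instanceCount) {
        return instanceIndex >= 0 && instanceIndex < instanceCount;
    }

    public static void main(String[] args) {
        int applicationIndex = 2; // stands in for spring.cloud.stream.instanceIndex
        int applicationCount = 4; // stands in for spring.cloud.stream.instanceCount
        System.out.println(effective(-1, applicationIndex)); // consumer default of -1 defers to the application
        System.out.println(effective(0, applicationIndex));  // explicit consumer-level override
        System.out.println(isValidIndex(effective(-1, applicationIndex), effective(-1, applicationCount)));
    }
}
```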
The following binding properties are available for output bindings only and must be prefixed with spring.cloud.stream.bindings.<channelName>.producer. (for example, spring.cloud.stream.bindings.output.producer.partitionKeyExpression=payload.id).
Default values can be set by using the prefix spring.cloud.stream.default.producer (for example, spring.cloud.stream.default.producer.partitionKeyExpression=payload.id).
partitionKeyExpression
A SpEL expression that determines how to partition outbound data.
If set, or if partitionKeyExtractorClass is set, outbound data on this channel will be partitioned, and partitionCount must be set to a value greater than 1 to be effective.
The two options are mutually exclusive.
See Section 2.5, “Partitioning Support”.
Default: null.
partitionKeyExtractorClass
A PartitionKeyExtractorStrategy implementation.
If set, or if partitionKeyExpression is set, outbound data on this channel will be partitioned, and partitionCount must be set to a value greater than 1 to be effective.
The two options are mutually exclusive.
See Section 2.5, “Partitioning Support”.
Default: null.
partitionSelectorClass
A PartitionSelectorStrategy implementation.
Mutually exclusive with partitionSelectorExpression.
If neither is set, the partition will be selected as hashCode(key) % partitionCount, where key is computed via either partitionKeyExpression or partitionKeyExtractorClass.
Default: null.
partitionSelectorExpression
A SpEL expression for customizing partition selection.
Mutually exclusive with partitionSelectorClass.
If neither is set, the partition will be selected as hashCode(key) % partitionCount, where key is computed via either partitionKeyExpression or partitionKeyExtractorClass.
Default: null.
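The default selection rule, hashCode(key) % partitionCount, can be illustrated with a short sketch. The DefaultPartitionSketch class is hypothetical. It uses Math.floorMod so that keys with a negative hash code still map to a valid partition index; that handling of negative values is an assumption of this sketch and may differ from what a binder actually does.

```java
// Hypothetical helper: illustrates the default rule hashCode(key) % partitionCount.
final class DefaultPartitionSketch {

    static int selectPartition(Object key, int partitionCount) {
        if (partitionCount < 1) {
            throw new IllegalArgumentException("partitionCount must be at least 1");
        }
        // Assumption: floorMod keeps the result in [0, partitionCount) even for negative hash codes.
        return Math.floorMod(key.hashCode(), partitionCount);
    }

    public static void main(String[] args) {
        // The keys stand in for values produced by partitionKeyExpression or partitionKeyExtractorClass.
        System.out.println(selectPartition(7, 3));
        System.out.println(selectPartition("customer-1", 4));
    }
}
```

Because the result depends only on the key and the partition count, messages with equal keys always land in the same partition as long as the partition count does not change.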
partitionCount
The number of target partitions for the data, if partitioning is enabled. Must be set to a value greater than 1 if the producer is partitioned. On Kafka, interpreted as a hint; the larger of this and the partition count of the target topic is used instead.
Default: 1.
headerMode
When set to raw, disables header embedding on output.
Effective only for messaging middleware that does not support message headers natively and requires header embedding.
Useful when producing data for non-Spring Cloud Stream applications.
Default: embeddedHeaders.
useNativeEncoding
When set to true, the outbound message is serialized directly by the client library, which must be configured correspondingly (for example, by setting an appropriate Kafka producer value serializer).
When this configuration is used, the outbound message marshalling is not based on the contentType of the binding.
When native encoding is used, it is the responsibility of the consumer to use an appropriate decoder (for example, the Kafka consumer value deserializer) to deserialize the inbound message.
Also, when native encoding/decoding is used, the headerMode property is ignored and headers are not embedded into the message.
Default: false.
Besides the channels defined via @EnableBinding, Spring Cloud Stream allows applications to send messages to dynamically bound destinations.
This is useful, for example, when the target destination needs to be determined at runtime.
Applications can do so by using the BinderAwareChannelResolver bean, registered automatically by the @EnableBinding annotation.
The property spring.cloud.stream.dynamicDestinations can be used to restrict the dynamic destination names to a set known beforehand (whitelisting). If the property is not set, any destination can be bound dynamically.
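The whitelisting behavior can be pictured with the following sketch. The DynamicDestinationFilter class is hypothetical and is not how Spring Cloud Stream implements the check; it only restates the rule that an empty list allows any destination, while a non-empty list allows only the names it contains.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Hypothetical helper: restates the whitelist rule for dynamically bound destinations.
final class DynamicDestinationFilter {

    private final Set<String> allowed;

    DynamicDestinationFilter(Collection<String> dynamicDestinations) {
        this.allowed = new HashSet<>(dynamicDestinations);
    }

    // An empty whitelist means that any destination can be bound.
    boolean canBind(String destination) {
        return allowed.isEmpty() || allowed.contains(destination);
    }

    public static void main(String[] args) {
        DynamicDestinationFilter open = new DynamicDestinationFilter(Collections.<String>emptyList());
        DynamicDestinationFilter restricted = new DynamicDestinationFilter(Arrays.asList("customers", "orders"));
        System.out.println(open.canBind("invoices"));
        System.out.println(restricted.canBind("orders"));
        System.out.println(restricted.canBind("invoices"));
    }
}
```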
The BinderAwareChannelResolver can be used directly, as in the following example, in which a REST controller uses a path variable to decide the target channel.
@EnableBinding
@Controller
public class SourceWithDynamicDestination {

    @Autowired
    private BinderAwareChannelResolver resolver;

    @RequestMapping(path = "/{target}", method = POST, consumes = "*/*")
    @ResponseStatus(HttpStatus.ACCEPTED)
    public void handleRequest(@RequestBody String body, @PathVariable("target") String target,
            @RequestHeader(HttpHeaders.CONTENT_TYPE) Object contentType) {
        sendMessage(body, target, contentType);
    }

    private void sendMessage(String body, String target, Object contentType) {
        // Resolve the destination named by the path variable and publish the request body to it.
        resolver.resolveDestination(target).send(MessageBuilder.createMessage(body,
                new MessageHeaders(Collections.singletonMap(MessageHeaders.CONTENT_TYPE, contentType))));
    }
}
After starting the application on the default port 8080, send the following data:
curl -H "Content-Type: application/json" -X POST -d "customer-1" http://localhost:8080/customers
curl -H "Content-Type: application/json" -X POST -d "order-1" http://localhost:8080/orders
The destinations customers and orders are then created in the broker (for example, as exchanges in RabbitMQ or as topics in Kafka), and the data is published to the appropriate destinations.
The BinderAwareChannelResolver is a general-purpose Spring Integration DestinationResolver and can be injected into other components, for example, into a router that uses a SpEL expression based on the target field of an incoming JSON message.
@EnableBinding
@Controller
public class SourceWithDynamicDestination {

    @Autowired
    private BinderAwareChannelResolver resolver;

    @RequestMapping(path = "/", method = POST, consumes = "application/json")
    @ResponseStatus(HttpStatus.ACCEPTED)
    public void handleRequest(@RequestBody String body, @RequestHeader(HttpHeaders.CONTENT_TYPE) Object contentType) {
        sendMessage(body, contentType);
    }

    private void sendMessage(Object body, Object contentType) {
        routerChannel().send(MessageBuilder.createMessage(body,
                new MessageHeaders(Collections.singletonMap(MessageHeaders.CONTENT_TYPE, contentType))));
    }

    @Bean(name = "routerChannel")
    public MessageChannel routerChannel() {
        return new DirectChannel();
    }

    @Bean
    @ServiceActivator(inputChannel = "routerChannel")
    public ExpressionEvaluatingRouter router() {
        ExpressionEvaluatingRouter router =
                new ExpressionEvaluatingRouter(new SpelExpressionParser().parseExpression("payload.target"));
        router.setDefaultOutputChannelName("default-output");
        router.setChannelResolver(resolver);
        return router;
    }
}