This section describes the programming model of Spring Cloud Stream, which consists of a number of predefined annotations that can be used to declare bound input and output channels, as well as how to listen to them.
A Spring application becomes a Spring Cloud Stream application when the @EnableBinding annotation is applied to one of its configuration classes. @EnableBinding itself is meta-annotated with @Configuration, and triggers the configuration of Spring Cloud Stream infrastructure as follows:
    ...
    @Import(...)
    @Configuration
    @EnableIntegration
    public @interface EnableBinding {
        ...
        Class<?>[] value() default {};
    }
@EnableBinding can be parameterized with one or more interface classes that contain methods representing bindable components (typically message channels).
Note: As of version 1.0, the only supported bindable component is the Spring Messaging MessageChannel and its extensions SubscribableChannel and PollableChannel.
A Spring Cloud Stream application can have an arbitrary number of input and output channels defined as @Input and @Output methods in an interface, as follows:
    public interface Barista {
        @Input
        SubscribableChannel orders();
        @Output
        MessageChannel hotDrinks();
        @Output
        MessageChannel coldDrinks();
    }
Using this interface as a parameter to @EnableBinding, as in the following example, triggers the creation of three bound channels named orders, hotDrinks, and coldDrinks, respectively.
    @EnableBinding(Barista.class)
    public class CafeConfiguration {
        ...
    }
Both @Input and @Output allow specifying a customized name for the channel, as follows:
    public interface Barista {
        ...
        @Input("inboundOrders")
        SubscribableChannel orders();
    }
In this case, the name of the bound channel being created will be inboundOrders.
To simplify the most common use cases, which involve an input channel, an output channel, or both, Spring Cloud Stream provides three predefined interfaces out of the box.
Source can be used for applications that have a single outbound channel.
    public interface Source {
        String OUTPUT = "output";
        @Output(Source.OUTPUT)
        MessageChannel output();
    }
Sink can be used for applications that have a single inbound channel.
    public interface Sink {
        String INPUT = "input";
        @Input(Sink.INPUT)
        SubscribableChannel input();
    }
Processor can be used for applications that have both an inbound and an outbound channel.
    public interface Processor extends Source, Sink {
    }
Spring Cloud Stream applies no special handling to any of these interfaces, other than providing them out of the box.
For each of the bound interfaces, Spring Cloud Stream generates a bean that implements it, and for which invoking an @Input or @Output annotated method returns the bound channel.
For example, the bean in the following example sends a message on the output channel every time its sayHello method is invoked, using the injected Source bean and invoking output() to retrieve the target channel.
    @Component
    public class SendingBean {
        private Source source;
        @Autowired
        public SendingBean(Source source) {
            this.source = source;
        }
        public void sayHello(String name) {
            // Send the name as the message payload on the bound output channel.
            source.output().send(MessageBuilder.withPayload(name).build());
        }
    }
Bound channels can also be injected directly. For example:
    @Component
    public class SendingBean {
        private MessageChannel output;
        @Autowired
        public SendingBean(MessageChannel output) {
            this.output = output;
        }
        public void sayHello(String name) {
            // Send the name as the message payload on the injected channel.
            output.send(MessageBuilder.withPayload(name).build());
        }
    }
Note that if the name of the channel is customized on the declaring annotation, that name should be used instead of the method name. Consider the following declaration:
    public interface CustomSource {
        ...
        @Output("customOutput")
        MessageChannel output();
    }
The channel will be injected as follows:
    @Component
    public class SendingBean {
        private MessageChannel output;
        // The qualifier selects the channel by its customized name.
        @Autowired
        public SendingBean(@Qualifier("customOutput") MessageChannel output) {
            this.output = output;
        }
        public void sayHello(String name) {
            output.send(MessageBuilder.withPayload(name).build());
        }
    }
Spring Cloud Stream allows you to write applications by using either Spring Integration annotations or Spring Cloud Stream's @StreamListener annotation, which is modeled after other Spring Messaging annotations (such as @MessageMapping, @JmsListener, and @RabbitListener) but adds content type management and type coercion features.
Because Spring Cloud Stream is based on Spring Integration, it completely inherits Spring Integration's foundation and infrastructure, as well as its components. For example, the output channel of a Source can be attached to a MessageSource, as follows:
    @EnableBinding(Source.class)
    public class TimerSource {
        @Value("${format}")
        private String format;
        @Bean
        @InboundChannelAdapter(value = Source.OUTPUT, poller = @Poller(fixedDelay = "${fixedDelay}", maxMessagesPerPoll = "1"))
        public MessageSource<String> timerMessageSource() {
            return () -> new GenericMessage<>(new SimpleDateFormat(format).format(new Date()));
        }
    }
Or, the channels of a processor can be used in a transformer, as follows:
    @EnableBinding(Processor.class)
    public class TransformProcessor {
        @Transformer(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)
        public Object transform(String message) {
            // Convert the incoming payload to upper case before forwarding it.
            return message.toUpperCase();
        }
    }
Complementary to the Spring Integration support, Spring Cloud Stream provides a @StreamListener annotation of its own, modeled after other similar Spring Messaging annotations (such as @MessageMapping, @JmsListener, and @RabbitListener).
It provides a simpler model for handling inbound messages, especially for dealing with use cases that involve content type management and type coercion.
Spring Cloud Stream provides an extensible MessageConverter mechanism for handling data conversion by bound channels and, in this case, for dispatching to @StreamListener annotated methods.
For example, an application that processes external Vote events can be declared as follows:
    @EnableBinding(Sink.class)
    public class VoteHandler {
        @Autowired
        VotingService votingService;
        @StreamListener(Sink.INPUT)
        public void handle(Vote vote) {
            votingService.record(vote);
        }
    }
The distinction between this approach and a Spring Integration @ServiceActivator becomes relevant if one considers an inbound Message with a String payload and a contentType header of application/json.
For @StreamListener, the MessageConverter mechanism will use the contentType header to parse the String into a Vote object.
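The header-driven conversion described above can be illustrated with a small, self-contained sketch. It is a toy model only and does not use or reproduce the framework's MessageConverter API. The SketchVote, SketchMessage, and ContentTypeDispatchSketch classes are hypothetical, as is the assumption that a vote carries a single candidate field. The regex-based parsing handles only flat JSON objects with string values, whereas a real application would rely on the converters that the framework provides.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical payload type, standing in for the Vote class in the text.
final class SketchVote {
    final String candidate;
    SketchVote(String candidate) { this.candidate = candidate; }
}

// Hypothetical, simplified message: a String payload plus String headers.
final class SketchMessage {
    final String payload;
    final Map<String, String> headers;
    SketchMessage(String payload, Map<String, String> headers) {
        this.payload = payload;
        this.headers = headers;
    }
}

final class ContentTypeDispatchSketch {
    // Toy pattern: matches "key":"value" pairs in a flat JSON object only.
    private static final Pattern FIELD =
            Pattern.compile("\"(\\w+)\"\\s*:\\s*\"([^\"]*)\"");

    // One converter per content type; each turns a String payload into a SketchVote.
    private final Map<String, Function<String, SketchVote>> converters = new HashMap<>();

    ContentTypeDispatchSketch() {
        converters.put("application/json", ContentTypeDispatchSketch::fromJson);
    }

    // Picks the converter named by the contentType header and applies it.
    SketchVote convert(SketchMessage message) {
        String contentType = message.headers.get("contentType");
        Function<String, SketchVote> converter = converters.get(contentType);
        if (converter == null) {
            throw new IllegalArgumentException("No converter for content type: " + contentType);
        }
        return converter.apply(message.payload);
    }

    // Extracts the assumed "candidate" field from a flat JSON object.
    private static SketchVote fromJson(String json) {
        Matcher matcher = FIELD.matcher(json);
        while (matcher.find()) {
            if ("candidate".equals(matcher.group(1))) {
                return new SketchVote(matcher.group(2));
            }
        }
        throw new IllegalArgumentException("Missing \"candidate\" field");
    }
}
```

The point of the sketch is the lookup step: the handler method never sees the raw String, because the converter chosen from the contentType header has already produced the typed object by the time the method is invoked.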
Just as with the other Spring Messaging methods, method arguments can be annotated with @Payload, @Headers, and @Header.
For methods that return data, @SendTo must be used to specify the output binding destination for the returned data, as follows:
    @EnableBinding(Processor.class)
    public class TransformProcessor {
        @Autowired
        VotingService votingService;
        @StreamListener(Processor.INPUT)
        @SendTo(Processor.OUTPUT)
        public VoteResult handle(Vote vote) {
            return votingService.record(vote);
        }
    }
Note: Content type headers can be set by external applications in the case of RabbitMQ, and Spring Cloud Stream supports them as part of an extended internal protocol for any type of transport (even ones that do not normally support headers, such as Kafka).
As described above, Spring Cloud Stream provides a binder abstraction for connecting to physical destinations. This section will provide more information about the main concepts behind the Binder SPI, its main components, as well as details specific to different implementations.
A producer is any component that sends messages to a channel. That channel can be bound to an external message broker via a Binder implementation for that broker. When invoking the bindProducer method, the first parameter is the name of the destination within that broker. The second parameter is the local channel instance to which the producer will be sending messages, and the third parameter contains properties to be used within the adapter that is created for that channel, such as a partition key expression.
A consumer is any component that receives messages from a channel. As with the producer, the consumer's channel can be bound to an external message broker, and the first parameter for the bindConsumer method is the destination name. However, on the consumer side, a second parameter provides the name of a logical group of consumers. Each group represented by consumer bindings for a given destination receives a copy of each message that a producer sends to that destination (that is, publish-subscribe semantics). If multiple consumer instances are bound using the same group name, messages are load balanced across those instances, so that each message sent by a producer is consumed by only a single consumer instance within each group (that is, queue semantics).
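The group semantics described above can be modeled with a small in-memory sketch. It is not a Binder implementation and does not mirror the real bindConsumer signature. The GroupedDestinationSketch class and its methods are hypothetical, and the round-robin choice of a group member is an assumption made for illustration, since the way messages are actually balanced depends on the broker.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical in-memory destination; it models the semantics only, not a real broker.
final class GroupedDestinationSketch {
    // Consumers registered per group name, in registration order.
    private final Map<String, List<Consumer<String>>> groups = new LinkedHashMap<>();
    // Index of the next consumer to use within each group (round-robin, an assumption).
    private final Map<String, Integer> nextIndex = new LinkedHashMap<>();

    // Loosely mirrors the consumer side: registers a consumer under a group name.
    void bindConsumer(String group, Consumer<String> consumer) {
        groups.computeIfAbsent(group, g -> new ArrayList<>()).add(consumer);
        nextIndex.putIfAbsent(group, 0);
    }

    // Every group gets a copy of the message; one member per group handles it.
    void send(String message) {
        for (Map.Entry<String, List<Consumer<String>>> entry : groups.entrySet()) {
            List<Consumer<String>> members = entry.getValue();
            int index = nextIndex.get(entry.getKey()) % members.size();
            members.get(index).accept(message);
            nextIndex.put(entry.getKey(), (index + 1) % members.size());
        }
    }
}
```

With one consumer bound in a group named audit and two consumers bound in a group named billing, sending three messages delivers all three to the audit consumer, while the two billing consumers share the three messages between them. Each message is therefore seen once per group, regardless of how many instances the group contains.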
The Kafka Binder implementation maps the destination to a Kafka topic, and the consumer group maps directly to the same Kafka concept. Spring Cloud Stream does not use the Kafka high-level consumer, but implements a similar concept on top of the simple consumer.
The RabbitMQ Binder implementation maps the destination to a TopicExchange, and for each consumer group, a Queue will be bound to that TopicExchange. Each consumer instance that binds will trigger creation of a corresponding RabbitMQ Consumer instance for its group's Queue.