This section describes Spring Cloud Stream’s programming model. Spring Cloud Stream provides a number of predefined annotations for declaring bound input and output channels and for listening to those channels.
You can turn a Spring application into a Spring Cloud Stream application by applying the @EnableBinding annotation to one of the application’s configuration classes.
The @EnableBinding annotation itself is meta-annotated with @Configuration and triggers the configuration of Spring Cloud Stream infrastructure:
... @Import(...) @Configuration @EnableIntegration public @interface EnableBinding { ... Class<?>[] value() default {}; }
The @EnableBinding annotation can take as parameters one or more interface classes that contain methods which represent bindable components (typically message channels).
Note: In Spring Cloud Stream 1.0, the only supported bindable components are the Spring Messaging MessageChannel and its extensions, SubscribableChannel and PollableChannel.
A Spring Cloud Stream application can have an arbitrary number of input and output channels defined in an interface as @Input and @Output methods:
public interface Barista { @Input SubscribableChannel orders(); @Output MessageChannel hotDrinks(); @Output MessageChannel coldDrinks(); }
Using this interface as a parameter to @EnableBinding will trigger the creation of three bound channels named orders, hotDrinks, and coldDrinks, respectively.
@EnableBinding(Barista.class) public class CafeConfiguration { ... }
Using the @Input and @Output annotations, you can specify a customized channel name for the channel, as shown in the following example:
public interface Barista { ... @Input("inboundOrders") SubscribableChannel orders(); }
In this example, the created bound channel will be named inboundOrders.
For easy addressing of the most common use cases, which involve either an input channel, an output channel, or both, Spring Cloud Stream provides three predefined interfaces out of the box.
Source can be used for an application which has a single outbound channel.
public interface Source { String OUTPUT = "output"; @Output(Source.OUTPUT) MessageChannel output(); }
Sink can be used for an application which has a single inbound channel.
public interface Sink { String INPUT = "input"; @Input(Sink.INPUT) SubscribableChannel input(); }
Processor can be used for an application which has both an inbound channel and an outbound channel.
public interface Processor extends Source, Sink { }
Spring Cloud Stream provides no special handling for any of these interfaces; they are provided only as a convenience.
For each bound interface, Spring Cloud Stream will generate a bean that implements the interface.
Invoking an @Input-annotated or @Output-annotated method of one of these beans returns the relevant bound channel.
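To make the idea of a generated bean concrete, here is a minimal plain-Java sketch that uses a JDK dynamic proxy. It is an illustration only and not how Spring Cloud Stream is implemented. `BoundOutput`, `NamedChannel`, `DrinkChannels`, and `BindingProxySketch` are hypothetical stand-ins invented for this example. The sketch shows two behaviors described above: the channel name defaults to the method name unless the annotation supplies one, and repeated calls return the same channel.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for an @Output-style annotation; not the real one.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface BoundOutput {
    String value() default "";
}

// Hypothetical stand-in for a message channel; it only remembers its name.
final class NamedChannel {
    private final String name;

    NamedChannel(String name) {
        this.name = name;
    }

    String name() {
        return name;
    }
}

// Example bound interface with the same shape as Barista.
interface DrinkChannels {
    @BoundOutput
    NamedChannel hotDrinks();

    @BoundOutput("iced")
    NamedChannel coldDrinks();
}

final class BindingProxySketch {

    // Creates an implementation of the interface whose annotated methods
    // return one cached channel per channel name.
    static <T> T bind(Class<T> type) {
        Map<String, NamedChannel> channels = new ConcurrentHashMap<>();
        InvocationHandler handler = (proxy, method, args) -> {
            BoundOutput output = method.getAnnotation(BoundOutput.class);
            if (output == null) {
                throw new UnsupportedOperationException(method.getName());
            }
            // An explicit annotation value wins; otherwise use the method name.
            String name = output.value().isEmpty() ? method.getName() : output.value();
            return channels.computeIfAbsent(name, NamedChannel::new);
        };
        return type.cast(Proxy.newProxyInstance(
                type.getClassLoader(), new Class<?>[] {type}, handler));
    }
}
```

Calling `BindingProxySketch.bind(DrinkChannels.class).coldDrinks().name()` yields `iced`, while `hotDrinks()` falls back to the method name.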
The bean in the following example sends a message on the output channel when its sayHello method is invoked.
It invokes output() on the injected Source bean to retrieve the target channel.
@Component public class SendingBean { private Source source; @Autowired public SendingBean(Source source) { this.source = source; } public void sayHello(String name) { source.output().send(MessageBuilder.withPayload(name).build()); } }
Bound channels can also be injected directly:
@Component public class SendingBean { private MessageChannel output; @Autowired public SendingBean(MessageChannel output) { this.output = output; } public void sayHello(String name) { output.send(MessageBuilder.withPayload(name).build()); } }
If the name of the channel is customized on the declaring annotation, that name should be used instead of the method name. Given the following declaration:
public interface CustomSource { ... @Output("customOutput") MessageChannel output(); }
The channel will be injected as shown in the following example:
@Component public class SendingBean { private MessageChannel output; @Autowired public SendingBean(@Qualifier("customOutput") MessageChannel output) { this.output = output; } public void sayHello(String name) { this.output.send(MessageBuilder.withPayload(name).build()); } }
You can write a Spring Cloud Stream application using either Spring Integration annotations or Spring Cloud Stream’s @StreamListener annotation.
The @StreamListener annotation is modeled after other Spring Messaging annotations (such as @MessageMapping, @JmsListener, and @RabbitListener) but adds content type management and type coercion features.
Because Spring Cloud Stream is based on Spring Integration, it inherits Spring Integration’s foundation and infrastructure as well as its components.
For example, you can attach the output channel of a Source to a MessageSource:
@EnableBinding(Source.class) public class TimerSource { @Value("${format}") private String format; @Bean @InboundChannelAdapter(value = Source.OUTPUT, poller = @Poller(fixedDelay = "${fixedDelay}", maxMessagesPerPoll = "1")) public MessageSource<String> timerMessageSource() { return () -> new GenericMessage<>(new SimpleDateFormat(format).format(new Date())); } }
Or you can use a processor’s channels in a transformer:
@EnableBinding(Processor.class) public class TransformProcessor { @Transformer(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT) public Object transform(String message) { return message.toUpperCase(); } }
Complementary to its Spring Integration support, Spring Cloud Stream provides its own @StreamListener annotation, modeled after other Spring Messaging annotations (such as @MessageMapping, @JmsListener, and @RabbitListener).
The @StreamListener annotation provides a simpler model for handling inbound messages, especially when dealing with use cases that involve content type management and type coercion.
Spring Cloud Stream provides an extensible MessageConverter mechanism for handling data conversion by bound channels and, in this case, for dispatching to methods annotated with @StreamListener.
The following is an example of an application which processes external Vote events:
@EnableBinding(Sink.class) public class VoteHandler { @Autowired VotingService votingService; @StreamListener(Sink.INPUT) public void handle(Vote vote) { votingService.record(vote); } }
The distinction between @StreamListener and a Spring Integration @ServiceActivator is seen when considering an inbound Message that has a String payload and a contentType header of application/json.
In the case of @StreamListener, the MessageConverter mechanism will use the contentType header to parse the String payload into a Vote object.
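The following plain-Java sketch illustrates header-driven conversion: a converter is selected from the contentType header, and the handler receives a typed object instead of the raw String. It is an analogy under stated assumptions, not the MessageConverter API. `Ballot` and `ContentTypeDispatchSketch` are hypothetical names, and a naive regular expression stands in for a real JSON converter.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical payload type used only in this sketch.
final class Ballot {
    final String choice;

    Ballot(String choice) {
        this.choice = choice;
    }
}

final class ContentTypeDispatchSketch {

    // Naive extraction of a "choice" field; a stand-in for real JSON parsing.
    private static final Pattern CHOICE =
            Pattern.compile("\"choice\"\\s*:\\s*\"([^\"]*)\"");

    // Converters keyed by the value of the contentType header.
    private final Map<String, Function<String, Ballot>> converters = new HashMap<>();

    ContentTypeDispatchSketch() {
        converters.put("application/json", raw -> {
            Matcher matcher = CHOICE.matcher(raw);
            if (!matcher.find()) {
                throw new IllegalArgumentException("No choice field in: " + raw);
            }
            return new Ballot(matcher.group(1));
        });
        converters.put("text/plain", Ballot::new);
    }

    // Converts the raw payload according to its contentType header,
    // then hands the typed object to the handler.
    void dispatch(String payload, Map<String, String> headers, Consumer<Ballot> handler) {
        String contentType = headers.get("contentType");
        Function<String, Ballot> converter = converters.get(contentType);
        if (converter == null) {
            throw new IllegalArgumentException("Unsupported content type: " + contentType);
        }
        handler.accept(converter.apply(payload));
    }
}
```

The handler never sees the raw String; it is written against `Ballot`, in the same way that the `handle(Vote vote)` method is written against `Vote`.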
As with other Spring Messaging methods, method arguments can be annotated with @Payload, @Headers and @Header.
Note: For methods that return data, you must use the @SendTo annotation to specify the output binding destination for the data returned by the method:
@EnableBinding(Processor.class) public class TransformProcessor { @Autowired VotingService votingService; @StreamListener(Processor.INPUT) @SendTo(Processor.OUTPUT) public VoteResult handle(Vote vote) { return votingService.record(vote); } }
Note: In the case of RabbitMQ, content type headers can be set by external applications. Spring Cloud Stream supports them as part of an extended internal protocol used for any type of transport (including transports, such as Kafka, that do not normally support headers).
Spring Cloud Stream provides support for aggregating multiple applications together, connecting their input and output channels directly and avoiding the additional cost of exchanging messages via a broker. As of version 1.0 of Spring Cloud Stream, aggregation is supported only for the following types of applications:
- sources: applications with a single output channel named output, typically having a single binding of the type org.springframework.cloud.stream.messaging.Source
- sinks: applications with a single input channel named input, typically having a single binding of the type org.springframework.cloud.stream.messaging.Sink
- processors: applications with a single input channel named input and a single output channel named output, typically having a single binding of the type org.springframework.cloud.stream.messaging.Processor

They can be aggregated together by creating a sequence of interconnected applications, in which the output channel of an element in the sequence is connected to the input channel of the next element, if it exists. A sequence can start with either a source or a processor, can contain an arbitrary number of processors, and must end with either a processor or a sink.
Depending on the nature of the starting and ending element, the sequence may have one or more bindable channels, as follows:
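- If the sequence starts with a source and ends with a sink, all communication between the applications is direct and no channels are bound.
- If the sequence starts with a processor, its input channel becomes the input channel of the aggregate and will be bound accordingly.
- If the sequence ends with a processor, its output channel becomes the output channel of the aggregate and will be bound accordingly.

Aggregation is performed using the AggregateApplicationBuilder utility class, as in the following example.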
Let’s consider a project in which we have a source, a processor, and a sink, which may be defined in the project or contained in one of the project’s dependencies.
@SpringBootApplication @EnableBinding(Sink.class) public class SinkApplication { private static Logger logger = LoggerFactory.getLogger(SinkApplication.class); @ServiceActivator(inputChannel=Sink.INPUT) public void loggerSink(Object payload) { logger.info("Received: " + payload); } }
@SpringBootApplication @EnableBinding(Processor.class) public class ProcessorApplication { @Transformer(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT) public String transform(String payload) { return payload.toUpperCase(); } }
@SpringBootApplication @EnableBinding(Source.class) public class SourceApplication { @InboundChannelAdapter(value = Source.OUTPUT) public String timerMessageSource() { return new SimpleDateFormat().format(new Date()); } }
Each configuration can be used for running a separate component, but in this case they can be aggregated together as follows:
@SpringBootApplication public class SampleAggregateApplication { public static void main(String[] args) { new AggregateApplicationBuilder() .from(SourceApplication.class).args("--fixedDelay=5000") .via(ProcessorApplication.class) .to(SinkApplication.class).args("--debug=true").run(args); } }
The starting component of the sequence is provided as an argument to the from() method.
The ending component of the sequence is provided as an argument to the to() method.
Intermediate processors are provided as arguments to the via() method.
Multiple processors of the same type can be chained together (e.g. for pipelining transformations with different configurations).
For each component, the builder can provide runtime arguments for Spring Boot configuration.
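The direct-connection idea behind aggregation can be sketched in plain Java as a chain of function calls with no broker in between. This is an analogy only: `DirectPipeline` is a hypothetical class written for this example and does not reflect how AggregateApplicationBuilder works internally. It borrows the from/via/to vocabulary to show that each element's output feeds the next element's input, and that the same kind of processor can appear several times.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical builder; a plain-Java analogy, not AggregateApplicationBuilder.
final class DirectPipeline<T> {

    private final Supplier<T> source;
    private final List<Function<T, T>> processors = new ArrayList<>();

    private DirectPipeline(Supplier<T> source) {
        this.source = source;
    }

    // The starting element produces values.
    static <T> DirectPipeline<T> from(Supplier<T> source) {
        return new DirectPipeline<>(source);
    }

    // Intermediate elements transform values; order of registration is preserved.
    DirectPipeline<T> via(Function<T, T> processor) {
        processors.add(processor);
        return this;
    }

    // The ending element consumes values. The returned task pulls one value
    // from the source, passes it through every processor in order, and
    // hands the result to the sink, all by direct method calls.
    Runnable to(Consumer<T> sink) {
        return () -> {
            T value = source.get();
            for (Function<T, T> processor : processors) {
                value = processor.apply(value);
            }
            sink.accept(value);
        };
    }
}
```

For example, `DirectPipeline.<String>from(() -> "order").via(String::toUpperCase).to(System.out::println).run()` passes one value from the source through the processor to the sink without any intermediate messaging.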
Spring Cloud Stream provides support for RxJava-based processors through the RxJavaProcessor available in spring-cloud-stream-rxjava.
public interface RxJavaProcessor<I, O> { Observable<O> process(Observable<I> input); }
An implementation of RxJavaProcessor receives an Observable as input, which represents the flow of inbound message payloads.
The process method is invoked once at startup for setting up the data flow.
You can enable the use of RxJava-based processors and use them in your processor application by using the @EnableRxJavaProcessor annotation.
@EnableRxJavaProcessor is meta-annotated with @EnableBinding(Processor.class) and will create the Processor binding.
Here is an example of an RxJava-based processor:
@EnableRxJavaProcessor public class RxJavaTransformer { private static Logger logger = LoggerFactory.getLogger(RxJavaTransformer.class); @Bean public RxJavaProcessor<String,String> processor() { return inputStream -> inputStream.map(data -> { logger.info("Got data = " + data); return data; }) .buffer(5) .map(data -> String.valueOf(avg(data))); } private static Double avg(List<String> data) { double sum = 0; double count = 0; for(String d : data) { count++; sum += Double.valueOf(d); } return sum/count; } }
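To show what the buffer-and-average pipeline above computes, here is a plain-Java sketch over a finite list, with no RxJava dependency. `BufferedAverageSketch` is a hypothetical class for illustration. One simplification is assumed: a trailing batch with fewer than `size` elements is dropped, because the sketch models a stream that has not completed.

```java
import java.util.ArrayList;
import java.util.List;

final class BufferedAverageSketch {

    // Groups payloads into consecutive batches of `size` and returns the
    // average of each full batch, rendered as a String.
    static List<String> averages(List<String> payloads, int size) {
        List<String> results = new ArrayList<>();
        for (int start = 0; start + size <= payloads.size(); start += size) {
            double sum = 0;
            for (String payload : payloads.subList(start, start + size)) {
                sum += Double.parseDouble(payload);
            }
            results.add(String.valueOf(sum / size));
        }
        return results;
    }
}
```

With a batch size of 5, the inputs "1" through "5" followed by "10", "20", "30", "40", "50" produce the two outputs "3.0" and "30.0".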
Note: When implementing an RxJava processor, it is important to handle exceptions as part of your processing flow. Uncaught exceptions will be treated as errors by RxJava and will cause the Observable to terminate, disrupting the flow.
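One way to read this advice is to catch failures at the element level, so that a single malformed payload is set aside instead of ending the whole flow. The sketch below shows that pattern in plain Java, without RxJava operators. `SafeParsingSketch` and its `rejected` list are hypothetical names introduced for this example.

```java
import java.util.ArrayList;
import java.util.List;

final class SafeParsingSketch {

    // Parses each payload as a double. Payloads that fail to parse are
    // collected in `rejected` instead of aborting the remaining work.
    static List<Double> parseAll(List<String> payloads, List<String> rejected) {
        List<Double> parsed = new ArrayList<>();
        for (String payload : payloads) {
            try {
                parsed.add(Double.parseDouble(payload));
            } catch (NumberFormatException e) {
                rejected.add(payload);
            }
        }
        return parsed;
    }
}
```

In an RxJava flow the same idea would be expressed inside the mapping step or with an error-handling operator; which one fits depends on whether the bad element should be skipped, replaced, or reported.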