3. Programming model

This section describes the programming model of Spring Cloud Stream, which consists of a number of predefined annotations that can be used to declare bound input and output channels, as well as how to listen to them.

3.1 Declaring and binding channels

3.1.1 Triggering binding via @EnableBinding

A Spring application becomes a Spring Cloud Stream application when the @EnableBinding annotation is applied to one of its configuration classes. @EnableBinding itself is meta-annotated with @Configuration, and triggers the configuration of Spring Cloud Stream infrastructure as follows:

...
@Import(...)
@Configuration
@EnableIntegration
public @interface EnableBinding {
    ...
    Class<?>[] value() default {};
}

@EnableBinding can be parameterized with one or more interface classes, containing methods that represent bindable components (typically message channels).

Note

As of version 1.0, the only supported bindable component is the Spring Messaging MessageChannel and its extensions SubscribableChannel and PollableChannel. Future versions are intended to extend support to other types of components, using the same mechanism. In this documentation, we will continue to refer to channels.

3.1.2 @Input and @Output

A Spring Cloud Stream application can have an arbitrary number of input and output channels defined as @Input and @Output methods in an interface, as follows:

public interface Barista {

    @Input
    SubscribableChannel orders();

    @Output
    MessageChannel hotDrinks();

    @Output
    MessageChannel coldDrinks();
}

Using this interface as a parameter to @EnableBinding, as in the following example, will trigger the creation of three bound channels named orders, hotDrinks and coldDrinks respectively.

@EnableBinding(Barista.class)
public class CafeConfiguration {

   ...
}

Customizing channel names

Both @Input and @Output allow specifying a customized name for the channel, as follows:

public interface Barista {
    ...
    @Input("inboundOrders")
    SubscribableChannel orders();
}

In this case, the name of the bound channel being created will be inboundOrders.
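
The naming rule can be pictured with a small reflection sketch. This is not Spring Cloud Stream's implementation: the nested Input and Output annotations are hypothetical stand-ins declared locally so that the example compiles without Spring on the classpath, the channel return types are replaced by Object, and ChannelNames and resolve are illustrative names only.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.Map;
import java.util.TreeMap;

class ChannelNames {

    // Hypothetical stand-ins for the real @Input and @Output annotations.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface Input { String value() default ""; }

    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface Output { String value() default ""; }

    // Same shape as the Barista interface above, with a customized input name.
    interface Barista {
        @Input("inboundOrders")
        Object orders();

        @Output
        Object hotDrinks();

        @Output
        Object coldDrinks();
    }

    // Maps each annotated method to the channel name it declares: the
    // annotation value if one is given, otherwise the method name.
    static Map<String, String> resolve(Class<?> bindingInterface) {
        Map<String, String> names = new TreeMap<>();
        for (Method method : bindingInterface.getMethods()) {
            Input input = method.getAnnotation(Input.class);
            Output output = method.getAnnotation(Output.class);
            if (input == null && output == null) {
                continue; // not a bindable method
            }
            String custom = (input != null) ? input.value() : output.value();
            names.put(method.getName(), custom.isEmpty() ? method.getName() : custom);
        }
        return names;
    }

    public static void main(String[] args) {
        // prints {coldDrinks=coldDrinks, hotDrinks=hotDrinks, orders=inboundOrders}
        System.out.println(resolve(Barista.class));
    }
}
```

Only orders maps to a name that differs from its method name, because it is the only method whose annotation carries a value.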

Source, Sink, and Processor

To address the most common use cases, which involve an input channel, an output channel, or both, Spring Cloud Stream provides three predefined interfaces out of the box.

Source can be used for applications that have a single outbound channel.

public interface Source {

	String OUTPUT = "output";

	@Output(Source.OUTPUT)
	MessageChannel output();

}

Sink can be used for applications that have a single inbound channel.

public interface Sink {

	String INPUT = "input";

	@Input(Sink.INPUT)
	SubscribableChannel input();

}

Processor can be used for applications that have both an inbound and an outbound channel.

public interface Processor extends Source, Sink {
}

Spring Cloud Stream does not handle any of these interfaces specially; they are simply provided out of the box.

3.1.3 Accessing bound channels

Injecting the bound interfaces

For each bound interface, Spring Cloud Stream will generate a bean that implements the interface. Invoking an @Input or @Output annotated method of one of these beans returns the relevant bound channel. For example, the bean in the following example sends a message on the output channel every time its sayHello method is invoked. It invokes output() on the injected Source bean to retrieve the target channel.

@Component
public class SendingBean {

    private Source source;

    @Autowired
    public SendingBean(Source source) {
        this.source = source;
    }

    public void sayHello(String name) {
        source.output().send(MessageBuilder.withPayload(name).build());
    }
}
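
One way to picture a generated bean that implements a bound interface is a JDK dynamic proxy. The sketch below is an illustration under stated assumptions, not the framework's actual mechanism: RecordingChannel is a hypothetical stand-in for a message channel, the nested Source interface only mirrors the shape of the real one, and channels are keyed by method name for brevity.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class BoundInterfaceSketch {

    // Hypothetical stand-in for a message channel; it only records payloads.
    static class RecordingChannel {
        final List<String> sent = new ArrayList<>();

        boolean send(String payload) {
            return sent.add(payload);
        }
    }

    // Mirrors the shape of Source, using the stand-in channel type.
    interface Source {
        RecordingChannel output();
    }

    // Creates an implementation of the interface whose methods return one
    // channel per method name, created on first access and reused afterwards.
    // Methods inherited from Object (toString, equals, hashCode) are not
    // handled in this sketch.
    static <T> T bind(Class<T> bindingInterface) {
        Map<String, RecordingChannel> channels = new ConcurrentHashMap<>();
        InvocationHandler handler = (proxy, method, args) ->
                channels.computeIfAbsent(method.getName(), name -> new RecordingChannel());
        Object instance = Proxy.newProxyInstance(
                bindingInterface.getClassLoader(),
                new Class<?>[] {bindingInterface},
                handler);
        return bindingInterface.cast(instance);
    }

    public static void main(String[] args) {
        Source source = bind(Source.class);
        source.output().send("Hello");
        System.out.println(source.output().sent); // prints [Hello]
    }
}
```

Because the handler caches channels, repeated calls to output() return the same instance, which is the property that SendingBean relies on.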

Injecting channels directly

Bound channels can also be injected directly. For example:

@Component
public class SendingBean {

    private MessageChannel output;

    @Autowired
    public SendingBean(MessageChannel output) {
        this.output = output;
    }

    public void sayHello(String name) {
        output.send(MessageBuilder.withPayload(name).build());
    }
}

Note that if the name of the channel is customized on the declaring annotation, that name should be used instead of the method name. Considering this declaration:

public interface CustomSource {
    ...
    @Output("customOutput")
    MessageChannel output();
}

The channel will be injected as follows:

@Component
public class SendingBean {

    private MessageChannel output;

    // The qualifier selects the channel by its customized name.
    @Autowired
    public SendingBean(@Qualifier("customOutput") MessageChannel output) {
        this.output = output;
    }

    public void sayHello(String name) {
        output.send(MessageBuilder.withPayload(name).build());
    }
}

3.1.4 Programming model

Spring Cloud Stream allows you to write applications using either Spring Integration annotations or Spring Cloud Stream’s @StreamListener annotation, which is modeled after other Spring Messaging annotations (e.g. @MessageMapping, @JmsListener, @RabbitListener, etc.) but adds content type management and type coercion features.

Native Spring Integration support

Because Spring Cloud Stream is based on Spring Integration, it completely inherits Spring Integration’s foundation and infrastructure, as well as its components. For example, the output channel of a Source can be attached to a MessageSource, as follows:

@EnableBinding(Source.class)
public class TimerSource {

  @Value("${format}")
  private String format;

  @Bean
  @InboundChannelAdapter(value = Source.OUTPUT, poller = @Poller(fixedDelay = "${fixedDelay}", maxMessagesPerPoll = "1"))
  public MessageSource<String> timerMessageSource() {
    return () -> new GenericMessage<>(new SimpleDateFormat(format).format(new Date()));
  }
}

Or, the channels of a processor can be used in a transformer, as follows:

@EnableBinding(Processor.class)
public class TransformProcessor {
	@Transformer(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)
	public Object transform(String message) {
		return message.toUpperCase();
	}
}

@StreamListener for automatic content type handling

Complementary to the Spring Integration support, Spring Cloud Stream provides its own @StreamListener annotation, modeled after other Spring Messaging annotations (e.g. @MessageMapping, @JmsListener, @RabbitListener, etc.). It provides a simpler model for handling inbound messages, especially for use cases that involve content type management and type coercion. Spring Cloud Stream provides an extensible MessageConverter mechanism for handling data conversion by bound channels and, in this case, for dispatching to @StreamListener annotated methods.

For example, an application that processes external Vote events can be declared as follows:

@EnableBinding(Sink.class)
public class VoteHandler {

  @Autowired
  VotingService votingService;

  @StreamListener(Sink.INPUT)
  public void handle(Vote vote) {
    votingService.record(vote);
  }
}

The distinction between this approach and a Spring Integration @ServiceActivator becomes relevant if one considers an inbound Message with a String payload and a contentType header of application/json. For @StreamListener, the MessageConverter mechanism will use the contentType header to parse the String into a Vote object.
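
The idea of header-driven conversion can be sketched in plain Java. Everything here is an assumption made for illustration: the Vote class with a single choice field, the CONVERTERS registry, and the toy regular-expression parser are hypothetical and are not part of the MessageConverter API. A real application would rely on a JSON library rather than a regular expression.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class ContentTypeSketch {

    // Hypothetical payload type; the real Vote class is not shown in the text.
    static class Vote {
        final String choice;

        Vote(String choice) {
            this.choice = choice;
        }
    }

    private static final Pattern CHOICE =
            Pattern.compile("\"choice\"\\s*:\\s*\"([^\"]*)\"");

    // Toy parser that only understands a {"choice":"..."} document.
    static Vote parseJson(String json) {
        Matcher matcher = CHOICE.matcher(json);
        if (!matcher.find()) {
            throw new IllegalArgumentException("No choice field in: " + json);
        }
        return new Vote(matcher.group(1));
    }

    // Hypothetical registry: content type -> converter from String to Vote.
    static final Map<String, Function<String, Vote>> CONVERTERS = new HashMap<>();

    static {
        CONVERTERS.put("application/json", ContentTypeSketch::parseJson);
    }

    // Selects a converter based on the contentType header and applies it to
    // the String payload; unknown content types are rejected.
    static Vote convert(Map<String, String> headers, String payload) {
        String contentType = headers.get("contentType");
        Function<String, Vote> converter = CONVERTERS.get(contentType);
        if (converter == null) {
            throw new IllegalArgumentException("Unsupported content type: " + contentType);
        }
        return converter.apply(payload);
    }

    public static void main(String[] args) {
        Map<String, String> headers = new HashMap<>();
        headers.put("contentType", "application/json");
        Vote vote = convert(headers, "{\"choice\": \"espresso\"}");
        System.out.println(vote.choice); // prints espresso
    }
}
```

The handler method never sees the raw String: the conversion step runs first, so the method signature can declare the target type directly.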

Just as with the other Spring Messaging methods, method arguments can be annotated with @Payload, @Headers and @Header. For methods that return data, @SendTo must be used to specify the output binding destination for the returned data, as follows:

@EnableBinding(Processor.class)
public class TransformProcessor {

  @Autowired
  VotingService votingService;

  @StreamListener(Processor.INPUT)
  @SendTo(Processor.OUTPUT)
  public VoteResult handle(Vote vote) {
    return votingService.record(vote);
  }
}
Note

Content type headers can be set by external applications in the case of RabbitMQ. Spring Cloud Stream also supports them as part of an extended internal protocol for any type of transport, including transports that do not normally support headers, such as Kafka.

3.2 Binder SPI

As described above, Spring Cloud Stream provides a binder abstraction for connecting to physical destinations. This section provides more information about the main concepts behind the Binder SPI, its main components, and details specific to different implementations.

3.2.1 Producers and Consumers

Figure 3.1. Producers and Consumers

A producer is any component that sends messages to a channel. That channel can be bound to an external message broker via a Binder implementation for that broker. When invoking the bindProducer method, the first parameter is the name of the destination within that broker. The second parameter is the local channel instance to which the producer will be sending messages, and the third parameter contains properties to be used within the adapter that is created for that channel, such as a partition key expression.

A consumer is any component that receives messages from a channel. As with the producer, the consumer’s channel can be bound to an external message broker, and the first parameter for the bindConsumer method is the destination name. However, on the consumer side, a second parameter provides the name of a logical group of consumers. Each group represented by consumer bindings for a given destination will receive a copy of each message that a producer sends to that destination (i.e. pub/sub semantics). If there are multiple consumer instances bound using the same group name, then messages will be load balanced across those consumer instances so that each message sent by a producer would only be consumed by a single consumer instance within each group (i.e. queue semantics).
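
The group semantics described above can be simulated in memory. This is a sketch, not a binder: ConsumerGroupSketch is a hypothetical class, its bindConsumer method only mirrors the first two parameters described in the text (destination name, then group name), and round-robin selection within a group is an assumption made for the demonstration, since the actual load balancing strategy depends on the broker.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

class ConsumerGroupSketch {

    // Consumers registered under one group name, plus a round-robin cursor.
    private static class Group {
        final List<Consumer<String>> consumers = new ArrayList<>();
        int next = 0;
    }

    // destination name -> (group name -> group)
    private final Map<String, Map<String, Group>> destinations = new LinkedHashMap<>();

    // Registers a consumer for a destination under a logical group name.
    void bindConsumer(String destination, String group, Consumer<String> consumer) {
        destinations.computeIfAbsent(destination, d -> new LinkedHashMap<>())
                .computeIfAbsent(group, g -> new Group())
                .consumers.add(consumer);
    }

    // Delivers one copy of the message to every group bound to the
    // destination (pub/sub), and to exactly one consumer within each group
    // (queue semantics).
    void send(String destination, String message) {
        Map<String, Group> groups =
                destinations.getOrDefault(destination, Collections.emptyMap());
        for (Group group : groups.values()) {
            Consumer<String> target = group.consumers.get(group.next);
            group.next = (group.next + 1) % group.consumers.size();
            target.accept(message);
        }
    }

    public static void main(String[] args) {
        ConsumerGroupSketch broker = new ConsumerGroupSketch();
        List<String> log = new ArrayList<>();
        broker.bindConsumer("orders", "billing", m -> log.add("billing-1:" + m));
        broker.bindConsumer("orders", "billing", m -> log.add("billing-2:" + m));
        broker.bindConsumer("orders", "audit", m -> log.add("audit-1:" + m));
        broker.send("orders", "a");
        broker.send("orders", "b");
        // prints [billing-1:a, audit-1:a, billing-2:b, audit-1:b]
        System.out.println(log);
    }
}
```

Both groups see every message, while the two billing instances split the messages between them.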

3.2.2 Kafka Binder

Figure 3.2. Kafka Binder

The Kafka Binder implementation maps the destination to a Kafka topic, and the consumer group maps directly to the same Kafka concept. Spring Cloud Stream does not use the high level consumer, but implements a similar concept for the simple consumer.

3.2.3 RabbitMQ Binder

Figure 3.3. RabbitMQ Binder

The RabbitMQ Binder implementation maps the destination to a TopicExchange, and for each consumer group, a Queue will be bound to that TopicExchange. Each consumer instance that binds will trigger creation of a corresponding RabbitMQ Consumer instance for its group’s Queue.