This version is still in development and is not considered stable yet. For the latest stable version, please use spring-cloud-stream 4.2.0!

Producing and Consuming Messages

You can write a Spring Cloud Stream application by simply writing functions and exposing them as @Bean s. You can also use Spring Integration annotations based configuration or Spring Cloud Stream annotation based configuration, although starting with spring-cloud-stream 3.x we recommend using functional implementations.

Spring Cloud Function support

Overview

Since Spring Cloud Stream v2.1, another alternative for defining stream handlers and sources is to use build-in support for Spring Cloud Function where they can be expressed as beans of type java.util.function.[Supplier/Function/Consumer].

To specify which functional bean to bind to the external destination(s) exposed by the bindings, you must provide spring.cloud.function.definition property.

In the event you only have single bean of type java.util.function.[Supplier/Function/Consumer], you can skip the spring.cloud.function.definition property, since such functional bean will be auto-discovered. However, it is considered best practice to use such property to avoid any confusion. Some time this auto-discovery can get in the way, since single bean of type java.util.function.[Supplier/Function/Consumer] could be there for purposes other then handling messages, yet being single it is auto-discovered and auto-bound. For these rare scenarios you can disable auto-discovery by providing spring.cloud.stream.function.autodetect property with value set to false.

Here is the example of the application exposing message handler as java.util.function.Function effectively supporting pass-thru semantics by acting as consumer and producer of data.

@SpringBootApplication
public class MyFunctionBootApp {

	public static void main(String[] args) {
		SpringApplication.run(MyFunctionBootApp.class);
	}

	@Bean
	public Function<String, String> toUpperCase() {
		return s -> s.toUpperCase();
	}
}

In the preceding example, we define a bean of type java.util.function.Function called toUpperCase to be acting as message handler whose 'input' and 'output' must be bound to the external destinations exposed by the provided destination binder. By default the 'input' and 'output' binding names will be toUpperCase-in-0 and toUpperCase-out-0. Please see Functional Binding Names section for details on naming convention used to establish binding names.

Below are the examples of simple functional applications to support other semantics:

Here is the example of a source semantics exposed as java.util.function.Supplier

@SpringBootApplication
public static class SourceFromSupplier {

	@Bean
	public Supplier<Date> date() {
		return () -> new Date(12345L);
	}
}

Here is the example of a sink semantics exposed as java.util.function.Consumer

@SpringBootApplication
public static class SinkFromConsumer {

	@Bean
	public Consumer<String> sink() {
		return System.out::println;
	}
}

Suppliers (Sources)

Function and Consumer are pretty straightforward when it comes to how their invocation is triggered. They are triggered based on data (events) sent to the destination they are bound to. In other words, they are classic event-driven components.

However, Supplier is in its own category when it comes to triggering. Since it is, by definition, the source (the origin) of the data, it does not subscribe to any in-bound destination and, therefore, has to be triggered by some other mechanism(s). There is also a question of Supplier implementation, which could be imperative or reactive and which directly relates to the triggering of such suppliers.

Consider the following sample:

@SpringBootApplication
public static class SupplierConfiguration {

	@Bean
	public Supplier<String> stringSupplier() {
		return () -> "Hello from Supplier";
	}
}

The preceding Supplier bean produces a string whenever its get() method is invoked. However, who invokes this method and how often? The framework provides a default polling mechanism (answering the question of "Who?") that will trigger the invocation of the supplier and by default it will do so every second (answering the question of "How often?"). In other words, the above configuration produces a single message every second and each message is sent to an output destination that is exposed by the binder. To learn how to customize the polling mechanism, see Polling Configuration Properties section.

Consider a different example:

@SpringBootApplication
public static class SupplierConfiguration {

    @Bean
    public Supplier<Flux<String>> stringSupplier() {
        return () -> Flux.fromStream(Stream.generate(new Supplier<String>() {
            @Override
            public String get() {
                try {
                    Thread.sleep(1000);
                    return "Hello from Supplier";
                } catch (Exception e) {
                    // ignore
                }
            }
        })).subscribeOn(Schedulers.elastic()).share();
    }
}

The preceding Supplier bean adopts the reactive programming style. Typically, and unlike the imperative supplier, it should be triggered only once, given that the invocation of its get() method produces (supplies) the continuous stream of messages and not an individual message.

The framework recognizes the difference in the programming style and guarantees that such a supplier is triggered only once.

However, imagine the use case where you want to poll some data source and return a finite stream of data representing the result set. The reactive programming style is a perfect mechanism for such a Supplier. However, given the finite nature of the produced stream, such Supplier still needs to be invoked periodically.

Consider the following sample, which emulates such use case by producing a finite stream of data:

@SpringBootApplication
public static class SupplierConfiguration {

	@PollableBean
	public Supplier<Flux<String>> stringSupplier() {
		return () -> Flux.just("hello", "bye");
	}
}

The bean itself is annotated with PollableBean annotation (sub-set of @Bean), thus signaling to the framework that although the implementation of such a supplier is reactive, it still needs to be polled.

There is a splittable attribute defined in PollableBean which signals to the post processors of this annotation that the result produced by the annotated component has to be split and is set to true by default. It means that the framework will split the returning sending out each item as an individual message. If this is not he desired behavior you can set it to false at which point such supplier will simply return the produced Flux without splitting it.

Supplier & threading

As you have learned by now, unlike Function and Consumer, which are triggered by an event (they have input data), Supplier does not have any input and thus triggered by a different mechanism - poller, which may have an unpredictable threading mechanism. And while the details of the threading mechanism most of the time are not relevant to the downstream execution of the function it may present an issue in certain cases especially with integrated frameworks that may have certain expectations to thread affinity. For example, Spring Cloud Sleuth which relies on tracing data stored in thread local. For those cases we have another mechanism via StreamBridge, where user has more control over threading mechanism. You can get more details in Sending arbitrary data to an output (e.g. Foreign event-driven sources) section.

Consumer (Reactive)

Reactive Consumer is a little bit special because it has a void return type, leaving framework with no reference to subscribe to. Most likely you will not need to write Consumer<Flux<?>>, and instead write it as a Function<Flux<?>, Mono<Void>> invoking then operator as the last operator on your stream.

For example:

public Function<Flux<?>, Mono<Void>> consumer() {
	return flux -> flux.map(..).filter(..).then();
}

But if you do need to write an explicit Consumer<Flux<?>>, remember to subscribe to the incoming Flux.

Also, keep in mind that the same rule applies for function composition when mixing reactive and imperative functions. Spring Cloud Function indeed supports composing reactive functions with imperative, however you must be aware of certain limitations. For example, assume you have composed reactive function with imperative consumer. The result of such composition is a reactive Consumer. However, there is no way to subscribe to such consumer as discussed earlier in this section, so this limitation can only be addressed by either making your consumer reactive and subscribing manually (as discussed earlier), or changing your function to be imperative.

Polling Configuration Properties

The following properties are exposed by Spring Cloud Stream and are prefixed with the spring.integration.poller.:

fixedDelay

Fixed delay for default poller in milliseconds.

Default: 1000L.

maxMessagesPerPoll

Maximum messages for each polling event of the default poller.

Default: 1L.

cron

Cron expression value for the Cron Trigger.

Default: none.

initialDelay

Initial delay for periodic triggers.

Default: 0.

timeUnit

The TimeUnit to apply to delay values.

Default: MILLISECONDS.

For example --spring.integration.poller.fixed-delay=2000 sets the poller interval to poll every two seconds.

Per-binding polling configuration

The previous section shows how to configure a single default poller that will be applied to all bindings. While it fits well with the model of microservices spring-cloud-stream designed for where each microservice represents a single component (e.g., Supplier) and thus default poller configuration is enough, there are edge cases where you may have several components that require different polling configurations

For such cases please use per-binding way of configuring poller. For example, assume you have an output binding supply-out-0. In this case you can configure poller for such binding using spring.cloud.stream.bindings.supply-out-0.producer.poller.. prefix (e.g., spring.cloud.stream.bindings.supply-out-0.producer.poller.fixed-delay=2000).

Sending arbitrary data to an output (e.g. Foreign event-driven sources)

There are cases where the actual source of data may be coming from the external (foreign) system that is not a binder. For example, the source of the data may be a classic REST endpoint. How do we bridge such source with the functional mechanism used by spring-cloud-stream?

Spring Cloud Stream provides two mechanisms, so let’s look at them in more details

Here, for both samples we’ll use a standard MVC endpoint method called delegateToSupplier bound to the root web context, delegating incoming requests to stream via StreamBridge mechanism.

@SpringBootApplication
@Controller
public class WebSourceApplication {

	public static void main(String[] args) {
		SpringApplication.run(WebSourceApplication.class, "--spring.cloud.stream.output-bindings=toStream");
	}

	@Autowired
	private StreamBridge streamBridge;

	@RequestMapping
	@ResponseStatus(HttpStatus.ACCEPTED)
	public void delegateToSupplier(@RequestBody String body) {
		System.out.println("Sending " + body);
		streamBridge.send("toStream", body);
	}
}

Here we autowire a StreamBridge bean which allows us to send data to an output binding effectively bridging non-stream application with spring-cloud-stream. Note that preceding example does not have any source functions defined (e.g., Supplier bean) leaving the framework with no trigger to create source bindings in advance, which would be typical for cases where configuration contains function beans. And that is fine, since StreamBridge will initiate creation of output bindings (as well as destination auto-provisioning if necessary) for non existing bindings on the first call to its send(..) operation caching it for subsequent reuse (see StreamBridge and Dynamic Destinations for more details).

However, if you want to pre-create an output binding at the initialization (startup) time you can benefit from spring.cloud.stream.output-bindings property where you can declare the name of your sources. The provided name will be used as a trigger to create a source binding. You can use ; to signify multiple sources (multiple output bindings) (e.g., --spring.cloud.stream.output-bindings=foo;bar)

Also, note that streamBridge.send(..) method takes an Object for data. This means you can send POJO or Message to it and it will go through the same routine when sending output as if it was from any Function or Supplier providing the same level of consistency as with functions. This means the output type conversion, partitioning etc are honored as if it was from the output produced by functions.

StreamBridge with async send

StreamBridge uses sending mechanism provided by Spring Integration framework which is at the core of the Spring Cloud Stream. By default this mechanism uses the sender’s thread. In other words, the send is blocking. While this is ok for many cases, there are cases when you want such send to be async. To do that use setAsync(true) method of the StreamBridge before invoking one of the send methods.

Observability Context propagation with asynchronous send

When using Observability support provided by the framework as well as supporting Spring frameworks, breaking thread boundaries will affect consistency of Observability context, thus your tracing history. To avoid that all you need is to add context-propagation dependency form Micrometer (see below)

<dependency>
    <groupId>io.micrometer</groupId>
    <artifactId>context-propagation</artifactId>
    <version>1.1.0</version>
</dependency>

StreamBridge and Dynamic Destinations

StreamBridge can also be used for cases when output destination(s) are not known ahead of time similar to the use cases described in Routing FROM consumer section.

Let’s look at the example

@SpringBootApplication
@Controller
public class WebSourceApplication {

	public static void main(String[] args) {
		SpringApplication.run(WebSourceApplication.class, args);
	}

	@Autowired
	private StreamBridge streamBridge;

	@RequestMapping
	@ResponseStatus(HttpStatus.ACCEPTED)
	public void delegateToSupplier(@RequestBody String body) {
		System.out.println("Sending " + body);
		streamBridge.send("myDestination", body);
	}
}

As you can see the preceding example is very similar to the previous one with the exception of explicit binding instruction provided via spring.cloud.stream.output-bindings property (which is not provided). Here we’re sending data to myDestination name which does not exist as a binding. Therefore such name will be treated as dynamic destination as described in Routing FROM consumer section.

In the preceding example, we are using ApplicationRunner as a foreign source to feed the stream.

A more practical example, where the foreign source is REST endpoint.

@SpringBootApplication
@Controller
public class WebSourceApplication {

	public static void main(String[] args) {
		SpringApplication.run(WebSourceApplication.class);
	}

	@Autowired
	private StreamBridge streamBridge;

	@RequestMapping
	@ResponseStatus(HttpStatus.ACCEPTED)
	public void delegateToSupplier(@RequestBody String body) {
		streamBridge.send("myBinding", body);
	}
}

As you can see inside of delegateToSupplier method we’re using StreamBridge to send data to myBinding binding. And here you’re also benefiting from the dynamic features of StreamBridge where if myBinding doesn’t exist it will be created automatically and cached, otherwise existing binding will be used.

Caching dynamic destinations (bindings) could result in memory leaks in the event there are many dynamic destinations. To have some level of control we provide a self-evicting caching mechanism for output bindings with default cache size of 10. This means that if your dynamic destination size goes above that number, there is a possibility that an existing binding will be evicted and thus would need to be recreated which could cause minor performance degradation. You can increase the cache size via spring.cloud.stream.dynamic-destination-cache-size property setting it to the desired value.
curl -H "Content-Type: text/plain" -X POST -d "hello from the other side" http://localhost:8080/

By showing two examples we want to emphasize the approach will work with any type of foreign sources.

If you are using the Solace PubSub+ binder, Spring Cloud Stream has reserved the scst_targetDestination header (retrievable via BinderHeaders.TARGET_DESTINATION), which allows for messages to be redirected from their bindings' configured destination to the target destination specified by this header. This allows for the binder to manage the resources necessary to publish to dynamic destinations, relieving the framework from having to do so, and avoids the caching issues mentioned in the previous Note. More info here.

Output Content Type with StreamBridge

You can also provide specific content type if necessary with the following method signature public boolean send(String bindingName, Object data, MimeType outputContentType). Or if you send data as a Message, its content type will be honored.

Using specific binder type with StreamBridge

Spring Cloud Stream supports multiple binder scenarios. For example you may be receiving data from Kafka and sending it to RabbitMQ.

For more information on multiple binders scenarios, please see Binders section and specifically Multiple Binders on the Classpath

In the event you are planning to use StreamBridge and have more then one binder configured in your application you must also tell StreamBridge which binder to use. And for that there are two more variations of send method:

public boolean send(String bindingName, @Nullable String binderType, Object data)

public boolean send(String bindingName, @Nullable String binderType, Object data, MimeType outputContentType)

As you can see there is one additional argument that you can provide - binderType, telling BindingService which binder to use when creating dynamic binding.

For cases where spring.cloud.stream.output-bindings property is used or the binding was already created under different binder, the binderType argument will have no effect.

Using channel interceptors with StreamBridge

Since StreamBridge uses a MessageChannel to establish the output binding, you can activate channel interceptors when sending data through StreamBridge. It is up to the application to decide which channel interceptors to apply on StreamBridge. Spring Cloud Stream does not inject all the channel interceptors detected into StreamBridge unless they are annoatated with @GlobalChannelInterceptor(patterns = "*").

Let us assume that you have the following two different StreamBridge bindings in the application.

streamBridge.send("foo-out-0", message);

and

streamBridge.send("bar-out-0", message);

Now, if you want a channel interceptor applied on both the StreamBridge bindings, then you can declare the following GlobalChannelInterceptor bean.

@Bean
@GlobalChannelInterceptor(patterns = "*")
public ChannelInterceptor customInterceptor() {
    return new ChannelInterceptor() {
        @Override
        public Message<?> preSend(Message<?> message, MessageChannel channel) {
            ...
        }
    };
}

However, if you don’t like the global approach above and want to have a dedicated interceptor for each binding, then you can do the following.

@Bean
@GlobalChannelInterceptor(patterns = "foo-*")
public ChannelInterceptor fooInterceptor() {
    return new ChannelInterceptor() {
        @Override
        public Message<?> preSend(Message<?> message, MessageChannel channel) {
            ...
        }
    };
}

and

@Bean
@GlobalChannelInterceptor(patterns = "bar-*")
public ChannelInterceptor barInterceptor() {
    return new ChannelInterceptor() {
        @Override
        public Message<?> preSend(Message<?> message, MessageChannel channel) {
            ...
        }
    };
}

You have the flexibility to make the patterns more strict or customized to your business needs.

With this approach, the application gets the ability to decide which interceptors to inject in StreamBridge rather than applying all the available interceptors.

StreamBridge provides a contract through the StreamOperations interface that contains all the send methods of StreamBridge. Therefore, applications may choose to autowire using StreamOperations. This is handy when it comes to unit testing code that uses StreamBridge by providing a mock or similar mechanisms for the StreamOperations interface.

Reactive Functions support

Since Spring Cloud Function is build on top of Project Reactor there isn’t much you need to do to benefit from reactive programming model while implementing Supplier, Function or Consumer.

For example:

@SpringBootApplication
public static class SinkFromConsumer {

	@Bean
	public Function<Flux<String>, Flux<String>> reactiveUpperCase() {
		return flux -> flux.map(val -> val.toUpperCase());
	}
}

Few important things must be understood when choosing reactive or imperative programming model.

Fully reactive or just API?

Using reactive API does not necessarily imply that you can benefit from all of the reactive features of such API. In other words things like back-pressure and other advanced features will only work when they are working with compatible system - such as Reactive Kafka binder. In the event you are using regular Kafka or Rabbit or any other non-reactive binder, you can only benefit from the conveniences of the reactive API itself and not its advanced features, since the actual sources or targets of the stream are not reactive.

Error handling and retries

Throughout this manual you will see several reference on the framework-based error handling, retries and other features as well as configuration properties associated with them. It is important to understand that they only effect the imperative functions and you should NOT have the same expectations when it comes to reactive functions. And here is why. . . There is a fundamental difference between reactive and imperative functions. Imperative function is a message handler that is invoked by the framework on each message it receives. So for N messages there will be N invocations of such function and because of that we can wrap such function and add additional functionality such as error handling, retries etc. Reactive function is initialization function. It is invoked only once to get a reference to a Flux/Mono provided by the user to be connected with the one provided by the framework. After that we (the framework) have absolutely no visibility nor control of the stream. Therefore, with reactive functions you must rely on the richness of the reactive API when it comes to error handling and retries (i.e., doOnError(), .onError*() etc).

Functional Composition

Using functional programming model you can also benefit from functional composition where you can dynamically compose complex handlers from a set of simple functions. As an example let’s add the following function bean to the application defined above

@Bean
public Function<String, String> wrapInQuotes() {
	return s -> "\"" + s + "\"";
}

and modify the spring.cloud.function.definition property to reflect your intention to compose a new function from both ‘toUpperCase’ and ‘wrapInQuotes’. To do so Spring Cloud Function relies on | (pipe) symbol. So, to finish our example our property will now look like this:

--spring.cloud.function.definition=toUpperCase|wrapInQuotes
One of the great benefits of functional composition support provided by Spring Cloud Function is the fact that you can compose reactive and imperative functions.

The result of a composition is a single function which, as you may guess, could have a very long and rather cryptic name (e.g., foo|bar|baz|xyz. . .) presenting a great deal of inconvenience when it comes to other configuration properties. This is where descriptive binding names feature described in Functional Binding Names section can help.

For example, if we want to give our toUpperCase|wrapInQuotes a more descriptive name we can do so with the following property spring.cloud.stream.function.bindings.toUpperCase|wrapInQuotes-in-0=quotedUpperCaseInput allowing other configuration properties to refer to that binding name (e.g., spring.cloud.stream.bindings.quotedUpperCaseInput.destination=myDestination).

Functional Composition and Cross-cutting Concerns

Function composition effectively allows you to address complexity by breaking it down to a set of simple and individually manageable/testable components that could still be represented as one at runtime. But that is not the only benefit.

You can also use composition to address certain cross-cutting non-functional concerns, such as content enrichment. For example, assume you have an incoming message that may be lacking certain headers, or some headers are not in the exact state your business function would expect. You can now implement a separate function that addresses those concerns and then compose it with the main business function.

Let’s look at the example

@SpringBootApplication
public class DemoStreamApplication {

	public static void main(String[] args) {
		SpringApplication.run(DemoStreamApplication.class,
				"--spring.cloud.function.definition=enrich|echo",
				"--spring.cloud.stream.function.bindings.enrich|echo-in-0=input",
				"--spring.cloud.stream.bindings.input.destination=myDestination",
				"--spring.cloud.stream.bindings.input.group=myGroup");

	}

	@Bean
	public Function<Message<String>, Message<String>> enrich() {
		return message -> {
			Assert.isTrue(!message.getHeaders().containsKey("foo"), "Should NOT contain 'foo' header");
			return MessageBuilder.fromMessage(message).setHeader("foo", "bar").build();
		};
	}

	@Bean
	public Function<Message<String>, Message<String>> echo() {
		return message -> {
			Assert.isTrue(message.getHeaders().containsKey("foo"), "Should contain 'foo' header");
			System.out.println("Incoming message " + message);
			return message;
		};
	}
}

While trivial, this example demonstrates how one function enriches the incoming Message with the additional header(s) (non-functional concern), so the other function - echo - can benefit form it. The echo function stays clean and focused on business logic only. You can also see the usage of spring.cloud.stream.function.bindings property to simplify composed binding name.

Functions with multiple input and output arguments

Starting with version 3.0 spring-cloud-stream provides support for functions that have multiple inputs and/or multiple outputs (return values). What does this actually mean and what type of use cases it is targeting?

  • Big Data: Imagine the source of data you’re dealing with is highly un-organized and contains various types of data elements (e.g., orders, transactions etc) and you effectively need to sort it out.

  • Data aggregation: Another use case may require you to merge data elements from 2+ incoming _streams.

The above describes just a few use cases where you may need to use a single function to accept and/or produce multiple streams of data. And that is the type of use cases we are targeting here.

Also, note a slightly different emphasis on the concept of streams here. The assumption is that such functions are only valuable if they are given access to the actual streams of data (not the individual elements). So for that we are relying on abstractions provided by Project Reactor (i.e., Flux and Mono) which is already available on the classpath as part of the dependencies brought in by spring-cloud-functions.

Another important aspect is representation of multiple input and outputs. While java provides variety of different abstractions to represent multiple of something those abstractions are a) unbounded, b) lack arity and c) lack type information which are all important in this context. As an example, let’s look at Collection or an array which only allows us to describe multiple of a single type or up-cast everything to an Object, affecting the transparent type conversion feature of spring-cloud-stream and so on.

So to accommodate all these requirements the initial support is relying on the signature which utilizes another abstraction provided by Project Reactor - Tuples. However, we are working on allowing a more flexible signatures.

Please refer to Binding and Binding names section to understand the naming convention used to establish binding names used by such application.

Let’s look at the few samples:

@SpringBootApplication
public class SampleApplication {

	@Bean
	public Function<Tuple2<Flux<String>, Flux<Integer>>, Flux<String>> gather() {
		return tuple -> {
			Flux<String> stringStream = tuple.getT1();
			Flux<String> intStream = tuple.getT2().map(i -> String.valueOf(i));
			return Flux.merge(stringStream, intStream);
		};
	}
}

The above example demonstrates function which takes two inputs (first of type String and second of type Integer) and produces a single output of type String.

So, for the above example the two input bindings will be gather-in-0 and gather-in-1 and for consistency the output binding also follows the same convention and is named gather-out-0.

Knowing that will allow you to set binding specific properties. For example, the following will override content-type for gather-in-0 binding:

--spring.cloud.stream.bindings.gather-in-0.content-type=text/plain
@SpringBootApplication
public class SampleApplication {

	@Bean
	public static Function<Flux<Integer>, Tuple2<Flux<String>, Flux<String>>> scatter() {
		return flux -> {
			Flux<Integer> connectedFlux = flux.publish().autoConnect(2);
			UnicastProcessor even = UnicastProcessor.create();
			UnicastProcessor odd = UnicastProcessor.create();
			Flux<Integer> evenFlux = connectedFlux.filter(number -> number % 2 == 0).doOnNext(number -> even.onNext("EVEN: " + number));
			Flux<Integer> oddFlux = connectedFlux.filter(number -> number % 2 != 0).doOnNext(number -> odd.onNext("ODD: " + number));

			return Tuples.of(Flux.from(even).doOnSubscribe(x -> evenFlux.subscribe()), Flux.from(odd).doOnSubscribe(x -> oddFlux.subscribe()));
		};
	}
}

The above example is somewhat of a the opposite from the previous sample and demonstrates function which takes single input of type Integer and produces two outputs (both of type String).

So, for the above example the input binding is scatter-in-0 and the output bindings are scatter-out-0 and scatter-out-1.

And you test it with the following code:

@Test
public void testSingleInputMultiOutput() {
	try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
			TestChannelBinderConfiguration.getCompleteConfiguration(
					SampleApplication.class))
							.run("--spring.cloud.function.definition=scatter")) {

		InputDestination inputDestination = context.getBean(InputDestination.class);
		OutputDestination outputDestination = context.getBean(OutputDestination.class);

		for (int i = 0; i < 10; i++) {
			inputDestination.send(MessageBuilder.withPayload(String.valueOf(i).getBytes()).build());
		}

		int counter = 0;
		for (int i = 0; i < 5; i++) {
			Message<byte[]> even = outputDestination.receive(0, 0);
			assertThat(even.getPayload()).isEqualTo(("EVEN: " + String.valueOf(counter++)).getBytes());
			Message<byte[]> odd = outputDestination.receive(0, 1);
			assertThat(odd.getPayload()).isEqualTo(("ODD: " + String.valueOf(counter++)).getBytes());
		}
	}
}

Multiple functions in a single application

There may also be a need for grouping several message handlers in a single application. You would do so by defining several functions.

@SpringBootApplication
public class SampleApplication {

	@Bean
	public Function<String, String> uppercase() {
		return value -> value.toUpperCase();
	}

	@Bean
	public Function<String, String> reverse() {
		return value -> new StringBuilder(value).reverse().toString();
	}
}

In the above example we have configuration which defines two functions uppercase and reverse. So first, as mentioned before, we need to notice that there is a a conflict (more then one function) and therefore we need to resolve it by providing spring.cloud.function.definition property pointing to the actual function we want to bind. Except here we will use ; delimiter to point to both functions (see test case below).

As with functions with multiple inputs/outputs, please refer to [Binding and Binding names] section to understand the naming convention used to establish binding names used by such application.

And you test it with the following code:

@Test
public void testMultipleFunctions() {
	try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
			TestChannelBinderConfiguration.getCompleteConfiguration(
					ReactiveFunctionConfiguration.class))
							.run("--spring.cloud.function.definition=uppercase;reverse")) {

		InputDestination inputDestination = context.getBean(InputDestination.class);
		OutputDestination outputDestination = context.getBean(OutputDestination.class);

		Message<byte[]> inputMessage = MessageBuilder.withPayload("Hello".getBytes()).build();
		inputDestination.send(inputMessage, "uppercase-in-0");
		inputDestination.send(inputMessage, "reverse-in-0");

		Message<byte[]> outputMessage = outputDestination.receive(0, "uppercase-out-0");
		assertThat(outputMessage.getPayload()).isEqualTo("HELLO".getBytes());

		outputMessage = outputDestination.receive(0, "reverse-out-0");
		assertThat(outputMessage.getPayload()).isEqualTo("olleH".getBytes());
	}
}

Batch Consumers

When using a MessageChannelBinder that supports batch listeners, and the feature is enabled for the consumer binding, you can set spring.cloud.stream.bindings.<binding-name>.consumer.batch-mode to true to enable the entire batch of messages to be passed to the function in a List.

@Bean
public Function<List<Person>, Person> findFirstPerson() {
    return persons -> persons.get(0);
}

Batch Type Conversion

Similarly to type conversion for a single message consumer, batching requires every message in the batch to be converted to the requested type. For instance, in the previous example, this type is Person.

It is also important to understand that the headers for each message in a batch are provided separately in the MessageHeaders of the Message representing the entire batch. These Messages and their corresponding batched headers are created by the respective binders, and their structure may differ. Therefore, you should refer to the binder documentation to understand the structure of batched headers. For Kafka and Rabbit, you can search for amqp_batchedHeaders and kafka_batchConvertedHeaders, respectively.

In short, if you have a message representing a batch with 5 payloads, the same message will contain a set of headers where each header corresponds to a payload with the same index.

However, what happens if a particular payload fails to convert? In a single message scenario, we simply return null and invoke your method with the unconverted message, which either results in an exception or allows you to handle the raw message, depending on your function’s signature.

In the case of batch processing, things are a bit more complex. Returning null for an unconverted payload effectively reduces the batch size. For example, if the original batch contained 5 messages, and 2 failed to convert, the converted batch will only contain 3 messages. This might be acceptable, but what about the corresponding batched headers? There will still be 5 headers, as they were created when the binder formed the initial batch. This discrepancy makes it difficult to correlate the headers with their corresponding payloads.

To address this issue, we provide the MessageConverterHelper interface.

public interface MessageConverterHelper {

	/**
	 * This method will be called by the framework in cases when a message failed to convert.
	 * It allows you to signal to the framework if such failure should be considered fatal or not.
	 *
	 * @param message failed message
	 * @return true if conversion failure must be considered fatal.
	 */
	default boolean shouldFailIfCantConvert(Message<?> message) {
		return false;
	}

	/**
	 * This method will be called by the framework in cases when a single message within batch of messages failed to convert.
	 * It provides a place for providing post-processing logic before message converter returns.
	 *
	 * @param message failed message.
	 * @param index index of failed message within the batch
	 */
	default void postProcessBatchMessageOnFailure(Message<?> message, int index) {
	}
}

If implemented, this interface is called by the framework’s message converter logic to perform post-processing on the batched message when a particular payload cannot be converted.

The default implementations for Kafka and Rabbit automatically remove the corresponding batch headers to maintain the correlation between batched payloads and their headers. However, you can provide your own implementation and register it as a bean if you need to add custom behavior for such cases.

Additionally, the interface offers a method that allows for more deterministic handling of conversion failures. By default, this method returns false, but you can customize the implementation if you prefer to fail the entire process when a conversion error occurs.

Batch Producers

You can also use the concept of batching on the producer side by returning a collection of Messages which effectively provides an inverse effect where each message in the collection will be sent individually by the binder.

Consider the following function:

@Bean
public Function<String, List<Message<String>>> batch() {
	return p -> {
		List<Message<String>> list = new ArrayList<>();
		list.add(MessageBuilder.withPayload(p + ":1").build());
		list.add(MessageBuilder.withPayload(p + ":2").build());
		list.add(MessageBuilder.withPayload(p + ":3").build());
		list.add(MessageBuilder.withPayload(p + ":4").build());
		return list;
	};
}

Each message in the returned list will be sent individually resulting in four messages sent to output destination.

Spring Integration flow as functions

When you implement a function, you may have complex requirements that fit the category of Enterprise Integration Patterns (EIP). These are best handled by using a framework such as Spring Integration (SI), which is a reference implementation of EIP.

Thankfully SI already provides support for exposing integration flows as functions via Integration flow as gateway Consider the following sample:

@SpringBootApplication
public class FunctionSampleSpringIntegrationApplication {

	public static void main(String[] args) {
		SpringApplication.run(FunctionSampleSpringIntegrationApplication.class, args);
	}

	@Bean
	public IntegrationFlow uppercaseFlow() {
		return IntegrationFlow.from(MessageFunction.class, spec -> spec.beanName("uppercase"))
				.<String, String>transform(String::toUpperCase)
				.log(LoggingHandler.Level.WARN)
				.bridge()
				.get();
	}

	public interface MessageFunction extends Function<Message<String>, Message<String>> {

	}
}

For those who are familiar with SI you can see we define a bean of type IntegrationFlow where we declare an integration flow that we want to expose as a Function<String, String> (using SI DSL) called uppercase. The MessageFunction interface lets us explicitly declare the type of the inputs and outputs for proper type conversion. See [Content Type Negotiation] section for more on type conversion.

To receive raw input you can use from(Function.class, …​).

The resulting function is bound to the input and output destinations exposed by the target binder.

Please refer to [Binding and Binding names] section to understand the naming convention used to establish binding names used by such application.

For more details on interoperability of Spring Integration and Spring Cloud Stream specifically around functional programming model you may find this post very interesting, as it dives a bit deeper into various patterns you can apply by merging the best of Spring Integration and Spring Cloud Stream/Functions.

Using Polled Consumers

Overview

When using polled consumers, you poll the PollableMessageSource on demand. To define binding for polled consumer you need to provide spring.cloud.stream.pollable-source property.

Consider the following example of a polled consumer binding:

--spring.cloud.stream.pollable-source=myDestination

The pollable-source name myDestination in the preceding example will result in myDestination-in-0 binding name to stay consistent with functional programming model.

Given the polled consumer in the preceding example, you might use it as follows:

@Bean
public ApplicationRunner poller(PollableMessageSource destIn, MessageChannel destOut) {
    return args -> {
        while (someCondition()) {
            try {
                if (!destIn.poll(m -> {
                    String newPayload = ((String) m.getPayload()).toUpperCase();
                    destOut.send(new GenericMessage<>(newPayload));
                })) {
                    Thread.sleep(1000);
                }
            }
            catch (Exception e) {
                // handle failure
            }
        }
    };
}

A less manual and more Spring-like alternative would be to configure a scheduled task bean. For example,

@Scheduled(fixedDelay = 5_000)
public void poll() {
	System.out.println("Polling...");
	this.source.poll(m -> {
		System.out.println(m.getPayload());

	}, new ParameterizedTypeReference<Foo>() { });
}

The PollableMessageSource.poll() method takes a MessageHandler argument (often a lambda expression, as shown here). It returns true if the message was received and successfully processed.

As with message-driven consumers, if the MessageHandler throws an exception, messages are published to error channels, as discussed in Error Handling.

Normally, the poll() method acknowledges the message when the MessageHandler exits. If the method exits abnormally, the message is rejected (not re-queued), but see Handling Errors. You can override that behavior by taking responsibility for the acknowledgment, as shown in the following example:

@Bean
public ApplicationRunner poller(PollableMessageSource dest1In, MessageChannel dest2Out) {
    return args -> {
        while (someCondition()) {
            if (!dest1In.poll(m -> {
                StaticMessageHeaderAccessor.getAcknowledgmentCallback(m).noAutoAck();
                // e.g. hand off to another thread which can perform the ack
                // or acknowledge(Status.REQUEUE)

            })) {
                Thread.sleep(1000);
            }
        }
    };
}
You must ack (or nack) the message at some point, to avoid resource leaks.
Some messaging systems (such as Apache Kafka) maintain a simple offset in a log. If a delivery fails and is re-queued with StaticMessageHeaderAccessor.getAcknowledgmentCallback(m).acknowledge(Status.REQUEUE);, any later successfully ack’d messages are redelivered.

There is also an overloaded poll method, for which the definition is as follows:

poll(MessageHandler handler, ParameterizedTypeReference<?> type)

The type is a conversion hint that allows the incoming message payload to be converted, as shown in the following example:

boolean result = pollableSource.poll(received -> {
			Map<String, Foo> payload = (Map<String, Foo>) received.getPayload();
            ...

		}, new ParameterizedTypeReference<Map<String, Foo>>() {});

Handling Errors

By default, an error channel is configured for the pollable source; if the callback throws an exception, an ErrorMessage is sent to the error channel (<destination>.<group>.errors); this error channel is also bridged to the global Spring Integration errorChannel.

You can subscribe to either error channel with a @ServiceActivator to handle errors; without a subscription, the error will simply be logged and the message will be acknowledged as successful. If the error channel service activator throws an exception, the message will be rejected (by default) and won’t be redelivered. If the service activator throws a RequeueCurrentMessageException, the message will be requeued at the broker and will be again retrieved on a subsequent poll.

If the listener throws a RequeueCurrentMessageException directly, the message will be requeued, as discussed above, and will not be sent to the error channels.