Event Routing

Event Routing, in the context of Spring Cloud Stream, is the ability to either a) route events to a particular event subscriber or b) route events produced by an event subscriber to a particular destination. Here we’ll refer to it as route ‘TO’ and route ‘FROM’.

Routing TO Consumer

Routing can be achieved by relying on RoutingFunction available in Spring Cloud Function 3.0. All you need to do is enable it via --spring.cloud.stream.function.routing.enabled=true application property or provide spring.cloud.function.routing-expression property. Once enabled RoutingFunction will be bound to input destination receiving all the messages and route them to other functions based on the provided instruction.

For the purposes of binding the name of the routing destination is functionRouter-in-0 (see RoutingFunction.FUNCTION_NAME and binding naming convention [Functional binding names]).

Instruction could be provided with individual messages as well as application properties.

Here are couple of samples:

Using message headers

@SpringBootApplication
public class SampleApplication {

	public static void main(String[] args) {
		SpringApplication.run(SampleApplication.class,
                       "--spring.cloud.stream.function.routing.enabled=true");
	}

	@Bean
	public Consumer<String> even() {
		return value -> {
			System.out.println("EVEN: " + value);
		};
	}

	@Bean
	public Consumer<String> odd() {
		return value -> {
			System.out.println("ODD: " + value);
		};
    }
}

By sending a message to the functionRouter-in-0 destination exposed by the binder (i.e., rabbit, kafka), such message will be routed to the appropriate (‘even’ or ‘odd’) Consumer.

By default RoutingFunction will look for a spring.cloud.function.definition or spring.cloud.function.routing-expression (for more dynamic scenarios with SpEL) header and if it is found, its value will be treated as the routing instruction.

For example, setting spring.cloud.function.routing-expression header to value T(java.lang.System).currentTimeMillis() % 2 == 0 ? 'even' : 'odd' will end up semi-randomly routing request to either odd or even functions. Also, for SpEL, the root object of the evaluation context is Message so you can do evaluation on individual headers (or message) as well …​.routing-expression=headers['type']

Using application properties

The spring.cloud.function.routing-expression and/or spring.cloud.function.definition can be passed as application properties (e.g., spring.cloud.function.routing-expression=headers['type'].

@SpringBootApplication
public class RoutingStreamApplication {

  public static void main(String[] args) {
      SpringApplication.run(RoutingStreamApplication.class,
	  "--spring.cloud.function.routing-expression="
	  + "T(java.lang.System).nanoTime() % 2 == 0 ? 'even' : 'odd'");
  }
  @Bean
  public Consumer<Integer> even() {
    return value -> System.out.println("EVEN: " + value);
  }

  @Bean
  public Consumer<Integer> odd() {
    return value -> System.out.println("ODD: " + value);
  }
}
Passing instructions via application properties is especially important for reactive functions given that a reactive function is only invoked once to pass the Publisher, so access to the individual items is limited.

Routing Function and output binding

RoutingFunction is a Function and as such treated no differently than any other function. Well. . . almost.

When RoutingFunction routes to another Function, its output is sent to the output binding of the RoutingFunction which is functionRouter-in-0 as expected. But what if RoutingFunction routes to a Consumer? In other words the result of invocation of the RoutingFunction may not produce anything to be sent to the output binding, thus making it necessary to even have one. So, we do treat RoutingFunction a little bit differently when we create bindings. And even though it is transparent to you as a user (there is really nothing for you to do), being aware of some of the mechanics would help you understand its inner workings.

So, the rule is; We never create output binding for the RoutingFunction, only input. So when you routing to Consumer, the RoutingFunction effectively becomes as a Consumer by not having any output bindings. However, if RoutingFunction happen to route to another Function which produces the output, the output binding for the RoutingFunction will be create dynamically at which point RoutingFunction will act as a regular Function with regards to bindings (having both input and output bindings).

Routing FROM Consumer

Aside from static destinations, Spring Cloud Stream lets applications send messages to dynamically bound destinations. This is useful, for example, when the target destination needs to be determined at runtime. Applications can do so in one of two ways.

spring.cloud.stream.sendto.destination

You can also delegate to the framework to dynamically resolve the output destination by specifying spring.cloud.stream.sendto.destination header set to the name of the destination to be resolved.

Consider the following example:

@SpringBootApplication
@Controller
public class SourceWithDynamicDestination {

    @Bean
	public Function<String, Message<String>> destinationAsPayload() {
		return value -> {
			return MessageBuilder.withPayload(value)
				.setHeader("spring.cloud.stream.sendto.destination", value).build();};
	}
}

Albeit trivial you can clearly see in this example, our output is a Message with spring.cloud.stream.sendto.destination header set to the value of he input argument. The framework will consult this header and will attempt to create or discover a destination with that name and send output to it.

If destination names are known in advance, you can configure the producer properties as with any other destination. Alternatively, if you register a NewDestinationBindingCallback<> bean, it is invoked just before the binding is created. The callback takes the generic type of the extended producer properties used by the binder. It has one method:

void configure(String destinationName, MessageChannel channel, ProducerProperties producerProperties,
        T extendedProducerProperties);

The following example shows how to use the RabbitMQ binder:

@Bean
public NewDestinationBindingCallback<RabbitProducerProperties> dynamicConfigurer() {
    return (name, channel, props, extended) -> {
        props.setRequiredGroups("bindThisQueue");
        extended.setQueueNameGroupOnly(true);
        extended.setAutoBindDlq(true);
        extended.setDeadLetterQueueName("myDLQ");
    };
}
If you need to support dynamic destinations with multiple binder types, use Object for the generic type and cast the extended argument as needed.

Also, please see [Using StreamBridge] section to see how yet another option (StreamBridge) can be utilized for similar cases.