Initial Consumer Support for the RabbitMQ Stream Plugin

Basic support for the RabbitMQ Stream Plugin is now provided. To enable this feature, you must add the spring-rabbit-stream jar to the class path - it must be the same version as spring-amqp and spring-rabbit.

The consumer properties described above are not supported when you set the containerType property to stream; concurrency is supported for super streams only. Only a single stream queue can be consumed by each binding.

To configure the binder to use containerType=stream, Spring Boot will automatically configure an Environment @Bean from the application properties. You can, optionally, add a customizer to customize the listener container.

@Bean
ListenerContainerCustomizer<MessageListenerContainer> customizer() {
    return (cont, dest, group) -> {
        StreamListenerContainer container = (StreamListenerContainer) cont;
        container.setConsumerCustomizer((name, builder) -> {
            builder.offset(OffsetSpecification.first());
        });
        // ...
    };
}

The name argument passed to the customizer is destination + '.' + group + '.container'.

The stream name() (for the purpose of offset tracking) is set to the binding destination + '.' + group. It can be changed using a ConsumerCustomizer shown above. If you decide to use manual offset tracking, the Context is available as a message header:

int count;

@Bean
public Consumer<Message<?>> input() {
    return msg -> {
        System.out.println(msg);
        if (++count % 1000 == 0) {
            Context context = msg.getHeaders().get("rabbitmq_streamContext", Context.class);
            context.consumer().store(context.offset());
        }
    };
}

Refer to the RabbitMQ Stream Java Client documentation for information about configuring the environment and consumer builder.

Consumer Support for the RabbitMQ Super Streams

See Super Streams for information about super streams.

Use of super streams allows for automatic scale-up scale-down with a single active consumer on each partition of a super stream.

Configuration example:

@Bean
public Consumer<Thing> input() {
    ...
}
spring.cloud.stream.bindings.input-in-0.destination=super
spring.cloud.stream.bindings.input-in-0.group=test
spring.cloud.stream.bindings.input-in-0.consumer.instance-count=3
spring.cloud.stream.bindings.input-in-0.consumer.concurrency=3
spring.cloud.stream.rabbit.bindings.input-in-0.consumer.container-type=STREAM
spring.cloud.stream.rabbit.bindings.input-in-0.consumer.super-stream=true

The framework will create a super stream named super, with 9 partitions. Up to 3 instances of this application can be deployed.