Using the RabbitMQ Stream Plugin

Version 2.4 introduces initial support for the RabbitMQ Stream Plugin Java Client for the RabbitMQ Stream Plugin.

  • RabbitStreamTemplate

  • StreamListenerContainer

Add the spring-rabbit-stream dependency to your project:

maven
<dependency>
  <groupId>org.springframework.amqp</groupId>
  <artifactId>spring-rabbit-stream</artifactId>
  <version>3.2.0</version>
</dependency>
gradle
compile 'org.springframework.amqp:spring-rabbit-stream:3.2.0'

You can provision the queues as normal, using a RabbitAdmin bean, using the QueueBuilder.stream() method to designate the queue type. For example:

@Bean
Queue stream() {
    return QueueBuilder.durable("stream.queue1")
            .stream()
            .build();
}

However, this will only work if you are also using non-stream components (such as the SimpleMessageListenerContainer or DirectMessageListenerContainer) because the admin is triggered to declare the defined beans when an AMQP connection is opened. If your application only uses stream components, or you wish to use advanced stream configuration features, you should configure a StreamAdmin instead:

@Bean
StreamAdmin streamAdmin(Environment env) {
    return new StreamAdmin(env, sc -> {
        sc.stream("stream.queue1").maxAge(Duration.ofHours(2)).create();
        sc.stream("stream.queue2").create();
    });
}

Refer to the RabbitMQ documentation for more information about the StreamCreator.

Sending Messages

The RabbitStreamTemplate provides a subset of the RabbitTemplate (AMQP) functionality.

RabbitStreamOperations
public interface RabbitStreamOperations extends AutoCloseable {

	CompletableFuture<Boolean> send(Message message);

	CompletableFuture<Boolean> convertAndSend(Object message);

	CompletableFuture<Boolean> convertAndSend(Object message, @Nullable MessagePostProcessor mpp);

	CompletableFuture<Boolean> send(com.rabbitmq.stream.Message message);

	MessageBuilder messageBuilder();

	MessageConverter messageConverter();

	StreamMessageConverter streamMessageConverter();

	@Override
	void close() throws AmqpException;

}

The RabbitStreamTemplate implementation has the following constructor and properties:

RabbitStreamTemplate
public RabbitStreamTemplate(Environment environment, String streamName) {
}

public void setMessageConverter(MessageConverter messageConverter) {
}

public void setStreamConverter(StreamMessageConverter streamConverter) {
}

public synchronized void setProducerCustomizer(ProducerCustomizer producerCustomizer) {
}

The MessageConverter is used in the convertAndSend methods to convert the object to a Spring AMQP Message.

The StreamMessageConverter is used to convert from a Spring AMQP Message to a native stream Message.

You can also send native stream Message s directly; with the messageBuilder() method providing access to the Producer 's message builder.

The ProducerCustomizer provides a mechanism to customize the producer before it is built.

Refer to the Java Client Documentation about customizing the Environment and Producer.

Receiving Messages

Asynchronous message reception is provided by the StreamListenerContainer (and the StreamRabbitListenerContainerFactory when using @RabbitListener).

The listener container requires an Environment as well as a single stream name.

You can either receive Spring AMQP Message s using the classic MessageListener, or you can receive native stream Message s using a new interface:

public interface StreamMessageListener extends MessageListener {

	void onStreamMessage(Message message, Context context);

}

See Message Listener Container Configuration for information about supported properties.

Similar the template, the container has a ConsumerCustomizer property.

Refer to the Java Client Documentation about customizing the Environment and Consumer.

When using @RabbitListener, configure a StreamRabbitListenerContainerFactory; at this time, most @RabbitListener properties (concurrency, etc) are ignored. Only id, queues, autoStartup and containerFactory are supported. In addition, queues can only contain one stream name.

Examples

@Bean
RabbitStreamTemplate streamTemplate(Environment env) {
    RabbitStreamTemplate template = new RabbitStreamTemplate(env, "test.stream.queue1");
    template.setProducerCustomizer((name, builder) -> builder.name("test"));
    return template;
}

@Bean
RabbitListenerContainerFactory<StreamListenerContainer> rabbitListenerContainerFactory(Environment env) {
    return new StreamRabbitListenerContainerFactory(env);
}

@RabbitListener(queues = "test.stream.queue1")
void listen(String in) {
    ...
}

@Bean
RabbitListenerContainerFactory<StreamListenerContainer> nativeFactory(Environment env) {
    StreamRabbitListenerContainerFactory factory = new StreamRabbitListenerContainerFactory(env);
    factory.setNativeListener(true);
    factory.setConsumerCustomizer((id, builder) -> {
        builder.name("myConsumer")
                .offset(OffsetSpecification.first())
                .manualTrackingStrategy();
    });
    return factory;
}

@RabbitListener(id = "test", queues = "test.stream.queue2", containerFactory = "nativeFactory")
void nativeMsg(Message in, Context context) {
    ...
    context.storeOffset();
}

@Bean
Queue stream() {
    return QueueBuilder.durable("test.stream.queue1")
            .stream()
            .build();
}

@Bean
Queue stream() {
    return QueueBuilder.durable("test.stream.queue2")
            .stream()
            .build();
}

Version 2.4.5 added the adviceChain property to the StreamListenerContainer (and its factory). A new factory bean is also provided to create a stateless retry interceptor with an optional StreamMessageRecoverer for use when consuming raw stream messages.

@Bean
public StreamRetryOperationsInterceptorFactoryBean sfb(RetryTemplate retryTemplate) {
    StreamRetryOperationsInterceptorFactoryBean rfb =
            new StreamRetryOperationsInterceptorFactoryBean();
    rfb.setRetryOperations(retryTemplate);
    rfb.setStreamMessageRecoverer((msg, context, throwable) -> {
        ...
    });
    return rfb;
}
Stateful retry is not supported with this container.

Super Streams

A Super Stream is an abstract concept for a partitioned stream, implemented by binding a number of stream queues to an exchange having an argument x-super-stream: true.

Provisioning

For convenience, a super stream can be provisioned by defining a single bean of type SuperStream.

@Bean
SuperStream superStream() {
    return new SuperStream("my.super.stream", 3);
}

The RabbitAdmin detects this bean and will declare the exchange (my.super.stream) and 3 queues (partitions) - my.super-stream-n where n is 0, 1, 2, bound with routing keys equal to n.

If you also wish to publish over AMQP to the exchange, you can provide custom routing keys:

@Bean
SuperStream superStream() {
    return new SuperStream("my.super.stream", 3, (q, i) -> IntStream.range(0, i)
					.mapToObj(j -> "rk-" + j)
					.collect(Collectors.toList()));
}

The number of keys must equal the number of partitions.

Producing to a SuperStream

You must add a superStreamRoutingFunction to the RabbitStreamTemplate:

@Bean
RabbitStreamTemplate streamTemplate(Environment env) {
    RabbitStreamTemplate template = new RabbitStreamTemplate(env, "stream.queue1");
    template.setSuperStreamRouting(message -> {
        // some logic to return a String for the client's hashing algorithm
    });
    return template;
}

You can also publish over AMQP, using the RabbitTemplate.

Consuming Super Streams with Single Active Consumers

Invoke the superStream method on the listener container to enable a single active consumer on a super stream.

@Bean
StreamListenerContainer container(Environment env, String name) {
    StreamListenerContainer container = new StreamListenerContainer(env);
    container.superStream("ss.sac", "myConsumer", 3); // concurrency = 3
    container.setupMessageListener(msg -> {
        ...
    });
    container.setConsumerCustomizer((id, builder) -> builder.offset(OffsetSpecification.last()));
    return container;
}
At this time, when the concurrency is greater than 1, the actual concurrency is further controlled by the Environment; to achieve full concurrency, set the environment’s maxConsumersByConnection to 1. See Configuring the Environment.

Micrometer Observation

Using Micrometer for observation is now supported, since version 3.0.5, for the RabbitStreamTemplate and the stream listener container. The container now also supports Micrometer timers (when observation is not enabled).

Set observationEnabled on each component to enable observation; this will disable Micrometer Timers because the timers will now be managed with each observation. When using annotated listeners, set observationEnabled on the container factory.

Refer to Micrometer Tracing for more information.

To add tags to timers/traces, configure a custom RabbitStreamTemplateObservationConvention or RabbitStreamListenerObservationConvention to the template or listener container, respectively.

The default implementations add the name tag for template observations and listener.id tag for containers.

You can either subclass DefaultRabbitStreamTemplateObservationConvention or DefaultStreamRabbitListenerObservationConvention or provide completely new implementations.

See Micrometer Observation Documentation for more details.