Using the RabbitMQ Stream Plugin
Version 2.4 introduces initial support for the RabbitMQ Stream Plugin Java Client for the RabbitMQ Stream Plugin.
-
RabbitStreamTemplate
-
StreamListenerContainer
Add the spring-rabbit-stream
dependency to your project:
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-stream</artifactId>
<version>3.2.0</version>
</dependency>
compile 'org.springframework.amqp:spring-rabbit-stream:3.2.0'
You can provision the queues as normal, using a RabbitAdmin
bean, using the QueueBuilder.stream()
method to designate the queue type.
For example:
@Bean
Queue stream() {
return QueueBuilder.durable("stream.queue1")
.stream()
.build();
}
However, this will only work if you are also using non-stream components (such as the SimpleMessageListenerContainer
or DirectMessageListenerContainer
) because the admin is triggered to declare the defined beans when an AMQP connection is opened.
If your application only uses stream components, or you wish to use advanced stream configuration features, you should configure a StreamAdmin
instead:
@Bean
StreamAdmin streamAdmin(Environment env) {
return new StreamAdmin(env, sc -> {
sc.stream("stream.queue1").maxAge(Duration.ofHours(2)).create();
sc.stream("stream.queue2").create();
});
}
Refer to the RabbitMQ documentation for more information about the StreamCreator
.
Sending Messages
The RabbitStreamTemplate
provides a subset of the RabbitTemplate
(AMQP) functionality.
public interface RabbitStreamOperations extends AutoCloseable {
CompletableFuture<Boolean> send(Message message);
CompletableFuture<Boolean> convertAndSend(Object message);
CompletableFuture<Boolean> convertAndSend(Object message, @Nullable MessagePostProcessor mpp);
CompletableFuture<Boolean> send(com.rabbitmq.stream.Message message);
MessageBuilder messageBuilder();
MessageConverter messageConverter();
StreamMessageConverter streamMessageConverter();
@Override
void close() throws AmqpException;
}
The RabbitStreamTemplate
implementation has the following constructor and properties:
public RabbitStreamTemplate(Environment environment, String streamName) {
}
public void setMessageConverter(MessageConverter messageConverter) {
}
public void setStreamConverter(StreamMessageConverter streamConverter) {
}
public synchronized void setProducerCustomizer(ProducerCustomizer producerCustomizer) {
}
The MessageConverter
is used in the convertAndSend
methods to convert the object to a Spring AMQP Message
.
The StreamMessageConverter
is used to convert from a Spring AMQP Message
to a native stream Message
.
You can also send native stream Message
s directly; with the messageBuilder()
method providing access to the Producer
's message builder.
The ProducerCustomizer
provides a mechanism to customize the producer before it is built.
Refer to the Java Client Documentation about customizing the Environment
and Producer
.
Receiving Messages
Asynchronous message reception is provided by the StreamListenerContainer
(and the StreamRabbitListenerContainerFactory
when using @RabbitListener
).
The listener container requires an Environment
as well as a single stream name.
You can either receive Spring AMQP Message
s using the classic MessageListener
, or you can receive native stream Message
s using a new interface:
public interface StreamMessageListener extends MessageListener {
void onStreamMessage(Message message, Context context);
}
See Message Listener Container Configuration for information about supported properties.
Similar the template, the container has a ConsumerCustomizer
property.
Refer to the Java Client Documentation about customizing the Environment
and Consumer
.
When using @RabbitListener
, configure a StreamRabbitListenerContainerFactory
; at this time, most @RabbitListener
properties (concurrency
, etc) are ignored. Only id
, queues
, autoStartup
and containerFactory
are supported.
In addition, queues
can only contain one stream name.
Examples
@Bean
RabbitStreamTemplate streamTemplate(Environment env) {
RabbitStreamTemplate template = new RabbitStreamTemplate(env, "test.stream.queue1");
template.setProducerCustomizer((name, builder) -> builder.name("test"));
return template;
}
@Bean
RabbitListenerContainerFactory<StreamListenerContainer> rabbitListenerContainerFactory(Environment env) {
return new StreamRabbitListenerContainerFactory(env);
}
@RabbitListener(queues = "test.stream.queue1")
void listen(String in) {
...
}
@Bean
RabbitListenerContainerFactory<StreamListenerContainer> nativeFactory(Environment env) {
StreamRabbitListenerContainerFactory factory = new StreamRabbitListenerContainerFactory(env);
factory.setNativeListener(true);
factory.setConsumerCustomizer((id, builder) -> {
builder.name("myConsumer")
.offset(OffsetSpecification.first())
.manualTrackingStrategy();
});
return factory;
}
@RabbitListener(id = "test", queues = "test.stream.queue2", containerFactory = "nativeFactory")
void nativeMsg(Message in, Context context) {
...
context.storeOffset();
}
@Bean
Queue stream() {
return QueueBuilder.durable("test.stream.queue1")
.stream()
.build();
}
@Bean
Queue stream() {
return QueueBuilder.durable("test.stream.queue2")
.stream()
.build();
}
Version 2.4.5 added the adviceChain
property to the StreamListenerContainer
(and its factory).
A new factory bean is also provided to create a stateless retry interceptor with an optional StreamMessageRecoverer
for use when consuming raw stream messages.
@Bean
public StreamRetryOperationsInterceptorFactoryBean sfb(RetryTemplate retryTemplate) {
StreamRetryOperationsInterceptorFactoryBean rfb =
new StreamRetryOperationsInterceptorFactoryBean();
rfb.setRetryOperations(retryTemplate);
rfb.setStreamMessageRecoverer((msg, context, throwable) -> {
...
});
return rfb;
}
Stateful retry is not supported with this container. |
Super Streams
A Super Stream is an abstract concept for a partitioned stream, implemented by binding a number of stream queues to an exchange having an argument x-super-stream: true
.
Provisioning
For convenience, a super stream can be provisioned by defining a single bean of type SuperStream
.
@Bean
SuperStream superStream() {
return new SuperStream("my.super.stream", 3);
}
The RabbitAdmin
detects this bean and will declare the exchange (my.super.stream
) and 3 queues (partitions) - my.super-stream-n
where n
is 0
, 1
, 2
, bound with routing keys equal to n
.
If you also wish to publish over AMQP to the exchange, you can provide custom routing keys:
@Bean
SuperStream superStream() {
return new SuperStream("my.super.stream", 3, (q, i) -> IntStream.range(0, i)
.mapToObj(j -> "rk-" + j)
.collect(Collectors.toList()));
}
The number of keys must equal the number of partitions.
Producing to a SuperStream
You must add a superStreamRoutingFunction
to the RabbitStreamTemplate
:
@Bean
RabbitStreamTemplate streamTemplate(Environment env) {
RabbitStreamTemplate template = new RabbitStreamTemplate(env, "stream.queue1");
template.setSuperStreamRouting(message -> {
// some logic to return a String for the client's hashing algorithm
});
return template;
}
You can also publish over AMQP, using the RabbitTemplate
.
Consuming Super Streams with Single Active Consumers
Invoke the superStream
method on the listener container to enable a single active consumer on a super stream.
@Bean
StreamListenerContainer container(Environment env, String name) {
StreamListenerContainer container = new StreamListenerContainer(env);
container.superStream("ss.sac", "myConsumer", 3); // concurrency = 3
container.setupMessageListener(msg -> {
...
});
container.setConsumerCustomizer((id, builder) -> builder.offset(OffsetSpecification.last()));
return container;
}
At this time, when the concurrency is greater than 1, the actual concurrency is further controlled by the Environment ; to achieve full concurrency, set the environment’s maxConsumersByConnection to 1.
See Configuring the Environment.
|
Micrometer Observation
Using Micrometer for observation is now supported, since version 3.0.5, for the RabbitStreamTemplate
and the stream listener container.
The container now also supports Micrometer timers (when observation is not enabled).
Set observationEnabled
on each component to enable observation; this will disable Micrometer Timers because the timers will now be managed with each observation.
When using annotated listeners, set observationEnabled
on the container factory.
Refer to Micrometer Tracing for more information.
To add tags to timers/traces, configure a custom RabbitStreamTemplateObservationConvention
or RabbitStreamListenerObservationConvention
to the template or listener container, respectively.
The default implementations add the name
tag for template observations and listener.id
tag for containers.
You can either subclass DefaultRabbitStreamTemplateObservationConvention
or DefaultStreamRabbitListenerObservationConvention
or provide completely new implementations.
See Micrometer Observation Documentation for more details.