Initial Producer Support for the RabbitMQ Stream Plugin
Basic support for the RabbitMQ Stream Plugin is now provided.
To enable this feature, you must add the spring-rabbit-stream
jar to the class path - it must be the same version as spring-amqp
and spring-rabbit
.
The producer properties described above are not supported when you set the producerType property to STREAM_SYNC or STREAM_ASYNC .
|
To configure the binder to use a stream ProducerType
, Spring Boot will configure an Environment
@Bean
from the applicaation properties.
You can, optionally, add a customizer to customize the message handler.
@Bean
ProducerMessageHandlerCustomizer<MessageHandler> handlerCustomizer() {
return (hand, dest) -> {
RabbitStreamMessageHandler handler = (RabbitStreamMessageHandler) hand;
handler.setConfirmTimeout(5000);
((RabbitStreamTemplate) handler.getStreamOperations()).setProducerCustomizer(
(name, builder) -> {
...
});
};
}
Refer to the RabbitMQ Stream Java Client documentation for information about configuring the environment and producer builder.
Producer Support for the RabbitMQ Super Streams
See Super Streams for information about super streams.
Use of super streams allows for automatic scale-up scale-down with a single active consumer on each partition of a super stream. Using Spring Cloud Stream, you can publish to a super stream either over AMQP, or using the stream client.
The super stream must already exist; creating a super stream is not supported by producer bindings. |
Publishing to a super stream over AMQP:
spring.cloud.stream.bindings.output.destination=super
spring.cloud.stream.bindings.output.producer.partition-count=3
spring.cloud.stream.bindings.output.producer.partition-key-expression=headers['cust-no']
spring.cloud.stream.rabbit.bindings.output.producer.declare-exchange=false
Publishing to a super stream using the stream client:
spring.cloud.stream.bindings.output.destination=super
spring.cloud.stream.bindings.output.producer.partition-count=3
spring.cloud.stream.bindings.output.producer.partition-key-expression=headers['cust-no']
spring.cloud.stream.rabbit.bindings.output.producer.producer-type=stream-async
spring.cloud.stream.rabbit.bindings.output.producer.super-stream=true
spring.cloud.stream.rabbit.bindings.output.producer.declare-exchange=false
When using the stream client, if you set a confirmAckChannel
, a copy of a successfully sent message will be sent to that channel.