For using the RabbitMQ binder, you just need to add it to your Spring Cloud Stream application, using the following Maven coordinates:
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-stream-binder-rabbit</artifactId> </dependency>
Alternatively, you can also use the Spring Cloud Stream RabbitMQ Starter.
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId> </dependency>
A simplified diagram of how the RabbitMQ binder operates can be seen below.
The RabbitMQ Binder implementation maps each destination to a TopicExchange
.
For each consumer group, a Queue
will be bound to that TopicExchange
.
Each consumer instance have a corresponding RabbitMQ Consumer
instance for its group’s Queue
.
For partitioned producers/consumers the queues are suffixed with the partition index and use the partition index as routing key.
Using the autoBindDlq
option, you can optionally configure the binder to create and configure dead-letter queues (DLQs) (and a dead-letter exchange DLX
).
The dead letter queue has the name of the destination, appended with .dlq
.
If retry is enabled (maxAttempts > 1
) failed messages will be delivered to the DLQ.
If retry is disabled (maxAttempts = 1
), you should set requeueRejected
to false so the failed message will be routed to the DLQ, instead of being requeued.
In addition, republishToDlq
causes the binder to publish a failed message to the DLQ (instead of rejecting it); this enables additional information to be added to the message in headers, such as the stack trace in the x-exception-stacktrace
header.
This option does not need retry enabled or the requeueRejected
property set to true
.
See Section 13.3.1, “RabbitMQ Binder Properties” for more information about these properties.
The framework does not provide any standard mechanism to consume dead-letter messages (or to re-route them back to the primary queue). Some options are described in Section 13.4, “Dead-Letter Queue Processing”.
This section contains settings specific to the RabbitMQ Binder and bound channels.
For general binding configuration options and properties, please refer to the Spring Cloud Stream core documentation.
By default, the RabbitMQ binder uses Spring Boot’s ConnectionFactory
, and it therefore supports all Spring Boot configuration options for RabbitMQ.
(For reference, consult the Spring Boot documentation.)
RabbitMQ configuration options use the spring.rabbitmq
prefix.
In addition to Spring Boot options, the RabbitMQ binder supports the following properties:
A comma-separated list of RabbitMQ management plugin URLs.
Only used when nodes
contains more than one entry.
Each entry in this list must have a corresponding entry in spring.rabbitmq.addresses
.
Default: empty.
A comma-separated list of RabbitMQ node names.
When more than one entry, used to locate the server address where a queue is located.
Each entry in this list must have a corresponding entry in spring.rabbitmq.addresses
.
Default: empty.
Compression level for compressed bindings.
See java.util.zip.Deflater
.
Default: 1
(BEST_LEVEL).
The following properties are available for Rabbit consumers only and
must be prefixed with spring.cloud.stream.rabbit.bindings.<channelName>.consumer.
.
The acknowledge mode.
Default: AUTO
.
Whether to automatically declare the DLQ and bind it to the binder DLX.
Default: false
.
Whether subscription should be durable.
Only effective if group
is also set.
Default: true
.
1
.Prefetch count.
Default: 1
.
A prefix to be added to the name of the destination
and queues.
Default: "".
The interval between connection recovery attempts, in milliseconds.
Default: 5000
.
Whether delivery failures should be requeued.
Default: true
.
The request headers to be transported.
Default: [STANDARD_REQUEST_HEADERS,'*']
.
The reply headers to be transported.
Default: [STANDARD_REPLY_HEADERS,'*']
.
true
, the bus will republish failed messages to the DLQ with additional headers, including the exception message and stack trace from the cause of the final failure.Whether to use transacted channels.
Default: false
.
The number of deliveries between acks.
Default: 1
.
The following properties are available for Rabbit producers only and
must be prefixed with spring.cloud.stream.rabbit.bindings.<channelName>.producer.
.
Whether to automatically declare the DLQ and bind it to the binder DLX.
Default: false
.
Whether to enable message batching by producers.
Default: false
.
The number of messages to buffer when batching is enabled.
Default: 100
.
10000
.5000
.Whether data should be compressed when sent.
Default: false
.
Delivery mode.
Default: PERSISTENT
.
A prefix to be added to the name of the destination
exchange.
Default: "".
The request headers to be transported.
Default: [STANDARD_REQUEST_HEADERS,'*']
.
The reply headers to be transported.
Default: [STANDARD_REPLY_HEADERS,'*']
.
Note | |
---|---|
In the case of RabbitMQ, content type headers can be set by external applications. Spring Cloud Stream supports them as part of an extended internal protocol used for any type of transport (including transports, such as Kafka, that do not normally support headers). |
Because it can’t be anticipated how users would want to dispose of dead-lettered messages, the framework does not provide any standard mechanism to handle them.
If the reason for the dead-lettering is transient, you may wish to route the messages back to the original queue.
However, if the problem is a permanent issue, that could cause an infinite loop.
The following spring-boot
application is an example of how to route those messages back to the original queue, but moves them to a third "parking lot" queue after three attempts.
The second example utilizes the RabbitMQ Delayed Message Exchange to introduce a delay to the requeued message.
In this example, the delay increases for each attempt.
These examples use a @RabbitListener
to receive messages from the DLQ, you could also use RabbitTemplate.receive()
in a batch process.
The examples assume the original destination is so8400in
and the consumer group is so8400
.
The first two examples are when the destination is not partitioned.
@SpringBootApplication public class ReRouteDlqApplication { private static final String ORIGINAL_QUEUE = "so8400in.so8400"; private static final String DLQ = ORIGINAL_QUEUE + ".dlq"; private static final String PARKING_LOT = ORIGINAL_QUEUE + ".parkingLot"; private static final String X_RETRIES_HEADER = "x-retries"; public static void main(String[] args) throws Exception { ConfigurableApplicationContext context = SpringApplication.run(ReRouteDlqApplication.class, args); System.out.println("Hit enter to terminate"); System.in.read(); context.close(); } @Autowired private RabbitTemplate rabbitTemplate; @RabbitListener(queues = DLQ) public void rePublish(Message failedMessage) { Integer retriesHeader = (Integer) failedMessage.getMessageProperties().getHeaders().get(X_RETRIES_HEADER); if (retriesHeader == null) { retriesHeader = Integer.valueOf(0); } if (retriesHeader < 3) { failedMessage.getMessageProperties().getHeaders().put(X_RETRIES_HEADER, retriesHeader + 1); this.rabbitTemplate.send(ORIGINAL_QUEUE, failedMessage); } else { this.rabbitTemplate.send(PARKING_LOT, failedMessage); } } @Bean public Queue parkingLot() { return new Queue(PARKING_LOT); } }
@SpringBootApplication public class ReRouteDlqApplication { private static final String ORIGINAL_QUEUE = "so8400in.so8400"; private static final String DLQ = ORIGINAL_QUEUE + ".dlq"; private static final String PARKING_LOT = ORIGINAL_QUEUE + ".parkingLot"; private static final String X_RETRIES_HEADER = "x-retries"; private static final String DELAY_EXCHANGE = "dlqReRouter"; public static void main(String[] args) throws Exception { ConfigurableApplicationContext context = SpringApplication.run(ReRouteDlqApplication.class, args); System.out.println("Hit enter to terminate"); System.in.read(); context.close(); } @Autowired private RabbitTemplate rabbitTemplate; @RabbitListener(queues = DLQ) public void rePublish(Message failedMessage) { Map<String, Object> headers = failedMessage.getMessageProperties().getHeaders(); Integer retriesHeader = (Integer) headers.get(X_RETRIES_HEADER); if (retriesHeader == null) { retriesHeader = Integer.valueOf(0); } if (retriesHeader < 3) { headers.put(X_RETRIES_HEADER, retriesHeader + 1); headers.put("x-delay", 5000 * retriesHeader); this.rabbitTemplate.send(DELAY_EXCHANGE, ORIGINAL_QUEUE, failedMessage); } else { this.rabbitTemplate.send(PARKING_LOT, failedMessage); } } @Bean public DirectExchange delayExchange() { DirectExchange exchange = new DirectExchange(DELAY_EXCHANGE); exchange.setDelayed(true); return exchange; } @Bean public Binding bindOriginalToDelay() { return BindingBuilder.bind(new Queue(ORIGINAL_QUEUE)).to(delayExchange()).with(ORIGINAL_QUEUE); } @Bean public Queue parkingLot() { return new Queue(PARKING_LOT); } }
With partitioned destinations, there is one DLQ for all partitions and we determine the original queue from the headers.
When republishToDlq
is false
, RabbitMQ publishes the message to the DLX/DLQ with an x-death
header containing information about the original destination.
@SpringBootApplication public class ReRouteDlqApplication { private static final String ORIGINAL_QUEUE = "so8400in.so8400"; private static final String DLQ = ORIGINAL_QUEUE + ".dlq"; private static final String PARKING_LOT = ORIGINAL_QUEUE + ".parkingLot"; private static final String X_DEATH_HEADER = "x-death"; private static final String X_RETRIES_HEADER = "x-retries"; public static void main(String[] args) throws Exception { ConfigurableApplicationContext context = SpringApplication.run(ReRouteDlqApplication.class, args); System.out.println("Hit enter to terminate"); System.in.read(); context.close(); } @Autowired private RabbitTemplate rabbitTemplate; @SuppressWarnings("unchecked") @RabbitListener(queues = DLQ) public void rePublish(Message failedMessage) { Map<String, Object> headers = failedMessage.getMessageProperties().getHeaders(); Integer retriesHeader = (Integer) headers.get(X_RETRIES_HEADER); if (retriesHeader == null) { retriesHeader = Integer.valueOf(0); } if (retriesHeader < 3) { headers.put(X_RETRIES_HEADER, retriesHeader + 1); List<Map<String, ?>> xDeath = (List<Map<String, ?>>) headers.get(X_DEATH_HEADER); String exchange = (String) xDeath.get(0).get("exchange"); List<String> routingKeys = (List<String>) xDeath.get(0).get("routing-keys"); this.rabbitTemplate.send(exchange, routingKeys.get(0), failedMessage); } else { this.rabbitTemplate.send(PARKING_LOT, failedMessage); } } @Bean public Queue parkingLot() { return new Queue(PARKING_LOT); } }
When republishToDlq
is true
, the republishing recoverer adds the original exchange and routing key to headers.
@SpringBootApplication public class ReRouteDlqApplication { private static final String ORIGINAL_QUEUE = "so8400in.so8400"; private static final String DLQ = ORIGINAL_QUEUE + ".dlq"; private static final String PARKING_LOT = ORIGINAL_QUEUE + ".parkingLot"; private static final String X_RETRIES_HEADER = "x-retries"; private static final String X_ORIGINAL_EXCHANGE_HEADER = RepublishMessageRecoverer.X_ORIGINAL_EXCHANGE; private static final String X_ORIGINAL_ROUTING_KEY_HEADER = RepublishMessageRecoverer.X_ORIGINAL_ROUTING_KEY; public static void main(String[] args) throws Exception { ConfigurableApplicationContext context = SpringApplication.run(ReRouteDlqApplication.class, args); System.out.println("Hit enter to terminate"); System.in.read(); context.close(); } @Autowired private RabbitTemplate rabbitTemplate; @RabbitListener(queues = DLQ) public void rePublish(Message failedMessage) { Map<String, Object> headers = failedMessage.getMessageProperties().getHeaders(); Integer retriesHeader = (Integer) headers.get(X_RETRIES_HEADER); if (retriesHeader == null) { retriesHeader = Integer.valueOf(0); } if (retriesHeader < 3) { headers.put(X_RETRIES_HEADER, retriesHeader + 1); String exchange = (String) headers.get(X_ORIGINAL_EXCHANGE_HEADER); String originalRoutingKey = (String) headers.get(X_ORIGINAL_ROUTING_KEY_HEADER); this.rabbitTemplate.send(exchange, originalRoutingKey, failedMessage); } else { this.rabbitTemplate.send(PARKING_LOT, failedMessage); } } @Bean public Queue parkingLot() { return new Queue(PARKING_LOT); } }