To use the RabbitMQ binder, add it to your Spring Cloud Stream application by using the following Maven coordinates:
<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>
Alternatively, you can use the Spring Cloud Stream RabbitMQ Starter, as follows:
<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
The following simplified diagram shows how the RabbitMQ binder operates:
By default, the RabbitMQ Binder implementation maps each destination to a TopicExchange.
For each consumer group, a Queue is bound to that TopicExchange.
Each consumer instance has a corresponding RabbitMQ Consumer instance for its group’s Queue.
For partitioned producers and consumers, the queues are suffixed with the partition index and use the partition index as the routing key.
For anonymous consumers (those with no group property), an auto-delete queue (with a randomized unique name) is used.
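The naming rules above can be sketched in plain Java. This is a minimal illustration and not the binder's own code: the class and method names are hypothetical, and the formats (destination.group for the queue, a -<index> suffix for partition queues, and destination-<index> as the partition routing key) are assumptions taken from the property descriptions later in this section.

```java
// Hypothetical helper that mirrors the naming conventions described above.
// It does not talk to RabbitMQ or the binder; it only builds the names.
final class RabbitNamingSketch {

    // Queue for a consumer group: <destination>.<group>
    static String queueName(String destination, String group) {
        return destination + "." + group;
    }

    // Partitioned queue: the group queue name suffixed with the partition index.
    static String partitionQueueName(String destination, String group, int partition) {
        return queueName(destination, group) + "-" + partition;
    }

    // Routing key for a partition (assumed format: <destination>-<index>).
    static String partitionRoutingKey(String destination, int partition) {
        return destination + "-" + partition;
    }

    public static void main(String[] args) {
        // Print the queue and routing key for each of two partitions.
        for (int p = 0; p < 2; p++) {
            System.out.println(partitionQueueName("orders", "billing", p)
                    + " <- " + partitionRoutingKey("orders", p));
        }
    }
}
```

The destination and group values used in main ("orders" and "billing") are placeholders chosen only for the demonstration.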
By using the optional autoBindDlq option, you can configure the binder to create and configure dead-letter queues (DLQs) (and a dead-letter exchange DLX, as well as routing infrastructure).
By default, the dead letter queue has the name of the destination, appended with .dlq.
If retry is enabled (maxAttempts > 1), failed messages are delivered to the DLQ after retries are exhausted.
If retry is disabled (maxAttempts = 1), you should set requeueRejected to false (the default) so that failed messages are routed to the DLQ, instead of being re-queued.
In addition, republishToDlq causes the binder to publish a failed message to the DLQ (instead of rejecting it).
This feature lets additional information (such as the stack trace in the x-exception-stacktrace header) be added to the message in headers.
This option does not need retry enabled.
You can republish a failed message after just one attempt.
Starting with version 1.2, you can configure the delivery mode of republished messages.
See the republishDeliveryMode property.
Important | |
---|---|
Setting |
See Section 18.3.1, “RabbitMQ Binder Properties” for more information about these properties.
The framework does not provide any standard mechanism to consume dead-letter messages (or to re-route them back to the primary queue). Some options are described in Section 18.6, “Dead-Letter Queue Processing”.
Note | |
---|---|
When multiple RabbitMQ binders are used in a Spring Cloud Stream application, it is important to disable 'RabbitAutoConfiguration' to avoid the same configuration from |
Starting with version 2.0, the RabbitMessageChannelBinder sets the RabbitTemplate.usePublisherConnection property to true so that the non-transactional producers avoid deadlocks on consumers, which can happen if cached connections are blocked because of a memory alarm on the broker.
Note | |
---|---|
Currently, a |
This section contains settings specific to the RabbitMQ Binder and bound channels.
For general binding configuration options and properties, see the Spring Cloud Stream core documentation.
By default, the RabbitMQ binder uses Spring Boot’s ConnectionFactory.
Consequently, it supports all Spring Boot configuration options for RabbitMQ.
(For reference, see the Spring Boot documentation.)
RabbitMQ configuration options use the spring.rabbitmq prefix.
In addition to Spring Boot options, the RabbitMQ binder supports the following properties:
A comma-separated list of RabbitMQ management plugin URLs.
Only used when nodes contains more than one entry.
Each entry in this list must have a corresponding entry in spring.rabbitmq.addresses.
Only needed if you use a RabbitMQ cluster and wish to consume from the node that hosts the queue.
See Queue Affinity and the LocalizedQueueConnectionFactory for more information.
Default: empty.
A comma-separated list of RabbitMQ node names.
When there is more than one entry, the list is used to locate the server address where a queue is located.
Each entry in this list must have a corresponding entry in spring.rabbitmq.addresses.
Only needed if you use a RabbitMQ cluster and wish to consume from the node that hosts the queue.
See Queue Affinity and the LocalizedQueueConnectionFactory for more information.
Default: empty.
The compression level for compressed bindings.
See java.util.zip.Deflater.
Default: 1 (BEST_SPEED).
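To make the meaning of the level concrete, the following standalone sketch compresses and restores a payload with java.util.zip.Deflater at level 1. It is only an illustration of the JDK API that the property refers to. The class and method names are hypothetical, and this is not how the binder itself is implemented.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.zip.DataFormatException;
import java.util.zip.Deflater;
import java.util.zip.Inflater;

// Hypothetical helper showing what a Deflater compression level controls.
final class CompressionLevelSketch {

    // Compress the input with the given Deflater level (1 = Deflater.BEST_SPEED).
    static byte[] compress(byte[] input, int level) {
        Deflater deflater = new Deflater(level);
        deflater.setInput(input);
        deflater.finish();
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] buffer = new byte[1024];
        while (!deflater.finished()) {
            int n = deflater.deflate(buffer);
            out.write(buffer, 0, n);
        }
        deflater.end();
        return out.toByteArray();
    }

    // Restore the original bytes; malformed input is reported as an unchecked exception.
    static byte[] decompress(byte[] compressed) {
        Inflater inflater = new Inflater();
        inflater.setInput(compressed);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] buffer = new byte[1024];
        try {
            while (!inflater.finished()) {
                int n = inflater.inflate(buffer);
                if (n == 0 && (inflater.needsInput() || inflater.needsDictionary())) {
                    break; // truncated input: stop instead of looping forever
                }
                out.write(buffer, 0, n);
            }
        } catch (DataFormatException e) {
            throw new IllegalStateException("Not valid deflate data", e);
        } finally {
            inflater.end();
        }
        return out.toByteArray();
    }

    public static void main(String[] args) {
        byte[] payload = "hello hello hello hello".getBytes(StandardCharsets.UTF_8);
        byte[] packed = compress(payload, Deflater.BEST_SPEED);
        System.out.println(Arrays.equals(payload, decompress(packed)));
    }
}
```

Higher levels (up to Deflater.BEST_COMPRESSION) trade CPU time for smaller payloads.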
A connection name prefix used to name the connection(s) created by this binder.
The name is this prefix followed by #n, where n increments each time a new connection is opened.
Default: none (Spring AMQP default).
The following properties are available for Rabbit consumers only and must be prefixed with spring.cloud.stream.rabbit.bindings.<channelName>.consumer.
The acknowledge mode.
Default: AUTO.
Whether to automatically declare the DLQ and bind it to the binder DLX.
Default: false.
The routing key with which to bind the queue to the exchange (if bindQueue is true).
For partitioned destinations, -<instanceIndex> is appended.
Default: #.
Whether to bind the queue to the destination exchange.
Set it to false if you have set up your own infrastructure and have previously created and bound the queue.
Default: true.
Used to create the consumer tag(s); #n is appended, where n increments for each consumer created.
Example: ${spring.application.name}-${spring.cloud.stream.bindings.input.group}-${spring.cloud.stream.instance-index}.
Default: none (the broker generates random consumer tags).
The name of the DLQ.
Default: prefix+destination.dlq
A DLX to assign to the queue.
Relevant only if autoBindDlq is true.
Default: 'prefix+DLX'
The type of the DLX to assign to the queue.
Relevant only if autoBindDlq is true.
Default: 'direct'
A dead letter routing key to assign to the queue.
Relevant only if autoBindDlq is true.
Default: destination
Whether to declare the dead letter exchange for the destination.
Relevant only if autoBindDlq is true.
Set to false if you have a pre-configured DLX.
Default: true.
Whether to declare the exchange for the destination.
Default: true.
Whether to declare the exchange as a Delayed Message Exchange.
Requires the delayed message exchange plugin on the broker.
The x-delayed-type argument is set to the exchangeType.
Default: false.
If a DLQ is declared, a DLX to assign to that queue.
Default: none.
If a DLQ is declared, a dead letter routing key to assign to that queue.
Default: none.
How long before an unused dead letter queue is deleted (in milliseconds).
Default: no expiration.
Declare the dead letter queue with the x-queue-mode=lazy argument.
See “Lazy Queues”.
Consider using a policy instead of this setting, because using a policy allows changing the setting without deleting the queue.
Default: false.
Maximum number of messages in the dead letter queue.
Default: no limit.
Maximum number of total bytes in the dead letter queue from all messages.
Default: no limit.
Maximum priority of messages in the dead letter queue (0-255).
Default: none.
Action to take when dlqMaxLength or dlqMaxLengthBytes is exceeded; currently drop-head or reject-publish, but refer to the RabbitMQ documentation for the values your broker supports.
Default: none.
Default time to live to apply to the dead letter queue when declared (in milliseconds).
Default: no limit.
Whether the subscription should be durable.
Only effective if group is also set.
Default: true.
If declareExchange is true, whether the exchange should be auto-deleted (that is, removed after the last queue is removed).
Default: true.
If declareExchange is true, whether the exchange should be durable (that is, it survives broker restart).
Default: true.
The exchange type: direct, fanout or topic for non-partitioned destinations and direct or topic for partitioned destinations.
Default: topic.
Whether to create an exclusive consumer.
Concurrency should be 1 when this is true.
Often used when strict ordering is required while still allowing a hot standby instance to take over after a failure.
See recoveryInterval, which controls how often a standby instance attempts to consume.
Default: false.
How long before an unused queue is deleted (in milliseconds).
Default: no expiration.
The interval (in milliseconds) between attempts to consume from a queue if it is missing.
Default: 5000.
Patterns for headers to be mapped from inbound messages.
Default: ['*'] (all headers).
Declare the queue with the x-queue-mode=lazy argument.
See “Lazy Queues”.
Consider using a policy instead of this setting, because using a policy allows changing the setting without deleting the queue.
Default: false.
The maximum number of consumers.
Default: 1.
The maximum number of messages in the queue.
Default: no limit.
The maximum number of total bytes in the queue from all messages.
Default: no limit.
The maximum priority of messages in the queue (0-255).
Default: none.
When the queue cannot be found, whether to treat the condition as fatal and stop the listener container.
Defaults to false so that the container keeps trying to consume from the queue (for example, when using a cluster and the node hosting a non-HA queue is down).
Default: false.
Action to take when maxLength or maxLengthBytes is exceeded; currently drop-head or reject-publish, but refer to the RabbitMQ documentation for the values your broker supports.
Default: none.
Prefetch count.
Default: 1.
A prefix to be added to the name of the destination and queues.
Default: "".
The number of times to retry consuming from a queue if it is missing.
Relevant only when missingQueuesFatal is true.
Otherwise, the container keeps retrying indefinitely.
Default: 3.
When true, consume from a queue with a name equal to the group.
Otherwise the queue name is destination.group.
This is useful, for example, when using Spring Cloud Stream to consume from an existing RabbitMQ queue.
Default: false.
The interval between connection recovery attempts, in milliseconds.
Default: 5000.
Whether delivery failures should be re-queued when retry is disabled or republishToDlq is false.
Default: false.
When republishToDlq is true, specifies the delivery mode of the republished message.
Default: DeliveryMode.PERSISTENT
By default, messages that fail after retries are exhausted are rejected.
If a dead-letter queue (DLQ) is configured, RabbitMQ routes the failed message (unchanged) to the DLQ.
If set to true, the binder republishes failed messages to the DLQ with additional headers, including the exception message and stack trace from the cause of the final failure.
Default: false.
Whether to use transacted channels.
Default: false.
Default time to live to apply to the queue when declared (in milliseconds).
Default: no limit.
The number of deliveries between acks.
Default: 1.
To set listener container properties that are not exposed as binder or binding properties, add a single bean of type ListenerContainerCustomizer to the application context.
The binder and binding properties are set first, and then the customizer is called.
The customizer (configure() method) is provided with the queue name as well as the consumer group as arguments.
The following properties are available for Rabbit producers only and must be prefixed with spring.cloud.stream.rabbit.bindings.<channelName>.producer.
Whether to automatically declare the DLQ and bind it to the binder DLX.
Default: false.
Whether to enable message batching by producers.
Messages are batched into one message according to the following properties (described in the next three entries in this list): batchSize, batchBufferLimit, and batchTimeout.
See Batching for more information.
Default: false.
The number of messages to buffer when batching is enabled.
Default: 100.
The maximum buffer size when batching is enabled.
Default: 10000.
The batch timeout when batching is enabled.
Default: 5000.
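As a rough mental model of how the count and size limits interact, the sketch below accumulates messages and releases a batch as soon as either limit is reached. It is a simplified illustration under stated assumptions and not the Spring AMQP batching implementation: the class and method names are hypothetical, the buffer limit is treated as a byte count, and timeout-based release (batchTimeout) is left out.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical accumulator that models count-based and size-based batch release.
final class BatchBufferSketch {

    private final int batchSize;    // release after this many messages
    private final int bufferLimit;  // or after this many buffered bytes (assumption)
    private final List<byte[]> pending = new ArrayList<>();
    private int pendingBytes;

    BatchBufferSketch(int batchSize, int bufferLimit) {
        this.batchSize = batchSize;
        this.bufferLimit = bufferLimit;
    }

    // Adds a message; returns the batch to send if a limit was reached, else an empty list.
    List<byte[]> add(byte[] message) {
        pending.add(message);
        pendingBytes += message.length;
        if (pending.size() >= batchSize || pendingBytes >= bufferLimit) {
            return drain();
        }
        return new ArrayList<>();
    }

    // Returns everything buffered so far and resets the buffer.
    List<byte[]> drain() {
        List<byte[]> batch = new ArrayList<>(pending);
        pending.clear();
        pendingBytes = 0;
        return batch;
    }
}
```

In a real producer, a timer would also call something like drain() when no new message arrives within the timeout, so that a partially filled batch is not held indefinitely.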
The routing key with which to bind the queue to the exchange (if bindQueue is true).
Only applies to non-partitioned destinations.
Only applies if requiredGroups are provided and then only to those groups.
Default: #.
Whether to bind the queue to the destination exchange.
Set it to false if you have set up your own infrastructure and have previously created and bound the queue.
Only applies if requiredGroups are provided and then only to those groups.
Default: true.
Whether data should be compressed when sent.
Default: false.
The name of the DLQ.
Only applies if requiredGroups are provided and then only to those groups.
Default: prefix+destination.dlq
A DLX to assign to the queue.
Relevant only when autoBindDlq is true.
Applies only when requiredGroups are provided and then only to those groups.
Default: 'prefix+DLX'
The type of the DLX to assign to the queue.
Relevant only if autoBindDlq is true.
Applies only when requiredGroups are provided and then only to those groups.
Default: 'direct'
A dead letter routing key to assign to the queue.
Relevant only when autoBindDlq is true.
Applies only when requiredGroups are provided and then only to those groups.
Default: destination
Whether to declare the dead letter exchange for the destination.
Relevant only if autoBindDlq is true.
Set to false if you have a pre-configured DLX.
Applies only when requiredGroups are provided and then only to those groups.
Default: true.
Whether to declare the exchange for the destination.
Default: true.
A SpEL expression to evaluate the delay to apply to the message (x-delay header).
It has no effect if the exchange is not a delayed message exchange.
Default: No x-delay header is set.
Whether to declare the exchange as a Delayed Message Exchange.
Requires the delayed message exchange plugin on the broker.
The x-delayed-type argument is set to the exchangeType.
Default: false.
The delivery mode.
Default: PERSISTENT.
When a DLQ is declared, a DLX to assign to that queue.
Applies only if requiredGroups are provided and then only to those groups.
Default: none.
When a DLQ is declared, a dead letter routing key to assign to that queue.
Applies only when requiredGroups are provided and then only to those groups.
Default: none.
How long (in milliseconds) before an unused dead letter queue is deleted.
Applies only when requiredGroups are provided and then only to those groups.
Default: no expiration.
Declare the dead letter queue with the x-queue-mode=lazy argument.
See “Lazy Queues”.
Consider using a policy instead of this setting, because using a policy allows changing the setting without deleting the queue.
Applies only when requiredGroups are provided and then only to those groups.
Default: false.
Maximum number of messages in the dead letter queue.
Applies only if requiredGroups are provided and then only to those groups.
Default: no limit.
Maximum number of total bytes in the dead letter queue from all messages.
Applies only when requiredGroups are provided and then only to those groups.
Default: no limit.
Maximum priority of messages in the dead letter queue (0-255).
Applies only when requiredGroups are provided and then only to those groups.
Default: none.
Default time (in milliseconds) to live to apply to the dead letter queue when declared.
Applies only when requiredGroups are provided and then only to those groups.
Default: no limit.
If declareExchange is true, whether the exchange should be auto-deleted (that is, removed after the last queue is removed).
Default: true.
If declareExchange is true, whether the exchange should be durable (that is, it survives broker restart).
Default: true.
The exchange type: direct, fanout or topic for non-partitioned destinations and direct or topic for partitioned destinations.
Default: topic.
How long (in milliseconds) before an unused queue is deleted.
Applies only when requiredGroups are provided and then only to those groups.
Default: no expiration.
Patterns for headers to be mapped to outbound messages.
Default: ['*'] (all headers).
Declare the queue with the x-queue-mode=lazy argument.
See “Lazy Queues”.
Consider using a policy instead of this setting, because using a policy allows changing the setting without deleting the queue.
Applies only when requiredGroups are provided and then only to those groups.
Default: false.
Maximum number of messages in the queue.
Applies only when requiredGroups are provided and then only to those groups.
Default: no limit.
Maximum number of total bytes in the queue from all messages.
Only applies if requiredGroups are provided and then only to those groups.
Default: no limit.
Maximum priority of messages in the queue (0-255).
Only applies if requiredGroups are provided and then only to those groups.
Default: none.
A prefix to be added to the name of the destination exchange.
Default: "".
When true, consume from a queue with a name equal to the group.
Otherwise the queue name is destination.group.
This is useful, for example, when using Spring Cloud Stream to consume from an existing RabbitMQ queue.
Applies only when requiredGroups are provided and then only to those groups.
Default: false.
A SpEL expression to determine the routing key to use when publishing messages.
For a fixed routing key, use a literal expression, such as routingKeyExpression='my.routingKey' in a properties file or routingKeyExpression: '''my.routingKey''' in a YAML file.
Default: destination or destination-<partition> for partitioned destinations.
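The YAML form needs three quotes because of how YAML single-quoted scalars work: the outer pair delimits the scalar, and each doubled quote inside it stands for one literal quote. The expression that results is 'my.routingKey', with the quotes that make it a SpEL string literal still in place. The sketch below imitates only that unescaping rule. It is an illustration with hypothetical names and not a YAML parser.

```java
// Hypothetical helper that applies the YAML single-quoted scalar rule:
// strip the outer quotes, then turn each doubled quote into one quote.
final class YamlSingleQuoteSketch {

    static String unquote(String scalar) {
        if (scalar.length() < 2 || !scalar.startsWith("'") || !scalar.endsWith("'")) {
            return scalar; // not a single-quoted scalar; leave untouched
        }
        String inner = scalar.substring(1, scalar.length() - 1);
        return inner.replace("''", "'");
    }

    public static void main(String[] args) {
        // Three quotes keep the SpEL literal quotes.
        System.out.println(unquote("'''my.routingKey'''"));
        // One pair of quotes is consumed by YAML, leaving a bare expression.
        System.out.println(unquote("'my.routingKey'"));
    }
}
```

With only one pair of quotes in YAML, the expression would be my.routingKey without quotes, which SpEL would try to resolve as a property path instead of treating it as a fixed string.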
Whether to use transacted channels.
Default: false.
Default time (in milliseconds) to live to apply to the queue when declared.
Applies only when requiredGroups are provided and then only to those groups.
Default: no limit.
Note | |
---|---|
In the case of RabbitMQ, content type headers can be set by external applications. Spring Cloud Stream supports them as part of an extended internal protocol used for any type of transport — including transports, such as Kafka (prior to 0.11), that do not natively support headers. |
When retry is enabled within the binder, the listener container thread is suspended for any back off periods that are configured. This might be important when strict ordering is required with a single consumer. However, for other use cases, it prevents other messages from being processed on that thread. An alternative to using binder retry is to set up dead lettering with time to live on the dead-letter queue (DLQ) as well as dead-letter configuration on the DLQ itself. See Section 18.3.1, “RabbitMQ Binder Properties” for more information about the properties discussed here. You can use the following example configuration to enable this feature:
Set autoBindDlq to true. The binder creates a DLQ. Optionally, you can specify a name in deadLetterQueueName.
Set dlqTtl to the back off time you want to wait between redeliveries.
Set dlqDeadLetterExchange to the default exchange. Expired messages from the DLQ are routed to the original queue, because the default deadLetterRoutingKey is the queue name (destination.group). Setting to the default exchange is achieved by setting the property with no value, as shown in the next example.
To force a message to be dead-lettered, either throw an AmqpRejectAndDontRequeueException or set requeueRejected to false (the default) and throw any exception.
The loop continues without end, which is fine for transient problems, but you may want to give up after some number of attempts.
Fortunately, RabbitMQ provides the x-death header, which lets you determine how many cycles have occurred.
To acknowledge a message after giving up, throw an ImmediateAcknowledgeAmqpException.
The following configuration creates an exchange myDestination with queue myDestination.consumerGroup bound to a topic exchange with a wildcard routing key #:
spring.cloud.stream.bindings.input.destination=myDestination
spring.cloud.stream.bindings.input.group=consumerGroup
# disable binder retries
spring.cloud.stream.bindings.input.consumer.max-attempts=1
# dlx/dlq setup
spring.cloud.stream.rabbit.bindings.input.consumer.auto-bind-dlq=true
spring.cloud.stream.rabbit.bindings.input.consumer.dlq-ttl=5000
spring.cloud.stream.rabbit.bindings.input.consumer.dlq-dead-letter-exchange=
This configuration creates a DLQ bound to a direct exchange (DLX) with a routing key of myDestination.consumerGroup.
When messages are rejected, they are routed to the DLQ.
After 5 seconds, the message expires and is routed to the original queue by using the queue name as the routing key, as shown in the following example:
Spring Boot application.
@SpringBootApplication
@EnableBinding(Sink.class)
public class XDeathApplication {

    public static void main(String[] args) {
        SpringApplication.run(XDeathApplication.class, args);
    }

    @StreamListener(Sink.INPUT)
    public void listen(String in, @Header(name = "x-death", required = false) Map<?,?> death) {
        if (death != null && death.get("count").equals(3L)) {
            // giving up - don't send to DLX
            throw new ImmediateAcknowledgeAmqpException("Failed after 4 attempts");
        }
        throw new AmqpRejectAndDontRequeueException("failed");
    }

}
Notice that the count property in the x-death header is a Long.
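The type matters because Long.equals returns false for an Integer argument even when the numeric values match, so comparing the count against 3 instead of 3L would never succeed. The following standalone sketch shows the difference. The class and method names are hypothetical, and the map only stands in for an x-death entry.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for an x-death entry, used to show the Long comparison.
final class XDeathCountSketch {

    // Builds a map whose "count" value is a Long, as in the x-death header.
    static Map<String, Object> deathWithCount(long count) {
        Map<String, Object> death = new HashMap<>();
        death.put("count", count); // autoboxed to Long
        return death;
    }

    // Type-tolerant check: works for any Number, not only Long.
    static boolean reached(Map<String, ?> death, long limit) {
        Object count = death.get("count");
        return count instanceof Number && ((Number) count).longValue() >= limit;
    }

    public static void main(String[] args) {
        Map<String, Object> death = deathWithCount(3);
        System.out.println(death.get("count").equals(3L)); // Long compared with Long
        System.out.println(death.get("count").equals(3));  // Long compared with Integer
        System.out.println(reached(death, 3));
    }
}
```

A check based on Number.longValue(), as in reached, avoids depending on the exact boxed type.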
Starting with version 1.3, the binder unconditionally sends exceptions to an error channel for each consumer destination and can also be configured to send async producer send failures to an error channel. See “???” for more information.
RabbitMQ has two types of send failures:
Returned messages.
Negatively acknowledged publisher confirms.
The latter is rare. According to the RabbitMQ documentation, "[A nack] will only be delivered if an internal error occurs in the Erlang process responsible for a queue."
As well as enabling producer error channels (as described in “???”), the RabbitMQ binder only sends messages to the channels if the connection factory is appropriately configured, as follows:
ccf.setPublisherConfirms(true);
ccf.setPublisherReturns(true);
When using Spring Boot configuration for the connection factory, set the following properties:
spring.rabbitmq.publisher-confirms
spring.rabbitmq.publisher-returns
The payload of the ErrorMessage for a returned message is a ReturnedAmqpMessageException with the following properties:
failedMessage: The spring-messaging Message<?> that failed to be sent.
amqpMessage: The raw spring-amqp Message.
replyCode: An integer value indicating the reason for the failure (for example, 312 - No route).
replyText: A text value indicating the reason for the failure (for example, NO_ROUTE).
exchange: The exchange to which the message was published.
routingKey: The routing key used when the message was published.
For negatively acknowledged confirmations, the payload is a NackedAmqpMessageException with the following properties:
failedMessage: The spring-messaging Message<?> that failed to be sent.
nackReason: A reason (if available; you may need to examine the broker logs for more information).
There is no automatic handling of these exceptions (such as sending to a dead-letter queue). You can consume these exceptions with your own Spring Integration flow.
Because you cannot anticipate how users would want to dispose of dead-lettered messages, the framework does not provide any standard mechanism to handle them.
If the reason for the dead-lettering is transient, you may wish to route the messages back to the original queue.
However, if the problem is a permanent issue, that could cause an infinite loop.
The following Spring Boot application shows an example of how to route those messages back to the original queue but moves them to a third “parking lot” queue after three attempts.
The second example uses the RabbitMQ Delayed Message Exchange to introduce a delay to the re-queued message.
In this example, the delay increases for each attempt.
These examples use a @RabbitListener to receive messages from the DLQ.
You could also use RabbitTemplate.receive() in a batch process.
The examples assume the original destination is so8400in and the consumer group is so8400.
The first two examples are for when the destination is not partitioned:
@SpringBootApplication
public class ReRouteDlqApplication {

    private static final String ORIGINAL_QUEUE = "so8400in.so8400";
    private static final String DLQ = ORIGINAL_QUEUE + ".dlq";
    private static final String PARKING_LOT = ORIGINAL_QUEUE + ".parkingLot";
    private static final String X_RETRIES_HEADER = "x-retries";

    public static void main(String[] args) throws Exception {
        ConfigurableApplicationContext context = SpringApplication.run(ReRouteDlqApplication.class, args);
        System.out.println("Hit enter to terminate");
        System.in.read();
        context.close();
    }

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @RabbitListener(queues = DLQ)
    public void rePublish(Message failedMessage) {
        Integer retriesHeader = (Integer) failedMessage.getMessageProperties().getHeaders().get(X_RETRIES_HEADER);
        if (retriesHeader == null) {
            retriesHeader = Integer.valueOf(0);
        }
        if (retriesHeader < 3) {
            failedMessage.getMessageProperties().getHeaders().put(X_RETRIES_HEADER, retriesHeader + 1);
            this.rabbitTemplate.send(ORIGINAL_QUEUE, failedMessage);
        }
        else {
            this.rabbitTemplate.send(PARKING_LOT, failedMessage);
        }
    }

    @Bean
    public Queue parkingLot() {
        return new Queue(PARKING_LOT);
    }

}
@SpringBootApplication
public class ReRouteDlqApplication {

    private static final String ORIGINAL_QUEUE = "so8400in.so8400";
    private static final String DLQ = ORIGINAL_QUEUE + ".dlq";
    private static final String PARKING_LOT = ORIGINAL_QUEUE + ".parkingLot";
    private static final String X_RETRIES_HEADER = "x-retries";
    private static final String DELAY_EXCHANGE = "dlqReRouter";

    public static void main(String[] args) throws Exception {
        ConfigurableApplicationContext context = SpringApplication.run(ReRouteDlqApplication.class, args);
        System.out.println("Hit enter to terminate");
        System.in.read();
        context.close();
    }

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @RabbitListener(queues = DLQ)
    public void rePublish(Message failedMessage) {
        Map<String, Object> headers = failedMessage.getMessageProperties().getHeaders();
        Integer retriesHeader = (Integer) headers.get(X_RETRIES_HEADER);
        if (retriesHeader == null) {
            retriesHeader = Integer.valueOf(0);
        }
        if (retriesHeader < 3) {
            headers.put(X_RETRIES_HEADER, retriesHeader + 1);
            headers.put("x-delay", 5000 * retriesHeader);
            this.rabbitTemplate.send(DELAY_EXCHANGE, ORIGINAL_QUEUE, failedMessage);
        }
        else {
            this.rabbitTemplate.send(PARKING_LOT, failedMessage);
        }
    }

    @Bean
    public DirectExchange delayExchange() {
        DirectExchange exchange = new DirectExchange(DELAY_EXCHANGE);
        exchange.setDelayed(true);
        return exchange;
    }

    @Bean
    public Binding bindOriginalToDelay() {
        return BindingBuilder.bind(new Queue(ORIGINAL_QUEUE)).to(delayExchange()).with(ORIGINAL_QUEUE);
    }

    @Bean
    public Queue parkingLot() {
        return new Queue(PARKING_LOT);
    }

}
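The delay schedule that the second example produces can be worked out with a small standalone function. This sketch only restates the arithmetic of the listener (5000 multiplied by the current x-retries value, with a limit of three re-routes). The class name, constant names, and the -1 marker for the parking lot are hypothetical.

```java
// Hypothetical helper that reproduces the x-delay arithmetic of the second example.
final class DelayScheduleSketch {

    static final int MAX_RETRIES = 3;
    static final int BASE_DELAY_MS = 5000;

    // Returns the x-delay value in milliseconds for a message whose x-retries
    // header is `retries` (null is treated as 0), or -1 when the message
    // should go to the parking lot instead of being re-routed.
    static int delayFor(Integer retries) {
        int r = (retries == null) ? 0 : retries;
        return (r < MAX_RETRIES) ? BASE_DELAY_MS * r : -1;
    }

    public static void main(String[] args) {
        Integer[] headerValues = { null, 1, 2, 3 };
        for (Integer value : headerValues) {
            System.out.println("x-retries=" + value + " -> " + delayFor(value));
        }
    }
}
```

Because the header is absent on the first pass, the first re-route is sent with a delay of zero, and the delay then grows by 5 seconds per attempt. If a delay on the first re-route is wanted, multiplying by the incremented value instead is one option.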
With partitioned destinations, there is one DLQ for all partitions. We determine the original queue from the headers.
When republishToDlq is false, RabbitMQ publishes the message to the DLX/DLQ with an x-death header containing information about the original destination, as shown in the following example:
@SpringBootApplication
public class ReRouteDlqApplication {

    private static final String ORIGINAL_QUEUE = "so8400in.so8400";
    private static final String DLQ = ORIGINAL_QUEUE + ".dlq";
    private static final String PARKING_LOT = ORIGINAL_QUEUE + ".parkingLot";
    private static final String X_DEATH_HEADER = "x-death";
    private static final String X_RETRIES_HEADER = "x-retries";

    public static void main(String[] args) throws Exception {
        ConfigurableApplicationContext context = SpringApplication.run(ReRouteDlqApplication.class, args);
        System.out.println("Hit enter to terminate");
        System.in.read();
        context.close();
    }

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @SuppressWarnings("unchecked")
    @RabbitListener(queues = DLQ)
    public void rePublish(Message failedMessage) {
        Map<String, Object> headers = failedMessage.getMessageProperties().getHeaders();
        Integer retriesHeader = (Integer) headers.get(X_RETRIES_HEADER);
        if (retriesHeader == null) {
            retriesHeader = Integer.valueOf(0);
        }
        if (retriesHeader < 3) {
            headers.put(X_RETRIES_HEADER, retriesHeader + 1);
            List<Map<String, ?>> xDeath = (List<Map<String, ?>>) headers.get(X_DEATH_HEADER);
            String exchange = (String) xDeath.get(0).get("exchange");
            List<String> routingKeys = (List<String>) xDeath.get(0).get("routing-keys");
            this.rabbitTemplate.send(exchange, routingKeys.get(0), failedMessage);
        }
        else {
            this.rabbitTemplate.send(PARKING_LOT, failedMessage);
        }
    }

    @Bean
    public Queue parkingLot() {
        return new Queue(PARKING_LOT);
    }

}
When republishToDlq is true, the republishing recoverer adds the original exchange and routing key to headers, as shown in the following example:
@SpringBootApplication
public class ReRouteDlqApplication {

    private static final String ORIGINAL_QUEUE = "so8400in.so8400";
    private static final String DLQ = ORIGINAL_QUEUE + ".dlq";
    private static final String PARKING_LOT = ORIGINAL_QUEUE + ".parkingLot";
    private static final String X_RETRIES_HEADER = "x-retries";
    private static final String X_ORIGINAL_EXCHANGE_HEADER = RepublishMessageRecoverer.X_ORIGINAL_EXCHANGE;
    private static final String X_ORIGINAL_ROUTING_KEY_HEADER = RepublishMessageRecoverer.X_ORIGINAL_ROUTING_KEY;

    public static void main(String[] args) throws Exception {
        ConfigurableApplicationContext context = SpringApplication.run(ReRouteDlqApplication.class, args);
        System.out.println("Hit enter to terminate");
        System.in.read();
        context.close();
    }

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @RabbitListener(queues = DLQ)
    public void rePublish(Message failedMessage) {
        Map<String, Object> headers = failedMessage.getMessageProperties().getHeaders();
        Integer retriesHeader = (Integer) headers.get(X_RETRIES_HEADER);
        if (retriesHeader == null) {
            retriesHeader = Integer.valueOf(0);
        }
        if (retriesHeader < 3) {
            headers.put(X_RETRIES_HEADER, retriesHeader + 1);
            String exchange = (String) headers.get(X_ORIGINAL_EXCHANGE_HEADER);
            String originalRoutingKey = (String) headers.get(X_ORIGINAL_ROUTING_KEY_HEADER);
            this.rabbitTemplate.send(exchange, originalRoutingKey, failedMessage);
        }
        else {
            this.rabbitTemplate.send(PARKING_LOT, failedMessage);
        }
    }

    @Bean
    public Queue parkingLot() {
        return new Queue(PARKING_LOT);
    }

}
RabbitMQ does not support partitioning natively.
Sometimes, it is advantageous to send data to specific partitions — for example, when you want to strictly order message processing, all messages for a particular customer should go to the same partition.
The RabbitMessageChannelBinder provides partitioning by binding a queue for each partition to the destination exchange.
The following Java and YAML examples show how to configure the producer:
Producer.
@SpringBootApplication
@EnableBinding(Source.class)
public class RabbitPartitionProducerApplication {

    private static final Random RANDOM = new Random(System.currentTimeMillis());

    private static final String[] data = new String[] {
            "abc1", "def1", "qux1",
            "abc2", "def2", "qux2",
            "abc3", "def3", "qux3",
            "abc4", "def4", "qux4",
            };

    public static void main(String[] args) {
        new SpringApplicationBuilder(RabbitPartitionProducerApplication.class)
            .web(false)
            .run(args);
    }

    @InboundChannelAdapter(channel = Source.OUTPUT, poller = @Poller(fixedRate = "5000"))
    public Message<?> generate() {
        String value = data[RANDOM.nextInt(data.length)];
        System.out.println("Sending: " + value);
        return MessageBuilder.withPayload(value)
                .setHeader("partitionKey", value)
                .build();
    }

}
application.yml.
spring:
  cloud:
    stream:
      bindings:
        output:
          destination: partitioned.destination
          producer:
            partitioned: true
            partition-key-expression: headers['partitionKey']
            partition-count: 2
            required-groups:
            - myGroup
Note | |
---|---|
The configuration in the preceding example uses the default partitioning ( The |
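As a rough illustration of hash-based partition selection, the sketch below maps the sample payloads from the producer onto two partitions. It assumes the default strategy is the hash of the key modulo the partition count, as described in the Spring Cloud Stream core documentation. The class and method names are hypothetical, and Math.floorMod is a choice made here to keep the result non-negative, not necessarily what the framework does.

```java
// Hypothetical helper that models hash-based partition selection.
final class PartitionSelectionSketch {

    // Assumed default: hash of the key modulo the number of partitions.
    static int selectPartition(Object key, int partitionCount) {
        return Math.floorMod(key.hashCode(), partitionCount);
    }

    public static void main(String[] args) {
        String[] data = { "abc1", "def1", "qux1", "abc2", "def2", "qux2" };
        for (String key : data) {
            int partition = selectPartition(key, 2);
            // Assumed routing key format: <destination>-<partition>
            System.out.println(key + " -> partitioned.destination-" + partition);
        }
    }
}
```

The same key always lands on the same partition, which is what gives per-key ordering. How evenly keys spread across partitions depends on their hash codes.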
The following configuration provisions a topic exchange:
The following queues are bound to that exchange:
The following bindings associate the queues to the exchange:
The following Java and YAML examples continue the previous examples and show how to configure the consumer:
Consumer.
@SpringBootApplication
@EnableBinding(Sink.class)
public class RabbitPartitionConsumerApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(RabbitPartitionConsumerApplication.class)
            .web(false)
            .run(args);
    }

    @StreamListener(Sink.INPUT)
    public void listen(@Payload String in, @Header(AmqpHeaders.CONSUMER_QUEUE) String queue) {
        System.out.println(in + " received from queue " + queue);
    }

}
application.yml.
spring:
  cloud:
    stream:
      bindings:
        input:
          destination: partitioned.destination
          group: myGroup
          consumer:
            partitioned: true
            instance-index: 0
Important | |
---|---|
The |