15. RabbitMQ Binder

15.1 Usage

To use the RabbitMQ binder, add it to your Spring Cloud Stream application by using the following Maven coordinates:

<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>

Alternatively, you can use the Spring Cloud Stream RabbitMQ Starter:

<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>

15.2 RabbitMQ Binder Overview

The following simplified diagram shows how the RabbitMQ binder operates.

Figure 15.1. RabbitMQ Binder

The RabbitMQ binder implementation maps each destination to a TopicExchange. For each consumer group, a Queue is bound to that TopicExchange. Each consumer instance has a corresponding RabbitMQ Consumer instance for its group’s Queue. For partitioned producers and consumers, the queues are suffixed with the partition index and use the partition index as the routing key.
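
The naming rules above can be sketched in plain Java. This is only an illustration of the conventions described in this chapter, not the binder's own code: the class and method names are hypothetical, and the "-" separator before the partition index in queue names is an assumption based on the routing key format documented for partitioned destinations.

```java
// Illustrative sketch only: mirrors the naming rules described in this chapter,
// not the binder's actual implementation. All names here are hypothetical.
final class BinderNamingSketch {

    private BinderNamingSketch() {
    }

    // Queue for a consumer group: <prefix><destination>.<group>
    static String queueName(String prefix, String destination, String group) {
        return prefix + destination + "." + group;
    }

    // Partitioned queues carry the partition index as a suffix.
    // ASSUMPTION: the suffix is separated from the queue name by "-".
    static String partitionedQueueName(String prefix, String destination, String group, int partition) {
        return queueName(prefix, destination, group) + "-" + partition;
    }

    // Default routing key for a partitioned destination: <destination>-<partition>
    static String partitionRoutingKey(String destination, int partition) {
        return destination + "-" + partition;
    }

    // DLQ name as used by the examples later in this chapter: <queue>.dlq
    static String dlqName(String queueName) {
        return queueName + ".dlq";
    }
}
```

For example, with an empty prefix, destination so8400in, and group so8400, queueName returns so8400in.so8400, which is the queue name used by the dead-letter examples later in this chapter.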

By using the autoBindDlq option, you can configure the binder to create and configure dead-letter queues (DLQs) and a dead-letter exchange (DLX). The dead-letter queue has the name of the destination, appended with .dlq. If retry is enabled (maxAttempts > 1), failed messages are delivered to the DLQ after retries are exhausted. If retry is disabled (maxAttempts = 1), you should set requeueRejected to false (the default) so that a failed message is routed to the DLQ instead of being requeued. In addition, republishToDlq causes the binder to publish a failed message to the DLQ (instead of rejecting it); this enables additional information to be added to the message in headers, such as the stack trace in the x-exception-stacktrace header. This option does not need retry enabled; you can republish a failed message after just one attempt. Starting with version 1.2, you can configure the delivery mode of republished messages; see the republishDeliveryMode property.

[Important]Important

Setting requeueRejected to true causes the message to be requeued and redelivered continually, which is likely not what you want unless the failure is transient. In general, it is better to enable retry within the binder by setting maxAttempts to a value greater than one, or to set republishToDlq to true.

See Section 15.3.1, “RabbitMQ Binder Properties” for more information about these properties.
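
As a rough summary of how these options interact, the following plain-Java sketch models what happens to a message once all delivery attempts have failed. It is a simplification written for this guide, not binder code: the class, enum, and method names are hypothetical, and it assumes that republishToDlq is used together with a configured DLQ.

```java
// Simplified decision model of the options described above.
// Hypothetical names; this is not the binder's implementation.
final class FailureHandlingSketch {

    enum Outcome {
        REPUBLISHED_TO_DLQ,      // binder publishes the message to the DLQ with extra headers
        REQUEUED,                // broker redelivers the message (possibly forever)
        DEAD_LETTERED_BY_BROKER, // broker routes the rejected message, unchanged, to the DLQ
        DISCARDED                // rejected without requeue and no DLQ is configured
    }

    private FailureHandlingSketch() {
    }

    // Called conceptually after maxAttempts deliveries have failed.
    static Outcome afterAttemptsExhausted(boolean republishToDlq, boolean requeueRejected,
            boolean dlqConfigured) {
        if (republishToDlq) {
            return Outcome.REPUBLISHED_TO_DLQ;
        }
        if (requeueRejected) {
            return Outcome.REQUEUED;
        }
        return dlqConfigured ? Outcome.DEAD_LETTERED_BY_BROKER : Outcome.DISCARDED;
    }
}
```

The REQUEUED branch is the case that the preceding note warns about: with requeueRejected set to true and no republishing, a permanently failing message is redelivered continually.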

The framework does not provide any standard mechanism to consume dead-letter messages (or to re-route them back to the primary queue). Some options are described in Section 15.5, “Dead-Letter Queue Processing”.

[Note]Note

When multiple RabbitMQ binders are used in a Spring Cloud Stream application, it is important to disable RabbitAutoConfiguration to prevent the same configuration from being applied to each of the binders.

15.3 Configuration Options

This section contains settings specific to the RabbitMQ Binder and bound channels.

For general binding configuration options and properties, please refer to the Spring Cloud Stream core documentation.

15.3.1 RabbitMQ Binder Properties

By default, the RabbitMQ binder uses Spring Boot’s ConnectionFactory, and it therefore supports all Spring Boot configuration options for RabbitMQ. (For reference, consult the Spring Boot documentation.) RabbitMQ configuration options use the spring.rabbitmq prefix.

In addition to Spring Boot options, the RabbitMQ binder supports the following properties:

spring.cloud.stream.rabbit.binder.adminAddresses

A comma-separated list of RabbitMQ management plugin URLs. Only used when nodes contains more than one entry. Each entry in this list must have a corresponding entry in spring.rabbitmq.addresses.

Default: empty.

spring.cloud.stream.rabbit.binder.nodes

A comma-separated list of RabbitMQ node names. When there is more than one entry, the list is used to locate the server address where a queue is located. Each entry in this list must have a corresponding entry in spring.rabbitmq.addresses.

Default: empty.

spring.cloud.stream.rabbit.binder.compressionLevel

Compression level for compressed bindings. See java.util.zip.Deflater.

Default: 1 (BEST_SPEED).
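
To show what the level value means, the following self-contained sketch compresses and restores a byte array with java.util.zip.Deflater, where the constant Deflater.BEST_SPEED has the value 1 and Deflater.BEST_COMPRESSION has the value 9. It only demonstrates the JDK API that the property refers to; it does not show how the binder itself applies compression, and the class and method names are hypothetical.

```java
import java.io.ByteArrayOutputStream;
import java.util.zip.DataFormatException;
import java.util.zip.Deflater;
import java.util.zip.Inflater;

// Hypothetical helper that demonstrates Deflater compression levels.
final class CompressionLevelSketch {

    private CompressionLevelSketch() {
    }

    // Compresses the input at the given level (for example, Deflater.BEST_SPEED).
    static byte[] compress(byte[] input, int level) {
        Deflater deflater = new Deflater(level);
        deflater.setInput(input);
        deflater.finish();
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] buffer = new byte[1024];
        while (!deflater.finished()) {
            int written = deflater.deflate(buffer);
            out.write(buffer, 0, written);
        }
        deflater.end();
        return out.toByteArray();
    }

    // Restores data produced by compress(); fails on truncated or corrupt input.
    static byte[] decompress(byte[] compressed) {
        Inflater inflater = new Inflater();
        inflater.setInput(compressed);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] buffer = new byte[1024];
        try {
            while (!inflater.finished()) {
                int read = inflater.inflate(buffer);
                if (read == 0 && (inflater.needsInput() || inflater.needsDictionary())) {
                    throw new IllegalStateException("Compressed data is incomplete");
                }
                out.write(buffer, 0, read);
            }
        }
        catch (DataFormatException e) {
            throw new IllegalStateException("Compressed data is corrupt", e);
        }
        finally {
            inflater.end();
        }
        return out.toByteArray();
    }
}
```

A lower level favors speed over size, which is usually the better trade-off for message payloads that are compressed on every send.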

15.3.2 RabbitMQ Consumer Properties

The following properties are available for Rabbit consumers only and must be prefixed with spring.cloud.stream.rabbit.bindings.<channelName>.consumer..

acknowledgeMode

The acknowledge mode.

Default: AUTO.

autoBindDlq

Whether to automatically declare the DLQ and bind it to the binder DLX.

Default: false.

bindingRoutingKey

The routing key with which to bind the queue to the exchange (if bindQueue is true). For partitioned destinations, -<instanceIndex> is appended.

Default: #.

bindQueue

Whether to bind the queue to the destination exchange; set to false if you have set up your own infrastructure and have previously created/bound the queue.

Default: true.

deadLetterQueueName

The name of the DLQ.

Default: prefix+destination.dlq

deadLetterExchange

A DLX to assign to the queue, if autoBindDlq is true.

Default: 'prefix+DLX'

deadLetterRoutingKey

A dead-letter routing key to assign to the queue, if autoBindDlq is true.

Default: destination

declareExchange

Whether to declare the exchange for the destination.

Default: true.

delayedExchange

Whether to declare the exchange as a Delayed Message Exchange - requires the delayed message exchange plugin on the broker. The x-delayed-type argument is set to the exchangeType.

Default: false.

dlqDeadLetterExchange

If a DLQ is declared, a DLX to assign to that queue.

Default: none

dlqDeadLetterRoutingKey

If a DLQ is declared, a dead-letter routing key to assign to that queue.

Default: none

dlqExpires

How long (in milliseconds) before an unused dead-letter queue is deleted.

Default: no expiration

dlqMaxLength

Maximum number of messages in the dead-letter queue.

Default: no limit

dlqMaxLengthBytes

Maximum number of total bytes in the dead-letter queue from all messages.

Default: no limit

dlqMaxPriority

Maximum priority of messages in the dead-letter queue (0-255).

Default: none

dlqTtl

Default time to live (in milliseconds) to apply to the dead-letter queue when declared.

Default: no limit

durableSubscription

Whether the subscription should be durable. Only effective if group is also set.

Default: true.

exchangeAutoDelete

If declareExchange is true, whether the exchange should be auto-delete (removed after the last queue is removed).

Default: true.

exchangeDurable

If declareExchange is true, whether the exchange should be durable (survives broker restart).

Default: true.

exchangeType

The exchange type; direct, fanout or topic for non-partitioned destinations; direct or topic for partitioned destinations.

Default: topic.

expires

How long (in milliseconds) before an unused queue is deleted.

Default: no expiration

headerPatterns

Patterns for headers to be mapped from inbound messages.

Default: ['*'] (all headers).

maxConcurrency

The maximum number of consumers.

Default: 1.

maxLength

Maximum number of messages in the queue.

Default: no limit

maxLengthBytes

Maximum number of total bytes in the queue from all messages.

Default: no limit

maxPriority

Maximum priority of messages in the queue (0-255).

Default: none

prefetch

Prefetch count.

Default: 1.

prefix

A prefix to be added to the name of the destination and queues.

Default: "".

recoveryInterval

The interval between connection recovery attempts, in milliseconds.

Default: 5000.

requeueRejected

Whether delivery failures should be requeued when retry is disabled or republishToDlq is false.

Default: false.

republishDeliveryMode

When republishToDlq is true, specify the delivery mode of the republished message.

Default: DeliveryMode.PERSISTENT

republishToDlq

By default, messages which fail after retries are exhausted are rejected. If a dead-letter queue (DLQ) is configured, RabbitMQ will route the failed message (unchanged) to the DLQ. If set to true, the binder will republish failed messages to the DLQ with additional headers, including the exception message and stack trace from the cause of the final failure.

Default: false

transacted

Whether to use transacted channels.

Default: false.

ttl

Default time to live (in milliseconds) to apply to the queue when declared.

Default: no limit

txSize

The number of deliveries between acks.

Default: 1.

15.3.3 Rabbit Producer Properties

The following properties are available for Rabbit producers only and must be prefixed with spring.cloud.stream.rabbit.bindings.<channelName>.producer..

autoBindDlq

Whether to automatically declare the DLQ and bind it to the binder DLX.

Default: false.

batchingEnabled

Whether to enable message batching by producers.

Default: false.

batchSize

The number of messages to buffer when batching is enabled.

Default: 100.

batchBufferLimit

The maximum buffer size when batching is enabled.

Default: 10000.

batchTimeout

The batch timeout when batching is enabled.

Default: 5000.

bindingRoutingKey

The routing key with which to bind the queue to the exchange (if bindQueue is true). Only applies to non-partitioned destinations. Only applies if requiredGroups are provided and then only to those groups.

Default: #.

bindQueue

Whether to bind the queue to the destination exchange; set to false if you have set up your own infrastructure and have previously created/bound the queue. Only applies if requiredGroups are provided and then only to those groups.

Default: true.

compress

Whether data should be compressed when sent.

Default: false.

deadLetterQueueName

The name of the DLQ. Only applies if requiredGroups are provided and then only to those groups.

Default: prefix+destination.dlq

deadLetterExchange

A DLX to assign to the queue, if autoBindDlq is true. Only applies if requiredGroups are provided and then only to those groups.

Default: 'prefix+DLX'

deadLetterRoutingKey

A dead-letter routing key to assign to the queue, if autoBindDlq is true. Only applies if requiredGroups are provided and then only to those groups.

Default: destination

declareExchange

Whether to declare the exchange for the destination.

Default: true.

delay

A SpEL expression to evaluate the delay to apply to the message (x-delay header) - has no effect if the exchange is not a delayed message exchange.

Default: No x-delay header is set.

delayedExchange

Whether to declare the exchange as a Delayed Message Exchange - requires the delayed message exchange plugin on the broker. The x-delayed-type argument is set to the exchangeType.

Default: false.

deliveryMode

Delivery mode.

Default: PERSISTENT.

dlqDeadLetterExchange

If a DLQ is declared, a DLX to assign to that queue. Only applies if requiredGroups are provided and then only to those groups.

Default: none

dlqDeadLetterRoutingKey

If a DLQ is declared, a dead-letter routing key to assign to that queue. Only applies if requiredGroups are provided and then only to those groups.

Default: none

dlqExpires

How long (in milliseconds) before an unused dead-letter queue is deleted. Only applies if requiredGroups are provided and then only to those groups.

Default: no expiration

dlqMaxLength

Maximum number of messages in the dead-letter queue. Only applies if requiredGroups are provided and then only to those groups.

Default: no limit

dlqMaxLengthBytes

Maximum number of total bytes in the dead-letter queue from all messages. Only applies if requiredGroups are provided and then only to those groups.

Default: no limit

dlqMaxPriority

Maximum priority of messages in the dead-letter queue (0-255). Only applies if requiredGroups are provided and then only to those groups.

Default: none

dlqTtl

Default time to live (in milliseconds) to apply to the dead-letter queue when declared. Only applies if requiredGroups are provided and then only to those groups.

Default: no limit

exchangeAutoDelete

If declareExchange is true, whether the exchange should be auto-delete (removed after the last queue is removed).

Default: true.

exchangeDurable

If declareExchange is true, whether the exchange should be durable (survives broker restart).

Default: true.

exchangeType

The exchange type; direct, fanout or topic for non-partitioned destinations; direct or topic for partitioned destinations.

Default: topic.

expires

How long (in milliseconds) before an unused queue is deleted. Only applies if requiredGroups are provided and then only to those groups.

Default: no expiration

headerPatterns

Patterns for headers to be mapped to outbound messages.

Default: ['*'] (all headers).

maxLength

Maximum number of messages in the queue. Only applies if requiredGroups are provided and then only to those groups.

Default: no limit

maxLengthBytes

Maximum number of total bytes in the queue from all messages. Only applies if requiredGroups are provided and then only to those groups.

Default: no limit

maxPriority

Maximum priority of messages in the queue (0-255). Only applies if requiredGroups are provided and then only to those groups.

Default: none

prefix

A prefix to be added to the name of the destination exchange.

Default: "".

routingKeyExpression

A SpEL expression to determine the routing key to use when publishing messages.

Default: destination or destination-<partition> for partitioned destinations.

transacted

Whether to use transacted channels.

Default: false.

ttl

Default time to live (in milliseconds) to apply to the queue when declared. Only applies if requiredGroups are provided and then only to those groups.

Default: no limit

[Note]Note

In the case of RabbitMQ, content type headers can be set by external applications. Spring Cloud Stream supports them as part of an extended internal protocol used for any type of transport (including transports, such as Kafka, that do not normally support headers).

15.4 Retry With the RabbitMQ Binder

15.4.1 Overview

When retry is enabled within the binder, the listener container thread is suspended for any back-off periods that are configured. This might be important when strict ordering is required with a single consumer, but for other use cases it prevents other messages from being processed on that thread. An alternative to using binder retry is to set up dead lettering with a time to live on the dead-letter queue (DLQ), as well as dead-letter configuration on the DLQ itself. See Section 15.3.1, “RabbitMQ Binder Properties” for more information about the properties discussed here. To enable this feature:

  • Set autoBindDlq to true - the binder will create a DLQ; you can optionally specify a name in deadLetterQueueName
  • Set dlqTtl to the back off time you want to wait between redeliveries
  • Set the dlqDeadLetterExchange to the default exchange - expired messages from the DLQ will be routed to the original queue since the default deadLetterRoutingKey is the queue name (destination.group)

To force a message to be dead-lettered, either throw an AmqpRejectAndDontRequeueException or set requeueRejected to false (the default) and throw any exception.

The loop continues without end, which is fine for transient problems, but you may want to give up after some number of attempts. Fortunately, RabbitMQ provides the x-death header, which lets you determine how many cycles have occurred.

To acknowledge a message after giving up, throw an ImmediateAcknowledgeAmqpException.

15.4.2 Putting it All Together

---
spring.cloud.stream.bindings.input.destination=myDestination
spring.cloud.stream.bindings.input.group=consumerGroup
#disable binder retries
spring.cloud.stream.bindings.input.consumer.max-attempts=1
#dlx/dlq setup
spring.cloud.stream.rabbit.bindings.input.consumer.auto-bind-dlq=true
spring.cloud.stream.rabbit.bindings.input.consumer.dlq-ttl=5000
spring.cloud.stream.rabbit.bindings.input.consumer.dlq-dead-letter-exchange=
---

This configuration creates a topic exchange named myDestination and binds the queue myDestination.consumerGroup to it with the wildcard routing key #. It also creates a DLQ bound to a direct exchange named DLX with the routing key myDestination.consumerGroup. When messages are rejected, they are routed to the DLQ. After 5 seconds, the message expires and is routed back to the original queue, using the queue name as the routing key.

Spring Boot application. 

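import java.util.Map;

import org.springframework.amqp.AmqpRejectAndDontRequeueException;
import org.springframework.amqp.ImmediateAcknowledgeAmqpException;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.handler.annotation.Header;

@SpringBootApplication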
@EnableBinding(Sink.class)
public class XDeathApplication {

    public static void main(String[] args) {
        SpringApplication.run(XDeathApplication.class, args);
    }

    @StreamListener(Sink.INPUT)
    public void listen(String in, @Header(name = "x-death", required = false) Map<?,?> death) {
        if (death != null && death.get("count").equals(3L)) {
            // giving up - don't send to DLX
            throw new ImmediateAcknowledgeAmqpException("Failed after 4 attempts");
        }
        throw new AmqpRejectAndDontRequeueException("failed");
    }

}

Notice that the count property in the x-death header is a Long.
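
The type matters because Long.equals returns false for an Integer argument, so comparing the count with the int literal 3 would never match. The following plain-Java sketch shows the pitfall and one defensive alternative that compares numerically. The class and method names are hypothetical, and the map stands in for the header value that the listener above receives.

```java
import java.util.Map;

// Hypothetical helper: decides whether to give up based on the x-death count.
final class XDeathCountSketch {

    private XDeathCountSketch() {
    }

    // Compares numerically, so it works whether the count arrives as a Long or an Integer.
    static boolean shouldGiveUp(Map<?, ?> death, long maxDeaths) {
        if (death == null) {
            return false; // first delivery: the message has not been dead-lettered yet
        }
        Object count = death.get("count");
        return count instanceof Number && ((Number) count).longValue() >= maxDeaths;
    }
}
```

Using a numeric comparison (>=) rather than equals also avoids looping forever if the count ever skips past the threshold.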

15.5 Dead-Letter Queue Processing

Because it cannot be anticipated how users would want to dispose of dead-lettered messages, the framework does not provide any standard mechanism to handle them. If the reason for the dead-lettering is transient, you may wish to route the messages back to the original queue. However, if the problem is permanent, that could cause an infinite loop. The following Spring Boot application is an example of how to route those messages back to the original queue, while moving them to a third "parking lot" queue after three attempts. The second example uses the RabbitMQ Delayed Message Exchange to introduce a delay to the requeued message. In this example, the delay increases for each attempt. These examples use a @RabbitListener to receive messages from the DLQ; you could also use RabbitTemplate.receive() in a batch process.

The examples assume the original destination is so8400in and the consumer group is so8400.

15.5.1 Non-Partitioned Destinations

The first two examples apply when the destination is not partitioned.

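import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;

@SpringBootApplication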
public class ReRouteDlqApplication {

    private static final String ORIGINAL_QUEUE = "so8400in.so8400";

    private static final String DLQ = ORIGINAL_QUEUE + ".dlq";

    private static final String PARKING_LOT = ORIGINAL_QUEUE + ".parkingLot";

    private static final String X_RETRIES_HEADER = "x-retries";

    public static void main(String[] args) throws Exception {
        ConfigurableApplicationContext context = SpringApplication.run(ReRouteDlqApplication.class, args);
        System.out.println("Hit enter to terminate");
        System.in.read();
        context.close();
    }

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @RabbitListener(queues = DLQ)
    public void rePublish(Message failedMessage) {
        Integer retriesHeader = (Integer) failedMessage.getMessageProperties().getHeaders().get(X_RETRIES_HEADER);
        if (retriesHeader == null) {
            retriesHeader = Integer.valueOf(0);
        }
        if (retriesHeader < 3) {
            failedMessage.getMessageProperties().getHeaders().put(X_RETRIES_HEADER, retriesHeader + 1);
            this.rabbitTemplate.send(ORIGINAL_QUEUE, failedMessage);
        }
        else {
            this.rabbitTemplate.send(PARKING_LOT, failedMessage);
        }
    }

    @Bean
    public Queue parkingLot() {
        return new Queue(PARKING_LOT);
    }

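}

The second example routes the retried message through a delayed message exchange:

import java.util.Map;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;

@SpringBootApplication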
public class ReRouteDlqApplication {

    private static final String ORIGINAL_QUEUE = "so8400in.so8400";

    private static final String DLQ = ORIGINAL_QUEUE + ".dlq";

    private static final String PARKING_LOT = ORIGINAL_QUEUE + ".parkingLot";

    private static final String X_RETRIES_HEADER = "x-retries";

    private static final String DELAY_EXCHANGE = "dlqReRouter";

    public static void main(String[] args) throws Exception {
        ConfigurableApplicationContext context = SpringApplication.run(ReRouteDlqApplication.class, args);
        System.out.println("Hit enter to terminate");
        System.in.read();
        context.close();
    }

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @RabbitListener(queues = DLQ)
    public void rePublish(Message failedMessage) {
        Map<String, Object> headers = failedMessage.getMessageProperties().getHeaders();
        Integer retriesHeader = (Integer) headers.get(X_RETRIES_HEADER);
        if (retriesHeader == null) {
            retriesHeader = Integer.valueOf(0);
        }
        if (retriesHeader < 3) {
            headers.put(X_RETRIES_HEADER, retriesHeader + 1);
            headers.put("x-delay", 5000 * retriesHeader);
            this.rabbitTemplate.send(DELAY_EXCHANGE, ORIGINAL_QUEUE, failedMessage);
        }
        else {
            this.rabbitTemplate.send(PARKING_LOT, failedMessage);
        }
    }

    @Bean
    public DirectExchange delayExchange() {
        DirectExchange exchange = new DirectExchange(DELAY_EXCHANGE);
        exchange.setDelayed(true);
        return exchange;
    }

    @Bean
    public Binding bindOriginalToDelay() {
        return BindingBuilder.bind(new Queue(ORIGINAL_QUEUE)).to(delayExchange()).with(ORIGINAL_QUEUE);
    }

    @Bean
    public Queue parkingLot() {
        return new Queue(PARKING_LOT);
    }

}
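
The retry decision in the delayed example can be isolated as a pure function, which makes the resulting delay schedule easier to see. The sketch below is plain Java with hypothetical names; the constants 3 and 5000 are taken from the example above, and the delay is computed from the retry count before it is incremented, exactly as in the listener.

```java
import java.util.OptionalInt;

// Hypothetical helper that mirrors the retry arithmetic of the delayed example.
final class DelayedRetrySketch {

    static final int MAX_RETRIES = 3;      // same limit as the example listener
    static final int BASE_DELAY_MS = 5000; // same multiplier as the example listener

    private DelayedRetrySketch() {
    }

    // Returns the x-delay value for the next redelivery,
    // or an empty result when the message should go to the parking lot.
    static OptionalInt nextDelayMillis(Integer retriesHeader) {
        int retries = (retriesHeader == null) ? 0 : retriesHeader;
        if (retries >= MAX_RETRIES) {
            return OptionalInt.empty();
        }
        return OptionalInt.of(BASE_DELAY_MS * retries);
    }
}
```

With these constants, the three redeliveries are delayed by 0, 5000, and 10000 milliseconds, so the first retry is immediate. If a delay is wanted on the first retry as well, multiplying by the incremented count instead is one possible variation.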

15.5.2 Partitioned Destinations

With partitioned destinations, there is one DLQ for all partitions, and the original queue is determined from the headers.

republishToDlq=false

When republishToDlq is false, RabbitMQ publishes the message to the DLX/DLQ with an x-death header containing information about the original destination.

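import java.util.List;
import java.util.Map;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;

@SpringBootApplication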
public class ReRouteDlqApplication {

	private static final String ORIGINAL_QUEUE = "so8400in.so8400";

	private static final String DLQ = ORIGINAL_QUEUE + ".dlq";

	private static final String PARKING_LOT = ORIGINAL_QUEUE + ".parkingLot";

	private static final String X_DEATH_HEADER = "x-death";

	private static final String X_RETRIES_HEADER = "x-retries";

	public static void main(String[] args) throws Exception {
		ConfigurableApplicationContext context = SpringApplication.run(ReRouteDlqApplication.class, args);
		System.out.println("Hit enter to terminate");
		System.in.read();
		context.close();
	}

	@Autowired
	private RabbitTemplate rabbitTemplate;

	@SuppressWarnings("unchecked")
	@RabbitListener(queues = DLQ)
	public void rePublish(Message failedMessage) {
		Map<String, Object> headers = failedMessage.getMessageProperties().getHeaders();
		Integer retriesHeader = (Integer) headers.get(X_RETRIES_HEADER);
		if (retriesHeader == null) {
			retriesHeader = Integer.valueOf(0);
		}
		if (retriesHeader < 3) {
			headers.put(X_RETRIES_HEADER, retriesHeader + 1);
			List<Map<String, ?>> xDeath = (List<Map<String, ?>>) headers.get(X_DEATH_HEADER);
			String exchange = (String) xDeath.get(0).get("exchange");
			List<String> routingKeys = (List<String>) xDeath.get(0).get("routing-keys");
			this.rabbitTemplate.send(exchange, routingKeys.get(0), failedMessage);
		}
		else {
			this.rabbitTemplate.send(PARKING_LOT, failedMessage);
		}
	}

	@Bean
	public Queue parkingLot() {
		return new Queue(PARKING_LOT);
	}

}
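
The header lookup in the listener above assumes that the x-death header is always present and well formed. As a more defensive variation, the following plain-Java sketch extracts the original exchange and first routing key and returns an empty result when any part is missing. The class names are hypothetical, and the header layout (a list of maps with "exchange" and "routing-keys" entries) is the one used by the example above.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical helper that reads the original route from an x-death header.
final class XDeathRoutingSketch {

    // Simple value holder for the exchange and routing key of the original publish.
    static final class Route {
        final String exchange;
        final String routingKey;

        Route(String exchange, String routingKey) {
            this.exchange = exchange;
            this.routingKey = routingKey;
        }
    }

    private XDeathRoutingSketch() {
    }

    @SuppressWarnings("unchecked")
    static Optional<Route> originalRoute(Map<String, Object> headers) {
        Object raw = headers.get("x-death");
        if (!(raw instanceof List) || ((List<?>) raw).isEmpty()) {
            return Optional.empty(); // message was not dead-lettered by the broker
        }
        // Like the listener above, use the first x-death entry.
        Map<String, ?> first = (Map<String, ?>) ((List<?>) raw).get(0);
        String exchange = (String) first.get("exchange");
        List<String> routingKeys = (List<String>) first.get("routing-keys");
        if (exchange == null || routingKeys == null || routingKeys.isEmpty()) {
            return Optional.empty();
        }
        return Optional.of(new Route(exchange, routingKeys.get(0)));
    }
}
```

A listener could send messages without a usable route straight to the parking lot queue instead of failing with a NullPointerException.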

republishToDlq=true

When republishToDlq is true, the republishing recoverer adds the original exchange and routing key to headers.

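import java.util.Map;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;

@SpringBootApplication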
public class ReRouteDlqApplication {

	private static final String ORIGINAL_QUEUE = "so8400in.so8400";

	private static final String DLQ = ORIGINAL_QUEUE + ".dlq";

	private static final String PARKING_LOT = ORIGINAL_QUEUE + ".parkingLot";

	private static final String X_RETRIES_HEADER = "x-retries";

	private static final String X_ORIGINAL_EXCHANGE_HEADER = RepublishMessageRecoverer.X_ORIGINAL_EXCHANGE;

	private static final String X_ORIGINAL_ROUTING_KEY_HEADER = RepublishMessageRecoverer.X_ORIGINAL_ROUTING_KEY;

	public static void main(String[] args) throws Exception {
		ConfigurableApplicationContext context = SpringApplication.run(ReRouteDlqApplication.class, args);
		System.out.println("Hit enter to terminate");
		System.in.read();
		context.close();
	}

	@Autowired
	private RabbitTemplate rabbitTemplate;

	@RabbitListener(queues = DLQ)
	public void rePublish(Message failedMessage) {
		Map<String, Object> headers = failedMessage.getMessageProperties().getHeaders();
		Integer retriesHeader = (Integer) headers.get(X_RETRIES_HEADER);
		if (retriesHeader == null) {
			retriesHeader = Integer.valueOf(0);
		}
		if (retriesHeader < 3) {
			headers.put(X_RETRIES_HEADER, retriesHeader + 1);
			String exchange = (String) headers.get(X_ORIGINAL_EXCHANGE_HEADER);
			String originalRoutingKey = (String) headers.get(X_ORIGINAL_ROUTING_KEY_HEADER);
			this.rabbitTemplate.send(exchange, originalRoutingKey, failedMessage);
		}
		else {
			this.rabbitTemplate.send(PARKING_LOT, failedMessage);
		}
	}

	@Bean
	public Queue parkingLot() {
		return new Queue(PARKING_LOT);
	}

}