|
This version is still in development and is not considered stable yet. For the latest stable version, please use Spring AMQP 4.0.2! |
Generic AMQP 1.0 Support
Version 4.1 introduces spring-amqp-client module for AMQP 1.0 protocol support.
This artifact is based on the Qpid ProtonJ2 Client Library and can work with any peers supporting AMQP 1.0 protocol, including RabbitMQ broker.
This dependency has to be added to the project to be able to interact with AMQP 1.0 support:
-
Maven
-
Gradle
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-amqp-client</artifactId>
<version>4.1.0-M1</version>
</dependency>
implementation 'org.springframework.amqp:spring-amqp-client:4.1.0-M1'
AMQP 1.0 Environment
The org.apache.qpid:protonj2-client is so flexible and comes with a convenient API that it can be used in Spring applications even without any dedicated Spring implementations.
For example, the org.apache.qpid.protonj2.client.Message class is a builder pattern implementation.
Either way, the spring-amqp-client provides high-level Spring pattern implementations for connection lifecycle management and AMQP 1.0 protocol interaction for sending and receiving messages operations.
The AMQP 1.0 environment starts with a org.apache.qpid.protonj2.client.Client instance:
@Bean
Client protonClient() {
return Client.create();
}
The same Client can be used for connecting to different brokers, the connection settings must be provided on a specific connection.
See also AMQP 1.0 Connection Factory and The AMQP 1.0 Annotation Configuration below.
AMQP 1.0 Connection Factory
The org.springframework.amqp.client.AmqpConnectionFactory abstraction was introduced to manage org.apache.qpid.protonj2.client.Connection.
The SingleAmqpConnectionFactory implementation is present to manage one connection and its settings.
The same Connection can be shared between many producers and consumers.
The multiplexing is handled by the link abstraction for AMQP 1.0 protocol implementation internally in the AMQP client library.
The Connection has recovery capabilities.
In most cases it is enough to add this bean into the project:
@Bean
AmqpConnectionFactory connectionFactory(Client protonClient) {
return new SingleAmqpConnectionFactory(protonClient);
}
See SingleAmqpConnectionFactory setters for all connection-specific options, including host, port, user.
The more advance configuration could be done by the setConnectionOptions(ConnectionOptions) which is a builder API from ProtonJ library.
The mentioned SingleAmqpConnectionFactory instance with all the defaults would connect to the localhost:5672 with infinite number of reconnections.
The Client injection can be omitted for the SingleAmqpConnectionFactory, and respective single bean is resolved from the BeanFactory on demand internally, when getConnection() is called.
|
AmqpClient
The AmqpClient is a fluent API to perform send and receive operations on messages against AMQP 1.0 protocol.
Requires an AmqpConnectionFactory and can be configured with some defaults via AmqpClient.Builder API.
The DefaultAmqpClient is an internal implementation of the AmqpClient contract.
Even if org.apache.qpid:protonj2-client library comes with a org.apache.qpid.protonj2.client.Message implementation, the AmqpClient still exposes an API based on the well-known org.springframework.amqp.core.Message with all the supporting classes like MessageProperties and MessageConverter abstractions.
The conversion to/from org.apache.qpid.protonj2.client.Message is done internally in the AmqpClient implementation via ProtonUtils supporting class.
All send and receive methods return a CompletableFuture to get operation results eventually.
An interaction with plain objects require message body conversion and SimpleMessageConverter is used by default.
See AmqpClient.Builder.messageConverter(MessageConverter) configuration and Message Converters for more information about conversions.
Usually, just one bean like this is enough to perform all the possible send and receive operations:
@Bean
AmqpClient amqpClient(AmqpConnectionFactory connectionFactory) {
return AmqpClient.builder(connectionFactory)
.defaultToAddress("/queues/some_queue_as_default")
.messageConverter(new JacksonJsonMessageConverter())
.build();
}
In the example above the /queues/ prefix used for a defaultToAddress is an example of the destination address convention in RabbitMQ.
|
Here are some samples of AmqpClient operations:
CompletableFuture<Boolean> sendFuture = this.amqpClient.send(Message.create("test_data"));
CompletableFuture<Boolean> sendFuture =
this.amqpClient
.to("/queues/test_queue")
.message(new org.springframework.amqp.core.Message("test_data2".getBytes()))
.send();
CompletableFuture<Boolean> sendFuture =
this.amqpClient
.to("/queues/test_queue")
.body("convert")
.priority(7)
.header("test_header", "test_value")
.messageId("some_id")
.userId("guest")
.send();
The AmqpClient also provides a fluent API to receive messages from addresses on demand.
For that purpose an AmqpClient.from(String fromAddress) method should be called returning a ReceiveSpec for various receive operations behavior.
All of them produce a CompletableFuture with either native ProtonJ message, a Spring AMQP message, or just payload converted from the message’s body.
The receiveAndConvert() with non-Object.class generic argument requires a messageConverter in the AmqpClient to be as a SmartMessageConverter.
For example, when JacksonJsonMessageConverter is provided for the AmqpClient, the following example would yield a proper data model conversion:
record TestData(String data) { }
this.amqpClient
.to("/queues/test_queue")
.body(new TestData("convert"))
.send();
CompletableFuture<TestData> receiveFuture =
this.amqpClient.from("/queues/test_queue")
.receiveAndConvert();
See Javadocs for those methods for more information about the fluent API exposed by the AmqpClient.
The AMQP 1.0 Consumer
The event-driven consumer for AMQP 1.0 protocol is implemented as an AmqpMessageListenerContainer similar to all other listener container implementations in the framework.
The AmqpMessageListenerContainer requires an AmqpConnectionFactory and uses org.apache.qpid.protonj2.client.Receiver instances internally to consume messages from the provided queueNames (essentially, AMQP 1.o addresses).
The 100 initial credits are used for receiver links by default.
Every settled delivery then replenishes some number of credits for new upcoming messages.
The consumed messages are handled by the provided MessageListener, and if the autoAccept is set to false, an AmqpAcknowledgment implementation is populated to respective Spring AMQP message property for manual delivery settlement in the target message listener logic.
The consumersPerQueue option (1 by default) implements a concurrency behavior for each provided address to consume.
The Duration receiveTimeout option (1 second by default) controls the blocking Receiver.receive() operation, which is called in a loop until consumer is stopped.
Each consumer (an internal AmqpMessageListenerContainer.AmqpConsumer instance) is scheduled for running via Executor.
The AmqpMessageListenerContainer.taskExecutor property is set to a SimpleAsyncTaskExecutor by default.
The message processing errors can be handled by the ErrorHandler configuration.
The MessageListener can be proxied in the AmqpMessageListenerContainer if some AOP interceptors are provided, e.g. TransactionInterceptor.
The AmqpMessageListenerContainer also provides pause() and resume() API to set AMQP link credits to 0 and replenish, respectively.
This functionality leaves a Receiver active, but no delivery is coming from the AMQP peer.
The following example demonstrates a simple configuration for AmqpMessageListenerContainer:
BlockingQueue<Message> receivedMessages = new LinkedBlockingQueue<>();
@Bean
AmqpMessageListenerContainer amqpMessageListenerContainer(AmqpConnectionFactory connectionFactory) {
var amqpMessageListenerContainer = new AmqpMessageListenerContainer(connectionFactory);
amqpMessageListenerContainer.setQueueNames("address1", "address2");
amqpMessageListenerContainer.setConsumersPerQueue(3);
amqpMessageListenerContainer.setAutoAccept(false);
amqpMessageListenerContainer.setReceiveTimeout(Duration.ofMillis(100));
amqpMessageListenerContainer.setupMessageListener(this.receivedMessages::add);
return amqpMessageListenerContainer;
}
ProtonJ Delivery Consumption
For convenience, a ProtonDeliveryListener contract is provided to handle native ProtonJ Delivery objects instead of Spring AMQP messages.
It could be useful in scenarios where full control over the Delivery instance is required.
For example, adding some reject conditions, handling received data as an InputStream, replenishing link credits dynamically, according to some target application logic.
Another scenario of native Delivery handling, when both producer and consumer deal with native ProtonJ Message contract, including ProtonJ body encoding/decoding internal mechanism (unlike the Spring AMQP message which accepts only byte[] bodies).
The manual delivery settlement still can be skipped, if the AmqpMessageListenerContainer is configured for autoAccept = true (default).
And the AmqpMessageListenerContainer performs credits replenishment automatically.
|
The ProtonDeliveryListener implementation should be injected into the AmqpMessageListenerContainer as a regular MessageListener:
BlockingQueue<Delivery> receivedDeliveries = new LinkedBlockingQueue<>();
@Bean
AmqpMessageListenerContainer protonDeliveryListenerContainer(AmqpConnectionFactory connectionFactory) {
var amqpMessageListenerContainer = new AmqpMessageListenerContainer(connectionFactory);
amqpMessageListenerContainer.setQueueNames(TEST_QUEUE_FOR_NATIVE_PROTON);
amqpMessageListenerContainer.setAutoAccept(false);
amqpMessageListenerContainer.setupMessageListener((ProtonDeliveryListener) this.receivedDeliveries::add);
return amqpMessageListenerContainer;
}
The AMQP 1.0 Annotation Configuration
The spring-amqp-client provides a convenient way to configure the AMQP 1.0 infrastructure via annotations.
The @EnableAmqp annotation can be set on a @Configuration class in the target project to trigger that infrastructure registration.
The imported by the @EnableAmqp annotation AmqpDefaultConfiguration provides conditional beans (may be configured in the target project manually):
-
The
org.apache.qpid.protonj2.client.Clientwhere itsorg.apache.qpid.protonj2.client.ClientOptionsproperty is based on the respective attributes of the imported metadata from the@EnableAmqp.