public class AsyncRabbitTemplate extends Object implements AsyncAmqpTemplate, ChannelAwareMessageListener, RabbitTemplate.ReturnCallback, RabbitTemplate.ConfirmCallback, BeanNameAware, SmartLifecycle
Provides asynchronous send and receive operations returning a ListenableFuture, allowing the caller to obtain the reply later, using get() or a callback.
When confirms are enabled, the future has a confirm property which is itself a ListenableFuture. If the reply is received before the publisher confirm, the confirm is discarded since the reply implicitly indicates the message was published.
Returned (undeliverable) request messages are presented as an AmqpMessageReturnedException, which is the cause of an ExecutionException.
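The two ways of obtaining a reply, and the way a failure surfaces as the cause of an ExecutionException, can be sketched with JDK types only. This is an illustration under assumptions, not the template's own code: CompletableFuture stands in for the ListenableFuture returned by the template, and ReturnedRequestException is a hypothetical stand-in for AmqpMessageReturnedException.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicReference;

public class ReplyFutureSketch {

    /** Hypothetical stand-in for AmqpMessageReturnedException. */
    static class ReturnedRequestException extends RuntimeException {
        ReturnedRequestException(String replyText) {
            super(replyText);
        }
    }

    /** Blocks with get() and inspects the cause of any ExecutionException. */
    static String describeOutcome(CompletableFuture<String> future) {
        try {
            return "reply: " + future.get();
        }
        catch (ExecutionException e) {
            // The actual failure is the cause, not the ExecutionException itself.
            if (e.getCause() instanceof ReturnedRequestException) {
                return "returned: " + e.getCause().getMessage();
            }
            return "failed: " + e.getCause();
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    /** Registers a callback instead of blocking; the outcome lands in the holder. */
    static AtomicReference<String> observe(CompletableFuture<String> future) {
        AtomicReference<String> holder = new AtomicReference<>();
        future.whenComplete((reply, failure) ->
                holder.set(failure == null ? "reply: " + reply : "failed: " + failure.getMessage()));
        return holder;
    }

    public static void main(String[] args) {
        // Callback style: the holder is filled when the reply arrives.
        CompletableFuture<String> ok = new CompletableFuture<>();
        AtomicReference<String> seen = observe(ok);
        ok.complete("pong");
        System.out.println(seen.get());

        // Blocking style: an undeliverable request shows up as the cause.
        CompletableFuture<String> returned = new CompletableFuture<>();
        returned.completeExceptionally(new ReturnedRequestException("NO_ROUTE"));
        System.out.println(describeOutcome(returned));
    }
}
```

With the real template, the same shape applies to the future returned by the send and receive methods: call get() and inspect the cause of the ExecutionException, or register a callback.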
Internally, the template uses a RabbitTemplate and an AbstractMessageListenerContainer, either provided or constructed internally (a SimpleMessageListenerContainer).
If an external RabbitTemplate is provided and confirms/returns are enabled, it must not previously have had callbacks registered because this object needs to be the callback.
Modifier and Type | Class and Description |
---|---|
class | AsyncRabbitTemplate.RabbitConverterFuture<C>: An AsyncRabbitTemplate.RabbitFuture with a return type of the template's generic parameter. |
class | AsyncRabbitTemplate.RabbitFuture<T>: Base class for ListenableFutures returned by AsyncRabbitTemplate. |
class | AsyncRabbitTemplate.RabbitMessageFuture: An AsyncRabbitTemplate.RabbitFuture with a return type of Message. |
Modifier and Type | Field and Description |
---|---|
static int | DEFAULT_RECEIVE_TIMEOUT |
Fields inherited from interface SmartLifecycle: DEFAULT_PHASE
Constructor and Description |
---|
AsyncRabbitTemplate(ConnectionFactory connectionFactory, String exchange, String routingKey): Construct an instance using the provided arguments. |
AsyncRabbitTemplate(ConnectionFactory connectionFactory, String exchange, String routingKey, String replyQueue): Construct an instance using the provided arguments. |
AsyncRabbitTemplate(ConnectionFactory connectionFactory, String exchange, String routingKey, String replyQueue, String replyAddress): Construct an instance using the provided arguments. |
AsyncRabbitTemplate(RabbitTemplate template): Construct an instance using the provided arguments. |
AsyncRabbitTemplate(RabbitTemplate template, AbstractMessageListenerContainer container): Construct an instance using the provided arguments. |
AsyncRabbitTemplate(RabbitTemplate template, AbstractMessageListenerContainer container, String replyAddress): Construct an instance using the provided arguments. |
Modifier and Type | Method and Description |
---|---|
void | confirm(CorrelationData correlationData, boolean ack, String cause): Confirmation callback. |
<C> AsyncRabbitTemplate.RabbitConverterFuture<C> | convertSendAndReceive(Object object): Convert the object to a message and send it to the default exchange with the default routing key. |
<C> AsyncRabbitTemplate.RabbitConverterFuture<C> | convertSendAndReceive(Object object, MessagePostProcessor messagePostProcessor): Convert the object to a message and send it to the default exchange with the default routing key after invoking the MessagePostProcessor. |
<C> AsyncRabbitTemplate.RabbitConverterFuture<C> | convertSendAndReceive(String routingKey, Object object): Convert the object to a message and send it to the default exchange with the provided routing key. |
<C> AsyncRabbitTemplate.RabbitConverterFuture<C> | convertSendAndReceive(String routingKey, Object object, MessagePostProcessor messagePostProcessor): Convert the object to a message and send it to the default exchange with the provided routing key after invoking the MessagePostProcessor. |
<C> AsyncRabbitTemplate.RabbitConverterFuture<C> | convertSendAndReceive(String exchange, String routingKey, Object object): Convert the object to a message and send it to the provided exchange and routing key. |
<C> AsyncRabbitTemplate.RabbitConverterFuture<C> | convertSendAndReceive(String exchange, String routingKey, Object object, MessagePostProcessor messagePostProcessor): Convert the object to a message and send it to the provided exchange and routing key after invoking the MessagePostProcessor. |
<C> AsyncRabbitTemplate.RabbitConverterFuture<C> | convertSendAndReceiveAsType(Object object, MessagePostProcessor messagePostProcessor, ParameterizedTypeReference<C> responseType): Convert the object to a message and send it to the default exchange with the default routing key after invoking the MessagePostProcessor. |
<C> AsyncRabbitTemplate.RabbitConverterFuture<C> | convertSendAndReceiveAsType(Object object, ParameterizedTypeReference<C> responseType): Convert the object to a message and send it to the default exchange with the default routing key. |
<C> AsyncRabbitTemplate.RabbitConverterFuture<C> | convertSendAndReceiveAsType(String routingKey, Object object, MessagePostProcessor messagePostProcessor, ParameterizedTypeReference<C> responseType): Convert the object to a message and send it to the default exchange with the provided routing key after invoking the MessagePostProcessor. |
<C> AsyncRabbitTemplate.RabbitConverterFuture<C> | convertSendAndReceiveAsType(String routingKey, Object object, ParameterizedTypeReference<C> responseType): Convert the object to a message and send it to the default exchange with the provided routing key. |
<C> AsyncRabbitTemplate.RabbitConverterFuture<C> | convertSendAndReceiveAsType(String exchange, String routingKey, Object object, MessagePostProcessor messagePostProcessor, ParameterizedTypeReference<C> responseType): Convert the object to a message and send it to the provided exchange and routing key after invoking the MessagePostProcessor. |
<C> AsyncRabbitTemplate.RabbitConverterFuture<C> | convertSendAndReceiveAsType(String exchange, String routingKey, Object object, ParameterizedTypeReference<C> responseType): Convert the object to a message and send it to the provided exchange and routing key. |
String | getBeanName() |
ConnectionFactory | getConnectionFactory() |
MessageConverter | getMessageConverter() |
int | getPhase() |
boolean | isAutoStartup() |
boolean | isRunning() |
void | onMessage(Message message, com.rabbitmq.client.Channel channel): Callback for processing a received Rabbit message. |
void | returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey): Returned message callback. |
AsyncRabbitTemplate.RabbitMessageFuture | sendAndReceive(Message message): Send a message to the default exchange with the default routing key. |
AsyncRabbitTemplate.RabbitMessageFuture | sendAndReceive(String routingKey, Message message): Send a message to the default exchange with the supplied routing key. |
AsyncRabbitTemplate.RabbitMessageFuture | sendAndReceive(String exchange, String routingKey, Message message): Send a message to the supplied exchange and routing key. |
void | setAutoStartup(boolean autoStartup) |
void | setBeanName(String beanName) |
void | setEnableConfirms(boolean enableConfirms): Set to true to enable publisher confirms. |
void | setMandatory(boolean mandatory): Set to true to enable the receipt of returned messages that cannot be delivered in the form of an AmqpMessageReturnedException. |
void | setMandatoryExpression(Expression mandatoryExpression) |
void | setMandatoryExpressionString(String mandatoryExpression) |
void | setPhase(int phase) |
void | setReceiveTimeout(long receiveTimeout): Set the receive timeout; the future returned by the send and receive methods will be canceled when this timeout expires. |
void | setTaskScheduler(TaskScheduler taskScheduler): Set the task scheduler to expire timed out futures. |
void | start() |
void | stop() |
String | toString() |
Methods inherited from class Object: clone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait
onMessage
containerAckMode
stop
public static final int DEFAULT_RECEIVE_TIMEOUT
public AsyncRabbitTemplate(ConnectionFactory connectionFactory, String exchange, String routingKey, String replyQueue)
Parameters:
connectionFactory - the connection factory.
exchange - the default exchange to which requests will be sent.
routingKey - the default routing key.
replyQueue - the name of the reply queue to listen for replies.
public AsyncRabbitTemplate(ConnectionFactory connectionFactory, String exchange, String routingKey, String replyQueue, String replyAddress)
Parameters:
connectionFactory - the connection factory.
exchange - the default exchange to which requests will be sent.
routingKey - the default routing key.
replyQueue - the name of the reply queue to listen for replies.
replyAddress - the reply address (exchange/routingKey).
public AsyncRabbitTemplate(RabbitTemplate template, AbstractMessageListenerContainer container)
Parameters:
template - a RabbitTemplate.
container - an AbstractMessageListenerContainer.
public AsyncRabbitTemplate(RabbitTemplate template, AbstractMessageListenerContainer container, String replyAddress)
Parameters:
template - a RabbitTemplate.
container - an AbstractMessageListenerContainer.
replyAddress - the reply address.
public AsyncRabbitTemplate(ConnectionFactory connectionFactory, String exchange, String routingKey)
Parameters:
connectionFactory - the connection factory.
exchange - the default exchange to which requests will be sent.
routingKey - the default routing key.
public AsyncRabbitTemplate(RabbitTemplate template)
Parameters:
template - a RabbitTemplate.
public void setAutoStartup(boolean autoStartup)
Parameters:
autoStartup - true for auto start.
See Also: isAutoStartup()
public void setPhase(int phase)
Parameters:
phase - the phase.
See Also: getPhase()
public void setMandatory(boolean mandatory)
Set to true to enable the receipt of returned messages that cannot be delivered in the form of an AmqpMessageReturnedException.
Parameters:
mandatory - true to enable returns.
public void setMandatoryExpression(Expression mandatoryExpression)
Parameters:
mandatoryExpression - a SpEL Expression to evaluate against each request message. The result of the evaluation must be a boolean value.
public void setMandatoryExpressionString(String mandatoryExpression)
Parameters:
mandatoryExpression - a SpEL Expression to evaluate against each request message. The result of the evaluation must be a boolean value.
public void setEnableConfirms(boolean enableConfirms)
Set to true to enable publisher confirms. When enabled, the AsyncRabbitTemplate.RabbitFuture returned by the send and receive operation will have a ListenableFuture<Boolean> in its confirm property.
Parameters:
enableConfirms - true to enable publisher confirms.
public String getBeanName()
public void setBeanName(String beanName)
Specified by: setBeanName in interface BeanNameAware
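public ConnectionFactory getConnectionFactory()
Returns: the connection factory used by the RabbitTemplate.
public void setReceiveTimeout(long receiveTimeout)
Set the receive timeout; the future returned by the send and receive methods will be canceled when this timeout expires. A value <= 0 means futures never expire. Beware that this will cause a memory leak if a reply is not received. Default: 30000 (30 seconds).
Parameters:
receiveTimeout - the timeout in milliseconds.
public void setTaskScheduler(TaskScheduler taskScheduler)
Set the task scheduler to expire timed out futures.
Parameters:
taskScheduler - the task scheduler.
See Also: setReceiveTimeout(long)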
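The interaction between the receive timeout and the scheduler can be illustrated with a small JDK-only model. It is a sketch under assumptions, not the template's implementation: the class ReceiveTimeoutSketch, its pending map, and the use of ScheduledExecutorService in place of a TaskScheduler are all hypothetical. It shows why a timeout <= 0 leaves entries behind when no reply arrives.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class ReceiveTimeoutSketch {

    // Hypothetical bookkeeping: one pending future per outstanding request.
    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();
    private final ScheduledExecutorService scheduler;
    private final long receiveTimeoutMillis;

    ReceiveTimeoutSketch(ScheduledExecutorService scheduler, long receiveTimeoutMillis) {
        this.scheduler = scheduler;
        this.receiveTimeoutMillis = receiveTimeoutMillis;
    }

    /** Registers a pending reply and, for a positive timeout, schedules its expiry. */
    CompletableFuture<String> register(String correlationId) {
        CompletableFuture<String> future = new CompletableFuture<>();
        pending.put(correlationId, future);
        if (receiveTimeoutMillis > 0) {
            scheduler.schedule(() -> {
                // Expire only if no reply has claimed the entry in the meantime.
                if (pending.remove(correlationId, future)) {
                    future.cancel(true);
                }
            }, receiveTimeoutMillis, TimeUnit.MILLISECONDS);
        }
        // With a timeout <= 0 nothing removes the entry until a reply arrives.
        return future;
    }

    /** Completes and forgets the pending future when a reply arrives. */
    void onReply(String correlationId, String reply) {
        CompletableFuture<String> future = pending.remove(correlationId);
        if (future != null) {
            future.complete(reply);
        }
    }

    int pendingCount() {
        return pending.size();
    }

    public static void main(String[] args) throws InterruptedException {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        try {
            ReceiveTimeoutSketch sketch = new ReceiveTimeoutSketch(scheduler, 50);
            CompletableFuture<String> future = sketch.register("id-1");
            Thread.sleep(500); // no reply arrives, so the expiry task cancels the future
            System.out.println("cancelled=" + future.isCancelled() + " pending=" + sketch.pendingCount());
        }
        finally {
            scheduler.shutdownNow();
        }
    }
}
```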
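public MessageConverter getMessageConverter()
Returns: the RabbitTemplate's MessageConverter.
public AsyncRabbitTemplate.RabbitMessageFuture sendAndReceive(Message message)
Description copied from interface: AsyncAmqpTemplate
Send a message to the default exchange with the default routing key.
Specified by: sendAndReceive in interface AsyncAmqpTemplate
Parameters:
message - the message.
Returns: the ListenableFuture.
public AsyncRabbitTemplate.RabbitMessageFuture sendAndReceive(String routingKey, Message message)
Description copied from interface: AsyncAmqpTemplate
Send a message to the default exchange with the supplied routing key.
Specified by: sendAndReceive in interface AsyncAmqpTemplate
Parameters:
routingKey - the routing key.
message - the message.
Returns: the ListenableFuture.
public AsyncRabbitTemplate.RabbitMessageFuture sendAndReceive(String exchange, String routingKey, Message message)
Description copied from interface: AsyncAmqpTemplate
Send a message to the supplied exchange and routing key.
Specified by: sendAndReceive in interface AsyncAmqpTemplate
Parameters:
exchange - the exchange.
routingKey - the routing key.
message - the message.
Returns: the ListenableFuture.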
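public <C> AsyncRabbitTemplate.RabbitConverterFuture<C> convertSendAndReceive(Object object)
Description copied from interface: AsyncAmqpTemplate
Convert the object to a message and send it to the default exchange with the default routing key.
Specified by: convertSendAndReceive in interface AsyncAmqpTemplate
Type Parameters:
C - the expected result type.
Parameters:
object - the object to convert.
Returns: the ListenableFuture.
public <C> AsyncRabbitTemplate.RabbitConverterFuture<C> convertSendAndReceive(String routingKey, Object object)
Description copied from interface: AsyncAmqpTemplate
Convert the object to a message and send it to the default exchange with the provided routing key.
Specified by: convertSendAndReceive in interface AsyncAmqpTemplate
Type Parameters:
C - the expected result type.
Parameters:
routingKey - the routing key.
object - the object to convert.
Returns: the ListenableFuture.
public <C> AsyncRabbitTemplate.RabbitConverterFuture<C> convertSendAndReceive(String exchange, String routingKey, Object object)
Description copied from interface: AsyncAmqpTemplate
Convert the object to a message and send it to the provided exchange and routing key.
Specified by: convertSendAndReceive in interface AsyncAmqpTemplate
Type Parameters:
C - the expected result type.
Parameters:
exchange - the exchange.
routingKey - the routing key.
object - the object to convert.
Returns: the ListenableFuture.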
public <C> AsyncRabbitTemplate.RabbitConverterFuture<C> convertSendAndReceive(Object object, MessagePostProcessor messagePostProcessor)
Description copied from interface: AsyncAmqpTemplate
Convert the object to a message and send it to the default exchange with the default routing key after invoking the MessagePostProcessor. If the post processor adds a correlationId property, it must be unique.
Specified by: convertSendAndReceive in interface AsyncAmqpTemplate
Type Parameters:
C - the expected result type.
Parameters:
object - the object to convert.
messagePostProcessor - the post processor.
Returns: the ListenableFuture.
public <C> AsyncRabbitTemplate.RabbitConverterFuture<C> convertSendAndReceive(String routingKey, Object object, MessagePostProcessor messagePostProcessor)
Description copied from interface: AsyncAmqpTemplate
Convert the object to a message and send it to the default exchange with the provided routing key after invoking the MessagePostProcessor. If the post processor adds a correlationId property, it must be unique.
Specified by: convertSendAndReceive in interface AsyncAmqpTemplate
Type Parameters:
C - the expected result type.
Parameters:
routingKey - the routing key.
object - the object to convert.
messagePostProcessor - the post processor.
Returns: the ListenableFuture.
public <C> AsyncRabbitTemplate.RabbitConverterFuture<C> convertSendAndReceive(String exchange, String routingKey, Object object, MessagePostProcessor messagePostProcessor)
Description copied from interface: AsyncAmqpTemplate
Convert the object to a message and send it to the provided exchange and routing key after invoking the MessagePostProcessor. If the post processor adds a correlationId property, it must be unique.
Specified by: convertSendAndReceive in interface AsyncAmqpTemplate
Type Parameters:
C - the expected result type.
Parameters:
exchange - the exchange.
routingKey - the routing key.
object - the object to convert.
messagePostProcessor - the post processor.
Returns: the ListenableFuture.
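The uniqueness requirement on correlationId is easiest to see in a model where replies are matched to outstanding requests by that id. The following is a minimal sketch under that assumption, using only JDK types; CorrelationSketch and its methods are hypothetical and do not reproduce the template's internals. The sketch rejects a duplicate id so that the problem is visible; whether the template itself detects duplicates is not stated in this documentation.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

public class CorrelationSketch {

    // Hypothetical registry of requests awaiting a reply, keyed by correlation id.
    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    /** Registers a request under the given id; rejects an id that is already in flight. */
    CompletableFuture<String> register(String correlationId) {
        CompletableFuture<String> future = new CompletableFuture<>();
        if (pending.putIfAbsent(correlationId, future) != null) {
            // Two requests sharing an id could not be told apart when a reply arrives.
            throw new IllegalStateException("Duplicate correlationId: " + correlationId);
        }
        return future;
    }

    /** Routes a reply to the request with the same id; returns false if none is waiting. */
    boolean onReply(String correlationId, String body) {
        CompletableFuture<String> future = pending.remove(correlationId);
        return future != null && future.complete(body);
    }

    /** One simple way to obtain an id that is unique for practical purposes. */
    static String newCorrelationId() {
        return UUID.randomUUID().toString();
    }

    public static void main(String[] args) {
        CorrelationSketch sketch = new CorrelationSketch();
        String id = newCorrelationId();
        CompletableFuture<String> future = sketch.register(id);
        sketch.onReply(id, "pong");
        System.out.println(future.join());
    }
}
```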
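public <C> AsyncRabbitTemplate.RabbitConverterFuture<C> convertSendAndReceiveAsType(Object object, ParameterizedTypeReference<C> responseType)
Description copied from interface: AsyncAmqpTemplate
Convert the object to a message and send it to the default exchange with the default routing key.
Specified by: convertSendAndReceiveAsType in interface AsyncAmqpTemplate
Type Parameters:
C - the expected result type.
Parameters:
object - the object to convert.
responseType - the response type.
Returns: the ListenableFuture.
public <C> AsyncRabbitTemplate.RabbitConverterFuture<C> convertSendAndReceiveAsType(String routingKey, Object object, ParameterizedTypeReference<C> responseType)
Description copied from interface: AsyncAmqpTemplate
Convert the object to a message and send it to the default exchange with the provided routing key.
Specified by: convertSendAndReceiveAsType in interface AsyncAmqpTemplate
Type Parameters:
C - the expected result type.
Parameters:
routingKey - the routing key.
object - the object to convert.
responseType - the response type.
Returns: the ListenableFuture.
public <C> AsyncRabbitTemplate.RabbitConverterFuture<C> convertSendAndReceiveAsType(String exchange, String routingKey, Object object, ParameterizedTypeReference<C> responseType)
Description copied from interface: AsyncAmqpTemplate
Convert the object to a message and send it to the provided exchange and routing key.
Specified by: convertSendAndReceiveAsType in interface AsyncAmqpTemplate
Type Parameters:
C - the expected result type.
Parameters:
exchange - the exchange.
routingKey - the routing key.
object - the object to convert.
responseType - the response type.
Returns: the ListenableFuture.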
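The responseType argument exists because a plain Class object cannot describe a generic type such as List<String>. The technique behind a type reference of this kind can be sketched with JDK reflection alone. TypeRef below is a hypothetical, simplified analogue written for illustration; it is not Spring's ParameterizedTypeReference and omits that class's validation.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.List;

public class TypeTokenSketch {

    /** Hypothetical, simplified analogue of a parameterized type reference. */
    abstract static class TypeRef<T> {

        private final Type type;

        protected TypeRef() {
            // An anonymous subclass keeps its generic superclass, TypeRef<...>, in class metadata.
            Type superType = getClass().getGenericSuperclass();
            if (!(superType instanceof ParameterizedType)) {
                throw new IllegalStateException("TypeRef must be created with a type argument");
            }
            this.type = ((ParameterizedType) superType).getActualTypeArguments()[0];
        }

        Type getType() {
            return this.type;
        }
    }

    public static void main(String[] args) {
        // The empty braces create the anonymous subclass that captures List<String>.
        TypeRef<List<String>> ref = new TypeRef<List<String>>() { };
        System.out.println(ref.getType().getTypeName());
    }
}
```

A caller of convertSendAndReceiveAsType passes the real ParameterizedTypeReference in the same way, as an anonymous subclass carrying the expected reply type.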
public <C> AsyncRabbitTemplate.RabbitConverterFuture<C> convertSendAndReceiveAsType(Object object, MessagePostProcessor messagePostProcessor, ParameterizedTypeReference<C> responseType)
Description copied from interface: AsyncAmqpTemplate
Convert the object to a message and send it to the default exchange with the default routing key after invoking the MessagePostProcessor. If the post processor adds a correlationId property, it must be unique.
Specified by: convertSendAndReceiveAsType in interface AsyncAmqpTemplate
Type Parameters:
C - the expected result type.
Parameters:
object - the object to convert.
messagePostProcessor - the post processor.
responseType - the response type.
Returns: the ListenableFuture.
public <C> AsyncRabbitTemplate.RabbitConverterFuture<C> convertSendAndReceiveAsType(String routingKey, Object object, MessagePostProcessor messagePostProcessor, ParameterizedTypeReference<C> responseType)
Description copied from interface: AsyncAmqpTemplate
Convert the object to a message and send it to the default exchange with the provided routing key after invoking the MessagePostProcessor. If the post processor adds a correlationId property, it must be unique.
Specified by: convertSendAndReceiveAsType in interface AsyncAmqpTemplate
Type Parameters:
C - the expected result type.
Parameters:
routingKey - the routing key.
object - the object to convert.
messagePostProcessor - the post processor.
responseType - the response type.
Returns: the ListenableFuture.
public <C> AsyncRabbitTemplate.RabbitConverterFuture<C> convertSendAndReceiveAsType(String exchange, String routingKey, Object object, MessagePostProcessor messagePostProcessor, ParameterizedTypeReference<C> responseType)
Description copied from interface: AsyncAmqpTemplate
Convert the object to a message and send it to the provided exchange and routing key after invoking the MessagePostProcessor. If the post processor adds a correlationId property, it must be unique.
Specified by: convertSendAndReceiveAsType in interface AsyncAmqpTemplate
Type Parameters:
C - the expected result type.
Parameters:
exchange - the exchange.
routingKey - the routing key.
object - the object to convert.
messagePostProcessor - the post processor.
responseType - the response type.
Returns: the ListenableFuture.
public int getPhase()
Specified by: getPhase in interface Phased
Specified by: getPhase in interface SmartLifecycle
public boolean isAutoStartup()
Specified by: isAutoStartup in interface SmartLifecycle
public void onMessage(Message message, com.rabbitmq.client.Channel channel)
Description copied from interface: ChannelAwareMessageListener
Callback for processing a received Rabbit message. Implementors are supposed to process the given Message, typically sending reply messages through the given Channel.
Specified by: onMessage in interface ChannelAwareMessageListener
Parameters:
message - the received AMQP message (never null).
channel - the underlying Rabbit Channel (never null).
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey)
Description copied from interface: RabbitTemplate.ReturnCallback
Returned message callback.
Specified by: returnedMessage in interface RabbitTemplate.ReturnCallback
Parameters:
message - the returned message.
replyCode - the reply code.
replyText - the reply text.
exchange - the exchange.
routingKey - the routing key.
public void confirm(@NonNull CorrelationData correlationData, boolean ack, @Nullable String cause)
Description copied from interface: RabbitTemplate.ConfirmCallback
Confirmation callback.
Specified by: confirm in interface RabbitTemplate.ConfirmCallback
Parameters:
correlationData - correlation data for the callback.
ack - true for ack, false for nack.
cause - an optional cause, for nack, when available, otherwise null.