Class AsyncRabbitTemplate
- java.lang.Object
-
- org.springframework.amqp.rabbit.AsyncRabbitTemplate
-
- All Implemented Interfaces:
AsyncAmqpTemplate
,MessageListener
,RabbitTemplate.ConfirmCallback
,RabbitTemplate.ReturnCallback
,RabbitTemplate.ReturnsCallback
,ChannelAwareMessageListener
,Aware
,BeanNameAware
,Lifecycle
,Phased
,SmartLifecycle
public class AsyncRabbitTemplate extends Object implements AsyncAmqpTemplate, ChannelAwareMessageListener, RabbitTemplate.ReturnsCallback, RabbitTemplate.ConfirmCallback, BeanNameAware, SmartLifecycle
Provides asynchronous send and receive operations returning aListenableFuture
allowing the caller to obtain the reply later, usingget()
or a callback.When confirms are enabled, the future has a confirm property which is itself a
ListenableFuture
. If the reply is received before the publisher confirm, the confirm is discarded since the reply implicitly indicates the message was published.Returned (undeliverable) request messages are presented as a
AmqpMessageReturnedException
cause of anExecutionException
.Internally, the template uses a
RabbitTemplate
and anAbstractMessageListenerContainer
either provided or constructed internally (aSimpleMessageListenerContainer
). If an externalRabbitTemplate
is provided and confirms/returns are enabled, it must not previously have had callbacks registered because this object needs to be the callback.- Since:
- 1.6
- Author:
- Gary Russell, Artem Bilan
-
-
Nested Class Summary
Nested Classes Modifier and Type Class Description class
AsyncRabbitTemplate.RabbitConverterFuture<C>
AAsyncRabbitTemplate.RabbitFuture
with a return type of the template's generic parameter.class
AsyncRabbitTemplate.RabbitFuture<T>
Base class forListenableFuture
s returned byAsyncRabbitTemplate
.class
AsyncRabbitTemplate.RabbitMessageFuture
AAsyncRabbitTemplate.RabbitFuture
with a return type ofMessage
.
-
Field Summary
Fields Modifier and Type Field Description static int
DEFAULT_RECEIVE_TIMEOUT
-
Fields inherited from interface org.springframework.context.SmartLifecycle
DEFAULT_PHASE
-
-
Constructor Summary
Constructors Constructor Description AsyncRabbitTemplate(ConnectionFactory connectionFactory, String exchange, String routingKey)
Construct an instance using the provided arguments.AsyncRabbitTemplate(ConnectionFactory connectionFactory, String exchange, String routingKey, String replyQueue)
Construct an instance using the provided arguments.AsyncRabbitTemplate(ConnectionFactory connectionFactory, String exchange, String routingKey, String replyQueue, String replyAddress)
Construct an instance using the provided arguments.AsyncRabbitTemplate(RabbitTemplate template)
Construct an instance using the provided arguments.AsyncRabbitTemplate(RabbitTemplate template, AbstractMessageListenerContainer container)
Construct an instance using the provided arguments.AsyncRabbitTemplate(RabbitTemplate template, AbstractMessageListenerContainer container, String replyAddress)
Construct an instance using the provided arguments.
-
Method Summary
All Methods Instance Methods Concrete Methods Modifier and Type Method Description void
confirm(CorrelationData correlationData, boolean ack, String cause)
Confirmation callback.<C> AsyncRabbitTemplate.RabbitConverterFuture<C>
convertSendAndReceive(Object object)
Convert the object to a message and send it to the default exchange with the default routing key.<C> AsyncRabbitTemplate.RabbitConverterFuture<C>
convertSendAndReceive(Object object, MessagePostProcessor messagePostProcessor)
Convert the object to a message and send it to the default exchange with the default routing key after invoking theMessagePostProcessor
.<C> AsyncRabbitTemplate.RabbitConverterFuture<C>
convertSendAndReceive(String routingKey, Object object)
Convert the object to a message and send it to the default exchange with the provided routing key.<C> AsyncRabbitTemplate.RabbitConverterFuture<C>
convertSendAndReceive(String routingKey, Object object, MessagePostProcessor messagePostProcessor)
Convert the object to a message and send it to the default exchange with the provided routing key after invoking theMessagePostProcessor
.<C> AsyncRabbitTemplate.RabbitConverterFuture<C>
convertSendAndReceive(String exchange, String routingKey, Object object)
Convert the object to a message and send it to the provided exchange and routing key.<C> AsyncRabbitTemplate.RabbitConverterFuture<C>
convertSendAndReceive(String exchange, String routingKey, Object object, MessagePostProcessor messagePostProcessor)
Convert the object to a message and send it to the provided exchange and routing key after invoking theMessagePostProcessor
.<C> AsyncRabbitTemplate.RabbitConverterFuture<C>
convertSendAndReceiveAsType(Object object, MessagePostProcessor messagePostProcessor, ParameterizedTypeReference<C> responseType)
Convert the object to a message and send it to the default exchange with the default routing key after invoking theMessagePostProcessor
.<C> AsyncRabbitTemplate.RabbitConverterFuture<C>
convertSendAndReceiveAsType(Object object, ParameterizedTypeReference<C> responseType)
Convert the object to a message and send it to the default exchange with the default routing key.<C> AsyncRabbitTemplate.RabbitConverterFuture<C>
convertSendAndReceiveAsType(String routingKey, Object object, MessagePostProcessor messagePostProcessor, ParameterizedTypeReference<C> responseType)
Convert the object to a message and send it to the default exchange with the provided routing key after invoking theMessagePostProcessor
.<C> AsyncRabbitTemplate.RabbitConverterFuture<C>
convertSendAndReceiveAsType(String routingKey, Object object, ParameterizedTypeReference<C> responseType)
Convert the object to a message and send it to the default exchange with the provided routing key.<C> AsyncRabbitTemplate.RabbitConverterFuture<C>
convertSendAndReceiveAsType(String exchange, String routingKey, Object object, MessagePostProcessor messagePostProcessor, ParameterizedTypeReference<C> responseType)
Convert the object to a message and send it to the provided exchange and routing key after invoking theMessagePostProcessor
.<C> AsyncRabbitTemplate.RabbitConverterFuture<C>
convertSendAndReceiveAsType(String exchange, String routingKey, Object object, ParameterizedTypeReference<C> responseType)
Convert the object to a message and send it to the provided exchange and routing key.String
getBeanName()
ConnectionFactory
getConnectionFactory()
MessageConverter
getMessageConverter()
int
getPhase()
RabbitTemplate
getRabbitTemplate()
Return the underlyingRabbitTemplate
used for sending.boolean
isAutoStartup()
boolean
isRunning()
void
onMessage(Message message, com.rabbitmq.client.Channel channel)
Callback for processing a received Rabbit message.void
returnedMessage(ReturnedMessage returned)
Returned message callback.AsyncRabbitTemplate.RabbitMessageFuture
sendAndReceive(String exchange, String routingKey, Message message)
Send a message to the supplied exchange and routing key.AsyncRabbitTemplate.RabbitMessageFuture
sendAndReceive(String routingKey, Message message)
Send a message to the default exchange with the supplied routing key.AsyncRabbitTemplate.RabbitMessageFuture
sendAndReceive(Message message)
Send a message to the default exchange with the default routing key.void
setAutoStartup(boolean autoStartup)
void
setBeanName(String beanName)
void
setEnableConfirms(boolean enableConfirms)
Set to true to enable publisher confirms.void
setMandatory(boolean mandatory)
Set to true to enable the receipt of returned messages that cannot be delivered in the form of aAmqpMessageReturnedException
.void
setMandatoryExpression(Expression mandatoryExpression)
void
setMandatoryExpressionString(String mandatoryExpression)
void
setPhase(int phase)
void
setReceiveTimeout(long receiveTimeout)
Set the receive timeout - the future returned by the send and receive methods will be canceled when this timeout expires.void
setTaskScheduler(TaskScheduler taskScheduler)
Set the task scheduler to expire timed out futures.void
start()
void
stop()
String
toString()
-
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait
-
Methods inherited from interface org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener
onMessage, onMessageBatch
-
Methods inherited from interface org.springframework.amqp.core.MessageListener
containerAckMode, isAsyncReplies, onMessageBatch
-
Methods inherited from interface org.springframework.amqp.rabbit.core.RabbitTemplate.ReturnsCallback
delegate, returnedMessage
-
Methods inherited from interface org.springframework.context.SmartLifecycle
stop
-
-
-
-
Field Detail
-
DEFAULT_RECEIVE_TIMEOUT
public static final int DEFAULT_RECEIVE_TIMEOUT
- See Also:
- Constant Field Values
-
-
Constructor Detail
-
AsyncRabbitTemplate
public AsyncRabbitTemplate(ConnectionFactory connectionFactory, String exchange, String routingKey, String replyQueue)
Construct an instance using the provided arguments. Replies will be routed to the default exchange using the reply queue name as the routing key.- Parameters:
connectionFactory
- the connection factory.exchange
- the default exchange to which requests will be sent.routingKey
- the default routing key.replyQueue
- the name of the reply queue to listen for replies.
-
AsyncRabbitTemplate
public AsyncRabbitTemplate(ConnectionFactory connectionFactory, String exchange, String routingKey, String replyQueue, String replyAddress)
Construct an instance using the provided arguments. If 'replyAddress' is null, replies will be routed to the default exchange using the reply queue name as the routing key. Otherwise it should have the form exchange/routingKey and must cause messages to be routed to the reply queue.- Parameters:
connectionFactory
- the connection factory.exchange
- the default exchange to which requests will be sent.routingKey
- the default routing key.replyQueue
- the name of the reply queue to listen for replies.replyAddress
- the reply address (exchange/routingKey).
-
AsyncRabbitTemplate
public AsyncRabbitTemplate(RabbitTemplate template, AbstractMessageListenerContainer container)
Construct an instance using the provided arguments. The first queue the container is configured to listen to will be used as the reply queue. Replies will be routed using the default exchange with that queue name as the routing key.- Parameters:
template
- aRabbitTemplate
container
- aAbstractMessageListenerContainer
.
-
AsyncRabbitTemplate
public AsyncRabbitTemplate(RabbitTemplate template, AbstractMessageListenerContainer container, String replyAddress)
Construct an instance using the provided arguments. The first queue the container is configured to listen to will be used as the reply queue. If 'replyAddress' is null, replies will be routed using the default exchange with that queue name as the routing key. Otherwise it should have the form exchange/routingKey and must cause messages to be routed to the reply queue.- Parameters:
template
- aRabbitTemplate
.container
- aAbstractMessageListenerContainer
.replyAddress
- the reply address.
-
AsyncRabbitTemplate
public AsyncRabbitTemplate(ConnectionFactory connectionFactory, String exchange, String routingKey)
Construct an instance using the provided arguments. "Direct replyTo" is used for replies.- Parameters:
connectionFactory
- the connection factory.exchange
- the default exchange to which requests will be sent.routingKey
- the default routing key.- Since:
- 2.0
-
AsyncRabbitTemplate
public AsyncRabbitTemplate(RabbitTemplate template)
Construct an instance using the provided arguments. "Direct replyTo" is used for replies.- Parameters:
template
- aRabbitTemplate
- Since:
- 2.0
-
-
Method Detail
-
setAutoStartup
public void setAutoStartup(boolean autoStartup)
- Parameters:
autoStartup
- true for auto start.- See Also:
isAutoStartup()
-
setPhase
public void setPhase(int phase)
- Parameters:
phase
- the phase.- See Also:
getPhase()
-
setMandatory
public void setMandatory(boolean mandatory)
Set to true to enable the receipt of returned messages that cannot be delivered in the form of aAmqpMessageReturnedException
.- Parameters:
mandatory
- true to enable returns.
-
setMandatoryExpression
public void setMandatoryExpression(Expression mandatoryExpression)
- Parameters:
mandatoryExpression
- a SpELExpression
to evaluate against each request message. The result of the evaluation must be aboolean
value.- Since:
- 2.0
-
setMandatoryExpressionString
public void setMandatoryExpressionString(String mandatoryExpression)
- Parameters:
mandatoryExpression
- a SpELExpression
to evaluate against each request message. The result of the evaluation must be aboolean
value.- Since:
- 2.0
-
setEnableConfirms
public void setEnableConfirms(boolean enableConfirms)
Set to true to enable publisher confirms. When enabled, theAsyncRabbitTemplate.RabbitFuture
returned by the send and receive operation will have aListenableFuture<Boolean>
in itsconfirm
property.- Parameters:
enableConfirms
- true to enable publisher confirms.
-
getBeanName
public String getBeanName()
-
setBeanName
public void setBeanName(String beanName)
- Specified by:
setBeanName
in interfaceBeanNameAware
-
getConnectionFactory
public ConnectionFactory getConnectionFactory()
- Returns:
- a reference to the underlying connection factory in the
RabbitTemplate
.
-
setReceiveTimeout
public void setReceiveTimeout(long receiveTimeout)
Set the receive timeout - the future returned by the send and receive methods will be canceled when this timeout expires.<= 0
means futures never expire. Beware that this will cause a memory leak if a reply is not received. Default: 30000 (30 seconds).- Parameters:
receiveTimeout
- the timeout in milliseconds.
-
setTaskScheduler
public void setTaskScheduler(TaskScheduler taskScheduler)
Set the task scheduler to expire timed out futures.- Parameters:
taskScheduler
- the task scheduler- See Also:
setReceiveTimeout(long)
-
getMessageConverter
public MessageConverter getMessageConverter()
- Returns:
- a reference to the underlying
RabbitTemplate
'sMessageConverter
.
-
getRabbitTemplate
public RabbitTemplate getRabbitTemplate()
Return the underlyingRabbitTemplate
used for sending.- Returns:
- the template.
- Since:
- 2.2
-
sendAndReceive
public AsyncRabbitTemplate.RabbitMessageFuture sendAndReceive(Message message)
Description copied from interface:AsyncAmqpTemplate
Send a message to the default exchange with the default routing key. If the message contains a correlationId property, it must be unique.- Specified by:
sendAndReceive
in interfaceAsyncAmqpTemplate
- Parameters:
message
- the message.- Returns:
- the
ListenableFuture
.
-
sendAndReceive
public AsyncRabbitTemplate.RabbitMessageFuture sendAndReceive(String routingKey, Message message)
Description copied from interface:AsyncAmqpTemplate
Send a message to the default exchange with the supplied routing key. If the message contains a correlationId property, it must be unique.- Specified by:
sendAndReceive
in interfaceAsyncAmqpTemplate
- Parameters:
routingKey
- the routing key.message
- the message.- Returns:
- the
ListenableFuture
.
-
sendAndReceive
public AsyncRabbitTemplate.RabbitMessageFuture sendAndReceive(String exchange, String routingKey, Message message)
Description copied from interface:AsyncAmqpTemplate
Send a message to the supplied exchange and routing key. If the message contains a correlationId property, it must be unique.- Specified by:
sendAndReceive
in interfaceAsyncAmqpTemplate
- Parameters:
exchange
- the exchange.routingKey
- the routing key.message
- the message.- Returns:
- the
ListenableFuture
.
-
convertSendAndReceive
public <C> AsyncRabbitTemplate.RabbitConverterFuture<C> convertSendAndReceive(Object object)
Description copied from interface:AsyncAmqpTemplate
Convert the object to a message and send it to the default exchange with the default routing key.- Specified by:
convertSendAndReceive
in interfaceAsyncAmqpTemplate
- Type Parameters:
C
- the expected result type.- Parameters:
object
- the object to convert.- Returns:
- the
ListenableFuture
.
-
convertSendAndReceive
public <C> AsyncRabbitTemplate.RabbitConverterFuture<C> convertSendAndReceive(String routingKey, Object object)
Description copied from interface:AsyncAmqpTemplate
Convert the object to a message and send it to the default exchange with the provided routing key.- Specified by:
convertSendAndReceive
in interfaceAsyncAmqpTemplate
- Type Parameters:
C
- the expected result type.- Parameters:
routingKey
- the routing key.object
- the object to convert.- Returns:
- the
ListenableFuture
.
-
convertSendAndReceive
public <C> AsyncRabbitTemplate.RabbitConverterFuture<C> convertSendAndReceive(String exchange, String routingKey, Object object)
Description copied from interface:AsyncAmqpTemplate
Convert the object to a message and send it to the provided exchange and routing key.- Specified by:
convertSendAndReceive
in interfaceAsyncAmqpTemplate
- Type Parameters:
C
- the expected result type.- Parameters:
exchange
- the exchange.routingKey
- the routing key.object
- the object to convert.- Returns:
- the
ListenableFuture
.
-
convertSendAndReceive
public <C> AsyncRabbitTemplate.RabbitConverterFuture<C> convertSendAndReceive(Object object, MessagePostProcessor messagePostProcessor)
Description copied from interface:AsyncAmqpTemplate
Convert the object to a message and send it to the default exchange with the default routing key after invoking theMessagePostProcessor
. If the post processor adds a correlationId property, it must be unique.- Specified by:
convertSendAndReceive
in interfaceAsyncAmqpTemplate
- Type Parameters:
C
- the expected result type.- Parameters:
object
- the object to convert.messagePostProcessor
- the post processor.- Returns:
- the
ListenableFuture
.
-
convertSendAndReceive
public <C> AsyncRabbitTemplate.RabbitConverterFuture<C> convertSendAndReceive(String routingKey, Object object, MessagePostProcessor messagePostProcessor)
Description copied from interface:AsyncAmqpTemplate
Convert the object to a message and send it to the default exchange with the provided routing key after invoking theMessagePostProcessor
. If the post processor adds a correlationId property, it must be unique.- Specified by:
convertSendAndReceive
in interfaceAsyncAmqpTemplate
- Type Parameters:
C
- the expected result type.- Parameters:
routingKey
- the routing key.object
- the object to convert.messagePostProcessor
- the post processor.- Returns:
- the
ListenableFuture
.
-
convertSendAndReceive
public <C> AsyncRabbitTemplate.RabbitConverterFuture<C> convertSendAndReceive(String exchange, String routingKey, Object object, MessagePostProcessor messagePostProcessor)
Description copied from interface:AsyncAmqpTemplate
Convert the object to a message and send it to the provided exchange and routing key after invoking theMessagePostProcessor
. If the post processor adds a correlationId property, it must be unique.- Specified by:
convertSendAndReceive
in interfaceAsyncAmqpTemplate
- Type Parameters:
C
- the expected result type.- Parameters:
exchange
- the exchangeroutingKey
- the routing key.object
- the object to convert.messagePostProcessor
- the post processor.- Returns:
- the
ListenableFuture
.
-
convertSendAndReceiveAsType
public <C> AsyncRabbitTemplate.RabbitConverterFuture<C> convertSendAndReceiveAsType(Object object, ParameterizedTypeReference<C> responseType)
Description copied from interface:AsyncAmqpTemplate
Convert the object to a message and send it to the default exchange with the default routing key.- Specified by:
convertSendAndReceiveAsType
in interfaceAsyncAmqpTemplate
- Type Parameters:
C
- the expected result type.- Parameters:
object
- the object to convert.responseType
- the response type.- Returns:
- the
ListenableFuture
.
-
convertSendAndReceiveAsType
public <C> AsyncRabbitTemplate.RabbitConverterFuture<C> convertSendAndReceiveAsType(String routingKey, Object object, ParameterizedTypeReference<C> responseType)
Description copied from interface:AsyncAmqpTemplate
Convert the object to a message and send it to the default exchange with the provided routing key.- Specified by:
convertSendAndReceiveAsType
in interfaceAsyncAmqpTemplate
- Type Parameters:
C
- the expected result type.- Parameters:
routingKey
- the routing key.object
- the object to convert.responseType
- the response type.- Returns:
- the
ListenableFuture
.
-
convertSendAndReceiveAsType
public <C> AsyncRabbitTemplate.RabbitConverterFuture<C> convertSendAndReceiveAsType(String exchange, String routingKey, Object object, ParameterizedTypeReference<C> responseType)
Description copied from interface:AsyncAmqpTemplate
Convert the object to a message and send it to the provided exchange and routing key.- Specified by:
convertSendAndReceiveAsType
in interfaceAsyncAmqpTemplate
- Type Parameters:
C
- the expected result type.- Parameters:
exchange
- the exchange.routingKey
- the routing key.object
- the object to convert.responseType
- the response type.- Returns:
- the
ListenableFuture
.
-
convertSendAndReceiveAsType
public <C> AsyncRabbitTemplate.RabbitConverterFuture<C> convertSendAndReceiveAsType(Object object, MessagePostProcessor messagePostProcessor, ParameterizedTypeReference<C> responseType)
Description copied from interface:AsyncAmqpTemplate
Convert the object to a message and send it to the default exchange with the default routing key after invoking theMessagePostProcessor
. If the post processor adds a correlationId property, it must be unique.- Specified by:
convertSendAndReceiveAsType
in interfaceAsyncAmqpTemplate
- Type Parameters:
C
- the expected result type.- Parameters:
object
- the object to convert.messagePostProcessor
- the post processor.responseType
- the response type.- Returns:
- the
ListenableFuture
.
-
convertSendAndReceiveAsType
public <C> AsyncRabbitTemplate.RabbitConverterFuture<C> convertSendAndReceiveAsType(String routingKey, Object object, MessagePostProcessor messagePostProcessor, ParameterizedTypeReference<C> responseType)
Description copied from interface:AsyncAmqpTemplate
Convert the object to a message and send it to the default exchange with the provided routing key after invoking theMessagePostProcessor
. If the post processor adds a correlationId property, it must be unique.- Specified by:
convertSendAndReceiveAsType
in interfaceAsyncAmqpTemplate
- Type Parameters:
C
- the expected result type.- Parameters:
routingKey
- the routing key.object
- the object to convert.messagePostProcessor
- the post processor.responseType
- the response type.- Returns:
- the
ListenableFuture
.
-
convertSendAndReceiveAsType
public <C> AsyncRabbitTemplate.RabbitConverterFuture<C> convertSendAndReceiveAsType(String exchange, String routingKey, Object object, MessagePostProcessor messagePostProcessor, ParameterizedTypeReference<C> responseType)
Description copied from interface:AsyncAmqpTemplate
Convert the object to a message and send it to the provided exchange and routing key after invoking theMessagePostProcessor
. If the post processor adds a correlationId property, it must be unique.- Specified by:
convertSendAndReceiveAsType
in interfaceAsyncAmqpTemplate
- Type Parameters:
C
- the expected result type.- Parameters:
exchange
- the exchangeroutingKey
- the routing key.object
- the object to convert.messagePostProcessor
- the post processor.responseType
- the response type.- Returns:
- the
ListenableFuture
.
-
getPhase
public int getPhase()
- Specified by:
getPhase
in interfacePhased
- Specified by:
getPhase
in interfaceSmartLifecycle
-
isAutoStartup
public boolean isAutoStartup()
- Specified by:
isAutoStartup
in interfaceSmartLifecycle
-
onMessage
public void onMessage(Message message, com.rabbitmq.client.Channel channel)
Description copied from interface:ChannelAwareMessageListener
Callback for processing a received Rabbit message.Implementors are supposed to process the given Message, typically sending reply messages through the given Session.
- Specified by:
onMessage
in interfaceChannelAwareMessageListener
- Parameters:
message
- the received AMQP message (nevernull
)channel
- the underlying Rabbit Channel (nevernull
unless called by the stream listener container).
-
returnedMessage
public void returnedMessage(ReturnedMessage returned)
Description copied from interface:RabbitTemplate.ReturnsCallback
Returned message callback.- Specified by:
returnedMessage
in interfaceRabbitTemplate.ReturnCallback
- Specified by:
returnedMessage
in interfaceRabbitTemplate.ReturnsCallback
- Parameters:
returned
- the returned message and metadata.
-
confirm
public void confirm(@NonNull CorrelationData correlationData, boolean ack, @Nullable String cause)
Description copied from interface:RabbitTemplate.ConfirmCallback
Confirmation callback.- Specified by:
confirm
in interfaceRabbitTemplate.ConfirmCallback
- Parameters:
correlationData
- correlation data for the callback.ack
- true for ack, false for nackcause
- An optional cause, for nack, when available, otherwise null.
-
-