public class RabbitTemplate extends RabbitAccessor implements BeanFactoryAware, RabbitOperations, MessageListener, ListenerContainerAware, PublisherCallbackChannel.Listener, Lifecycle, BeanNameAware
Helper class that simplifies synchronous RabbitMQ access (sending and receiving messages).
The default settings are for non-transactional messaging, which reduces the amount of data exchanged with the broker.
To use a new transaction for every send or receive, set the channelTransacted flag. To extend the transaction over multiple invocations (more efficient), you can use a Spring transaction to bracket the calls (with channelTransacted=true as well).
The only mandatory property is the ConnectionFactory. There are strategies available for converting messages to and from Java objects (MessageConverter) and for converting message headers (known as message properties in AMQP, see MessagePropertiesConverter). The defaults probably do something sensible for typical use cases, as long as the message content-type is set appropriately.
The "send" methods all have overloaded versions that allow you to explicitly target an exchange and a routing key, or you can set default values to be used in all send operations. The plain "receive" methods allow you to explicitly target a queue to receive from, or you can set a default value for the template that applies to all explicit receives. The convenience methods for send and receive use the sender defaults if no exchange or routing key is specified, but they always use a temporary queue for the receive leg, so the default queue is ignored.
Modifier and Type | Class and Description |
---|---|
static interface |
RabbitTemplate.ConfirmCallback
A callback for publisher confirmations.
|
static interface |
RabbitTemplate.ReturnCallback
A callback for returned messages.
|
protected static class |
RabbitTemplate.TemplateConsumer
Adds
RabbitTemplate.TemplateConsumer.toString() to the DefaultConsumer . |
RabbitOperations.OperationsCallback<T>
logger
Constructor and Description |
---|
RabbitTemplate()
Convenient constructor for use with setter injection.
|
RabbitTemplate(ConnectionFactory connectionFactory)
Create a rabbit template with default strategies and settings.
|
Modifier and Type | Method and Description |
---|---|
void |
addAfterReceivePostProcessors(MessagePostProcessor... afterReceivePostProcessors)
Add
MessagePostProcessor that will be invoked immediately after a Channel#basicGet()
and before any message conversion is performed. |
void |
addBeforePublishPostProcessors(MessagePostProcessor... beforePublishPostProcessors)
Add
MessagePostProcessor that will be invoked immediately before invoking
Channel#basicPublish() , after all other processing, except creating the
AMQP.BasicProperties from MessageProperties . |
void |
addListener(com.rabbitmq.client.Channel channel)
Add this template as a confirms listener for the provided channel.
|
void |
convertAndSend(Object object)
Convert a Java object to an Amqp
Message and send it to a default exchange
with a default routing key. |
void |
convertAndSend(Object message,
MessagePostProcessor messagePostProcessor)
Convert a Java object to an Amqp
Message and send it to a default exchange
with a default routing key. |
void |
convertAndSend(Object message,
MessagePostProcessor messagePostProcessor,
CorrelationData correlationData)
Convert a Java object to an Amqp
Message and send it to a default exchange
with a default routing key. |
void |
convertAndSend(String routingKey,
Object object)
Convert a Java object to an Amqp
Message and send it to a default exchange
with a specific routing key. |
void |
convertAndSend(String routingKey,
Object object,
CorrelationData correlationData)
Convert a Java object to an Amqp
Message and send it to a default exchange
with a specific routing key. |
void |
convertAndSend(String routingKey,
Object message,
MessagePostProcessor messagePostProcessor)
Convert a Java object to an Amqp
Message and send it to a default exchange
with a specific routing key. |
void |
convertAndSend(String routingKey,
Object message,
MessagePostProcessor messagePostProcessor,
CorrelationData correlationData)
Convert a Java object to an Amqp
Message and send it to a default exchange
with a specific routing key. |
void |
convertAndSend(String exchange,
String routingKey,
Object object)
Convert a Java object to an Amqp
Message and send it to a specific exchange
with a specific routing key. |
void |
convertAndSend(String exchange,
String routingKey,
Object object,
CorrelationData correlationData)
Convert a Java object to an Amqp
Message and send it to a specific exchange
with a specific routing key. |
void |
convertAndSend(String exchange,
String routingKey,
Object message,
MessagePostProcessor messagePostProcessor)
Convert a Java object to an Amqp
Message and send it to a specific exchange
with a specific routing key. |
void |
convertAndSend(String exchange,
String routingKey,
Object message,
MessagePostProcessor messagePostProcessor,
CorrelationData correlationData)
Convert a Java object to an Amqp
Message and send it to a specific exchange
with a specific routing key. |
protected Message |
convertMessageIfNecessary(Object object) |
Object |
convertSendAndReceive(Object message)
Basic RPC pattern with conversion.
|
Object |
convertSendAndReceive(Object message,
CorrelationData correlationData)
Basic RPC pattern with conversion.
|
Object |
convertSendAndReceive(Object message,
MessagePostProcessor messagePostProcessor)
Basic RPC pattern with conversion.
|
Object |
convertSendAndReceive(Object message,
MessagePostProcessor messagePostProcessor,
CorrelationData correlationData)
Basic RPC pattern with conversion.
|
Object |
convertSendAndReceive(String routingKey,
Object message)
Basic RPC pattern with conversion.
|
Object |
convertSendAndReceive(String routingKey,
Object message,
CorrelationData correlationData)
Basic RPC pattern with conversion.
|
Object |
convertSendAndReceive(String routingKey,
Object message,
MessagePostProcessor messagePostProcessor)
Basic RPC pattern with conversion.
|
Object |
convertSendAndReceive(String routingKey,
Object message,
MessagePostProcessor messagePostProcessor,
CorrelationData correlationData)
Basic RPC pattern with conversion.
|
Object |
convertSendAndReceive(String exchange,
String routingKey,
Object message)
Basic RPC pattern with conversion.
|
Object |
convertSendAndReceive(String exchange,
String routingKey,
Object message,
CorrelationData correlationData)
Basic RPC pattern with conversion.
|
Object |
convertSendAndReceive(String exchange,
String routingKey,
Object message,
MessagePostProcessor messagePostProcessor)
Basic RPC pattern with conversion.
|
Object |
convertSendAndReceive(String exchange,
String routingKey,
Object message,
MessagePostProcessor messagePostProcessor,
CorrelationData correlationData)
Basic RPC pattern with conversion.
|
<T> T |
convertSendAndReceiveAsType(Object message,
CorrelationData correlationData,
ParameterizedTypeReference<T> responseType)
Basic RPC pattern with conversion.
|
<T> T |
convertSendAndReceiveAsType(Object message,
MessagePostProcessor messagePostProcessor,
CorrelationData correlationData,
ParameterizedTypeReference<T> responseType)
Basic RPC pattern with conversion.
|
<T> T |
convertSendAndReceiveAsType(Object message,
MessagePostProcessor messagePostProcessor,
ParameterizedTypeReference<T> responseType)
Basic RPC pattern with conversion.
|
<T> T |
convertSendAndReceiveAsType(Object message,
ParameterizedTypeReference<T> responseType)
Basic RPC pattern with conversion.
|
<T> T |
convertSendAndReceiveAsType(String routingKey,
Object message,
CorrelationData correlationData,
ParameterizedTypeReference<T> responseType)
Basic RPC pattern with conversion.
|
<T> T |
convertSendAndReceiveAsType(String routingKey,
Object message,
MessagePostProcessor messagePostProcessor,
CorrelationData correlationData,
ParameterizedTypeReference<T> responseType)
Basic RPC pattern with conversion.
|
<T> T |
convertSendAndReceiveAsType(String routingKey,
Object message,
MessagePostProcessor messagePostProcessor,
ParameterizedTypeReference<T> responseType)
Basic RPC pattern with conversion.
|
<T> T |
convertSendAndReceiveAsType(String routingKey,
Object message,
ParameterizedTypeReference<T> responseType)
Basic RPC pattern with conversion.
|
<T> T |
convertSendAndReceiveAsType(String exchange,
String routingKey,
Object message,
MessagePostProcessor messagePostProcessor,
CorrelationData correlationData,
ParameterizedTypeReference<T> responseType)
Basic RPC pattern with conversion.
|
<T> T |
convertSendAndReceiveAsType(String exchange,
String routingKey,
Object message,
MessagePostProcessor messagePostProcessor,
ParameterizedTypeReference<T> responseType)
Basic RPC pattern with conversion.
|
<T> T |
convertSendAndReceiveAsType(String exchange,
String routingKey,
Object message,
ParameterizedTypeReference<T> responseType)
Basic RPC pattern with conversion.
|
protected Message |
convertSendAndReceiveRaw(String exchange,
String routingKey,
Object message,
MessagePostProcessor messagePostProcessor,
CorrelationData correlationData)
Convert and send a message and return the raw reply message, or null.
|
void |
correlationConvertAndSend(Object object,
CorrelationData correlationData)
Convert a Java object to an Amqp
Message and send it to a default exchange
with a default routing key. |
void |
determineConfirmsReturnsCapability(ConnectionFactory connectionFactory) |
protected Message |
doReceiveNoWait(String queueName)
Non-blocking receive.
|
void |
doSend(com.rabbitmq.client.Channel channel,
String exchangeArg,
String routingKeyArg,
Message message,
boolean mandatory,
CorrelationData correlationData)
Send the given message to the specified exchange.
|
protected Message |
doSendAndReceive(String exchange,
String routingKey,
Message message,
CorrelationData correlationData)
Send a message and wait for a reply.
|
protected Message |
doSendAndReceiveWithFixed(String exchange,
String routingKey,
Message message,
CorrelationData correlationData) |
protected Message |
doSendAndReceiveWithTemporary(String exchange,
String routingKey,
Message message,
CorrelationData correlationData) |
protected void |
doStart()
Perform additional start actions.
|
protected void |
doStop()
Perform additional stop actions.
|
<T> T |
execute(ChannelCallback<T> action)
Execute the callback with a channel and reliably close the channel afterwards.
|
Collection<String> |
expectedQueueNames()
Invoked by the container during startup so it can verify the queue is correctly
configured (if a simple reply queue name is used instead of exchange/routingKey).
|
Collection<MessagePostProcessor> |
getAfterReceivePostProcessors()
Return configured after receive
MessagePostProcessor s or null . |
String |
getEncoding()
The encoding used when converting between byte arrays and Strings in message properties.
|
String |
getExchange() |
MessageConverter |
getMessageConverter()
Return the message converter for this template.
|
protected MessagePropertiesConverter |
getMessagePropertiesConverter()
Return the properties converter.
|
String |
getRoutingKey() |
Collection<CorrelationData> |
getUnconfirmed(long age)
Gets unconfirmed correlation data older than age and removes them.
|
int |
getUnconfirmedCount()
Gets unconfirmed messages count.
|
String |
getUUID()
Returns the UUID used to identify this Listener for returns.
|
void |
handleConfirm(PendingConfirm pendingConfirm,
boolean ack)
Invoked by the channel when a confirm is received.
|
void |
handleReturn(int replyCode,
String replyText,
String exchange,
String routingKey,
com.rabbitmq.client.AMQP.BasicProperties properties,
byte[] body) |
protected void |
initDefaultStrategies()
Set up the default strategies.
|
<T> T |
invoke(RabbitOperations.OperationsCallback<T> action,
com.rabbitmq.client.ConfirmCallback acks,
com.rabbitmq.client.ConfirmCallback nacks)
Invoke operations on the same channel.
|
protected boolean |
isChannelLocallyTransacted(com.rabbitmq.client.Channel channel)
Check whether the given Channel is locally transacted, that is, whether its transaction is managed by this
template's Channel handling and not by an external transaction coordinator.
|
boolean |
isConfirmListener() |
Boolean |
isMandatoryFor(Message message)
Return whether the provided message should be sent with the mandatory flag set.
|
boolean |
isReturnListener() |
boolean |
isRunning() |
boolean |
isUsePublisherConnection()
True if separate publisher connection(s) are used.
|
void |
onMessage(Message message) |
Message |
receive()
Receive a message if there is one from a default queue.
|
Message |
receive(long timeoutMillis)
Receive a message from a default queue, waiting up to the specified wait time if
necessary for a message to become available.
|
Message |
receive(String queueName)
Receive a message if there is one from a specific queue.
|
Message |
receive(String queueName,
long timeoutMillis)
Receive a message from a specific queue, waiting up to the specified wait time if
necessary for a message to become available.
|
Object |
receiveAndConvert()
Receive a message if there is one from a default queue and convert it to a Java
object.
|
Object |
receiveAndConvert(long timeoutMillis)
Receive a message if there is one from a default queue and convert it to a Java
object.
|
<T> T |
receiveAndConvert(long timeoutMillis,
ParameterizedTypeReference<T> type)
Receive a message if there is one from a default queue and convert it to a Java
object.
|
<T> T |
receiveAndConvert(ParameterizedTypeReference<T> type)
Receive a message if there is one from a default queue and convert it to a Java
object.
|
Object |
receiveAndConvert(String queueName)
Receive a message if there is one from a specific queue and convert it to a Java
object.
|
Object |
receiveAndConvert(String queueName,
long timeoutMillis)
Receive a message if there is one from a specific queue and convert it to a Java
object.
|
<T> T |
receiveAndConvert(String queueName,
long timeoutMillis,
ParameterizedTypeReference<T> type)
Receive a message if there is one from a specific queue and convert it to a Java
object.
|
<T> T |
receiveAndConvert(String queueName,
ParameterizedTypeReference<T> type)
Receive a message if there is one from a specific queue and convert it to a Java
object.
|
<R,S> boolean |
receiveAndReply(ReceiveAndReplyCallback<R,S> callback)
Receive a message if there is one from a default queue, invoke provided
ReceiveAndReplyCallback and send reply message, if the callback
returns one, to the replyTo Address
from MessageProperties or to default exchange
and default routingKey. |
<R,S> boolean |
receiveAndReply(ReceiveAndReplyCallback<R,S> callback,
ReplyToAddressCallback<S> replyToAddressCallback)
Receive a message if there is one from a default queue, invoke provided
ReceiveAndReplyCallback and send reply message, if the callback
returns one, to the replyTo Address
from result of ReplyToAddressCallback . |
<R,S> boolean |
receiveAndReply(ReceiveAndReplyCallback<R,S> callback,
String exchange,
String routingKey)
Receive a message if there is one from default queue, invoke provided
ReceiveAndReplyCallback and send reply message, if the callback
returns one, to the provided exchange and routingKey . |
<R,S> boolean |
receiveAndReply(String queueName,
ReceiveAndReplyCallback<R,S> callback)
Receive a message if there is one from provided queue, invoke provided
ReceiveAndReplyCallback and send reply message, if the callback
returns one, to the replyTo Address
from MessageProperties or to default exchange
and default routingKey. |
<R,S> boolean |
receiveAndReply(String queueName,
ReceiveAndReplyCallback<R,S> callback,
ReplyToAddressCallback<S> replyToAddressCallback)
Receive a message if there is one from provided queue, invoke provided
ReceiveAndReplyCallback and send reply message, if the callback
returns one, to the replyTo Address
from result of ReplyToAddressCallback . |
<R,S> boolean |
receiveAndReply(String queueName,
ReceiveAndReplyCallback<R,S> callback,
String replyExchange,
String replyRoutingKey)
Receive a message if there is one from provided queue, invoke provided
ReceiveAndReplyCallback and send reply message, if the callback
returns one, to the provided exchange and routingKey . |
boolean |
removeAfterReceivePostProcessor(MessagePostProcessor afterReceivePostProcessor)
Remove the provided
MessagePostProcessor from the afterReceivePostProcessors list. |
boolean |
removeBeforePublishPostProcessor(MessagePostProcessor beforePublishPostProcessor)
Remove the provided
MessagePostProcessor from the beforePublishPostProcessors list. |
protected void |
replyTimedOut(String correlationId)
Subclasses can implement this to be notified that a reply has timed out.
|
void |
revoke(com.rabbitmq.client.Channel channel)
When called, this listener should remove all references to the
channel - it will no longer be invoked by the channel.
|
void |
send(Message message)
Send a message to a default exchange with a default routing key.
|
void |
send(String routingKey,
Message message)
Send a message to a default exchange with a specific routing key.
|
void |
send(String exchange,
String routingKey,
Message message)
Send a message to a specific exchange with a specific routing key.
|
void |
send(String exchange,
String routingKey,
Message message,
CorrelationData correlationData)
Send a message to a specific exchange with a specific routing key.
|
Message |
sendAndReceive(Message message)
Basic RPC pattern.
|
Message |
sendAndReceive(Message message,
CorrelationData correlationData) |
Message |
sendAndReceive(String routingKey,
Message message)
Basic RPC pattern.
|
Message |
sendAndReceive(String routingKey,
Message message,
CorrelationData correlationData) |
Message |
sendAndReceive(String exchange,
String routingKey,
Message message)
Basic RPC pattern.
|
Message |
sendAndReceive(String exchange,
String routingKey,
Message message,
CorrelationData correlationData) |
protected void |
sendToRabbit(com.rabbitmq.client.Channel channel,
String exchange,
String routingKey,
boolean mandatory,
Message message) |
void |
setAfterReceivePostProcessors(MessagePostProcessor... afterReceivePostProcessors)
Set a
MessagePostProcessor that will be invoked immediately after a Channel#basicGet()
and before any message conversion is performed. |
void |
setBeanFactory(BeanFactory beanFactory) |
void |
setBeanName(String name) |
void |
setBeforePublishPostProcessors(MessagePostProcessor... beforePublishPostProcessors)
Set
MessagePostProcessor s that will be invoked immediately before invoking
Channel#basicPublish() , after all other processing, except creating the
AMQP.BasicProperties from MessageProperties . |
void |
setConfirmCallback(RabbitTemplate.ConfirmCallback confirmCallback) |
void |
setCorrelationDataPostProcessor(CorrelationDataPostProcessor correlationDataPostProcessor)
Set a
CorrelationDataPostProcessor to be invoked before publishing a message. |
void |
setCorrelationKey(String correlationKey)
If set to 'correlationId' (default) the correlationId property
will be used; otherwise the supplied key will be used.
|
void |
setDefaultReceiveQueue(String queue)
The name of the default queue to receive messages from when none is specified explicitly.
|
void |
setEncoding(String encoding)
The encoding to use when converting between byte arrays and Strings in message properties.
|
void |
setExchange(String exchange)
The name of the default exchange to use for send operations when none is specified.
|
void |
setMandatory(boolean mandatory)
Set the mandatory flag when sending messages; only applies if a
returnCallback had been provided. |
void |
setMandatoryExpression(Expression mandatoryExpression) |
void |
setMandatoryExpressionString(String mandatoryExpression) |
void |
setMessageConverter(MessageConverter messageConverter)
Set the message converter for this template.
|
void |
setMessagePropertiesConverter(MessagePropertiesConverter messagePropertiesConverter)
Set the
MessagePropertiesConverter for this template. |
void |
setNoLocalReplyConsumer(boolean noLocalReplyConsumer)
Set to true for a no-local consumer.
|
void |
setQueue(String queue)
Deprecated.
in favor of
setDefaultReceiveQueue(String) . |
void |
setReceiveConnectionFactorySelectorExpression(Expression receiveConnectionFactorySelectorExpression)
A SpEL
Expression to evaluate
against each receive queueName , if the provided RabbitAccessor.getConnectionFactory()
is an instance of AbstractRoutingConnectionFactory . |
void |
setReceiveTimeout(long receiveTimeout)
Specify the receive timeout in milliseconds when using
receive() methods (for sendAndReceive()
methods, refer to replyTimeout). |
void |
setRecoveryCallback(org.springframework.retry.RecoveryCallback<?> recoveryCallback)
Add a
RecoveryCallback which is used for the retryTemplate.execute . |
void |
setReplyAddress(String replyAddress)
An address for replies; if not provided, a temporary exclusive, auto-delete queue will
be used for each reply, unless RabbitMQ supports 'amq.rabbitmq.reply-to' - see
https://www.rabbitmq.com/direct-reply-to.html
|
void |
setReplyErrorHandler(ErrorHandler replyErrorHandler)
When using a direct reply-to container for request/reply operations, set an error
handler to be invoked when a reply delivery fails (e.g.
|
void |
setReplyTimeout(long replyTimeout)
Specify the timeout in milliseconds to be used when waiting for a reply Message when using one of the
sendAndReceive methods.
|
void |
setRetryTemplate(org.springframework.retry.support.RetryTemplate retryTemplate)
Add a
RetryTemplate which will be used for all rabbit operations. |
void |
setReturnCallback(RabbitTemplate.ReturnCallback returnCallback) |
void |
setRoutingKey(String routingKey)
The value of a default routing key to use for send operations when none is specified.
|
void |
setSendConnectionFactorySelectorExpression(Expression sendConnectionFactorySelectorExpression)
A SpEL
Expression to evaluate
against each request message, if the provided RabbitAccessor.getConnectionFactory()
is an instance of AbstractRoutingConnectionFactory . |
void |
setTaskExecutor(Executor taskExecutor)
Set a task executor to use when using a
DirectReplyToMessageListenerContainer . |
void |
setUseDirectReplyToContainer(boolean useDirectReplyToContainer)
Set whether or not to use a
DirectReplyToMessageListenerContainer when
direct reply-to is available and being used. |
void |
setUsePublisherConnection(boolean usePublisherConnection)
To avoid deadlocked connections, it is generally recommended to use
a separate connection for publishers and consumers (except when a publisher
is participating in a consumer transaction).
|
void |
setUserCorrelationId(boolean userCorrelationId)
Set to true to use correlation id provided by the message instead of generating
the correlation id for request/reply scenarios.
|
void |
setUserIdExpression(Expression userIdExpression)
Set an expression to be evaluated to set the userId message property if it
evaluates to a non-null value and the property is not already set in the
message to be sent.
|
void |
setUserIdExpressionString(String userIdExpression)
Set an expression to be evaluated to set the userId message property if it
evaluates to a non-null value and the property is not already set in the
message to be sent.
|
void |
setUseTemporaryReplyQueues(boolean value)
By default, when the broker supports it and no
replyAddress is provided, send/receive
methods will use Direct reply-to (https://www.rabbitmq.com/direct-reply-to.html). |
void |
start() |
void |
stop() |
protected boolean |
useDirectReplyTo()
Override this method to use some other criteria to decide whether or not to use
direct reply-to (https://www.rabbitmq.com/direct-reply-to.html).
|
boolean |
waitForConfirms(long timeout)
Delegate to the underlying dedicated channel to wait for confirms.
|
void |
waitForConfirmsOrDie(long timeout)
Delegate to the underlying dedicated channel to wait for confirms.
|
afterPropertiesSet, convertRabbitAccessException, createConnection, getChannel, getConnection, getConnectionFactory, getTransactionalResourceHolder, isChannelTransacted, setChannelTransacted, setConnectionFactory
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
convertSendAndReceiveAsType, getConnectionFactory, invoke
containerAckMode
public RabbitTemplate()
public RabbitTemplate(ConnectionFactory connectionFactory)
connectionFactory - the connection factory to use
protected void initDefaultStrategies()
public void setExchange(@Nullable String exchange)
The name of the default exchange to use for send operations when none is specified. Defaults to "" which is the default exchange in the broker (per the AMQP specification).
exchange - the exchange name to use for send operations
public String getExchange()
public void setRoutingKey(String routingKey)
routingKey - the default routing key to use for send operations
public String getRoutingKey()
@Deprecated public void setQueue(String queue)
Deprecated in favor of setDefaultReceiveQueue(String).
queue - the default queue name to use for receive
public void setDefaultReceiveQueue(String queue)
queue - the default queue name to use for receive
public void setEncoding(String encoding)
encoding - the encoding to set
public String getEncoding()
public void setReplyAddress(String replyAddress)
The address can be a simple queue name (in which case the reply will be routed via the default exchange), or with the form exchange/routingKey to route the reply using an explicit exchange and routing key.
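The two accepted forms of the reply address can be illustrated with a small parsing sketch. ReplyAddressSketch is a hypothetical helper written for this note, not a Spring AMQP class, and it assumes that the first '/' separates the exchange from the routing key.

```java
// Hypothetical helper, not part of Spring AMQP: models how a reply address string
// maps to an (exchange, routingKey) pair under the two forms described above.
final class ReplyAddressSketch {

    final String exchange;
    final String routingKey;

    private ReplyAddressSketch(String exchange, String routingKey) {
        this.exchange = exchange;
        this.routingKey = routingKey;
    }

    static ReplyAddressSketch parse(String replyAddress) {
        int slash = replyAddress.indexOf('/');
        if (slash < 0) {
            // Simple queue name: route via the default exchange (""),
            // with the queue name acting as the routing key.
            return new ReplyAddressSketch("", replyAddress);
        }
        // exchange/routingKey form (assumption: split on the first '/').
        return new ReplyAddressSketch(
                replyAddress.substring(0, slash),
                replyAddress.substring(slash + 1));
    }
}
```

Under these assumptions, parse("replies") yields the default exchange with routing key replies, while parse("rpc.exchange/rpc.replies") yields an explicit exchange and routing key.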
replyAddress - the replyAddress to set
public void setReceiveTimeout(long receiveTimeout)
Specify the receive timeout in milliseconds when using receive() methods (for sendAndReceive() methods, refer to replyTimeout). By default, the value is zero, which means the receive() methods will return null immediately if there is no message available. Set to less than zero to wait for a message indefinitely.
receiveTimeout - the timeout.
public void setReplyTimeout(long replyTimeout)
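Specify the timeout in milliseconds to be used when waiting for a reply Message when using one of the sendAndReceive methods. The default value is defined as DEFAULT_REPLY_TIMEOUT. A negative value indicates an indefinite timeout. Not used in the plain receive methods because there is no blocking receive operation defined in the protocol.
replyTimeout - the reply timeout in milliseconds
See also: sendAndReceive(String, String, Message), convertSendAndReceive(String, String, Object)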
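As a minimal sketch of the three receiveTimeout cases described for setReceiveTimeout(long) (zero, negative, positive), the following models a queue with java.util.concurrent.BlockingQueue. ReceiveTimeoutSketch and its receive method are hypothetical names; this does not show how RabbitTemplate talks to the broker, only the timeout contract.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical model of the receive timeout contract; not Spring AMQP code.
final class ReceiveTimeoutSketch {

    static <T> T receive(BlockingQueue<T> queue, long timeoutMillis) {
        try {
            if (timeoutMillis == 0) {
                // Zero: return immediately, null if nothing is available.
                return queue.poll();
            }
            if (timeoutMillis < 0) {
                // Negative: wait indefinitely for a message.
                return queue.take();
            }
            // Positive: wait up to the given time, null on timeout.
            return queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            // Restore the interrupt flag and report "no message".
            Thread.currentThread().interrupt();
            return null;
        }
    }
}
```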
public void setMessageConverter(MessageConverter messageConverter)
The default converter is a SimpleMessageConverter, which is able to handle byte arrays, Strings, and Serializable Objects depending on the message content type header.
messageConverter - The message converter.
See also: convertAndSend(java.lang.Object), receiveAndConvert(), SimpleMessageConverter
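To make "depending on the message content type header" concrete, here is a rough, standalone model of content-type based conversion. It is not the SimpleMessageConverter source: the class name, the UTF-8 charset, and the content-type strings ("text..." and "application/x-java-serialized-object") are assumptions chosen for illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;

// Hypothetical model of content-type driven conversion; not Spring AMQP code.
final class ContentTypeConversionSketch {

    static Object fromBody(byte[] body, String contentType) {
        if (contentType != null && contentType.startsWith("text")) {
            // Text content: decode as a String (UTF-8 is an assumption here).
            return new String(body, StandardCharsets.UTF_8);
        }
        if ("application/x-java-serialized-object".equals(contentType)) {
            // Serialized content: rebuild the Java object.
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(body))) {
                return in.readObject();
            } catch (IOException | ClassNotFoundException e) {
                throw new IllegalStateException("Could not deserialize body", e);
            }
        }
        // Anything else: hand back the raw bytes.
        return body;
    }

    static byte[] serialize(Serializable value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        } catch (IOException e) {
            throw new IllegalStateException("Could not serialize value", e);
        }
        return bytes.toByteArray();
    }
}
```

Java deserialization of untrusted input is risky, so a sketch like this should only be pointed at bodies from trusted producers.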
public void setMessagePropertiesConverter(MessagePropertiesConverter messagePropertiesConverter)
Set the MessagePropertiesConverter for this template. This converter is used to convert between raw byte content in the message headers and plain Java objects. In particular, there are limitations when dealing with very long string headers, which hopefully are rare in practice, but if you need to use long headers you might need to inject a special converter here.
messagePropertiesConverter - The message properties converter.
protected MessagePropertiesConverter getMessagePropertiesConverter()
public MessageConverter getMessageConverter()
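Return the message converter for this template. Useful for clients that want to take advantage of the converter in ChannelCallback implementations.
public void setConfirmCallback(RabbitTemplate.ConfirmCallback confirmCallback)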
public void setReturnCallback(RabbitTemplate.ReturnCallback returnCallback)
public void setMandatory(boolean mandatory)
Set the mandatory flag when sending messages; only applies if a returnCallback has been provided.
mandatory - the mandatory to set.
public void setMandatoryExpression(Expression mandatoryExpression)
mandatoryExpression - a SpEL Expression to evaluate against each request message, if a returnCallback has been provided. The result of the evaluation must be a boolean value.
public void setMandatoryExpressionString(String mandatoryExpression)
mandatoryExpression - a SpEL Expression to evaluate against each request message, if a returnCallback has been provided. The result of the evaluation must be a boolean value.
public void setSendConnectionFactorySelectorExpression(Expression sendConnectionFactorySelectorExpression)
A SpEL Expression to evaluate against each request message, if the provided RabbitAccessor.getConnectionFactory() is an instance of AbstractRoutingConnectionFactory.
The result of this expression is used as lookupKey to get the target ConnectionFactory from AbstractRoutingConnectionFactory directly.
If this expression evaluates to null, we fall back to the normal AbstractRoutingConnectionFactory logic.
If there is no target ConnectionFactory with the evaluated lookupKey, we fall back to the normal AbstractRoutingConnectionFactory logic only if its property lenientFallback == true.
This expression is used for send operations.
sendConnectionFactorySelectorExpression - a SpEL Expression to evaluate
public void setReceiveConnectionFactorySelectorExpression(Expression receiveConnectionFactorySelectorExpression)
A SpEL Expression to evaluate against each receive queueName, if the provided RabbitAccessor.getConnectionFactory() is an instance of AbstractRoutingConnectionFactory.
The result of this expression is used as lookupKey to get the target ConnectionFactory from AbstractRoutingConnectionFactory directly.
If this expression evaluates to null, we fall back to the normal AbstractRoutingConnectionFactory logic.
If there is no target ConnectionFactory with the evaluated lookupKey, we fall back to the normal AbstractRoutingConnectionFactory logic only if its property lenientFallback == true.
This expression is used for receive operations.
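The fallback rules described for the selector expressions can be summarized in a short decision sketch. SelectorFallbackSketch, the map of targets, and the normalLogic supplier are hypothetical stand-ins. The exception thrown in the non-lenient case is an assumption, since the text above only says when fallback is allowed.

```java
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical model of the lookup-key fallback rules; not Spring AMQP code.
final class SelectorFallbackSketch {

    static <F> F select(Object lookupKey,
                        Map<Object, F> targets,
                        boolean lenientFallback,
                        Supplier<F> normalLogic) {
        if (lookupKey == null) {
            // Expression evaluated to null: use the normal routing logic.
            return normalLogic.get();
        }
        F target = targets.get(lookupKey);
        if (target != null) {
            // A target is registered for the key: use it directly.
            return target;
        }
        if (lenientFallback) {
            // No target, but lenient fallback is enabled: use the normal routing logic.
            return normalLogic.get();
        }
        // Assumption: with lenientFallback == false a missing target is an error.
        throw new IllegalStateException("No target for lookup key [" + lookupKey + "]");
    }
}
```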
receiveConnectionFactorySelectorExpression - a SpEL Expression to evaluate
public void setCorrelationKey(String correlationKey)
correlationKey - the correlationKey to set
public void setRetryTemplate(org.springframework.retry.support.RetryTemplate retryTemplate)
Add a RetryTemplate which will be used for all rabbit operations.
retryTemplate - The retry template.
public void setRecoveryCallback(org.springframework.retry.RecoveryCallback<?> recoveryCallback)
Add a RecoveryCallback which is used for the retryTemplate.execute. If retryTemplate isn't provided, recoveryCallback is ignored. RecoveryCallback should produce a result compatible with the execute(ChannelCallback) return type.
recoveryCallback - The retry recoveryCallback.
public void setBeanFactory(BeanFactory beanFactory) throws BeansException
Specified by: setBeanFactory in interface BeanFactoryAware
Throws: BeansException
public void setBeforePublishPostProcessors(MessagePostProcessor... beforePublishPostProcessors)
Set MessagePostProcessors that will be invoked immediately before invoking Channel#basicPublish(), after all other processing, except creating the AMQP.BasicProperties from MessageProperties. May be used for operations such as compression. Processors are invoked in order, depending on PriorityOrder, Order and finally unordered.
beforePublishPostProcessors - the post processor.
See also: addBeforePublishPostProcessors(MessagePostProcessor...)
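The invocation order mentioned for post processors (priority-ordered first, then ordered, then unordered) can be modelled with a stable sort. Everything in this sketch is hypothetical: the Tier enum and Entry class stand in for real post processors, and "lower order value runs first" is an assumption taken from the usual Spring ordering convention.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical model of three-tier ordering; not Spring AMQP code.
final class PostProcessorOrderSketch {

    // Tiers in the order they are invoked.
    enum Tier { PRIORITY_ORDERED, ORDERED, UNORDERED }

    static final class Entry {
        final String name;
        final Tier tier;
        final int order; // ignored for UNORDERED entries

        Entry(String name, Tier tier, int order) {
            this.name = name;
            this.tier = tier;
            this.order = order;
        }
    }

    static List<String> invocationOrder(List<Entry> registered) {
        List<Entry> sorted = new ArrayList<>(registered);
        // Sort by tier, then by order value (lower first, an assumption).
        // List.sort is stable, so unordered entries keep their registration order.
        sorted.sort(Comparator
                .comparing((Entry e) -> e.tier)
                .thenComparingInt(e -> e.tier == Tier.UNORDERED ? 0 : e.order));
        List<String> names = new ArrayList<>();
        for (Entry e : sorted) {
            names.add(e.name);
        }
        return names;
    }
}
```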
public void addBeforePublishPostProcessors(MessagePostProcessor... beforePublishPostProcessors)
Add MessagePostProcessor that will be invoked immediately before invoking Channel#basicPublish(), after all other processing, except creating the AMQP.BasicProperties from MessageProperties. May be used for operations such as compression. Processors are invoked in order, depending on PriorityOrder, Order and finally unordered.
In contrast to setBeforePublishPostProcessors(MessagePostProcessor...), this method does not override the previously added beforePublishPostProcessors.
beforePublishPostProcessors - the post processor.
public boolean removeBeforePublishPostProcessor(MessagePostProcessor beforePublishPostProcessor)
Remove the provided MessagePostProcessor from the beforePublishPostProcessors list.
beforePublishPostProcessor - the MessagePostProcessor to remove.
See also: addBeforePublishPostProcessors(MessagePostProcessor...)
public void setAfterReceivePostProcessors(MessagePostProcessor... afterReceivePostProcessors)
Set a MessagePostProcessor that will be invoked immediately after a Channel#basicGet() and before any message conversion is performed. May be used for operations such as decompression. Processors are invoked in order, depending on PriorityOrder, Order and finally unordered.
afterReceivePostProcessors - the post processor.
See also: addAfterReceivePostProcessors(MessagePostProcessor...)
@Nullable public Collection<MessagePostProcessor> getAfterReceivePostProcessors()
Returns the configured after-receive MessagePostProcessors, or null.
public void addAfterReceivePostProcessors(MessagePostProcessor... afterReceivePostProcessors)
Add a MessagePostProcessor that will be invoked immediately after a Channel#basicGet()
and before any message conversion is performed.
May be used for operations such as decompression. Processors are invoked in order,
depending on PriorityOrdered, Ordered and finally unordered.
In contrast to setAfterReceivePostProcessors(MessagePostProcessor...), this
method does not override the previously added afterReceivePostProcessors.
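The difference between the "set", "add" and "remove" variants can be sketched without the library. The `PostProcessorRegistrySketch` class is hypothetical and uses plain strings in place of MessagePostProcessor instances. It only models the replace-versus-append behavior described above and ignores ordering. The boolean result of "remove" is assumed to report whether the entry was present.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class PostProcessorRegistrySketch {

    // Hypothetical holder; strings stand in for MessagePostProcessor instances.
    private List<String> processors = new ArrayList<>();

    // "set" semantics: the previously configured list is discarded.
    void setProcessors(String... newProcessors) {
        this.processors = new ArrayList<>(Arrays.asList(newProcessors));
    }

    // "add" semantics: existing entries are kept and the new ones are appended.
    void addProcessors(String... moreProcessors) {
        this.processors.addAll(Arrays.asList(moreProcessors));
    }

    // "remove" semantics: reports whether the processor was present.
    boolean removeProcessor(String processor) {
        return this.processors.remove(processor);
    }

    List<String> getProcessors() {
        return new ArrayList<>(this.processors);
    }

    public static void main(String[] args) {
        PostProcessorRegistrySketch registry = new PostProcessorRegistrySketch();
        registry.setProcessors("gunzip");
        registry.addProcessors("decrypt");
        System.out.println(registry.getProcessors());          // [gunzip, decrypt]
        registry.setProcessors("audit");
        System.out.println(registry.getProcessors());          // [audit]
        System.out.println(registry.removeProcessor("gunzip")); // false
    }
}
```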
afterReceivePostProcessors - the post processor.
public boolean removeAfterReceivePostProcessor(MessagePostProcessor afterReceivePostProcessor)
Remove the provided MessagePostProcessor from the afterReceivePostProcessors list.
afterReceivePostProcessor - the MessagePostProcessor to remove.
See also: addAfterReceivePostProcessors(MessagePostProcessor...)
public void setCorrelationDataPostProcessor(CorrelationDataPostProcessor correlationDataPostProcessor)
Set a CorrelationDataPostProcessor to be invoked before publishing a message.
Correlation data is used to correlate publisher confirms.
correlationDataPostProcessor - the post processor.
See also: setConfirmCallback(ConfirmCallback)
public void setUseTemporaryReplyQueues(boolean value)
By default, when the broker supports it and no replyAddress is provided, send/receive
methods will use Direct reply-to (https://www.rabbitmq.com/direct-reply-to.html).
Setting this property to true will override that behavior and use
a temporary, auto-delete queue for each request instead.
Changing this property has no effect once the first request has been
processed.
value - true to use temporary queues.
public void setUseDirectReplyToContainer(boolean useDirectReplyToContainer)
Set whether or not to use a DirectReplyToMessageListenerContainer when
direct reply-to is available and being used. When false, a new consumer is created
for each request (the mechanism used in versions prior to 2.0). Default true.
useDirectReplyToContainer - set to false to use a consumer per request.
See also: setUseTemporaryReplyQueues(boolean)
public void setUserIdExpression(Expression userIdExpression)
userIdExpression - the expression.
public void setUserIdExpressionString(String userIdExpression)
userIdExpression - the expression.
public void setBeanName(String name)
setBeanName in interface BeanNameAware
public void setTaskExecutor(Executor taskExecutor)
Set a task executor to use when using a DirectReplyToMessageListenerContainer.
taskExecutor - the executor.
public void setUserCorrelationId(boolean userCorrelationId)
Users must therefore take great care to ensure uniqueness.
userCorrelationId - true to use user correlation data.
public boolean isUsePublisherConnection()
See also: setUsePublisherConnection(boolean)
public void setUsePublisherConnection(boolean usePublisherConnection)
usePublisherConnection - true to use a publisher connection.
public void setNoLocalReplyConsumer(boolean noLocalReplyConsumer)
noLocalReplyConsumer - true for a no-local consumer.
See also: AbstractMessageListenerContainer.setNoLocal(boolean), Channel.basicConsume(String, boolean, String, boolean, boolean, Map, com.rabbitmq.client.Consumer)
public void setReplyErrorHandler(ErrorHandler replyErrorHandler)
replyErrorHandler - the reply error handler
See also: setUseDirectReplyToContainer(boolean)
@Nullable public Collection<String> expectedQueueNames()
expectedQueueNames in interface ListenerContainerAware
@Nullable public Collection<CorrelationData> getUnconfirmed(long age)
age - in milliseconds
public int getUnconfirmedCount()
protected void doStart()
protected void doStop()
protected boolean useDirectReplyTo()
The default implementation returns true if the broker supports it and there is no replyAddress set and
useTemporaryReplyQueues is false.
When direct reply-to is not used, the template
will create a temporary, exclusive, auto-delete queue for the reply.
This method is invoked only once, when the first message is sent, from a synchronized block.
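The "decided once, on the first request" behavior can be sketched in plain Java. `ReplyStrategySketch` and its members are hypothetical. The decision method assumes the criteria are "no replyAddress set and useTemporaryReplyQueues is false" and leaves out any broker capability check. It illustrates why changing the property after the first request has no effect, and is not the template's implementation.

```java
class ReplyStrategySketch {

    private final Object lock = new Object();
    private String replyAddress;              // null means "not set"
    private boolean useTemporaryReplyQueues;
    private boolean evaluated;
    private boolean usingDirectReplyTo;

    void setReplyAddress(String replyAddress) {
        this.replyAddress = replyAddress;
    }

    void setUseTemporaryReplyQueues(boolean value) {
        this.useTemporaryReplyQueues = value;
    }

    // Overridable decision using the assumed default criteria.
    protected boolean useDirectReplyTo() {
        return this.replyAddress == null && !this.useTemporaryReplyQueues;
    }

    // Called for every request; the decision is taken once, on the first call.
    boolean isUsingDirectReplyToForRequest() {
        synchronized (this.lock) {
            if (!this.evaluated) {
                this.usingDirectReplyTo = useDirectReplyTo();
                this.evaluated = true;
            }
            return this.usingDirectReplyTo;
        }
    }

    public static void main(String[] args) {
        ReplyStrategySketch sketch = new ReplyStrategySketch();
        System.out.println(sketch.isUsingDirectReplyToForRequest()); // true
        sketch.setUseTemporaryReplyQueues(true);                     // too late, already decided
        System.out.println(sketch.isUsingDirectReplyToForRequest()); // true
    }
}
```

A subclass could override `useDirectReplyTo()` to apply other criteria; the cached result would still be fixed after the first request.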
public void send(Message message) throws AmqpException
send in interface AmqpTemplate
message - a message to send
AmqpException - if there is a problem
public void send(String routingKey, Message message) throws AmqpException
send in interface AmqpTemplate
routingKey - the routing key
message - a message to send
AmqpException - if there is a problem
public void send(String exchange, String routingKey, Message message) throws AmqpException
send in interface AmqpTemplate
exchange - the name of the exchange
routingKey - the routing key
message - a message to send
AmqpException - if there is a problem
public void send(String exchange, String routingKey, Message message, @Nullable CorrelationData correlationData) throws AmqpException
send in interface RabbitOperations
exchange - the name of the exchange
routingKey - the routing key
message - a message to send
correlationData - data to correlate publisher confirms.
AmqpException - if there is a problem
public void convertAndSend(Object object) throws AmqpException
Convert a Java object to an AMQP Message and send it to a default exchange with a default routing key.
convertAndSend in interface AmqpTemplate
object - a message to send
AmqpException - if there is a problem
public void correlationConvertAndSend(Object object, CorrelationData correlationData) throws AmqpException
Convert a Java object to an AMQP Message and send it to a default exchange with a default routing key.
correlationConvertAndSend in interface RabbitOperations
object - a message to send
correlationData - data to correlate publisher confirms.
AmqpException - if there is a problem
public void convertAndSend(String routingKey, Object object) throws AmqpException
Convert a Java object to an AMQP Message and send it to a default exchange with a specific routing key.
convertAndSend in interface AmqpTemplate
routingKey - the routing key
object - a message to send
AmqpException - if there is a problem
public void convertAndSend(String routingKey, Object object, CorrelationData correlationData) throws AmqpException
Convert a Java object to an AMQP Message and send it to a default exchange with a specific routing key.
convertAndSend in interface RabbitOperations
routingKey - the routing key
object - a message to send
correlationData - data to correlate publisher confirms.
AmqpException - if there is a problem
public void convertAndSend(String exchange, String routingKey, Object object) throws AmqpException
Convert a Java object to an AMQP Message and send it to a specific exchange with a specific routing key.
convertAndSend in interface AmqpTemplate
exchange - the name of the exchange
routingKey - the routing key
object - a message to send
AmqpException - if there is a problem
public void convertAndSend(String exchange, String routingKey, Object object, @Nullable CorrelationData correlationData) throws AmqpException
Convert a Java object to an AMQP Message and send it to a specific exchange with a specific routing key.
convertAndSend in interface RabbitOperations
exchange - the name of the exchange
routingKey - the routing key
object - a message to send
correlationData - data to correlate publisher confirms.
AmqpException - if there is a problem
public void convertAndSend(Object message, MessagePostProcessor messagePostProcessor) throws AmqpException
Convert a Java object to an AMQP Message and send it to a default exchange with a default routing key.
convertAndSend in interface AmqpTemplate
message - a message to send
messagePostProcessor - a processor to apply to the message before it is sent
AmqpException - if there is a problem
public void convertAndSend(String routingKey, Object message, MessagePostProcessor messagePostProcessor) throws AmqpException
Convert a Java object to an AMQP Message and send it to a default exchange with a specific routing key.
convertAndSend in interface AmqpTemplate
routingKey - the routing key
message - a message to send
messagePostProcessor - a processor to apply to the message before it is sent
AmqpException - if there is a problem
public void convertAndSend(Object message, MessagePostProcessor messagePostProcessor, CorrelationData correlationData) throws AmqpException
Convert a Java object to an AMQP Message and send it to a default exchange with a default routing key.
convertAndSend in interface RabbitOperations
message - a message to send
messagePostProcessor - a processor to apply to the message before it is sent
correlationData - data to correlate publisher confirms.
AmqpException - if there is a problem
public void convertAndSend(String routingKey, Object message, MessagePostProcessor messagePostProcessor, CorrelationData correlationData) throws AmqpException
Convert a Java object to an AMQP Message and send it to a default exchange with a specific routing key.
convertAndSend in interface RabbitOperations
routingKey - the routing key
message - a message to send
messagePostProcessor - a processor to apply to the message before it is sent
correlationData - data to correlate publisher confirms.
AmqpException - if there is a problem
public void convertAndSend(String exchange, String routingKey, Object message, MessagePostProcessor messagePostProcessor) throws AmqpException
Convert a Java object to an AMQP Message and send it to a specific exchange with a specific routing key.
convertAndSend in interface AmqpTemplate
exchange - the name of the exchange
routingKey - the routing key
message - a message to send
messagePostProcessor - a processor to apply to the message before it is sent
AmqpException - if there is a problem
public void convertAndSend(String exchange, String routingKey, Object message, MessagePostProcessor messagePostProcessor, @Nullable CorrelationData correlationData) throws AmqpException
Convert a Java object to an AMQP Message and send it to a specific exchange with a specific routing key.
convertAndSend in interface RabbitOperations
exchange - the name of the exchange
routingKey - the routing key
message - a message to send
messagePostProcessor - a processor to apply to the message before it is sent
correlationData - data to correlate publisher confirms.
AmqpException - if there is a problem
@Nullable public Message receive() throws AmqpException
receive in interface AmqpTemplate
AmqpException - if there is a problem
@Nullable public Message receive(String queueName)
receive in interface AmqpTemplate
queueName - the name of the queue to poll
@Nullable protected Message doReceiveNoWait(String queueName)
queueName - the queue to receive from.
@Nullable public Message receive(long timeoutMillis) throws AmqpException
receive in interface AmqpTemplate
timeoutMillis - how long to wait before giving up. A zero value means the method returns null immediately if no message is available. A negative value makes the method wait for a message indefinitely.
AmqpException - if there is a problem
@Nullable public Message receive(String queueName, long timeoutMillis)
receive in interface AmqpTemplate
queueName - the queue to receive from
timeoutMillis - how long to wait before giving up. A zero value means the method returns null immediately if no message is available. A negative value makes the method wait for a message indefinitely.
@Nullable public Object receiveAndConvert() throws AmqpException
receiveAndConvert in interface AmqpTemplate
AmqpException - if there is a problem
@Nullable public Object receiveAndConvert(String queueName) throws AmqpException
receiveAndConvert in interface AmqpTemplate
queueName - the name of the queue to poll
AmqpException - if there is a problem
@Nullable public Object receiveAndConvert(long timeoutMillis) throws AmqpException
receiveAndConvert in interface AmqpTemplate
timeoutMillis - how long to wait before giving up. A zero value means the method returns null immediately if no message is available. A negative value makes the method wait for a message indefinitely.
AmqpException - if there is a problem
@Nullable public Object receiveAndConvert(String queueName, long timeoutMillis) throws AmqpException
receiveAndConvert in interface AmqpTemplate
queueName - the name of the queue to poll
timeoutMillis - how long to wait before giving up. A zero value means the method returns null immediately if no message is available. A negative value makes the method wait for a message indefinitely.
AmqpException - if there is a problem
@Nullable public <T> T receiveAndConvert(ParameterizedTypeReference<T> type) throws AmqpException
Requires a SmartMessageConverter.
receiveAndConvert in interface AmqpTemplate
T - the type.
type - the type to convert to.
AmqpException - if there is a problem.
@Nullable public <T> T receiveAndConvert(String queueName, ParameterizedTypeReference<T> type) throws AmqpException
Requires a SmartMessageConverter.
receiveAndConvert in interface AmqpTemplate
T - the type.
queueName - the name of the queue to poll
type - the type to convert to.
AmqpException - if there is a problem
@Nullable public <T> T receiveAndConvert(long timeoutMillis, ParameterizedTypeReference<T> type) throws AmqpException
Requires a SmartMessageConverter.
receiveAndConvert in interface AmqpTemplate
T - the type.
timeoutMillis - how long to wait before giving up. A zero value means the method returns null immediately if no message is available. A negative value makes the method wait for a message indefinitely.
type - the type to convert to.
AmqpException - if there is a problem
@Nullable public <T> T receiveAndConvert(String queueName, long timeoutMillis, ParameterizedTypeReference<T> type) throws AmqpException
Requires a SmartMessageConverter.
receiveAndConvert in interface AmqpTemplate
T - the type.
queueName - the name of the queue to poll
timeoutMillis - how long to wait before giving up. A zero value means the method returns null immediately if no message is available. A negative value makes the method wait for a message indefinitely.
type - the type to convert to.
AmqpException - if there is a problem
public <R,S> boolean receiveAndReply(ReceiveAndReplyCallback<R,S> callback) throws AmqpException
Receive a message if there is one, invoke the provided ReceiveAndReplyCallback and send the reply message, if the callback
returns one, to the replyTo Address from MessageProperties or to the default exchange and default routingKey.
receiveAndReply in interface AmqpTemplate
R - The type of the request after conversion from the Message.
S - The type of the response.
callback - a user-provided ReceiveAndReplyCallback implementation to process the received message and return a reply message.
true, if message was received
AmqpException - if there is a problem
public <R,S> boolean receiveAndReply(String queueName, ReceiveAndReplyCallback<R,S> callback) throws AmqpException
Receive a message if there is one, invoke the provided ReceiveAndReplyCallback and send the reply message, if the callback
returns one, to the replyTo Address from MessageProperties or to the default exchange and default routingKey.
receiveAndReply in interface AmqpTemplate
R - The type of the request after conversion from the Message.
S - The type of the response.
queueName - the queue name to receive a message.
callback - a user-provided ReceiveAndReplyCallback implementation to process the received message and return a reply message.
true, if message was received.
AmqpException - if there is a problem.
public <R,S> boolean receiveAndReply(ReceiveAndReplyCallback<R,S> callback, String exchange, String routingKey) throws AmqpException
Receive a message if there is one, invoke the provided ReceiveAndReplyCallback and send the reply message, if the callback
returns one, to the provided exchange and routingKey.
receiveAndReply in interface AmqpTemplate
R - The type of the request after conversion from the Message.
S - The type of the response.
callback - a user-provided ReceiveAndReplyCallback implementation to process the received message and return a reply message.
exchange - the exchange name to send reply message.
routingKey - the routing key to send reply message.
true, if message was received.
AmqpException - if there is a problem.
public <R,S> boolean receiveAndReply(String queueName, ReceiveAndReplyCallback<R,S> callback, String replyExchange, String replyRoutingKey) throws AmqpException
Receive a message if there is one, invoke the provided ReceiveAndReplyCallback and send the reply message, if the callback
returns one, to the provided exchange and routingKey.
receiveAndReply in interface AmqpTemplate
R - The type of the request after conversion from the Message.
S - The type of the response.
queueName - the queue name to receive a message.
callback - a user-provided ReceiveAndReplyCallback implementation to process the received message and return a reply message.
replyExchange - the exchange name to send reply message.
replyRoutingKey - the routing key to send reply message.
true, if message was received
AmqpException - if there is a problem
public <R,S> boolean receiveAndReply(ReceiveAndReplyCallback<R,S> callback, ReplyToAddressCallback<S> replyToAddressCallback) throws AmqpException
Receive a message if there is one, invoke the provided ReceiveAndReplyCallback and send the reply message, if the callback
returns one, to the replyTo Address from the result of ReplyToAddressCallback.
receiveAndReply in interface AmqpTemplate
R - The type of the request after conversion from the Message.
S - The type of the response.
callback - a user-provided ReceiveAndReplyCallback implementation to process the received message and return a reply message.
replyToAddressCallback - the callback to determine replyTo address at runtime.
true, if message was received.
AmqpException - if there is a problem.
public <R,S> boolean receiveAndReply(String queueName, ReceiveAndReplyCallback<R,S> callback, ReplyToAddressCallback<S> replyToAddressCallback) throws AmqpException
Receive a message if there is one, invoke the provided ReceiveAndReplyCallback and send the reply message, if the callback
returns one, to the replyTo Address from the result of ReplyToAddressCallback.
receiveAndReply in interface AmqpTemplate
R - The type of the request after conversion from the Message.
S - The type of the response.
queueName - the queue name to receive a message.
callback - a user-provided ReceiveAndReplyCallback implementation to process the received message and return a reply message.
replyToAddressCallback - the callback to determine replyTo address at runtime.
true, if message was received
AmqpException - if there is a problem
@Nullable public Message sendAndReceive(Message message) throws AmqpException
sendAndReceive in interface AmqpTemplate
message - a message to send
AmqpException - if there is a problem
@Nullable public Message sendAndReceive(Message message, @Nullable CorrelationData correlationData) throws AmqpException
AmqpException
@Nullable public Message sendAndReceive(String routingKey, Message message) throws AmqpException
sendAndReceive in interface AmqpTemplate
routingKey - the routing key
message - a message to send
AmqpException - if there is a problem
@Nullable public Message sendAndReceive(String routingKey, Message message, @Nullable CorrelationData correlationData) throws AmqpException
AmqpException
@Nullable public Message sendAndReceive(String exchange, String routingKey, Message message) throws AmqpException
sendAndReceive in interface AmqpTemplate
exchange - the name of the exchange
routingKey - the routing key
message - a message to send
AmqpException - if there is a problem
@Nullable public Message sendAndReceive(String exchange, String routingKey, Message message, @Nullable CorrelationData correlationData) throws AmqpException
AmqpException
@Nullable public Object convertSendAndReceive(Object message) throws AmqpException
convertSendAndReceive in interface AmqpTemplate
message - a message to send
AmqpException - if there is a problem
@Nullable public Object convertSendAndReceive(Object message, @Nullable CorrelationData correlationData) throws AmqpException
convertSendAndReceive in interface RabbitOperations
message - a message to send.
correlationData - data to correlate publisher confirms.
AmqpException - if there is a problem
@Nullable public Object convertSendAndReceive(String routingKey, Object message) throws AmqpException
convertSendAndReceive in interface AmqpTemplate
routingKey - the routing key
message - a message to send
AmqpException - if there is a problem
@Nullable public Object convertSendAndReceive(String routingKey, Object message, @Nullable CorrelationData correlationData) throws AmqpException
convertSendAndReceive in interface RabbitOperations
routingKey - the routing key
message - a message to send
correlationData - data to correlate publisher confirms.
AmqpException - if there is a problem
@Nullable public Object convertSendAndReceive(String exchange, String routingKey, Object message) throws AmqpException
convertSendAndReceive in interface AmqpTemplate
exchange - the name of the exchange
routingKey - the routing key
message - a message to send
AmqpException - if there is a problem
@Nullable public Object convertSendAndReceive(String exchange, String routingKey, Object message, @Nullable CorrelationData correlationData) throws AmqpException
convertSendAndReceive in interface RabbitOperations
exchange - the name of the exchange
routingKey - the routing key
message - a message to send
correlationData - data to correlate publisher confirms.
AmqpException - if there is a problem
@Nullable public Object convertSendAndReceive(Object message, MessagePostProcessor messagePostProcessor) throws AmqpException
convertSendAndReceive in interface AmqpTemplate
message - a message to send
messagePostProcessor - a processor to apply to the message before it is sent
AmqpException - if there is a problem
@Nullable public Object convertSendAndReceive(Object message, MessagePostProcessor messagePostProcessor, @Nullable CorrelationData correlationData) throws AmqpException
convertSendAndReceive in interface RabbitOperations
message - a message to send
messagePostProcessor - a processor to apply to the message before it is sent
correlationData - data to correlate publisher confirms.
AmqpException - if there is a problem
@Nullable public Object convertSendAndReceive(String routingKey, Object message, MessagePostProcessor messagePostProcessor) throws AmqpException
convertSendAndReceive in interface AmqpTemplate
routingKey - the routing key
message - a message to send
messagePostProcessor - a processor to apply to the message before it is sent
AmqpException - if there is a problem
@Nullable public Object convertSendAndReceive(String routingKey, Object message, MessagePostProcessor messagePostProcessor, @Nullable CorrelationData correlationData) throws AmqpException
convertSendAndReceive in interface RabbitOperations
routingKey - the routing key
message - a message to send
messagePostProcessor - a processor to apply to the message before it is sent
correlationData - data to correlate publisher confirms.
AmqpException - if there is a problem
@Nullable public Object convertSendAndReceive(String exchange, String routingKey, Object message, MessagePostProcessor messagePostProcessor) throws AmqpException
convertSendAndReceive in interface AmqpTemplate
exchange - the name of the exchange
routingKey - the routing key
message - a message to send
messagePostProcessor - a processor to apply to the message before it is sent
AmqpException - if there is a problem
@Nullable public Object convertSendAndReceive(String exchange, String routingKey, Object message, @Nullable MessagePostProcessor messagePostProcessor, @Nullable CorrelationData correlationData) throws AmqpException
convertSendAndReceive in interface RabbitOperations
exchange - the name of the exchange
routingKey - the routing key
message - a message to send
messagePostProcessor - a processor to apply to the message before it is sent
correlationData - data to correlate publisher confirms.
AmqpException - if there is a problem
@Nullable public <T> T convertSendAndReceiveAsType(Object message, ParameterizedTypeReference<T> responseType) throws AmqpException
Requires a SmartMessageConverter.
convertSendAndReceiveAsType in interface AmqpTemplate
T - the type.
message - a message to send.
responseType - the type to convert the reply to.
AmqpException - if there is a problem.
@Nullable public <T> T convertSendAndReceiveAsType(Object message, @Nullable CorrelationData correlationData, ParameterizedTypeReference<T> responseType) throws AmqpException
Requires a SmartMessageConverter.
convertSendAndReceiveAsType in interface RabbitOperations
T - the type.
message - a message to send.
correlationData - data to correlate publisher confirms.
responseType - the type to convert the reply to.
AmqpException - if there is a problem.
@Nullable public <T> T convertSendAndReceiveAsType(String routingKey, Object message, ParameterizedTypeReference<T> responseType) throws AmqpException
Requires a SmartMessageConverter.
convertSendAndReceiveAsType in interface AmqpTemplate
T - the type.
routingKey - the routing key
message - a message to send
responseType - the type to convert the reply to.
AmqpException - if there is a problem
@Nullable public <T> T convertSendAndReceiveAsType(String routingKey, Object message, @Nullable CorrelationData correlationData, ParameterizedTypeReference<T> responseType) throws AmqpException
Requires a SmartMessageConverter.
convertSendAndReceiveAsType in interface RabbitOperations
T - the type.
routingKey - the routing key
message - a message to send
correlationData - data to correlate publisher confirms.
responseType - the type to convert the reply to.
AmqpException - if there is a problem
@Nullable public <T> T convertSendAndReceiveAsType(String exchange, String routingKey, Object message, ParameterizedTypeReference<T> responseType) throws AmqpException
Requires a SmartMessageConverter.
convertSendAndReceiveAsType in interface AmqpTemplate
T - the type.
exchange - the name of the exchange
routingKey - the routing key
message - a message to send
responseType - the type to convert the reply to.
AmqpException - if there is a problem
@Nullable public <T> T convertSendAndReceiveAsType(Object message, @Nullable MessagePostProcessor messagePostProcessor, ParameterizedTypeReference<T> responseType) throws AmqpException
Requires a SmartMessageConverter.
convertSendAndReceiveAsType in interface AmqpTemplate
T - the type.
message - a message to send
messagePostProcessor - a processor to apply to the message before it is sent
responseType - the type to convert the reply to.
AmqpException - if there is a problem
@Nullable public <T> T convertSendAndReceiveAsType(Object message, @Nullable MessagePostProcessor messagePostProcessor, @Nullable CorrelationData correlationData, ParameterizedTypeReference<T> responseType) throws AmqpException
Requires a SmartMessageConverter.
convertSendAndReceiveAsType in interface RabbitOperations
T - the type.
message - a message to send
messagePostProcessor - a processor to apply to the message before it is sent
correlationData - data to correlate publisher confirms.
responseType - the type to convert the reply to.
AmqpException - if there is a problem
@Nullable public <T> T convertSendAndReceiveAsType(String routingKey, Object message, @Nullable MessagePostProcessor messagePostProcessor, ParameterizedTypeReference<T> responseType) throws AmqpException
Requires a SmartMessageConverter.
convertSendAndReceiveAsType in interface AmqpTemplate
T - the type.
routingKey - the routing key
message - a message to send
messagePostProcessor - a processor to apply to the message before it is sent
responseType - the type to convert the reply to.
AmqpException - if there is a problem
@Nullable public <T> T convertSendAndReceiveAsType(String routingKey, Object message, @Nullable MessagePostProcessor messagePostProcessor, @Nullable CorrelationData correlationData, ParameterizedTypeReference<T> responseType) throws AmqpException
Requires a SmartMessageConverter.
convertSendAndReceiveAsType in interface RabbitOperations
T - the type.
routingKey - the routing key
message - a message to send
messagePostProcessor - a processor to apply to the message before it is sent
correlationData - data to correlate publisher confirms.
responseType - the type to convert the reply to.
AmqpException - if there is a problem
@Nullable public <T> T convertSendAndReceiveAsType(String exchange, String routingKey, Object message, MessagePostProcessor messagePostProcessor, ParameterizedTypeReference<T> responseType) throws AmqpException
Requires a SmartMessageConverter.
convertSendAndReceiveAsType in interface AmqpTemplate
T - the type.
exchange - the name of the exchange
routingKey - the routing key
message - a message to send
messagePostProcessor - a processor to apply to the message before it is sent
responseType - the type to convert the reply to.
AmqpException - if there is a problem
@Nullable public <T> T convertSendAndReceiveAsType(String exchange, String routingKey, Object message, @Nullable MessagePostProcessor messagePostProcessor, @Nullable CorrelationData correlationData, ParameterizedTypeReference<T> responseType) throws AmqpException
Requires a SmartMessageConverter.
convertSendAndReceiveAsType in interface RabbitOperations
T - the type.
exchange - the name of the exchange
routingKey - the routing key
message - a message to send
messagePostProcessor - a processor to apply to the message before it is sent
correlationData - data to correlate publisher confirms.
responseType - the type to convert the reply to.
AmqpException - if there is a problem
@Nullable protected Message convertSendAndReceiveRaw(String exchange, String routingKey, Object message, @Nullable MessagePostProcessor messagePostProcessor, @Nullable CorrelationData correlationData)
exchange - the exchange.
routingKey - the routing key.
message - the data to send.
messagePostProcessor - a message post processor (can be null).
correlationData - correlation data (can be null).
@Nullable protected Message doSendAndReceive(String exchange, String routingKey, Message message, @Nullable CorrelationData correlationData)
exchange - the exchange name
routingKey - the routing key
message - the message to send
correlationData - the correlation data for confirms
@Nullable protected Message doSendAndReceiveWithTemporary(String exchange, String routingKey, Message message, CorrelationData correlationData)
@Nullable protected Message doSendAndReceiveWithFixed(String exchange, String routingKey, Message message, CorrelationData correlationData)
protected void replyTimedOut(String correlationId)
correlationId - the correlationId
public Boolean isMandatoryFor(Message message)
message - the message.
@Nullable public <T> T execute(ChannelCallback<T> action)
execute in interface RabbitOperations
T - the return type.
action - the callback.
Returns the result from ChannelCallback.doInRabbit(com.rabbitmq.client.Channel).
@Nullable public <T> T invoke(RabbitOperations.OperationsCallback<T> action, @Nullable com.rabbitmq.client.ConfirmCallback acks, @Nullable com.rabbitmq.client.ConfirmCallback nacks)
invoke in interface RabbitOperations
T - the return type.
action - the callback.
acks - a confirm callback for acks.
nacks - a confirm callback for nacks.
public boolean waitForConfirms(long timeout)
Must be called within the scope of a RabbitOperations.invoke(OperationsCallback) operation.
Requires CachingConnectionFactory#setPublisherConfirms(true).
waitForConfirms in interface RabbitOperations
timeout - the timeout
See also: Channel.waitForConfirms(long)
public void waitForConfirmsOrDie(long timeout)
Must be called within the scope of a RabbitOperations.invoke(OperationsCallback) operation.
Requires CachingConnectionFactory#setPublisherConfirms(true).
waitForConfirmsOrDie in interface RabbitOperations
timeout - the timeout
See also: Channel.waitForConfirmsOrDie(long)
public void determineConfirmsReturnsCapability(ConnectionFactory connectionFactory)
public void doSend(com.rabbitmq.client.Channel channel, String exchangeArg, String routingKeyArg, Message message, boolean mandatory, @Nullable CorrelationData correlationData) throws Exception
channel - The RabbitMQ Channel to operate within.
exchangeArg - The name of the RabbitMQ exchange to send to.
routingKeyArg - The routing key.
message - The Message to send.
mandatory - The mandatory flag.
correlationData - The correlation data.
Exception - If thrown by RabbitMQ API methods
protected void sendToRabbit(com.rabbitmq.client.Channel channel, String exchange, String routingKey, boolean mandatory, Message message) throws IOException
IOException
protected boolean isChannelLocallyTransacted(com.rabbitmq.client.Channel channel)
channel - the Channel to check
See also: ConnectionFactoryUtils.isChannelTransactional(com.rabbitmq.client.Channel, org.springframework.amqp.rabbit.connection.ConnectionFactory), RabbitAccessor.isChannelTransacted()
public void addListener(com.rabbitmq.client.Channel channel)
channel - the channel.
public void handleConfirm(PendingConfirm pendingConfirm, boolean ack)
handleConfirm in interface PublisherCallbackChannel.Listener
pendingConfirm - The pending confirmation, containing correlation data.
ack - true when 'ack', false when 'nack'.
public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, com.rabbitmq.client.AMQP.BasicProperties properties, byte[] body) throws IOException
handleReturn in interface PublisherCallbackChannel.Listener
IOException
public boolean isConfirmListener()
isConfirmListener in interface PublisherCallbackChannel.Listener
public boolean isReturnListener()
isReturnListener in interface PublisherCallbackChannel.Listener
public void revoke(com.rabbitmq.client.Channel channel)
revoke in interface PublisherCallbackChannel.Listener
channel - The channel.
public String getUUID()
getUUID in interface PublisherCallbackChannel.Listener
public void onMessage(Message message)
onMessage in interface MessageListener