JMS Support
Spring Integration provides channel adapters for receiving and sending JMS messages.
You need to include this dependency in your project:
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-jms</artifactId>
<version>6.2.0-M1</version>
</dependency>
compile "org.springframework.integration:spring-integration-jms:6.2.0-M1"
The jakarta.jms:jakarta.jms-api dependency must be added explicitly through a JMS vendor-specific implementation, such as Apache ActiveMQ.
There are actually two JMS-based inbound Channel Adapters.
The first uses Spring’s JmsTemplate
to receive based on a polling period.
The second is “message-driven” and relies on a Spring MessageListener
container.
The outbound channel adapter uses the JmsTemplate
to convert and send a JMS message on demand.
By using JmsTemplate
and the MessageListener
container, Spring Integration relies on Spring’s JMS support.
This is important to understand, since most of the attributes exposed on these adapters configure the underlying JmsTemplate
and MessageListener
container.
For more details about JmsTemplate
and the MessageListener
container, see the Spring JMS documentation.
Whereas the JMS channel adapters are intended for unidirectional messaging (send-only or receive-only), Spring Integration also provides inbound and outbound JMS Gateways for request and reply operations.
The inbound gateway relies on one of Spring’s MessageListener
container implementations for message-driven reception.
It is also capable of sending a return value to the reply-to
destination, as provided by the received message.
The outbound gateway sends a JMS message to a request-destination
(or request-destination-name
or request-destination-expression
) and then receives a reply message.
You can explicitly configure the reply-destination
reference (or reply-destination-name
or reply-destination-expression
).
Otherwise, the outbound gateway uses a JMS TemporaryQueue.
Prior to Spring Integration 2.2, if necessary, a TemporaryQueue
was created (and removed) for each request or reply.
Beginning with Spring Integration 2.2, you can configure the outbound gateway to use a MessageListener
container to receive replies instead of directly using a new (or cached) Consumer
to receive the reply for each request.
When so configured, and no explicit reply destination is provided, a single TemporaryQueue
is used for each gateway instead of one for each request.
Starting with version 6.0, the outbound gateway creates a TemporaryTopic instead of a TemporaryQueue if the replyPubSubDomain option is set to true.
Some JMS vendors handle these destinations differently.
Inbound Channel Adapter
The inbound channel adapter requires a reference to either a single JmsTemplate
instance or both a ConnectionFactory
and a Destination
(you can provide a 'destinationName' in place of the 'destination' reference).
The following example defines an inbound channel adapter with a Destination
reference:
@Bean
public IntegrationFlow jmsInbound(ConnectionFactory connectionFactory) {
return IntegrationFlow.from(
Jms.inboundAdapter(connectionFactory)
.destination("inQueue"),
e -> e.poller(poller -> poller.fixedRate(30000)))
.handle(m -> System.out.println(m.getPayload()))
.get();
}
@Bean
fun jmsInbound(connectionFactory: ConnectionFactory) =
integrationFlow(
Jms.inboundAdapter(connectionFactory).destination("inQueue"),
{ poller { Pollers.fixedRate(30000) } })
{
handle { m -> println(m.payload) }
}
@Bean
@InboundChannelAdapter(value = "exampleChannel", poller = @Poller(fixedRate = "30000"))
public MessageSource<Object> jmsIn(ConnectionFactory connectionFactory) {
JmsDestinationPollingSource source = new JmsDestinationPollingSource(new JmsTemplate(connectionFactory));
source.setDestinationName("inQueue");
return source;
}
<int-jms:inbound-channel-adapter id="jmsIn" destination="inQueue" channel="exampleChannel">
<int:poller fixed-rate="30000"/>
</int-jms:inbound-channel-adapter>
Notice from the preceding configuration that the inbound-channel-adapter is a polling consumer.
That means that it invokes receive() when triggered.
You should use this only in situations where polling is done relatively infrequently and timeliness is not important.
For all other situations (a vast majority of JMS-based use-cases), the message-driven-channel-adapter (described later) is a better option.
By default, all JMS adapters that require a reference to the ConnectionFactory automatically look for a bean named jmsConnectionFactory.
That is why you do not see a connection-factory attribute in many of the examples.
However, if your JMS ConnectionFactory has a different bean name, you need to provide that attribute.
If extract-payload
is set to true
(the default), the received JMS Message is passed through the MessageConverter
.
When relying on the default SimpleMessageConverter
, this means that the resulting Spring Integration Message has the JMS message’s body as its payload.
A JMS TextMessage
produces a string-based payload, a JMS BytesMessage
produces a byte array payload, and the serializable instance of a JMS ObjectMessage
becomes the Spring Integration message’s payload.
If you prefer to have the raw JMS message as the Spring Integration message’s payload, set the extractPayload
option to false
.
Starting with version 5.0.8, the default receive-timeout is -1 (no wait) when the ConnectionFactory is an org.springframework.jms.connection.CachingConnectionFactory with cacheConsumers enabled; otherwise, it is 1 second.
The JMS Inbound Channel Adapter creates a DynamicJmsTemplate based on the provided ConnectionFactory and options.
If an external JmsTemplate is required (for example, in a Spring Boot environment), or the ConnectionFactory is not caching, or cacheConsumers is not enabled, it is recommended to set jmsTemplate.receiveTimeout(-1) if non-blocking consumption is expected:
Jms.inboundAdapter(connectionFactory)
.destination(queueName)
.configureJmsTemplate(template -> template.receiveTimeout(-1))
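The receive timeout values can be read as follows: a negative value means "do not wait", zero means "wait indefinitely", and a positive value is a bounded wait in milliseconds. The following is a minimal sketch of those semantics only. It uses a java.util.concurrent.BlockingQueue as a stand-in for the JMS destination; the ReceiveTimeoutDemo class and its receive method are hypothetical names and are not part of Spring Integration or Spring JMS.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Stand-in model only: a BlockingQueue plays the role of the JMS destination.
final class ReceiveTimeoutDemo {

    /**
     * Hypothetical helper showing how a receive timeout selects the waiting strategy:
     * negative = do not wait, zero = wait indefinitely, positive = wait up to that many ms.
     */
    static String receive(BlockingQueue<String> destination, long receiveTimeout) {
        try {
            if (receiveTimeout < 0) {
                return destination.poll();   // returns null immediately if nothing is available
            }
            if (receiveTimeout == 0) {
                return destination.take();   // blocks until a message arrives
            }
            return destination.poll(receiveTimeout, TimeUnit.MILLISECONDS);
        }
        catch (InterruptedException ex) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag for the caller
            return null;
        }
    }

    public static void main(String[] args) {
        BlockingQueue<String> inQueue = new LinkedBlockingQueue<>();
        System.out.println(receive(inQueue, -1)); // prints "null": queue is empty and we do not wait
        inQueue.add("hello");
        System.out.println(receive(inQueue, -1)); // prints "hello"
    }
}
```

With a polling adapter, the no-wait variant keeps the poller thread from sitting in a blocking receive between triggers, which is why it suits non-blocking consumption.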
Transactions
Starting with version 4.0, the inbound channel adapter supports the session-transacted
attribute.
In earlier versions, you had to inject a JmsTemplate
with sessionTransacted
set to true
.
(The adapter did let you set the acknowledge
attribute to transacted
, but this was incorrect and did not work).
Note, however, that setting session-transacted
to true
has little value, because the transaction is committed
immediately after the receive()
operation and before the message is sent to the channel
.
If you want the entire flow to be transactional (for example, if there is a downstream outbound channel adapter), you must use a transactional
poller with a JmsTransactionManager
.
Alternatively, consider using a jms-message-driven-channel-adapter
with acknowledge
set to transacted
(the default).
Message-driven Channel Adapter
The message-driven-channel-adapter
requires a reference to either an instance of a Spring MessageListener
container (any subclass of AbstractMessageListenerContainer
) or both ConnectionFactory
and Destination
(a 'destinationName' can be provided in place of the 'destination' reference).
The following example defines a message-driven channel adapter with a Destination
reference:
@Bean
public IntegrationFlow jmsMessageDrivenRedeliveryFlow() {
return IntegrationFlow
.from(Jms.messageDrivenChannelAdapter(jmsConnectionFactory())
.destination("inQueue"))
.channel("exampleChannel")
.get();
}
@Bean
fun jmsMessageDrivenFlowWithContainer() =
integrationFlow(
Jms.messageDrivenChannelAdapter(jmsConnectionFactory())
.destination("inQueue")) {
channel("exampleChannel")
}
@Bean
public JmsMessageDrivenEndpoint jmsIn() {
JmsMessageDrivenEndpoint endpoint = new JmsMessageDrivenEndpoint(container(), listener());
return endpoint;
}
@Bean
public AbstractMessageListenerContainer container() {
DefaultMessageListenerContainer container = new DefaultMessageListenerContainer();
container.setConnectionFactory(cf());
container.setDestinationName("inQueue");
return container;
}
@Bean
public ChannelPublishingJmsMessageListener listener() {
ChannelPublishingJmsMessageListener listener = new ChannelPublishingJmsMessageListener();
listener.setRequestChannelName("exampleChannel");
return listener;
}
<int-jms:message-driven-channel-adapter id="jmsIn" destination="inQueue" channel="exampleChannel"/>
The message-driven adapter also accepts several properties that pertain to the If you have a custom listener container implementation (usually a subclass of |
You can’t use the Spring JMS namespace element It is recommended to configure a regular |
Starting with version 4.2, the default acknowledge mode is transacted, unless you provide an external container.
In that case, you should configure the container as needed.
We recommend using transacted with the DefaultMessageListenerContainer to avoid message loss.
The 'extract-payload' property has the same effect, and its default value is 'true'.
The poller
element is not applicable for a message-driven channel adapter, as it is actively invoked.
For most scenarios, the message-driven approach is better, since the messages are passed along to the MessageChannel
as soon as they are received from the underlying JMS consumer.
Finally, the <message-driven-channel-adapter>
element also accepts the 'error-channel' attribute.
This provides the same basic functionality, as described in Enter the GatewayProxyFactoryBean
.
The following example shows how to set an error channel on a message-driven channel adapter:
<int-jms:message-driven-channel-adapter id="jmsIn" destination="inQueue"
channel="exampleChannel"
error-channel="exampleErrorChannel"/>
When comparing the preceding example to the generic gateway configuration or the JMS 'inbound-gateway' that we discuss later, the key difference is that we are in a one-way flow, since this is a 'channel-adapter', not a gateway.
Therefore, the flow downstream from the 'error-channel' should also be one-way.
For example, it could send to a logging handler, or it could connect to a different JMS <outbound-channel-adapter>
element.
When consuming from topics, set the pub-sub-domain
attribute to true.
Set subscription-durable
to true
for a durable subscription or subscription-shared
for a shared subscription (which requires a JMS 2.0 broker and has been available since version 4.2).
Use subscription-name
to name the subscription.
Starting with version 5.1, when the endpoint is stopped while the application remains running, the underlying listener container is shut down, closing its shared connection and consumers.
Previously, the connection and consumers remained open.
To revert to the previous behavior, set the shutdownContainerOnStop
on the JmsMessageDrivenEndpoint
to false
.
Inbound Conversion Errors
Starting with version 4.2, the 'error-channel' is used for the conversion errors, too.
Previously, if a JMS <message-driven-channel-adapter/>
or <inbound-gateway/>
could not deliver a message due to a conversion error, an exception would be thrown back to the container.
If the container is configured to use transactions, the message is rolled back and redelivered repeatedly.
Because the conversion process occurs before and during message construction, such errors were not sent to the 'error-channel'.
Now such conversion exceptions result in an ErrorMessage
being sent to the 'error-channel', with the exception as the payload
.
If you wish the transaction to roll back, and you have an 'error-channel' defined, the integration flow on the 'error-channel' must re-throw the exception (or another exception).
If the error flow does not throw an exception, the transaction is committed and the message is removed.
If no 'error-channel' is defined, the exception is thrown back to the container, as before.
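The commit and rollback rules above can be summarized as a small decision model. This is a sketch only, assuming a transacted listener container. ConversionErrorDemo, Outcome, and the errorFlow parameter are hypothetical names, where errorFlow stands in for whatever is subscribed to the 'error-channel' and null means that no 'error-channel' is configured.

```java
import java.util.function.Consumer;

// Hypothetical model of the outcome of a conversion error in a transacted container.
final class ConversionErrorDemo {

    enum Outcome { COMMITTED, ROLLED_BACK }

    static Outcome onConversionError(RuntimeException conversionError,
            Consumer<RuntimeException> errorFlow) {
        try {
            if (errorFlow == null) {
                throw conversionError;           // no error-channel: thrown back to the container
            }
            errorFlow.accept(conversionError);   // the error flow may re-throw to force a rollback
            return Outcome.COMMITTED;            // error flow returned normally: message is removed
        }
        catch (RuntimeException ex) {
            return Outcome.ROLLED_BACK;          // container rolls back; the message is redelivered
        }
    }

    public static void main(String[] args) {
        RuntimeException error = new IllegalStateException("cannot convert");
        System.out.println(onConversionError(error, null));                  // prints "ROLLED_BACK"
        System.out.println(onConversionError(error, e -> { }));              // prints "COMMITTED"
        System.out.println(onConversionError(error, e -> { throw e; }));     // prints "ROLLED_BACK"
    }
}
```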
Outbound Channel Adapter
The JmsSendingMessageHandler
implements the MessageHandler
interface and is capable of converting Spring Integration Messages
to JMS messages and then sending to a JMS destination.
It requires either a jmsTemplate
reference or both jmsConnectionFactory
and destination
references (destinationName
may be provided in place of destination
).
As with the inbound channel adapter, the easiest way to configure this adapter is with the namespace support.
The following configuration produces an adapter that receives Spring Integration messages from the exampleChannel
, converts those into JMS messages, and sends them to the JMS destination reference whose bean name is outQueue
:
@Bean
public IntegrationFlow jmsOutboundFlow() {
    return IntegrationFlow.from("exampleChannel")
            .handle(Jms.outboundAdapter(cachingConnectionFactory())
                    .destinationExpression("headers." + SimpMessageHeaderAccessor.DESTINATION_HEADER)
                    .configureJmsTemplate(t -> t.id("jmsOutboundFlowTemplate")))
            // get() builds the IntegrationFlow from the builder
            .get();
}
@Bean
fun jmsOutboundFlow() =
integrationFlow("exampleChannel") {
handle(Jms.outboundAdapter(jmsConnectionFactory())
.apply {
destinationExpression("headers." + SimpMessageHeaderAccessor.DESTINATION_HEADER)
deliveryModeFunction<Any> { DeliveryMode.NON_PERSISTENT }
timeToLiveExpression("10000")
configureJmsTemplate { it.explicitQosEnabled(true) }
}
)
}
@Bean
jmsOutboundFlow() {
integrationFlow('exampleChannel') {
handle(Jms.outboundAdapter(new ActiveMQConnectionFactory())
.with {
destinationExpression 'headers.' + SimpMessageHeaderAccessor.DESTINATION_HEADER
deliveryModeFunction { DeliveryMode.NON_PERSISTENT }
timeToLiveExpression '10000'
configureJmsTemplate {
it.explicitQosEnabled true
}
}
)
}
}
@Bean
@ServiceActivator(inputChannel = "exampleChannel")
public MessageHandler jmsOut(ConnectionFactory connectionFactory) {
    JmsSendingMessageHandler handler = new JmsSendingMessageHandler(new JmsTemplate(connectionFactory));
    handler.setDestinationName("outQueue");
    return handler;
}
<int-jms:outbound-channel-adapter id="jmsOut" destination="outQueue" channel="exampleChannel"/>
As with the inbound channel adapters, there is an 'extract-payload' property.
However, the meaning is reversed for the outbound adapter.
Rather than applying to the JMS message, the boolean property applies to the Spring Integration message payload.
In other words, the decision is whether to pass the Spring Integration message itself as the JMS message body or to pass the Spring Integration message payload as the JMS message body.
The default value is 'true'.
Therefore, if you pass a Spring Integration message whose payload is a String
, a JMS TextMessage
is created.
If, on the other hand, you want to send the actual Spring Integration message to another system over JMS, set it to 'false'.
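To make the payload-to-message mapping concrete, the following sketch models the type selection that, to the best of our understanding, Spring's SimpleMessageConverter applies when 'extract-payload' is 'true'. It does not use the JMS API: OutboundPayloadMappingDemo, JmsMessageType, and jmsTypeFor are hypothetical names introduced only for illustration.

```java
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of which JMS message type is chosen for a given payload.
final class OutboundPayloadMappingDemo {

    enum JmsMessageType { TEXT_MESSAGE, BYTES_MESSAGE, MAP_MESSAGE, OBJECT_MESSAGE }

    static JmsMessageType jmsTypeFor(Object payload) {
        if (payload instanceof String) {
            return JmsMessageType.TEXT_MESSAGE;
        }
        if (payload instanceof byte[]) {
            return JmsMessageType.BYTES_MESSAGE;
        }
        if (payload instanceof Map) {
            return JmsMessageType.MAP_MESSAGE;    // checked before Serializable on purpose
        }
        if (payload instanceof Serializable) {
            return JmsMessageType.OBJECT_MESSAGE;
        }
        throw new IllegalArgumentException("Unsupported payload type: " + payload);
    }

    public static void main(String[] args) {
        System.out.println(jmsTypeFor("hello"));          // prints "TEXT_MESSAGE"
        System.out.println(jmsTypeFor(new byte[] {1}));   // prints "BYTES_MESSAGE"
        System.out.println(jmsTypeFor(new HashMap<>()));  // prints "MAP_MESSAGE"
        System.out.println(jmsTypeFor(42));               // prints "OBJECT_MESSAGE"
    }
}
```

The same order explains why a java.util.HashMap payload becomes a map message even though it is also serializable.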
Regardless of the boolean value for payload extraction, the Spring Integration MessageHeaders map to JMS properties, as long as you rely on the default converter or provide a reference to another instance of MessageConverter.
(The same holds true for 'inbound' adapters, except that, in those cases, the JMS properties map to Spring Integration MessageHeaders).
Starting with version 5.1, the <int-jms:outbound-channel-adapter> (JmsSendingMessageHandler) can be configured with the deliveryModeExpression and timeToLiveExpression properties, which are evaluated at runtime against the request Spring Message to determine the QoS values for the JMS message being sent.
The setMapInboundDeliveryMode(true) and setMapInboundExpiration(true) options of the DefaultJmsHeaderMapper can serve as the source of the dynamic deliveryMode and timeToLive values in the message headers:
<int-jms:outbound-channel-adapter delivery-mode-expression="headers.jms_deliveryMode"
time-to-live-expression="headers.jms_expiration - T(System).currentTimeMillis()"/>
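The time-to-live expression above converts an absolute expiration timestamp into a relative time to live. The following sketch shows that arithmetic in plain Java, together with two edge cases that the expression itself does not handle. TimeToLiveDemo and remainingTimeToLive are hypothetical names, and the treatment of the edge cases is an assumption for illustration rather than framework behavior.

```java
import java.util.OptionalLong;

// Hypothetical helper: derives a relative time to live from an absolute expiration.
final class TimeToLiveDemo {

    /**
     * @param expiration absolute expiration in epoch milliseconds; 0 means "never expires" in JMS
     * @param now current time in epoch milliseconds
     * @return the remaining time to live, or empty if the message has already expired
     */
    static OptionalLong remainingTimeToLive(long expiration, long now) {
        if (expiration == 0) {
            return OptionalLong.of(0);     // a time to live of 0 also means "no expiry" in JMS
        }
        long remaining = expiration - now; // same arithmetic as the expression above
        return remaining > 0 ? OptionalLong.of(remaining) : OptionalLong.empty();
    }

    public static void main(String[] args) {
        System.out.println(remainingTimeToLive(10_000, 4_000)); // prints "OptionalLong[6000]"
        System.out.println(remainingTimeToLive(0, 4_000));      // prints "OptionalLong[0]"
        System.out.println(remainingTimeToLive(1_000, 4_000));  // prints "OptionalLong.empty"
    }
}
```

Returning an empty value for an already expired message avoids passing a zero or negative time to live, which would otherwise be read as "no expiry" or be rejected, depending on the provider.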
Transactions
Starting with version 4.0, the outbound channel adapter supports the session-transacted
attribute.
In earlier versions, you had to inject a JmsTemplate
with sessionTransacted
set to true
.
The attribute now sets the property on the built-in default JmsTemplate
.
If a transaction exists (perhaps from an upstream message-driven-channel-adapter
), the send operation is performed within the same transaction.
Otherwise, a new transaction is started.
Inbound Gateway
Spring Integration’s message-driven JMS inbound-gateway delegates to a MessageListener
container, supports dynamically adjusting concurrent consumers, and can also handle replies.
The inbound gateway requires references to a ConnectionFactory
and a request Destination
(or 'requestDestinationName').
The following example defines a JMS inbound-gateway
that receives from the JMS queue referenced by the bean id, inQueue
, and sends to the Spring Integration channel named exampleChannel
:
<int-jms:inbound-gateway id="jmsInGateway"
request-destination="inQueue"
request-channel="exampleChannel"/>
Since the gateways provide request-reply behavior instead of unidirectional send or receive behavior, they also have two distinct properties for “payload extraction” (as discussed earlier for the channel adapters' 'extract-payload' setting). For an inbound gateway, the 'extract-request-payload' property determines whether the received JMS Message body is extracted. If 'false', the JMS message itself becomes the Spring Integration message payload. The default is 'true'.
Similarly, for an inbound-gateway, the 'extract-reply-payload' property applies to the Spring Integration message that is to be converted into a reply JMS Message.
If you want to pass the whole Spring Integration message (as the body of a JMS ObjectMessage), set this value to 'false'.
By default, it is 'true', so the Spring Integration message payload is converted into a JMS Message (for example, a String payload becomes a JMS TextMessage).
As with anything else, gateway invocation might result in error. By default, a producer is not notified of the errors that might have occurred on the consumer side and times out waiting for the reply. However, there might be times when you want to communicate an error condition back to the consumer (in other words, you might want to treat the exception as a valid reply by mapping it to a message). To accomplish this, JMS inbound gateway provides support for a message channel to which errors can be sent for processing, potentially resulting in a reply message payload that conforms to some contract that defines what a caller may expect as an “error” reply. You can use the error-channel attribute to configure such a channel, as the following example shows:
<int-jms:inbound-gateway request-destination="requestQueue"
request-channel="jmsInputChannel"
error-channel="errorTransformationChannel"/>
<int:transformer input-channel="errorTransformationChannel"
ref="exceptionTransformer" method="createErrorResponse"/>
You might notice that this example looks very similar to that included within Enter the GatewayProxyFactoryBean
.
The same idea applies here: The exceptionTransformer
could be a POJO that creates error-response objects, you could reference the nullChannel
to suppress the errors, or you could leave 'error-channel' out to let the exception propagate.
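As an illustration of the POJO option, the following is a minimal sketch of what an exceptionTransformer with a createErrorResponse method might look like. It assumes that the payload of the error message is a Throwable and that the caller's contract accepts a simple status and reason pair. The ErrorResponse type and its fields are hypothetical and are not defined by Spring Integration.

```java
// Hypothetical POJO that turns a failure into a reply object the caller can understand.
final class ExceptionTransformer {

    // Hypothetical reply contract: a status marker plus a human-readable reason.
    static final class ErrorResponse {
        private final String status;
        private final String reason;

        ErrorResponse(String status, String reason) {
            this.status = status;
            this.reason = reason;
        }

        String getStatus() { return status; }

        String getReason() { return reason; }
    }

    public ErrorResponse createErrorResponse(Throwable failure) {
        // Walk down to the root cause, because the original exception is usually wrapped.
        Throwable root = failure;
        while (root.getCause() != null && root.getCause() != root) {
            root = root.getCause();
        }
        String reason = root.getMessage() != null
                ? root.getMessage()
                : root.getClass().getSimpleName();
        return new ErrorResponse("ERROR", reason);
    }

    public static void main(String[] args) {
        Throwable failure = new RuntimeException("wrapper", new IllegalStateException("boom"));
        ErrorResponse response = new ExceptionTransformer().createErrorResponse(failure);
        System.out.println(response.getStatus() + ": " + response.getReason()); // prints "ERROR: boom"
    }
}
```

Because the method returns normally, its result is treated as a valid reply and is sent back to the requester instead of letting it time out.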
When consuming from topics, set the pub-sub-domain
attribute to true.
Set subscription-durable
to true
for a durable subscription or subscription-shared
for a shared subscription (requires a JMS 2.0 broker and has been available since version 4.2).
Use subscription-name
to name the subscription.
Starting with version 4.2, the default acknowledge mode is transacted, unless an external container is provided.
In that case, you should configure the container as needed.
We recommend that you use transacted with the DefaultMessageListenerContainer to avoid message loss.
Starting with version 5.1, when the endpoint is stopped while the application remains running, the underlying listener container is shut down, closing its shared connection and consumers.
Previously, the connection and consumers remained open.
To revert to the previous behavior, set the shutdownContainerOnStop
on the JmsInboundGateway
to false
.
By default, the JmsInboundGateway
looks for a jakarta.jms.Message.getJMSReplyTo()
property in the received message to determine where to send a reply.
Otherwise, it can be configured with a static defaultReplyDestination
, or defaultReplyQueueName
or defaultReplyTopicName
.
In addition, starting with version 6.1, a replyToExpression
can be configured on a provided ChannelPublishingJmsMessageListener
to determine the reply destination dynamically, if the standard JMSReplyTo
property is null
on the request.
The received jakarta.jms.Message
is used as the root evaluation context object.
The following example demonstrates how to use Java DSL API to configure an inbound JMS gateway with a custom reply destination resolved from the request message:
@Bean
public IntegrationFlow jmsInboundGatewayFlow(ConnectionFactory connectionFactory) {
return IntegrationFlow.from(
Jms.inboundGateway(connectionFactory)
.requestDestination("requestDestination")
.replyToFunction(message -> message.getStringProperty("myReplyTo")))
.<String, String>transform(String::toUpperCase)
.get();
}
Outbound Gateway
The outbound gateway creates JMS messages from Spring Integration messages and sends them to a request-destination
.
It then handles the JMS reply message either by using a selector to receive from the reply-destination
that you configure or, if no reply-destination
is provided, by creating JMS TemporaryQueue
(or TemporaryTopic
if replyPubSubDomain= true
) instances.
If you specify a reply destination, you are advised to not use cached consumers.
Alternatively, consider using a <reply-listener/>, as described in Using a <reply-listener/>.
The following example shows how to configure an outbound gateway:
<int-jms:outbound-gateway id="jmsOutGateway"
request-destination="outQueue"
request-channel="outboundJmsRequests"
reply-channel="jmsReplies"/>
The 'outbound-gateway' payload extraction properties are inversely related to those of the 'inbound-gateway' (see the earlier discussion). That means that the 'extract-request-payload' property value applies to the Spring Integration message being converted into a JMS message to be sent as a request. The 'extract-reply-payload' property value applies to the JMS message received as a reply and is then converted into a Spring Integration message to be subsequently sent to the 'reply-channel', as shown in the preceding configuration example.
Using a <reply-listener/>
Spring Integration 2.2 introduced an alternative technique for handling replies.
If you add a <reply-listener/>
child element to the gateway instead of creating a consumer for each reply, a MessageListener
container is used to receive the replies and hand them over to the requesting thread.
This provides a number of performance benefits as well as alleviating the cached consumer memory utilization problem described in the earlier caution.
When using a <reply-listener/>
with an outbound gateway that has no reply-destination
, instead of creating a TemporaryQueue
for each request, a single TemporaryQueue
is used.
(The gateway creates an additional TemporaryQueue
, as necessary, if the connection to the broker is lost and recovered).
If replyPubSubDomain
is set to true
, starting with version 6.0, a TemporaryTopic
is created instead.
When using a correlation-key
, multiple gateways can share the same reply destination, because the listener container uses a selector that is unique to each gateway.
If you specify a reply listener and specify a reply destination (or reply destination name) but provide no correlation key, the gateway logs a warning and falls back to pre-version 2.2 behavior. This is because there is no way to configure a selector in this case. Thus, there is no way to avoid a reply going to a different gateway that might be configured with the same reply destination. Note that, in this situation, a new consumer is used for each request, and consumers can build up in memory as described in the caution above; therefore cached consumers should not be used in this case.
The following example shows a reply listener with default attributes:
<int-jms:outbound-gateway id="jmsOutGateway"
request-destination="outQueue"
request-channel="outboundJmsRequests"
reply-channel="jmsReplies">
<int-jms:reply-listener />
</int-jms:outbound-gateway>
The listener is very lightweight, and we anticipate that, in most cases, you need only a single consumer.
However, you can add attributes such as concurrent-consumers
, max-concurrent-consumers
, and others.
See the schema for a complete list of supported attributes, together with the Spring JMS documentation for their meanings.
Idle Reply Listeners
Starting with version 4.2, you can start the reply listener as needed (and stop it after an idle time) instead of running for the duration of the gateway’s lifecycle. This can be useful if you have many gateways in the application context where they are mostly idle. One such situation is a context with many (inactive) partitioned Spring Batch jobs using Spring Integration and JMS for partition distribution. If all the reply listeners are active, the JMS broker has an active consumer for each gateway. By enabling the idle timeout, each consumer exists only while the corresponding batch job is running (and for a short time after it finishes).
See idle-reply-listener-timeout
in Attribute Reference.
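The start-on-demand and stop-when-idle behavior can be pictured with a small state model. This is a sketch under stated assumptions, not the gateway's implementation: IdleReplyListenerDemo and its methods are hypothetical names, time is passed in explicitly so that the example is deterministic, and the rule that the listener is not stopped while replies are still pending is an assumption made for the sketch.

```java
// Hypothetical model of a reply listener that runs only while the gateway is in use.
final class IdleReplyListenerDemo {

    private final long idleTimeoutMillis;
    private boolean running;
    private long lastSend;
    private int pendingReplies;

    IdleReplyListenerDemo(long idleTimeoutMillis) {
        this.idleTimeoutMillis = idleTimeoutMillis;
    }

    // A request starts the listener if needed and records the time of the send.
    void onSend(long now) {
        running = true;
        lastSend = now;
        pendingReplies++;
    }

    void onReply() {
        if (pendingReplies > 0) {
            pendingReplies--;
        }
    }

    // Called periodically: stop the listener once it has been idle for longer than the timeout.
    void checkIdle(long now) {
        if (running && pendingReplies == 0 && now - lastSend > idleTimeoutMillis) {
            running = false;
        }
    }

    boolean isRunning() {
        return running;
    }

    public static void main(String[] args) {
        IdleReplyListenerDemo listener = new IdleReplyListenerDemo(5_000);
        listener.onSend(0);
        listener.onReply();
        listener.checkIdle(3_000);
        System.out.println(listener.isRunning()); // prints "true": still within the idle timeout
        listener.checkIdle(6_000);
        System.out.println(listener.isRunning()); // prints "false": idle for more than 5 seconds
    }
}
```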
Gateway Reply Correlation
This section describes the mechanisms used for reply correlation (ensuring the originating gateway receives replies to only its requests), depending on how the gateway is configured. See Attribute Reference for complete description of the attributes discussed here.
The following list describes the various scenarios (the numbers are for identification — order does not matter):
1. No reply-destination* properties and no <reply-listener>
   A TemporaryQueue is created for each request and deleted when the request is complete (successfully or otherwise). correlation-key is irrelevant.
2. A reply-destination* property is provided and neither a <reply-listener/> nor a correlation-key is provided
   A JMSCorrelationID equal to the JMSMessageID of the outgoing message is used as a message selector for the consumer:
   messageSelector = "JMSCorrelationID = '" + messageId + "'"
   The responding system is expected to return the inbound JMSMessageID in the reply JMSCorrelationID. This is a common pattern and is implemented by the Spring Integration inbound gateway as well as Spring’s MessageListenerAdapter for message-driven POJOs.
   When you use this configuration, you should not use a topic for replies. The reply may be lost.
3. A reply-destination* property is provided, no <reply-listener/> is provided, and correlation-key="JMSCorrelationID"
   The gateway generates a unique correlation ID and inserts it in the JMSCorrelationID header. The message selector is:
   messageSelector = "JMSCorrelationID = '" + uniqueId + "'"
   The responding system is expected to return the inbound JMSCorrelationID in the reply JMSCorrelationID. This is a common pattern and is implemented by the Spring Integration inbound gateway as well as Spring’s MessageListenerAdapter for message-driven POJOs.
4. A reply-destination* property is provided, no <reply-listener/> is provided, and correlation-key="myCorrelationHeader"
   The gateway generates a unique correlation ID and inserts it in the myCorrelationHeader message property. The correlation-key can be any user-defined value. The message selector is:
   messageSelector = "myCorrelationHeader = '" + uniqueId + "'"
   The responding system is expected to return the inbound myCorrelationHeader in the reply myCorrelationHeader.
5. A reply-destination* property is provided, no <reply-listener/> is provided, and correlation-key="JMSCorrelationID*" (Note the * in the correlation key.)
   The gateway uses the value in the jms_correlationId header (if present) from the request message and inserts it in the JMSCorrelationID header. The message selector is:
   messageSelector = "JMSCorrelationID = '" + headers['jms_correlationId'] + "'"
   The user must ensure this value is unique.
   If the header does not exist, the gateway behaves as in 3.
   The responding system is expected to return the inbound JMSCorrelationID in the reply JMSCorrelationID. This is a common pattern and is implemented by the Spring Integration inbound gateway as well as Spring’s MessageListenerAdapter for message-driven POJOs.
6. No reply-destination* properties are provided, and a <reply-listener> is provided
   A temporary queue is created and used for all replies from this gateway instance. No correlation data is needed in the message, but the outgoing JMSMessageID is used internally in the gateway to direct the reply to the correct requesting thread.
7. A reply-destination* property is provided, a <reply-listener> is provided, and no correlation-key is provided
   Not allowed.
   The <reply-listener/> configuration is ignored, and the gateway behaves as in 2. A warning log message is written to indicate this situation.
8. A reply-destination* property is provided, a <reply-listener> is provided, and correlation-key="JMSCorrelationID"
   The gateway has a unique correlation ID and inserts it, together with an incrementing value, in the JMSCorrelationID header (gatewayId + "_" + ++seq). The message selector is:
   messageSelector = "JMSCorrelationID LIKE '" + gatewayId + "%'"
   The responding system is expected to return the inbound JMSCorrelationID in the reply JMSCorrelationID. This is a common pattern and is implemented by the Spring Integration inbound gateway as well as Spring’s MessageListenerAdapter for message-driven POJOs. Since each gateway has a unique ID, each instance gets only its own replies. The complete correlation data is used to route the reply to the correct requesting thread.
9. A reply-destination* property is provided, a <reply-listener/> is provided, and correlation-key="myCorrelationHeader"
   The gateway has a unique correlation ID and inserts it, together with an incrementing value, in the myCorrelationHeader property (gatewayId + "_" + ++seq). The correlation-key can be any user-defined value. The message selector is:
   messageSelector = "myCorrelationHeader LIKE '" + gatewayId + "%'"
   The responding system is expected to return the inbound myCorrelationHeader in the reply myCorrelationHeader. Since each gateway has a unique ID, each instance gets only its own replies. The complete correlation data is used to route the reply to the correct requesting thread.
10. A reply-destination* property is provided, a <reply-listener/> is provided, and correlation-key="JMSCorrelationID*" (Note the * in the correlation key.)
   Not allowed.
   User-supplied correlation IDs are not permitted with a reply listener. The gateway does not initialize with this configuration.
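The selector and correlation ID formats used in the scenarios above can be reproduced with plain string handling. The following sketch only builds the strings. ReplyCorrelationDemo and its method names are hypothetical, and the gateway ID used here is a short fixed value for readability, whereas a real gateway uses a unique ID.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical helper that builds the selectors and correlation IDs described in the scenarios.
final class ReplyCorrelationDemo {

    private final String gatewayId;
    private final AtomicLong seq = new AtomicLong();

    ReplyCorrelationDemo(String gatewayId) {
        this.gatewayId = gatewayId;
    }

    // Without a reply listener: one consumer per request, selecting exactly one correlation value.
    static String perRequestSelector(String correlationKey, String correlationValue) {
        return correlationKey + " = '" + correlationValue + "'";
    }

    // With a reply listener: each request gets gatewayId + "_" + an incrementing sequence number.
    String nextCorrelationId() {
        return gatewayId + "_" + seq.incrementAndGet();
    }

    // With a reply listener: a single selector matches every reply addressed to this gateway.
    String listenerSelector(String correlationKey) {
        return correlationKey + " LIKE '" + gatewayId + "%'";
    }

    public static void main(String[] args) {
        System.out.println(perRequestSelector("JMSCorrelationID", "abc"));
        // prints: JMSCorrelationID = 'abc'
        ReplyCorrelationDemo gateway = new ReplyCorrelationDemo("gwA");
        System.out.println(gateway.nextCorrelationId());                   // prints: gwA_1
        System.out.println(gateway.listenerSelector("JMSCorrelationID"));
        // prints: JMSCorrelationID LIKE 'gwA%'
    }
}
```

The prefix selector is what lets several gateways share one reply destination: the broker filters by gateway, and the full correlation value then identifies the waiting request inside that gateway.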
Async Gateway
Starting with version 4.3, you can now specify async="true"
(or setAsync(true)
in Java) when configuring the outbound gateway.
By default, when a request is sent to the gateway, the requesting thread is suspended until the reply is received.
The flow then continues on that thread.
If async
is true
, the requesting thread is released immediately after the send()
completes, and the reply is returned (and the flow continues) on the listener container thread.
This can be useful when the gateway is invoked on a poller thread.
The thread is released and is available for other tasks within the framework.
The async option requires a <reply-listener/> (or setUseReplyContainer(true) when using Java configuration).
It also requires a correlationKey (usually JMSCorrelationID) to be specified.
If either of these conditions is not met, async is ignored.
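The difference between the blocking and the asynchronous mode can be illustrated with java.util.concurrent.CompletableFuture. This is a conceptual sketch only, not the gateway's implementation. AsyncGatewayDemo and its methods are hypothetical names, and asyncEffective simply restates the two conditions described above.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical model: replies are matched to requests by correlation ID.
final class AsyncGatewayDemo {

    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    // async takes effect only with a reply listener container and a correlation key.
    static boolean asyncEffective(boolean async, boolean useReplyContainer, String correlationKey) {
        return async && useReplyContainer && correlationKey != null;
    }

    // Registers the request and returns immediately; the caller decides whether to block.
    CompletableFuture<String> send(String correlationId) {
        CompletableFuture<String> reply = new CompletableFuture<>();
        pending.put(correlationId, reply);
        return reply;
    }

    // Invoked by the (simulated) listener container thread when a reply arrives.
    void onReply(String correlationId, String payload) {
        CompletableFuture<String> reply = pending.remove(correlationId);
        if (reply != null) {
            reply.complete(payload);
        }
    }

    public static void main(String[] args) {
        AsyncGatewayDemo gateway = new AsyncGatewayDemo();
        // Asynchronous style: the callback runs on the thread that delivers the reply.
        gateway.send("gw_1").thenAccept(reply -> System.out.println("reply: " + reply));
        gateway.onReply("gw_1", "pong"); // prints "reply: pong"
    }
}
```

In the blocking mode, the requesting thread would instead call join() on the returned future and continue the flow itself once the reply is available.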
Attribute Reference
The following listing shows all the available attributes for an outbound-gateway
:
<int-jms:outbound-gateway
connection-factory="connectionFactory" (1)
correlation-key="" (2)
delivery-persistent="" (3)
destination-resolver="" (4)
explicit-qos-enabled="" (5)
extract-reply-payload="true" (6)
extract-request-payload="true" (7)
header-mapper="" (8)
message-converter="" (9)
priority="" (10)
receive-timeout="" (11)
reply-channel="" (12)
reply-destination="" (13)
reply-destination-expression="" (14)
reply-destination-name="" (15)
reply-pub-sub-domain="" (16)
reply-timeout="" (17)
request-channel="" (18)
request-destination="" (19)
request-destination-expression="" (20)
request-destination-name="" (21)
request-pub-sub-domain="" (22)
time-to-live="" (23)
requires-reply="" (24)
idle-reply-listener-timeout="" (25)
async=""> (26)
<int-jms:reply-listener /> (27)
</int-jms:outbound-gateway>
1 | Reference to a jakarta.jms.ConnectionFactory.
The default is jmsConnectionFactory. |
||
2 | The name of a property that contains correlation data to correlate requests with replies.
If omitted, the gateway expects the responding system to return the value of the outbound JMSMessageID header in the JMSCorrelationID header.
If specified, the gateway generates a correlation ID and populates the specified property with it.
The responding system must echo back that value in the same property.
It can be set to JMSCorrelationID, in which case the standard header is used instead of a String property to hold the correlation data.
When you use a <reply-listener/>, you must specify the correlation-key if you provide an explicit reply-destination.
Starting with version 4.0.1, this attribute also supports the value JMSCorrelationID*, which means that if the outbound message already has a JMSCorrelationID (mapped from the jms_correlationId header), it is used instead of generating a new one.
Note that the JMSCorrelationID* key is not allowed when you use a <reply-listener/>, because the container needs to set up a message selector during initialization.
|
||
3 | A boolean value indicating whether the delivery mode should be DeliveryMode.PERSISTENT (true ) or DeliveryMode.NON_PERSISTENT (false ).
This setting takes effect only if explicit-qos-enabled is true . |
||
4 | A DestinationResolver .
The default is a DynamicDestinationResolver , which maps the destination name to a queue or topic of that name. |
||
5 | When set to true, it enables the use of the quality of service attributes: priority, delivery-persistent, and time-to-live. |
||
6 | When set to true (the default), the payload of the Spring Integration reply message is created from the JMS Reply message’s body (by using the MessageConverter ).
When set to false , the entire JMS message becomes the payload of the Spring Integration message. |
||
7 | When set to true (the default), the payload of the Spring Integration message is converted to a JMS Message (by using the MessageConverter).
When set to false, the entire Spring Integration Message is converted to the JMS Message.
In both cases, the Spring Integration message headers are mapped to JMS headers and properties by using the HeaderMapper . |
||
8 | A HeaderMapper used to map Spring Integration message headers to and from JMS message headers and properties. |
||
9 | A reference to a MessageConverter for converting between JMS messages and the Spring Integration message payloads (or messages if extract-request-payload is false ).
The default is a SimpleMessageConverter . |
||
10 | The default priority of request messages.
Overridden by the message priority header, if present.
Its range is 0 to 9 .
This setting takes effect only if explicit-qos-enabled is true . |
||
11 | The time (in milliseconds) to wait for a reply.
The default is 5000 (five seconds). |
||
12 | The channel to which the reply message is sent. | ||
13 | A reference to a Destination , which is set as the JMSReplyTo header.
At most, only one of reply-destination , reply-destination-expression , or reply-destination-name is allowed.
If none is provided, a TemporaryQueue is used for replies to this gateway. |
||
14 | A SpEL expression evaluating to a Destination , which will be set as the JMSReplyTo header.
The expression can result in a Destination object or a String .
It is used by the DestinationResolver to resolve the actual Destination .
At most, only one of reply-destination , reply-destination-expression , or reply-destination-name is allowed.
If none is provided, a TemporaryQueue is used for replies to this gateway. |
||
15 | The name of the destination that is set as the JMSReplyTo header.
It is used by the DestinationResolver to resolve the actual Destination .
At most, only one of reply-destination , reply-destination-expression , or reply-destination-name is allowed.
If none is provided, a TemporaryQueue is used for replies to this gateway. |
||
16 | When set to true, it indicates that any reply Destination resolved by the DestinationResolver should be a Topic rather than a Queue. |
||
17 | The time the gateway waits when sending the reply message to the reply-channel .
This only has an effect if the reply-channel can block — such as a QueueChannel with a capacity limit that is currently full.
The default is infinity. |
||
18 | The channel on which this gateway receives request messages. | ||
19 | A reference to a Destination to which request messages are sent.
One of request-destination, request-destination-expression, or request-destination-name is required.
You can use only one of those three attributes. |
||
20 | A SpEL expression evaluating to a Destination to which request messages are sent.
The expression can result in a Destination object or a String.
It is used by the DestinationResolver to resolve the actual Destination.
One of request-destination, request-destination-expression, or request-destination-name is required.
You can use only one of those three attributes. |
||
21 | The name of the destination to which request messages are sent.
It is used by the DestinationResolver to resolve the actual Destination.
One of request-destination, request-destination-expression, or request-destination-name is required.
You can use only one of those three attributes. |
||
22 | When set to true, it indicates that any request Destination resolved by the DestinationResolver should be a Topic rather than a Queue. |
||
23 | Specifies the message time to live.
This setting takes effect only if explicit-qos-enabled is true . |
||
24 | Specifies whether this outbound gateway must return a non-null value.
By default, this value is true , and a MessageTimeoutException is thrown when the underlying service does not return a value after the receive-timeout .
Note that, if the service is never expected to return a reply, it would be better to use a <int-jms:outbound-channel-adapter/> instead of a <int-jms:outbound-gateway/> with requires-reply="false" .
With the latter, the sending thread is blocked, waiting for a reply for the receive-timeout period. |
||
25 | When you use a <reply-listener /> , its lifecycle (start and stop) matches that of the gateway by default.
When this value is greater than 0 , the container is started on demand (when a request is sent).
The container continues to run until at least this time elapses with no requests being received (and until no replies are outstanding).
The container is started again on the next request.
The stop time is a minimum and may actually be up to 1.5x this value. |
||
26 | See Async Gateway. | ||
27 | When this element is included, replies are received by an asynchronous MessageListenerContainer rather than creating a consumer for each reply.
This can be more efficient in many cases. |
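Several of the attributes above have setter counterparts on JmsOutboundGateway. The following sketch shows the quality-of-service group together with the receive timeout. The bean, channel, and queue names and the chosen values are illustrative assumptions, not recommended settings.

```java
import jakarta.jms.ConnectionFactory;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.jms.JmsOutboundGateway;

@Configuration
public class QosJmsGatewayConfig {

    @Bean
    @ServiceActivator(inputChannel = "jmsRequests") // hypothetical request channel
    public JmsOutboundGateway qosJmsGateway(ConnectionFactory connectionFactory) {
        JmsOutboundGateway gateway = new JmsOutboundGateway();
        gateway.setConnectionFactory(connectionFactory);
        gateway.setRequestDestinationName("requestQueue"); // hypothetical queue name

        // Without this flag, the three QoS settings below have no effect.
        gateway.setExplicitQosEnabled(true);
        gateway.setDeliveryPersistent(true); // DeliveryMode.PERSISTENT
        gateway.setPriority(7);              // valid range is 0 to 9
        gateway.setTimeToLive(30_000L);      // milliseconds

        // Wait up to ten seconds for a reply instead of the default five.
        gateway.setReceiveTimeout(10_000L);
        gateway.setRequiresReply(true);
        return gateway;
    }
}
```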
Mapping Message Headers to and from JMS Message
JMS messages can contain meta-information such as JMS API headers and simple properties.
You can map those to and from Spring Integration message headers by using JmsHeaderMapper
.
The JMS API headers are passed to the appropriate setter methods (such as setJMSReplyTo
), whereas other headers are copied to the general properties of the JMS Message.
The JMS outbound gateway is bootstrapped with the default implementation of JmsHeaderMapper, which maps standard JMS API headers as well as primitive or String message headers.
You could also provide a custom header mapper by using the header-mapper
attribute of inbound and outbound gateways.
Many JMS vendor-specific clients don’t allow setting the deliveryMode , priority and timeToLive properties directly on an already created JMS message.
They are considered to be QoS properties and therefore have to be propagated to the target MessageProducer.send(message, deliveryMode, priority, timeToLive) API.
For this reason the DefaultJmsHeaderMapper doesn’t map appropriate Spring Integration headers (or expression results) into the mentioned JMS message properties.
Instead, a DynamicJmsTemplate is used by the JmsSendingMessageHandler to propagate header values from the request message into the MessageProducer.send() API.
To enable this feature, you must configure the outbound endpoint with a DynamicJmsTemplate with its explicitQosEnabled property set to true.
The Spring Integration Java DSL configures a DynamicJmsTemplate by default but you must still set the explicitQosEnabled property.
|
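As a rough sketch of the configuration described in the preceding note, an outbound channel adapter can be given a DynamicJmsTemplate with explicit QoS enabled. The channel name jmsOut, the queue name exampleQueue, and the class name are hypothetical.

```java
import jakarta.jms.ConnectionFactory;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.jms.DynamicJmsTemplate;
import org.springframework.integration.jms.JmsSendingMessageHandler;

@Configuration
public class QosOutboundAdapterConfig {

    @Bean
    public DynamicJmsTemplate dynamicJmsTemplate(ConnectionFactory connectionFactory) {
        DynamicJmsTemplate template = new DynamicJmsTemplate();
        template.setConnectionFactory(connectionFactory);
        // Required so that QoS values are passed on to MessageProducer.send().
        template.setExplicitQosEnabled(true);
        return template;
    }

    @Bean
    @ServiceActivator(inputChannel = "jmsOut") // hypothetical channel name
    public JmsSendingMessageHandler jmsSender(DynamicJmsTemplate dynamicJmsTemplate) {
        JmsSendingMessageHandler handler = new JmsSendingMessageHandler(dynamicJmsTemplate);
        handler.setDestinationName("exampleQueue"); // hypothetical queue name
        return handler;
    }
}
```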
Since version 4.0, the JMSPriority header is mapped to the standard priority header for inbound messages.
Previously, the priority header was only used for outbound messages.
To revert to the previous behavior (that is, to not map the inbound priority), set the mapInboundPriority property of DefaultJmsHeaderMapper to false .
|
Since version 4.3, the DefaultJmsHeaderMapper maps the standard correlationId header as a message property by invoking its toString() method (correlationId is often a UUID , which is not supported by JMS).
On the inbound side, it is mapped as a String .
This is independent of the jms_correlationId header, which is mapped to and from the JMSCorrelationID header.
The JMSCorrelationID is generally used to correlate requests and replies, whereas the correlationId is often used to combine related messages into a group (such as with an aggregator or a resequencer).
|
Starting with version 5.1, the DefaultJmsHeaderMapper
can be configured for mapping inbound JMSDeliveryMode
and JMSExpiration
properties:
@Bean
public DefaultJmsHeaderMapper jmsHeaderMapper() {
    DefaultJmsHeaderMapper mapper = new DefaultJmsHeaderMapper();
    // Map the inbound JMSDeliveryMode and JMSExpiration properties to message headers.
    mapper.setMapInboundDeliveryMode(true);
    mapper.setMapInboundExpiration(true);
    return mapper;
}
These JMS properties are mapped to the JmsHeaders.DELIVERY_MODE
and JmsHeaders.EXPIRATION
Spring Message headers respectively.
Message Conversion, Marshalling, and Unmarshalling
If you need to convert the message, all JMS adapters and gateways let you provide a MessageConverter
by setting the message-converter
attribute.
To do so, provide the bean name of an instance of MessageConverter
that is available within the same ApplicationContext.
Also, to provide some consistency with marshaller and unmarshaller interfaces, Spring provides MarshallingMessageConverter
, which you can configure with your own custom marshallers and unmarshallers.
The following example shows how to do so:
<int-jms:inbound-gateway request-destination="requestQueue"
    request-channel="inbound-gateway-channel"
    message-converter="marshallingMessageConverter"/>

<bean id="marshallingMessageConverter"
    class="org.springframework.jms.support.converter.MarshallingMessageConverter">
    <constructor-arg>
        <bean class="org.bar.SampleMarshaller"/>
    </constructor-arg>
    <constructor-arg>
        <bean class="org.bar.SampleUnmarshaller"/>
    </constructor-arg>
</bean>
When you provide your own MessageConverter instance, it is still wrapped within the HeaderMappingMessageConverter .
This means that the 'extract-request-payload' and 'extract-reply-payload' properties can affect the actual objects passed to your converter.
The HeaderMappingMessageConverter itself delegates to a target MessageConverter while also mapping the Spring Integration MessageHeaders to JMS message properties and back again.
|
JMS-backed Message Channels
The channel adapters and gateways featured earlier are all intended for applications that integrate with other external systems.
The inbound options assume that some other system is sending JMS messages to the JMS destination, and the outbound options assume that some other system is receiving from the destination.
The other system may or may not be a Spring Integration application.
Of course, when sending a Spring Integration message instance as the body of the JMS message itself (with 'extract-payload' value set to false
), it is assumed that the other system is based on Spring Integration.
However, that is by no means a requirement.
That flexibility is one of the benefits of using a message-based integration option with the abstraction of “channels” (or destinations in the case of JMS).
Sometimes, both the producer and consumer for a given JMS Destination are intended to be part of the same application, running within the same process. You can accomplish this by using a pair of inbound and outbound channel adapters. The problem with that approach is that you need two adapters, even though, conceptually, the goal is to have a single message channel. A better option is supported as of Spring Integration version 2.0. Now it is possible to define a single “channel” when using the JMS namespace, as the following example shows:
<int-jms:channel id="jmsChannel" queue="exampleQueue"/>
The channel in the preceding example behaves much like a normal <channel/>
element from the main Spring Integration namespace.
It can be referenced by both the input-channel
and output-channel
attributes of any endpoint.
The difference is that this channel is backed by a JMS Queue instance named exampleQueue
.
This means that asynchronous messaging is possible between the producing and consuming endpoints.
However, unlike the simpler asynchronous message channels created by adding a <queue/>
element within a non-JMS <channel/>
element, the messages are not stored in an in-memory queue.
Instead, those messages are passed within a JMS message body, and the full power of the underlying JMS provider is then available for that channel.
Probably the most common rationale for using this alternative is to take advantage of the persistence made available by the store-and-forward approach of JMS messaging.
If configured properly, the JMS-backed message channel also supports transactions.
In other words, a producer would not actually write to a transactional JMS-backed channel if its send operation is part of a transaction that rolls back.
Likewise, a consumer would not physically remove a JMS message from the channel if the reception of that message is part of a transaction that rolls back.
Note that the producer and consumer transactions are separate in such a scenario.
This is significantly different from the propagation of a transactional context across a simple, synchronous <channel/>
element that has no <queue/>
child element.
Since the preceding example references a JMS Queue instance, it acts as a point-to-point channel. If, on the other hand, you need publish-subscribe behavior, you can use a separate element and reference a JMS Topic instead. The following example shows how to do so:
<int-jms:publish-subscribe-channel id="jmsChannel" topic="exampleTopic"/>
For either type of JMS-backed channel, the name of the destination may be provided instead of a reference, as the following example shows:
<int-jms:channel id="jmsQueueChannel" queue-name="exampleQueueName"/>
<int-jms:publish-subscribe-channel id="jmsTopicChannel" topic-name="exampleTopicName"/>
In the preceding examples, the destination names are resolved by Spring’s default DynamicDestinationResolver
implementation, but you could provide any implementation of the DestinationResolver
interface.
Also, the JMS ConnectionFactory
is a required property of the channel, but, by default, the expected bean name would be jmsConnectionFactory
.
The following example provides both a custom instance for resolution of the JMS destination names and a different name for the ConnectionFactory
:
<int-jms:channel id="jmsChannel" queue-name="exampleQueueName"
destination-resolver="customDestinationResolver"
connection-factory="customConnectionFactory"/>
For the <publish-subscribe-channel />
, set the durable
attribute to true
for a durable subscription or subscription-shared
for a shared subscription (requires a JMS 2.0 broker and has been available since version 4.2).
Use subscription
to name the subscription.
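For comparison, a JMS-backed channel can also be declared in Java. The following sketch assumes the Java DSL Jms factory from spring-integration-jms and returns the channel spec itself as the bean, so the bean name jmsChannel refers to the resulting channel. The queue name, bean name, and class name are illustrative.

```java
import jakarta.jms.ConnectionFactory;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.jms.dsl.Jms;
import org.springframework.integration.jms.dsl.JmsMessageChannelSpec;

@Configuration
public class JmsChannelConfig {

    // Message-driven, point-to-point channel backed by the named JMS queue.
    @Bean
    public JmsMessageChannelSpec<?, ?> jmsChannel(ConnectionFactory connectionFactory) {
        return Jms.channel(connectionFactory)
                .destination("exampleQueueName"); // hypothetical queue name
    }
}
```

Endpoints can then refer to jmsChannel as their input or output channel in the same way as any other message channel.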
Using JMS Message Selectors
With JMS message selectors, you can filter JMS Messages based on JMS headers as well as JMS properties.
For example, if you want to listen to messages whose custom JMS header property, myHeaderProperty
, equals something
, you can specify the following expression:
myHeaderProperty = 'something'
Message selector expressions are a subset of the SQL-92 conditional expression syntax and are defined as part of the Java Message Service specification.
You can specify the JMS message selector through the selector attribute in XML namespace configuration for the following Spring Integration JMS components:
- JMS Channel
- JMS Publish Subscribe Channel
- JMS Inbound Channel Adapter
- JMS Inbound Gateway
- JMS Message-driven Channel Adapter
You cannot reference message body values by using JMS Message selectors. |
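To see what a selector does at the JMS level, the following sketch uses Spring's JmsTemplate directly rather than a Spring Integration component. It sends a message that carries the property and then receives only messages that match the selector expression. The queue name exampleQueue, the payload, and the class name are hypothetical.

```java
import jakarta.jms.ConnectionFactory;
import jakarta.jms.Message;

import org.springframework.jms.core.JmsTemplate;

public class SelectorSketch {

    /** Returns the first matching message, or null if none arrives within one second. */
    public static Message sendAndReceiveSelected(ConnectionFactory connectionFactory) {
        JmsTemplate template = new JmsTemplate(connectionFactory);
        template.setReceiveTimeout(1000); // milliseconds; avoids blocking indefinitely

        // Set the property that the selector tests; it lives outside the message body.
        template.convertAndSend("exampleQueue", "hello", message -> {
            message.setStringProperty("myHeaderProperty", "something");
            return message;
        });

        // Only messages whose property equals 'something' are delivered to this receive call.
        return template.receiveSelected("exampleQueue", "myHeaderProperty = 'something'");
    }
}
```

Because the broker evaluates the expression against headers and properties only, any value you want to filter on must be copied into a property by the sender, as the post-processor above does.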
JMS Samples
To experiment with these JMS adapters, check out the JMS samples available in the Spring Integration Samples Git repository at https://github.com/spring-projects/spring-integration-samples/tree/master/basic/jms.
That repository includes two samples. One provides inbound and outbound channel adapters, and the other provides inbound and outbound gateways. They are configured to run with an embedded ActiveMQ process, but you can modify the common.xml Spring application context file of each sample to support either a different JMS provider or a standalone ActiveMQ process.
In other words, you can split the configuration so that the inbound and outbound adapters run in separate JVMs.
If you have ActiveMQ installed, modify the brokerURL
property within the common.xml
file to use tcp://localhost:61616
(instead of vm://localhost
).
Both of the samples accept input from stdin and echo back to stdout.
Look at the configuration to see how these messages are routed over JMS.