JMS Support

Spring Integration provides channel adapters for receiving and sending JMS messages.

You need to include this dependency in your project:

Maven:

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-jms</artifactId>
    <version>6.4.1</version>
</dependency>

Gradle:

implementation "org.springframework.integration:spring-integration-jms:6.4.1"

The jakarta.jms:jakarta.jms-api must be added explicitly via some JMS vendor-specific implementation, e.g. Apache ActiveMQ.

There are actually two JMS-based inbound Channel Adapters. The first uses Spring’s JmsTemplate to receive based on a polling period. The second is “message-driven” and relies on a Spring MessageListener container. The outbound channel adapter uses the JmsTemplate to convert and send a JMS message on demand.

By using JmsTemplate and the MessageListener container, Spring Integration relies on Spring’s JMS support. This is important to understand, since most of the attributes exposed on these adapters configure the underlying JmsTemplate and MessageListener container. For more details about JmsTemplate and the MessageListener container, see the Spring JMS documentation.

Whereas the JMS channel adapters are intended for unidirectional messaging (send-only or receive-only), Spring Integration also provides inbound and outbound JMS Gateways for request and reply operations. The inbound gateway relies on one of Spring’s MessageListener container implementations for message-driven reception. It is also capable of sending a return value to the reply-to destination, as provided by the received message. The outbound gateway sends a JMS message to a request-destination (or request-destination-name or request-destination-expression) and then receives a reply message. You can explicitly configure the reply-destination reference (or reply-destination-name or reply-destination-expression). Otherwise, the outbound gateway uses a JMS TemporaryQueue.

Prior to Spring Integration 2.2, if necessary, a TemporaryQueue was created (and removed) for each request or reply. Beginning with Spring Integration 2.2, you can configure the outbound gateway to use a MessageListener container to receive replies instead of directly using a new (or cached) Consumer to receive the reply for each request. When so configured, and no explicit reply destination is provided, a single TemporaryQueue is used for each gateway instead of one for each request.

Starting with version 6.0, the outbound gateway creates a TemporaryTopic instead of a TemporaryQueue if the replyPubSubDomain option is set to true. Some JMS vendors handle these destinations differently.

Inbound Channel Adapter

The inbound channel adapter requires a reference to either a single JmsTemplate instance or both a ConnectionFactory and a Destination (you can provide a 'destinationName' in place of the 'destination' reference). The following example defines an inbound channel adapter with a Destination reference:

Java DSL:

@Bean
public IntegrationFlow jmsInbound(ConnectionFactory connectionFactory) {
    return IntegrationFlow.from(
                    Jms.inboundAdapter(connectionFactory)
                       .destination("inQueue"),
                    e -> e.poller(poller -> poller.fixedRate(30000)))
            .handle(m -> System.out.println(m.getPayload()))
            .get();
}

Kotlin DSL:

@Bean
fun jmsInbound(connectionFactory: ConnectionFactory) =
    integrationFlow(
            Jms.inboundAdapter(connectionFactory).destination("inQueue"),
            { poller { Pollers.fixedRate(30000) } })
       {
            handle { m -> println(m.payload) }
       }

Java:

@Bean
@InboundChannelAdapter(value = "exampleChannel", poller = @Poller(fixedRate = "30000"))
public MessageSource<Object> jmsIn(ConnectionFactory connectionFactory) {
    JmsDestinationPollingSource source = new JmsDestinationPollingSource(new JmsTemplate(connectionFactory));
    source.setDestinationName("inQueue");
    return source;
}

XML:

<int-jms:inbound-channel-adapter id="jmsIn" destination="inQueue" channel="exampleChannel">
    <int:poller fixed-rate="30000"/>
</int-jms:inbound-channel-adapter>

Notice from the preceding configuration that the inbound-channel-adapter is a polling consumer. That means that it invokes receive() when triggered. You should use it only in situations where polling is done relatively infrequently and timeliness is not important. For all other situations (a vast majority of JMS-based use-cases), the message-driven-channel-adapter (described later) is a better option.

By default, all JMS adapters that require a reference to the ConnectionFactory automatically look for a bean named jmsConnectionFactory. That is why you do not see a connection-factory attribute in many of the examples. However, if your JMS ConnectionFactory has a different bean name, you need to provide that attribute.

If extract-payload is set to true (the default), the received JMS Message is passed through the MessageConverter. When relying on the default SimpleMessageConverter, this means that the resulting Spring Integration Message has the JMS message’s body as its payload. A JMS TextMessage produces a string-based payload, a JMS BytesMessage produces a byte array payload, and the serializable instance of a JMS ObjectMessage becomes the Spring Integration message’s payload. If you prefer to have the raw JMS message as the Spring Integration message’s payload, set the extractPayload option to false.
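The conversion rules above can be summarized in a small, library-free sketch. The TextBody, BytesBody, and ObjectBody types are hypothetical stand-ins for the JMS message types (they are not the jakarta.jms interfaces), and toPayload only mirrors the behavior described for the default converter; it is not the converter's implementation.

```java
import java.io.Serializable;

// Simplified model of inbound payload extraction; all types here are illustrative.
final class PayloadExtractionSketch {

    interface JmsLikeMessage { }

    record TextBody(String text) implements JmsLikeMessage { }

    record BytesBody(byte[] bytes) implements JmsLikeMessage { }

    record ObjectBody(Serializable object) implements JmsLikeMessage { }

    // Returns what would become the Spring Integration message payload.
    static Object toPayload(JmsLikeMessage message, boolean extractPayload) {
        if (!extractPayload) {
            return message; // the raw message itself is used as the payload
        }
        if (message instanceof TextBody text) {
            return text.text(); // string-based payload
        }
        if (message instanceof BytesBody bytes) {
            return bytes.bytes(); // byte array payload
        }
        if (message instanceof ObjectBody object) {
            return object.object(); // the serializable instance
        }
        throw new IllegalArgumentException("Unsupported message type: " + message);
    }

    public static void main(String[] args) {
        System.out.println(toPayload(new TextBody("hello"), true)); // prints hello
    }
}
```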

Starting with version 5.0.8, the default receive-timeout is -1 (no wait) when the connection factory is an org.springframework.jms.connection.CachingConnectionFactory with cacheConsumers enabled; otherwise, it is 1 second. The JMS Inbound Channel Adapter creates a DynamicJmsTemplate based on the provided ConnectionFactory and options. If an external JmsTemplate is required (for example, in a Spring Boot environment), or the ConnectionFactory is not caching, or cacheConsumers is not enabled, it is recommended to set jmsTemplate.receiveTimeout(-1) if non-blocking consumption is expected:

Jms.inboundAdapter(connectionFactory)
        .destination(queueName)
        .configureJmsTemplate(template -> template.receiveTimeout(-1))

Transactions

Starting with version 4.0, the inbound channel adapter supports the session-transacted attribute. In earlier versions, you had to inject a JmsTemplate with sessionTransacted set to true. (The adapter did let you set the acknowledge attribute to transacted, but this was incorrect and did not work).

Note, however, that setting session-transacted to true has little value, because the transaction is committed immediately after the receive() operation and before the message is sent to the channel.

If you want the entire flow to be transactional (for example, if there is a downstream outbound channel adapter), you must use a transactional poller with a JmsTransactionManager. Alternatively, consider using a jms-message-driven-channel-adapter with acknowledge set to transacted (the default).

Message Driven Channel Adapter

The message-driven-channel-adapter requires a reference to either an instance of a Spring MessageListener container (any subclass of AbstractMessageListenerContainer) or both ConnectionFactory and Destination (a 'destinationName' can be provided in place of the 'destination' reference). The following example defines a message-driven channel adapter with a Destination reference:

Java DSL:

@Bean
public IntegrationFlow jmsMessageDrivenRedeliveryFlow() {
    return IntegrationFlow
            .from(Jms.messageDrivenChannelAdapter(jmsConnectionFactory())
                     .destination("inQueue"))
            .channel("exampleChannel")
            .get();
}

Kotlin DSL:

@Bean
fun jmsMessageDrivenFlowWithContainer() =
        integrationFlow(
                Jms.messageDrivenChannelAdapter(jmsConnectionFactory())
                             .destination("inQueue")) {
            channel("exampleChannel")
        }

Java:

@Bean
public JmsMessageDrivenEndpoint jmsIn() {
    JmsMessageDrivenEndpoint endpoint = new JmsMessageDrivenEndpoint(container(), listener());
    return endpoint;
}
@Bean
public AbstractMessageListenerContainer container() {
    DefaultMessageListenerContainer container = new DefaultMessageListenerContainer();
    container.setConnectionFactory(cf());
    container.setDestinationName("inQueue");
    return container;
}

@Bean
public ChannelPublishingJmsMessageListener listener() {
    ChannelPublishingJmsMessageListener listener = new ChannelPublishingJmsMessageListener();
    listener.setRequestChannelName("exampleChannel");
    return listener;
}

XML:

<int-jms:message-driven-channel-adapter id="jmsIn" destination="inQueue" channel="exampleChannel"/>

The message-driven adapter also accepts several properties that pertain to the MessageListener container. These values are considered only if you do not provide a container reference. In that case, an instance of DefaultMessageListenerContainer is created and configured based on these properties. For example, you can specify the transaction-manager reference, the concurrent-consumers value, and several other property references and values. See the Javadoc and Spring Integration’s JMS schema (spring-integration-jms.xsd) for more details.

If you have a custom listener container implementation (usually a subclass of DefaultMessageListenerContainer), you can either provide a reference to an instance of it by using the container attribute or provide its fully qualified class name by using the container-class attribute. In that case, the attributes on the adapter are transferred to an instance of your custom container.

You can’t use the Spring JMS namespace element <jms:listener-container/> to configure a container reference for the <int-jms:message-driven-channel-adapter>, since that element doesn’t actually reference a container. Each <jms:listener/> sub-element gets its own DefaultMessageListenerContainer (with shared attributes defined on the parent <jms:listener-container/> element). You can give each listener sub-element an id and use that to inject it into the channel adapter. However, the <jms:/> namespace requires a real listener.

It is recommended to configure a regular <bean> for the DefaultMessageListenerContainer and use it as a reference in the channel adapter.

Starting with version 4.2, the default acknowledge mode is transacted, unless you provide an external container. In that case, you should configure the container as needed. We recommend using transacted with the DefaultMessageListenerContainer to avoid message loss.

The 'extract-payload' property has the same effect as on the inbound channel adapter, and its default value is 'true'. The poller element is not applicable for a message-driven channel adapter, as it is actively invoked. For most scenarios, the message-driven approach is better, since the messages are passed along to the MessageChannel as soon as they are received from the underlying JMS consumer.

Finally, the <message-driven-channel-adapter> element also accepts the 'error-channel' attribute. This provides the same basic functionality, as described in Enter the GatewayProxyFactoryBean. The following example shows how to set an error channel on a message-driven channel adapter:

<int-jms:message-driven-channel-adapter id="jmsIn" destination="inQueue"
    channel="exampleChannel"
    error-channel="exampleErrorChannel"/>

When comparing the preceding example to the generic gateway configuration or the JMS 'inbound-gateway' that we discuss later, the key difference is that we are in a one-way flow, since this is a 'channel-adapter', not a gateway. Therefore, the flow downstream from the 'error-channel' should also be one-way. For example, it could send to a logging handler, or it could connect to a different JMS <outbound-channel-adapter> element.

When consuming from topics, set the pub-sub-domain attribute to true. Set subscription-durable to true for a durable subscription or subscription-shared for a shared subscription (which requires a JMS 2.0 broker and has been available since version 4.2). Use subscription-name to name the subscription.

Starting with version 5.1, when the endpoint is stopped while the application remains running, the underlying listener container is shut down, closing its shared connection and consumers. Previously, the connection and consumers remained open. To revert to the previous behavior, set the shutdownContainerOnStop on the JmsMessageDrivenEndpoint to false.

Starting with version 6.3, the ChannelPublishingJmsMessageListener can be supplied with a RetryTemplate and RecoveryCallback<Message<?>> for retries on the downstream send and send-and-receive operations. These options are also exposed on the JmsMessageDrivenChannelAdapterSpec for the Java DSL.
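The retry-then-recover idea behind these two options can be illustrated without any Spring types. The following is a minimal sketch of the general pattern only: the execute method, its parameters, and the fixed attempt count are assumptions made for illustration and do not reproduce the RetryTemplate or RecoveryCallback APIs.

```java
import java.util.function.Function;
import java.util.function.Supplier;

// Illustrative retry loop with a recovery fallback; not the Spring Retry API.
final class RetryWithRecoverySketch {

    // Tries the operation up to maxAttempts times, then delegates to recovery.
    static <T> T execute(int maxAttempts, Supplier<T> operation, Function<RuntimeException, T> recovery) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return operation.get(); // for example, the downstream send
            } catch (RuntimeException ex) {
                last = ex; // remember the failure and try again
            }
        }
        return recovery.apply(last); // retries exhausted
    }

    public static void main(String[] args) {
        String result = RetryWithRecoverySketch.<String>execute(2,
                () -> { throw new IllegalStateException("downstream unavailable"); },
                ex -> "recovered after: " + ex.getMessage());
        System.out.println(result); // prints recovered after: downstream unavailable
    }
}
```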

Inbound Conversion Errors

Starting with version 4.2, the 'error-channel' is used for conversion errors, too. Previously, if a JMS <message-driven-channel-adapter/> or <inbound-gateway/> could not deliver a message due to a conversion error, an exception was thrown back to the container. If the container was configured to use transactions, the message was rolled back and redelivered repeatedly. Because the conversion process occurs before and during message construction, such errors were not sent to the 'error-channel'. Now such conversion exceptions result in an ErrorMessage being sent to the 'error-channel', with the exception as the payload. If you wish the transaction to roll back, and you have an 'error-channel' defined, the integration flow on the 'error-channel' must re-throw the exception (or another exception). If the error flow does not throw an exception, the transaction is committed and the message is removed. If no 'error-channel' is defined, the exception is thrown back to the container, as before.
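The outcomes described above can be laid out as a small decision function. This is a sketch that assumes a transacted listener container; the enum and method names are invented for illustration and are not part of Spring Integration.

```java
// Models what happens to a message after an inbound conversion error (version 4.2 and later).
final class ConversionErrorOutcomeSketch {

    enum Outcome { ROLLED_BACK_AND_REDELIVERED, COMMITTED_AND_REMOVED }

    // errorChannelDefined: an 'error-channel' is configured on the endpoint.
    // errorFlowRethrows: the flow on that channel throws an exception.
    static Outcome afterConversionError(boolean errorChannelDefined, boolean errorFlowRethrows) {
        if (!errorChannelDefined) {
            // The exception is thrown back to the container, as before.
            return Outcome.ROLLED_BACK_AND_REDELIVERED;
        }
        return errorFlowRethrows
                ? Outcome.ROLLED_BACK_AND_REDELIVERED
                : Outcome.COMMITTED_AND_REMOVED;
    }

    public static void main(String[] args) {
        System.out.println(afterConversionError(true, false)); // prints COMMITTED_AND_REMOVED
    }
}
```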

Outbound Channel Adapter

The JmsSendingMessageHandler implements the MessageHandler interface and is capable of converting Spring Integration Messages to JMS messages and then sending to a JMS destination. It requires either a jmsTemplate reference or both jmsConnectionFactory and destination references (destinationName may be provided in place of destination). As with the inbound channel adapter, the easiest way to configure this adapter is with the namespace support. The following configuration produces an adapter that receives Spring Integration messages from the exampleChannel, converts those into JMS messages, and sends them to the JMS destination reference whose bean name is outQueue:

Java DSL:

@Bean
public IntegrationFlow jmsOutboundFlow() {
    return IntegrationFlow.from("exampleChannel")
                .handle(Jms.outboundAdapter(cachingConnectionFactory())
                    .destinationExpression("headers." + SimpMessageHeaderAccessor.DESTINATION_HEADER)
                    .configureJmsTemplate(t -> t.id("jmsOutboundFlowTemplate")))
                .get();
}

Kotlin DSL:

@Bean
fun jmsOutboundFlow() =
        integrationFlow("exampleChannel") {
            handle(Jms.outboundAdapter(jmsConnectionFactory())
                    .apply {
                        destinationExpression("headers." + SimpMessageHeaderAccessor.DESTINATION_HEADER)
                        deliveryModeFunction<Any> { DeliveryMode.NON_PERSISTENT }
                        timeToLiveExpression("10000")
                        configureJmsTemplate { it.explicitQosEnabled(true) }
                    }
            )
        }

Groovy DSL:

@Bean
jmsOutboundFlow() {
    integrationFlow('exampleChannel') {
        handle(Jms.outboundAdapter(new ActiveMQConnectionFactory())
                .with {
                    destinationExpression 'headers.' + SimpMessageHeaderAccessor.DESTINATION_HEADER
                    deliveryModeFunction { DeliveryMode.NON_PERSISTENT }
                    timeToLiveExpression '10000'
                    configureJmsTemplate {
                        it.explicitQosEnabled true
                    }
                }
        )
    }
}

Java:

@Bean
@ServiceActivator(inputChannel = "exampleChannel")
public MessageHandler jmsOut(ConnectionFactory connectionFactory) {
    JmsSendingMessageHandler handler = new JmsSendingMessageHandler(new JmsTemplate(connectionFactory));
    handler.setDestinationName("outQueue");
    return handler;
}

XML:

<int-jms:outbound-channel-adapter id="jmsOut" destination="outQueue" channel="exampleChannel"/>

As with the inbound channel adapters, there is an 'extract-payload' property. However, the meaning is reversed for the outbound adapter. Rather than applying to the JMS message, the boolean property applies to the Spring Integration message payload. In other words, the decision is whether to pass the Spring Integration message itself as the JMS message body or to pass the Spring Integration message payload as the JMS message body. The default value is 'true'. Therefore, if you pass a Spring Integration message whose payload is a String, a JMS TextMessage is created. If, on the other hand, you want to send the actual Spring Integration message to another system over JMS, set it to 'false'.

Regardless of the boolean value for payload extraction, the Spring Integration MessageHeaders map to JMS properties, as long as you rely on the default converter or provide a reference to another instance of MessageConverter. (The same holds true for 'inbound' adapters, except that, in those cases, the JMS properties map to Spring Integration MessageHeaders).

Starting with version 5.1, the <int-jms:outbound-channel-adapter> (JmsSendingMessageHandler) can be configured with the deliveryModeExpression and timeToLiveExpression properties, which are evaluated at runtime against the request Spring message to determine the QoS values for the JMS message to send. The setMapInboundDeliveryMode(true) and setMapInboundExpiration(true) options of the DefaultJmsHeaderMapper can serve as the source of the dynamic deliveryMode and timeToLive values in the message headers:

<int-jms:outbound-channel-adapter delivery-mode-expression="headers.jms_deliveryMode"
                        time-to-live-expression="headers.jms_expiration - T(System).currentTimeMillis()"/>
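The arithmetic performed by the time-to-live-expression above is simply the expiration timestamp minus the current time. The following sketch shows that computation in plain Java; the class and method names are illustrative only.

```java
// Mirrors: headers.jms_expiration - T(System).currentTimeMillis()
final class TimeToLiveSketch {

    // Both arguments are epoch timestamps in milliseconds.
    static long remainingTimeToLive(long expirationEpochMillis, long nowEpochMillis) {
        return expirationEpochMillis - nowEpochMillis;
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        long expiration = now + 10_000; // a message that expires ten seconds from now
        System.out.println(remainingTimeToLive(expiration, now)); // prints 10000
    }
}
```

Note that the result is zero or negative once the expiration time has passed, and that JMS treats a time-to-live of 0 as "never expires", so a flow relying on this expression may want to handle such messages before they reach the adapter.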

Transactions

Starting with version 4.0, the outbound channel adapter supports the session-transacted attribute. In earlier versions, you had to inject a JmsTemplate with sessionTransacted set to true. The attribute now sets the property on the built-in default JmsTemplate. If a transaction exists (perhaps from an upstream message-driven-channel-adapter), the send operation is performed within the same transaction. Otherwise, a new transaction is started.

Inbound Gateway

Spring Integration’s message-driven JMS inbound-gateway delegates to a MessageListener container, supports dynamically adjusting concurrent consumers, and can also handle replies. The inbound gateway requires references to a ConnectionFactory and a request Destination (or 'requestDestinationName'). The following example defines a JMS inbound-gateway that receives from the JMS queue referenced by the bean id, inQueue, and sends to the Spring Integration channel named exampleChannel:

<int-jms:inbound-gateway id="jmsInGateway"
    request-destination="inQueue"
    request-channel="exampleChannel"/>

Since the gateways provide request-reply behavior instead of unidirectional send or receive behavior, they also have two distinct properties for “payload extraction” (as discussed earlier for the channel adapters' 'extract-payload' setting). For an inbound gateway, the 'extract-request-payload' property determines whether the received JMS Message body is extracted. If 'false', the JMS message itself becomes the Spring Integration message payload. The default is 'true'.

Similarly, for an inbound-gateway, the 'extract-reply-payload' property applies to the Spring Integration message that is to be converted into a reply JMS Message. If you want to pass the whole Spring Integration message (as the body of a JMS ObjectMessage), set this value to 'false'. The default is 'true', in which case the Spring Integration message payload is converted into a JMS Message (for example, a String payload becomes a JMS TextMessage).

As with anything else, gateway invocation might result in an error. By default, a producer is not notified of the errors that might have occurred on the consumer side and times out waiting for the reply. However, there might be times when you want to communicate an error condition back to the producer (in other words, you might want to treat the exception as a valid reply by mapping it to a message). To accomplish this, the JMS inbound gateway provides support for a message channel to which errors can be sent for processing, potentially resulting in a reply message payload that conforms to some contract that defines what a caller may expect as an “error” reply. You can use the error-channel attribute to configure such a channel, as the following example shows:

<int-jms:inbound-gateway request-destination="requestQueue"
          request-channel="jmsInputChannel"
          error-channel="errorTransformationChannel"/>

<int:transformer input-channel="errorTransformationChannel"
        ref="exceptionTransformer" method="createErrorResponse"/>

You might notice that this example looks very similar to that included within Enter the GatewayProxyFactoryBean. The same idea applies here: The exceptionTransformer could be a POJO that creates error-response objects, you could reference the nullChannel to suppress the errors, or you could leave 'error-channel' out to let the exception propagate.

When consuming from topics, set the pub-sub-domain attribute to true. Set subscription-durable to true for a durable subscription or subscription-shared for a shared subscription (requires a JMS 2.0 broker and has been available since version 4.2). Use subscription-name to name the subscription.

Starting with version 4.2, the default acknowledge mode is transacted, unless an external container is provided. In that case, you should configure the container as needed. We recommend that you use transacted with the DefaultMessageListenerContainer to avoid message loss.

Starting with version 5.1, when the endpoint is stopped while the application remains running, the underlying listener container is shut down, closing its shared connection and consumers. Previously, the connection and consumers remained open. To revert to the previous behavior, set the shutdownContainerOnStop on the JmsInboundGateway to false.

By default, the JmsInboundGateway looks for a jakarta.jms.Message.getJMSReplyTo() property in the received message to determine where to send a reply. Otherwise, it can be configured with a static defaultReplyDestination, defaultReplyQueueName, or defaultReplyTopicName. In addition, starting with version 6.1, a replyToExpression can be configured on a provided ChannelPublishingJmsMessageListener to determine the reply destination dynamically, if the standard JMSReplyTo property is null on the request. The received jakarta.jms.Message is used as the root evaluation context object. The following example demonstrates how to use the Java DSL API to configure an inbound JMS gateway with a custom reply destination resolved from the request message:

@Bean
public IntegrationFlow jmsInboundGatewayFlow(ConnectionFactory connectionFactory) {
    return IntegrationFlow.from(
                    Jms.inboundGateway(connectionFactory)
                            .requestDestination("requestDestination")
                            .replyToFunction(message -> message.getStringProperty("myReplyTo")))
            .<String, String>transform(String::toUpperCase)
            .get();
}
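The reply destination lookup described above can be pictured as a chain of fallbacks. The sketch below is a library-free model: the Request record is a hypothetical stand-in for the received JMS message, destinations are represented as plain strings, and the ordering of the dynamic function before the static default is an assumption that the text above does not state explicitly.

```java
import java.util.Map;
import java.util.function.Function;

// Illustrative fallback chain for choosing a reply destination.
final class ReplyDestinationSketch {

    // Hypothetical view of a request: its JMSReplyTo (may be null) and string properties.
    record Request(String jmsReplyTo, Map<String, String> properties) { }

    static String resolveReplyTo(Request request,
            Function<Request, String> replyToFunction, String defaultReplyDestination) {

        if (request.jmsReplyTo() != null) {
            return request.jmsReplyTo(); // the standard JMSReplyTo wins
        }
        String dynamic = replyToFunction != null ? replyToFunction.apply(request) : null;
        if (dynamic != null) {
            return dynamic; // resolved from the request, for example a custom property
        }
        if (defaultReplyDestination != null) {
            return defaultReplyDestination; // static fallback (assumed to be consulted last)
        }
        throw new IllegalStateException("No reply destination could be resolved");
    }

    public static void main(String[] args) {
        Request request = new Request(null, Map.of("myReplyTo", "customReplies"));
        System.out.println(resolveReplyTo(request, r -> r.properties().get("myReplyTo"), "defaultReplies"));
        // prints customReplies
    }
}
```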

Starting with version 6.3, the Jms.inboundGateway() API exposes retryTemplate() and recoveryCallback() options for retrying internal send-and-receive operations.

Outbound Gateway

The outbound gateway creates JMS messages from Spring Integration messages and sends them to a request-destination. It then handles the JMS reply message either by using a selector to receive from the reply-destination that you configure or, if no reply-destination is provided, by creating JMS TemporaryQueue instances (or TemporaryTopic instances if replyPubSubDomain is true).

Using a reply-destination (or reply-destination-name) together with a CachingConnectionFactory that has cacheConsumers set to true can cause out-of-memory conditions. This is because each request gets a new consumer with a new selector (selecting on the correlation-key value or when there is no correlation-key, on the sent JMSMessageID). Given that these selectors are unique, they remain in the cache (unused) after the current request completes.

If you specify a reply destination, you are advised to not use cached consumers. Alternatively, consider using a <reply-listener/> as described below.
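To see why unique selectors defeat consumer caching, consider the following toy model. It is not how CachingConnectionFactory is implemented: the map simply stands in for a cache keyed by destination and selector, and the class and method names are invented for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model: a cache keyed by destination + selector never gets a hit when selectors are unique.
final class CachedConsumerGrowthSketch {

    private final Map<String, Object> consumerCache = new HashMap<>();

    private long sequence;

    // Each request selects on an ID that no later request will ever reuse.
    void sendAndReceive(String replyDestination) {
        String selector = "JMSCorrelationID = 'ID:" + (++sequence) + "'";
        consumerCache.put(replyDestination + "|" + selector, new Object());
    }

    int cachedConsumers() {
        return consumerCache.size();
    }

    public static void main(String[] args) {
        CachedConsumerGrowthSketch gateway = new CachedConsumerGrowthSketch();
        for (int i = 0; i < 1_000; i++) {
            gateway.sendAndReceive("replyQueue");
        }
        System.out.println(gateway.cachedConsumers()); // prints 1000
    }
}
```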

The following example shows how to configure an outbound gateway:

<int-jms:outbound-gateway id="jmsOutGateway"
    request-destination="outQueue"
    request-channel="outboundJmsRequests"
    reply-channel="jmsReplies"/>

The 'outbound-gateway' payload extraction properties are inversely related to those of the 'inbound-gateway' (see the earlier discussion). That means that the 'extract-request-payload' property value applies to the Spring Integration message being converted into a JMS message to be sent as a request. The 'extract-reply-payload' property value applies to the JMS message received as a reply and is then converted into a Spring Integration message to be subsequently sent to the 'reply-channel', as shown in the preceding configuration example.

Using a <reply-listener/>

Spring Integration 2.2 introduced an alternative technique for handling replies. If you add a <reply-listener/> child element to the gateway, then, instead of creating a consumer for each reply, a MessageListener container is used to receive the replies and hand them over to the requesting thread. This provides a number of performance benefits and alleviates the cached consumer memory utilization problem described in the earlier caution.
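The hand-over from a single listener to the waiting requesting thread can be sketched with standard concurrency utilities. This is only an illustration of the idea, assuming replies are matched by a correlation ID string; it does not reflect the gateway's internal data structures.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

// One shared listener completes the future that a requesting thread is waiting on.
final class ReplyListenerSketch {

    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    // Called by the requesting thread before it sends the request.
    CompletableFuture<String> register(String correlationId) {
        CompletableFuture<String> reply = new CompletableFuture<>();
        pending.put(correlationId, reply);
        return reply;
    }

    // Called by the listener thread when a reply arrives; returns false for unknown IDs.
    boolean onReply(String correlationId, String body) {
        CompletableFuture<String> reply = pending.remove(correlationId);
        return reply != null && reply.complete(body);
    }

    public static void main(String[] args) throws Exception {
        ReplyListenerSketch listener = new ReplyListenerSketch();
        CompletableFuture<String> reply = listener.register("gw_1");
        new Thread(() -> listener.onReply("gw_1", "pong")).start();
        System.out.println(reply.get(5, TimeUnit.SECONDS)); // prints pong
    }
}
```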

When using a <reply-listener/> with an outbound gateway that has no reply-destination, instead of creating a TemporaryQueue for each request, a single TemporaryQueue is used. (The gateway creates an additional TemporaryQueue, as necessary, if the connection to the broker is lost and recovered). If replyPubSubDomain is set to true, starting with version 6.0, a TemporaryTopic is created instead.

When using a correlation-key, multiple gateways can share the same reply destination, because the listener container uses a selector that is unique to each gateway.

If you specify a reply listener and specify a reply destination (or reply destination name) but provide no correlation key, the gateway logs a warning and falls back to pre-version 2.2 behavior. This is because there is no way to configure a selector in this case. Thus, there is no way to avoid a reply going to a different gateway that might be configured with the same reply destination.

Note that, in this situation, a new consumer is used for each request, and consumers can build up in memory as described in the caution above; therefore cached consumers should not be used in this case.

The following example shows a reply listener with default attributes:

<int-jms:outbound-gateway id="jmsOutGateway"
        request-destination="outQueue"
        request-channel="outboundJmsRequests"
        reply-channel="jmsReplies">
    <int-jms:reply-listener />
</int-jms:outbound-gateway>

The listener is very lightweight, and we anticipate that, in most cases, you need only a single consumer. However, you can add attributes such as concurrent-consumers, max-concurrent-consumers, and others. See the schema for a complete list of supported attributes, together with the Spring JMS documentation for their meanings.

Idle Reply Listeners

Starting with version 4.2, you can start the reply listener as needed (and stop it after an idle time) instead of running for the duration of the gateway’s lifecycle. This can be useful if you have many gateways in the application context where they are mostly idle. One such situation is a context with many (inactive) partitioned Spring Batch jobs using Spring Integration and JMS for partition distribution. If all the reply listeners are active, the JMS broker has an active consumer for each gateway. By enabling the idle timeout, each consumer exists only while the corresponding batch job is running (and for a short time after it finishes).
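The start-on-demand and stop-when-idle lifecycle can be modeled with a few lines of state. The sketch below passes timestamps in explicitly so that the behavior is easy to follow; the class is hypothetical, and the exact timing of the real idle check is not modeled.

```java
// Toy lifecycle: a listener that starts on first use and stops after an idle period.
final class IdleReplyListenerSketch {

    private final long idleTimeoutMillis;

    private boolean running;

    private long lastUsedMillis;

    IdleReplyListenerSketch(long idleTimeoutMillis) {
        this.idleTimeoutMillis = idleTimeoutMillis;
    }

    // A request starts the listener if needed and records the activity.
    void onRequest(long nowMillis) {
        running = true;
        lastUsedMillis = nowMillis;
    }

    // A periodic check stops the listener once it has been idle long enough.
    void checkIdle(long nowMillis) {
        if (running && nowMillis - lastUsedMillis >= idleTimeoutMillis) {
            running = false;
        }
    }

    boolean isRunning() {
        return running;
    }

    public static void main(String[] args) {
        IdleReplyListenerSketch listener = new IdleReplyListenerSketch(1_000);
        listener.onRequest(0);
        listener.checkIdle(500);
        System.out.println(listener.isRunning()); // prints true
        listener.checkIdle(1_500);
        System.out.println(listener.isRunning()); // prints false
    }
}
```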

See idle-reply-listener-timeout in Attribute Reference.

Gateway Reply Correlation

This section describes the mechanisms used for reply correlation (ensuring the originating gateway receives replies to only its requests), depending on how the gateway is configured. See Attribute Reference for a complete description of the attributes discussed here.

The following list describes the various scenarios (the numbers are for identification — order does not matter):

  1. No reply-destination* properties and no <reply-listener>

    A TemporaryQueue is created for each request and deleted when the request is complete (successfully or otherwise). correlation-key is irrelevant.

  2. A reply-destination* property is provided and neither a <reply-listener/> nor a correlation-key is provided

    A JMSCorrelationID equal to the JMSMessageID of the outgoing message is used as a message selector for the consumer:

    messageSelector = "JMSCorrelationID = '" + messageId + "'"

    The responding system is expected to return the inbound JMSMessageID in the reply JMSCorrelationID. This is a common pattern and is implemented by the Spring Integration inbound gateway as well as Spring’s MessageListenerAdapter for message-driven POJOs.

    When you use this configuration, you should not use a topic for replies. The reply may be lost.

  3. A reply-destination* property is provided, no <reply-listener/> is provided, and correlation-key="JMSCorrelationID"

    The gateway generates a unique correlation ID and inserts it in the JMSCorrelationID header. The message selector is:

    messageSelector = "JMSCorrelationID = '" + uniqueId + "'"

    The responding system is expected to return the inbound JMSCorrelationID in the reply JMSCorrelationID. This is a common pattern and is implemented by the Spring Integration inbound gateway as well as Spring’s MessageListenerAdapter for message-driven POJOs.

  4. A reply-destination* property is provided, no <reply-listener/> is provided, and correlation-key="myCorrelationHeader"

    The gateway generates a unique correlation ID and inserts it in the myCorrelationHeader message property. The correlation-key can be any user-defined value. The message selector is:

    messageSelector = "myCorrelationHeader = '" + uniqueId + "'"

    The responding system is expected to return the inbound myCorrelationHeader in the reply myCorrelationHeader.

  5. A reply-destination* property is provided, no <reply-listener/> is provided, and correlation-key="JMSCorrelationID*" (Note the * in the correlation key.)

    The gateway uses the value in the jms_correlationId header (if present) from the request message and inserts it in the JMSCorrelationID header. The message selector is:

    messageSelector = "JMSCorrelationID = '" + headers['jms_correlationId'] + "'"

    The user must ensure this value is unique.

    If the header does not exist, the gateway behaves as in 3.

    The responding system is expected to return the inbound JMSCorrelationID in the reply JMSCorrelationID. This is a common pattern and is implemented by the Spring Integration inbound gateway as well as Spring’s MessageListenerAdapter for message-driven POJOs.

  6. No reply-destination* property is provided, and a <reply-listener> is provided

    A temporary queue is created and used for all replies from this gateway instance. No correlation data is needed in the message, but the outgoing JMSMessageID is used internally in the gateway to direct the reply to the correct requesting thread.

  7. A reply-destination* property is provided, a <reply-listener> is provided, and no correlation-key is provided

    Not allowed.

    The <reply-listener/> configuration is ignored, and the gateway behaves as in 2. A warning log message is written to indicate this situation.

  8. A reply-destination* property is provided, a <reply-listener> is provided, and correlation-key="JMSCorrelationID"

    The gateway has a unique correlation ID and inserts it, together with an incrementing value in the JMSCorrelationID header (gatewayId + "_" + ++seq). The message selector is:

    messageSelector = "JMSCorrelationID LIKE '" + gatewayId + "%'"

    The responding system is expected to return the inbound JMSCorrelationID in the reply JMSCorrelationID. This is a common pattern and is implemented by the Spring Integration inbound gateway as well as Spring’s MessageListenerAdapter for message-driven POJOs. Since each gateway has a unique ID, each instance gets only its own replies. The complete correlation data is used to route the reply to the correct requesting thread.

  9. A reply-destination* property is provided, a <reply-listener/> is provided, and correlation-key="myCorrelationHeader"

    The gateway has a unique correlation ID and inserts it, together with an incrementing value in the myCorrelationHeader property (gatewayId + "_" + ++seq). The correlation-key can be any user-defined value. The message selector is:

    messageSelector = "myCorrelationHeader LIKE '" + gatewayId + "%'"

    The responding system is expected to return the inbound myCorrelationHeader in the reply myCorrelationHeader. Since each gateway has a unique ID, each instance only gets its own replies. The complete correlation data is used to route the reply to the correct requesting thread.

  10. A reply-destination* property is provided, a <reply-listener/> is provided, and correlation-key="JMSCorrelationID*"

    (Note the * in the correlation key)

    Not allowed.

    User-supplied correlation IDs are not permitted with a reply listener. The gateway does not initialize with this configuration.
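The selector expressions and correlation values described in the scenarios above can be sketched in plain Java. This is only an illustration of the string formats quoted in the text, not the gateway's actual implementation; the class name `CorrelationSelectorSketch`, its method names, and the random-UUID fallback are assumptions made for the example.

```java
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;

/** Illustrative only: reproduces the selector formats described above. */
final class CorrelationSelectorSketch {

    // Hypothetical stand-in for the unique ID that each gateway instance holds.
    private final String gatewayId;

    // Incrementing value appended to the gateway ID (scenarios 8 and 9).
    private final AtomicLong seq = new AtomicLong();

    CorrelationSelectorSketch(String gatewayId) {
        this.gatewayId = gatewayId;
    }

    /** Scenarios 3 and 4: a per-request selector that matches one generated ID. */
    static String perRequestSelector(String correlationKey, String uniqueId) {
        return correlationKey + " = '" + uniqueId + "'";
    }

    /**
     * Scenario 5: use the jms_correlationId header value when present;
     * otherwise fall back to a generated ID, as in scenario 3.
     * The random UUID is an assumption standing in for the generated ID.
     */
    static String userSuppliedSelector(String jmsCorrelationIdHeader) {
        String id = (jmsCorrelationIdHeader != null)
                ? jmsCorrelationIdHeader
                : UUID.randomUUID().toString();
        return perRequestSelector("JMSCorrelationID", id);
    }

    /** Scenarios 8 and 9: correlation value sent with each request. */
    String nextCorrelationId() {
        return gatewayId + "_" + seq.incrementAndGet();
    }

    /** Scenarios 8 and 9: one selector that matches every reply for this gateway. */
    String replyListenerSelector(String correlationKey) {
        return correlationKey + " LIKE '" + gatewayId + "%'";
    }
}
```

The per-request selectors change with every message, whereas the reply-listener selector is fixed for the lifetime of the gateway. That is why a user-supplied correlation ID cannot be combined with a reply listener.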

Async Gateway

Starting with version 4.3, you can specify async="true" (or setAsync(true) in Java) when configuring the outbound gateway.

By default, when a request is sent to the gateway, the requesting thread is suspended until the reply is received. The flow then continues on that thread. If async is true, the requesting thread is released immediately after the send() completes, and the reply is returned (and the flow continues) on the listener container thread. This can be useful when the gateway is invoked on a poller thread. The thread is released and is available for other tasks within the framework.

The async option requires a <reply-listener/> (or setUseReplyContainer(true) when using Java configuration). It also requires a correlationKey (usually JMSCorrelationID) to be specified. If either of these conditions is not met, async is ignored.
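The following Java configuration is a minimal sketch of an asynchronous outbound gateway that meets both conditions. The channel names (`jmsRequests`, `jmsReplies`), the destination names, and the bean and class names are assumptions chosen for illustration, and a `ConnectionFactory` bean is assumed to exist elsewhere in the application context.

```java
import jakarta.jms.ConnectionFactory;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.jms.JmsOutboundGateway;

@Configuration
public class AsyncGatewayConfig {

    @Bean
    @ServiceActivator(inputChannel = "jmsRequests") // hypothetical request channel
    public JmsOutboundGateway asyncJmsGateway(ConnectionFactory connectionFactory) {
        JmsOutboundGateway gateway = new JmsOutboundGateway();
        gateway.setConnectionFactory(connectionFactory);
        gateway.setRequestDestinationName("requestQueue"); // hypothetical queue name
        gateway.setReplyDestinationName("replyQueue");     // hypothetical queue name
        // Both of the following are required; otherwise async is ignored.
        gateway.setCorrelationKey("JMSCorrelationID");
        gateway.setUseReplyContainer(true);
        // Release the requesting thread after the send; the reply continues
        // the flow on the listener container thread.
        gateway.setAsync(true);
        gateway.setOutputChannelName("jmsReplies");        // hypothetical reply channel
        return gateway;
    }
}
```

Outside Spring Boot, the annotation-based wiring shown here also assumes that `@EnableIntegration` is present on a configuration class.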

Attribute Reference

The following listing shows all the available attributes for an outbound-gateway:

<int-jms:outbound-gateway
    connection-factory="connectionFactory" (1)
    correlation-key="" (2)
    delivery-persistent="" (3)
    destination-resolver="" (4)
    explicit-qos-enabled="" (5)
    extract-reply-payload="true" (6)
    extract-request-payload="true" (7)
    header-mapper="" (8)
    message-converter="" (9)
    priority="" (10)
    receive-timeout="" (11)
    reply-channel="" (12)
    reply-destination="" (13)
    reply-destination-expression="" (14)
    reply-destination-name="" (15)
    reply-pub-sub-domain="" (16)
    reply-timeout="" (17)
    request-channel="" (18)
    request-destination="" (19)
    request-destination-expression="" (20)
    request-destination-name="" (21)
    request-pub-sub-domain="" (22)
    time-to-live="" (23)
    requires-reply="" (24)
    idle-reply-listener-timeout="" (25)
    async=""> (26)
  <int-jms:reply-listener /> (27)
</int-jms:outbound-gateway>
1 Reference to a jakarta.jms.ConnectionFactory. The default is jmsConnectionFactory.
2 The name of a property that contains correlation data to correlate replies with requests. If omitted, the gateway expects the responding system to return the value of the outbound JMSMessageID header in the JMSCorrelationID header. If specified, the gateway generates a correlation ID and populates the specified property with it. The responding system must echo back that value in the same property. It can be set to JMSCorrelationID, in which case the standard header is used instead of a String property to hold the correlation data. When you use a <reply-listener/>, you must specify the correlation-key if you provide an explicit reply-destination. Starting with version 4.0.1, this attribute also supports the value JMSCorrelationID*, which means that if the outbound message already has a JMSCorrelationID (mapped from the jms_correlationId header), it is used instead of generating a new one. Note that the JMSCorrelationID* key is not allowed when you use a <reply-listener/>, because the container needs to set up a message selector during initialization.
You should understand that the gateway has no way to ensure uniqueness, and unexpected side effects can occur if the provided correlation ID is not unique.
3 A boolean value indicating whether the delivery mode should be DeliveryMode.PERSISTENT (true) or DeliveryMode.NON_PERSISTENT (false). This setting takes effect only if explicit-qos-enabled is true.
4 A DestinationResolver. The default is a DynamicDestinationResolver, which maps the destination name to a queue or topic of that name.
5 When set to true, it enables the use of quality of service attributes: priority, delivery-mode, and time-to-live.
6 When set to true (the default), the payload of the Spring Integration reply message is created from the JMS Reply message’s body (by using the MessageConverter). When set to false, the entire JMS message becomes the payload of the Spring Integration message.
7 When set to true (the default), the payload of the Spring Integration message is converted to a JMSMessage (by using the MessageConverter). When set to false, the entire Spring Integration Message is converted to the JMSMessage. In both cases, the Spring Integration message headers are mapped to JMS headers and properties by using the HeaderMapper.
8 A HeaderMapper used to map Spring Integration message headers to and from JMS message headers and properties.
9 A reference to a MessageConverter for converting between JMS messages and the Spring Integration message payloads (or messages if extract-request-payload is false). The default is a SimpleMessageConverter.
10 The default priority of request messages. Overridden by the message priority header, if present. Its range is 0 to 9. This setting takes effect only if explicit-qos-enabled is true.
11 The time (in milliseconds) to wait for a reply. The default is 5000 (five seconds).
12 The channel to which the reply message is sent.
13 A reference to a Destination, which is set as the JMSReplyTo header. At most, only one of reply-destination, reply-destination-expression, or reply-destination-name is allowed. If none is provided, a TemporaryQueue is used for replies to this gateway.
14 A SpEL expression evaluating to a Destination, which will be set as the JMSReplyTo header. The expression can result in a Destination object or a String. It is used by the DestinationResolver to resolve the actual Destination. At most, only one of reply-destination, reply-destination-expression, or reply-destination-name is allowed. If none is provided, a TemporaryQueue is used for replies to this gateway.
15 The name of the destination that is set as the JMSReplyTo header. It is used by the DestinationResolver to resolve the actual Destination. At most, only one of reply-destination, reply-destination-expression, or reply-destination-name is allowed. If none is provided, a TemporaryQueue is used for replies to this gateway.
16 When set to true, it indicates that any reply Destination resolved by the DestinationResolver should be a Topic rather than a Queue.
17 The time the gateway waits when sending the reply message to the reply-channel. This only has an effect if the reply-channel can block — such as a QueueChannel with a capacity limit that is currently full. The default is infinity.
18 The channel on which this gateway receives request messages.
19 A reference to a Destination to which request messages are sent. One of request-destination, request-destination-expression, or request-destination-name is required. You can use only one of those three attributes.
20 A SpEL expression evaluating to a Destination to which request messages are sent. The expression can result in a Destination object or a String. It is used by the DestinationResolver to resolve the actual Destination. One of request-destination, request-destination-expression, or request-destination-name is required. You can use only one of those three attributes.
21 The name of the destination to which request messages are sent. It is used by the DestinationResolver to resolve the actual Destination. One of request-destination, request-destination-expression, or request-destination-name is required. You can use only one of those three attributes.
22 When set to true, it indicates that any request Destination resolved by the DestinationResolver should be a Topic rather than a Queue.
23 Specifies the message time to live. This setting takes effect only if explicit-qos-enabled is true.
24 Specifies whether this outbound gateway must return a non-null value. By default, this value is true, and a MessageTimeoutException is thrown when the underlying service does not return a value after the receive-timeout. Note that, if the service is never expected to return a reply, it would be better to use a <int-jms:outbound-channel-adapter/> instead of a <int-jms:outbound-gateway/> with requires-reply="false". With the latter, the sending thread is blocked, waiting for a reply for the receive-timeout period.
25 When you use a <reply-listener />, its lifecycle (start and stop) matches that of the gateway by default. When this value is greater than 0, the container is started on demand (when a request is sent). The container continues to run until at least this time elapses with no requests being received (and until no replies are outstanding). The container is started again on the next request. The stop time is a minimum and may actually be up to 1.5x this value.
26 See Async Gateway.
27 When this element is included, replies are received by an asynchronous MessageListenerContainer rather than creating a consumer for each reply. This can be more efficient in many cases.
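For Java configuration, several of the attributes above correspond to setters on `JmsOutboundGateway`. The sketch below sets the quality-of-service and timeout options; the numbers in the comments refer to the callouts in the listing. The channel names, destination name, chosen values, and class and bean names are assumptions made for the example.

```java
import jakarta.jms.ConnectionFactory;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.jms.JmsOutboundGateway;

@Configuration
public class QosGatewayConfig {

    @Bean
    @ServiceActivator(inputChannel = "jmsRequests") // (18) hypothetical request channel
    public JmsOutboundGateway qosJmsGateway(ConnectionFactory connectionFactory) {
        JmsOutboundGateway gateway = new JmsOutboundGateway();
        gateway.setConnectionFactory(connectionFactory);   // (1)
        gateway.setRequestDestinationName("requestQueue"); // (21) hypothetical queue name
        // The next three settings take effect only because explicit QoS is enabled.
        gateway.setExplicitQosEnabled(true);               // (5)
        gateway.setDeliveryPersistent(true);               // (3)
        gateway.setPriority(7);                            // (10) range is 0 to 9
        gateway.setTimeToLive(30_000);                     // (23) milliseconds, per the JMS API
        gateway.setReceiveTimeout(10_000);                 // (11) milliseconds
        gateway.setRequiresReply(true);                    // (24)
        gateway.setOutputChannelName("jmsReplies");        // (12) hypothetical reply channel
        return gateway;
    }
}
```

No reply destination is configured here, so replies to this gateway would use a TemporaryQueue, as described for callouts 13 to 15.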

Mapping Message Headers to and from JMS Message

JMS messages can contain meta-information such as JMS API headers and simple properties. You can map those to and from Spring Integration message headers by using JmsHeaderMapper. The JMS API headers are passed to the appropriate setter methods (such as setJMSReplyTo), whereas other headers are copied to the general properties of the JMS Message. The JMS outbound gateway is bootstrapped with the default implementation of JmsHeaderMapper, which maps standard JMS API headers as well as primitive or String message headers. You can also provide a custom header mapper by using the header-mapper attribute of the inbound and outbound gateways.

Many JMS vendor-specific clients don’t allow setting the deliveryMode, priority and timeToLive properties directly on an already created JMS message. They are considered to be QoS properties and therefore have to be propagated to the target MessageProducer.send(message, deliveryMode, priority, timeToLive) API. For this reason the DefaultJmsHeaderMapper doesn’t map appropriate Spring Integration headers (or expression results) into the mentioned JMS message properties. Instead, a DynamicJmsTemplate is used by the JmsSendingMessageHandler to propagate header values from the request message into the MessageProducer.send() API. To enable this feature, you must configure the outbound endpoint with a DynamicJmsTemplate with its explicitQosEnabled property set to true. The Spring Integration Java DSL configures a DynamicJmsTemplate by default but you must still set the explicitQosEnabled property.
Since version 4.0, the JMSPriority header is mapped to the standard priority header for inbound messages. Previously, the priority header was only used for outbound messages. To revert to the previous behavior (that is, to not map the inbound priority), set the mapInboundPriority property of DefaultJmsHeaderMapper to false.
Since version 4.3, the DefaultJmsHeaderMapper maps the standard correlationId header as a message property by invoking its toString() method (correlationId is often a UUID, which is not supported by JMS). On the inbound side, it is mapped as a String. This is independent of the jms_correlationId header, which is mapped to and from the JMSCorrelationID header. The JMSCorrelationID is generally used to correlate requests and replies, whereas the correlationId is often used to combine related messages into a group (such as with an aggregator or a resequencer).
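As a sketch of the QoS note above, the following configuration wires a `JmsSendingMessageHandler` with a `DynamicJmsTemplate` whose `explicitQosEnabled` property is set to true. The channel name `toJms`, the destination name, and the class and bean names are assumptions made for the example.

```java
import jakarta.jms.ConnectionFactory;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.jms.DynamicJmsTemplate;
import org.springframework.integration.jms.JmsSendingMessageHandler;

@Configuration
public class QosOutboundAdapterConfig {

    @Bean
    public DynamicJmsTemplate qosJmsTemplate(ConnectionFactory connectionFactory) {
        DynamicJmsTemplate template = new DynamicJmsTemplate();
        template.setConnectionFactory(connectionFactory);
        // Without this flag, QoS values are not passed to MessageProducer.send().
        template.setExplicitQosEnabled(true);
        return template;
    }

    @Bean
    @ServiceActivator(inputChannel = "toJms") // hypothetical channel name
    public JmsSendingMessageHandler jmsSender(DynamicJmsTemplate qosJmsTemplate) {
        JmsSendingMessageHandler handler = new JmsSendingMessageHandler(qosJmsTemplate);
        handler.setDestinationName("exampleQueue"); // hypothetical queue name
        return handler;
    }
}
```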

Starting with version 5.1, the DefaultJmsHeaderMapper can be configured for mapping inbound JMSDeliveryMode and JMSExpiration properties:

@Bean
public DefaultJmsHeaderMapper jmsHeaderMapper() {
    DefaultJmsHeaderMapper mapper = new DefaultJmsHeaderMapper();
    mapper.setMapInboundDeliveryMode(true);
    mapper.setMapInboundExpiration(true);
    return mapper;
}

These JMS properties are mapped to the JmsHeaders.DELIVERY_MODE and JmsHeaders.EXPIRATION Spring Message headers respectively.

Message Conversion, Marshalling, and Unmarshalling

If you need to convert the message, all JMS adapters and gateways let you provide a MessageConverter by setting the message-converter attribute. To do so, provide the bean name of an instance of MessageConverter that is available within the same ApplicationContext. Also, to provide some consistency with marshaller and unmarshaller interfaces, Spring provides MarshallingMessageConverter, which you can configure with your own custom marshallers and unmarshallers. The following example shows how to do so:

<int-jms:inbound-gateway request-destination="requestQueue"
    request-channel="inbound-gateway-channel"
    message-converter="marshallingMessageConverter"/>

<bean id="marshallingMessageConverter"
    class="org.springframework.jms.support.converter.MarshallingMessageConverter">
    <constructor-arg>
        <bean class="org.bar.SampleMarshaller"/>
    </constructor-arg>
    <constructor-arg>
        <bean class="org.bar.SampleUnmarshaller"/>
    </constructor-arg>
</bean>
When you provide your own MessageConverter instance, it is still wrapped within the HeaderMappingMessageConverter. This means that the 'extract-request-payload' and 'extract-reply-payload' properties can affect the actual objects passed to your converter. The HeaderMappingMessageConverter itself delegates to a target MessageConverter while also mapping the Spring Integration MessageHeaders to JMS message properties and back again.
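If neither the default converter nor a marshalling converter fits, you can implement Spring's `MessageConverter` interface yourself and reference the bean from the message-converter attribute. The class below is a minimal sketch that handles only String payloads; the class name `PlainTextMessageConverter` is hypothetical.

```java
import jakarta.jms.JMSException;
import jakarta.jms.Message;
import jakarta.jms.Session;
import jakarta.jms.TextMessage;

import org.springframework.jms.support.converter.MessageConversionException;
import org.springframework.jms.support.converter.MessageConverter;

/** Hypothetical converter that maps String payloads to and from TextMessage. */
public class PlainTextMessageConverter implements MessageConverter {

    @Override
    public Message toMessage(Object object, Session session)
            throws JMSException, MessageConversionException {
        // With the extract-*-payload attribute set to false, the object passed in
        // is the whole Spring Integration Message rather than its payload,
        // which this sketch does not handle.
        if (!(object instanceof String text)) {
            throw new MessageConversionException(
                    "Expected a String payload but got: " + object);
        }
        return session.createTextMessage(text);
    }

    @Override
    public Object fromMessage(Message message)
            throws JMSException, MessageConversionException {
        if (message instanceof TextMessage textMessage) {
            return textMessage.getText();
        }
        throw new MessageConversionException(
                "Expected a TextMessage but got: " + message);
    }
}
```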

JMS-backed Message Channels

The channel adapters and gateways featured earlier are all intended for applications that integrate with other external systems. The inbound options assume that some other system is sending JMS messages to the JMS destination, and the outbound options assume that some other system is receiving from the destination. The other system may or may not be a Spring Integration application. Of course, when sending a Spring Integration message instance as the body of the JMS message itself (with 'extract-payload' value set to false), it is assumed that the other system is based on Spring Integration. However, that is by no means a requirement. That flexibility is one of the benefits of using a message-based integration option with the abstraction of “channels” (or destinations in the case of JMS).

Sometimes, both the producer and consumer for a given JMS Destination are intended to be part of the same application, running within the same process. You can accomplish this by using a pair of inbound and outbound channel adapters. The problem with that approach is that you need two adapters, even though, conceptually, the goal is to have a single message channel. A better option is supported as of Spring Integration version 2.0. Now it is possible to define a single “channel” when using the JMS namespace, as the following example shows:

<int-jms:channel id="jmsChannel" queue="exampleQueue"/>

The channel in the preceding example behaves much like a normal <channel/> element from the main Spring Integration namespace. It can be referenced by both the input-channel and output-channel attributes of any endpoint. The difference is that this channel is backed by a JMS Queue instance named exampleQueue. This means that asynchronous messaging is possible between the producing and consuming endpoints. However, unlike the simpler asynchronous message channels created by adding a <queue/> element within a non-JMS <channel/> element, the messages are not stored in an in-memory queue. Instead, those messages are passed within a JMS message body, and the full power of the underlying JMS provider is then available for that channel. Probably the most common rationale for using this alternative is to take advantage of the persistence made available by the store-and-forward approach of JMS messaging.

If configured properly, the JMS-backed message channel also supports transactions. In other words, a producer would not actually write to a transactional JMS-backed channel if its send operation is part of a transaction that rolls back. Likewise, a consumer would not physically remove a JMS message from the channel if the reception of that message is part of a transaction that rolls back. Note that the producer and consumer transactions are separate in such a scenario. This is significantly different from the propagation of a transactional context across a simple, synchronous <channel/> element that has no <queue/> child element.

Since the preceding example references a JMS Queue instance, it acts as a point-to-point channel. If, on the other hand, you need publish-subscribe behavior, you can use a separate element and reference a JMS Topic instead. The following example shows how to do so:

<int-jms:publish-subscribe-channel id="jmsChannel" topic="exampleTopic"/>

For either type of JMS-backed channel, the name of the destination may be provided instead of a reference, as the following example shows:

<int-jms:channel id="jmsQueueChannel" queue-name="exampleQueueName"/>

<int-jms:publish-subscribe-channel id="jmsTopicChannel" topic-name="exampleTopicName"/>

In the preceding examples, the destination names are resolved by Spring’s default DynamicDestinationResolver implementation, but you could provide any implementation of the DestinationResolver interface. Also, the JMS ConnectionFactory is a required property of the channel, but, by default, the expected bean name would be jmsConnectionFactory. The following example provides both a custom instance for resolution of the JMS destination names and a different name for the ConnectionFactory:

<int-jms:channel id="jmsChannel" queue-name="exampleQueueName"
    destination-resolver="customDestinationResolver"
    connection-factory="customConnectionFactory"/>

For the <publish-subscribe-channel />, set the durable attribute to true for a durable subscription or subscription-shared for a shared subscription (requires a JMS 2.0 broker and has been available since version 4.2). Use subscription to name the subscription.
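JMS-backed channels can also be declared with the Java DSL through the `Jms` factory. The following is a sketch under the assumption that a `ConnectionFactory` bean is available; the class name is hypothetical, and the destination names are taken from the XML examples above.

```java
import jakarta.jms.ConnectionFactory;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.jms.dsl.Jms;
import org.springframework.messaging.MessageChannel;

@Configuration
public class JmsChannelConfig {

    // Point-to-point channel backed by the queue named exampleQueueName.
    @Bean
    public MessageChannel jmsQueueChannel(ConnectionFactory connectionFactory) {
        return Jms.channel(connectionFactory)
                .destination("exampleQueueName")
                .getObject();
    }

    // Publish-subscribe channel backed by the topic named exampleTopicName.
    @Bean
    public MessageChannel jmsTopicChannel(ConnectionFactory connectionFactory) {
        return Jms.publishSubscribeChannel(connectionFactory)
                .destination("exampleTopicName")
                .getObject();
    }
}
```

Because the connection factory is passed in explicitly, this form does not depend on the default jmsConnectionFactory bean name.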

Using JMS Message Selectors

With JMS message selectors, you can filter JMS Messages based on JMS headers as well as JMS properties. For example, if you want to listen to messages whose custom JMS header property, myHeaderProperty, equals something, you can specify the following expression:

myHeaderProperty = 'something'

Message selector expressions are a subset of the SQL-92 conditional expression syntax and are defined as part of the Java Message Service specification. You can specify the JMS message selector attribute by using XML namespace configuration for the following Spring Integration JMS components:

  • JMS Channel

  • JMS Publish Subscribe Channel

  • JMS Inbound Channel Adapter

  • JMS Inbound Gateway

  • JMS Message-driven Channel Adapter

You cannot reference message body values by using JMS Message selectors.
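When a selector is assembled from a runtime value, the value must be a valid selector string literal: the JMS selector syntax encloses string literals in single quotes and represents an embedded single quote by doubling it. The helper below is a small sketch of that quoting rule; the class and method names are hypothetical and are not part of Spring Integration.

```java
/** Hypothetical helper for building simple equality selectors. */
final class SelectorLiterals {

    private SelectorLiterals() {
    }

    /** Wraps a value in single quotes, doubling any embedded single quote. */
    static String quote(String value) {
        return "'" + value.replace("'", "''") + "'";
    }

    /** Builds a selector of the form: property = 'value'. */
    static String equalsSelector(String property, String value) {
        return property + " = " + quote(value);
    }
}
```

For example, `SelectorLiterals.equalsSelector("myHeaderProperty", "something")` produces the expression shown earlier in this section.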

JMS Samples

To experiment with these JMS adapters, check out the JMS samples available in the Spring Integration Samples Git repository at https://github.com/spring-projects/spring-integration-samples/tree/master/basic/jms.

That repository includes two samples. One provides inbound and outbound channel adapters, and the other provides inbound and outbound gateways. They are configured to run with an embedded ActiveMQ process, but you can modify the common.xml Spring application context file of each sample to support either a different JMS provider or a standalone ActiveMQ process.

In other words, you can split the configuration so that the inbound and outbound adapters run in separate JVMs. If you have ActiveMQ installed, modify the brokerURL property within the common.xml file to use tcp://localhost:61616 (instead of vm://localhost). Both of the samples accept input from stdin and echo back to stdout. Look at the configuration to see how these messages are routed over JMS.