Messaging Gateways
A gateway hides the messaging API provided by Spring Integration. It lets your application’s business logic be unaware of the Spring Integration API. By using a generic Gateway, your code interacts with only a simple interface.
Enter the GatewayProxyFactoryBean
As mentioned earlier, it would be great to have no dependency on the Spring Integration API — including the gateway class.
For that reason, Spring Integration provides the GatewayProxyFactoryBean, which generates a proxy for any interface and internally invokes the gateway methods shown below.
By using dependency injection, you can then expose the interface to your business methods.
The following example shows an interface that can be used to interact with Spring Integration:
package org.cafeteria;
public interface Cafe {
void placeOrder(Order order);
}
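To make the proxy idea concrete, the following is a minimal sketch that uses only the JDK's java.lang.reflect.Proxy. It is not how GatewayProxyFactoryBean is implemented. The createGateway method, the REQUEST_CHANNEL list (a stand-in for a message channel), and the String-based placeOrder signature are all assumptions made for illustration.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

public class GatewayProxySketch {

    /** Business-facing interface; its signature contains no messaging types. */
    public interface Cafe {
        void placeOrder(String order);
    }

    /** Hypothetical stand-in for a request channel: it only records what was sent. */
    public static final List<Object> REQUEST_CHANNEL = new ArrayList<>();

    /** Creates a proxy that turns each interface call into a "send" on the stand-in channel. */
    public static <T> T createGateway(Class<T> serviceInterface) {
        InvocationHandler handler = (proxy, method, args) -> {
            // Use the first argument as the payload, or the method name when there are no arguments.
            // Object methods (toString, equals, hashCode) are not handled in this sketch.
            Object payload = (args != null && args.length > 0) ? args[0] : method.getName();
            REQUEST_CHANNEL.add(payload);
            return null; // sufficient for void methods, which is all this sketch supports
        };
        return serviceInterface.cast(Proxy.newProxyInstance(
                serviceInterface.getClassLoader(), new Class<?>[] {serviceInterface}, handler));
    }

    public static void main(String[] args) {
        Cafe cafe = createGateway(Cafe.class);
        cafe.placeOrder("espresso"); // the caller sees only the Cafe interface
        System.out.println(REQUEST_CHANNEL); // [espresso]
    }
}
```

The calling code depends only on the Cafe interface, which is the property the gateway proxy is meant to provide.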
Gateway XML Namespace Support
Namespace support is also provided. It lets you configure an interface as a service, as the following example shows:
<int:gateway id="cafeService"
service-interface="org.cafeteria.Cafe"
default-request-channel="requestChannel"
default-reply-timeout="10000"
default-reply-channel="replyChannel"/>
With this configuration defined, the cafeService can now be injected into other beans, and the code that invokes the methods on that proxied instance of the Cafe interface has no awareness of the Spring Integration API.
The general approach is similar to that of Spring Remoting (RMI, HttpInvoker, and so on).
See the “Samples” Appendix for an example that uses the gateway element (in the Cafe demo).
The defaults in the preceding configuration are applied to all methods on the gateway interface. If a reply timeout is not specified, the calling thread waits indefinitely for a reply. See Gateway Behavior When No response Arrives.
The defaults can be overridden for individual methods. See Gateway Configuration with Annotations and XML.
Setting the Default Reply Channel
Typically, you need not specify the default-reply-channel, since a Gateway auto-creates a temporary, anonymous reply channel, where it listens for the reply.
However, some cases may prompt you to define a default-reply-channel (or reply-channel with adapter gateways, such as HTTP, JMS, and others).
For some background, we briefly discuss some of the inner workings of the gateway.
A gateway creates a temporary point-to-point reply channel.
It is anonymous and is added to the message headers with the name replyChannel.
When providing an explicit default-reply-channel (reply-channel with remote adapter gateways), you can point to a publish-subscribe channel, which is so named because you can add more than one subscriber to it.
Internally, Spring Integration creates a bridge between the temporary replyChannel and the explicitly defined default-reply-channel.
Suppose you want your reply to go not only to the gateway but also to some other consumer. In this case, you want two things:
- A named channel to which you can subscribe
- That channel to be a publish-subscribe-channel
The default strategy used by the gateway does not satisfy those needs, because the reply channel added to the header is anonymous and point-to-point.
This means that no other subscriber can get a handle to it and, even if it could, the channel has point-to-point behavior such that only one subscriber would get the message.
By defining a default-reply-channel you can point to a channel of your choosing.
In this case, that is a publish-subscribe-channel.
The gateway creates a bridge from it to the temporary, anonymous reply channel that is stored in the header.
You might also want to explicitly provide a reply channel for monitoring or auditing through an interceptor (for example, wiretap). To configure a channel interceptor, you need a named channel.
Gateway Configuration with Annotations and XML
Consider the following example, which expands on the previous Cafe interface example by adding a @Gateway annotation:
public interface Cafe {
@Gateway(requestChannel="orders")
void placeOrder(Order order);
}
The @Header annotation lets you add values that are interpreted as message headers, as the following example shows:
public interface FileWriter {
@Gateway(requestChannel="filesOut")
void write(byte[] content, @Header(FileHeaders.FILENAME) String filename);
}
If you prefer the XML approach to configuring gateway methods, you can add method elements to the gateway configuration, as the following example shows:
<int:gateway id="myGateway" service-interface="org.foo.bar.TestGateway"
default-request-channel="inputC">
<int:default-header name="calledMethod" expression="#gatewayMethod.name"/>
<int:method name="echo" request-channel="inputA" reply-timeout="2" request-timeout="200"/>
<int:method name="echoUpperCase" request-channel="inputB"/>
<int:method name="echoViaDefault"/>
</int:gateway>
You can also use XML to provide individual headers for each method invocation.
This could be useful if the headers you want to set are static in nature and you do not want to embed them in the gateway’s method signature by using @Header annotations.
For example, in the loan broker example, we want to influence how aggregation of the loan quotes is done, based on what type of request was initiated (single quote or all quotes).
Determining the type of the request by evaluating which gateway method was invoked, although possible, would violate the separation of concerns paradigm (the method is a Java artifact).
However, expressing your intention (meta information) in message headers is natural in a messaging architecture.
The following example shows how to add a different message header for each of two methods:
<int:gateway id="loanBrokerGateway"
service-interface="org.springframework.integration.loanbroker.LoanBrokerGateway">
<int:method name="getLoanQuote" request-channel="loanBrokerPreProcessingChannel">
<int:header name="RESPONSE_TYPE" value="BEST"/>
</int:method>
<int:method name="getAllLoanQuotes" request-channel="loanBrokerPreProcessingChannel">
<int:header name="RESPONSE_TYPE" value="ALL"/>
</int:method>
</int:gateway>
In the preceding example a different value is set for the 'RESPONSE_TYPE' header, based on the gateway’s method.
Expressions and “Global” Headers
The <header/> element supports expression as an alternative to value.
The SpEL expression is evaluated to determine the value of the header.
There is no #root object, but the following variables are available:
- #args: An Object[] containing the method arguments
- #gatewayMethod: The object (derived from java.lang.reflect.Method) that represents the method in the service-interface that was invoked. A header containing this variable can be used later in the flow (for example, for routing). For example, if you wish to route on the simple method name, you might add a header with the following expression: #gatewayMethod.name.
The java.lang.reflect.Method is not serializable.
A header with an expression of #gatewayMethod is lost if you later serialize the message.
Consequently, you may wish to use #gatewayMethod.name or #gatewayMethod.toString() in those cases.
The toString() method provides a String representation of the method, including parameter and return types.
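The serialization caveat can be checked with plain JDK reflection. The following sketch is illustrative only: the Cafe interface with a String parameter and the isSerializable helper are assumptions, and the check is purely type-based (it does not attempt an actual serialization round trip).

```java
import java.io.Serializable;
import java.lang.reflect.Method;

public class GatewayMethodHeaderSketch {

    /** Illustrative service interface; the String parameter is an assumption. */
    public interface Cafe {
        void placeOrder(String order);
    }

    /** Looks up placeOrder reflectively, as a stand-in for the #gatewayMethod variable. */
    public static Method gatewayMethod() {
        try {
            return Cafe.class.getMethod("placeOrder", String.class);
        } catch (NoSuchMethodException e) {
            throw new IllegalStateException(e);
        }
    }

    /** Type-based check: could this header value be written with Java serialization? */
    public static boolean isSerializable(Object headerValue) {
        return headerValue instanceof Serializable;
    }

    public static void main(String[] args) {
        Method method = gatewayMethod();
        System.out.println(isSerializable(method));            // false
        System.out.println(isSerializable(method.getName()));  // true
        System.out.println(isSerializable(method.toString())); // true
        System.out.println(method.getName());                  // placeOrder
    }
}
```

Both the method name and the toString() form are plain String values, which is why they survive serialization where the Method object itself does not.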
Since version 3.0, <default-header/> elements can be defined to add headers to all the messages produced by the gateway, regardless of the method invoked.
Specific headers defined for a method take precedence over default headers.
Specific headers defined for a method here override any @Header annotations in the service interface.
However, default headers do NOT override any @Header annotations in the service interface.
The gateway now also supports a default-payload-expression, which is applied for all methods (unless overridden).
Mapping Method Arguments to a Message
Using the configuration techniques in the previous section allows control of how method arguments are mapped to message elements (payload and headers). When no explicit configuration is used, certain conventions are used to perform the mapping. In some cases, these conventions cannot determine which argument is the payload and which should be mapped to headers. Consider the following example:
public String send1(Object thing1, Map thing2);
public String send2(Map thing1, Map thing2);
In the first case, the convention is to map the first argument to the payload (as long as it is not a Map) and the contents of the second argument become headers.
In the second case (or the first when the argument for parameter thing1 is a Map), the framework cannot determine which argument should be the payload.
Consequently, mapping fails.
This can generally be resolved using a payload-expression, a @Payload annotation, or a @Headers annotation.
Alternatively (and whenever the conventions break down), you can take the entire responsibility for mapping the method calls to messages.
To do so, implement a MethodArgsMessageMapper and provide it to the <gateway/> by using the mapper attribute.
The mapper maps a MethodArgsHolder, which is a simple class that wraps the java.lang.reflect.Method instance and an Object[] containing the arguments.
When providing a custom mapper, the default-payload-expression attribute and <default-header/> elements are not allowed on the gateway.
Similarly, the payload-expression attribute and <header/> elements are not allowed on any <method/> elements.
Mapping Method Arguments
The following examples show how method arguments can be mapped to the message and show some examples of invalid configuration:
public interface MyGateway {
void payloadAndHeaderMapWithoutAnnotations(String s, Map<String, Object> map);
void payloadAndHeaderMapWithAnnotations(@Payload String s, @Headers Map<String, Object> map);
void headerValuesAndPayloadWithAnnotations(@Header("k1") String x, @Payload String s, @Header("k2") String y);
void mapOnly(Map<String, Object> map); // the payload is the map and no custom headers are added
void twoMapsAndOneAnnotatedWithPayload(@Payload Map<String, Object> payload, Map<String, Object> headers);
@Payload("#args[0] + #args[1] + '!'")
void payloadAnnotationAtMethodLevel(String a, String b);
@Payload("@someBean.exclaim(#args[0])")
void payloadAnnotationAtMethodLevelUsingBeanResolver(String s);
void payloadAnnotationWithExpression(@Payload("toUpperCase()") String s);
void payloadAnnotationWithExpressionUsingBeanResolver(@Payload("@someBean.sum(#this)") String s); // (1)
// invalid
void twoMapsWithoutAnnotations(Map<String, Object> m1, Map<String, Object> m2);
// invalid
void twoPayloads(@Payload String s1, @Payload String s2);
// invalid
void payloadAndHeaderAnnotationsOnSameParameter(@Payload @Header("x") String s);
// invalid
void payloadAndHeadersAnnotationsOnSameParameter(@Payload @Headers Map<String, Object> map);
}
(1) Note that, in this example, the SpEL variable #this refers to the argument (in this case, the value of s).
The XML equivalent looks a little different, since there is no #this context for the method argument.
However, expressions can refer to method arguments by using the #args variable, as the following example shows:
<int:gateway id="myGateway" service-interface="org.something.MyGateway">
<int:method name="send1" payload-expression="#args[0] + 'thing2'"/>
<int:method name="send2" payload-expression="@someBean.sum(#args[0])"/>
<int:method name="send3" payload-expression="#method"/>
<int:method name="send4">
<int:header name="thing1" expression="#args[2].toUpperCase()"/>
</int:method>
</int:gateway>
@MessagingGateway Annotation
Starting with version 4.0, gateway service interfaces can be marked with a @MessagingGateway annotation instead of requiring the definition of a <gateway /> XML element for configuration.
The following pair of examples compares the two approaches for configuring the same gateway:
<int:gateway id="myGateway" service-interface="org.something.TestGateway"
default-request-channel="inputC">
<int:default-header name="calledMethod" expression="#gatewayMethod.name"/>
<int:method name="echo" request-channel="inputA" reply-timeout="2" request-timeout="200"/>
<int:method name="echoUpperCase" request-channel="inputB">
<int:header name="thing1" value="thing2"/>
</int:method>
<int:method name="echoViaDefault"/>
</int:gateway>
@MessagingGateway(name = "myGateway", defaultRequestChannel = "inputC",
defaultHeaders = @GatewayHeader(name = "calledMethod",
expression="#gatewayMethod.name"))
public interface TestGateway {
@Gateway(requestChannel = "inputA", replyTimeout = 2, requestTimeout = 200)
String echo(String payload);
@Gateway(requestChannel = "inputB", headers = @GatewayHeader(name = "thing1", value="thing2"))
String echoUpperCase(String payload);
String echoViaDefault(String payload);
}
Similarly to the XML version, when Spring Integration discovers these annotations during a component scan, it creates the proxy implementation with its messaging infrastructure.
To perform this scan and register the BeanDefinition in the application context, add the @IntegrationComponentScan annotation to a @Configuration class.
The standard @ComponentScan infrastructure does not deal with interfaces.
Consequently, we introduced the custom @IntegrationComponentScan logic to find the @MessagingGateway annotation on the interfaces and register GatewayProxyFactoryBean instances for them.
See also Annotation Support.
Along with the @MessagingGateway annotation, you can mark a service interface with the @Profile annotation to avoid the bean creation if such a profile is not active.
If you have no XML configuration, the @EnableIntegration annotation is required on at least one @Configuration class.
See Configuration and @EnableIntegration for more information.
Invoking No-Argument Methods
When invoking methods on a Gateway interface that do not have any arguments, the default behavior is to receive a Message from a PollableChannel.
Sometimes, however, you may want to trigger no-argument methods so that you can interact with other components downstream that do not require user-provided parameters, such as triggering no-argument SQL calls or stored procedures.
To achieve send-and-receive semantics, you must provide a payload.
To generate a payload, method parameters on the interface are not necessary.
You can either use the @Payload annotation or the payload-expression attribute in XML on the method element.
The following list includes a few examples of what the payloads could be:
- a literal string
- #gatewayMethod.name
- new java.util.Date()
- @someBean.someMethod()'s return value
The following example shows how to use the @Payload annotation:
public interface Cafe {
@Payload("new java.util.Date()")
List<Order> retrieveOpenOrders();
}
If a method has no argument and no return value but does contain a payload expression, it is treated as a send-only operation.
Error Handling
The gateway invocation can result in errors. By default, any error that occurs downstream is re-thrown “as is” upon the gateway’s method invocation. For example, consider the following simple flow:
gateway -> service-activator
If the service invoked by the service activator throws a MyException (for example), the framework wraps it in a MessagingException and attaches the message passed to the service activator in the failedMessage property.
Consequently, any logging performed by the framework has the full context of the failure.
By default, when the exception is caught by the gateway, the MyException is unwrapped and thrown to the caller.
You can configure a throws clause on the gateway method declaration to match the particular exception type in the cause chain.
For example, if you want to catch a whole MessagingException with all the messaging information about the reason for the downstream error, you should have a gateway method similar to the following:
public interface MyGateway {
void performProcess() throws MessagingException;
}
Since we encourage POJO programming, you may not want to expose the caller to messaging infrastructure.
If your gateway method does not have a throws clause, the gateway traverses the cause tree, looking for a RuntimeException that is not a MessagingException.
If none is found, the framework throws the MessagingException.
If the MyException in the preceding discussion has a cause of SomeOtherException and your method throws SomeOtherException, the gateway further unwraps that and throws it to the caller.
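The selection rules described above can be sketched in plain Java. This is a simplified illustration, not the framework's code: the nested MessagingException is a stand-in declared locally so that the example is self-contained, and the select method and its declaredTypes parameter (representing the gateway method's throws clause) are hypothetical names.

```java
import java.io.IOException;
import java.util.List;

public class GatewayUnwrapSketch {

    /** Local stand-in for the framework's MessagingException. */
    public static class MessagingException extends RuntimeException {
        public MessagingException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    /** Example application exception thrown by a downstream service. */
    public static class MyException extends RuntimeException {
        public MyException(String message) {
            super(message);
        }
    }

    /**
     * Walks the cause chain and returns the exception the caller would see:
     * the first one matching a declared type, or the first RuntimeException
     * that is not a MessagingException, or the original error if neither is found.
     */
    public static Throwable select(Throwable error, List<Class<? extends Throwable>> declaredTypes) {
        for (Throwable t = error; t != null; t = t.getCause()) {
            for (Class<? extends Throwable> declared : declaredTypes) {
                if (declared.isInstance(t)) {
                    return t;
                }
            }
            if (t instanceof RuntimeException && !(t instanceof MessagingException)) {
                return t;
            }
        }
        return error;
    }

    public static void main(String[] args) {
        Throwable wrapped = new MessagingException("failed", new MyException("boom"));
        // No throws clause: the application exception is unwrapped.
        System.out.println(select(wrapped, List.of()).getClass().getSimpleName()); // MyException
        // throws MessagingException: the whole wrapper reaches the caller.
        System.out.println(select(wrapped, List.of(MessagingException.class))
                .getClass().getSimpleName()); // MessagingException
        // Only a checked cause and no throws clause: the wrapper is thrown.
        Throwable checkedOnly = new MessagingException("failed", new IOException("disk"));
        System.out.println(select(checkedOnly, List.of()).getClass().getSimpleName()); // MessagingException
    }
}
```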
When a gateway is declared with no service-interface, an internal framework interface RequestReplyExchanger is used.
Consider the following example:
public interface RequestReplyExchanger {
Message<?> exchange(Message<?> request) throws MessagingException;
}
Before version 5.0, this exchange method did not have a throws clause and, as a result, the exception was unwrapped.
If you use this interface and want to restore the previous unwrap behavior, use a custom service-interface instead or access the cause of the MessagingException yourself.
However, you may want to log the error rather than propagating it or you may want to treat an exception as a valid reply (by mapping it to a message that conforms to some "error message" contract that the caller understands).
To accomplish this, the gateway provides support for a message channel dedicated to the errors by including support for the error-channel attribute.
In the following example, a 'transformer' creates a reply Message from the Exception:
<int:gateway id="sampleGateway"
default-request-channel="gatewayChannel"
service-interface="foo.bar.SimpleGateway"
error-channel="exceptionTransformationChannel"/>
<int:transformer input-channel="exceptionTransformationChannel"
ref="exceptionTransformer" method="createErrorResponse"/>
The exceptionTransformer could be a simple POJO that knows how to create the expected error response objects.
That becomes the payload that is sent back to the caller.
You could do many more elaborate things in such an “error flow”, if necessary.
It might involve routers (including Spring Integration’s ErrorMessageExceptionTypeRouter), filters, and so on.
Most of the time, a simple 'transformer' should be sufficient, however.
Alternatively, you might want to only log the exception (or send it somewhere asynchronously).
If you provide a one-way flow, nothing would be sent back to the caller.
If you want to completely suppress exceptions, you can provide a reference to the global nullChannel (essentially a /dev/null approach).
Finally, as mentioned above, if no error-channel is defined, then the exceptions propagate as usual.
When you use the @MessagingGateway annotation (see @MessagingGateway Annotation), you can use the errorChannel attribute.
Starting with version 5.0, when you use a gateway method with a void return type (one-way flow), the error-channel reference (if provided) is populated in the standard errorChannel header of each sent message.
This feature allows a downstream asynchronous flow, based on the standard ExecutorChannel configuration (or a QueueChannel), to override the default behavior of sending exceptions to the global errorChannel.
Previously, you had to manually specify an errorChannel header with the @GatewayHeader annotation or the <header> element.
The error-channel property was ignored for void methods with an asynchronous flow.
Instead, error messages were sent to the default errorChannel.
Exposing the messaging system through simple POJI Gateways provides benefits, but “hiding” the reality of the underlying messaging system does come at a price, so there are certain things you should consider.
We want our Java method to return as quickly as possible and not hang for an indefinite amount of time while the caller is waiting on it to return (whether void, a return value, or a thrown Exception).
When regular methods are used as proxies in front of the messaging system, we have to take into account the potentially asynchronous nature of the underlying messaging.
This means that there might be a chance that a message that was initiated by a gateway could be dropped by a filter and never reach a component that is responsible for producing a reply.
Some service activator method might result in an exception, thus providing no reply (as we do not generate null messages).
In other words, multiple scenarios can cause a reply message to never come.
That is perfectly natural in messaging systems.
However, think about the implication on the gateway method. The gateway’s method input arguments were incorporated into a message and sent downstream.
The reply message would be converted to a return value of the gateway’s method.
So you might want to ensure that, for each gateway call, there is always a reply message.
Otherwise, your gateway method might never return and hang indefinitely.
One way to handle this situation is by using an asynchronous gateway (explained later in this section).
Another way of handling it is to explicitly set the reply-timeout attribute.
That way, the gateway does not hang any longer than the time specified by the reply-timeout and returns 'null' if that timeout does elapse.
Finally, you might want to consider setting downstream flags, such as 'requires-reply', on a service-activator or 'throw-exceptions-on-rejection' on a filter. These options are discussed in more detail in the final section of this chapter.
If the downstream flow returns an ErrorMessage , its payload (a Throwable ) is treated as a regular downstream error.
If there is an error-channel configured, it is sent to the error flow.
Otherwise the payload is thrown to the caller of the gateway.
Similarly, if the error flow on the error-channel returns an ErrorMessage , its payload is thrown to the caller.
The same applies to any message with a Throwable payload.
This can be useful in asynchronous situations when you need to propagate an Exception directly to the caller.
To do so, you can either return an Exception (as the reply from some service) or throw it.
Generally, even with an asynchronous flow, the framework takes care of propagating an exception thrown by the downstream flow back to the gateway.
The TCP Client-Server Multiplex sample demonstrates both techniques to return the exception to the caller.
It emulates a socket IO error to the waiting thread by using an aggregator with group-timeout (see Aggregator and Group Timeout) and a MessagingTimeoutException reply on the discard flow.
Gateway Timeouts
Gateways have two timeout properties: requestTimeout and replyTimeout.
The request timeout applies only if the channel can block (for example, a bounded QueueChannel that is full).
The replyTimeout value is how long the gateway waits for a reply before returning null.
It defaults to infinity.
The timeouts can be set as defaults for all methods on the gateway (defaultRequestTimeout and defaultReplyTimeout) or on the @MessagingGateway interface annotation.
Individual methods can override these defaults in <method/> child elements or on the @Gateway annotation.
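The "wait for a reply, then return null" behavior resembles a timed poll on a blocking queue. The following JDK-only sketch models that idea under stated assumptions: the BlockingQueue stands in for the temporary reply channel, and awaitReply is a hypothetical helper, not a Spring Integration API.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class ReplyTimeoutSketch {

    /** Waits up to replyTimeoutMillis for a reply; returns null if none arrives in time. */
    public static String awaitReply(BlockingQueue<String> replyChannel, long replyTimeoutMillis) {
        try {
            return replyChannel.poll(replyTimeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return null;
        }
    }

    public static void main(String[] args) {
        BlockingQueue<String> replyChannel = new ArrayBlockingQueue<>(1);
        // Nothing produced a reply, so the call gives up after 50 ms.
        System.out.println(awaitReply(replyChannel, 50)); // null
        replyChannel.offer("pong");
        System.out.println(awaitReply(replyChannel, 50)); // pong
    }
}
```

A caller of a gateway with a finite reply timeout should therefore be prepared to handle a null return value.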
Starting with version 5.0, the timeouts can be defined as expressions, as the following example shows:
@Gateway(payloadExpression = "#args[0]", requestChannel = "someChannel",
requestTimeoutExpression = "#args[1]", replyTimeoutExpression = "#args[2]")
String lateReply(String payload, long requestTimeout, long replyTimeout);
The evaluation context has a BeanResolver (use @someBean to reference other beans), and the #args array variable is available.
When configuring with XML, the timeout attributes can be a long value or a SpEL expression, as the following example shows:
<method name="someMethod" request-channel="someRequestChannel"
payload-expression="#args[0]"
request-timeout="1000"
reply-timeout="#args[1]">
</method>
Asynchronous Gateway
As a pattern, the messaging gateway offers a nice way to hide messaging-specific code while still exposing the full capabilities of the messaging system.
As described earlier, the GatewayProxyFactoryBean provides a convenient way to expose a proxy over a service-interface, giving you POJO-based access to a messaging system (based on objects in your own domain, primitives/Strings, or other objects).
However, when a gateway is exposed through simple POJO methods that return values, it implies that, for each request message (generated when the method is invoked), there must be a reply message (generated when the method has returned).
Since messaging systems are naturally asynchronous, you may not always be able to guarantee the contract where “for each request, there will always be a reply”. Spring Integration 2.0 introduced support for an asynchronous gateway, which offers a convenient way to initiate flows when you may not know if a reply is expected or how long it takes for replies to arrive.
To handle these types of scenarios, Spring Integration uses java.util.concurrent.Future instances to support an asynchronous gateway.
From the XML configuration, nothing changes, and you still define an asynchronous gateway the same way as you define a regular gateway, as the following example shows:
<int:gateway id="mathService"
service-interface="org.springframework.integration.sample.gateway.futures.MathServiceGateway"
default-request-channel="requestChannel"/>
However, the gateway interface (a service interface) is a little different, as follows:
public interface MathServiceGateway {
Future<Integer> multiplyByTwo(int i);
}
As the preceding example shows, the return type for the gateway method is a Future.
When GatewayProxyFactoryBean sees that the return type of the gateway method is a Future, it immediately switches to the asynchronous mode by using an AsyncTaskExecutor.
That is the extent of the differences.
The call to such a method always returns immediately with a Future instance.
Then you can interact with the Future at your own pace to get the result, cancel, and so on.
Also, as with any other use of Future instances, calling get() may reveal a timeout, an execution exception, and so on.
The following example shows how to use a Future that returns from an asynchronous gateway:
MathServiceGateway mathService = ac.getBean("mathService", MathServiceGateway.class);
Future<Integer> result = mathService.multiplyByTwo(number);
// do something else here since the reply might take a moment
int finalResult = result.get(1000, TimeUnit.SECONDS);
For a more detailed example, see the async-gateway sample in the Spring Integration samples.
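The outcomes that get() may reveal can be handled as in the following sketch. It uses only java.util.concurrent, with a plain ExecutorService standing in for the gateway; the describe helper and the sample tasks are illustrative assumptions.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class FutureOutcomeSketch {

    /** Waits for the future and reports a result, a timeout, or a downstream failure. */
    public static String describe(Future<Integer> future, long timeoutMillis) {
        try {
            return "result=" + future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            future.cancel(true); // stop waiting for a reply that may never come
            return "timeout";
        } catch (ExecutionException e) {
            return "failed: " + e.getCause().getMessage(); // the cause is the downstream exception
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newCachedThreadPool();
        try {
            Callable<Integer> ok = () -> 21 * 2;
            Callable<Integer> failing = () -> {
                throw new IllegalStateException("no reply");
            };
            Callable<Integer> slow = () -> {
                Thread.sleep(5000);
                return 1;
            };
            System.out.println(describe(executor.submit(ok), 1000));      // result=42
            System.out.println(describe(executor.submit(failing), 1000)); // failed: no reply
            System.out.println(describe(executor.submit(slow), 50));      // timeout
        } finally {
            executor.shutdownNow();
        }
    }
}
```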
ListenableFuture
Starting with version 4.1, asynchronous gateway methods can also return ListenableFuture (introduced in Spring Framework 4.0).
These return types let you provide a callback, which is invoked when the result is available (or an exception occurs).
When the gateway detects this return type and the task executor is an AsyncListenableTaskExecutor, the executor’s submitListenable() method is invoked.
The following example shows how to use a ListenableFuture:
ListenableFuture<String> result = this.asyncGateway.async("something");
result.addCallback(new ListenableFutureCallback<String>() {
@Override
public void onSuccess(String result) {
...
}
@Override
public void onFailure(Throwable t) {
...
}
});
AsyncTaskExecutor
By default, the GatewayProxyFactoryBean uses org.springframework.core.task.SimpleAsyncTaskExecutor when submitting internal AsyncInvocationTask instances for any gateway method whose return type is a Future.
However, the async-executor attribute in the <gateway/> element’s configuration lets you provide a reference to any implementation of java.util.concurrent.Executor available within the Spring application context.
The (default) SimpleAsyncTaskExecutor supports both Future and ListenableFuture return types, returning FutureTask or ListenableFutureTask respectively.
See CompletableFuture.
Even though there is a default executor, it is often useful to provide an external one so that you can identify its threads in logs (when using XML, the thread name is based on the executor’s bean name), as the following example shows:
@Bean
public AsyncTaskExecutor exec() {
SimpleAsyncTaskExecutor simpleAsyncTaskExecutor = new SimpleAsyncTaskExecutor();
simpleAsyncTaskExecutor.setThreadNamePrefix("exec-");
return simpleAsyncTaskExecutor;
}
@MessagingGateway(asyncExecutor = "exec")
public interface ExecGateway {
@Gateway(requestChannel = "gatewayChannel")
Future<?> doAsync(String foo);
}
If you wish to return a different Future implementation, you can provide a custom executor or disable the executor altogether and return the Future in the reply message payload from the downstream flow.
To disable the executor, set it to null in the GatewayProxyFactoryBean (by using setAsyncTaskExecutor(null)).
When configuring the gateway with XML, use async-executor="".
When configuring by using the @MessagingGateway annotation, use code similar to the following:
@MessagingGateway(asyncExecutor = AnnotationConstants.NULL)
public interface NoExecGateway {
@Gateway(requestChannel = "gatewayChannel")
Future<?> doAsync(String foo);
}
If the return type is a specific concrete Future implementation or some other sub-interface that is not supported by the configured executor, the flow runs on the caller’s thread and the flow must return the required type in the reply message payload.
CompletableFuture
Starting with version 4.2, gateway methods can now return CompletableFuture<?>.
There are two modes of operation when returning this type:
- When an async executor is provided and the return type is exactly CompletableFuture (not a subclass), the framework runs the task on the executor and immediately returns a CompletableFuture to the caller. CompletableFuture.supplyAsync(Supplier<U> supplier, Executor executor) is used to create the future.
- When the async executor is explicitly set to null and the return type is CompletableFuture or the return type is a subclass of CompletableFuture, the flow is invoked on the caller’s thread. In this scenario, the downstream flow is expected to return a CompletableFuture of the appropriate type.
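The difference between the two modes comes down to which thread runs the flow. The following JDK-only sketch illustrates that; withExecutor and withoutExecutor are hypothetical helpers that model the behavior described in the list and are not part of Spring Integration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

public class CompletableFutureModesSketch {

    /** Mode 1: an executor is provided; the flow runs on it and the caller gets a future at once. */
    public static <T> CompletableFuture<T> withExecutor(Supplier<T> flow, Executor executor) {
        return CompletableFuture.supplyAsync(flow, executor);
    }

    /** Mode 2: no executor; the flow runs on the caller's thread and must return the future itself. */
    public static <T> CompletableFuture<T> withoutExecutor(Supplier<CompletableFuture<T>> flow) {
        return flow.get();
    }

    public static void main(String[] args) {
        ExecutorService executor =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "gateway-exec"));
        try {
            // The supplier reports the name of the thread that actually ran the "flow".
            CompletableFuture<String> first =
                    withExecutor(() -> Thread.currentThread().getName(), executor);
            System.out.println(first.join()); // gateway-exec

            CompletableFuture<String> second = withoutExecutor(
                    () -> CompletableFuture.completedFuture(Thread.currentThread().getName()));
            System.out.println(second.join()); // name of the calling thread
        } finally {
            executor.shutdown();
        }
    }
}
```

In the second mode, nothing completes the future on the caller's behalf, which is why the downstream flow (or some other process) must do so.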
Usage Scenarios
In the following scenario, the caller thread returns immediately with a CompletableFuture<Invoice>, which is completed when the downstream flow replies to the gateway (with an Invoice object).
CompletableFuture<Invoice> order(Order order);
<int:gateway service-interface="something.Service" default-request-channel="orders" />
In the following scenario, the caller thread returns with a CompletableFuture<Invoice> when the downstream flow provides it as the payload of the reply to the gateway.
Some other process must complete the future when the invoice is ready.
CompletableFuture<Invoice> order(Order order);
<int:gateway service-interface="foo.Service" default-request-channel="orders"
async-executor="" />
In the following scenario, the caller thread returns with a CompletableFuture<Invoice> when the downstream flow provides it as the payload of the reply to the gateway.
Some other process must complete the future when the invoice is ready.
If DEBUG logging is enabled, a log entry is emitted, indicating that the async executor cannot be used for this scenario.
MyCompletableFuture<Invoice> order(Order order);
<int:gateway service-interface="foo.Service" default-request-channel="orders" />
CompletableFuture instances can be used to perform additional manipulation on the reply, as the following example shows:
CompletableFuture<String> process(String data);
...
CompletableFuture<String> result = process("foo")
.thenApply(t -> t.toUpperCase());
...
String out = result.get(10, TimeUnit.SECONDS);
Reactor Mono
Starting with version 5.0, the GatewayProxyFactoryBean
allows the use of Project Reactor with gateway interface methods, using a Mono<T>
return type.
The internal AsyncInvocationTask
is wrapped in a Mono.fromCallable()
.
A Mono
can be used to retrieve the result later (similar to a Future<?>
), or you can consume from it with the dispatcher by invoking your Consumer
when the result is returned to the gateway.
The Mono is not immediately flushed by the framework.
Consequently, the underlying message flow is not started before the gateway method returns (as it is with a Future<?> Executor task).
The flow starts when the Mono is subscribed to.
Alternatively, the Mono (being a Composable) might be part of a Reactor stream, in which case subscribe() applies to the entire Flux.
The following example shows how to create a gateway with Project Reactor:
@MessagingGateway
public static interface TestGateway {
@Gateway(requestChannel = "promiseChannel")
Mono<Integer> multiply(Integer value);
}
...
@ServiceActivator(inputChannel = "promiseChannel")
public Integer multiply(Integer value) {
return value * 2;
}
...
Flux.just("1", "2", "3", "4", "5")
.map(Integer::parseInt)
.flatMap(this.testGateway::multiply)
.collectList()
.subscribe(integers -> ...);
Another example that uses Project Reactor is a simple callback scenario, as the following example shows:
Mono<Invoice> mono = service.process(myOrder);
mono.subscribe(invoice -> handleInvoice(invoice));
The calling thread continues, with handleInvoice()
being called when the flow completes.
Downstream Flows Returning an Asynchronous Type
As mentioned in the ListenableFuture
section above, if you wish some downstream component to return a message with an async payload (Future
, Mono
, and others), you must explicitly set the async executor to null
(or ""
when using XML configuration).
The flow is then invoked on the caller thread and the result can be retrieved later.
void Return Type
Unlike the return types mentioned earlier, when the method return type is void
, the framework cannot implicitly determine that you wish the downstream flow to run asynchronously, with the caller thread returning immediately.
In this case, you must annotate the interface method with @Async
, as the following example shows:
@MessagingGateway
public interface MyGateway {
@Gateway(requestChannel = "sendAsyncChannel")
@Async
void sendAsync(String payload);
}
Unlike the Future<?>
return types, there is no way to inform the caller if some exception is thrown by the flow, unless some custom TaskExecutor
(such as an ErrorHandlingTaskExecutor
) is associated with the @Async
annotation.
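The idea behind such an executor can be sketched with JDK types alone. The class below is not Spring's `ErrorHandlingTaskExecutor`; it is a hypothetical stand-in (`ErrorReportingExecutor`, with an assumed `Consumer<Throwable>` callback) that shows how a wrapper can report failures from a task whose caller has already returned.

```java
import java.util.concurrent.Executor;
import java.util.function.Consumer;

final class ErrorReportingExecutor implements Executor {

    private final Executor delegate;
    private final Consumer<Throwable> errorHandler;

    ErrorReportingExecutor(Executor delegate, Consumer<Throwable> errorHandler) {
        this.delegate = delegate;
        this.errorHandler = errorHandler;
    }

    @Override
    public void execute(Runnable task) {
        // Wrap the task so that a failure reaches the handler instead of being lost.
        this.delegate.execute(() -> {
            try {
                task.run();
            }
            catch (RuntimeException ex) {
                this.errorHandler.accept(ex);
            }
        });
    }

    public static void main(String[] args) {
        Executor direct = Runnable::run; // runs the task on the calling thread
        ErrorReportingExecutor executor = new ErrorReportingExecutor(direct,
                ex -> System.err.println("Flow failed: " + ex.getMessage()));
        executor.execute(() -> {
            throw new IllegalStateException("boom");
        });
    }
}
```

With a void gateway method, a callback of this kind is the only place the failure can surface, because there is no return value to carry it.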
Gateway Behavior When No response Arrives
As explained earlier, the gateway provides a convenient way of interacting with a messaging system through POJO method invocations. However, a typical method invocation, which is generally expected to always return (even with an Exception), might not always map one-to-one to message exchanges (for example, a reply message might not arrive — the equivalent to a method not returning).
The rest of this section covers various scenarios and how to make the gateway behave more predictably.
Certain attributes can be configured to make synchronous gateway behavior more predictable, but some of them might not always work as you might expect.
One of them is reply-timeout
(at the method level or default-reply-timeout
at the gateway level).
We examine the reply-timeout
attribute to see how it can and cannot influence the behavior of the synchronous gateway in various scenarios.
We examine a single-threaded scenario (all components downstream are connected through a direct channel) and multi-threaded scenarios (for example, somewhere downstream you may have a pollable or executor channel that breaks the single-thread boundary).
Long-running Process Downstream
- Sync Gateway, single-threaded: If a component downstream is still running (perhaps because of an infinite loop or a slow service), setting a reply-timeout has no effect, and the gateway method call does not return until the downstream service exits (by returning or throwing an exception).
- Sync Gateway, multi-threaded: If a component downstream is still running (perhaps because of an infinite loop or a slow service) in a multi-threaded message flow, setting the reply-timeout has an effect: the gateway method invocation returns once the timeout has been reached, because the GatewayProxyFactoryBean polls on the reply channel, waiting for a message until the timeout expires. However, if the timeout is reached before the actual reply is produced, the gateway method can return 'null'. The reply message (if produced) is then sent to the reply channel after the gateway method invocation has already returned, so design your flow with that in mind.
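The difference between the two cases above can be reproduced with plain JDK classes. This is a simulation under stated assumptions, not the framework's implementation: `ReplyTimeoutSketch` and `sendAndReceive` are hypothetical names, an `Executor` stands in for the request channel, and a `BlockingQueue` stands in for the reply channel.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

final class ReplyTimeoutSketch {

    // Sends a request through the channel, then waits for a reply up to the timeout.
    static String sendAndReceive(Executor channel, long downstreamMillis, long replyTimeoutMillis) {
        BlockingQueue<String> replyChannel = new LinkedBlockingQueue<>();
        channel.execute(() -> {
            sleep(downstreamMillis); // slow downstream service
            replyChannel.offer("reply");
        });
        try {
            // Waiting starts only here, after the hand-off above has returned.
            return replyChannel.poll(replyTimeoutMillis, TimeUnit.MILLISECONDS);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    private static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        Executor direct = Runnable::run; // same thread, like a direct channel
        Executor handOff = task -> {     // another thread, like an executor channel
            Thread worker = new Thread(task);
            worker.setDaemon(true);
            worker.start();
        };
        // Single-threaded: the call blocks for the full 300 ms despite the 50 ms timeout.
        System.out.println(sendAndReceive(direct, 300, 50));
        // Multi-threaded: the call gives up after about 50 ms and returns null.
        System.out.println(sendAndReceive(handOff, 300, 50));
    }
}
```

In the single-threaded run the timeout never gets a chance to fire, because the calling thread is busy running the downstream service. In the multi-threaded run the late reply still arrives on the reply queue, but nobody is waiting for it.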
Downstream Component Returns 'null'
- Sync Gateway, single-threaded: If a component downstream returns 'null', the gateway method call hangs indefinitely unless a reply-timeout has been configured or the requires-reply attribute has been set on the downstream component (for example, a service activator) that might return 'null'. In the latter case, an exception is thrown and propagated to the gateway.
- Sync Gateway, multi-threaded: The behavior is the same as the previous case.
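The effect of requires-reply can be illustrated with a small JDK-only sketch. The names here (`RequiresReplySketch`, `handle`) are hypothetical, and the exception type is a placeholder: the framework raises its own exception, which this sketch does not attempt to reproduce.

```java
import java.util.Optional;
import java.util.function.Function;

final class RequiresReplySketch {

    // Stand-in for a service activator: a null result means no reply message is produced.
    static Optional<String> handle(Function<String, String> service, String payload, boolean requiresReply) {
        String result = service.apply(payload);
        if (result == null && requiresReply) {
            // Fail fast instead of leaving the caller waiting for a reply that never comes.
            throw new IllegalStateException("No reply produced for payload: " + payload);
        }
        return Optional.ofNullable(result);
    }

    public static void main(String[] args) {
        // Without requires-reply, a null result silently yields no reply.
        System.out.println(handle(p -> null, "order-1", false).isPresent());
        try {
            handle(p -> null, "order-1", true);
        }
        catch (IllegalStateException ex) {
            System.out.println(ex.getMessage());
        }
    }
}
```

An empty result corresponds to the case where the gateway keeps waiting, while the exception corresponds to the case where the caller is told immediately that no reply is coming.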
Downstream Component Return Signature is 'void' While Gateway Method Signature Is Non-void
- Sync Gateway, single-threaded: If a component downstream returns 'void', the gateway method call hangs indefinitely unless a reply-timeout has been configured.
- Sync Gateway, multi-threaded: The behavior is the same as the previous case.
Downstream Component Results in Runtime Exception
- Sync Gateway, single-threaded: If a component downstream throws a runtime exception, the exception is propagated through an error message back to the gateway and re-thrown.
- Sync Gateway, multi-threaded: The behavior is the same as the previous case.
You should understand that, by default, reply-timeout is unbounded.
Consequently, if you do not explicitly set the reply-timeout , your gateway method invocation might hang indefinitely.
So analyze your flow, and if there is even a remote possibility that one of these scenarios occurs, set the reply-timeout attribute to a 'safe' value.
Even better, set the requires-reply attribute of the downstream component to 'true'. An exception is then thrown as soon as that component returns null internally, which ensures a timely response.
However, you should also realize that there are some scenarios (see the single-threaded case under Long-running Process Downstream) where reply-timeout does not help.
That means it is also important to analyze your message flow and decide when to use a synchronous gateway rather than an asynchronous gateway.
As described earlier, the latter case is a matter of defining gateway methods that return Future instances.
Then you are guaranteed to receive that return value, and you have more granular control over the results of the invocation.
Also, when dealing with a router, you should remember that setting the resolution-required attribute to 'true' results in an exception thrown by the router if it cannot resolve a particular channel.
Likewise, when dealing with a Filter, you can set the throw-exception-on-rejection attribute.
In both of these cases, the resulting flow behaves as if it contained a service activator with the 'requires-reply' attribute.
In other words, it helps to ensure a timely response from the gateway method invocation.
reply-timeout is unbounded for <gateway/> elements (created by the GatewayProxyFactoryBean ).
Inbound gateways for external integration (WS, HTTP, and so on) share many characteristics and attributes with these gateways.
However, for those inbound gateways, the default reply-timeout is 1000 milliseconds (one second).
If a downstream asynchronous hand-off is made to another thread, you may need to increase this attribute to allow enough time for the flow to complete before the gateway times out.
You should understand that the timer starts when the thread returns to the gateway — that is, when the flow completes or a message is handed off to another thread. At that time, the calling thread starts waiting for the reply. If the flow was completely synchronous, the reply is immediately available. For asynchronous flows, the thread waits for up to this time.
See IntegrationFlow
as Gateway in the Java DSL chapter for options to define gateways through IntegrationFlows
.