11. Java DSL

The Spring Integration Java configuration and DSL provides a set of convenient builders and a fluent API that lets you configure Spring Integration message flows from Spring @Configuration classes.

The Java DSL for Spring Integration is essentially a facade for Spring Integration. The DSL provides a simple way to embed Spring Integration Message Flows into your application by using the fluent Builder pattern together with existing Java configuration from Spring Framework and Spring Integration. We also use and support lambdas (available with Java 8) to further simplify Java configuration.

The cafe offers a good example of using the DSL.

The DSL is presented by the IntegrationFlows factory for the IntegrationFlowBuilder. This produces the IntegrationFlow component, which should be registered as a Spring bean (by using the @Bean annotation). The builder pattern is used to express arbitrarily complex structures as a hierarchy of methods that can accept lambdas as arguments.

The IntegrationFlowBuilder only collects integration components (MessageChannel instances, AbstractEndpoint instances, and so on) in the IntegrationFlow bean for further parsing and registration of concrete beans in the application context by the IntegrationFlowBeanPostProcessor.

The Java DSL uses Spring Integration classes directly and bypasses any XML generation and parsing. However, the DSL offers more than syntactic sugar on top of XML. One of its most compelling features is the ability to define inline lambdas to implement endpoint logic, eliminating the need for external classes to implement custom logic. In some sense, Spring Integration’s support for the Spring Expression Language (SpEL) and inline scripting addresses this, but lambdas are easier and much more powerful.

The following example shows how to use Java Configuration for Spring Integration:

@Configuration
@EnableIntegration
public class MyConfiguration {

    @Bean
    public AtomicInteger integerSource() {
        return new AtomicInteger();
    }

    @Bean
    public IntegrationFlow myFlow() {
        return IntegrationFlows.from(integerSource::getAndIncrement,
                                         c -> c.poller(Pollers.fixedRate(100)))
                    .channel("inputChannel")
                    .filter((Integer p) -> p > 0)
                    .transform(Object::toString)
                    .channel(MessageChannels.queue())
                    .get();
    }
}

After the ApplicationContext starts, the preceding configuration creates the Spring Integration endpoints and message channels. Java configuration can both replace and augment XML configuration. You need not replace all of your existing XML configuration to use Java configuration.

11.1 DSL Basics

The org.springframework.integration.dsl package contains the IntegrationFlowBuilder API mentioned earlier and a number of IntegrationComponentSpec implementations, which are also builders and provide the fluent API to configure concrete endpoints. The IntegrationFlowBuilder infrastructure provides common enterprise integration patterns (EIP) for message-based applications, such as channels, endpoints, pollers, and channel interceptors.

Endpoints are expressed as verbs in the DSL to improve readability. The following list includes the common DSL method names and the associated EIP endpoint:

  • transform → Transformer
  • filter → Filter
  • handle → ServiceActivator
  • split → Splitter
  • aggregate → Aggregator
  • route → Router
  • bridge → Bridge

Conceptually, integration processes are constructed by composing these endpoints into one or more message flows. Note that EIP does not formally define the term message flow, but it is useful to think of it as a unit of work that uses well known messaging patterns. The DSL provides an IntegrationFlow component to define a composition of channels and endpoints between them, but now IntegrationFlow plays only the configuration role to populate real beans in the application context and is not used at runtime. The following example uses the IntegrationFlows factory to define an IntegrationFlow bean by using EIP-methods from IntegrationFlowBuilder:

@Bean
public IntegrationFlow integerFlow() {
    return IntegrationFlows.from("input")
            .<String, Integer>transform(Integer::parseInt)
            .get();
}

The transform method accepts a lambda as an endpoint argument to operate on the message payload. The real argument of this method is GenericTransformer<S, T>. Consequently, any of the provided transformers (ObjectToJsonTransformer, FileToStringTransformer, and others) can be used here.

Under the covers, IntegrationFlowBuilder recognizes the MessageHandler and the endpoint for it, with MessageTransformingHandler and ConsumerEndpointFactoryBean, respectively. Consider another example:

@Bean
public IntegrationFlow myFlow() {
    return IntegrationFlows.from("input")
                .filter("World"::equals)
                .transform("Hello "::concat)
                .handle(System.out::println)
                .get();
}

The preceding example composes a sequence of Filter -> Transformer -> Service Activator. The flow is "one way". That is, it does not provide a reply message but only prints the payload to STDOUT. The endpoints are automatically wired together by using direct channels.
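
To make the method references in this flow concrete, the following plain-Java sketch applies the same three steps by hand, without Spring Integration. The FlowStepsSketch class and its run helper are hypothetical names introduced only for illustration; in a real flow the endpoints are wired through channels rather than called in a loop.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical stand-in for the Filter -> Transformer -> Service Activator sequence.
class FlowStepsSketch {

    // Applies filter, transform, and handle to each payload in order.
    static List<String> run(List<String> payloads) {
        Predicate<String> filter = "World"::equals;             // keeps only the payload "World"
        Function<String, String> transform = "Hello "::concat;  // produces "Hello " + payload
        List<String> handled = new ArrayList<>();
        Consumer<String> handle = handled::add;                  // one-way: nothing is returned

        for (String payload : payloads) {
            if (filter.test(payload)) {
                handle.accept(transform.apply(payload));
            }
        }
        return handled;
    }

    public static void main(String[] args) {
        System.out.println(run(List.of("World", "Moon"))); // prints [Hello World]
    }
}
```

Payloads rejected by the filter are dropped, which matches the default behavior of a filter without a discard channel.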

[Important]Lambdas And Message<?> Arguments

When using lambdas in EIP methods, the "input" argument is generally the message payload. If you wish to access the entire message, use one of the overloaded methods that take a Class<?> as the first parameter. For example, this won’t work:

.<Message<?>, Foo>transform(m -> newFooFromMessage(m))

This will fail at runtime with a ClassCastException because the lambda doesn’t retain the argument type and the framework will attempt to cast the payload to a Message<?>.

Instead, use:

.transform(Message.class, m -> newFooFromMessage(m))
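
The failure mode can be reproduced without Spring. The following sketch is an assumption-laden model, not the framework's code: Envelope is a hypothetical stand-in for Message<?>, and the two transform overloads imitate a caller that either has no type information (and therefore passes the payload) or receives an explicit Class token.

```java
import java.util.function.Function;

// Plain-Java sketch of why the Class<?> overload is needed; no Spring types are used.
class LambdaTypeSketch {

    // Hypothetical stand-in for Message<?>.
    static final class Envelope {
        final Object payload;

        Envelope(Object payload) {
            this.payload = payload;
        }
    }

    // Without a type token, S is erased at runtime, so the caller passes the payload.
    @SuppressWarnings("unchecked")
    static <S, T> T transform(Function<S, T> fn, Envelope message) {
        return fn.apply((S) message.payload); // unchecked: the lambda itself rejects a wrong type
    }

    // With a type token, the caller can choose between the whole message and its payload.
    static <S, T> T transform(Class<S> type, Function<S, T> fn, Envelope message) {
        Object argument = type.isInstance(message) ? message : message.payload;
        return fn.apply(type.cast(argument));
    }

    static String describe(Envelope e) {
        return "payload=" + e.payload;
    }

    public static void main(String[] args) {
        Envelope message = new Envelope("hi");
        try {
            LambdaTypeSketch.<Envelope, String>transform(e -> describe(e), message);
        } catch (ClassCastException ex) {
            System.out.println("generic form failed with ClassCastException");
        }
        String result = transform(Envelope.class, e -> describe(e), message);
        System.out.println(result);
    }
}
```

The first call compiles because the explicit type arguments satisfy the compiler, yet it fails at runtime when the String payload reaches a lambda that expects an Envelope.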

11.2 Message Channels

In addition to the IntegrationFlowBuilder with EIP methods, the Java DSL provides a fluent API to configure MessageChannel instances. For this purpose the MessageChannels builder factory is provided. The following example shows how to use it:

@Bean
public MessageChannel priorityChannel() {
    return MessageChannels.priority(this.mongoDbChannelMessageStore, "priorityGroup")
                        .interceptor(wireTap())
                        .get();
}

The same MessageChannels builder factory can be used in the channel() EIP method from IntegrationFlowBuilder to wire endpoints, similar to wiring an input-channel/output-channel pair in the XML configuration. By default, endpoints are wired with DirectChannel instances where the bean name is based on the following pattern: [IntegrationFlow.beanName].channel#[channelNameIndex]. This rule also applies to unnamed channels produced by inline MessageChannels builder factory usage. However, all MessageChannels methods have a variant that accepts a channelId, which you can use to set the bean names for MessageChannel instances. The channel() method also accepts MessageChannel references (as bean-method invocations) and bean names. The following example shows the possible ways to use the channel() EIP method:

@Bean
public MessageChannel queueChannel() {
    return MessageChannels.queue().get();
}

@Bean
public MessageChannel publishSubscribe() {
    return MessageChannels.publishSubscribe().get();
}

@Bean
public IntegrationFlow channelFlow() {
    return IntegrationFlows.from("input")
                .fixedSubscriberChannel()
                .channel("queueChannel")
                .channel(publishSubscribe())
                .channel(MessageChannels.executor("executorChannel", this.taskExecutor))
                .channel("output")
                .get();
}

  • from("input") means "find and use the MessageChannel with the input id, or create one".
  • fixedSubscriberChannel() produces an instance of FixedSubscriberChannel and registers it with a name of channelFlow.channel#0.
  • channel("queueChannel") works the same way but uses an existing queueChannel bean.
  • channel(publishSubscribe()) is the bean-method reference.
  • channel(MessageChannels.executor("executorChannel", this.taskExecutor)) passes an IntegrationComponentSpec to the IntegrationFlowBuilder, which builds the ExecutorChannel from it and registers it as executorChannel.
  • channel("output") registers the DirectChannel bean with output as its name, as long as no beans with this name already exist.

Note: The preceding IntegrationFlow definition is valid, and all of its channels are applied to endpoints with BridgeHandler instances.
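
The default naming pattern described above can be sketched in a few lines of plain Java. ChannelNameSketch is a hypothetical helper, not part of the DSL, and it assumes that the index starts at 0 and advances only when a name is generated.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the default pattern [IntegrationFlow.beanName].channel#[channelNameIndex].
class ChannelNameSketch {

    private final String flowBeanName;
    private int channelNameIndex = 0; // assumption: starts at 0, as in channelFlow.channel#0

    ChannelNameSketch(String flowBeanName) {
        this.flowBeanName = flowBeanName;
    }

    // Returns the explicit id when one is given; otherwise generates the next default name.
    String nameFor(String explicitId) {
        if (explicitId != null) {
            return explicitId;
        }
        return flowBeanName + ".channel#" + channelNameIndex++;
    }

    public static void main(String[] args) {
        ChannelNameSketch names = new ChannelNameSketch("channelFlow");
        List<String> result = new ArrayList<>();
        result.add(names.nameFor(null));              // an unnamed inline channel
        result.add(names.nameFor("executorChannel")); // a channel with an explicit channelId
        result.add(names.nameFor(null));              // a later unnamed channel
        System.out.println(result);
    }
}
```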

[Important]Important

Be careful when using the same inline channel definition through the MessageChannels factory from different IntegrationFlow instances. Even though the DSL parser registers non-existent objects as beans, it cannot recognize the same object (MessageChannel) across different IntegrationFlow containers. The following example is wrong:

@Bean
public IntegrationFlow startFlow() {
    return IntegrationFlows.from("input")
                .transform(...)
                .channel(MessageChannels.queue("queueChannel"))
                .get();
}

@Bean
public IntegrationFlow endFlow() {
    return IntegrationFlows.from(MessageChannels.queue("queueChannel"))
                .handle(...)
                .get();
}

The result of that bad example is the following exception:

Caused by: java.lang.IllegalStateException:
Could not register object [queueChannel] under bean name 'queueChannel':
     there is already object [queueChannel] bound
	    at o.s.b.f.s.DefaultSingletonBeanRegistry.registerSingleton(DefaultSingletonBeanRegistry.java:129)

To make it work, you need to declare @Bean for that channel and use its bean method from different IntegrationFlow instances.
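
The difference between the failing and the working variant comes down to object identity. The following sketch models it with a hypothetical, heavily simplified registry (SingletonRegistrySketch is not a Spring class): registering the same instance twice is tolerated, while a second, different instance under the same name is rejected.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified model: one name may be bound to exactly one object instance.
class SingletonRegistrySketch {

    private final Map<String, Object> singletons = new HashMap<>();

    // Accepts a new name or a repeat of the same instance; rejects a different instance.
    void register(String name, Object bean) {
        Object existing = singletons.putIfAbsent(name, bean);
        if (existing != null && existing != bean) {
            throw new IllegalStateException("Could not register object under bean name '"
                    + name + "': there is already an object bound");
        }
    }

    public static void main(String[] args) {
        SingletonRegistrySketch registry = new SingletonRegistrySketch();

        Object sharedChannel = new Object();               // plays the role of one @Bean channel
        registry.register("queueChannel", sharedChannel);  // used by the first flow
        registry.register("queueChannel", sharedChannel);  // reused by the second flow: fine

        try {
            registry.register("queueChannel", new Object()); // a second inline definition
        } catch (IllegalStateException ex) {
            System.out.println(ex.getMessage());
        }
    }
}
```

Declaring the channel once and passing the same reference to both flows corresponds to the shared-instance case in the sketch.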

11.3 Pollers

Spring Integration also provides a fluent API that lets you configure PollerMetadata for AbstractPollingEndpoint implementations. You can use the Pollers builder factory to configure common bean definitions or those created from IntegrationFlowBuilder EIP methods, as the following example shows:

@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerSpec poller() {
    return Pollers.fixedRate(500)
        .errorChannel("myErrors");
}

See Pollers and PollerSpec in the Javadoc for more information.

[Important]Important

If you use the DSL to construct a PollerSpec as a @Bean, do not call the get() method in the bean definition. The PollerSpec is a FactoryBean that generates the PollerMetadata object from the specification and initializes all of its properties.

11.4 DSL and Endpoint Configuration

All IntegrationFlowBuilder EIP methods have a variant that applies the lambda parameter to provide options for AbstractEndpoint instances: SmartLifecycle, PollerMetadata, request-handler-advice-chain, and others. Each of them has generic arguments, so it lets you configure an endpoint and even its MessageHandler in the context, as the following example shows:

@Bean
public IntegrationFlow flow2() {
    return IntegrationFlows.from(this.inputChannel)
                .transform(new PayloadSerializingTransformer(),
                       c -> c.autoStartup(false).id("payloadSerializingTransformer"))
                .transform((Integer p) -> p * 2, c -> c.advice(this.expressionAdvice()))
                .get();
}

In addition, the EndpointSpec provides an id() method to let you register an endpoint bean with a given bean name, rather than a generated one.

If the MessageHandler is referenced as a bean, then any existing adviceChain configuration will be overridden if the .advice() method is present in the DSL definition:

@Bean
public TcpOutboundGateway tcpOut() {
    TcpOutboundGateway gateway = new TcpOutboundGateway();
    gateway.setConnectionFactory(cf());
    gateway.setAdviceChain(Collections.singletonList(fooAdvice()));
    return gateway;
}

@Bean
public IntegrationFlow clientTcpFlow() {
    return f -> f
        .handle(tcpOut(), e -> e.advice(testAdvice()))
        .transform(Transformers.objectToString());
}

That is, the advice chains are not merged; only the testAdvice() bean is used in this case.

11.5 Transformers

The DSL API provides a convenient, fluent Transformers factory to be used as inline target object definition within the .transform() EIP method. The following example shows how to use it:

@Bean
public IntegrationFlow transformFlow() {
    return IntegrationFlows.from("input")
            .transform(Transformers.fromJson(MyPojo.class))
            .transform(Transformers.serializer())
            .get();
}

It avoids inconvenient coding using setters and makes the flow definition more straightforward. Note that you can use Transformers to declare target Transformer instances as @Bean instances and, again, use them from IntegrationFlow definition as bean methods. Nevertheless, the DSL parser takes care of bean declarations for inline objects, if they are not yet defined as beans.

See Transformers (https://docs.spring.io/spring-integration/api/org/springframework/integration/dsl/Transformers.html) in the Javadoc for more information and supported factory methods.

Also see Lambdas And Message<?> Arguments.

11.6 Inbound Channel Adapters

Typically, message flows start from an inbound channel adapter (such as <int-jdbc:inbound-channel-adapter>). The adapter is configured with <poller>, and it asks a MessageSource<?> to periodically produce messages. Java DSL allows for starting IntegrationFlow from a MessageSource<?>, too. For this purpose, the IntegrationFlows builder factory provides an overloaded IntegrationFlows.from(MessageSource<?> messageSource) method. You can configure the MessageSource<?> as a bean and provide it as an argument for that method. The second parameter of IntegrationFlows.from() is a Consumer<SourcePollingChannelAdapterSpec> lambda that lets you provide options (such as PollerMetadata or SmartLifecycle) for the SourcePollingChannelAdapter. The following example shows how to use the fluent API and a lambda to create an IntegrationFlow:

@Bean
public MessageSource<Object> jdbcMessageSource() {
    return new JdbcPollingChannelAdapter(this.dataSource, "SELECT * FROM something");
}

@Bean
public IntegrationFlow pollingFlow() {
    return IntegrationFlows.from(jdbcMessageSource(),
                c -> c.poller(Pollers.fixedRate(100).maxMessagesPerPoll(1)))
            .transform(Transformers.toJson())
            .channel("furtherProcessChannel")
            .get();
}

For those cases that have no requirement to build Message objects directly, you can use the IntegrationFlows.from() variant that is based on java.util.function.Supplier. The result of Supplier.get() is automatically wrapped in a Message (if it is not already a Message).
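
The wrapping rule can be illustrated with a small plain-Java sketch. SimpleMessage is a hypothetical stand-in for Message, and the null handling is an assumption (a source that returns nothing produces no message); neither is taken from the framework source.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Sketch of "wrap the Supplier result in a message unless it already is one".
class SupplierSourceSketch {

    // Hypothetical stand-in for Message<?>.
    static final class SimpleMessage {
        final Object payload;

        SimpleMessage(Object payload) {
            this.payload = payload;
        }
    }

    static SimpleMessage receive(Supplier<?> supplier) {
        Object result = supplier.get();
        if (result == null) {
            return null;                      // assumption: nothing to send on this poll
        }
        if (result instanceof SimpleMessage) {
            return (SimpleMessage) result;    // already a message: passed through unchanged
        }
        return new SimpleMessage(result);     // plain value: becomes the payload
    }

    public static void main(String[] args) {
        AtomicInteger integerSource = new AtomicInteger();
        System.out.println(receive(integerSource::getAndIncrement).payload); // prints 0
        System.out.println(receive(integerSource::getAndIncrement).payload); // prints 1
    }
}
```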

11.7 Message Routers

Spring Integration natively provides specialized router types, including:

  • HeaderValueRouter
  • PayloadTypeRouter
  • ExceptionTypeRouter
  • RecipientListRouter
  • XPathRouter

As with many other DSL IntegrationFlowBuilder EIP methods, the route() method can apply any AbstractMessageRouter implementation or, for convenience, a String as a SpEL expression or a ref-method pair. In addition, you can configure route() with a lambda and use a lambda for a Consumer<RouterSpec<MethodInvokingRouter>>. The fluent API also provides AbstractMappingMessageRouter options such as channelMapping(String key, String channelName) pairs, as the following example shows:

@Bean
public IntegrationFlow routeFlow() {
    return IntegrationFlows.from("routerInput")
            .<Integer, Boolean>route(p -> p % 2 == 0,
                    m -> m.suffix("Channel")
                            .channelMapping("true", "even")
                            .channelMapping("false", "odd")
            )
            .get();
}
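
The way the lambda result, the channel mappings, and the suffix combine into a channel name can be sketched as follows. RouterMappingSketch is a hypothetical model, and it assumes that the mapping lookup happens before the suffix is appended, so that both payload parities resolve to evenChannel or oddChannel.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Sketch of resolving a channel name from a routing key, mappings, and a suffix.
class RouterMappingSketch {

    private final Map<String, String> channelMappings = new HashMap<>();
    private String suffix = "";

    RouterMappingSketch suffix(String suffix) {
        this.suffix = suffix;
        return this;
    }

    RouterMappingSketch channelMapping(String key, String channelName) {
        channelMappings.put(key, channelName);
        return this;
    }

    // The selector result is turned into a String key, mapped, and then suffixed.
    <P> String resolve(P payload, Function<P, ?> selector) {
        Object rawKey = selector.apply(payload);
        String key = String.valueOf(rawKey);
        String mapped = channelMappings.getOrDefault(key, key);
        return mapped + suffix;
    }

    public static void main(String[] args) {
        RouterMappingSketch router = new RouterMappingSketch()
                .suffix("Channel")
                .channelMapping("true", "even")
                .channelMapping("false", "odd");

        System.out.println(router.resolve(4, p -> p % 2 == 0)); // prints evenChannel
        System.out.println(router.resolve(3, p -> p % 2 == 0)); // prints oddChannel
    }
}
```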

The following example shows a simple expression-based router:

@Bean
public IntegrationFlow routeFlow() {
    return IntegrationFlows.from("routerInput")
            .route("headers['destChannel']")
            .get();
}

The routeToRecipients() method takes a Consumer<RecipientListRouterSpec>, as the following example shows:

@Bean
public IntegrationFlow recipientListFlow() {
    return IntegrationFlows.from("recipientListInput")
            .<String, String>transform(p -> p.replaceFirst("Payload", ""))
            .routeToRecipients(r -> r
                .recipient("thing1-channel", "'thing1' == payload")
                .recipient("thing2-channel", m ->
                    m.getHeaders().containsKey("recipient")
                        && (boolean) m.getHeaders().get("recipient"))
                .recipientFlow("'thing1' == payload or 'thing2' == payload or 'thing3' == payload",
                    f -> f.<String, String>transform(String::toUpperCase)
                        .channel(c -> c.queue("recipientListSubFlow1Result")))
                .recipientFlow((String p) -> p.startsWith("thing3"),
                    f -> f.transform("Hello "::concat)
                        .channel(c -> c.queue("recipientListSubFlow2Result")))
                .recipientFlow(new FunctionExpression<Message<?>>(m ->
                                             "thing3".equals(m.getPayload())),
                    f -> f.channel(c -> c.queue("recipientListSubFlow3Result")))
                .defaultOutputToParentFlow())
            .get();
}

The .defaultOutputToParentFlow() of the .routeToRecipients() definition lets you set the router’s defaultOutput as a gateway to continue a process for the unmatched messages in the main flow.

Also see Lambdas And Message<?> Arguments.

11.8 Splitters

To create a splitter, use the split() EIP method. By default, if the payload is an Iterable, an Iterator, an array, a Stream, or a reactive Publisher, the split() method outputs each item as an individual message. It accepts a lambda, a SpEL expression, or any AbstractMessageSplitter implementation. Alternatively, you can use it without parameters to provide the DefaultMessageSplitter. The following example shows how to use the split() method by providing a lambda:

@Bean
public IntegrationFlow splitFlow() {
    return IntegrationFlows.from("splitInput")
              .split(s ->
                      s.applySequence(false).get().getT2().setDelimiters(","))
              .channel(MessageChannels.executor(this.taskExecutor()))
              .get();
}

The preceding example creates a splitter that splits a message containing a comma-delimited String. Note: The getT2() method comes from a Tuple Collection, which is the result of EndpointSpec.get(), and represents a pair of ConsumerEndpointFactoryBean and DefaultMessageSplitter for the preceding example.
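
As a rough illustration of what delimiter-based splitting does to a String payload, the following plain-Java sketch tokenizes the payload and returns one item per token. DelimiterSplitSketch is a hypothetical helper; trimming the tokens and skipping empty ones are assumptions about sensible behavior rather than a description of DefaultMessageSplitter internals.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.StringTokenizer;

// Sketch of splitting a delimited String payload into individual items.
class DelimiterSplitSketch {

    static List<String> split(String payload, String delimiters) {
        List<String> items = new ArrayList<>();
        StringTokenizer tokens = new StringTokenizer(payload, delimiters);
        while (tokens.hasMoreTokens()) {
            String token = tokens.nextToken().trim(); // assumption: surrounding spaces are dropped
            if (!token.isEmpty()) {
                items.add(token);                      // each item would become its own message
            }
        }
        return items;
    }

    public static void main(String[] args) {
        System.out.println(split("a, b,c", ",")); // prints [a, b, c]
    }
}
```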

Also see Lambdas And Message<?> Arguments.

11.9 Aggregators and Resequencers

An Aggregator is conceptually the opposite of a Splitter. It aggregates a sequence of individual messages into a single message and is necessarily more complex. By default, an aggregator returns a message that contains a collection of payloads from incoming messages. The same rules are applied for the Resequencer. The following is a canonical example of the splitter-aggregator pattern:

@Bean
public IntegrationFlow splitAggregateFlow() {
    return IntegrationFlows.from("splitAggregateInput")
            .split()
            .channel(MessageChannels.executor(this.taskExecutor()))
            .resequence()
            .aggregate()
            .get();
}

The split() method splits the list into individual messages and sends them to the ExecutorChannel. The resequence() method reorders messages by sequence details found in the message headers. The aggregate() method collects those messages.

However, you can change the default behavior by specifying a release strategy and correlation strategy, among other things. Consider the following example:

.aggregate(a ->
        a.correlationStrategy(m -> m.getHeaders().get("myCorrelationKey"))
            .releaseStrategy(g -> g.size() > 10)
            .messageStore(messageStore()))

The preceding example correlates messages by the value of their myCorrelationKey header and releases a group once it holds more than ten messages (the release strategy is g.size() > 10).
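
The interplay of correlation and release can be sketched in plain Java. AggregatorSketch is a hypothetical model in which a HashMap plays the role of the message store; it applies the same g.size() > 10 condition, so a group is released when its eleventh message arrives.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Sketch of correlating payloads by key and releasing a group once size() > 10.
class AggregatorSketch {

    // Stands in for the message store: one pending group per correlation key.
    private final Map<Object, List<Integer>> groups = new HashMap<>();

    // Adds the payload to its group; returns the group if the release condition is met.
    Optional<List<Integer>> add(Object correlationKey, Integer payload) {
        List<Integer> group = groups.computeIfAbsent(correlationKey, k -> new ArrayList<>());
        group.add(payload);
        if (group.size() > 10) {
            groups.remove(correlationKey); // the released group is no longer pending
            return Optional.of(group);
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        AggregatorSketch aggregator = new AggregatorSketch();
        for (int i = 0; i < 11; i++) {
            Optional<List<Integer>> released = aggregator.add("order-1", i);
            if (released.isPresent()) {
                System.out.println("released " + released.get().size() + " payloads");
            }
        }
    }
}
```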

Similar lambda configurations are provided for the resequence() EIP method.

11.10 Service Activators and the .handle() method

The .handle() EIP method’s goal is to invoke any MessageHandler implementation or any method on some POJO. Another option is to define an "activity" by using lambda expressions. Consequently, we introduced a generic GenericHandler<P> functional interface. Its handle method requires two arguments: P payload and MessageHeaders headers (starting with version 5.1). Having that, we can define a flow as follows:

@Bean
public IntegrationFlow myFlow() {
    return IntegrationFlows.from("flow3Input")
        .<Integer>handle((p, h) -> p * 2)
        .get();
}

The preceding example doubles any integer it receives.

However, one main goal of Spring Integration is loose coupling, through runtime type conversion from the message payload to the target arguments of the message handler. Since Java does not support generic type resolution for lambda classes, we introduced a workaround with an additional payloadType argument for most EIP methods and LambdaMessageProcessor. Doing so delegates the hard conversion work to Spring’s ConversionService, which uses the provided type to convert the requested message to the target method arguments. The following example shows what the resulting IntegrationFlow might look like:

@Bean
public IntegrationFlow integerFlow() {
    return IntegrationFlows.from("input")
            .<byte[], String>transform(p -> new String(p, StandardCharsets.UTF_8))
            .handle(Integer.class, (p, h) -> p * 2)
            .get();
}
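
The role of the payloadType argument can be sketched without Spring. In the following hypothetical model, convert stands in for the ConversionService and knows only a single String-to-Integer conversion, and handle uses a one-argument Function rather than the two-argument GenericHandler to keep the example short.

```java
import java.util.function.Function;

// Sketch of converting the payload to a declared type before invoking the lambda.
class PayloadConversionSketch {

    // Hypothetical, minimal conversion step standing in for a conversion service.
    static <P> P convert(Object payload, Class<P> payloadType) {
        if (payloadType.isInstance(payload)) {
            return payloadType.cast(payload);                       // no conversion needed
        }
        if (payloadType == Integer.class && payload instanceof String) {
            return payloadType.cast(Integer.valueOf((String) payload));
        }
        throw new IllegalArgumentException("No converter from "
                + payload.getClass().getName() + " to " + payloadType.getName());
    }

    // The type token tells the caller what the lambda expects, which erasure would hide.
    static <P, R> R handle(Class<P> payloadType, Function<P, R> handler, Object payload) {
        return handler.apply(convert(payload, payloadType));
    }

    public static void main(String[] args) {
        Integer doubled = handle(Integer.class, p -> p * 2, "21");
        System.out.println(doubled); // prints 42
    }
}
```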

You can also register a BytesToIntegerConverter with the ConversionService to get rid of that additional .transform():

@Bean
@IntegrationConverter
public BytesToIntegerConverter bytesToIntegerConverter() {
   return new BytesToIntegerConverter();
}

@Bean
public IntegrationFlow integerFlow() {
    return IntegrationFlows.from("input")
            .handle(Integer.class, (p, h) -> p * 2)
            .get();
}

Also see Lambdas And Message<?> Arguments.

11.11 Operator log()

For convenience, a log() operator is provided to log the message journey through the Spring Integration flow (the equivalent of <logging-channel-adapter>). Internally, it is represented by the WireTap ChannelInterceptor with a LoggingHandler as its subscriber. It is responsible for logging the incoming message into the next endpoint or the current channel. The following example shows how to use LoggingHandler:

.filter(...)
.log(LoggingHandler.Level.ERROR, "test.category", m -> m.getHeaders().getId())
.route(...)

In the preceding example, an id header is logged at the ERROR level onto test.category only for messages that passed the filter and before routing.

When this operator is used at the end of a flow, it is a one-way handler and the flow ends. To make it a reply-producing flow, you can either use a simple bridge() after the log() or, starting with version 5.1, use a logAndReply() operator instead. logAndReply() can only be used at the end of a flow.

11.12 MessageChannelSpec.wireTap()

Spring Integration includes a .wireTap() fluent API for MessageChannelSpec builders. The following example shows how to use the wireTap method to log input:

@Bean
public QueueChannelSpec myChannel() {
    return MessageChannels.queue()
            .wireTap("loggingFlow.input");
}

@Bean
public IntegrationFlow loggingFlow() {
    return f -> f.log();
}

[Important]Important

If the MessageChannel is an instance of ChannelInterceptorAware, the log() or wireTap() operators are applied to the current MessageChannel. Otherwise, an intermediate DirectChannel is injected into the flow for the currently configured endpoint. In the following example, the WireTap interceptor is added to myChannel directly, because DirectChannel implements ChannelInterceptorAware:

@Bean
MessageChannel myChannel() {
    return new DirectChannel();
}

...
    .channel(myChannel())
    .log()
}

When the current MessageChannel does not implement ChannelInterceptorAware, an implicit DirectChannel and BridgeHandler are injected into the IntegrationFlow, and the WireTap is added to this new DirectChannel. The following example does not have any channel declaration:

.handle(...)
.log()
}

In the preceding example (and any time no channel has been declared), an implicit DirectChannel is injected in the current position of the IntegrationFlow and used as an output channel for the currently configured ServiceActivatingHandler (from the .handle(), described earlier).

11.13 Working With Message Flows

IntegrationFlowBuilder provides a top-level API to produce integration components wired to message flows. When your integration may be accomplished with a single flow (which is often the case), this is convenient. Alternatively, IntegrationFlow instances can be joined via MessageChannel instances.

By default, MessageFlow behaves as a "chain" in Spring Integration parlance. That is, the endpoints are automatically and implicitly wired by DirectChannel instances. The message flow is not actually constructed as a chain, which offers much more flexibility. For example, you may send a message to any component within the flow, if you know its inputChannel name (that is, if you explicitly define it). You may also reference externally defined channels within a flow to allow the use of channel adapters (to enable remote transport protocols, file I/O, and so on), instead of direct channels. As such, the DSL does not support the Spring Integration chain element, because it does not add much value in this case.

Since the Spring Integration Java DSL produces the same bean definition model as any other configuration options and is based on the existing Spring Framework @Configuration infrastructure, it can be used together with XML definitions and wired with Spring Integration messaging annotation configuration.

You can also define direct IntegrationFlow instances by using a lambda. The following example shows how to do so:

@Bean
public IntegrationFlow lambdaFlow() {
    return f -> f.filter("World"::equals)
                   .transform("Hello "::concat)
                   .handle(System.out::println);
}

The result of this definition is the same set of integration components that are wired with an implicit direct channel. The only limitation here is that this flow is started with a named direct channel: lambdaFlow.input. Also, a lambda flow cannot start from a MessageSource or MessageProducer.

Starting with version 5.1, this kind of IntegrationFlow is wrapped in a proxy to expose lifecycle control and provide access to the inputChannel of the internally associated StandardIntegrationFlow.

Starting with version 5.0.6, the generated bean names for the components in an IntegrationFlow include the flow bean name followed by a dot (.) as a prefix. For example, the ConsumerEndpointFactoryBean for the .transform("Hello "::concat) in the preceding sample results in a bean name of lambdaFlow.o.s.i.config.ConsumerEndpointFactoryBean#0. (The o.s.i is shortened from org.springframework.integration to fit on the page.) The Transformer implementation bean for that endpoint has a bean name of lambdaFlow.transformer#0 (starting with version 5.1), where its component type is used instead of the fully qualified name of the MethodInvokingTransformer class. The same pattern is applied to all NamedComponent instances when the bean name has to be generated within the flow. These generated bean names are prepended with the flow ID for purposes such as parsing logs or grouping components together in some analysis tool, as well as to avoid a race condition when we concurrently register integration flows at runtime. See Section 11.18, “Dynamic and Runtime Integration Flows” for more information.

11.14 FunctionExpression

We introduced the FunctionExpression class (an implementation of SpEL’s Expression interface) to let us use lambdas and generics. The Function<T, R> option is provided for the DSL components, along with an expression option, when there is the implicit Strategy variant from Core Spring Integration. The following example shows how to use a function expression:

.enrich(e -> e.requestChannel("enrichChannel")
            .requestPayload(Message::getPayload)
            .propertyFunction("date", m -> new Date()))

The FunctionExpression also supports runtime type conversion, as is done in SpelExpression.

11.15 Sub-flows support

Some of the "if...else" and publish-subscribe components provide the ability to specify their logic or mapping by using sub-flows. The simplest sample is .publishSubscribeChannel(), as the following example shows:

@Bean
public IntegrationFlow subscribersFlow() {
    return flow -> flow
            .publishSubscribeChannel(Executors.newCachedThreadPool(), s -> s
                    .subscribe(f -> f
                            .<Integer>handle((p, h) -> p / 2)
                            .channel(c -> c.queue("subscriber1Results")))
                    .subscribe(f -> f
                            .<Integer>handle((p, h) -> p * 2)
                            .channel(c -> c.queue("subscriber2Results"))))
            .<Integer>handle((p, h) -> p * 3)
            .channel(c -> c.queue("subscriber3Results"));
}

You can achieve the same result with separate IntegrationFlow @Bean definitions, but we hope you find the sub-flow style of logic composition useful. We find that it results in shorter (and so more readable) code.

The .routeToRecipients() method provides a similar publish-subscribe sub-flow composition.

Another example is using .discardFlow() instead of .discardChannel() on the .filter() method.

The .route() deserves special attention. Consider the following example:

@Bean
public IntegrationFlow routeFlow() {
    return f -> f
            .<Integer, Boolean>route(p -> p % 2 == 0,
                    m -> m.channelMapping("true", "evenChannel")
                            .subFlowMapping("false", sf ->
                                    sf.<Integer>handle((p, h) -> p * 3)))
            .transform(Object::toString)
            .channel(c -> c.queue("oddChannel"));
}

The .channelMapping() continues to work as it does in regular Router mapping, but the .subFlowMapping() ties that sub-flow to the main flow. In other words, any router’s sub-flow returns to the main flow after .route().
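
The difference between the two mappings can be traced by hand. In the following plain-Java sketch (SubFlowRouteSketch is a hypothetical model, and the two lists stand in for channels), even payloads leave the main flow through evenChannel, whereas odd payloads pass through the sub-flow and then continue with the remaining steps of the main flow.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of a router where one branch is a channel mapping and the other a sub-flow.
class SubFlowRouteSketch {

    final List<Integer> evenChannel = new ArrayList<>();
    final List<String> oddChannel = new ArrayList<>();

    void send(Integer payload) {
        boolean even = payload % 2 == 0;       // the routing function
        if (even) {
            evenChannel.add(payload);          // channel mapping: the message leaves the main flow
            return;
        }
        Integer subFlowResult = payload * 3;   // sub-flow mapped to "false"
        String transformed = subFlowResult.toString(); // main flow resumes after route()
        oddChannel.add(transformed);
    }

    public static void main(String[] args) {
        SubFlowRouteSketch flow = new SubFlowRouteSketch();
        flow.send(4);
        flow.send(3);
        System.out.println(flow.evenChannel); // prints [4]
        System.out.println(flow.oddChannel);  // prints [9]
    }
}
```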

[Important]Important

Sometimes, you need to refer to an existing IntegrationFlow @Bean from the .subFlowMapping(). The following example shows how to do so:

@Bean
public IntegrationFlow splitRouteAggregate() {
    return f -> f
            .split()
            .<Integer, Boolean>route(o -> o % 2 != 0,
                    m -> m
                            .subFlowMapping(true, oddFlow())
                            .subFlowMapping(false, sf -> sf.gateway(evenFlow())))
            .aggregate();
}

@Bean
public IntegrationFlow oddFlow() {
    return f -> f.handle(m -> System.out.println("odd"));
}

@Bean
public IntegrationFlow evenFlow() {
    return f -> f.handle((p, h) -> "even");
}

In this case, when you need to receive a reply from such a sub-flow and continue the main flow, this IntegrationFlow bean reference (or its input channel) has to be wrapped with a .gateway(), as shown in the preceding example. The oddFlow() reference in the preceding example is not wrapped in a .gateway(). Therefore, we do not expect a reply from this routing branch. If a reply were expected from it, you would end up with an exception similar to the following:

Caused by: org.springframework.beans.factory.BeanCreationException: The 'currentComponent' ([email protected]a51c) is a one-way 'MessageHandler' and it isn't appropriate to configure 'outputChannel'. This is the end of the integration flow.

When you configure a sub-flow as a lambda, the framework handles the request-reply interaction with the sub-flow and a gateway is not needed.

Sub-flows can be nested to any depth, but we do not recommend doing so. In fact, even in the router case, adding complex sub-flows within a flow would quickly begin to look like a plate of spaghetti and be difficult for a human to parse.

11.16 Using Protocol Adapters

All of the examples shown so far illustrate how the DSL supports a messaging architecture by using the Spring Integration programming model. However, we have yet to do any real integration. Doing so requires access to remote resources over HTTP, JMS, AMQP, TCP, JDBC, FTP, SMTP, and so on, or access to the local file system. Spring Integration supports all of these and more. Ideally, the DSL should offer first-class support for all of them, but it is a daunting task to implement all of these and keep up as new adapters are added to Spring Integration. So the expectation is that the DSL is continually catching up with Spring Integration.

Consequently, we provide the high-level API to seamlessly define protocol-specific messaging. We do so with the factory and builder patterns and with lambdas. You can think of the factory classes as "Namespace Factories", because they play the same role as the XML namespace for components from the concrete protocol-specific Spring Integration modules. Currently, Spring Integration Java DSL supports the Amqp, Feed, Jms, Files, (S)Ftp, Http, JPA, MongoDb, TCP/UDP, Mail, WebFlux, and Scripts namespace factories. The following example shows how to use three of them (Amqp, Jms, and Mail):

@Bean
public IntegrationFlow amqpFlow() {
    return IntegrationFlows.from(Amqp.inboundGateway(this.rabbitConnectionFactory, queue()))
            .transform("hello "::concat)
            .transform(String.class, String::toUpperCase)
            .get();
}

@Bean
public IntegrationFlow jmsOutboundGatewayFlow() {
    return IntegrationFlows.from("jmsOutboundGatewayChannel")
            .handle(Jms.outboundGateway(this.jmsConnectionFactory)
                        .replyContainer(c ->
                                    c.concurrentConsumers(3)
                                            .sessionTransacted(true))
                        .requestDestination("jmsPipelineTest"))
            .get();
}

@Bean
public IntegrationFlow sendMailFlow() {
    return IntegrationFlows.from("sendMailChannel")
            .handle(Mail.outboundAdapter("localhost")
                            .port(smtpPort)
                            .credentials("user", "pw")
                            .protocol("smtp")
                            .javaMailProperties(p -> p.put("mail.debug", "true")),
                    e -> e.id("sendMailEndpoint"))
            .get();
}

The preceding example shows how to use the "namespace factories" as inline adapter declarations. However, you can also use them from @Bean definitions to make the IntegrationFlow method chain more readable.
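As a sketch of the @Bean variant, the mail adapter from the preceding example could be pulled out into its own bean and then referenced from the flow. The class name MailFlowConfiguration, the bean name mailOutboundAdapter, and the SMTP_PORT constant are illustrative assumptions. The snippet also assumes the 5.x DSL, in which a spec exposes get() to obtain the configured component.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.mail.dsl.Mail;
import org.springframework.messaging.MessageHandler;

@Configuration
@EnableIntegration
public class MailFlowConfiguration {

    // Hypothetical port; the inline example reads it from a field named smtpPort.
    private static final int SMTP_PORT = 25;

    // The adapter is declared once as a bean instead of inline in the flow.
    @Bean
    public MessageHandler mailOutboundAdapter() {
        return Mail.outboundAdapter("localhost")
                .port(SMTP_PORT)
                .credentials("user", "pw")
                .protocol("smtp")
                .javaMailProperties(p -> p.put("mail.debug", "true"))
                .get();
    }

    // The flow definition now only wires the channel to the adapter bean.
    @Bean
    public IntegrationFlow sendMailFlow() {
        return IntegrationFlows.from("sendMailChannel")
                .handle(mailOutboundAdapter(), e -> e.id("sendMailEndpoint"))
                .get();
    }
}
```

Keeping the adapter in its own bean also makes it available for injection elsewhere, for example in tests.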

Note

We are soliciting community feedback on these namespace factories before we spend effort on others. We also appreciate any input into prioritization for which adapters and gateways we should support next.

You can find more Java DSL samples in the protocol-specific chapters throughout this reference manual.

All other protocol channel adapters may be configured as generic beans and wired to the IntegrationFlow, as the following examples show:

@Bean
public QueueChannelSpec wrongMessagesChannel() {
    return MessageChannels
            .queue()
            .wireTap("wrongMessagesWireTapChannel");
}

@Bean
public IntegrationFlow xpathFlow(MessageChannel wrongMessagesChannel) {
    return IntegrationFlows.from("inputChannel")
            .filter(new StringValueTestXPathMessageSelector("namespace-uri(/*)", "my:namespace"),
                    e -> e.discardChannel(wrongMessagesChannel))
            .log(LoggingHandler.Level.ERROR, "test.category", m -> m.getHeaders().getId())
            .route(xpathRouter(wrongMessagesChannel))
            .get();
}

@Bean
public AbstractMappingMessageRouter xpathRouter(MessageChannel wrongMessagesChannel) {
    XPathRouter router = new XPathRouter("local-name(/*)");
    router.setEvaluateAsString(true);
    router.setResolutionRequired(false);
    router.setDefaultOutputChannel(wrongMessagesChannel);
    router.setChannelMapping("Tags", "splittingChannel");
    router.setChannelMapping("Tag", "receivedChannel");
    return router;
}

11.17 IntegrationFlowAdapter

The IntegrationFlow interface can be implemented directly and specified as a component for scanning, as the following example shows:

@Component
public class MyFlow implements IntegrationFlow {

    @Override
    public void configure(IntegrationFlowDefinition<?> f) {
        f.<String, String>transform(String::toUpperCase);
    }

}

It is picked up by the IntegrationFlowBeanPostProcessor and correctly parsed and registered in the application context.
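A minimal sketch of how such a flow could be called follows. It assumes that the component is registered under the default bean name myFlow and that, as for lambda-based flows, the framework exposes its input channel under the name myFlow.input. The MyFlowCaller class and its method are hypothetical. Because the flow ends with a transformer and declares no output channel, the reply is sent to the channel carried in the replyChannel header.

```java
import org.springframework.context.ApplicationContext;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;

public final class MyFlowCaller {

    private MyFlowCaller() {
    }

    public static Object upperCase(ApplicationContext context, String text) {
        // Assumed channel name: "<flow bean name>.input".
        // The lookup happens at call time, after the flow has been registered.
        MessageChannel input = context.getBean("myFlow.input", MessageChannel.class);

        // Temporary channel that collects the reply from the end of the flow.
        QueueChannel replies = new QueueChannel();
        input.send(MessageBuilder.withPayload(text)
                .setReplyChannel(replies)
                .build());

        // Wait up to one second for the transformed message.
        Message<?> reply = replies.receive(1000);
        return reply != null ? reply.getPayload() : null;
    }
}
```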

For convenience and to gain the benefits of a loosely coupled architecture, we provide the IntegrationFlowAdapter base class implementation. It requires a buildFlow() method implementation to produce an IntegrationFlowDefinition by using one of the from() methods, as the following example shows:

@Component
public class MyFlowAdapter extends IntegrationFlowAdapter {

    private final AtomicBoolean invoked = new AtomicBoolean();

    public Date nextExecutionTime(TriggerContext triggerContext) {
        return this.invoked.getAndSet(true) ? null : new Date();
    }

    @Override
    protected IntegrationFlowDefinition<?> buildFlow() {
        return from(this, "messageSource",
                        e -> e.poller(p -> p.trigger(this::nextExecutionTime)))
                .split(this)
                .transform(this)
                .aggregate(a -> a.processor(this, null), null)
                .enrichHeaders(Collections.singletonMap("thing1", "THING1"))
                .filter(this)
                .handle(this)
                .channel(c -> c.queue("myFlowAdapterOutput"));
    }

    public String messageSource() {
        return "T,H,I,N,G,2";
    }

    @Splitter
    public String[] split(String payload) {
        return StringUtils.commaDelimitedListToStringArray(payload);
    }

    @Transformer
    public String transform(String payload) {
        return payload.toLowerCase();
    }

    @Aggregator
    public String aggregate(List<String> payloads) {
        return payloads.stream().collect(Collectors.joining());
    }

    @Filter
    public boolean filter(@Header Optional<String> thing1) {
        return thing1.isPresent();
    }

    @ServiceActivator
    public String handle(String payload, @Header String thing1) {
        return payload + ":" + thing1;
    }

}

11.18 Dynamic and Runtime Integration Flows

An IntegrationFlow and all its dependent components can be registered at runtime. Before version 5.0, we used the BeanFactory.registerSingleton() hook. Starting with Spring Framework 5.0, we use the instanceSupplier hook for programmatic BeanDefinition registration. The following example shows how to programmatically register a bean:

BeanDefinition beanDefinition =
         BeanDefinitionBuilder.genericBeanDefinition((Class<Object>) bean.getClass(), () -> bean)
               .getRawBeanDefinition();

((BeanDefinitionRegistry) this.beanFactory).registerBeanDefinition(beanName, beanDefinition);

Note that, in the preceding example, the instanceSupplier hook is the last parameter to the genericBeanDefinition method, provided by a lambda in this case.

All the necessary bean initialization and lifecycle handling is done automatically, as it is with standard context configuration bean definitions.

To simplify the development experience, Spring Integration introduced IntegrationFlowContext to register and manage IntegrationFlow instances at runtime, as the following example shows:

@Autowired
private AbstractServerConnectionFactory server1;

@Autowired
private IntegrationFlowContext flowContext;

...

@Test
public void testTcpGateways() {
    TestingUtilities.waitListening(this.server1, null);

    IntegrationFlow flow = f -> f
            .handle(Tcp.outboundGateway(Tcp.netClient("localhost", this.server1.getPort())
                    .serializer(TcpCodecs.crlf())
                    .deserializer(TcpCodecs.lengthHeader1())
                    .id("client1"))
                .remoteTimeout(m -> 5000))
            .transform(Transformers.objectToString());

    IntegrationFlowRegistration theFlow = this.flowContext.registration(flow).register();
    assertThat(theFlow.getMessagingTemplate().convertSendAndReceive("foo", String.class), equalTo("FOO"));
}

This is useful when we have multiple configuration options and have to create several instances of similar flows. To do so, we can iterate over our options and create and register IntegrationFlow instances within a loop. Another variant is when our source of data is not Spring-based and we must create it on the fly. One such example is a Reactive Streams event source, as the following example shows:

Flux<Message<?>> messageFlux =
    Flux.just("1,2,3,4")
        .map(v -> v.split(","))
        .flatMapIterable(Arrays::asList)
        .map(Integer::parseInt)
        .map(GenericMessage<Integer>::new);

QueueChannel resultChannel = new QueueChannel();

IntegrationFlow integrationFlow =
    IntegrationFlows.from(messageFlux)
        .<Integer, Integer>transform(p -> p * 2)
        .channel(resultChannel)
        .get();

this.integrationFlowContext.registration(integrationFlow)
            .register();

The IntegrationFlowRegistrationBuilder (as a result of the IntegrationFlowContext.registration()) can be used to specify a bean name for the IntegrationFlow to register, to control its autoStartup, and to register non-Spring Integration beans. Usually, those additional beans are connection factories (AMQP, JMS, (S)FTP, TCP/UDP, and others), serializers and deserializers, or any other required support components.

You can use the IntegrationFlowRegistration.destroy() callback to remove a dynamically registered IntegrationFlow and all its dependent beans when you no longer need them. See the IntegrationFlowContext Javadoc for more information.
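The loop-based registration and the destroy() callback described above can be sketched as follows. The PrefixFlowRegistrar class, its method names, the flow IDs, and the prefixing transformer are all hypothetical and serve only to illustrate the registration lifecycle; the IntegrationFlowContext calls are the ones used elsewhere in this section.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.context.IntegrationFlowContext;
import org.springframework.integration.dsl.context.IntegrationFlowContext.IntegrationFlowRegistration;
import org.springframework.stereotype.Component;

@Component
public class PrefixFlowRegistrar {

    private final IntegrationFlowContext flowContext;

    // Keeps the registrations so the flows can be called and destroyed later.
    private final Map<String, IntegrationFlowRegistration> registrations = new LinkedHashMap<>();

    public PrefixFlowRegistrar(IntegrationFlowContext flowContext) {
        this.flowContext = flowContext;
    }

    // Registers one similar flow per option; each flow prepends its option to the payload.
    public void registerAll(List<String> prefixes) {
        for (String prefix : prefixes) {
            IntegrationFlow flow = f -> f.<String, String>transform(p -> prefix + ":" + p);
            IntegrationFlowRegistration registration =
                    this.flowContext.registration(flow)
                            .id(prefix + "Flow") // explicit flow ID (hypothetical naming scheme)
                            .register();
            this.registrations.put(prefix, registration);
        }
    }

    // Sends a payload to the flow registered for the given option and waits for the reply.
    public String send(String prefix, String payload) {
        return this.registrations.get(prefix)
                .getMessagingTemplate()
                .convertSendAndReceive(payload, String.class);
    }

    // Removes every dynamically registered flow together with its dependent beans.
    public void destroyAll() {
        this.registrations.values().forEach(IntegrationFlowRegistration::destroy);
        this.registrations.clear();
    }
}
```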

Note

Starting with version 5.0.6, all generated bean names in an IntegrationFlow definition are prepended with the flow ID as a prefix. We recommend always specifying an explicit flow ID. Otherwise, a synchronization barrier is initiated in the IntegrationFlowContext, to generate the bean name for the IntegrationFlow and register its beans. We synchronize on these two operations to avoid a race condition when the same generated bean name may be used for different IntegrationFlow instances.

Also, starting with version 5.0.6, the registration builder API has a new method: useFlowIdAsPrefix(). This is useful if you wish to declare multiple instances of the same flow and avoid bean name collisions when components in the flows have the same ID, as the following example shows:

private void registerFlows() {
    IntegrationFlowRegistration flow1 =
              this.flowContext.registration(buildFlow(1234))
                    .id("tcp1")
                    .useFlowIdAsPrefix()
                    .register();

    IntegrationFlowRegistration flow2 =
              this.flowContext.registration(buildFlow(1235))
                    .id("tcp2")
                    .useFlowIdAsPrefix()
                    .register();
}

private IntegrationFlow buildFlow(int port) {
    return f -> f
            .handle(Tcp.outboundGateway(Tcp.netClient("localhost", port)
                    .serializer(TcpCodecs.crlf())
                    .deserializer(TcpCodecs.lengthHeader1())
                    .id("client"))
                .remoteTimeout(m -> 5000))
            .transform(Transformers.objectToString());
}

In this case, the message handler for the first flow can be referenced with a bean name of tcp1.client.handler.

Note

An id attribute is required when you use useFlowIdAsPrefix().

11.19 IntegrationFlow as Gateway

The IntegrationFlow can start from the service interface that provides a GatewayProxyFactoryBean component, as the following example shows:

public interface ControlBusGateway {

    void send(String command);
}

...

@Bean
public IntegrationFlow controlBusFlow() {
    return IntegrationFlows.from(ControlBusGateway.class)
            .controlBus()
            .get();
}

All the proxies for the interface methods are supplied with the channel to send messages to the next integration component in the IntegrationFlow. You can mark the service interface with the @MessagingGateway annotation and mark the methods with the @Gateway annotation. Nevertheless, the requestChannel is ignored and overridden with that internal channel for the next component in the IntegrationFlow. Otherwise, creating such a configuration by using IntegrationFlow does not make sense.

By default, a GatewayProxyFactoryBean gets a conventional bean name, such as [FLOW_BEAN_NAME.gateway]. You can change that ID by using the @MessagingGateway.name() attribute or the overloaded from(Class<?> serviceInterface, String beanName) factory method.

With Java 8, you can even create an integration gateway with the java.util.function interfaces, as the following example shows:

@Bean
public IntegrationFlow errorRecovererFlow() {
    return IntegrationFlows.from(Function.class, "errorRecovererFunction")
            .handle((GenericHandler<?>) (p, h) -> {
                throw new RuntimeException("intentional");
            }, e -> e.advice(retryAdvice()))
            .get();
}

That errorRecovererFlow can be used as follows:

@Autowired
@Qualifier("errorRecovererFunction")
private Function<String, String> errorRecovererFlowGateway;
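To see a function-based gateway return a value rather than fail, consider the following sketch. It is not part of the error-recovery example: the configuration class, the upperCaseFlow bean, and the upperCaseFunction gateway name are hypothetical, and the gateway is looked up from the application context only after the context has been fully started.

```java
import java.util.function.Function;

import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;

@Configuration
@EnableIntegration
public class UpperCaseGatewayConfiguration {

    // The flow starts from a gateway proxy that implements java.util.function.Function.
    @Bean
    public IntegrationFlow upperCaseFlow() {
        return IntegrationFlows.from(Function.class, "upperCaseFunction")
                .<String, String>transform(String::toUpperCase)
                .get();
    }

    public static void main(String[] args) {
        try (AnnotationConfigApplicationContext context =
                new AnnotationConfigApplicationContext(UpperCaseGatewayConfiguration.class)) {

            // The gateway bean is registered under the name passed to from().
            @SuppressWarnings("unchecked")
            Function<String, String> upperCase = context.getBean("upperCaseFunction", Function.class);

            // apply() sends the argument through the flow and returns the reply payload.
            System.out.println(upperCase.apply("hello"));
        }
    }
}
```

Calling apply() behaves like any other request-reply gateway method: the argument becomes the message payload, and the payload produced by the last component of the flow becomes the return value.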