9. Java DSL

The Spring Integration JavaConfig and DSL provides a set of convenient Builders and a fluent API to configure Spring Integration message flows from Spring @Configuration classes.

9.1 Example Configurations

@Configuration
@EnableIntegration
public class MyConfiguration {

    @Bean
    public AtomicInteger integerSource() {
        return new AtomicInteger();
    }

    @Bean
    public IntegrationFlow myFlow() {
        return IntegrationFlows.from(integerSource::getAndIncrement,
                                         c -> c.poller(Pollers.fixedRate(100)))
                    .channel("inputChannel")
                    .filter((Integer p) -> p > 0)
                    .transform(Object::toString)
                    .channel(MessageChannels.queue())
                    .get();
    }
}

As the result after ApplicationContext start up Spring Integration endpoints and Message Channels will be created as is the case after XML parsing. Such configuration can be used to replace XML configuration or along side with it.

9.2 Introduction

The Java DSL for Spring Integration is essentially a facade for Spring Integration. The DSL provides a simple way to embed Spring Integration Message Flows into your application using the fluent Builder pattern together with existing Java and Annotation configurations from Spring Framework and Spring Integration as well. Another useful tool to simplify configuration is Java 8 Lambdas.

The cafe is a good example of using the DSL.

The DSL is presented by the IntegrationFlows Factory for the IntegrationFlowBuilder. This produces the IntegrationFlow component, which should be registered as a Spring bean (@Bean). The builder pattern is used to express arbitrarily complex structures as a hierarchy of methods that may accept Lambdas as arguments.

The IntegrationFlowBuilder just collects integration components (MessageChannel s, AbstractEndpoint s etc.) in the IntegrationFlow bean for further parsing and registration of concrete beans in the application context by the IntegrationFlowBeanPostProcessor.

The Java DSL uses Spring Integration classes directly and bypasses any XML generation and parsing. However, the DSL offers more than syntactic sugar on top of XML. One of its most compelling features is the ability to define inline Lambdas to implement endpoint logic, eliminating the need for external classes to implement custom logic. In some sense, Spring Integration’s support for the Spring Expression Language (SpEL) and inline scripting address this, but Java Lambdas are easier and much more powerful.

9.3 DSL Basics

The org.springframework.integration.dsl package contains the IntegrationFlowBuilder API mentioned above and a bunch of IntegrationComponentSpec implementations which are builders too and provide the fluent API to configure concrete endpoints. The IntegrationFlowBuilder infrastructure provides common EIP for message based applications, such as channels, endpoints, pollers and channel interceptors.

Endpoints are expressed as verbs in the DSL to improve readability. The following list includes the common DSL method names and the associated EIP endpoint:

  • transform → Transformer
  • filter → Filter
  • handle → ServiceActivator
  • split → Splitter
  • aggregate → Aggregator
  • route → Router
  • bridge → Bridge

Conceptually, integration processes are constructed by composing these endpoints into one or more message flows. Note that EIP does not formally define the term message flow, but it is useful to think of it as a unit of work that uses well known messaging patterns. The DSL provides an IntegrationFlow component to define a composition of channels and endpoints between them, but now IntegrationFlow plays only the configuration role to populate real beans in the application context and isn’t used at runtime:

@Bean
public IntegrationFlow integerFlow() {
    return IntegrationFlows.from("input")
            .<String, Integer>transform(Integer::parseInt)
            .get();
}

Here we use the IntegrationFlows factory to define an IntegrationFlow bean using EIP-methods from IntegrationFlowBuilder.

The transform method accepts a Lambda as an endpoint argument to operate on the message payload. The real argument of this method is GenericTransformer<S, T>, hence any out-of-the-box transformers (ObjectToJsonTransformer, FileToStringTransformer etc.) can be used here.

Under the covers, IntegrationFlowBuilder recognizes the MessageHandler and endpoint for that: MessageTransformingHandler and ConsumerEndpointFactoryBean, respectively. Let’s look at another example:

@Bean
public IntegrationFlow myFlow() {
    return IntegrationFlows.from("input")
                .filter("World"::equals)
                .transform("Hello "::concat)
                .handle(System.out::println)
                .get();
}

The above example composes a sequence of Filter -> Transformer -> Service Activator. The flow is one way, that is it does not provide a a reply message but simply prints the payload to STDOUT. The endpoints are automatically wired together using direct channels.

9.4 Message Channels

In addition to the IntegrationFlowBuilder with EIP-methods the Java DSL provides a fluent API to configure MessageChannel s. For this purpose the MessageChannels builder factory is provided:

@Bean
public MessageChannel priorityChannel() {
    return MessageChannels.priority(this.mongoDbChannelMessageStore, "priorityGroup")
                        .interceptor(wireTap())
                        .get();
}

The same MessageChannels builder factory can be used in the channel() EIP-method from IntegrationFlowBuilder to wire endpoints similar to an input-channel/output-channel pair in the XML configuration. By default endpoints are wired via DirectChannel s where the bean name is based on the pattern: [IntegrationFlow.beanName].channel#[channelNameIndex]. This rule is applied for unnamed channels produced by inline MessageChannels builder factory usage, too. However all MessageChannels methods have a channelId -aware variant to create the bean names for MessageChannel s. The MessageChannel references can be used as well as beanName, as bean-method invocations. Here is a sample with possible variants of channel() EIP-method usage:

@Bean
public MessageChannel queueChannel() {
    return MessageChannels.queue().get();
}

@Bean
public MessageChannel publishSubscribe() {
    return MessageChannels.publishSubscribe().get();
}

@Bean
public IntegrationFlow channelFlow() {
    return IntegrationFlows.from("input")
                .fixedSubscriberChannel()
                .channel("queueChannel")
                .channel(publishSubscribe())
                .channel(MessageChannels.executor("executorChannel", this.taskExecutor))
                .channel("output")
                .get();
}
  • from("input") means: find and use the MessageChannel with the "input" id, or create one;
  • fixedSubscriberChannel() produces an instance of FixedSubscriberChannel and registers it with name channelFlow.channel#0;
  • channel("queueChannel") works the same way but, of course, uses an existing "queueChannel" bean;
  • channel(publishSubscribe()) - the bean-method reference;
  • channel(MessageChannels.executor("executorChannel", this.taskExecutor)) the IntegrationFlowBuilder unwraps IntegrationComponentSpec to the ExecutorChannel and registers it as "executorChannel";
  • channel("output") - registers the DirectChannel bean with "output" name as long as there are no beans with this name.

Note: the IntegrationFlow definition shown above is valid and all of its channels are applied to endpoints with BridgeHandler s.

[Important]Important

Be careful to use the same inline channel definition via MessageChannels factory from different IntegrationFlow s. Even if the DSL parsers register non-existing objects as beans, it can’t determine the same object (MessageChannel) from different IntegrationFlow containers. This is wrong:

@Bean
public IntegrationFlow startFlow() {
    return IntegrationFlows.from("input")
                .transform(...)
                .channel(MessageChannels.queue("queueChannel"))
                .get();
}

@Bean
public IntegrationFlow endFlow() {
    return IntegrationFlows.from(MessageChannels.queue("queueChannel"))
                .handle(...)
                .get();
}

You end up with:

Caused by: java.lang.IllegalStateException:
Could not register object [queueChannel] under bean name 'queueChannel':
     there is already object [queueChannel] bound
	    at o.s.b.f.s.DefaultSingletonBeanRegistry.registerSingleton(DefaultSingletonBeanRegistry.java:129)

To make it working there is just need to declare @Bean for that channel and use its bean-method from different IntegrationFlow s.

9.5 Pollers

A similar fluent API is provided to configure PollerMetadata for AbstractPollingEndpoint implementations. The Pollers builder factory can be used to configure common bean definitions or those created from IntegrationFlowBuilder EIP-methods:

@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerSpec poller() {
    return Pollers.fixedRate(500)
        .errorChannel("myErrors");
}

See Pollers and PollerSpec Java Docs for more information.

[Important]Important

If you use the DSL to construct a PollerSpec as a @Bean, do not call the get() method in the bean definition; the PollerSpec is a FactoryBean that will generate the PollerMetadata object from the specification and initialize all of its properties as needed.

9.6 DSL and Endpoint Configuration

All IntegrationFlowBuilder EIP-methods have a variant to apply the Lambda parameter to provide options for AbstractEndpoint s: SmartLifecycle, PollerMetadata, request-handler-advice-chain etc. Each of them has generic arguments, so it allows you to simply configure an endpoint and even its MessageHandler in the context:

@Bean
public IntegrationFlow flow2() {
    return IntegrationFlows.from(this.inputChannel)
                .transform(new PayloadSerializingTransformer(),
                       c -> c.autoStartup(false).id("payloadSerializingTransformer"))
                .transform((Integer p) -> p * 2, c -> c.advice(this.expressionAdvice()))
                .get();
}

In addition the EndpointSpec provides an id() method to allow you to register an endpoint bean with a given bean name, rather than a generated one.

9.7 Transformers

The DSL API provides a convenient, fluent Transformers factory to be used as inline target object definition within .transform() EIP-method:

@Bean
public IntegrationFlow transformFlow() {
    return IntegrationFlows.from("input")
            .transform(Transformers.fromJson(MyPojo.class))
            .transform(Transformers.serializer())
            .get();
}

It avoids inconvenient coding using setters and makes the flow definition more straightforward. Note, that Transformers can be use to declare target Transformer s as @Bean s and, again, use them from IntegrationFlow definition as bean-methods. Nevertheless, the DSL parser takes care about bean declarations for inline objects, if they aren’t defined as beans yet.

See Transformers Java Docs for more information and supported factory methods.

9.8 Inbound Channel Adapters

Typically message flows start from some Inbound Channel Adapter (e.g. <int-jdbc:inbound-channel-adapter>). The adapter is configured with <poller> and it asks a MessageSource<?> for producing messages periodically. Java DSL allows to start IntegrationFlow from a MessageSource<?>, too. For this purpose IntegrationFlows builder factory provides overloaded IntegrationFlows.from(MessageSource<?> messageSource) method. The MessageSource<?> may be configured as a bean and provided as argument for that method. The second parameter of IntegrationFlows.from() is a Consumer<SourcePollingChannelAdapterSpec> Lambda and allows to provide options for the SourcePollingChannelAdapter, e.g. PollerMetadata or SmartLifecycle:

@Bean
public MessageSource<Object> jdbcMessageSource() {
    return new JdbcPollingChannelAdapter(this.dataSource, "SELECT * FROM foo");
}

@Bean
public IntegrationFlow pollingFlow() {
    return IntegrationFlows.from(jdbcMessageSource(),
                c -> c.poller(Pollers.fixedRate(100).maxMessagesPerPoll(1)))
            .transform(Transformers.toJson())
            .channel("furtherProcessChannel")
            .get();
}

There is also an IntegrationFlows.from() variant based on the java.util.function.Supplier if there is no requirements to build Message objects directly. The result of the Supplier.get() is wrapped to the Message (if it isn’t message already) by Framework automatically.

The next sections discuss selected endpoints which require further explanation.

9.9 Message Routers

Spring Integration natively provides specialized router types including:

  • HeaderValueRouter
  • PayloadTypeRouter
  • ExceptionTypeRouter
  • RecipientListRouter
  • XPathRouter

As with many other DSL IntegrationFlowBuilder EIP-methods the route() method can apply any out-of-the-box AbstractMessageRouter implementation, or for convenience a String as a SpEL expression, or a ref/method pair. In addition route() can be configured with a Lambda - the inline method invocation case, and with a Lambda for a Consumer<RouterSpec<MethodInvokingRouter>>. The fluent API also provides AbstractMappingMessageRouter options like channelMapping(String key, String channelName) pairs:

@Bean
public IntegrationFlow routeFlow() {
    return IntegrationFlows.from("routerInput")
            .<Integer, Boolean>route(p -> p % 2 == 0,
                    m -> m.suffix("Channel")
                            .channelMapping("true", "even")
                            .channelMapping("false", "odd")
            )
            .get();
}

A simple expression-based router:

@Bean
public IntegrationFlow routeFlow() {
    return IntegrationFlows.from("routerInput")
            .route("headers['destChannel']")
            .get();
}

The routeToRecipients() method takes a Consumer<RecipientListRouterSpec>:

@Bean
public IntegrationFlow recipientListFlow() {
    return IntegrationFlows.from("recipientListInput")
            .<String, String>transform(p -> p.replaceFirst("Payload", ""))
                        .routeToRecipients(r -> r
                .recipient("foo-channel", "'foo' == payload")
                .recipient("bar-channel", m ->
                    m.getHeaders().containsKey("recipient")
                        && (boolean) m.getHeaders().get("recipient"))
                .recipientFlow("'foo' == payload or 'bar' == payload or 'baz' == payload",
                    f -> f.<String, String>transform(String::toUpperCase)
                        .channel(c -> c.queue("recipientListSubFlow1Result")))
                .recipientFlow((String p) -> p.startsWith("baz"),
                    f -> f.transform("Hello "::concat)
                        .channel(c -> c.queue("recipientListSubFlow2Result")))
                .recipientFlow(new FunctionExpression<Message<?>>(m ->
                                             "bax".equals(m.getPayload())),
                    f -> f.channel(c -> c.queue("recipientListSubFlow3Result")))
                .defaultOutputToParentFlow())
            .get();
}

The .defaultOutputToParentFlow() of the .routeToRecipients() allows to make the router’s defaultOutput as a gateway to continue a process for the unmatched messages in the main flow.

9.10 Splitters

A splitter is created using the split() EIP-method. By default, if the payload is a Iterable, Iterator, Array, Stream or Reactive Publisher, this will output each item as an individual message. This takes a Lambda, SpEL expression, any AbstractMessageSplitter implementation, or can be used without parameters to provide the DefaultMessageSplitter. For example:

@Bean
public IntegrationFlow splitFlow() {
    return IntegrationFlows.from("splitInput")
              .split(s ->
                      s.applySequence(false).get().getT2().setDelimiters(","))
              .channel(MessageChannels.executor(this.taskExecutor()))
              .get();
}

This creates a splitter that splits a message containing a comma delimited String. Note: the getT2() method comes from Tuple Collection which is the result of EndpointSpec.get() and represents a pair of ConsumerEndpointFactoryBean and DefaultMessageSplitter for the example above.

9.11 Aggregators and Resequencers

An Aggregator is conceptually the converse of a Splitter. It aggregates a sequence of individual messages into a single message and is necessarily more complex. By default, an aggregator will return a message containing a collection of payloads from incoming messages. The same rules are applied for the Resequencer:

@Bean
public IntegrationFlow splitAggregateFlow() {
    return IntegrationFlows.from("splitAggregateInput")
            .split()
            .channel(MessageChannels.executor(this.taskExecutor()))
            .resequence()
            .aggregate()
            .get();
}

The above is a canonical example of splitter/aggregator pattern. The split() method splits the list into individual messages and sends them to the ExecutorChannel. The resequence() method reorders messages by sequence details from message headers. The aggregate() method just collects those messages to the result list.

However, you may change the default behavior by specifying a release strategy and correlation strategy, among other things. Consider the following:

.aggregate(a ->
        a.correlationStrategy(m -> m.getHeaders().get("myCorrelationKey"))
            .releaseStrategy(g -> g.size() > 10)
            .messageStore(messageStore()))

The similar Lambda configurations are provided for the resequence() EIP-method.

9.12 ServiceActivators (.handle())

The .handle() EIP-method’s goal is to invoke any MessageHandler implementation or any method on some POJO. Another option to define "activity" via Lambda expression. Hence a generic GenericHandler<P> functional interface has been introduced. Its handle method requires two arguments - P payload and Map<String, Object> headers. Having that we can define a flow like this:

@Bean
public IntegrationFlow myFlow() {
    return IntegrationFlows.from("flow3Input")
        .<Integer>handle((p, h) -> p * 2)
        .get();
}

However one main goal of Spring Integration an achieving of loose coupling via runtime type conversion from message payload to target arguments of message handler. Since Java doesn’t support generic type resolution for Lambda classes, we introduced a workaround with additional payloadType argument for the most EIP-methods and LambdaMessageProcessor, which delegates the hard conversion work to the Spring’s ConversionService using provided type and requested message to target method arguments. The IntegrationFlow might look like this:

@Bean
public IntegrationFlow integerFlow() {
    return IntegrationFlows.from("input")
            .<byte[], String>transform(p - > new String(p, "UTF-8"))
            .handle(Integer.class, (p, h) -> p * 2)
            .get();
}

Of course we register some custom BytesToIntegerConverter within ConversionService and get rid of that additional .transform().

9.13 Operator log()

For convenience to log the message journey throw the Spring Integration flow (<logging-channel-adapter>), a log() operator is presented. Underneath it is represented by the WireTap ChannelInterceptor and LoggingHandler as subscriber. It is responsible to log message incoming into the next endpoint or for the current channel:

.filter(...)
.log(LoggingHandler.Level.ERROR, "test.category", m -> m.getHeaders().getId())
.route(...)

In this example an id header will be logged with ERROR level onto "test.category" only for messages passed the filter and before routing.

9.14 MessageChannelSpec.wireTap()

A .wireTap() fluent API exists for MessageChannelSpec builders. A target configuration gains much more from Java DSL usage:

@Bean
public QueueChannelSpec myChannel() {
    return MessageChannels.queue()
            .wireTap("loggingFlow.input");
}

@Bean
public IntegrationFlow loggingFlow() {
    return f -> f.log();
}
[Important]Important

The log() or wireTap() opearators are applied to the current MessageChannel (if it is an instance of ChannelInterceptorAware) or an intermediate DirectChannel is injected into the flow for the currently configured endpoint. In the example below the WireTap interceptor is added to the myChannel directly, because DirectChannel implements ChannelInterceptorAware:

@Bean
MessageChannel myChannel() {
    return new DirectChannel();
}

...
    .channel(myChannel())
    .log()
}

When current MessageChannel doesn’t implement ChannelInterceptorAware, an implicit DirectChannel and BridgeHandler are injected into the IntegrationFlow and the WireTap is added to this new DirectChannel. And when there is not any channel declaration like in this sample:

.handle(...)
.log()
}

an implicit DirectChannel is injected in the current position of the IntegrationFlow and it is used as an output channel for the currently configured ServiceActivatingHandler (the .handle() above).

[Important]Important

If log() or wireTap() are used in the end of flow they are considered one-way MessageHandler s. If the integration flow is expected to return a reply, a bridge() should be added to the end, after log() or wireTap():

@Bean
public IntegrationFlow sseFlow() {
    return IntegrationFlows
        .from(WebFlux.inboundGateway("/sse")
            .requestMapping(m ->
                m.produces(MediaType.TEXT_EVENT_STREAM_VALUE)))
        .handle((p, h) -> Flux.just("foo", "bar", "baz"))
        .log(LoggingHandler.Level.WARN)
        .bridge()
        .get();
}

9.15 Working With Message Flows

As we have seen, IntegrationFlowBuilder provides a top level API to produce Integration components wired to message flows. This is convenient if your integration may be accomplished with a single flow (which is often the case). Alternately IntegrationFlow s can be joined via MessageChannel s.

By default, the MessageFlow behaves as a Chain in Spring Integration parlance. That is, the endpoints are automatically wired implicitly via DirectChannel s. The message flow is not actually constructed as a chain, affording much more flexibility. For example, you may send a message to any component within the flow, if you know its inputChannel name, i.e., explicitly define it. You may also reference externally defined channels within a flow to allow the use of channel adapters to enable remote transport protocols, file I/O, and the like, instead of direct channels. As such, the DSL does not support the Spring Integration chain element since it doesn’t add much value.

Since the Spring Integration Java DSL produces the same bean definition model as any other configuration options and is based on the existing Spring Framework @Configuration infrastructure, it can be used together with Integration XML definitions and wired with Spring Integration Messaging Annotations configuration.

Another alternative to define direct IntegrationFlow s is based on a fact that IntegrationFlow can be declared as Lambda too:

@Bean
public IntegrationFlow lambdaFlow() {
    return f -> f.filter("World"::equals)
                   .transform("Hello "::concat)
                   .handle(System.out::println);
}

The result of this definition is the same bunch of Integration components wired with implicit direct channel. Only limitation is here, that this flow is started with named direct channel - lambdaFlow.input. And Lambda flow can’t start from MessageSource or MessageProducer.

Starting with version 5.0.6, the generated bean names for the components in an IntegrationFlow include the flow bean followed by a dot as a prefix. For example the ConsumerEndpointFactoryBean for the .transform("Hello "::concat) in the sample above, will end up with te bean name like lambdaFlow.org.springframework.integration.config. ConsumerEndpointFactoryBean#0. The Transformer implementation bean for that endpoint will have a bean name such as lambdaFlow.org.springframework.integration.transformer. MethodInvokingTransformer#0. These generated bean names are prepended with the flow id prefix for purposes such as parsing logs or grouping components together in some analysis tool, as well as to avoid a race condition when we concurrently register integration flows at runtime. See Section 9.20, “Dynamic and runtime Integration Flows” for more information.

9.16 FunctionExpression

The FunctionExpression (an implementation of SpEL Expression) has been introduced to get a gain of Java and Lambda usage for the method and its generics context. The Function<T, R> option is provided for the DSL components alongside with expression option, when there is the implicit Strategy variant from Core Spring Integration. The usage may look like:

.enrich(e -> e.requestChannel("enrichChannel")
            .requestPayload(Message::getPayload)
            .propertyFunction("date", m -> new Date()))

The FunctionExpression also supports runtime type conversion as it is done in the standard SpelExpression.

9.17 Sub Flows support

Some of if...else and publish-subscribe components provide the support to specify their logic or mapping using Sub Flows. The simplest sample is .publishSubscribeChannel():

@Bean
public IntegrationFlow subscribersFlow() {
    return flow -> flow
            .publishSubscribeChannel(Executors.newCachedThreadPool(), s -> s
                    .subscribe(f -> f
                            .<Integer>handle((p, h) -> p / 2)
                            .channel(c -> c.queue("subscriber1Results")))
                    .subscribe(f -> f
                            .<Integer>handle((p, h) -> p * 2)
                            .channel(c -> c.queue("subscriber2Results"))))
            .<Integer>handle((p, h) -> p * 3)
            .channel(c -> c.queue("subscriber3Results"));
}

Of course the same result we can achieve with separate IntegrationFlow @Bean definitions, but we hope you’ll find the subflow style of logic composition useful.

Similar publish-subscribe subflow composition provides .routeToRecipients().

Another sample is .discardFlow() on the .filter() instead of .discardChannel().

The .route() deserves special attention. As a sample:

@Bean
public IntegrationFlow routeFlow() {
    return f -> f
            .<Integer, Boolean>route(p -> p % 2 == 0,
                    m -> m.channelMapping("true", "evenChannel")
                            .subFlowMapping("false", sf ->
                                    sf.<Integer>handle((p, h) -> p * 3)))
            .transform(Object::toString)
            .channel(c -> c.queue("oddChannel"));
}

The .channelMapping() continues to work as in regular Router mapping, but the .subFlowMapping() tied that subflow with main flow. In other words, any router’s subflow returns to the main flow after .route().

Of course, subflows can be nested with any depth, but we don’t recommend to do that because, in fact, even in the router case, adding complex subflows within a flow would quickly begin to look like a plate of spaghetti and difficult for a human to parse.

9.18 Using Protocol Adapters

All of the examples so far illustrate how the DSL supports a messaging architecture using the Spring Integration programming model, but we haven’t done any real integration yet. This requires access to remote resources via http, jms, amqp, tcp, jdbc, ftp, smtp, and the like, or access to the local file system. Spring Integration supports all of these and more. Ideally, the DSL should offer first class support for all of them but it is a daunting task to implement all of these and keep up as new adapters are added to Spring Integration. So the expectation is that the DSL will continually be catching up with Spring Integration.

Anyway we are providing the hi-level API to define protocol-specific seamlessly. This is achieved with Factory and Builder patterns and, of course, with Lambdas. The factory classes can be considered "Namespace Factories", because they play the same role as XML namespace for components from the concrete protocol-specific Spring Integration modules. Currently, Spring Integration Java DSL supports Amqp, Feed, Jms, Files, (S)Ftp, Http, JPA, MongoDb, TCP/UDP, Mail, WebFlux and Scripts namespace factories:

@Bean
public IntegrationFlow amqpFlow() {
    return IntegrationFlows.from(Amqp.inboundGateway(this.rabbitConnectionFactory, queue()))
            .transform("hello "::concat)
            .transform(String.class, String::toUpperCase)
            .get();
}

@Bean
public IntegrationFlow jmsOutboundGatewayFlow() {
    return IntegrationFlows.from("jmsOutboundGatewayChannel")
            .handle(Jms.outboundGateway(this.jmsConnectionFactory)
                        .replyContainer(c ->
                                    c.concurrentConsumers(3)
                                            .sessionTransacted(true))
                        .requestDestination("jmsPipelineTest"))
            .get();
}

@Bean
public IntegrationFlow sendMailFlow() {
    return IntegrationFlows.from("sendMailChannel")
            .handle(Mail.outboundAdapter("localhost")
                            .port(smtpPort)
                            .credentials("user", "pw")
                            .protocol("smtp")
                            .javaMailProperties(p -> p.put("mail.debug", "true")),
                    e -> e.id("sendMailEndpoint"))
            .get();
}

We show here the usage of namespace factories as inline adapters declarations, however they can be used from @Bean definitions to make the IntegrationFlow method-chain more readable.

We are soliciting community feedback on these namespace factories before we spend effort on others; we’d also appreciate some prioritization for which adapters/gateways we should support next.

See more Java DSL samples in the protocol-specific chapter throughout this reference manual.

All other protocol channel adapters may be configured as generic beans and wired to the IntegrationFlow:

@Bean
public QueueChannelSpec wrongMessagesChannel() {
    return MessageChannels
            .queue()
            .wireTap("wrongMessagesWireTapChannel");
}

@Bean
public IntegrationFlow xpathFlow(MessageChannel wrongMessagesChannel) {
    return IntegrationFlows.from("inputChannel")
            .filter(new StringValueTestXPathMessageSelector("namespace-uri(/*)", "my:namespace"),
                    e -> e.discardChannel(wrongMessagesChannel))
            .log(LoggingHandler.Level.ERROR, "test.category", m -> m.getHeaders().getId())
            .route(xpathRouter(wrongMessagesChannel))
            .get();
}

@Bean
public AbstractMappingMessageRouter xpathRouter(MessageChannel wrongMessagesChannel) {
    XPathRouter router = new XPathRouter("local-name(/*)");
    router.setEvaluateAsString(true);
    router.setResolutionRequired(false);
    router.setDefaultOutputChannel(wrongMessagesChannel);
    router.setChannelMapping("Tags", "splittingChannel");
    router.setChannelMapping("Tag", "receivedChannel");
    return router;
}

9.19 IntegrationFlowAdapter

The IntegrationFlow as an interface can be implemented directly and specified as component for scanning:

@Component
public class MyFlow implements IntegrationFlow {

    @Override
    public void configure(IntegrationFlowDefinition<?> f) {
        f.<String, String>transform(String::toUpperCase);
    }

}

And yes, it is picked up by the IntegrationFlowBeanPostProcessor and correctly parsed and registered in the application context.

For convenience and loosely coupled architecture the IntegrationFlowAdapter base class implementation is provided. It requires a buildFlow() method implementation to produce an IntegrationFlowDefinition using one of from() support methods:

@Component
public class MyFlowAdapter extends IntegrationFlowAdapter {

    private final AtomicBoolean invoked = new tomicBoolean();

    public Date nextExecutionTime(TriggerContext triggerContext) {
          return this.invoked.getAndSet(true) ? null : new Date();
    }

    @Override
    protected IntegrationFlowDefinition<?> buildFlow() {
        return from(this, "messageSource",
                      e -> e.poller(p -> p.trigger(this::nextExecutionTime)))
                 .split(this)
  	   .transform(this)
  	   .aggregate(a -> a.processor(this, null), null)
  	   .enrichHeaders(Collections.singletonMap("foo", "FOO"))
  	   .filter(this)
  	   .handle(this)
  	   .channel(c -> c.queue("myFlowAdapterOutput"));
    }

    public String messageSource() {
         return "B,A,R";
    }

    @Splitter
    public String[] split(String payload) {
         return StringUtils.commaDelimitedListToStringArray(payload);
    }

    @Transformer
    public String transform(String payload) {
         return payload.toLowerCase();
    }

    @Aggregator
    public String aggregate(List<String> payloads) {
           return payloads.stream().collect(Collectors.joining());
    }

    @Filter
    public boolean filter(@Header Optional<String> foo) {
            return foo.isPresent();
    }

    @ServiceActivator
    public String handle(String payload, @Header String foo) {
           return payload + ":" + foo;
    }

}

9.20 Dynamic and runtime Integration Flows

The IntegrationFlow s and therefore all its dependant components can be registered at runtime. This was done previously by the BeanFactory.registerSingleton() hook and now via newly introduced in the Spring Framework 5.0 programmatic BeanDefinition registration with the instanceSupplier hook:

BeanDefinition beanDefinition =
         BeanDefinitionBuilder.genericBeanDefinition((Class<Object>) bean.getClass(), () -> bean)
               .getRawBeanDefinition();

((BeanDefinitionRegistry) this.beanFactory).registerBeanDefinition(beanName, beanDefinition);

and all the necessary bean initialization and lifecycle is done automatically as it is with the standard context configuration bean definitions.

To simplify the development experience Spring Integration introduced IntegrationFlowContext to register and manage IntegrationFlow instances at runtime:

@Autowired
private AbstractServerConnectionFactory server1;

@Autowired
private IntegrationFlowContext flowContext;

...

@Test
public void testTcpGateways() {
    TestingUtilities.waitListening(this.server1, null);

    IntegrationFlow flow = f -> f
            .handle(Tcp.outboundGateway(Tcp.netClient("localhost", this.server1.getPort())
                    .serializer(TcpCodecs.crlf())
                    .deserializer(TcpCodecs.lengthHeader1())
                    .id("client1"))
                .remoteTimeout(m -> 5000))
            .transform(Transformers.objectToString());

    IntegrationFlowRegistration theFlow = this.flowContext.registration(flow).register();
    assertThat(theFlow.getMessagingTemplate().convertSendAndReceive("foo", String.class), equalTo("FOO"));
}

This is useful when we have multi configuration options and have to create several instances of similar flows. So, we can iterate our options and create and register IntegrationFlow s within loop. Another variant when our source of data isn’t Spring-based and we must create it on the fly. Such a sample is Reactive Streams event source:

Flux<Message<?>> messageFlux =
    Flux.just("1,2,3,4")
        .map(v -> v.split(","))
        .flatMapIterable(Arrays::asList)
        .map(Integer::parseInt)
        .map(GenericMessage<Integer>::new);

QueueChannel resultChannel = new QueueChannel();

IntegrationFlow integrationFlow =
    IntegrationFlows.from(messageFlux)
        .<Integer, Integer>transform(p -> p * 2)
        .channel(resultChannel)
        .get();

this.integrationFlowContext.registration(integrationFlow)
            .register();

The IntegrationFlowRegistrationBuilder (as a result of the IntegrationFlowContext.registration()) can be used to specify a bean name for the IntegrationFlow to register, to control its autoStartup and also for additional, non Integration beans registration. Usually those additional beans are connection factories (AMQP, JMS, (S)FTP, TCP/UDP etc.), serializers/deserializers or any other required support components.

Such a dynamically registered IntegrationFlow and all its dependant beans can be removed afterwards using IntegrationFlowRegistration.destroy() callback. See IntegrationFlowContext JavaDocs for more information.

[Note]Note

Starting with version 5.0.6, all generated bean names in an IntegrationFlow definition are prepended with flow id as a prefix. It is recommended to always specify an explicit flow id, otherwise a synchronization barrier is initiated in the IntegrationFlowContext to generate the bean name for the IntegrationFlow and register its beans. We synchronize on these two operations to avoid a race condition when the same generated bean name may be used for different IntegrationFlow instances.

Also, starting with version 5.0.6, the registration builder API has a new method useFlowIdAsPrefix(). This is useful if you wish to declare multiple instances of the same flow and avoid bean name collisions if components in the flows have the same id.

For example:

private void registerFlows() {
    IntegrationFlowRegistration flow1 =
              this.flowContext.registration(buildFlow(1234))
                    .id("tcp1")
                    .useFlowIdAsPrefix()
                    .register();

    IntegrationFlowRegistration flow2 =
              this.flowContext.registration(buildFlow(1235))
                    .id("tcp2")
                    .useFlowIdAsPrefix()
                    .register();
}

private IntegrationFlow buildFlow(int port) {
    return f -> f
            .handle(Tcp.outboundGateway(Tcp.netClient("localhost", port)
                    .serializer(TcpCodecs.crlf())
                    .deserializer(TcpCodecs.lengthHeader1())
                    .id("client"))
                .remoteTimeout(m -> 5000))
            .transform(Transformers.objectToString());
}

In this case, the message handler for the first flow can be referenced with bean name tcp1.client.handler.

[Note]Note

an id is required when using useFlowIdAsPrefix().

9.21 IntegrationFlow as Gateway

The IntegrationFlow can start from the service interface providing GatewayProxyFactoryBean component:

public interface ControlBusGateway {

    void send(String command);
}

...

@Bean
public IntegrationFlow controlBusFlow() {
    return IntegrationFlows.from(ControlBusGateway.class)
            .controlBus()
            .get();
}

All the proxy for interface methods are supplied with the channel to send messages to the next integration component in the IntegrationFlow. The service interface can be marked with the @MessagingGateway as well as methods with the @Gateway annotations. Nevertheless the requestChannel is ignored and overridden with that internal channel for the next component in the IntegrationFlow. Otherwise such a configuration via IntegrationFlow won’t make sense.

By default a GatewayProxyFactoryBean gets a conventional bean name like [FLOW_BEAN_NAME.gateway]. That id can be changed via @MessagingGateway.name() attribute or the overloaded from(Class<?> serviceInterface, String beanName) factory method.

With the Java 8 on board we even can create such an Integration Gateway with the java.util.function interfaces:

@Bean
public IntegrationFlow errorRecovererFlow() {
    return IntegrationFlows.from(Function.class, "errorRecovererFunction")
            .handle((GenericHandler<?>) (p, h) -> {
                throw new RuntimeException("intentional");
            }, e -> e.advice(retryAdvice()))
            .get();
}

That can be used lately as:

@Autowired
@Qualifier("errorRecovererFunction")
private Function<String, String> errorRecovererFlowGateway;