Java DSL

The Spring Integration Java configuration and DSL provides a set of convenient builders and a fluent API that lets you configure Spring Integration message flows from Spring @Configuration classes.

(See also Kotlin DSL.)

(See also Groovy DSL.)

The Java DSL for Spring Integration is essentially a facade for Spring Integration. The DSL provides a simple way to embed Spring Integration Message Flows into your application by using the fluent Builder pattern together with existing Java configuration from Spring Framework and Spring Integration. We also use and support lambdas (available with Java 8) to further simplify Java configuration.

The cafe offers a good example of using the DSL.

The DSL is presented by the IntegrationFlow fluent API (see IntegrationFlowBuilder). This produces the IntegrationFlow component, which should be registered as a Spring bean (by using the @Bean annotation). The builder pattern is used to express arbitrarily complex structures as a hierarchy of methods that can accept lambdas as arguments.

The IntegrationFlowBuilder only collects integration components (MessageChannel instances, AbstractEndpoint instances, and so on) in the IntegrationFlow bean for further parsing and registration of concrete beans in the application context by the IntegrationFlowBeanPostProcessor.

The Java DSL uses Spring Integration classes directly and bypasses any XML generation and parsing. However, the DSL offers more than syntactic sugar on top of XML. One of its most compelling features is the ability to define inline lambdas to implement endpoint logic, eliminating the need for external classes to implement custom logic. In some sense, Spring Integration’s support for the Spring Expression Language (SpEL) and inline scripting address this, but lambdas are easier and much more powerful.

The following example shows how to use Java Configuration for Spring Integration:

@Configuration
@EnableIntegration
public class MyConfiguration {

    @Bean
    public AtomicInteger integerSource() {
        return new AtomicInteger();
    }

    @Bean
    public IntegrationFlow myFlow(AtomicInteger integerSource) {
        return IntegrationFlow.fromSupplier(integerSource::getAndIncrement,
                                         c -> c.poller(Pollers.fixedRate(100)))
                    .channel("inputChannel")
                    .filter((Integer p) -> p > 0)
                    .transform(Object::toString)
                    .channel(MessageChannels.queue())
                    .get();
    }
}

The result of the preceding configuration example is that it creates, after ApplicationContext start up, Spring Integration endpoints and message channels. Java configuration can be used both to replace and augment XML configuration. You need not replace all of your existing XML configuration to use Java configuration.

DSL Basics

The org.springframework.integration.dsl package contains the IntegrationFlowBuilder API mentioned earlier and a number of IntegrationComponentSpec implementations, which are also builders and provide the fluent API to configure concrete endpoints. The IntegrationFlowBuilder infrastructure provides common enterprise integration patterns (EIP) for message-based applications, such as channels, endpoints, pollers, and channel interceptors.

IMPORTANT

The IntegrationComponentSpec is a FactoryBean implementation, therefore its getObject() method must not be called from bean definitions. The IntegrationComponentSpec implementation must be left as is for bean definitions and the framework will manage its lifecycle. Bean method parameter injection for the target IntegrationComponentSpec type (a FactoryBean value) must be used for IntegrationFlow bean definitions instead of bean method references.

Endpoints are expressed as verbs in the DSL to improve readability. The following list includes the common DSL method names and the associated EIP endpoint:

  • transform → Transformer

  • filter → Filter

  • handle → ServiceActivator

  • split → Splitter

  • aggregate → Aggregator

  • route → Router

  • bridge → Bridge

Conceptually, integration processes are constructed by composing these endpoints into one or more message flows. Note that EIP does not formally define the term 'message flow', but it is useful to think of it as a unit of work that uses well known messaging patterns. The DSL provides an IntegrationFlow component to define a composition of channels and endpoints between them, but now IntegrationFlow plays only the configuration role to populate real beans in the application context and is not used at runtime. However, the bean for IntegrationFlow can be autowired as a Lifecycle to control start() and stop() for the whole flow which is delegated to all the Spring Integration components associated with this IntegrationFlow. The following example uses the IntegrationFlow fluent API to define an IntegrationFlow bean by using EIP-methods from IntegrationFlowBuilder:

@Bean
public IntegrationFlow integerFlow() {
    return IntegrationFlow.from("input")
            .<String, Integer>transform(Integer::parseInt)
            .get();
}

The transform method accepts a lambda as an endpoint argument to operate on the message payload. The real argument of this method is a GenericTransformer<S, T> instance. Consequently, any of the provided transformers (ObjectToJsonTransformer, FileToStringTransformer, and other) can be used here.

Under the covers, IntegrationFlowBuilder recognizes the MessageHandler and the endpoint for it, with MessageTransformingHandler and ConsumerEndpointFactoryBean, respectively. Consider another example:

@Bean
public IntegrationFlow myFlow() {
    return IntegrationFlow.from("input")
                .filter("World"::equals)
                .transform("Hello "::concat)
                .handle(System.out::println)
                .get();
}

The preceding example composes a sequence of Filter → Transformer → Service Activator. The flow is "'one way'". That is, it does not provide a reply message but only prints the payload to STDOUT. The endpoints are automatically wired together by using direct channels.

Lambdas And Message<?> Arguments

When using lambdas in EIP methods, the "input" argument is generally the message payload. If you wish to access the entire message, use one of the overloaded methods that take a Class<?> as the first parameter. For example, this won’t work:

.<Message<?>, Foo>transform(m -> newFooFromMessage(m))

This will fail at runtime with a ClassCastException because the lambda doesn’t retain the argument type and the framework will attempt to cast the payload to a Message<?>.

Instead, use:

.(Message.class, m -> newFooFromMessage(m))
Bean Definitions override

The Java DSL can register beans for the object defined in-line in the flow definition, as well as can reuse existing, injected beans. In case of the same bean name defined for in-line object and existing bean definition, a BeanDefinitionOverrideException is thrown indicating that such a configuration is wrong. However, when you deal with prototype beans, there is no way to detect from the integration flow processor an existing bean definition because every time we call a prototype bean from the BeanFactory we get a new instance. This way a provided instance is used in the IntegrationFlow as is without any bean registration and any possible check against existing prototype bean definition. However BeanFactory.initializeBean() is called for this object if it has an explicit id and bean definition for this name is in prototype scope.

Message Channels

In addition to the IntegrationFlowBuilder with EIP methods, the Java DSL provides a fluent API to configure MessageChannel instances. For this purpose the MessageChannels builder factory is provided. The following example shows how to use it:

@Bean
public PriorityChannelSpec priorityChannel() {
    return MessageChannels.priority(this.mongoDbChannelMessageStore, "priorityGroup")
                        .interceptor(wireTap());
}

The same MessageChannels builder factory can be used in the channel() EIP method from IntegrationFlowBuilder to wire endpoints, similar to wiring an input-channel/output-channel pair in the XML configuration. By default, endpoints are wired with DirectChannel instances where the bean name is based on the following pattern: [IntegrationFlow.beanName].channel#[channelNameIndex]. This rule is also applied for unnamed channels produced by inline MessageChannels builder factory usage. However, all MessageChannels methods have a variant that is aware of the channelId that you can use to set the bean names for MessageChannel instances. The MessageChannel references and beanName can be used as bean-method invocations. The following example shows the possible ways to use the channel() EIP method:

@Bean
public QueueChannelSpec queueChannel() {
    return MessageChannels.queue();
}

@Bean
public PublishSubscribeChannelSpec<?> publishSubscribe() {
    return MessageChannels.publishSubscribe();
}

@Bean
public IntegrationFlow channelFlow() {
    return IntegrationFlow.from("input")
                .fixedSubscriberChannel()
                .channel("queueChannel")
                .channel(publishSubscribe())
                .channel(MessageChannels.executor("executorChannel", this.taskExecutor))
                .channel("output")
                .get();
}
  • from("input") means "'find and use the MessageChannel with the "input" id, or create one'".

  • fixedSubscriberChannel() produces an instance of FixedSubscriberChannel and registers it with a name of channelFlow.channel#0.

  • channel("queueChannel") works the same way but uses an existing queueChannel bean.

  • channel(publishSubscribe()) is the bean-method reference.

  • channel(MessageChannels.executor("executorChannel", this.taskExecutor)) is the IntegrationFlowBuilder that exposes IntegrationComponentSpec to the ExecutorChannel and registers it as executorChannel.

  • channel("output") registers the DirectChannel bean with output as its name, as long as no beans with this name already exist.

Note: The preceding IntegrationFlow definition is valid, and all of its channels are applied to endpoints with BridgeHandler instances.

Be careful to use the same inline channel definition through MessageChannels factory from different IntegrationFlow instances. Even if the DSL parser registers non-existent objects as beans, it cannot determine the same object (MessageChannel) from different IntegrationFlow containers. The following example is wrong:
@Bean
public IntegrationFlow startFlow() {
    return IntegrationFlow.from("input")
                .transform(...)
                .channel(MessageChannels.queue("queueChannel"))
                .get();
}

@Bean
public IntegrationFlow endFlow() {
    return IntegrationFlow.from(MessageChannels.queue("queueChannel"))
                .handle(...)
                .get();
}

The result of that bad example is the following exception:

Caused by: java.lang.IllegalStateException:
Could not register object [queueChannel] under bean name 'queueChannel':
     there is already object [queueChannel] bound
	    at o.s.b.f.s.DefaultSingletonBeanRegistry.registerSingleton(DefaultSingletonBeanRegistry.java:129)

To make it work, you need to declare @Bean for that channel and use its bean method from different IntegrationFlow instances.

Pollers

Spring Integration also provides a fluent API that lets you configure PollerMetadata for AbstractPollingEndpoint implementations. You can use the Pollers builder factory to configure common bean definitions or those created from IntegrationFlowBuilder EIP methods, as the following example shows:

@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerSpec poller() {
    return Pollers.fixedRate(500)
        .errorChannel("myErrors");
}

See Pollers and PollerSpec in the Javadoc for more information.

If you use the DSL to construct a PollerSpec as a @Bean, do not call the getObject() method in the bean definition. The PollerSpec is a FactoryBean that generates the PollerMetadata object from the specification and initializes all of its properties.

The reactive() Endpoint

Starting with version 5.5, the ConsumerEndpointSpec provides a reactive() configuration property with an optional customizer Function<? super Flux<Message<?>>, ? extends Publisher<Message<?>>>. This option configures the target endpoint as a ReactiveStreamsConsumer instance, independently of the input channel type, which is converted to a Flux via IntegrationReactiveUtils.messageChannelToFlux(). The provided function is used from the Flux.transform() operator to customize (publishOn(), log(), doOnNext() etc.) a reactive stream source from the input channel.

The following example demonstrates how to change the publishing thread from the input channel independently of the final subscriber and producer to that DirectChannel:

@Bean
public IntegrationFlow reactiveEndpointFlow() {
    return IntegrationFlow
            .from("inputChannel")
            .transformWith(t -> t
                              .<String, Integer>transformer(Integer::parseInt)
                              .reactive(flux -> flux.publishOn(Schedulers.parallel()))
            )
            .get();
}

See Reactive Streams Support for more information.

DSL and Endpoint Configuration

All IntegrationFlowBuilder EIP methods have a variant that applies the lambda parameter to provide options for AbstractEndpoint instances: SmartLifecycle, PollerMetadata, request-handler-advice-chain, and others. Each of them has generic arguments, so it lets you configure an endpoint and even its MessageHandler in the context, as the following example shows:

@Bean
public IntegrationFlow flow2() {
    return IntegrationFlow.from(this.inputChannel)
                .transformWith(t -> t
                              .transformer(new PayloadSerializingTransformer())
                              .autoStartup(false)
                              .id("payloadSerializingTransformer"))
                .transformWith(t -> t
                              .transformer((Integer p) -> p * 2)
                              .advice(expressionAdvice()))
                .get();
}

In addition, the EndpointSpec provides an id() method to let you register an endpoint bean with a given bean name, rather than a generated one.

If the MessageHandler is referenced as a bean, then any existing adviceChain configuration will be overridden if the .advice() method is present in the DSL definition:

@Bean
public TcpOutboundGateway tcpOut() {
    TcpOutboundGateway gateway = new TcpOutboundGateway();
    gateway.setConnectionFactory(cf());
    gateway.setAdviceChain(Collections.singletonList(fooAdvice()));
    return gateway;
}

@Bean
public IntegrationFlow clientTcpFlow() {
    return f -> f
        .handle(tcpOut(), e -> e.advice(testAdvice()))
        .transform(Transformers.objectToString());
}

They are not merged, only the testAdvice() bean is used in this case.

Transformers

The DSL API provides a convenient, fluent Transformers factory to be used as inline target object definition within the .transform() EIP method. The following example shows how to use it:

@Bean
public IntegrationFlow transformFlow() {
    return IntegrationFlow.from("input")
            .transform(Transformers.fromJson(MyPojo.class))
            .transform(Transformers.serializer())
            .get();
}

It avoids inconvenient coding using setters and makes the flow definition more straightforward. Note that you can use Transformers to declare target Transformer instances as @Bean instances and, again, use them from IntegrationFlow definition as bean methods. Nevertheless, the DSL parser takes care of bean declarations for inline objects, if they are not yet defined as beans.

See Transformers in the Javadoc for more information and supported factory methods.

Starting with version 6.2, a transformWith(Consumer<TransformerEndpointSpec>) variant has been introduced to have all the transformer and its endpoint options to be configured via single builder argument. This style gives DSL more readability and increases developer experience while modifying code. This also make Groovy and Kotlin DSLs more straightforward.

Inbound Channel Adapters

Typically, message flows start from an inbound channel adapter (such as <int-jdbc:inbound-channel-adapter>). The adapter is configured with <poller>, and it asks a MessageSource<?> to periodically produce messages. Java DSL allows for starting IntegrationFlow from a MessageSource<?>, too. For this purpose, the IntegrationFlow fluent API provides an overloaded IntegrationFlow.from(MessageSource<?> messageSource) method. You can configure the MessageSource<?> as a bean and provide it as an argument for that method. The second parameter of IntegrationFlow.from() is a Consumer<SourcePollingChannelAdapterSpec> lambda that lets you provide options (such as PollerMetadata or SmartLifecycle) for the SourcePollingChannelAdapter. The following example shows how to use the fluent API and a lambda to create an IntegrationFlow:

@Bean
public MessageSource<Object> jdbcMessageSource() {
    return new JdbcPollingChannelAdapter(this.dataSource, "SELECT * FROM something");
}

@Bean
public IntegrationFlow pollingFlow() {
    return IntegrationFlow.from(jdbcMessageSource(),
                c -> c.poller(Pollers.fixedRate(100).maxMessagesPerPoll(1)))
            .transform(Transformers.toJson())
            .channel("furtherProcessChannel")
            .get();
}

For those cases that have no requirements to build Message objects directly, you can use a IntegrationFlow.fromSupplier() variant that is based on the java.util.function.Supplier . The result of the Supplier.get() is automatically wrapped in a Message (if it is not already a Message).

Message Routers

Spring Integration natively provides specialized router types, including:

  • HeaderValueRouter

  • PayloadTypeRouter

  • ExceptionTypeRouter

  • RecipientListRouter

  • XPathRouter

As with many other DSL IntegrationFlowBuilder EIP methods, the route() method can apply any AbstractMessageRouter implementation or, for convenience, a String as a SpEL expression or a ref-method pair. In addition, you can configure route() with a lambda and use a lambda for a Consumer<RouterSpec<MethodInvokingRouter>>. The fluent API also provides AbstractMappingMessageRouter options such as channelMapping(String key, String channelName) pairs, as the following example shows:

@Bean
public IntegrationFlow routeFlowByLambda() {
    return IntegrationFlow.from("routerInput")
            .<Integer, Boolean>route(p -> p % 2 == 0,
                    m -> m.suffix("Channel")
                            .channelMapping(true, "even")
                            .channelMapping(false, "odd")
            )
            .get();
}

The following example shows a simple expression-based router:

@Bean
public IntegrationFlow routeFlowByExpression() {
    return IntegrationFlow.from("routerInput")
            .route("headers['destChannel']")
            .get();
}

The routeToRecipients() method takes a Consumer<RecipientListRouterSpec>, as the following example shows:

@Bean
public IntegrationFlow recipientListFlow() {
    return IntegrationFlow.from("recipientListInput")
            .<String, String>transform(p -> p.replaceFirst("Payload", ""))
            .routeToRecipients(r -> r
                    .recipient("thing1-channel", "'thing1' == payload")
                    .recipientMessageSelector("thing2-channel", m ->
                            m.getHeaders().containsKey("recipient")
                                    && (boolean) m.getHeaders().get("recipient"))
                    .recipientFlow("'thing1' == payload or 'thing2' == payload or 'thing3' == payload",
                            f -> f.<String, String>transform(String::toUpperCase)
                                    .channel(c -> c.queue("recipientListSubFlow1Result")))
                    .recipientFlow((String p) -> p.startsWith("thing3"),
                            f -> f.transform("Hello "::concat)
                                    .channel(c -> c.queue("recipientListSubFlow2Result")))
                    .recipientFlow(new FunctionExpression<Message<?>>(m ->
                                    "thing3".equals(m.getPayload())),
                            f -> f.channel(c -> c.queue("recipientListSubFlow3Result")))
                    .defaultOutputToParentFlow())
            .get();
}

The .defaultOutputToParentFlow() of the .routeToRecipients() definition lets you set the router’s defaultOutput as a gateway to continue a process for the unmatched messages in the main flow.

Splitters

To create a splitter, use the split() EIP method. By default, if the payload is an Iterable, an Iterator, an Array, a Stream, or a reactive Publisher, the split() method outputs each item as an individual message. It accepts a lambda, a SpEL expression, or any AbstractMessageSplitter implementation. Alternatively, you can use it without parameters to provide the DefaultMessageSplitter. The following example shows how to use the splitWith() method by providing a lambda:

@Bean
public IntegrationFlow splitFlow() {
    return IntegrationFlow.from("splitInput")
              .splitWith(s -> s.applySequence(false).delimiters(","))
              .channel(MessageChannels.executor(taskExecutor()))
              .get();
}

The preceding example creates a splitter that splits a message containing a comma-delimited String.

Aggregators and Resequencers

An Aggregator is conceptually the opposite of a Splitter. It aggregates a sequence of individual messages into a single message and is necessarily more complex. By default, an aggregator returns a message that contains a collection of payloads from incoming messages. The same rules are applied for the Resequencer. The following example shows a canonical example of the splitter-aggregator pattern:

@Bean
public IntegrationFlow splitAggregateFlow() {
    return IntegrationFlow.from("splitAggregateInput")
            .split()
            .channel(MessageChannels.executor(this.taskExecutor()))
            .resequence()
            .aggregate()
            .get();
}

The split() method splits the list into individual messages and sends them to the ExecutorChannel. The resequence() method reorders messages by sequence details found in the message headers. The aggregate() method collects those messages.

However, you can change the default behavior by specifying a release strategy and correlation strategy, among other things. Consider the following example:

.aggregate(a ->
        a.correlationStrategy(m -> m.getHeaders().get("myCorrelationKey"))
            .releaseStrategy(g -> g.size() > 10)
            .messageStore(messageStore()))

The preceding example correlates messages that have myCorrelationKey headers and releases the messages once at least ten have been accumulated.

Similar lambda configurations are provided for the resequence() EIP method.

Service Activators and the .handle() method

The .handle() EIP method’s goal is to invoke any MessageHandler implementation or any method on some POJO. Another option is to define an “activity” by using lambda expressions. Consequently, we introduced a generic GenericHandler<P> functional interface. Its handle method requires two arguments: P payload and MessageHeaders headers (starting with version 5.1). Having that, we can define a flow as follows:

@Bean
public IntegrationFlow myFlow() {
    return IntegrationFlow.from("flow3Input")
        .<Integer>handle((p, h) -> p * 2)
        .get();
}

The preceding example doubles any integer it receives.

However, one main goal of Spring Integration is loose coupling, through runtime type conversion from message payload to the target arguments of the message handler. Since Java does not support generic type resolution for lambda classes, we introduced a workaround with an additional payloadType argument for the most EIP methods and LambdaMessageProcessor. Doing so delegates the hard conversion work to Spring’s ConversionService, which uses the provided type and the requested message to target method arguments. The following example shows what the resulting IntegrationFlow might look like:

@Bean
public IntegrationFlow integerFlow() {
    return IntegrationFlow.from("input")
            .<byte[], String>transform(p - > new String(p, "UTF-8"))
            .handle(Integer.class, (p, h) -> p * 2)
            .get();
}

We also can register some BytesToIntegerConverter within ConversionService to get rid of that additional .transform():

@Bean
@IntegrationConverter
public BytesToIntegerConverter bytesToIntegerConverter() {
   return new BytesToIntegerConverter();
}

@Bean
public IntegrationFlow integerFlow() {
    return IntegrationFlow.from("input")
             .handle(Integer.class, (p, h) -> p * 2)
            .get();
}

Operator gateway()

The gateway() operator in an IntegrationFlow definition is a special service activator implementation, to call some other endpoint or integration flow via its input channel and wait for reply. Technically it plays the same role as a nested <gateway> component in a <chain> definition (see Calling a Chain from within a Chain) and allows a flow to be cleaner and more straightforward. Logically, and from business perspective, it is a messaging gateway to allow the distribution and reuse of functionality between different parts of the target integration solution (see Messaging Gateways). This operator has several overloads for different goals:

  • gateway(String requestChannel) to send a message to some endpoint’s input channel by its name;

  • gateway(MessageChannel requestChannel) to send a message to some endpoint’s input channel by its direct injection;

  • gateway(IntegrationFlow flow) to send a message to the input channel of the provided IntegrationFlow.

All of these have a variant with the second Consumer<GatewayEndpointSpec> argument to configure the target GatewayMessageHandler and respective AbstractEndpoint. Also, the IntegrationFlow-based methods allows calling existing IntegrationFlow bean or declare the flow as a sub-flow via an in-place lambda for an IntegrationFlow functional interface or have it extracted in a private method cleaner code style:

@Bean
IntegrationFlow someFlow() {
        return IntegrationFlow
                .from(...)
                .gateway(subFlow())
                .handle(...)
                .get();
}

private static IntegrationFlow subFlow() {
        return f -> f
                .scatterGather(s -> s.recipientFlow(...),
                        g -> g.outputProcessor(MessageGroup::getOne))
}
If the downstream flow does not always return a reply, you should set the requestTimeout to 0 to prevent hanging the calling thread indefinitely. In that case, the flow will end at that point and the thread released for further work.

Operator log()

For convenience, to log the message journey through the Spring Integration flow (<logging-channel-adapter>), a log() operator is presented. Internally, it is represented by the WireTap ChannelInterceptor with a LoggingHandler as its subscriber. It is responsible for logging the incoming message into the next endpoint or the current channel. The following example shows how to use LoggingHandler:

.filter(...)
.log(LoggingHandler.Level.ERROR, "test.category", m -> m.getHeaders().getId())
.route(...)

In the preceding example, an id header is logged at the ERROR level onto test.category only for messages that passed the filter and before routing.

Starting with version 6.0, the behavior of this operator in the end of flow is aligned with its usage in the middle. In other words the behavior of the flow remains the same even if the log() operator is removed. So, if a reply is not expected to be produced in the end of the flow, the nullChannel() is recommended to be used after the last log().

Operator intercept()

Starting with version 5.3, the intercept() operator allows to register one or more ChannelInterceptor instances at the current MessageChannel in the flow. This is an alternative to creating an explicit MessageChannel via the MessageChannels API. The following example uses a MessageSelectingInterceptor to reject certain messages with an exception:

.transform(...)
.intercept(new MessageSelectingInterceptor(m -> m.getPayload().isValid()))
.handle(...)

MessageChannelSpec.wireTap()

Spring Integration includes a .wireTap() fluent API MessageChannelSpec builders. The following example shows how to use the wireTap method to log input:

@Bean
public QueueChannelSpec myChannel() {
    return MessageChannels.queue()
            .wireTap("loggingFlow.input");
}

@Bean
public IntegrationFlow loggingFlow() {
    return f -> f.log();
}

If the MessageChannel is an instance of InterceptableChannel, the log(), wireTap() or intercept() operators are applied to the current MessageChannel. Otherwise, an intermediate DirectChannel is injected into the flow for the currently configured endpoint. In the following example, the WireTap interceptor is added to myChannel directly, because DirectChannel implements InterceptableChannel:

@Bean
MessageChannel myChannel() {
    return new DirectChannel();
}

...
    .channel(myChannel())
    .log()
}

When the current MessageChannel does not implement InterceptableChannel, an implicit DirectChannel and BridgeHandler are injected into the IntegrationFlow, and the WireTap is added to this new DirectChannel. The following example does not have any channel declaration:

.handle(...)
.log()
}

In the preceding example (and any time no channel has been declared), an implicit DirectChannel is injected in the current position of the IntegrationFlow and used as an output channel for the currently configured ServiceActivatingHandler (from the .handle(), described earlier).

Working With Message Flows

IntegrationFlowBuilder provides a top-level API to produce integration components wired to message flows. When your integration may be accomplished with a single flow (which is often the case), this is convenient. Alternately IntegrationFlow instances can be joined via MessageChannel instances.

By default, MessageFlow behaves as a “chain” in Spring Integration parlance. That is, the endpoints are automatically and implicitly wired by DirectChannel instances. The message flow is not actually constructed as a chain, which offers much more flexibility. For example, you may send a message to any component within the flow, if you know its inputChannel name (that is, if you explicitly define it). You may also reference externally defined channels within a flow to allow the use of channel adapters (to enable remote transport protocols, file I/O, and so on), instead of direct channels. As such, the DSL does not support the Spring Integration chain element, because it does not add much value in this case.

Since the Spring Integration Java DSL produces the same bean definition model as any other configuration options and is based on the existing Spring Framework @Configuration infrastructure, it can be used together with XML definitions and wired with Spring Integration messaging annotation configuration.

You can also define direct IntegrationFlow instances by using a lambda. The following example shows how to do so:

@Bean
public IntegrationFlow lambdaFlow() {
    return f -> f.filter("World"::equals)
                   .transform("Hello "::concat)
                   .handle(System.out::println);
}

The result of this definition is the same set of integration components that are wired with an implicit direct channel. The only limitation here is that this flow is started with a named direct channel - lambdaFlow.input. Also, a Lambda flow cannot start from MessageSource or MessageProducer.

Starting with version 5.1, this kind of IntegrationFlow is wrapped to the proxy to expose lifecycle control and provide access to the inputChannel of the internally associated StandardIntegrationFlow.

Starting with version 5.0.6, the generated bean names for the components in an IntegrationFlow include the flow bean followed by a dot (.) as a prefix. For example, the ConsumerEndpointFactoryBean for the .transform("Hello "::concat) in the preceding sample results in a bean name of lambdaFlow.o.s.i.config.ConsumerEndpointFactoryBean#0. (The o.s.i is a shortened from org.springframework.integration to fit on the page.) The Transformer implementation bean for that endpoint has a bean name of lambdaFlow.transformer#0 (starting with version 5.1), where instead of a fully qualified name of the MethodInvokingTransformer class, its component type is used. The same pattern is applied for all the NamedComponent s when the bean name has to be generated within the flow. These generated bean names are prepended with the flow ID for purposes such as parsing logs or grouping components together in some analysis tool, as well as to avoid a race condition when we concurrently register integration flows at runtime. See Dynamic and Runtime Integration Flows for more information.

FunctionExpression

We introduced the FunctionExpression class (an implementation of SpEL’s Expression interface) to let us use lambdas and generics. The Function<T, R> option is provided for the DSL components, along with an expression option, when there is the implicit Strategy variant from Core Spring Integration. The following example shows how to use a function expression:

.enrich(e -> e.requestChannel("enrichChannel")
            .requestPayload(Message::getPayload)
            .propertyFunction("date", m -> new Date()))

The FunctionExpression also supports runtime type conversion, as is done in SpelExpression.

Sub-flows support

Some of if…​else and publish-subscribe components provide the ability to specify their logic or mapping by using sub-flows. The simplest sample is .publishSubscribeChannel(), as the following example shows:

@Bean
public IntegrationFlow subscribersFlow() {
    return flow -> flow
            .publishSubscribeChannel(Executors.newCachedThreadPool(), s -> s
                    .subscribe(f -> f
                            .<Integer>handle((p, h) -> p / 2)
                            .channel(c -> c.queue("subscriber1Results")))
                    .subscribe(f -> f
                            .<Integer>handle((p, h) -> p * 2)
                            .channel(c -> c.queue("subscriber2Results"))))
            .<Integer>handle((p, h) -> p * 3)
            .channel(c -> c.queue("subscriber3Results"));
}

You can achieve the same result with separate IntegrationFlow @Bean definitions, but we hope you find the sub-flow style of logic composition useful. We find that it results in shorter (and so more readable) code.

Starting with version 5.3, a BroadcastCapableChannel-based publishSubscribeChannel() implementation is provided to configure sub-flow subscribers on broker-backed message channels. For example, we now can configure several subscribers as sub-flows on the Jms.publishSubscribeChannel():

@Bean
public JmsPublishSubscribeMessageChannelSpec jmsPublishSubscribeChannel() {
    return Jms.publishSubscribeChannel(jmsConnectionFactory())
                .destination("pubsub");
}

@Bean
public IntegrationFlow pubSubFlow(BroadcastCapableChannel jmsPublishSubscribeChannel) {
    return f -> f
            .publishSubscribeChannel(jmsPublishSubscribeChannel,
                    pubsub -> pubsub
                            .subscribe(subFlow -> subFlow
                                .channel(c -> c.queue("jmsPubSubBridgeChannel1")))
                            .subscribe(subFlow -> subFlow
                                .channel(c -> c.queue("jmsPubSubBridgeChannel2"))));
}

A similar publish-subscribe sub-flow composition provides the .routeToRecipients() method.

Another example is using .discardFlow() instead of .discardChannel() on the .filter() method.

The .route() deserves special attention. Consider the following example:

@Bean
public IntegrationFlow routeFlow() {
    return f -> f
            .<Integer, Boolean>route(p -> p % 2 == 0,
                    m -> m.channelMapping("true", "evenChannel")
                            .subFlowMapping("false", sf ->
                                    sf.<Integer>handle((p, h) -> p * 3)))
            .transform(Object::toString)
            .channel(c -> c.queue("oddChannel"));
}

The .channelMapping() continues to work as it does in regular Router mapping, but the .subFlowMapping() tied that sub-flow to the main flow. In other words, any router’s sub-flow returns to the main flow after .route().

Sometimes, you need to refer to an existing IntegrationFlow @Bean from the .subFlowMapping(). The following example shows how to do so:

@Bean
public IntegrationFlow splitRouteAggregate() {
    return f -> f
            .split()
            .<Integer, Boolean>route(o -> o % 2 == 0,
                    m -> m
                            .subFlowMapping(true, oddFlow())
                            .subFlowMapping(false, sf -> sf.gateway(evenFlow())))
            .aggregate();
}

@Bean
public IntegrationFlow oddFlow() {
    return f -> f.handle(m -> System.out.println("odd"));
}

@Bean
public IntegrationFlow evenFlow() {
    return f -> f.handle((p, h) -> "even");
}


In this case, when you need to receive a reply from such a sub-flow and continue the main flow, this IntegrationFlow bean reference (or its input channel) has to be wrapped with a .gateway() as shown in the preceding example. The oddFlow() reference in the preceding example is not wrapped to the .gateway(). Therefore, we do not expect a reply from this routing branch. Otherwise, you end up with an exception similar to the following:

Caused by: org.springframework.beans.factory.BeanCreationException:
    The 'currentComponent' (org.springframework.integration.router.MethodInvokingRouter@7965a51c)
    is a one-way 'MessageHandler' and it isn't appropriate to configure 'outputChannel'.
    This is the end of the integration flow.

When you configure a sub-flow as a lambda, the framework handles the request-reply interaction with the sub-flow and a gateway is not needed.

Sub-flows can be nested to any depth, but we do not recommend doing so. In fact, even in the router case, adding complex sub-flows within a flow would quickly begin to look like a plate of spaghetti and be difficult for a human to parse.

In cases where the DSL supports a subflow configuration, when a channel is normally needed for the component being configured, and that subflow starts with a channel() element, the framework implicitly places a bridge() between the component output channel and the flow’s input channel. For example, in this filter definition:

.filter(p -> p instanceof String, e -> e
	.discardFlow(df -> df
                         .channel(MessageChannels.queue())
                         ...)

the Framework internally creates a DirectChannel bean for injecting into the MessageFilter.discardChannel. Then it wraps the subflow into an IntegrationFlow starting with this implicit channel for the subscription and places a bridge before the channel() specified in the flow. When an existing IntegrationFlow bean is used as a subflow reference (instead of an inline subflow, e.g. a lambda), there is no such bridge required because the framework can resolve the first channel from the flow bean. With an inline subflow, the input channel is not yet available.

Using Protocol Adapters

All the examples shown so far illustrate how the DSL supports a messaging architecture by using the Spring Integration programming model. However, we have yet to do any real integration. Doing so requires access to remote resources over HTTP, JMS, AMQP, TCP, JDBC, FTP, SMTP, and so on or access to the local file system. Spring Integration supports all of these and more. Ideally, the DSL should offer first class support for all of them, but it is a daunting task to implement all of these and keep up as new adapters are added to Spring Integration. So the expectation is that the DSL is continually catching up with Spring Integration.

Consequently, we provide the high-level API to seamlessly define protocol-specific messaging. We do so with the factory and builder patterns and with lambdas. You can think of the factory classes as “Namespace Factories”, because they play the same role as the XML namespace for components from the concrete protocol-specific Spring Integration modules. Currently, Spring Integration Java DSL supports the Amqp, Feed, Jms, Files, (S)Ftp, Http, JPA, MongoDb, TCP/UDP, Mail, WebFlux, and Scripts namespace factories. The following example shows how to use three of them (Amqp, Jms, and Mail):

@Bean
public IntegrationFlow amqpFlow() {
    return IntegrationFlow.from(Amqp.inboundGateway(this.rabbitConnectionFactory, queue()))
            .transform("hello "::concat)
            .transform(String.class, String::toUpperCase)
            .get();
}

@Bean
public IntegrationFlow jmsOutboundGatewayFlow() {
    return IntegrationFlow.from("jmsOutboundGatewayChannel")
            .handle(Jms.outboundGateway(this.jmsConnectionFactory)
                        .replyContainer(c ->
                                    c.concurrentConsumers(3)
                                            .sessionTransacted(true))
                        .requestDestination("jmsPipelineTest"))
            .get();
}

@Bean
public IntegrationFlow sendMailFlow() {
    return IntegrationFlow.from("sendMailChannel")
            .handle(Mail.outboundAdapter("localhost")
                            .port(smtpPort)
                            .credentials("user", "pw")
                            .protocol("smtp")
                            .javaMailProperties(p -> p.put("mail.debug", "true")),
                    e -> e.id("sendMailEndpoint"))
            .get();
}

The preceding example shows how to use the “namespace factories” as inline adapters declarations. However, you can use them from @Bean definitions to make the IntegrationFlow method chain more readable.

We are soliciting community feedback on these namespace factories before we spend effort on others. We also appreciate any input into prioritization for which adapters and gateways we should support next.

You can find more Java DSL samples in the protocol-specific chapters throughout this reference manual.

All other protocol channel adapters may be configured as generic beans and wired to the IntegrationFlow, as the following examples show:

@Bean
public QueueChannelSpec wrongMessagesChannel() {
    return MessageChannels
            .queue()
            .wireTap("wrongMessagesWireTapChannel");
}

@Bean
public IntegrationFlow xpathFlow(MessageChannel wrongMessagesChannel) {
    return IntegrationFlow.from("inputChannel")
            .filter(new StringValueTestXPathMessageSelector("namespace-uri(/*)", "my:namespace"),
                    e -> e.discardChannel(wrongMessagesChannel))
            .log(LoggingHandler.Level.ERROR, "test.category", m -> m.getHeaders().getId())
            .route(xpathRouter(wrongMessagesChannel))
            .get();
}

@Bean
public AbstractMappingMessageRouter xpathRouter(MessageChannel wrongMessagesChannel) {
    XPathRouter router = new XPathRouter("local-name(/*)");
    router.setEvaluateAsString(true);
    router.setResolutionRequired(false);
    router.setDefaultOutputChannel(wrongMessagesChannel);
    router.setChannelMapping("Tags", "splittingChannel");
    router.setChannelMapping("Tag", "receivedChannel");
    return router;
}

IntegrationFlowAdapter

The IntegrationFlow interface can be implemented directly and specified as a component for scanning, as the following example shows:

@Component
public class MyFlow implements IntegrationFlow {

    @Override
    public void configure(IntegrationFlowDefinition<?> f) {
        f.<String, String>transform(String::toUpperCase);
    }

}

It is picked up by the IntegrationFlowBeanPostProcessor and correctly parsed and registered in the application context.

For convenience and to gain the benefits of loosely coupled architecture, we provide the IntegrationFlowAdapter base class implementation. It requires a buildFlow() method implementation to produce an IntegrationFlowDefinition by using one of from() methods, as the following example shows:

@Component
public class MyFlowAdapter extends IntegrationFlowAdapter {

    private final AtomicBoolean invoked = new AtomicBoolean();

    public Instant nextExecutionTime(TriggerContext triggerContext) {
        return this.invoked.getAndSet(true) ? null : Instant.now();
    }

    @Override
    protected IntegrationFlowDefinition<?> buildFlow() {
        return fromSupplier(this::messageSource,
                e -> e.poller(p -> p.trigger(this::nextExecutionTime)))
                .split(this)
                .transform(this)
                .aggregate(this)
                .enrichHeaders(Collections.singletonMap("thing1", "THING1"))
                .filter(this)
                .handle(this)
                .channel(c -> c.queue("myFlowAdapterOutput"));
    }

    public String messageSource() {
        return "T,H,I,N,G,2";
    }

    @Splitter
    public String[] split(String payload) {
        return StringUtils.commaDelimitedListToStringArray(payload);
    }

    @Transformer
    public String transform(String payload) {
        return payload.toLowerCase();
    }

    @Aggregator
    public String aggregate(List<String> payloads) {
        return payloads.stream().collect(Collectors.joining());
    }

    @Filter
    public boolean filter(@Header Optional<String> thing1) {
        return thing1.isPresent();
    }

    @ServiceActivator
    public String handle(String payload, @Header String thing1) {
        return payload + ":" + thing1;
    }

}

Dynamic and Runtime Integration Flows

IntegrationFlow and all its dependent components can be registered at runtime. Before version 5.0, we used the BeanFactory.registerSingleton() hook. Starting in the Spring Framework 5.0, we use the instanceSupplier hook for programmatic BeanDefinition registration. The following example shows how to programmatically register a bean:

BeanDefinition beanDefinition =
         BeanDefinitionBuilder.genericBeanDefinition((Class<Object>) bean.getClass(), () -> bean)
               .getRawBeanDefinition();

((BeanDefinitionRegistry) this.beanFactory).registerBeanDefinition(beanName, beanDefinition);

Note that, in the preceding example, the instanceSupplier hook is the last parameter to the genericBeanDefinition method, provided by a lambda in this case.

All the necessary bean initialization and lifecycle is done automatically, as it is with the standard context configuration bean definitions.

To simplify the development experience, Spring Integration introduced IntegrationFlowContext to register and manage IntegrationFlow instances at runtime, as the following example shows:

@Autowired
private AbstractServerConnectionFactory server1;

@Autowired
private IntegrationFlowContext flowContext;

...

@Test
public void testTcpGateways() {
    TestingUtilities.waitListening(this.server1, null);

    IntegrationFlow flow = f -> f
            .handle(Tcp.outboundGateway(Tcp.netClient("localhost", this.server1.getPort())
                    .serializer(TcpCodecs.crlf())
                    .deserializer(TcpCodecs.lengthHeader1())
                    .id("client1"))
                .remoteTimeout(m -> 5000))
            .transform(Transformers.objectToString());

    IntegrationFlowRegistration theFlow = this.flowContext.registration(flow).register();
    assertThat(theFlow.getMessagingTemplate().convertSendAndReceive("foo", String.class), equalTo("FOO"));
}

This is useful when we have multiple configuration options and have to create several instances of similar flows. To do so, we can iterate our options and create and register IntegrationFlow instances within a loop. Another variant is when our source of data is not Spring-based, so we must create it on the fly. Such a sample is Reactive Streams event source, as the following example shows:

Flux<Message<?>> messageFlux =
    Flux.just("1,2,3,4")
        .map(v -> v.split(","))
        .flatMapIterable(Arrays::asList)
        .map(Integer::parseInt)
        .map(GenericMessage<Integer>::new);

QueueChannel resultChannel = new QueueChannel();

IntegrationFlow integrationFlow =
    IntegrationFlow.from(messageFlux)
        .<Integer, Integer>transform(p -> p * 2)
        .channel(resultChannel)
        .get();

this.integrationFlowContext.registration(integrationFlow)
            .register();

The IntegrationFlowRegistrationBuilder (as a result of the IntegrationFlowContext.registration()) can be used to specify a bean name for the IntegrationFlow to register, to control its autoStartup, and to register, non-Spring Integration beans. Usually, those additional beans are connection factories (AMQP, JMS, (S)FTP, TCP/UDP, and others.), serializers and deserializers, or any other required support components.

You can use the IntegrationFlowRegistration.destroy() callback to remove a dynamically registered IntegrationFlow and all its dependent beans when you no longer need them. See the IntegrationFlowContext Javadoc for more information.

Starting with version 5.0.6, all generated bean names in an IntegrationFlow definition are prepended with the flow ID as a prefix. We recommend always specifying an explicit flow ID. Otherwise, a synchronization barrier is initiated in the IntegrationFlowContext, to generate the bean name for the IntegrationFlow and register its beans. We synchronize on these two operations to avoid a race condition when the same generated bean name may be used for different IntegrationFlow instances.

Also, starting with version 5.0.6, the registration builder API has a new method: useFlowIdAsPrefix(). This is useful if you wish to declare multiple instances of the same flow and avoid bean name collisions when components in the flows have the same ID, as the following example shows:

private void registerFlows() {
    IntegrationFlowRegistration flow1 =
              this.flowContext.registration(buildFlow(1234))
                    .id("tcp1")
                    .useFlowIdAsPrefix()
                    .register();

    IntegrationFlowRegistration flow2 =
              this.flowContext.registration(buildFlow(1235))
                    .id("tcp2")
                    .useFlowIdAsPrefix()
                    .register();
}

private IntegrationFlow buildFlow(int port) {
    return f -> f
            .handle(Tcp.outboundGateway(Tcp.netClient("localhost", port)
                    .serializer(TcpCodecs.crlf())
                    .deserializer(TcpCodecs.lengthHeader1())
                    .id("client"))
                .remoteTimeout(m -> 5000))
            .transform(Transformers.objectToString());
}

In this case, the message handler for the first flow can be referenced with bean a name of tcp1.client.handler.

An id attribute is required when you usE useFlowIdAsPrefix().

IntegrationFlow as a Gateway

The IntegrationFlow can start from the service interface that provides a GatewayProxyFactoryBean component, as the following example shows:

public interface ControlBusGateway {

    void send(String command);
}

...

@Bean
public IntegrationFlow controlBusFlow() {
    return IntegrationFlow.from(ControlBusGateway.class)
            .controlBus()
            .get();
}

All the proxy for interface methods are supplied with the channel to send messages to the next integration component in the IntegrationFlow. You can mark the service interface with the @MessagingGateway annotation and mark the methods with the @Gateway annotations. Nevertheless, the requestChannel is ignored and overridden with that internal channel for the next component in the IntegrationFlow. Otherwise, creating such a configuration by using IntegrationFlow does not make sense.

By default, a GatewayProxyFactoryBean gets a conventional bean name, such as [FLOW_BEAN_NAME.gateway]. You can change that ID by using the @MessagingGateway.name() attribute or the overloaded IntegrationFlow.from(Class<?> serviceInterface, Consumer<GatewayProxySpec> endpointConfigurer) factory method. Also, all the attributes from the @MessagingGateway annotation on the interface are applied to the target GatewayProxyFactoryBean. When annotation configuration is not applicable, the Consumer<GatewayProxySpec> variant can be used for providing appropriate option for the target proxy. This DSL method is available starting with version 5.2.

With Java 8, you can even create an integration gateway with the java.util.function interfaces, as the following example shows:

@Bean
public IntegrationFlow errorRecovererFlow() {
    return IntegrationFlow.from(Function.class, (gateway) -> gateway.beanName("errorRecovererFunction"))
            .<Object>handle((p, h) -> {
                throw new RuntimeException("intentional");
            }, e -> e.advice(retryAdvice()))
            .get();
}

That errorRecovererFlow can be used as follows:

@Autowired
@Qualifier("errorRecovererFunction")
private Function<String, String> errorRecovererFlowGateway;

DSL Extensions

Starting with version 5.3, an IntegrationFlowExtension has been introduced to allow extension of the existing Java DSL with custom or composed EIP-operators. All that is needed is an extension of this class that provides methods which can be used in the IntegrationFlow bean definitions. The extension class can also be used for custom IntegrationComponentSpec configuration; for example, missed or default options can be implemented in the existing IntegrationComponentSpec extension. The sample below demonstrates a composite custom operator and usage of an AggregatorSpec extension for a default custom outputProcessor:

public class CustomIntegrationFlowDefinition
        extends IntegrationFlowExtension<CustomIntegrationFlowDefinition> {

    public CustomIntegrationFlowDefinition upperCaseAfterSplit() {
        return split()
                .transform("payload.toUpperCase()");
    }

    public CustomIntegrationFlowDefinition customAggregate(Consumer<CustomAggregatorSpec> aggregator) {
        return register(new CustomAggregatorSpec(), aggregator);
    }

}

public class CustomAggregatorSpec extends AggregatorSpec {

    CustomAggregatorSpec() {
        outputProcessor(group ->
                group.getMessages()
                        .stream()
                        .map(Message::getPayload)
                        .map(String.class::cast)
                        .collect(Collectors.joining(", ")));
    }

}

For a method chain flow the new DSL operator in these extensions must return the extension class. This way a target IntegrationFlow definition will work with new and existing DSL operators:

@Bean
public IntegrationFlow customFlowDefinition() {
    return
            new CustomIntegrationFlowDefinition()
                    .log()
                    .upperCaseAfterSplit()
                    .channel("innerChannel")
                    .customAggregate(customAggregatorSpec ->
                            customAggregatorSpec.expireGroupsUponCompletion(true))
                    .logAndReply();
}

Integration Flows Composition

With the MessageChannel abstraction as a first class citizen in Spring Integration, the composition of integration flows was always assumed. The input channel of any endpoint in the flow can be used to send messages from any other endpoint and not only from the one which has this channel as an output. Furthermore, with a @MessagingGateway contract, Content Enricher components, composite endpoints like a <chain>, and now with IntegrationFlow beans (e.g. IntegrationFlowAdapter), it is straightforward enough to distribute the business logic between shorter, reusable parts. All that is needed for the final composition is knowledge about a MessageChannel to send to or receive from.

Starting with version 5.5.4, to abstract more from MessageChannel and hide implementation details from the end-user, the IntegrationFlow introduces the from(IntegrationFlow) factory method to allow starting the current IntegrationFlow from the output of an existing flow:

@Bean
IntegrationFlow templateSourceFlow() {
    return IntegrationFlow.fromSupplier(() -> "test data")
            .channel("sourceChannel")
            .get();
}

@Bean
IntegrationFlow compositionMainFlow(IntegrationFlow templateSourceFlow) {
    return IntegrationFlow.from(templateSourceFlow)
            .<String, String>transform(String::toUpperCase)
            .channel(c -> c.queue("compositionMainFlowResult"))
            .get();
}

On the other hand, the IntegrationFlowDefinition has added a to(IntegrationFlow) terminal operator to continue the current flow at the input channel of some other flow:

@Bean
IntegrationFlow mainFlow(IntegrationFlow otherFlow) {
    return f -> f
            .<String, String>transform(String::toUpperCase)
            .to(otherFlow);
}

@Bean
IntegrationFlow otherFlow() {
    return f -> f
            .<String, String>transform(p -> p + " from other flow")
            .channel(c -> c.queue("otherFlowResultChannel"));
}

The composition in the middle of the flow is simply achievable with an existing gateway(IntegrationFlow) EIP-method. This way we can build flows with any complexity by composing them from simpler, reusable logical blocks. For example, you may add a library of IntegrationFlow beans as a dependency, and it is just enough to have their configuration classes imported to the final project and autowired for your IntegrationFlow definitions.