DSL Basics

The org.springframework.integration.dsl package contains the IntegrationFlowBuilder API mentioned earlier and a number of IntegrationComponentSpec implementations, which are also builders and provide the fluent API to configure concrete endpoints. The IntegrationFlowBuilder infrastructure provides common enterprise integration patterns (EIP) for message-based applications, such as channels, endpoints, pollers, and channel interceptors.

IMPORTANT

The IntegrationComponentSpec is a FactoryBean implementation, therefore its getObject() method must not be called from bean definitions. The IntegrationComponentSpec implementation must be left as is for bean definitions and the framework will manage its lifecycle. Bean method parameter injection for the target IntegrationComponentSpec type (a FactoryBean value) must be used for IntegrationFlow bean definitions instead of bean method references.

Endpoints are expressed as verbs in the DSL to improve readability. The following list includes the common DSL method names and the associated EIP endpoint:

  • transform → Transformer

  • filter → Filter

  • handle → ServiceActivator

  • split → Splitter

  • aggregate → Aggregator

  • route → Router

  • bridge → Bridge

Conceptually, integration processes are constructed by composing these endpoints into one or more message flows. Note that EIP does not formally define the term 'message flow', but it is useful to think of it as a unit of work that uses well known messaging patterns. The DSL provides an IntegrationFlow component to define a composition of channels and endpoints between them, but now IntegrationFlow plays only the configuration role to populate real beans in the application context and is not used at runtime. However, the bean for IntegrationFlow can be autowired as a Lifecycle to control start() and stop() for the whole flow which is delegated to all the Spring Integration components associated with this IntegrationFlow. The following example uses the IntegrationFlow fluent API to define an IntegrationFlow bean by using EIP-methods from IntegrationFlowBuilder:

@Bean
public IntegrationFlow integerFlow() {
    return IntegrationFlow.from("input")
            .<String, Integer>transform(Integer::parseInt)
            .get();
}

The transform method accepts a lambda as an endpoint argument to operate on the message payload. The real argument of this method is a GenericTransformer<S, T> instance. Consequently, any of the provided transformers (ObjectToJsonTransformer, FileToStringTransformer, and other) can be used here.

Under the covers, IntegrationFlowBuilder recognizes the MessageHandler and the endpoint for it, with MessageTransformingHandler and ConsumerEndpointFactoryBean, respectively. Consider another example:

@Bean
public IntegrationFlow myFlow() {
    return IntegrationFlow.from("input")
                .filter("World"::equals)
                .transform("Hello "::concat)
                .handle(System.out::println)
                .get();
}

The preceding example composes a sequence of Filter → Transformer → Service Activator. The flow is "'one way'". That is, it does not provide a reply message but only prints the payload to STDOUT. The endpoints are automatically wired together by using direct channels.

Lambdas And Message<?> Arguments

When using lambdas in EIP methods, the "input" argument is generally the message payload. If you wish to access the entire message, use one of the overloaded methods that take a Class<?> as the first parameter. For example, this won’t work:

.<Message<?>, Foo>transform(m -> newFooFromMessage(m))

This will fail at runtime with a ClassCastException because the lambda doesn’t retain the argument type and the framework will attempt to cast the payload to a Message<?>.

Instead, use:

.(Message.class, m -> newFooFromMessage(m))
Bean Definitions override

The Java DSL can register beans for the object defined in-line in the flow definition, as well as can reuse existing, injected beans. In case of the same bean name defined for in-line object and existing bean definition, a BeanDefinitionOverrideException is thrown indicating that such a configuration is wrong. However, when you deal with prototype beans, there is no way to detect from the integration flow processor an existing bean definition because every time we call a prototype bean from the BeanFactory we get a new instance. This way a provided instance is used in the IntegrationFlow as is without any bean registration and any possible check against existing prototype bean definition. However BeanFactory.initializeBean() is called for this object if it has an explicit id and bean definition for this name is in prototype scope.