Java DSL
The Spring Integration Java configuration and DSL provides a set of convenient builders and a fluent API that lets you configure Spring Integration message flows from Spring @Configuration classes.
(See also Kotlin DSL.)
(See also Groovy DSL.)
The Java DSL for Spring Integration is essentially a facade for Spring Integration.
The DSL provides a simple way to embed Spring Integration message flows into your application by using the fluent Builder pattern together with existing Java configuration from Spring Framework and Spring Integration.
We also use and support lambdas (available with Java 8) to further simplify Java configuration.
The cafe offers a good example of using the DSL.
The DSL is presented by the IntegrationFlow fluent API (see IntegrationFlowBuilder).
This produces the IntegrationFlow component, which should be registered as a Spring bean (by using the @Bean annotation).
The builder pattern is used to express arbitrarily complex structures as a hierarchy of methods that can accept lambdas as arguments.
The IntegrationFlowBuilder only collects integration components (MessageChannel instances, AbstractEndpoint instances, and so on) in the IntegrationFlow bean for further parsing and registration of concrete beans in the application context by the IntegrationFlowBeanPostProcessor.
The Java DSL uses Spring Integration classes directly and bypasses any XML generation and parsing. However, the DSL offers more than syntactic sugar on top of XML. One of its most compelling features is the ability to define inline lambdas to implement endpoint logic, eliminating the need for external classes to implement custom logic. In some sense, Spring Integration’s support for the Spring Expression Language (SpEL) and inline scripting addresses this, but lambdas are easier and much more powerful.
The following example shows how to use Java Configuration for Spring Integration:
@Configuration
@EnableIntegration
public class MyConfiguration {
@Bean
public AtomicInteger integerSource() {
return new AtomicInteger();
}
@Bean
public IntegrationFlow myFlow() {
return IntegrationFlow.fromSupplier(integerSource()::getAndIncrement,
c -> c.poller(Pollers.fixedRate(100)))
.channel("inputChannel")
.filter((Integer p) -> p > 0)
.transform(Object::toString)
.channel(MessageChannels.queue())
.get();
}
}
After the ApplicationContext starts, the preceding configuration creates Spring Integration endpoints and message channels.
Java configuration can be used both to replace and augment XML configuration.
You need not replace all of your existing XML configuration to use Java configuration.
DSL Basics
The org.springframework.integration.dsl package contains the IntegrationFlowBuilder API mentioned earlier and a number of IntegrationComponentSpec implementations, which are also builders and provide the fluent API to configure concrete endpoints.
The IntegrationFlowBuilder infrastructure provides common enterprise integration patterns (EIP) for message-based applications, such as channels, endpoints, pollers, and channel interceptors.
Endpoints are expressed as verbs in the DSL to improve readability. The following list includes the common DSL method names and the associated EIP endpoint:
- transform → Transformer
- filter → Filter
- handle → ServiceActivator
- split → Splitter
- aggregate → Aggregator
- route → Router
- bridge → Bridge
Conceptually, integration processes are constructed by composing these endpoints into one or more message flows.
Note that EIP does not formally define the term 'message flow', but it is useful to think of it as a unit of work that uses well known messaging patterns.
The DSL provides an IntegrationFlow component to define a composition of channels and the endpoints between them. However, IntegrationFlow plays only a configuration role, populating real beans in the application context, and is not used at runtime.
Nevertheless, the IntegrationFlow bean can be autowired as a Lifecycle to control start() and stop() for the whole flow; these calls are delegated to all the Spring Integration components associated with this IntegrationFlow.
The following example uses the IntegrationFlow fluent API to define an IntegrationFlow bean by using EIP methods from IntegrationFlowBuilder:
@Bean
public IntegrationFlow integerFlow() {
return IntegrationFlow.from("input")
.<String, Integer>transform(Integer::parseInt)
.get();
}
The transform method accepts a lambda as an endpoint argument to operate on the message payload.
The real argument of this method is a GenericTransformer<S, T> instance.
Consequently, any of the provided transformers (ObjectToJsonTransformer, FileToStringTransformer, and others) can be used here.
Under the covers, IntegrationFlowBuilder recognizes the MessageHandler and the endpoint for it, with MessageTransformingHandler and ConsumerEndpointFactoryBean, respectively.
Consider another example:
@Bean
public IntegrationFlow myFlow() {
return IntegrationFlow.from("input")
.filter("World"::equals)
.transform("Hello "::concat)
.handle(System.out::println)
.get();
}
The preceding example composes a sequence of Filter → Transformer → Service Activator.
The flow is "one way".
That is, it does not provide a reply message but only prints the payload to STDOUT.
The endpoints are automatically wired together by using direct channels.
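As a rough mental model of what this flow does to each payload, the following plain-Java sketch chains a predicate, a function, and a consumer. It does not use Spring Integration: OneWayFlowSketch and its methods are hypothetical names, and channels, messages, and headers are deliberately left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical plain-Java model of a Filter -> Transformer -> Service Activator
// sequence. It is NOT the Spring Integration API.
final class OneWayFlowSketch {

    // Composes the three steps into a single "input" consumer. Payloads rejected
    // by the filter are dropped; nothing is returned, so the flow is one-way.
    static <T, R> Consumer<T> flow(Predicate<T> filter,
                                   Function<T, R> transformer,
                                   Consumer<R> handler) {
        return payload -> {
            if (filter.test(payload)) {
                handler.accept(transformer.apply(payload));
            }
        };
    }

    // Sends each payload through the composed flow and records what the final
    // handler received (the document's example prints instead of recording).
    static List<String> run(List<String> payloads) {
        List<String> handled = new ArrayList<>();
        Consumer<String> input = OneWayFlowSketch.<String, String>flow(
                "World"::equals, "Hello "::concat, handled::add);
        payloads.forEach(input);
        return handled;
    }

    public static void main(String[] args) {
        System.out.println(run(List.of("World", "Moon")));
    }
}
```

Only the payload that passes the filter reaches the handler, which mirrors how the filter in the flow discards non-matching messages before the transformer runs.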
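Lambdas And Message<?> Arguments
When using lambdas in EIP methods, the "input" argument is generally the message payload.
If you wish to access the entire message, use one of the overloaded methods that take a Class<?> as the first parameter.
Without that parameter, a lambda typed on Message<?> fails at runtime with a ClassCastException, because the lambda does not retain the argument type and the framework attempts to cast the payload to a Message<?>.
Instead, pass Message.class as that first parameter so that the lambda receives the whole message.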
Bean Definitions override
The Java DSL can register beans for objects defined inline in the flow definition and can also reuse existing, injected beans.
If the same bean name is defined for an inline object and an existing bean definition, a BeanDefinitionOverrideException is thrown.
Message Channels
In addition to the IntegrationFlowBuilder with EIP methods, the Java DSL provides a fluent API to configure MessageChannel instances.
For this purpose, the MessageChannels builder factory is provided.
The following example shows how to use it:
@Bean
public MessageChannel priorityChannel() {
return MessageChannels.priority(this.mongoDbChannelMessageStore, "priorityGroup")
.interceptor(wireTap())
.get();
}
The same MessageChannels builder factory can be used in the channel() EIP method from IntegrationFlowBuilder to wire endpoints, similar to wiring an input-channel/output-channel pair in the XML configuration.
By default, endpoints are wired with DirectChannel instances where the bean name is based on the following pattern: [IntegrationFlow.beanName].channel#[channelNameIndex].
This rule is also applied for unnamed channels produced by inline MessageChannels builder factory usage.
However, all MessageChannels methods have a variant that is aware of the channelId that you can use to set the bean names for MessageChannel instances.
The MessageChannel references and beanName can be used as bean-method invocations.
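To make the default naming rule concrete, the following minimal plain-Java sketch generates names that follow the [IntegrationFlow.beanName].channel#[channelNameIndex] pattern. The ChannelNameSketch class and its method are hypothetical and are not part of Spring Integration; the sketch assumes that the index starts at 0 and grows by one for each unnamed channel in a flow.

```java
// Hypothetical helper, not a Spring Integration class: it only illustrates the
// "[IntegrationFlow.beanName].channel#[channelNameIndex]" naming pattern.
final class ChannelNameSketch {

    private final String flowBeanName;
    private int channelNameIndex; // assumption: starts at 0, one counter per flow

    ChannelNameSketch(String flowBeanName) {
        this.flowBeanName = flowBeanName;
    }

    // Returns the next generated name for an unnamed channel in this flow.
    String nextChannelName() {
        return flowBeanName + ".channel#" + channelNameIndex++;
    }

    public static void main(String[] args) {
        ChannelNameSketch names = new ChannelNameSketch("channelFlow");
        System.out.println(names.nextChannelName());
        System.out.println(names.nextChannelName());
    }
}
```

Channels that are given an explicit channelId keep that id as their bean name and would not consume an index in this model.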
The following example shows the possible ways to use the channel() EIP method:
@Bean
public MessageChannel queueChannel() {
return MessageChannels.queue().get();
}
@Bean
public MessageChannel publishSubscribe() {
return MessageChannels.publishSubscribe().get();
}
@Bean
public IntegrationFlow channelFlow() {
return IntegrationFlow.from("input")
.fixedSubscriberChannel()
.channel("queueChannel")
.channel(publishSubscribe())
.channel(MessageChannels.executor("executorChannel", this.taskExecutor))
.channel("output")
.get();
}
- from("input") means "find and use the MessageChannel with the "input" id, or create one".
- fixedSubscriberChannel() produces an instance of FixedSubscriberChannel and registers it with a name of channelFlow.channel#0.
- channel("queueChannel") works the same way but uses an existing queueChannel bean.
- channel(publishSubscribe()) is the bean-method reference.
- channel(MessageChannels.executor("executorChannel", this.taskExecutor)) is the IntegrationFlowBuilder that exposes IntegrationComponentSpec to the ExecutorChannel and registers it as executorChannel.
- channel("output") registers the DirectChannel bean with output as its name, as long as no beans with this name already exist.
Note: The preceding IntegrationFlow definition is valid, and all of its channels are applied to endpoints with BridgeHandler instances.
Be careful when using the same inline channel definition through the MessageChannels factory from different IntegrationFlow instances.
Even if the DSL parser registers non-existent objects as beans, it cannot determine that the same object (MessageChannel) is used from different IntegrationFlow containers.
The following example is wrong:
@Bean
public IntegrationFlow startFlow() {
return IntegrationFlow.from("input")
.transform(...)
.channel(MessageChannels.queue("queueChannel"))
.get();
}
@Bean
public IntegrationFlow endFlow() {
return IntegrationFlow.from(MessageChannels.queue("queueChannel"))
.handle(...)
.get();
}
The result of that bad example is the following exception:
Caused by: java.lang.IllegalStateException:
Could not register object [queueChannel] under bean name 'queueChannel':
there is already object [queueChannel] bound
at o.s.b.f.s.DefaultSingletonBeanRegistry.registerSingleton(DefaultSingletonBeanRegistry.java:129)
To make it work, you need to declare a @Bean for that channel and use its bean method from the different IntegrationFlow instances.
Pollers
Spring Integration also provides a fluent API that lets you configure PollerMetadata for AbstractPollingEndpoint implementations.
You can use the Pollers builder factory to configure common bean definitions or those created from IntegrationFlowBuilder EIP methods, as the following example shows:
@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerSpec poller() {
return Pollers.fixedRate(500)
.errorChannel("myErrors");
}
See Pollers and PollerSpec in the Javadoc for more information.
If you use the DSL to construct a PollerSpec as a @Bean, do not call the get() method in the bean definition.
The PollerSpec is a FactoryBean that generates the PollerMetadata object from the specification and initializes all of its properties.
The reactive() Endpoint
Starting with version 5.5, the ConsumerEndpointSpec provides a reactive() configuration property with an optional customizer Function<? super Flux<Message<?>>, ? extends Publisher<Message<?>>>.
This option configures the target endpoint as a ReactiveStreamsConsumer instance, independently of the input channel type, which is converted to a Flux via IntegrationReactiveUtils.messageChannelToFlux().
The provided function is used from the Flux.transform() operator to customize (publishOn(), log(), doOnNext(), and so on) a reactive stream source from the input channel.
The following example demonstrates how to change the publishing thread from the input channel independently of the final subscriber and producer to that DirectChannel:
@Bean
public IntegrationFlow reactiveEndpointFlow() {
return IntegrationFlow
.from("inputChannel")
.<String, Integer>transform(Integer::parseInt,
e -> e.reactive(flux -> flux.publishOn(Schedulers.parallel())))
.get();
}
See Reactive Streams Support for more information.
DSL and Endpoint Configuration
All IntegrationFlowBuilder EIP methods have a variant that applies the lambda parameter to provide options for AbstractEndpoint instances: SmartLifecycle, PollerMetadata, request-handler-advice-chain, and others.
Each of them has generic arguments, so it lets you configure an endpoint and even its MessageHandler in the context, as the following example shows:
@Bean
public IntegrationFlow flow2() {
return IntegrationFlow.from(this.inputChannel)
.transform(new PayloadSerializingTransformer(),
c -> c.autoStartup(false).id("payloadSerializingTransformer"))
.transform((Integer p) -> p * 2, c -> c.advice(this.expressionAdvice()))
.get();
}
In addition, the EndpointSpec provides an id() method to let you register an endpoint bean with a given bean name, rather than a generated one.
If the MessageHandler is referenced as a bean, then any existing adviceChain configuration is overridden if the .advice() method is present in the DSL definition:
@Bean
public TcpOutboundGateway tcpOut() {
TcpOutboundGateway gateway = new TcpOutboundGateway();
gateway.setConnectionFactory(cf());
gateway.setAdviceChain(Collections.singletonList(fooAdvice()));
return gateway;
}
@Bean
public IntegrationFlow clientTcpFlow() {
return f -> f
.handle(tcpOut(), e -> e.advice(testAdvice()))
.transform(Transformers.objectToString());
}
The advice chains are not merged: only the testAdvice() bean is used in this case.
Transformers
The DSL API provides a convenient, fluent Transformers factory to be used as an inline target object definition within the .transform() EIP method.
The following example shows how to use it:
@Bean
public IntegrationFlow transformFlow() {
return IntegrationFlow.from("input")
.transform(Transformers.fromJson(MyPojo.class))
.transform(Transformers.serializer())
.get();
}
It avoids inconvenient coding using setters and makes the flow definition more straightforward.
Note that you can use Transformers to declare target Transformer instances as @Bean instances and, again, use them from the IntegrationFlow definition as bean methods.
Nevertheless, the DSL parser takes care of bean declarations for inline objects, if they are not yet defined as beans.
See Transformers in the Javadoc for more information and supported factory methods.
Also see Lambdas And Message<?> Arguments.
Inbound Channel Adapters
Typically, message flows start from an inbound channel adapter (such as <int-jdbc:inbound-channel-adapter>).
The adapter is configured with <poller>, and it asks a MessageSource<?> to periodically produce messages.
The Java DSL allows for starting an IntegrationFlow from a MessageSource<?>, too.
For this purpose, the IntegrationFlow fluent API provides an overloaded IntegrationFlow.from(MessageSource<?> messageSource) method.
You can configure the MessageSource<?> as a bean and provide it as an argument for that method.
The second parameter of IntegrationFlow.from() is a Consumer<SourcePollingChannelAdapterSpec> lambda that lets you provide options (such as PollerMetadata or SmartLifecycle) for the SourcePollingChannelAdapter.
The following example shows how to use the fluent API and a lambda to create an IntegrationFlow:
@Bean
public MessageSource<Object> jdbcMessageSource() {
return new JdbcPollingChannelAdapter(this.dataSource, "SELECT * FROM something");
}
@Bean
public IntegrationFlow pollingFlow() {
return IntegrationFlow.from(jdbcMessageSource(),
c -> c.poller(Pollers.fixedRate(100).maxMessagesPerPoll(1)))
.transform(Transformers.toJson())
.channel("furtherProcessChannel")
.get();
}
For cases that do not require building Message objects directly, you can use an IntegrationFlow.fromSupplier() variant that is based on java.util.function.Supplier.
The result of Supplier.get() is automatically wrapped in a Message (if it is not already a Message).
Message Routers
Spring Integration natively provides specialized router types, including:
-
HeaderValueRouter
-
PayloadTypeRouter
-
ExceptionTypeRouter
-
RecipientListRouter
-
XPathRouter
As with many other DSL IntegrationFlowBuilder EIP methods, the route() method can apply any AbstractMessageRouter implementation or, for convenience, a String as a SpEL expression or a ref-method pair.
In addition, you can configure route() with a lambda and use a lambda for a Consumer<RouterSpec<MethodInvokingRouter>>.
The fluent API also provides AbstractMappingMessageRouter options such as channelMapping(String key, String channelName) pairs, as the following example shows:
@Bean
public IntegrationFlow routeFlowByLambda() {
return IntegrationFlow.from("routerInput")
.<Integer, Boolean>route(p -> p % 2 == 0,
m -> m.suffix("Channel")
.channelMapping(true, "even")
.channelMapping(false, "odd")
)
.get();
}
The following example shows a simple expression-based router:
@Bean
public IntegrationFlow routeFlowByExpression() {
return IntegrationFlow.from("routerInput")
.route("headers['destChannel']")
.get();
}
The routeToRecipients() method takes a Consumer<RecipientListRouterSpec>, as the following example shows:
@Bean
public IntegrationFlow recipientListFlow() {
return IntegrationFlow.from("recipientListInput")
.<String, String>transform(p -> p.replaceFirst("Payload", ""))
.routeToRecipients(r -> r
.recipient("thing1-channel", "'thing1' == payload")
.recipientMessageSelector("thing2-channel", m ->
m.getHeaders().containsKey("recipient")
&& (boolean) m.getHeaders().get("recipient"))
.recipientFlow("'thing1' == payload or 'thing2' == payload or 'thing3' == payload",
f -> f.<String, String>transform(String::toUpperCase)
.channel(c -> c.queue("recipientListSubFlow1Result")))
.recipientFlow((String p) -> p.startsWith("thing3"),
f -> f.transform("Hello "::concat)
.channel(c -> c.queue("recipientListSubFlow2Result")))
.recipientFlow(new FunctionExpression<Message<?>>(m ->
"thing3".equals(m.getPayload())),
f -> f.channel(c -> c.queue("recipientListSubFlow3Result")))
.defaultOutputToParentFlow())
.get();
}
The .defaultOutputToParentFlow() of the .routeToRecipients() definition lets you set the router’s defaultOutput as a gateway to continue a process for the unmatched messages in the main flow.
Also see Lambdas And Message<?> Arguments.
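The recipient-list behavior described above can be pictured with a small plain-Java model: every recipient whose selector matches receives the payload, and a default output is used only when nothing matches. This is a sketch under stated assumptions, not the Spring Integration implementation; RecipientListSketch, its methods, and the "parentFlow" name are hypothetical, and selectors see only a String payload rather than a full Message.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical plain-Java model of a recipient-list router with a default output.
final class RecipientListSketch {

    // Channel name -> selector, kept in registration order.
    private final Map<String, Predicate<String>> recipients = new LinkedHashMap<>();
    private final String defaultOutput;

    RecipientListSketch(String defaultOutput) {
        this.defaultOutput = defaultOutput;
    }

    // Registers a recipient channel together with the selector that guards it.
    RecipientListSketch recipient(String channelName, Predicate<String> selector) {
        recipients.put(channelName, selector);
        return this;
    }

    // Returns every channel whose selector accepts the payload, or only the
    // default output when no selector matches.
    List<String> route(String payload) {
        List<String> matched = new ArrayList<>();
        recipients.forEach((channel, selector) -> {
            if (selector.test(payload)) {
                matched.add(channel);
            }
        });
        if (matched.isEmpty()) {
            matched.add(defaultOutput);
        }
        return matched;
    }

    public static void main(String[] args) {
        RecipientListSketch router = new RecipientListSketch("parentFlow")
                .recipient("thing1-channel", "thing1"::equals)
                .recipient("thing3-channel", p -> p.startsWith("thing3"));
        System.out.println(router.route("thing1"));
        System.out.println(router.route("unmatched"));
    }
}
```

In this model the default output plays the part of defaultOutputToParentFlow(): unmatched payloads are not dropped but handed back for further processing.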
Splitters
To create a splitter, use the split() EIP method.
By default, if the payload is an Iterable, an Iterator, an array, a Stream, or a reactive Publisher, the split() method outputs each item as an individual message.
It accepts a lambda, a SpEL expression, or any AbstractMessageSplitter implementation.
Alternatively, you can use it without parameters to provide the DefaultMessageSplitter.
The following example shows how to use the split() method by providing a lambda:
@Bean
public IntegrationFlow splitFlow() {
return IntegrationFlow.from("splitInput")
.split(s -> s.applySequence(false).delimiters(","))
.channel(MessageChannels.executor(taskExecutor()))
.get();
}
The preceding example creates a splitter that splits a message containing a comma-delimited String.
Also see Lambdas And Message<?> Arguments.
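For intuition about what splitting a comma-delimited String payload amounts to, here is a minimal plain-Java sketch. DelimiterSplitSketch is a hypothetical class, not part of Spring Integration, and it assumes that tokens are trimmed and that empty tokens are dropped; sequence headers and message creation are omitted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.StringTokenizer;

// Hypothetical plain-Java model of splitting a delimited String payload into
// individual items. It is NOT the Spring Integration splitter.
final class DelimiterSplitSketch {

    // Every character in 'delimiters' acts as a separator. Assumption of this
    // sketch: tokens are trimmed and empty tokens are skipped.
    static List<String> split(String payload, String delimiters) {
        List<String> items = new ArrayList<>();
        StringTokenizer tokenizer = new StringTokenizer(payload, delimiters);
        while (tokenizer.hasMoreTokens()) {
            String token = tokenizer.nextToken().trim();
            if (!token.isEmpty()) {
                items.add(token);
            }
        }
        return items;
    }

    public static void main(String[] args) {
        System.out.println(split("a, b,,c", ","));
    }
}
```

Each returned item corresponds to one outgoing message in the flow.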
Aggregators and Resequencers
An Aggregator is conceptually the opposite of a Splitter.
It aggregates a sequence of individual messages into a single message and is necessarily more complex.
By default, an aggregator returns a message that contains a collection of payloads from incoming messages.
The same rules are applied for the Resequencer.
The following example shows the canonical splitter-aggregator pattern:
@Bean
public IntegrationFlow splitAggregateFlow() {
return IntegrationFlow.from("splitAggregateInput")
.split()
.channel(MessageChannels.executor(this.taskExecutor()))
.resequence()
.aggregate()
.get();
}
The split() method splits the list into individual messages and sends them to the ExecutorChannel.
The resequence() method reorders messages by sequence details found in the message headers.
The aggregate() method collects those messages.
However, you can change the default behavior by specifying a release strategy and correlation strategy, among other things. Consider the following example:
.aggregate(a ->
a.correlationStrategy(m -> m.getHeaders().get("myCorrelationKey"))
.releaseStrategy(g -> g.size() > 10)
.messageStore(messageStore()))
The preceding example correlates messages that have myCorrelationKey headers and releases the messages once more than ten have been accumulated (the release strategy is g.size() > 10).
Similar lambda configurations are provided for the resequence() EIP method.
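The interplay between a correlation strategy and a release strategy can be sketched in plain Java as follows. This is only an illustrative model: AggregatorSketch and its members are hypothetical names, groups are held in an in-memory map instead of a message store, and timeouts and group expiry are ignored.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical plain-Java model of an aggregator driven by a correlation
// strategy and a release strategy. It is NOT the Spring Integration aggregator.
final class AggregatorSketch<M> {

    private final Function<M, Object> correlationStrategy;
    private final Predicate<List<M>> releaseStrategy;
    private final Map<Object, List<M>> groups = new HashMap<>();

    AggregatorSketch(Function<M, Object> correlationStrategy,
                     Predicate<List<M>> releaseStrategy) {
        this.correlationStrategy = correlationStrategy;
        this.releaseStrategy = releaseStrategy;
    }

    // Adds the message to the group selected by its correlation key and returns
    // the whole group once the release strategy accepts it.
    Optional<List<M>> accept(M message) {
        Object key = correlationStrategy.apply(message);
        List<M> group = groups.computeIfAbsent(key, k -> new ArrayList<>());
        group.add(message);
        if (releaseStrategy.test(group)) {
            groups.remove(key);
            return Optional.of(group);
        }
        return Optional.empty();
    }

    // Demo helper: sends 'messageCount' messages that share one correlation key
    // and records the size of every released group.
    static List<Integer> releasedSizes(int messageCount) {
        AggregatorSketch<String> aggregator =
                new AggregatorSketch<>(m -> "sameKey", group -> group.size() > 10);
        List<Integer> sizes = new ArrayList<>();
        for (int i = 0; i < messageCount; i++) {
            aggregator.accept("payload-" + i)
                    .ifPresent(group -> sizes.add(group.size()));
        }
        return sizes;
    }

    public static void main(String[] args) {
        System.out.println(releasedSizes(10));
        System.out.println(releasedSizes(11));
    }
}
```

With a size() > 10 release strategy, ten correlated messages are not enough to release the group; the eleventh one triggers the release.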
Service Activators and the .handle() method
The .handle() EIP method’s goal is to invoke any MessageHandler implementation or any method on some POJO.
Another option is to define an “activity” by using lambda expressions.
Consequently, we introduced a generic GenericHandler<P> functional interface.
Its handle method requires two arguments: P payload and MessageHeaders headers (starting with version 5.1).
Having that, we can define a flow as follows:
@Bean
public IntegrationFlow myFlow() {
return IntegrationFlow.from("flow3Input")
.<Integer>handle((p, h) -> p * 2)
.get();
}
The preceding example doubles any integer it receives.
However, one main goal of Spring Integration is loose coupling, through runtime type conversion from the message payload to the target arguments of the message handler.
Since Java does not support generic type resolution for lambda classes, we introduced a workaround with an additional payloadType argument for most EIP methods and LambdaMessageProcessor.
Doing so delegates the hard conversion work to Spring’s ConversionService, which uses the provided type and the requested message to target method arguments.
The following example shows what the resulting IntegrationFlow might look like:
@Bean
public IntegrationFlow integerFlow() {
return IntegrationFlow.from("input")
.<byte[], String>transform(p -> new String(p, StandardCharsets.UTF_8))
.handle(Integer.class, (p, h) -> p * 2)
.get();
}
We can also register a BytesToIntegerConverter within the ConversionService to get rid of that additional .transform():
@Bean
@IntegrationConverter
public BytesToIntegerConverter bytesToIntegerConverter() {
return new BytesToIntegerConverter();
}
@Bean
public IntegrationFlow integerFlow() {
return IntegrationFlow.from("input")
.handle(Integer.class, (p, h) -> p * 2)
.get();
}
Also see Lambdas And Message<?> Arguments.
Operator gateway()
The gateway() operator in an IntegrationFlow definition is a special service activator implementation that calls some other endpoint or integration flow via its input channel and waits for a reply.
Technically, it plays the same role as a nested <gateway> component in a <chain> definition (see Calling a Chain from within a Chain) and allows a flow to be cleaner and more straightforward.
Logically, and from a business perspective, it is a messaging gateway that allows the distribution and reuse of functionality between different parts of the target integration solution (see Messaging Gateways).
This operator has several overloads for different goals:
- gateway(String requestChannel) to send a message to some endpoint’s input channel by its name;
- gateway(MessageChannel requestChannel) to send a message to some endpoint’s input channel by its direct injection;
- gateway(IntegrationFlow flow) to send a message to the input channel of the provided IntegrationFlow.
All of these have a variant with a second Consumer<GatewayEndpointSpec> argument to configure the target GatewayMessageHandler and the respective AbstractEndpoint.
Also, the IntegrationFlow-based methods allow calling an existing IntegrationFlow bean, declaring the flow as a sub-flow via an in-place lambda for the IntegrationFlow functional interface, or extracting it into a private method for a cleaner code style:
@Bean
IntegrationFlow someFlow() {
return IntegrationFlow
.from(...)
.gateway(subFlow())
.handle(...)
.get();
}
private static IntegrationFlow subFlow() {
return f -> f
.scatterGather(s -> s.recipientFlow(...),
g -> g.outputProcessor(MessageGroup::getOne));
}
If the downstream flow does not always return a reply, you should set the requestTimeout to 0 to prevent hanging the calling thread indefinitely.
In that case, the flow ends at that point and the thread is released for further work.
Operator log()
For convenience, a log() operator is provided to log the message journey through the Spring Integration flow (<logging-channel-adapter>).
Internally, it is represented by the WireTap ChannelInterceptor with a LoggingHandler as its subscriber.
It is responsible for logging the incoming message into the next endpoint or the current channel.
The following example shows how to use LoggingHandler:
.filter(...)
.log(LoggingHandler.Level.ERROR, "test.category", m -> m.getHeaders().getId())
.route(...)
In the preceding example, an id header is logged at the ERROR level onto test.category only for messages that passed the filter and before routing.
Starting with version 6.0, the behavior of this operator at the end of a flow is aligned with its usage in the middle.
In other words, the behavior of the flow remains the same even if the log() operator is removed.
So, if a reply is not expected to be produced at the end of the flow, using nullChannel() after the last log() is recommended.
Operator intercept()
Starting with version 5.3, the intercept() operator lets you register one or more ChannelInterceptor instances at the current MessageChannel in the flow.
This is an alternative to creating an explicit MessageChannel via the MessageChannels API.
The following example uses a MessageSelectingInterceptor to reject certain messages with an exception:
.transform(...)
.intercept(new MessageSelectingInterceptor(m -> m.getPayload().isValid()))
.handle(...)
MessageChannelSpec.wireTap()
Spring Integration includes a .wireTap() fluent API in the MessageChannelSpec builders.
The following example shows how to use the wireTap method to log input:
@Bean
public QueueChannelSpec myChannel() {
return MessageChannels.queue()
.wireTap("loggingFlow.input");
}
@Bean
public IntegrationFlow loggingFlow() {
return f -> f.log();
}
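If the MessageChannel is an instance of InterceptableChannel, the log(), wireTap(), or intercept() operators are applied to the current MessageChannel.
When the current MessageChannel does not implement InterceptableChannel, an implicit DirectChannel and BridgeHandler are injected into the IntegrationFlow, and the WireTap is added to this new DirectChannel.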
The following example does not have any channel declaration:
.handle(...)
.log()
In the preceding example (and any time no channel has been declared), an implicit DirectChannel is injected in the current position of the IntegrationFlow and used as an output channel for the currently configured ServiceActivatingHandler (from the .handle(), described earlier).
Working With Message Flows
IntegrationFlowBuilder provides a top-level API to produce integration components wired to message flows.
When your integration may be accomplished with a single flow (which is often the case), this is convenient.
Alternatively, IntegrationFlow instances can be joined via MessageChannel instances.
By default, MessageFlow behaves as a “chain” in Spring Integration parlance.
That is, the endpoints are automatically and implicitly wired by DirectChannel instances.
The message flow is not actually constructed as a chain, which offers much more flexibility.
For example, you may send a message to any component within the flow, if you know its inputChannel name (that is, if you explicitly define it).
You may also reference externally defined channels within a flow to allow the use of channel adapters (to enable remote transport protocols, file I/O, and so on), instead of direct channels.
As such, the DSL does not support the Spring Integration chain element, because it does not add much value in this case.
Since the Spring Integration Java DSL produces the same bean definition model as any other configuration options and is based on the existing Spring Framework @Configuration infrastructure, it can be used together with XML definitions and wired with Spring Integration messaging annotation configuration.
You can also define direct IntegrationFlow instances by using a lambda.
The following example shows how to do so:
@Bean
public IntegrationFlow lambdaFlow() {
return f -> f.filter("World"::equals)
.transform("Hello "::concat)
.handle(System.out::println);
}
The result of this definition is the same set of integration components that are wired with an implicit direct channel.
The only limitation here is that this flow is started with a named direct channel: lambdaFlow.input.
Also, a lambda flow cannot start from a MessageSource or MessageProducer.
Starting with version 5.1, this kind of IntegrationFlow is wrapped in a proxy to expose lifecycle control and provide access to the inputChannel of the internally associated StandardIntegrationFlow.
Starting with version 5.0.6, the generated bean names for the components in an IntegrationFlow include the flow bean name followed by a dot (.) as a prefix.
For example, the ConsumerEndpointFactoryBean for the .transform("Hello "::concat) in the preceding sample results in a bean name of lambdaFlow.o.s.i.config.ConsumerEndpointFactoryBean#0.
(The o.s.i is shortened from org.springframework.integration to fit on the page.)
The Transformer implementation bean for that endpoint has a bean name of lambdaFlow.transformer#0 (starting with version 5.1), where, instead of a fully qualified name of the MethodInvokingTransformer class, its component type is used.
The same pattern is applied for all NamedComponent instances when the bean name has to be generated within the flow.
These generated bean names are prepended with the flow ID for purposes such as parsing logs or grouping components together in some analysis tool, as well as to avoid a race condition when we concurrently register integration flows at runtime.
See Dynamic and Runtime Integration Flows for more information.
FunctionExpression
We introduced the FunctionExpression class (an implementation of SpEL’s Expression interface) to let us use lambdas and generics.
The Function<T, R> option is provided for the DSL components, along with an expression option, when there is an implicit Strategy variant from Core Spring Integration.
The following example shows how to use a function expression:
.enrich(e -> e.requestChannel("enrichChannel")
.requestPayload(Message::getPayload)
.propertyFunction("date", m -> new Date()))
The FunctionExpression also supports runtime type conversion, as is done in SpelExpression.
Sub-flows support
Some of the if…else and publish-subscribe components provide the ability to specify their logic or mapping by using sub-flows.
The simplest sample is .publishSubscribeChannel(), as the following example shows:
@Bean
public IntegrationFlow subscribersFlow() {
return flow -> flow
.publishSubscribeChannel(Executors.newCachedThreadPool(), s -> s
.subscribe(f -> f
.<Integer>handle((p, h) -> p / 2)
.channel(c -> c.queue("subscriber1Results")))
.subscribe(f -> f
.<Integer>handle((p, h) -> p * 2)
.channel(c -> c.queue("subscriber2Results"))))
.<Integer>handle((p, h) -> p * 3)
.channel(c -> c.queue("subscriber3Results"));
}
You can achieve the same result with separate IntegrationFlow @Bean definitions, but we hope you find the sub-flow style of logic composition useful.
We find that it results in shorter (and so more readable) code.
Starting with version 5.3, a BroadcastCapableChannel-based publishSubscribeChannel() implementation is provided to configure sub-flow subscribers on broker-backed message channels.
For example, we can now configure several subscribers as sub-flows on the Jms.publishSubscribeChannel():
@Bean
public BroadcastCapableChannel jmsPublishSubscribeChannel() {
return Jms.publishSubscribeChannel(jmsConnectionFactory())
.destination("pubsub")
.get();
}
@Bean
public IntegrationFlow pubSubFlow() {
return f -> f
.publishSubscribeChannel(jmsPublishSubscribeChannel(),
pubsub -> pubsub
.subscribe(subFlow -> subFlow
.channel(c -> c.queue("jmsPubSubBridgeChannel1")))
.subscribe(subFlow -> subFlow
.channel(c -> c.queue("jmsPubSubBridgeChannel2"))));
}
@Bean
public BroadcastCapableChannel jmsPublishSubscribeChannel(ConnectionFactory jmsConnectionFactory) {
return (BroadcastCapableChannel) Jms.publishSubscribeChannel(jmsConnectionFactory)
.destination("pubsub")
.get();
}
The .routeToRecipients() method provides a similar publish-subscribe sub-flow composition.
Another example is using .discardFlow() instead of .discardChannel() on the .filter() method.
The .route() deserves special attention.
Consider the following example:
@Bean
public IntegrationFlow routeFlow() {
return f -> f
.<Integer, Boolean>route(p -> p % 2 == 0,
m -> m.channelMapping(true, "evenChannel")
.subFlowMapping(false, sf ->
sf.<Integer>handle((p, h) -> p * 3)))
.transform(Object::toString)
.channel(c -> c.queue("oddChannel"));
}
The .channelMapping()
continues to work as it does in regular Router
mapping, but the .subFlowMapping()
ties that sub-flow to the main flow.
In other words, any router’s sub-flow returns to the main flow after .route()
.
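The difference between the two mapping styles can be traced with a small plain-Java model of the routeFlow() example. This is only a sketch: it does not use Spring Integration, and the RouteSubFlowSketch class, its send() method, and the two queues are hypothetical stand-ins for the flow and its queue channels.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Plain-Java model of the routeFlow() example; it does not use Spring Integration.
final class RouteSubFlowSketch {

    // Hypothetical stand-ins for the "evenChannel" and "oddChannel" queue channels.
    static final Queue<Object> evenChannel = new ArrayDeque<>();
    static final Queue<Object> oddChannel = new ArrayDeque<>();

    static void send(int payload) {
        boolean even = payload % 2 == 0;
        if (even) {
            // channelMapping("true", "evenChannel"): the message leaves the main flow here.
            evenChannel.add(payload);
            return;
        }
        // subFlowMapping("false", ...): the sub-flow multiplies the payload by 3 ...
        int subFlowResult = payload * 3;
        // ... and its output returns to the main flow after .route(),
        // where transform(Object::toString) and the "oddChannel" queue follow.
        oddChannel.add(Integer.toString(subFlowResult));
    }

    public static void main(String[] args) {
        send(2);
        send(3);
        System.out.println("evenChannel: " + evenChannel);
        System.out.println("oddChannel: " + oddChannel);
    }
}
```

In this model, an even payload reaches evenChannel unchanged, while an odd payload is tripled by the sub-flow and then still passes through the steps that follow .route() in the main flow.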
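Sometimes, you need to refer to an existing IntegrationFlow @Bean from the .subFlowMapping().
When you need a reply from such a sub-flow to continue the main flow, wrap the IntegrationFlow bean reference (or its input channel) in a .gateway().
Otherwise, you end up with an exception similar to the following:
Caused by: org.springframework.beans.factory.BeanCreationException: The 'currentComponent' (org.springframework.integration.router.MethodInvokingRouter@7965a51c) is a one-way 'MessageHandler' and it isn't appropriate to configure 'outputChannel'. This is the end of the integration flow.
When you configure a sub-flow as a lambda, the framework handles the request-reply interaction with the sub-flow and a gateway is not needed.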
Sub-flows can be nested to any depth, but we do not recommend doing so. In fact, even in the router case, adding complex sub-flows within a flow would quickly begin to look like a plate of spaghetti and be difficult for a human to parse.
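In cases where the DSL supports a subflow configuration, when a channel is normally needed for the component being configured, and that subflow starts with a channel() element, the framework implicitly places a bridge() between the component output channel and the flow's input channel.
For example, for a .discardFlow() on a .filter(), the Framework internally creates a DirectChannel bean for injecting into the MessageFilter.discardChannel.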
Using Protocol Adapters
All the examples shown so far illustrate how the DSL supports a messaging architecture by using the Spring Integration programming model. However, we have yet to do any real integration. Doing so requires access to remote resources over HTTP, JMS, AMQP, TCP, JDBC, FTP, SMTP, and so on, or access to the local file system. Spring Integration supports all of these and more. Ideally, the DSL should offer first-class support for all of them, but implementing all of them and keeping up as new adapters are added to Spring Integration is a daunting task. So the expectation is that the DSL is continually catching up with Spring Integration.
Consequently, we provide a high-level API to seamlessly define protocol-specific messaging.
We do so with the factory and builder patterns and with lambdas.
You can think of the factory classes as “Namespace Factories”, because they play the same role as the XML namespace for components from the concrete protocol-specific Spring Integration modules.
Currently, Spring Integration Java DSL supports the Amqp
, Feed
, Jms
, Files
, (S)Ftp
, Http
, JPA
, MongoDb
, TCP/UDP
, Mail
, WebFlux
, and Scripts
namespace factories.
The following example shows how to use three of them (Amqp
, Jms
, and Mail
):
@Bean
public IntegrationFlow amqpFlow() {
return IntegrationFlow.from(Amqp.inboundGateway(this.rabbitConnectionFactory, queue()))
.transform("hello "::concat)
.transform(String.class, String::toUpperCase)
.get();
}
@Bean
public IntegrationFlow jmsOutboundGatewayFlow() {
return IntegrationFlow.from("jmsOutboundGatewayChannel")
.handle(Jms.outboundGateway(this.jmsConnectionFactory)
.replyContainer(c ->
c.concurrentConsumers(3)
.sessionTransacted(true))
.requestDestination("jmsPipelineTest"))
.get();
}
@Bean
public IntegrationFlow sendMailFlow() {
return IntegrationFlow.from("sendMailChannel")
.handle(Mail.outboundAdapter("localhost")
.port(smtpPort)
.credentials("user", "pw")
.protocol("smtp")
.javaMailProperties(p -> p.put("mail.debug", "true")),
e -> e.id("sendMailEndpoint"))
.get();
}
The preceding example shows how to use the “namespace factories” as inline adapter declarations.
However, you can use them from @Bean
definitions to make the IntegrationFlow
method chain more readable.
We are soliciting community feedback on these namespace factories before we spend effort on others. We also appreciate any input into prioritization for which adapters and gateways we should support next.
You can find more Java DSL samples in the protocol-specific chapters throughout this reference manual.
All other protocol channel adapters may be configured as generic beans and wired to the IntegrationFlow
, as the following examples show:
@Bean
public QueueChannelSpec wrongMessagesChannel() {
return MessageChannels
.queue()
.wireTap("wrongMessagesWireTapChannel");
}
@Bean
public IntegrationFlow xpathFlow(MessageChannel wrongMessagesChannel) {
return IntegrationFlow.from("inputChannel")
.filter(new StringValueTestXPathMessageSelector("namespace-uri(/*)", "my:namespace"),
e -> e.discardChannel(wrongMessagesChannel))
.log(LoggingHandler.Level.ERROR, "test.category", m -> m.getHeaders().getId())
.route(xpathRouter(wrongMessagesChannel))
.get();
}
@Bean
public AbstractMappingMessageRouter xpathRouter(MessageChannel wrongMessagesChannel) {
XPathRouter router = new XPathRouter("local-name(/*)");
router.setEvaluateAsString(true);
router.setResolutionRequired(false);
router.setDefaultOutputChannel(wrongMessagesChannel);
router.setChannelMapping("Tags", "splittingChannel");
router.setChannelMapping("Tag", "receivedChannel");
return router;
}
IntegrationFlowAdapter
The IntegrationFlow
interface can be implemented directly and specified as a component for scanning, as the following example shows:
@Component
public class MyFlow implements IntegrationFlow {
@Override
public void configure(IntegrationFlowDefinition<?> f) {
f.<String, String>transform(String::toUpperCase);
}
}
It is picked up by the IntegrationFlowBeanPostProcessor
and correctly parsed and registered in the application context.
For convenience and to gain the benefits of loosely coupled architecture, we provide the IntegrationFlowAdapter
base class implementation.
It requires a buildFlow()
method implementation to produce an IntegrationFlowDefinition
by using one of the from()
methods, as the following example shows:
@Component
public class MyFlowAdapter extends IntegrationFlowAdapter {
private final AtomicBoolean invoked = new AtomicBoolean();
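// With Spring Framework 6, a Trigger returns a java.time.Instant instead of a Date.
public Instant nextExecutionTime(TriggerContext triggerContext) {
// Fire once: return null after the first invocation so that no further polls are scheduled.
return this.invoked.getAndSet(true) ? null : Instant.now();
}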
@Override
protected IntegrationFlowDefinition<?> buildFlow() {
return from(this::messageSource,
e -> e.poller(p -> p.trigger(this::nextExecutionTime)))
.split(this)
.transform(this)
.aggregate(a -> a.processor(this, null))
.enrichHeaders(Collections.singletonMap("thing1", "THING1"))
.filter(this)
.handle(this)
.channel(c -> c.queue("myFlowAdapterOutput"));
}
public String messageSource() {
return "T,H,I,N,G,2";
}
@Splitter
public String[] split(String payload) {
return StringUtils.commaDelimitedListToStringArray(payload);
}
@Transformer
public String transform(String payload) {
return payload.toLowerCase();
}
@Aggregator
public String aggregate(List<String> payloads) {
return payloads.stream().collect(Collectors.joining());
}
@Filter
public boolean filter(@Header Optional<String> thing1) {
return thing1.isPresent();
}
@ServiceActivator
public String handle(String payload, @Header String thing1) {
return payload + ":" + thing1;
}
}
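To make the data flow through MyFlowAdapter easier to follow, the steps can be traced in plain Java. This is a sketch only: it does not use Spring Integration, the MyFlowAdapterTrace class is hypothetical, and it assumes the split items reach the aggregator in their original order.

```java
import java.util.Arrays;
import java.util.stream.Collectors;

// Plain-Java trace of the steps in MyFlowAdapter; no Spring Integration is involved.
final class MyFlowAdapterTrace {

    static String run(String source) {
        // split(): "T,H,I,N,G,2" becomes the items "T", "H", "I", "N", "G", "2".
        String[] parts = source.split(",");
        // transform() lowercases each item; aggregate() joins the items back together.
        String aggregated = Arrays.stream(parts)
                .map(String::toLowerCase)
                .collect(Collectors.joining());
        // enrichHeaders() adds the header thing1=THING1, so filter() lets the message pass.
        String thing1 = "THING1";
        // handle() appends the header value to the payload.
        return aggregated + ":" + thing1;
    }

    public static void main(String[] args) {
        System.out.println(run("T,H,I,N,G,2")); // prints "thing2:THING1"
    }
}
```

Under those assumptions, the message that ends up in the myFlowAdapterOutput queue would carry the payload thing2:THING1.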
Dynamic and Runtime Integration Flows
IntegrationFlow
and all its dependent components can be registered at runtime.
Before version 5.0, we used the BeanFactory.registerSingleton()
hook.
Starting with Spring Framework 5.0
, we use the instanceSupplier
hook for programmatic BeanDefinition
registration.
The following example shows how to programmatically register a bean:
BeanDefinition beanDefinition =
BeanDefinitionBuilder.genericBeanDefinition((Class<Object>) bean.getClass(), () -> bean)
.getRawBeanDefinition();
((BeanDefinitionRegistry) this.beanFactory).registerBeanDefinition(beanName, beanDefinition);
Note that, in the preceding example, the instanceSupplier
hook is the last parameter to the genericBeanDefinition
method, provided by a lambda in this case.
All the necessary bean initialization and lifecycle management is done automatically, as it is with standard context configuration bean definitions.
To simplify the development experience, Spring Integration introduced IntegrationFlowContext
to register and manage IntegrationFlow
instances at runtime, as the following example shows:
@Autowired
private AbstractServerConnectionFactory server1;
@Autowired
private IntegrationFlowContext flowContext;
...
@Test
public void testTcpGateways() {
TestingUtilities.waitListening(this.server1, null);
IntegrationFlow flow = f -> f
.handle(Tcp.outboundGateway(Tcp.netClient("localhost", this.server1.getPort())
.serializer(TcpCodecs.crlf())
.deserializer(TcpCodecs.lengthHeader1())
.id("client1"))
.remoteTimeout(m -> 5000))
.transform(Transformers.objectToString());
IntegrationFlowRegistration theFlow = this.flowContext.registration(flow).register();
assertThat(theFlow.getMessagingTemplate().convertSendAndReceive("foo", String.class), equalTo("FOO"));
}
This is useful when we have multiple configuration options and have to create several instances of similar flows.
To do so, we can iterate our options and create and register IntegrationFlow
instances within a loop.
Another variant is when our source of data is not Spring-based, so we must create it on the fly.
One such example is a Reactive Streams event source, as the following example shows:
Flux<Message<?>> messageFlux =
Flux.just("1,2,3,4")
.map(v -> v.split(","))
.flatMapIterable(Arrays::asList)
.map(Integer::parseInt)
.map(GenericMessage<Integer>::new);
QueueChannel resultChannel = new QueueChannel();
IntegrationFlow integrationFlow =
IntegrationFlow.from(messageFlux)
.<Integer, Integer>transform(p -> p * 2)
.channel(resultChannel)
.get();
this.integrationFlowContext.registration(integrationFlow)
.register();
The IntegrationFlowRegistrationBuilder
(as a result of the IntegrationFlowContext.registration()
) can be used to specify a bean name for the IntegrationFlow
to register, to control its autoStartup
, and to register non-Spring Integration beans.
Usually, those additional beans are connection factories (AMQP, JMS, (S)FTP, TCP/UDP, and others), serializers and deserializers, or any other required support components.
You can use the IntegrationFlowRegistration.destroy()
callback to remove a dynamically registered IntegrationFlow
and all its dependent beans when you no longer need them.
See the IntegrationFlowContext
Javadoc for more information.
Starting with version 5.0.6, all generated bean names in an IntegrationFlow definition are prepended with the flow ID as a prefix.
We recommend always specifying an explicit flow ID.
Otherwise, a synchronization barrier is initiated in the IntegrationFlowContext to generate the bean name for the IntegrationFlow and register its beans.
We synchronize on these two operations to avoid a race condition in which the same generated bean name could be used for different IntegrationFlow instances.
Also, starting with version 5.0.6, the registration builder API has a new method: useFlowIdAsPrefix()
.
This is useful if you wish to declare multiple instances of the same flow and avoid bean name collisions when components in the flows have the same ID, as the following example shows:
private void registerFlows() {
IntegrationFlowRegistration flow1 =
this.flowContext.registration(buildFlow(1234))
.id("tcp1")
.useFlowIdAsPrefix()
.register();
IntegrationFlowRegistration flow2 =
this.flowContext.registration(buildFlow(1235))
.id("tcp2")
.useFlowIdAsPrefix()
.register();
}
private IntegrationFlow buildFlow(int port) {
return f -> f
.handle(Tcp.outboundGateway(Tcp.netClient("localhost", port)
.serializer(TcpCodecs.crlf())
.deserializer(TcpCodecs.lengthHeader1())
.id("client"))
.remoteTimeout(m -> 5000))
.transform(Transformers.objectToString());
}
In this case, the message handler for the first flow can be referenced with a bean name of tcp1.client.handler
.
An id attribute is required when you use useFlowIdAsPrefix().
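The reason the prefix avoids collisions can be shown with a small plain-Java model. This is a sketch under stated assumptions: the FlowIdPrefixSketch registry is hypothetical, it rejects duplicate names outright, and it only mimics the flowId.componentId naming pattern seen in tcp1.client.handler rather than the framework's actual name generation.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java model of bean-name prefixing; the registry is a hypothetical stand-in.
final class FlowIdPrefixSketch {

    private final Map<String, Object> beans = new HashMap<>();

    // Registers a component under its (optionally prefixed) name.
    // In this sketch, a duplicate name is rejected with an exception.
    String register(String flowId, String componentId, boolean useFlowIdAsPrefix) {
        String beanName = useFlowIdAsPrefix ? flowId + "." + componentId : componentId;
        if (beans.putIfAbsent(beanName, new Object()) != null) {
            throw new IllegalStateException("Duplicate bean name: " + beanName);
        }
        return beanName;
    }

    public static void main(String[] args) {
        FlowIdPrefixSketch registry = new FlowIdPrefixSketch();
        // Both flows declare a component with the id "client", yet the names differ.
        System.out.println(registry.register("tcp1", "client", true)); // prints "tcp1.client"
        System.out.println(registry.register("tcp2", "client", true)); // prints "tcp2.client"
    }
}
```

Without the prefix, the second registration of "client" would clash with the first in this model, which is the situation the flow ID prefix is meant to prevent.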
IntegrationFlow
as a Gateway
The IntegrationFlow
can start from the service interface that provides a GatewayProxyFactoryBean
component, as the following example shows:
public interface ControlBusGateway {
void send(String command);
}
...
@Bean
public IntegrationFlow controlBusFlow() {
return IntegrationFlow.from(ControlBusGateway.class)
.controlBus()
.get();
}
All the proxies for the interface methods are supplied with the channel to send messages to the next integration component in the IntegrationFlow
.
You can mark the service interface with the @MessagingGateway
annotation and mark the methods with the @Gateway
annotations.
Nevertheless, the requestChannel
is ignored and overridden with that internal channel for the next component in the IntegrationFlow
.
Otherwise, creating such a configuration by using IntegrationFlow
does not make sense.
By default, a GatewayProxyFactoryBean
gets a conventional bean name, such as [FLOW_BEAN_NAME.gateway]
.
You can change that ID by using the @MessagingGateway.name()
attribute or the overloaded IntegrationFlow.from(Class<?> serviceInterface, Consumer<GatewayProxySpec> endpointConfigurer)
factory method.
Also, all the attributes from the @MessagingGateway
annotation on the interface are applied to the target GatewayProxyFactoryBean
.
When annotation configuration is not applicable, the Consumer<GatewayProxySpec>
variant can be used to provide appropriate options for the target proxy.
This DSL method is available starting with version 5.2.
With Java 8, you can even create an integration gateway with the java.util.function
interfaces, as the following example shows:
@Bean
public IntegrationFlow errorRecovererFlow() {
return IntegrationFlow.from(Function.class, (gateway) -> gateway.beanName("errorRecovererFunction"))
.<Object>handle((p, h) -> {
throw new RuntimeException("intentional");
}, e -> e.advice(retryAdvice()))
.get();
}
That errorRecovererFlow
can be used as follows:
@Autowired
@Qualifier("errorRecovererFunction")
private Function<String, String> errorRecovererFlowGateway;
DSL Extensions
Starting with version 5.3, an IntegrationFlowExtension
has been introduced to allow extension of the existing Java DSL with custom or composed EIP-operators.
All that is needed is an extension of this class that provides methods which can be used in the IntegrationFlow
bean definitions.
The extension class can also be used for custom IntegrationComponentSpec
configuration; for example, missed or default options can be implemented in the existing IntegrationComponentSpec
extension.
The sample below demonstrates a composite custom operator and usage of an AggregatorSpec
extension for a default custom outputProcessor
:
public class CustomIntegrationFlowDefinition
extends IntegrationFlowExtension<CustomIntegrationFlowDefinition> {
public CustomIntegrationFlowDefinition upperCaseAfterSplit() {
return split()
.transform("payload.toUpperCase()");
}
public CustomIntegrationFlowDefinition customAggregate(Consumer<CustomAggregatorSpec> aggregator) {
return register(new CustomAggregatorSpec(), aggregator);
}
}
public class CustomAggregatorSpec extends AggregatorSpec {
CustomAggregatorSpec() {
outputProcessor(group ->
group.getMessages()
.stream()
.map(Message::getPayload)
.map(String.class::cast)
.collect(Collectors.joining(", ")));
}
}
For a method chain flow, the new DSL operators in these extensions must return the extension class.
This way a target IntegrationFlow
definition will work with new and existing DSL operators:
@Bean
public IntegrationFlow customFlowDefinition() {
return
new CustomIntegrationFlowDefinition()
.log()
.upperCaseAfterSplit()
.channel("innerChannel")
.customAggregate(customAggregatorSpec ->
customAggregatorSpec.expireGroupsUponCompletion(true))
.logAndReply();
}
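The requirement that operators return the extension class follows from how a self-typed fluent API works in Java. The following plain-Java sketch models that pattern; BaseFlow and CustomFlow are hypothetical names and are not the Spring Integration classes, although the generic shape mirrors IntegrationFlowExtension<CustomIntegrationFlowDefinition> from the sample above.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of a self-typed fluent API; the class names are hypothetical.
abstract class BaseFlow<B extends BaseFlow<B>> {

    protected final List<String> steps = new ArrayList<>();

    @SuppressWarnings("unchecked")
    protected final B self() {
        return (B) this;
    }

    // Built-in operators return B, so operators added by a subclass stay reachable.
    B log() {
        steps.add("log");
        return self();
    }

    B split() {
        steps.add("split");
        return self();
    }

    B transform(String expression) {
        steps.add("transform:" + expression);
        return self();
    }

    List<String> steps() {
        return steps;
    }
}

final class CustomFlow extends BaseFlow<CustomFlow> {

    // A composed operator that returns the extension type to keep the chain going.
    CustomFlow upperCaseAfterSplit() {
        return split().transform("payload.toUpperCase()");
    }

    public static void main(String[] args) {
        // log() returns CustomFlow here, so upperCaseAfterSplit() can follow it.
        System.out.println(new CustomFlow().log().upperCaseAfterSplit().log().steps());
    }
}
```

If upperCaseAfterSplit() returned the base type instead, any custom operator placed after it in the chain would no longer compile, which is why the extension type is used as the return type.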
Integration Flows Composition
With the MessageChannel
abstraction as a first class citizen in Spring Integration, the composition of integration flows was always assumed.
The input channel of any endpoint in the flow can be used to send messages from any other endpoint and not only from the one which has this channel as an output.
Furthermore, with a @MessagingGateway
contract, Content Enricher components, composite endpoints like a <chain>
, and now with IntegrationFlow
beans (e.g. IntegrationFlowAdapter
), it is straightforward enough to distribute the business logic between shorter, reusable parts.
All that is needed for the final composition is knowledge about a MessageChannel
to send to or receive from.
Starting with version 5.5.4
, to abstract more from MessageChannel
and hide implementation details from the end-user, the IntegrationFlow
introduces the from(IntegrationFlow)
factory method to allow starting the current IntegrationFlow
from the output of an existing flow:
@Bean
IntegrationFlow templateSourceFlow() {
return IntegrationFlow.fromSupplier(() -> "test data")
.channel("sourceChannel")
.get();
}
@Bean
IntegrationFlow compositionMainFlow(IntegrationFlow templateSourceFlow) {
return IntegrationFlow.from(templateSourceFlow)
.<String, String>transform(String::toUpperCase)
.channel(c -> c.queue("compositionMainFlowResult"))
.get();
}
On the other hand, the IntegrationFlowDefinition
has added a to(IntegrationFlow)
terminal operator to continue the current flow at the input channel of some other flow:
@Bean
IntegrationFlow mainFlow(IntegrationFlow otherFlow) {
return f -> f
.<String, String>transform(String::toUpperCase)
.to(otherFlow);
}
@Bean
IntegrationFlow otherFlow() {
return f -> f
.<String, String>transform(p -> p + " from other flow")
.channel(c -> c.queue("otherFlowResultChannel"));
}
Composition in the middle of a flow can be achieved with the existing gateway(IntegrationFlow)
EIP-method.
This way, we can build flows of any complexity by composing them from simpler, reusable logical blocks.
For example, you may add a library of IntegrationFlow
beans as a dependency; it is then enough to import their configuration classes into the final project and autowire the beans into your IntegrationFlow
definitions.