The Spring Integration Java configuration and DSL provide a set of convenient builders and a fluent API to configure Spring Integration message flows from Spring @Configuration classes.
@Configuration
@EnableIntegration
public class MyConfiguration {

    @Bean
    public AtomicInteger integerSource() {
        return new AtomicInteger();
    }

    @Bean
    public IntegrationFlow myFlow() {
        // integerSource() is a bean method, so it has to be invoked to obtain the AtomicInteger
        return IntegrationFlows.from(integerSource()::getAndIncrement,
                                     c -> c.poller(Pollers.fixedRate(100)))
                .channel("inputChannel")
                .filter((Integer p) -> p > 0)
                .transform(Object::toString)
                .channel(MessageChannels.queue())
                .get();
    }
}
As a result, after the ApplicationContext starts up, Spring Integration endpoints and message channels are created, just as they are after XML parsing.
Such configuration can be used to replace XML configuration or alongside it.
The Java DSL for Spring Integration is essentially a facade for Spring Integration.
The DSL provides a simple way to embed Spring Integration Message Flows into your application using the fluent Builder
pattern together with existing Java and Annotation configurations from Spring Framework and Spring Integration as well.
Another useful tool to simplify configuration is Java 8 Lambdas.
The cafe is a good example of using the DSL.
The DSL is presented by the IntegrationFlows factory for the IntegrationFlowBuilder.
This produces the IntegrationFlow component, which should be registered as a Spring bean (@Bean).
The builder pattern is used to express arbitrarily complex structures as a hierarchy of methods that may accept Lambdas as arguments.
The IntegrationFlowBuilder just collects integration components (MessageChannels, AbstractEndpoints, etc.) in the IntegrationFlow bean for further parsing and registration of concrete beans in the application context by the IntegrationFlowBeanPostProcessor.
The Java DSL uses Spring Integration classes directly and bypasses any XML generation and parsing. However, the DSL offers more than syntactic sugar on top of XML. One of its most compelling features is the ability to define inline Lambdas to implement endpoint logic, eliminating the need for external classes to implement custom logic. In some sense, Spring Integration’s support for the Spring Expression Language (SpEL) and inline scripting address this, but Java Lambdas are easier and much more powerful.
The org.springframework.integration.dsl package contains the IntegrationFlowBuilder API mentioned above and a number of IntegrationComponentSpec implementations, which are also builders and provide the fluent API to configure concrete endpoints.
The IntegrationFlowBuilder infrastructure provides common EIP abstractions for message-based applications, such as channels, endpoints, pollers and channel interceptors.
Endpoints are expressed as verbs in the DSL to improve readability. The following list pairs the common DSL method names with the associated EIP endpoints:
transform -> Transformer
filter -> Filter
handle -> ServiceActivator
split -> Splitter
aggregate -> Aggregator
route -> Router
bridge -> Bridge
Conceptually, integration processes are constructed by composing these endpoints into one or more message flows.
Note that EIP does not formally define the term message flow, but it is useful to think of it as a unit of work that uses well known messaging patterns.
The DSL provides an IntegrationFlow component to define a composition of channels and the endpoints between them, but IntegrationFlow plays only a configuration role, populating real beans in the application context; it is not used at runtime:
@Bean
public IntegrationFlow integerFlow() {
    return IntegrationFlows.from("input")
            .<String, Integer>transform(Integer::parseInt)
            .get();
}
Here we use the IntegrationFlows factory to define an IntegrationFlow bean using EIP-methods from IntegrationFlowBuilder.
The transform method accepts a Lambda as an endpoint argument to operate on the message payload.
The real argument of this method is GenericTransformer<S, T>, hence any out-of-the-box transformer (ObjectToJsonTransformer, FileToStringTransformer, etc.) can be used here.
Under the covers, IntegrationFlowBuilder recognizes the MessageHandler and the endpoint for it: MessageTransformingHandler and ConsumerEndpointFactoryBean, respectively.
Let’s look at another example:
@Bean
public IntegrationFlow myFlow() {
    return IntegrationFlows.from("input")
            .filter("World"::equals)
            .transform("Hello "::concat)
            .handle(System.out::println)
            .get();
}
The above example composes a sequence of Filter -> Transformer -> Service Activator.
The flow is one-way; that is, it does not provide a reply message but simply prints the payload to STDOUT.
The endpoints are automatically wired together using direct channels.
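To make the Filter -> Transformer -> Service Activator sequence easier to follow, here is a plain-Java sketch of the same logic without any messaging infrastructure. It is only a model of what the payload goes through; OneWayFlowSketch and process are hypothetical names and are not part of Spring Integration.

```java
import java.util.Optional;
import java.util.function.Function;
import java.util.function.Predicate;

class OneWayFlowSketch {

    // Filter step: only the payload "World" passes.
    static final Predicate<String> FILTER = "World"::equals;

    // Transformer step: prepends a greeting to the payload.
    static final Function<String, String> TRANSFORMER = "Hello "::concat;

    // Applies filter then transformer; an empty result means the filter dropped the payload.
    static Optional<String> process(String payload) {
        return Optional.of(payload).filter(FILTER).map(TRANSFORMER);
    }

    public static void main(String[] args) {
        // Service activator stand-in: print whatever reaches the end of the flow.
        process("World").ifPresent(System.out::println);
        // This payload is rejected by the filter, so nothing is printed for it.
        process("Spring").ifPresent(System.out::println);
    }
}
```

In the real flow each step is a separate endpoint and the hand-off between steps happens over a channel, which is what allows the channel type to be changed later.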
In addition to the IntegrationFlowBuilder with EIP-methods, the Java DSL provides a fluent API to configure MessageChannels.
For this purpose the MessageChannels builder factory is provided:
@Bean public MessageChannel priorityChannel() { return MessageChannels.priority(this.mongoDbChannelMessageStore, "priorityGroup") .interceptor(wireTap()) .get(); }
The same MessageChannels builder factory can be used in the channel() EIP-method of the IntegrationFlowBuilder to wire endpoints, similar to an input-channel/output-channel pair in the XML configuration.
By default, endpoints are wired via DirectChannels whose bean names follow the pattern [IntegrationFlow.beanName].channel#[channelNameIndex].
This rule also applies to unnamed channels produced by inline MessageChannels builder factory usage.
However, all MessageChannels methods have a channelId-aware variant to set the bean name of the MessageChannel.
A channel can be referenced by its bean name or through a bean-method invocation that returns the MessageChannel.
Here is a sample with possible variants of channel() EIP-method usage:
@Bean public MessageChannel queueChannel() { return MessageChannels.queue().get(); } @Bean public MessageChannel publishSubscribe() { return MessageChannels.publishSubscribe().get(); } @Bean public IntegrationFlow channelFlow() { return IntegrationFlows.from("input") .fixedSubscriberChannel() .channel("queueChannel") .channel(publishSubscribe()) .channel(MessageChannels.executor("executorChannel", this.taskExecutor)) .channel("output") .get(); }
from("input") means: find and use the MessageChannel with the "input" id, or create one;
fixedSubscriberChannel() produces an instance of FixedSubscriberChannel and registers it with the name channelFlow.channel#0;
channel("queueChannel") works the same way but, of course, uses the existing "queueChannel" bean;
channel(publishSubscribe()) is the bean-method reference;
channel(MessageChannels.executor("executorChannel", this.taskExecutor)): the IntegrationFlowBuilder unwraps the IntegrationComponentSpec to the ExecutorChannel and registers it as "executorChannel";
channel("output") registers a DirectChannel bean with the "output" name, as long as no bean with this name already exists.
Note: the IntegrationFlow definition shown above is valid, and all of its channels are applied to endpoints with BridgeHandlers.
Important | |
---|---|
Be careful to use the same inline channel definition via |
@Bean public IntegrationFlow startFlow() { return IntegrationFlows.from("input") .transform(...) .channel(MessageChannels.queue("queueChannel")) .get(); } @Bean public IntegrationFlow endFlow() { return IntegrationFlows.from(MessageChannels.queue("queueChannel")) .handle(...) .get(); }
You end up with:
Caused by: java.lang.IllegalStateException: Could not register object [queueChannel] under bean name 'queueChannel': there is already object [queueChannel] bound at o.s.b.f.s.DefaultSingletonBeanRegistry.registerSingleton(DefaultSingletonBeanRegistry.java:129)
To make this work, declare a @Bean for that channel and use its bean method from the different IntegrationFlows.
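A sketch of that fix follows, assuming the 5.0.x package layout for the imports. The class name, the upper-casing transformer, the printing handler and the 100 ms poller are illustrative choices standing in for the elided parts of the sample above; the polled bridge is there only because a queue channel needs a polling consumer.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.Pollers;
import org.springframework.integration.dsl.channel.MessageChannels;
import org.springframework.messaging.MessageChannel;

@Configuration
@EnableIntegration
public class SharedChannelConfiguration {

    // The channel is declared once as a bean, so both flows share the same instance.
    @Bean
    public MessageChannel queueChannel() {
        return MessageChannels.queue("queueChannel").get();
    }

    @Bean
    public IntegrationFlow startFlow() {
        return IntegrationFlows.from("input")
                .<String, String>transform(String::toUpperCase) // illustrative transformer
                .channel(queueChannel())                         // bean-method reference
                .get();
    }

    @Bean
    public IntegrationFlow endFlow() {
        return IntegrationFlows.from(queueChannel())             // same bean, no second registration
                .bridge(e -> e.poller(Pollers.fixedDelay(100)))  // polls the queue channel
                .handle(System.out::println)                     // illustrative handler
                .get();
    }
}
```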
A similar fluent API is provided to configure PollerMetadata for AbstractPollingEndpoint implementations.
The Pollers builder factory can be used to configure common bean definitions or those created from IntegrationFlowBuilder EIP-methods:
@Bean(name = PollerMetadata.DEFAULT_POLLER) public PollerSpec poller() { return Pollers.fixedRate(500) .errorChannel("myErrors"); }
See the Pollers and PollerSpec JavaDocs for more information.
Important | |
---|---|
If you use the DSL to construct a |
All IntegrationFlowBuilder EIP-methods have a variant that accepts a Lambda parameter to provide options for AbstractEndpoints: SmartLifecycle, PollerMetadata, request-handler-advice-chain, etc.
Each of them has generic arguments, so you can simply configure an endpoint and even its MessageHandler in context:
@Bean public IntegrationFlow flow2() { return IntegrationFlows.from(this.inputChannel) .transform(new PayloadSerializingTransformer(), c -> c.autoStartup(false).id("payloadSerializingTransformer")) .transform((Integer p) -> p * 2, c -> c.advice(this.expressionAdvice())) .get(); }
In addition, the EndpointSpec provides an id() method to allow you to register an endpoint bean with a given bean name, rather than a generated one.
The DSL API provides a convenient, fluent Transformers factory to be used as an inline target object definition within the .transform() EIP-method:
@Bean public IntegrationFlow transformFlow() { return IntegrationFlows.from("input") .transform(Transformers.fromJson(MyPojo.class)) .transform(Transformers.serializer()) .get(); }
It avoids inconvenient coding using setters and makes the flow definition more straightforward.
Note that Transformers can also be used to declare target Transformers as @Beans, which can then be used from an IntegrationFlow definition as bean methods.
Nevertheless, the DSL parser takes care of bean declarations for inline objects if they aren't defined as beans yet.
See the Transformers JavaDocs for more information and supported factory methods.
Typically, message flows start from some inbound channel adapter (e.g. <int-jdbc:inbound-channel-adapter>).
The adapter is configured with a <poller> and periodically asks a MessageSource<?> to produce messages.
The Java DSL also allows an IntegrationFlow to start from a MessageSource<?>.
For this purpose, the IntegrationFlows builder factory provides an overloaded IntegrationFlows.from(MessageSource<?> messageSource) method.
The MessageSource<?> may be configured as a bean and provided as an argument to that method.
The second parameter of IntegrationFlows.from() is a Consumer<SourcePollingChannelAdapterSpec> Lambda that lets you provide options for the SourcePollingChannelAdapter, e.g. PollerMetadata or SmartLifecycle:
@Bean public MessageSource<Object> jdbcMessageSource() { return new JdbcPollingChannelAdapter(this.dataSource, "SELECT * FROM foo"); } @Bean public IntegrationFlow pollingFlow() { return IntegrationFlows.from(jdbcMessageSource(), c -> c.poller(Pollers.fixedRate(100).maxMessagesPerPoll(1))) .transform(Transformers.toJson()) .channel("furtherProcessChannel") .get(); }
There is also an IntegrationFlows.from() variant based on java.util.function.Supplier for cases where there is no requirement to build Message objects directly.
The result of Supplier.get() is automatically wrapped in a Message by the framework (if it isn't a message already).
The next sections discuss selected endpoints which require further explanation.
Spring Integration natively provides specialized router types including:
HeaderValueRouter
PayloadTypeRouter
ExceptionTypeRouter
RecipientListRouter
XPathRouter
As with many other DSL IntegrationFlowBuilder EIP-methods, the route() method can apply any out-of-the-box AbstractMessageRouter implementation or, for convenience, a String as a SpEL expression, or a ref/method pair.
In addition, route() can be configured with a Lambda (the inline method invocation case) and with a Lambda for a Consumer<RouterSpec<MethodInvokingRouter>>.
The fluent API also provides AbstractMappingMessageRouter options like channelMapping(String key, String channelName) pairs:
@Bean public IntegrationFlow routeFlow() { return IntegrationFlows.from("routerInput") .<Integer, Boolean>route(p -> p % 2 == 0, m -> m.suffix("Channel") .channelMapping("true", "even") .channelMapping("false", "odd") ) .get(); }
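How the suffix and channelMapping() options combine may be easier to see in isolation. The following plain-Java sketch models only the channel name resolution, under the assumption that the mapping lookup happens first and the suffix is appended afterwards. ChannelMappingSketch and resolveChannelName are hypothetical names and not Spring Integration classes.

```java
import java.util.HashMap;
import java.util.Map;

class ChannelMappingSketch {

    private final Map<String, String> channelMappings = new HashMap<>();
    private final String suffix;

    ChannelMappingSketch(String suffix) {
        this.suffix = suffix;
    }

    // Registers a routing key -> channel name pair, in the style of the fluent API.
    ChannelMappingSketch channelMapping(String key, String channelName) {
        this.channelMappings.put(key, channelName);
        return this;
    }

    // Assumption: an unmapped key is used as the channel name itself, then the suffix is appended.
    String resolveChannelName(Object routingKey) {
        String key = String.valueOf(routingKey);
        String channelName = this.channelMappings.getOrDefault(key, key);
        return channelName + this.suffix;
    }

    public static void main(String[] args) {
        ChannelMappingSketch router = new ChannelMappingSketch("Channel")
                .channelMapping("true", "even")
                .channelMapping("false", "odd");
        // The routing function from the sample: p % 2 == 0
        System.out.println(router.resolveChannelName(4 % 2 == 0));
        System.out.println(router.resolveChannelName(3 % 2 == 0));
    }
}
```

Under that assumption, even payloads in the flow above end up on a channel named evenChannel and odd payloads on oddChannel.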
A simple expression-based router:
@Bean public IntegrationFlow routeFlow() { return IntegrationFlows.from("routerInput") .route("headers['destChannel']") .get(); }
The routeToRecipients() method takes a Consumer<RecipientListRouterSpec>:
@Bean public IntegrationFlow recipientListFlow() { return IntegrationFlows.from("recipientListInput") .<String, String>transform(p -> p.replaceFirst("Payload", "")) .routeToRecipients(r -> r .recipient("foo-channel", "'foo' == payload") .recipient("bar-channel", m -> m.getHeaders().containsKey("recipient") && (boolean) m.getHeaders().get("recipient")) .recipientFlow("'foo' == payload or 'bar' == payload or 'baz' == payload", f -> f.<String, String>transform(String::toUpperCase) .channel(c -> c.queue("recipientListSubFlow1Result"))) .recipientFlow((String p) -> p.startsWith("baz"), f -> f.transform("Hello "::concat) .channel(c -> c.queue("recipientListSubFlow2Result"))) .recipientFlow(new FunctionExpression<Message<?>>(m -> "bax".equals(m.getPayload())), f -> f.channel(c -> c.queue("recipientListSubFlow3Result"))) .defaultOutputToParentFlow()) .get(); }
The .defaultOutputToParentFlow() option of .routeToRecipients() turns the router's defaultOutput into a gateway, so that unmatched messages continue processing in the main flow.
A splitter is created using the split() EIP-method.
By default, if the payload is an Iterable, an Iterator, an Array, a Stream or a Reactive Publisher, the splitter outputs each item as an individual message.
The method takes a Lambda, a SpEL expression, or any AbstractMessageSplitter implementation, or it can be used without parameters to provide the DefaultMessageSplitter.
For example:
@Bean public IntegrationFlow splitFlow() { return IntegrationFlows.from("splitInput") .split(s -> s.applySequence(false).get().getT2().setDelimiters(",")) .channel(MessageChannels.executor(this.taskExecutor())) .get(); }
This creates a splitter that splits a message containing a comma delimited String.
Note: the getT2() method comes from the Tuple collection that is the result of EndpointSpec.get(); it represents a pair of ConsumerEndpointFactoryBean and DefaultMessageSplitter for the example above.
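As a rough illustration of what splitting a comma-delimited String payload means, the plain-Java sketch below turns one payload into a list of items, each of which would travel as its own message. It is a model only and not the DefaultMessageSplitter implementation; DelimiterSplitSketch and its split method are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.StringTokenizer;

class DelimiterSplitSketch {

    // Breaks the payload at any of the given delimiter characters.
    static List<String> split(String payload, String delimiters) {
        List<String> items = new ArrayList<>();
        StringTokenizer tokenizer = new StringTokenizer(payload, delimiters);
        while (tokenizer.hasMoreTokens()) {
            items.add(tokenizer.nextToken());
        }
        return items;
    }

    public static void main(String[] args) {
        // Each element of the result corresponds to one outgoing message.
        for (String item : split("B,A,R", ",")) {
            System.out.println(item);
        }
    }
}
```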
An Aggregator is conceptually the converse of a Splitter.
It aggregates a sequence of individual messages into a single message and is necessarily more complex.
By default, an aggregator will return a message containing a collection of payloads from incoming messages.
The same rules are applied for the Resequencer:
@Bean public IntegrationFlow splitAggregateFlow() { return IntegrationFlows.from("splitAggregateInput") .split() .channel(MessageChannels.executor(this.taskExecutor())) .resequence() .aggregate() .get(); }
The above is a canonical example of the splitter/aggregator pattern.
The split() method splits the list into individual messages and sends them to the ExecutorChannel.
The resequence() method reorders messages by the sequence details from the message headers.
The aggregate() method just collects those messages into the result list.
However, you may change the default behavior by specifying a release strategy and correlation strategy, among other things. Consider the following:
.aggregate(a -> a.correlationStrategy(m -> m.getHeaders().get("myCorrelationKey")) .releaseStrategy(g -> g.size() > 10) .messageStore(messageStore()))
Similar Lambda configurations are provided for the resequence() EIP-method.
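The interplay of a correlation strategy and a release strategy can be sketched in plain Java as follows. This is a simplified model of the idea (group items by a key, release a group once a condition holds), not the framework's aggregator, and it ignores message stores, timeouts and sequence headers. AggregatorSketch and accept are hypothetical names, and the small release threshold is chosen only to keep the demonstration short.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import java.util.function.Predicate;

class AggregatorSketch<T> {

    private final Function<T, Object> correlationStrategy;
    private final Predicate<List<T>> releaseStrategy;
    private final Map<Object, List<T>> groups = new HashMap<>();

    AggregatorSketch(Function<T, Object> correlationStrategy, Predicate<List<T>> releaseStrategy) {
        this.correlationStrategy = correlationStrategy;
        this.releaseStrategy = releaseStrategy;
    }

    // Adds the item to its group; returns the group if the release strategy is now satisfied.
    Optional<List<T>> accept(T item) {
        Object key = this.correlationStrategy.apply(item);
        List<T> group = this.groups.computeIfAbsent(key, k -> new ArrayList<>());
        group.add(item);
        if (this.releaseStrategy.test(group)) {
            this.groups.remove(key); // a released group is complete and no longer tracked
            return Optional.of(group);
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        // Correlate by the first character; release once a group holds more than two items.
        AggregatorSketch<String> aggregator =
                new AggregatorSketch<String>(s -> s.substring(0, 1), group -> group.size() > 2);
        for (String item : new String[] {"a1", "b1", "a2", "a3"}) {
            aggregator.accept(item).ifPresent(group -> System.out.println("released " + group));
        }
    }
}
```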
The goal of the .handle() EIP-method is to invoke any MessageHandler implementation or any method on some POJO.
Another option is to define the "activity" via a Lambda expression.
For this purpose, a generic GenericHandler<P> functional interface has been introduced.
Its handle method takes two arguments: P payload and Map<String, Object> headers.
With that, we can define a flow like this:
@Bean public IntegrationFlow myFlow() { return IntegrationFlows.from("flow3Input") .<Integer>handle((p, h) -> p * 2) .get(); }
However, one main goal of Spring Integration is achieving loose coupling via runtime type conversion from the message payload to the target arguments of a message handler.
Since Java doesn't support generic type resolution for Lambda classes, we introduced a workaround with an additional payloadType argument for most EIP-methods and the LambdaMessageProcessor, which delegates the hard conversion work to Spring's ConversionService, using the provided type and the requested message to resolve the target method arguments.
The IntegrationFlow might look like this:
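@Bean
public IntegrationFlow integerFlow() {
    return IntegrationFlows.from("input")
            // StandardCharsets.UTF_8 avoids the checked exception thrown by the String charset-name constructor
            .<byte[], String>transform(p -> new String(p, StandardCharsets.UTF_8))
            .handle(Integer.class, (p, h) -> p * 2)
            .get();
}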
Of course, we could also register a custom BytesToIntegerConverter with the ConversionService and get rid of that additional .transform().
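A minimal sketch of such a converter is shown below. It assumes that the payload bytes hold the decimal text of the number in UTF-8 and that the converter is picked up through the @IntegrationConverter annotation; the class body is an illustration, not code from the framework.

```java
import java.nio.charset.StandardCharsets;

import org.springframework.core.convert.converter.Converter;
import org.springframework.integration.config.IntegrationConverter;
import org.springframework.stereotype.Component;

@Component
@IntegrationConverter // registers the converter with the integration ConversionService
public class BytesToIntegerConverter implements Converter<byte[], Integer> {

    @Override
    public Integer convert(byte[] source) {
        // Assumption: the bytes are the UTF-8 encoded decimal representation, e.g. "42".
        return Integer.valueOf(new String(source, StandardCharsets.UTF_8).trim());
    }
}
```

With a converter like this in place, the flow would be expected to reduce to IntegrationFlows.from("input") followed directly by .handle(Integer.class, (p, h) -> p * 2).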
For convenience, to log the message journey through the Spring Integration flow (<logging-channel-adapter>), a log() operator is provided.
Underneath, it is represented by a WireTap ChannelInterceptor with a LoggingHandler as its subscriber.
It is responsible for logging the message coming into the next endpoint or the current channel:
.filter(...)
.log(LoggingHandler.Level.ERROR, "test.category", m -> m.getHeaders().getId())
.route(...)
In this example, the id header is logged at ERROR level to "test.category", only for messages that passed the filter and before they are routed.
A .wireTap()
fluent API exists for MessageChannelSpec
builders.
A target configuration gains much more from Java DSL usage:
@Bean public QueueChannelSpec myChannel() { return MessageChannels.queue() .wireTap("loggingFlow.input"); } @Bean public IntegrationFlow loggingFlow() { return f -> f.log(); }
Important | |
---|---|
The |
@Bean
MessageChannel myChannel() {
    return new DirectChannel();
}
...
    .channel(myChannel())
    .log()
}
When the current MessageChannel doesn't implement ChannelInterceptorAware, an implicit DirectChannel and BridgeHandler are injected into the IntegrationFlow and the WireTap is added to this new DirectChannel.
And when there is no channel declaration at all, as in this sample:
    .handle(...)
    .log()
}
an implicit DirectChannel is injected at the current position of the IntegrationFlow and is used as the output channel for the currently configured ServiceActivatingHandler (the .handle() above).
Important | |
---|---|
If |
@Bean public IntegrationFlow sseFlow() { return IntegrationFlows .from(WebFlux.inboundGateway("/sse") .requestMapping(m -> m.produces(MediaType.TEXT_EVENT_STREAM_VALUE))) .handle((p, h) -> Flux.just("foo", "bar", "baz")) .log(LoggingHandler.Level.WARN) .bridge() .get(); }
As we have seen, IntegrationFlowBuilder
provides a top level API to produce Integration components wired to message flows.
This is convenient if your integration may be accomplished with a single flow (which is often the case).
Alternatively, IntegrationFlows can be joined via MessageChannels.
By default, the MessageFlow behaves as a Chain in Spring Integration parlance.
That is, the endpoints are automatically and implicitly wired via DirectChannels.
The message flow is not actually constructed as a chain, affording much more flexibility.
For example, you may send a message to any component within the flow if you know its inputChannel name, i.e., if you explicitly define it.
You may also reference externally defined channels within a flow to allow the use of channel adapters to enable remote transport protocols, file I/O, and the like, instead of direct channels.
As such, the DSL does not support the Spring Integration chain element since it doesn’t add much value.
Since the Spring Integration Java DSL produces the same bean definition model as any other configuration option and is based on the existing Spring Framework @Configuration infrastructure, it can be used together with Integration XML definitions and wired with Spring Integration Messaging Annotations configuration.
Another way to define direct IntegrationFlows is based on the fact that an IntegrationFlow can be declared as a Lambda, too:
@Bean public IntegrationFlow lambdaFlow() { return f -> f.filter("World"::equals) .transform("Hello "::concat) .handle(System.out::println); }
The result of this definition is the same set of integration components wired with implicit direct channels.
The only limitation is that this flow starts with a named direct channel: lambdaFlow.input.
Also, a Lambda flow can't start from a MessageSource or MessageProducer.
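Since the input channel of a Lambda flow is registered under a predictable name, other components can inject it and send messages to it. The following is a sketch assuming the flow bean is named lambdaFlow as in the sample above; LambdaFlowClient and greet are hypothetical names.

```java
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.stereotype.Component;

@Component
public class LambdaFlowClient {

    private final MessageChannel lambdaFlowInput;

    // The qualifier is the flow bean name followed by ".input".
    public LambdaFlowClient(@Qualifier("lambdaFlow.input") MessageChannel lambdaFlowInput) {
        this.lambdaFlowInput = lambdaFlowInput;
    }

    public void greet() {
        // The payload passes the "World"::equals filter of the sample flow and is then transformed and printed.
        this.lambdaFlowInput.send(new GenericMessage<>("World"));
    }
}
```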
Starting with version 5.0.6, the generated bean names for the components in an IntegrationFlow include the flow bean name followed by a dot as a prefix.
For example, the ConsumerEndpointFactoryBean for the .transform("Hello "::concat) in the sample above ends up with a bean name like lambdaFlow.org.springframework.integration.config.ConsumerEndpointFactoryBean#0.
The Transformer implementation bean for that endpoint has a bean name such as lambdaFlow.org.springframework.integration.transformer.MethodInvokingTransformer#0.
These generated bean names are prefixed with the flow id for purposes such as parsing logs or grouping components together in an analysis tool, as well as to avoid a race condition when integration flows are registered concurrently at runtime.
See Section 9.20, “Dynamic and runtime Integration Flows” for more information.
The FunctionExpression (an implementation of the SpEL Expression) has been introduced to take advantage of Java and Lambdas while keeping the method and its generics context.
The Function<T, R> option is provided for DSL components alongside the expression option wherever there is an implicit Strategy variant in core Spring Integration.
The usage may look like:
.enrich(e -> e.requestChannel("enrichChannel") .requestPayload(Message::getPayload) .propertyFunction("date", m -> new Date()))
The FunctionExpression also supports runtime type conversion, as is done in the standard SpelExpression.
Some of the if...else and publish-subscribe components support specifying their logic or mapping using subflows.
The simplest sample is .publishSubscribeChannel():
@Bean public IntegrationFlow subscribersFlow() { return flow -> flow .publishSubscribeChannel(Executors.newCachedThreadPool(), s -> s .subscribe(f -> f .<Integer>handle((p, h) -> p / 2) .channel(c -> c.queue("subscriber1Results"))) .subscribe(f -> f .<Integer>handle((p, h) -> p * 2) .channel(c -> c.queue("subscriber2Results")))) .<Integer>handle((p, h) -> p * 3) .channel(c -> c.queue("subscriber3Results")); }
Of course, we can achieve the same result with separate IntegrationFlow @Bean definitions, but we hope you'll find the subflow style of logic composition useful.
The .routeToRecipients() method provides similar publish-subscribe subflow composition.
Another sample is .discardFlow() on the .filter() instead of .discardChannel().
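A sketch of the .discardFlow() variant follows. The flow name, the positive-number filter and the two queue channel names are illustrative assumptions; only the discardFlow() option itself is the point of the example.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.dsl.IntegrationFlow;

@Configuration
@EnableIntegration
public class DiscardFlowConfiguration {

    @Bean
    public IntegrationFlow positiveNumbersFlow() {
        return f -> f
                // Rejected payloads continue in the discard subflow instead of a discard channel.
                .<Integer>filter(p -> p > 0,
                        e -> e.discardFlow(df -> df
                                .channel(c -> c.queue("discardedResults"))))
                // Accepted payloads continue in the main flow.
                .channel(c -> c.queue("positiveResults"));
    }
}
```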
The .route()
deserves special attention.
As a sample:
@Bean public IntegrationFlow routeFlow() { return f -> f .<Integer, Boolean>route(p -> p % 2 == 0, m -> m.channelMapping("true", "evenChannel") .subFlowMapping("false", sf -> sf.<Integer>handle((p, h) -> p * 3))) .transform(Object::toString) .channel(c -> c.queue("oddChannel")); }
The .channelMapping() continues to work as in regular Router mapping, but the .subFlowMapping() ties that subflow to the main flow.
In other words, any router's subflow returns to the main flow after .route().
Of course, subflows can be nested to any depth, but we don't recommend doing so because, even in the router case, adding complex subflows within a flow quickly begins to look like a plate of spaghetti and becomes difficult for a human to parse.
All of the examples so far illustrate how the DSL supports a messaging architecture using the Spring Integration programming model, but we haven’t done any real integration yet. This requires access to remote resources via http, jms, amqp, tcp, jdbc, ftp, smtp, and the like, or access to the local file system. Spring Integration supports all of these and more. Ideally, the DSL should offer first class support for all of them but it is a daunting task to implement all of these and keep up as new adapters are added to Spring Integration. So the expectation is that the DSL will continually be catching up with Spring Integration.
In any case, we provide a high-level API to define protocol-specific components seamlessly.
This is achieved with Factory and Builder patterns and, of course, with Lambdas.
The factory classes can be considered "Namespace Factories", because they play the same role as XML namespace for components from the concrete protocol-specific Spring Integration modules.
Currently, the Spring Integration Java DSL supports the Amqp, Feed, Jms, Files, (S)Ftp, Http, JPA, MongoDb, TCP/UDP, Mail, WebFlux and Scripts namespace factories:
@Bean public IntegrationFlow amqpFlow() { return IntegrationFlows.from(Amqp.inboundGateway(this.rabbitConnectionFactory, queue())) .transform("hello "::concat) .transform(String.class, String::toUpperCase) .get(); } @Bean public IntegrationFlow jmsOutboundGatewayFlow() { return IntegrationFlows.from("jmsOutboundGatewayChannel") .handle(Jms.outboundGateway(this.jmsConnectionFactory) .replyContainer(c -> c.concurrentConsumers(3) .sessionTransacted(true)) .requestDestination("jmsPipelineTest")) .get(); } @Bean public IntegrationFlow sendMailFlow() { return IntegrationFlows.from("sendMailChannel") .handle(Mail.outboundAdapter("localhost") .port(smtpPort) .credentials("user", "pw") .protocol("smtp") .javaMailProperties(p -> p.put("mail.debug", "true")), e -> e.id("sendMailEndpoint")) .get(); }
We show here the usage of namespace factories as inline adapter declarations; however, they can also be used from @Bean definitions to make the IntegrationFlow method chain more readable.
We are soliciting community feedback on these namespace factories before we spend effort on others; we’d also appreciate some prioritization for which adapters/gateways we should support next.
See more Java DSL samples in the protocol-specific chapters throughout this reference manual.
All other protocol channel adapters may be configured as generic beans and wired to the IntegrationFlow:
@Bean public QueueChannelSpec wrongMessagesChannel() { return MessageChannels .queue() .wireTap("wrongMessagesWireTapChannel"); } @Bean public IntegrationFlow xpathFlow(MessageChannel wrongMessagesChannel) { return IntegrationFlows.from("inputChannel") .filter(new StringValueTestXPathMessageSelector("namespace-uri(/*)", "my:namespace"), e -> e.discardChannel(wrongMessagesChannel)) .log(LoggingHandler.Level.ERROR, "test.category", m -> m.getHeaders().getId()) .route(xpathRouter(wrongMessagesChannel)) .get(); } @Bean public AbstractMappingMessageRouter xpathRouter(MessageChannel wrongMessagesChannel) { XPathRouter router = new XPathRouter("local-name(/*)"); router.setEvaluateAsString(true); router.setResolutionRequired(false); router.setDefaultOutputChannel(wrongMessagesChannel); router.setChannelMapping("Tags", "splittingChannel"); router.setChannelMapping("Tag", "receivedChannel"); return router; }
The IntegrationFlow, being an interface, can be implemented directly and declared as a component for scanning:
@Component public class MyFlow implements IntegrationFlow { @Override public void configure(IntegrationFlowDefinition<?> f) { f.<String, String>transform(String::toUpperCase); } }
And yes, it is picked up by the IntegrationFlowBeanPostProcessor and correctly parsed and registered in the application context.
For convenience and a loosely coupled architecture, the IntegrationFlowAdapter base class implementation is provided.
It requires a buildFlow() method implementation to produce an IntegrationFlowDefinition using one of the from() support methods:
@Component
public class MyFlowAdapter extends IntegrationFlowAdapter {

    private final AtomicBoolean invoked = new AtomicBoolean();

    // Fires the trigger exactly once: the first call returns "now", later calls return null.
    public Date nextExecutionTime(TriggerContext triggerContext) {
        return this.invoked.getAndSet(true) ? null : new Date();
    }

    @Override
    protected IntegrationFlowDefinition<?> buildFlow() {
        return from(this, "messageSource",
                    e -> e.poller(p -> p.trigger(this::nextExecutionTime)))
                .split(this)
                .transform(this)
                .aggregate(a -> a.processor(this, null), null)
                .enrichHeaders(Collections.singletonMap("foo", "FOO"))
                .filter(this)
                .handle(this)
                .channel(c -> c.queue("myFlowAdapterOutput"));
    }

    public String messageSource() {
        return "B,A,R";
    }

    @Splitter
    public String[] split(String payload) {
        return StringUtils.commaDelimitedListToStringArray(payload);
    }

    @Transformer
    public String transform(String payload) {
        return payload.toLowerCase();
    }

    @Aggregator
    public String aggregate(List<String> payloads) {
        return payloads.stream().collect(Collectors.joining());
    }

    @Filter
    public boolean filter(@Header Optional<String> foo) {
        return foo.isPresent();
    }

    @ServiceActivator
    public String handle(String payload, @Header String foo) {
        return payload + ":" + foo;
    }
}
IntegrationFlows, and therefore all of their dependent components, can be registered at runtime.
Previously this was done through the BeanFactory.registerSingleton() hook; now it is done through programmatic BeanDefinition registration with the instanceSupplier hook, newly introduced in Spring Framework 5.0:
BeanDefinition beanDefinition =
BeanDefinitionBuilder.genericBeanDefinition((Class<Object>) bean.getClass(), () -> bean)
.getRawBeanDefinition();
((BeanDefinitionRegistry) this.beanFactory).registerBeanDefinition(beanName, beanDefinition);
All the necessary bean initialization and lifecycle handling is then done automatically, as it is with standard context configuration bean definitions.
To simplify the development experience, Spring Integration introduced the IntegrationFlowContext to register and manage IntegrationFlow instances at runtime:
@Autowired private AbstractServerConnectionFactory server1; @Autowired private IntegrationFlowContext flowContext; ... @Test public void testTcpGateways() { TestingUtilities.waitListening(this.server1, null); IntegrationFlow flow = f -> f .handle(Tcp.outboundGateway(Tcp.netClient("localhost", this.server1.getPort()) .serializer(TcpCodecs.crlf()) .deserializer(TcpCodecs.lengthHeader1()) .id("client1")) .remoteTimeout(m -> 5000)) .transform(Transformers.objectToString()); IntegrationFlowRegistration theFlow = this.flowContext.registration(flow).register(); assertThat(theFlow.getMessagingTemplate().convertSendAndReceive("foo", String.class), equalTo("FOO")); }
This is useful when we have multiple configuration options and have to create several instances of similar flows.
In that case, we can iterate over the options and create and register IntegrationFlows within a loop.
Another use case is when our source of data isn't Spring-based and we must create it on the fly.
An example is a Reactive Streams event source:
Flux<Message<?>> messageFlux = Flux.just("1,2,3,4") .map(v -> v.split(",")) .flatMapIterable(Arrays::asList) .map(Integer::parseInt) .map(GenericMessage<Integer>::new); QueueChannel resultChannel = new QueueChannel(); IntegrationFlow integrationFlow = IntegrationFlows.from(messageFlux) .<Integer, Integer>transform(p -> p * 2) .channel(resultChannel) .get(); this.integrationFlowContext.registration(integrationFlow) .register();
The IntegrationFlowRegistrationBuilder (the result of IntegrationFlowContext.registration()) can be used to specify a bean name for the IntegrationFlow to register, to control its autoStartup, and to register additional, non-integration beans.
Usually those additional beans are connection factories (AMQP, JMS, (S)FTP, TCP/UDP etc.), serializers/deserializers, or any other required support components.
Such a dynamically registered IntegrationFlow and all of its dependent beans can be removed afterwards using the IntegrationFlowRegistration.destroy() callback.
See the IntegrationFlowContext JavaDocs for more information.
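The register-in-a-loop and destroy steps can be sketched together as follows. The imports assume the 5.0.x package layout; DynamicFlowRegistrar, the greeting prefixes and the printed reply are illustrative assumptions rather than part of the framework.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.context.IntegrationFlowContext;
import org.springframework.integration.dsl.context.IntegrationFlowRegistration;
import org.springframework.stereotype.Component;

@Component
public class DynamicFlowRegistrar {

    private final IntegrationFlowContext flowContext;

    public DynamicFlowRegistrar(IntegrationFlowContext flowContext) {
        this.flowContext = flowContext;
    }

    public void registerUseAndRemove() {
        List<IntegrationFlowRegistration> registrations = new ArrayList<>();

        // One similar flow per configuration option.
        for (String prefix : Arrays.asList("Hello ", "Goodbye ")) {
            IntegrationFlow flow = f -> f.<String, String>transform(prefix::concat);
            registrations.add(this.flowContext.registration(flow).register());
        }

        for (IntegrationFlowRegistration registration : registrations) {
            // The messaging template sends to the flow's input channel and waits for the reply.
            String reply = registration.getMessagingTemplate()
                    .convertSendAndReceive("World", String.class);
            System.out.println(reply);
            // Removes the flow and its dependent beans from the application context.
            registration.destroy();
        }
    }
}
```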
Note | |
---|---|
Starting with version 5.0.6, all generated bean names in an |
Also, starting with version 5.0.6, the registration builder API has a new method, useFlowIdAsPrefix().
This is useful if you wish to declare multiple instances of the same flow and avoid bean name collisions when components in the flows have the same id.
For example:
private void registerFlows() { IntegrationFlowRegistration flow1 = this.flowContext.registration(buildFlow(1234)) .id("tcp1") .useFlowIdAsPrefix() .register(); IntegrationFlowRegistration flow2 = this.flowContext.registration(buildFlow(1235)) .id("tcp2") .useFlowIdAsPrefix() .register(); } private IntegrationFlow buildFlow(int port) { return f -> f .handle(Tcp.outboundGateway(Tcp.netClient("localhost", port) .serializer(TcpCodecs.crlf()) .deserializer(TcpCodecs.lengthHeader1()) .id("client")) .remoteTimeout(m -> 5000)) .transform(Transformers.objectToString()); }
In this case, the message handler for the first flow can be referenced with the bean name tcp1.client.handler.
Note | |
---|---|
an |
The IntegrationFlow can start from a service interface, which provides a GatewayProxyFactoryBean component:
public interface ControlBusGateway { void send(String command); } ... @Bean public IntegrationFlow controlBusFlow() { return IntegrationFlows.from(ControlBusGateway.class) .controlBus() .get(); }
All the proxies for interface methods are supplied with the channel used to send messages to the next integration component in the IntegrationFlow.
The service interface can be marked with @MessagingGateway, and its methods with @Gateway annotations.
Nevertheless, the requestChannel is ignored and overridden with the internal channel for the next component in the IntegrationFlow; otherwise, creating such a configuration via an IntegrationFlow wouldn't make sense.
By default, a GatewayProxyFactoryBean gets a conventional bean name like [FLOW_BEAN_NAME.gateway].
That id can be changed via the @MessagingGateway.name() attribute or the overloaded from(Class<?> serviceInterface, String beanName) factory method.
With Java 8 on board, we can even create such an integration gateway with the java.util.function interfaces:
@Bean public IntegrationFlow errorRecovererFlow() { return IntegrationFlows.from(Function.class, "errorRecovererFunction") .handle((GenericHandler<?>) (p, h) -> { throw new RuntimeException("intentional"); }, e -> e.advice(retryAdvice())) .get(); }
It can be used later as:
@Autowired @Qualifier("errorRecovererFunction") private Function<String, String> errorRecovererFlowGateway;