Spring Integration Interaction
Spring Integration Framework extends the Spring programming model to support the well-known Enterprise Integration Patterns.
It enables lightweight messaging within Spring-based applications and supports integration with external systems via declarative adapters.
It also provides a high-level DSL to compose various operations (endpoints) into a logical integration flow.
With the lambda style of this DSL configuration, Spring Integration already makes good use of the java.util.function interfaces.
A @MessagingGateway proxy interface can also be declared as a Function or Consumer, which, in a Spring Cloud Function environment, can then be registered into the function catalog.
See the Spring Integration Reference Manual for more information about its support for functions.
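To illustrate why a gateway-style proxy interface can double as a Function, here is a minimal plain-JDK sketch with no Spring dependencies. The UpperCaseGateway interface, the dynamic proxy, and the map-based registry are hypothetical stand-ins for a @MessagingGateway proxy and a function catalog; they are not the framework's implementation.

```java
import java.lang.reflect.Proxy;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

final class GatewayAsFunctionSketch {

    // Hypothetical gateway contract: because it extends Function,
    // any proxy implementing it is also a java.util.function.Function.
    interface UpperCaseGateway extends Function<String, String> {
    }

    // Builds a JDK dynamic proxy standing in for a framework-generated gateway proxy.
    static UpperCaseGateway createGatewayProxy() {
        return (UpperCaseGateway) Proxy.newProxyInstance(
                UpperCaseGateway.class.getClassLoader(),
                new Class<?>[] { UpperCaseGateway.class },
                (proxy, method, methodArgs) -> {
                    if ("apply".equals(method.getName())) {
                        // A real gateway would send a message into a flow;
                        // this sketch transforms the argument inline instead.
                        return ((String) methodArgs[0]).toUpperCase();
                    }
                    throw new UnsupportedOperationException(method.getName());
                });
    }

    // Hypothetical name-to-function registry standing in for a function catalog.
    static Map<String, Function<String, String>> createCatalog() {
        Map<String, Function<String, String>> catalog = new HashMap<>();
        catalog.put("upperCaseGateway", createGatewayProxy());
        return catalog;
    }

    public static void main(String[] args) {
        Function<String, String> function = createCatalog().get("upperCaseGateway");
        System.out.println(function.apply("hello")); // prints HELLO
    }
}
```

The caller only sees a Function looked up by name; whether a lambda or a proxy sits behind it is an implementation detail.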
On the other hand, starting with version 4.0.3, Spring Cloud Function introduces a spring-cloud-function-integration module, which provides a deeper, more cloud-specific, auto-configuration-based API for interacting with a FunctionCatalog from the Spring Integration DSL perspective.
The FunctionFlowBuilder is auto-configured and autowired with a FunctionCatalog, and it serves as the entry point to a function-specific DSL for the target IntegrationFlow instance.
In addition to the standard IntegrationFlow.from() factories (provided for convenience), the FunctionFlowBuilder exposes a fromSupplier(String supplierDefinition) factory to look up the target Supplier in the provided FunctionCatalog.
The FunctionFlowBuilder then leads to a FunctionFlowDefinition.
This FunctionFlowDefinition is an implementation of IntegrationFlowExtension and exposes the apply(String functionDefinition) and accept(String consumerDefinition) operators to look up a Function or a Consumer from the FunctionCatalog, respectively.
See their Javadocs for more information.
The following example demonstrates the FunctionFlowBuilder in action, alongside the rest of the IntegrationFlow API:
@Configuration
public class IntegrationConfiguration {

    @Bean
    Supplier<byte[]> simpleByteArraySupplier() {
        return "simple test data"::getBytes;
    }

    @Bean
    Function<String, String> upperCaseFunction() {
        return String::toUpperCase;
    }

    @Bean
    BlockingQueue<String> results() {
        return new LinkedBlockingQueue<>();
    }

    @Bean
    Consumer<String> simpleStringConsumer(BlockingQueue<String> results) {
        return results::add;
    }

    @Bean
    QueueChannel wireTapChannel() {
        return new QueueChannel();
    }

    @Bean
    IntegrationFlow someFunctionFlow(FunctionFlowBuilder functionFlowBuilder) {
        return functionFlowBuilder
                .fromSupplier("simpleByteArraySupplier")
                .wireTap("wireTapChannel")
                .apply("upperCaseFunction")
                .log(LoggingHandler.Level.WARN)
                .accept("simpleStringConsumer");
    }

}
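To make the data path of the flow above easier to follow, here is a minimal plain-JDK sketch of a single pass through the same steps. It is only an illustration: the class and method names are hypothetical, the byte[]-to-String conversion is written out explicitly on the assumption that the framework performs an equivalent conversion before invoking the function, and no polling or messaging infrastructure is involved.

```java
import java.nio.charset.StandardCharsets;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

final class FunctionFlowDataPathSketch {

    // Runs the supplier -> wire tap -> function -> log -> consumer steps once.
    static void runOnce(BlockingQueue<byte[]> wireTap, BlockingQueue<String> results) {
        Supplier<byte[]> supplier = () -> "simple test data".getBytes(StandardCharsets.UTF_8);
        Function<String, String> upperCase = String::toUpperCase;
        Consumer<String> consumer = results::add;

        byte[] payload = supplier.get();            // fromSupplier(...)
        wireTap.add(payload);                       // wireTap(...): payload also goes to the side channel
        // Assumption: an equivalent conversion happens before the function is called.
        String input = new String(payload, StandardCharsets.UTF_8);
        String output = upperCase.apply(input);     // apply(...)
        System.out.println("WARN " + output);       // log(...), simplified
        consumer.accept(output);                    // accept(...)
    }

    public static void main(String[] args) {
        BlockingQueue<byte[]> wireTap = new LinkedBlockingQueue<>();
        BlockingQueue<String> results = new LinkedBlockingQueue<>();
        runOnce(wireTap, results);
        System.out.println(results.peek());
    }
}
```

After one pass, the results queue holds the upper-cased text, while the wire tap queue holds the original, untransformed bytes.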
Since the FunctionCatalog.lookup() functionality is not limited to simple function names, function composition can also be used in the apply() and accept() operators mentioned above:
@Bean
IntegrationFlow functionCompositionFlow(FunctionFlowBuilder functionFlowBuilder) {
    return functionFlowBuilder
            .from("functionCompositionInput")
            .accept("upperCaseFunction|simpleStringConsumer");
}
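As a rough illustration of what a composed definition such as "upperCaseFunction|simpleStringConsumer" amounts to, the following plain-JDK sketch resolves a pipe-delimited definition into a single Consumer. The lookupConsumer helper and the map-based catalog are hypothetical; the real FunctionCatalog lookup is more involved (type conversion, message handling and so on), so treat this as a conceptual model only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;

final class CompositionSketch {

    // Hypothetical resolver: "f1|f2|consumer" becomes one Consumer that
    // applies the functions left to right and hands the result to the consumer.
    @SuppressWarnings("unchecked")
    static Consumer<Object> lookupConsumer(Map<String, Object> catalog, String definition) {
        String[] names = definition.split("\\|");
        Function<Object, Object> chain = Function.identity();
        for (int i = 0; i < names.length - 1; i++) {
            chain = chain.andThen((Function<Object, Object>) catalog.get(names[i]));
        }
        Consumer<Object> terminal = (Consumer<Object>) catalog.get(names[names.length - 1]);
        Function<Object, Object> functions = chain;
        return input -> terminal.accept(functions.apply(input));
    }

    // Registers the two functions by name and pushes one value through the composition.
    static List<String> demo() {
        List<String> results = new ArrayList<>();
        Function<String, String> upperCaseFunction = String::toUpperCase;
        Consumer<String> simpleStringConsumer = results::add;
        Map<String, Object> catalog = Map.of(
                "upperCaseFunction", upperCaseFunction,
                "simpleStringConsumer", simpleStringConsumer);
        lookupConsumer(catalog, "upperCaseFunction|simpleStringConsumer").accept("hello");
        return results;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [HELLO]
    }
}
```

The composed definition behaves like a single Consumer from the flow's point of view, which is why it can be passed to accept() as one string.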
This API becomes more relevant when we add auto-configuration dependencies for predefined functions to our Spring Cloud applications.
For example, the Stream Applications project provides, in addition to application images, artifacts with functions for various integration use cases, e.g. debezium-supplier, elasticsearch-consumer, aggregator-function, etc.
The following configuration is based on the http-supplier, spel-function and file-consumer, respectively:
@Bean
IntegrationFlow someFunctionFlow(FunctionFlowBuilder functionFlowBuilder) {
    return functionFlowBuilder
            .fromSupplier("httpSupplier", e -> e.poller(Pollers.trigger(new OnlyOnceTrigger())))
            .<Flux<?>>handle((fluxPayload, headers) -> fluxPayload, e -> e.async(true))
            .channel(c -> c.flux())
            .apply("spelFunction")
            .<String, String>transform(String::toUpperCase)
            .accept("fileConsumer");
}
All that remains is to add their configuration to application.properties (if necessary):
http.path-pattern=/testPath
spel.function.expression=new String(payload)
file.consumer.name=test-data.txt