Spring Integration Interaction

Spring Integration Framework extends the Spring programming model to support the well-known Enterprise Integration Patterns. It enables lightweight messaging within Spring-based applications and supports integration with external systems via declarative adapters. It also provides a high-level DSL to compose various operations (endpoints) into a logical integration flow. Because this DSL favors a lambda style of configuration, Spring Integration already makes broad use of the java.util.function interfaces. A @MessagingGateway proxy interface can also be declared as a Function or Consumer, which, in a Spring Cloud Function environment, can then be registered in the function catalog. See the Spring Integration Reference Manual for more information about its support for functions.

Starting with version 4.0.3, Spring Cloud Function in turn provides a spring-cloud-function-integration module, which offers a deeper, more cloud-specific, auto-configuration-based API for interacting with a FunctionCatalog from the Spring Integration DSL. The FunctionFlowBuilder is auto-configured, is autowired with a FunctionCatalog, and serves as the entry point to the function-specific DSL for a target IntegrationFlow instance. In addition to the standard IntegrationFlow.from() factories (kept for convenience), the FunctionFlowBuilder exposes a fromSupplier(String supplierDefinition) factory to look up the target Supplier in the provided FunctionCatalog. The FunctionFlowBuilder then leads to a FunctionFlowDefinition, which is an implementation of IntegrationFlowExtension and exposes the apply(String functionDefinition) and accept(String consumerDefinition) operators to look up a Function or a Consumer, respectively, in the FunctionCatalog. See their Javadocs for more information.

The following example demonstrates the FunctionFlowBuilder in action, alongside the rest of the IntegrationFlow API:

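import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

import org.springframework.cloud.function.integration.dsl.FunctionFlowBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.handler.LoggingHandler;

@Configuration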
public class IntegrationConfiguration {

    @Bean
    Supplier<byte[]> simpleByteArraySupplier() {
        return "simple test data"::getBytes;
    }

    @Bean
    Function<String, String> upperCaseFunction() {
        return String::toUpperCase;
    }

    @Bean
    BlockingQueue<String> results() {
        return new LinkedBlockingQueue<>();
    }

    @Bean
    Consumer<String> simpleStringConsumer(BlockingQueue<String> results) {
        return results::add;
    }

    @Bean
    QueueChannel wireTapChannel() {
        return new QueueChannel();
    }

    @Bean
    IntegrationFlow someFunctionFlow(FunctionFlowBuilder functionFlowBuilder) {
        return functionFlowBuilder
                .fromSupplier("simpleByteArraySupplier")
                .wireTap("wireTapChannel")
                .apply("upperCaseFunction")
                .log(LoggingHandler.Level.WARN)
                .accept("simpleStringConsumer");
    }

}
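
To make the data path concrete, the following plain-Java sketch mimics what the flow above does to a single message, without Spring or a FunctionCatalog. It is an illustration only: the FlowSketch class and its run() method are hypothetical, the wire tap and logging steps are omitted, and the byte[] payload is assumed to be decoded as UTF-8 before it reaches upperCaseFunction (in the real flow that conversion is left to the framework).

```java
import java.nio.charset.StandardCharsets;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical stand-in for the flow above; not part of any Spring API.
final class FlowSketch {

    // Pushes one supplied payload through the function and into the consumer.
    public static BlockingQueue<String> run() {
        Supplier<byte[]> simpleByteArraySupplier =
                () -> "simple test data".getBytes(StandardCharsets.UTF_8);
        Function<String, String> upperCaseFunction = String::toUpperCase;
        BlockingQueue<String> results = new LinkedBlockingQueue<>();
        Consumer<String> simpleStringConsumer = results::add;

        // fromSupplier(...): obtain the payload.
        byte[] payload = simpleByteArraySupplier.get();
        // Assumption: bytes are decoded as UTF-8 before the function is applied.
        String text = new String(payload, StandardCharsets.UTF_8);
        // apply(...) followed by accept(...).
        simpleStringConsumer.accept(upperCaseFunction.apply(text));
        return results;
    }

    public static void main(String[] args) {
        System.out.println(run().peek()); // prints SIMPLE TEST DATA
    }
}
```

After one pass, the results queue holds a single upper-cased string, which is what a test of the real flow would poll for.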

Because FunctionCatalog.lookup() is not limited to simple function names, function composition can also be used in the apply() and accept() operators described above:

@Bean
IntegrationFlow functionCompositionFlow(FunctionFlowBuilder functionFlowBuilder) {
    return functionFlowBuilder
            .from("functionCompositionInput")
            .accept("upperCaseFunction|simpleStringConsumer");
}
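
The idea behind a composed definition such as "upperCaseFunction|simpleStringConsumer" can be sketched in plain Java. The block below is a toy model, not the real FunctionCatalog: the CompositionSketch class, its map-based registry, and the lookupConsumer() helper are all hypothetical, and the real catalog also handles type conversion and other concerns that are ignored here. It only shows that each name before the last `|` is applied as a Function and the last name receives the result as a Consumer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;

// Toy model of name-based lookup with '|' composition; NOT the real FunctionCatalog.
final class CompositionSketch {

    // Resolves "a|b|c": every segment except the last must be a Function,
    // the last one a Consumer. Returns a single composed Consumer.
    @SuppressWarnings("unchecked")
    public static Consumer<Object> lookupConsumer(Map<String, Object> registry, String definition) {
        String[] names = definition.split("\\|");
        Function<Object, Object> chain = Function.identity();
        for (int i = 0; i < names.length - 1; i++) {
            Function<Object, Object> next = (Function<Object, Object>) require(registry, names[i]);
            chain = chain.andThen(next);
        }
        Consumer<Object> sink = (Consumer<Object>) require(registry, names[names.length - 1]);
        Function<Object, Object> composed = chain;
        return input -> sink.accept(composed.apply(input));
    }

    // Fails fast when a name in the definition is not registered.
    private static Object require(Map<String, Object> registry, String name) {
        Object candidate = registry.get(name);
        if (candidate == null) {
            throw new IllegalArgumentException("No function registered as '" + name + "'");
        }
        return candidate;
    }

    // Registers the two beans from the example by name and runs the composition once.
    public static List<String> demo() {
        List<String> results = new ArrayList<>();
        Function<String, String> upperCaseFunction = String::toUpperCase;
        Consumer<String> simpleStringConsumer = results::add;
        Map<String, Object> registry = Map.of(
                "upperCaseFunction", upperCaseFunction,
                "simpleStringConsumer", simpleStringConsumer);
        lookupConsumer(registry, "upperCaseFunction|simpleStringConsumer").accept("hello");
        return results;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [HELLO]
    }
}
```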

This API becomes more relevant when we add auto-configuration dependencies for predefined functions to our Spring Cloud applications. For example, the Stream Applications project provides, in addition to application images, artifacts with functions for various integration use cases, e.g. debezium-supplier, elasticsearch-consumer, aggregator-function, etc.

The following configuration uses the http-supplier, spel-function and file-consumer artifacts for its supplier, function and consumer, respectively:

@Bean
IntegrationFlow someFunctionFlow(FunctionFlowBuilder functionFlowBuilder) {
    return functionFlowBuilder
            .fromSupplier("httpSupplier", e -> e.poller(Pollers.trigger(new OnlyOnceTrigger())))
            .<Flux<?>>handle((fluxPayload, headers) -> fluxPayload, e -> e.async(true))
            .channel(c -> c.flux())
            .apply("spelFunction")
            .<String, String>transform(String::toUpperCase)
            .accept("fileConsumer");
}

All that remains is to add their configuration to application.properties (if necessary):

http.path-pattern=/testPath
spel.function.expression=new String(payload)
file.consumer.name=test-data.txt