Kotlin DSL

The Kotlin DSL is a wrapper and extension to Java DSL and aimed to make Spring Integration development on Kotlin as smooth and straightforward as possible with interoperability with the existing Java API and Kotlin language-specific structures.

All you need to get started is just an import for org.springframework.integration.dsl.integrationFlow - an overloaded global function for Kotlin DSL.

For IntegrationFlow definitions as lambdas we typically don’t need anything else from Kotlin and just declare a bean like this:

@Bean
fun oddFlow() =
IntegrationFlow { flow ->
    flow.handle<Any> { _, _ -> "odd" }
}

In this case Kotlin understands that the lambda should be translated into IntegrationFlow anonymous instance and target Java DSL processor parses this construction properly into Java objects.

As an alternative to the construction above and for consistency with use-cases explained below, a Kotlin-specif DSL should be used for declaring integration flows in the builder pattern style:

@Bean
fun flowLambda() =
    integrationFlow {
        filter<String> { it === "test" }
        wireTap {
                    handle { println(it.payload) }
                }
        transform<String, String> { it.toUpperCase() }
    }

Such a global integrationFlow() function expects a lambda in builder style for a KotlinIntegrationFlowDefinition (a Kotlin wrapper for the IntegrationFlowDefinition) and produces a regular IntegrationFlow lambda implementation. See more overloaded integrationFlow() variants below.

Many other scenarios require an IntegrationFlow to be started from source of data (e.g. JdbcPollingChannelAdapter, JmsInboundGateway or just an existing MessageChannel). For this purpose, the Spring Integration Java DSL provides an IntegrationFlows factory with its large number of overloaded from() methods. This factory can be used in Kotlin as well:

@Bean
fun flowFromSupplier() =
         IntegrationFlows.from<String>({ "bar" }) { e -> e.poller { p -> p.fixedDelay(10).maxMessagesPerPoll(1) } }
                 .channel { c -> c.queue("fromSupplierQueue") }
                 .get()

But unfortunately not all from() methods are compatible with Kotlin structures. To fix the gap, this project provides a Kotlin DSL around an IntegrationFlows factory. It is implemented as a set of overloaded integrationFlow() functions. With a consumer for a KotlinIntegrationFlowDefinition to declare the rest of the flow as an IntegrationFlow lambda to reuse the mentioned above experience and also avoid get() call in the end. For example:

@Bean
fun functionFlow() =
        integrationFlow<Function<String, String>>({ beanName("functionGateway") }) {
            transform<String, String> { it.toUpperCase() }
        }

@Bean
fun messageSourceFlow() =
        integrationFlow(MessageProcessorMessageSource { "testSource" },
                { poller { it.fixedDelay(10).maxMessagesPerPoll(1) } }) {
            channel { queue("fromSupplierQueue") }
        }

In addition, Kotlin extensions are provided for the Java DSL API which needs some refinement for Kotlin structures. For example IntegrationFlowDefinition<*> requires a reifying for many methods with Class<P> argument:

@Bean
fun convertFlow() =
    integrationFlow("convertFlowInput") {
        convert<TestPojo>()
    }
The reified type can be a whole Message<*> if there need access to headers as well in the lambda of the operator.