Kotlin DSL
The Kotlin DSL is a wrapper around, and an extension of, the Java DSL. It aims to make Spring Integration development in Kotlin as smooth and straightforward as possible, while staying interoperable with the existing Java API and with Kotlin language-specific structures.
All you need to get started is an import for org.springframework.integration.dsl.integrationFlow, an overloaded global function for the Kotlin DSL.
For IntegrationFlow definitions as lambdas, we typically do not need anything else from Kotlin and can just declare a bean like this:
@Bean
fun oddFlow() =
    IntegrationFlow { flow ->
        flow.handle<Any> { _, _ -> "odd" }
    }
In this case, Kotlin understands that the lambda should be translated into an anonymous IntegrationFlow instance, and the target Java DSL processor parses this construction properly into Java objects.
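
The lambda-to-instance translation described above is Kotlin's SAM (single abstract method) conversion. The following is a minimal, framework-free sketch of that mechanism. Flow and FlowDefinition are hypothetical stand-ins invented for this illustration; they are not the Spring Integration types.

```kotlin
// Hypothetical stand-ins for illustration only; NOT the Spring Integration types.
class FlowDefinition {
    val steps = mutableListOf<String>()

    // Records a step name and returns the definition for chaining.
    fun handle(name: String): FlowDefinition {
        steps.add(name)
        return this
    }
}

// A single-abstract-method (SAM) interface, playing the role of IntegrationFlow.
fun interface Flow {
    fun configure(definition: FlowDefinition)
}

// SAM-constructor syntax: the lambda becomes the body of configure().
val lambdaFlow = Flow { definition -> definition.handle("odd") }

// The equivalent explicit anonymous object.
val objectFlow = object : Flow {
    override fun configure(definition: FlowDefinition) {
        definition.handle("odd")
    }
}

// Applies a flow to a fresh definition and returns the recorded steps.
fun stepsOf(flow: Flow): List<String> {
    val definition = FlowDefinition()
    flow.configure(definition)
    return definition.steps
}

fun main() {
    println(stepsOf(lambdaFlow))
    println(stepsOf(objectFlow))
}
```

Both declarations configure the definition in the same way; the lambda form simply omits the anonymous-object boilerplate.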
As an alternative to the construction above, and for consistency with the use cases explained below, a Kotlin-specific DSL should be used to declare integration flows in the builder pattern style:
@Bean
fun flowLambda() =
    integrationFlow {
        filter<String> { it == "test" }
        wireTap {
            handle { println(it.payload) }
        }
        transform<String, String> { it.toUpperCase() }
    }
Such a global integrationFlow() function expects a builder-style lambda for a KotlinIntegrationFlowDefinition (a Kotlin wrapper for the IntegrationFlowDefinition) and produces a regular IntegrationFlow lambda implementation.
See more overloaded integrationFlow() variants below.
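
To illustrate how a builder-style lambda of this kind can work in general, here is a minimal sketch based on Kotlin's lambda with receiver. MiniFlowDefinition and miniFlow are hypothetical names invented for this example; the sketch does not reproduce the actual KotlinIntegrationFlowDefinition implementation.

```kotlin
// Hypothetical miniature DSL for illustration; not the Spring Integration implementation.
class MiniFlowDefinition {
    private val steps = mutableListOf<(String) -> String?>()

    // Keeps the payload only if the predicate matches; otherwise the payload is dropped.
    fun filter(predicate: (String) -> Boolean) {
        steps.add { payload -> if (predicate(payload)) payload else null }
    }

    // Replaces the payload with the result of the given function.
    fun transform(function: (String) -> String) {
        steps.add { payload -> function(payload) }
    }

    // Runs the payload through all steps in order; returns null once it is dropped.
    fun process(payload: String): String? {
        var current: String? = payload
        for (step in steps) {
            current = step(current ?: return null)
        }
        return current
    }
}

// The block runs with a MiniFlowDefinition as its receiver, so filter() and
// transform() can be called inside it without any qualifier.
fun miniFlow(block: MiniFlowDefinition.() -> Unit): (String) -> String? {
    val definition = MiniFlowDefinition()
    definition.block()
    return definition::process
}

val upperCaseTest = miniFlow {
    filter { it == "test" }
    transform { it.uppercase() }
}

fun main() {
    println(upperCaseTest("test"))
    println(upperCaseTest("other"))
}
```

Because the receiver is implicit inside the block, the flow reads as a sequence of steps rather than as a chain of calls on an explicit parameter.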
Many other scenarios require an IntegrationFlow to be started from a source of data (for example, a JdbcPollingChannelAdapter, a JmsInboundGateway, or just an existing MessageChannel).
For this purpose, the Spring Integration Java DSL provides an IntegrationFlows factory with a large number of overloaded from() methods.
This factory can be used in Kotlin as well:
@Bean
fun flowFromSupplier() =
    IntegrationFlows.from<String>({ "bar" }) { e -> e.poller { p -> p.fixedDelay(10).maxMessagesPerPoll(1) } }
        .channel { c -> c.queue("fromSupplierQueue") }
        .get()
Unfortunately, not all from() methods are compatible with Kotlin structures.
To close the gap, this project provides a Kotlin DSL around the IntegrationFlows factory.
It is implemented as a set of overloaded integrationFlow() functions, each of which takes a consumer for a KotlinIntegrationFlowDefinition to declare the rest of the flow as an IntegrationFlow lambda.
This reuses the experience described above and also avoids the get() call at the end.
For example:
@Bean
fun functionFlow() =
    integrationFlow<Function<String, String>>({ beanName("functionGateway") }) {
        transform<String, String> { it.toUpperCase() }
    }

@Bean
fun messageSourceFlow() =
    integrationFlow(MessageProcessorMessageSource { "testSource" },
            { poller { it.fixedDelay(10).maxMessagesPerPoll(1) } }) {
        channel { queue("fromSupplierQueue") }
    }
In addition, Kotlin extensions are provided for those parts of the Java DSL API that need some refinement for Kotlin structures.
For example, IntegrationFlowDefinition<*> requires reification for many methods with a Class<P> argument:
@Bean
fun convertFlow() =
    integrationFlow("convertFlowInput") {
        convert<TestPojo>()
    }
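
The reification mentioned above can be sketched without the framework. In the example below, Converter is a hypothetical stand-in for a Java-style API that takes a Class<P> argument. The extension function shows how a reified type parameter can supply that argument at the call site. This is an assumption about the general technique, not the actual extension source, and it targets Kotlin on the JVM.

```kotlin
// Hypothetical stand-in for a Java-style API that takes a Class<P> argument.
class Converter {
    // Casts the value to the requested type; throws ClassCastException on a mismatch.
    fun <P : Any> convert(type: Class<P>, value: Any): P = type.cast(value)
}

// Kotlin extension: the reified type parameter P is available at runtime inside
// the inline function, so P::class.java can be passed on to the Class-based method.
inline fun <reified P : Any> Converter.convert(value: Any): P =
    convert(P::class.java, value)

fun main() {
    val converter = Converter()

    // No explicit Class argument is needed at the call site.
    val text = converter.convert<String>("payload")
    println(text)
}
```

The call site names the type once, as a type argument, instead of passing String::class.java explicitly.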