Apache Kafka Support
Overview
Spring Integration for Apache Kafka is based on the Spring for Apache Kafka project.
You need to include this dependency in your project:
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-kafka</artifactId>
<version>6.2.0-M1</version>
</dependency>
implementation "org.springframework.integration:spring-integration-kafka:6.2.0-M1"
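It provides the following components, each described in its own section below: the outbound channel adapter, the message-driven channel adapter, the inbound channel adapter, the outbound gateway, the inbound gateway, and channels backed by Apache Kafka topics.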
Outbound Channel Adapter
The Outbound channel adapter is used to publish messages from a Spring Integration channel to Apache Kafka topics. The channel is defined in the application context and then wired into the application that sends messages to Apache Kafka. Sender applications can publish to Apache Kafka by using Spring Integration messages, which are internally converted to Kafka records by the outbound channel adapter, as follows:
- The payload of the Spring Integration message is used to populate the payload of the Kafka record.
- By default, the kafka_messageKey header of the Spring Integration message is used to populate the key of the Kafka record.
You can customize the target topic and partition for publishing the message through the kafka_topic and kafka_partitionId headers, respectively.
In addition, the <int-kafka:outbound-channel-adapter> provides the ability to extract the key, target topic, and target partition by applying SpEL expressions on the outbound message.
To that end, it supports three mutually exclusive pairs of attributes:
- topic and topic-expression
- message-key and message-key-expression
- partition-id and partition-id-expression
These let you specify topic, message-key, and partition-id, respectively, either as static values on the adapter or as expressions that are evaluated at runtime against the request message.
The KafkaHeaders class (provided by spring-kafka) contains constants used for interacting with headers.
The messageKey and topic default headers now require a kafka_ prefix.
When migrating from an earlier version that used the old headers, you need to specify message-key-expression="headers['messageKey']" and topic-expression="headers['topic']" on the <int-kafka:outbound-channel-adapter>.
Alternatively, you can change the headers upstream to the new headers from KafkaHeaders by using a <header-enricher> or a MessageBuilder.
If you use constant values, you can also configure them on the adapter by using topic and message-key.
NOTE: If the adapter is configured with a topic or message key (either with a constant or expression), those are used and the corresponding header is ignored. If you wish the header to override the configuration, you need to configure it in an expression, such as the following:
topic-expression="headers['topic'] != null ? headers['topic'] : 'myTopic'"
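The fallback expression above behaves like the plain-Java lookup sketched here. This is only an illustrative model of the expression's logic, not the adapter's implementation; the class and method names (TopicFallback, resolveTopic) are hypothetical.

```java
import java.util.Map;

// Hypothetical plain-Java model of:
//   topic-expression="headers['topic'] != null ? headers['topic'] : 'myTopic'"
final class TopicFallback {

    private TopicFallback() {
    }

    // Returns the 'topic' header when it is present; otherwise the configured default.
    static String resolveTopic(Map<String, Object> headers, String defaultTopic) {
        Object fromHeader = headers.get("topic");
        return fromHeader != null ? fromHeader.toString() : defaultTopic;
    }
}
```

With this shape, a message that carries a topic header is routed to that topic, and every other message goes to the default.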
The adapter requires a KafkaTemplate, which, in turn, requires a suitably configured ProducerFactory (for example, a DefaultKafkaProducerFactory).
If a send-failure-channel (sendFailureChannel) is provided and a send() failure (sync or async) is received, an ErrorMessage is sent to the channel.
The payload is a KafkaSendFailureException with failedMessage, record (the ProducerRecord), and cause properties.
You can override the DefaultErrorMessageStrategy by setting the error-message-strategy property.
If a send-success-channel (sendSuccessChannel) is provided, a message with a payload of type org.apache.kafka.clients.producer.RecordMetadata is sent after a successful send.
If your application uses transactions and the same channel adapter is used to publish messages where the transaction is started by a listener container, as well as publishing where there is no existing transaction, you must configure a transactionIdPrefix on the KafkaTemplate to override the prefix used by the container or transaction manager.
The prefix used by container-initiated transactions (the producer factory or transaction manager property) must be the same on all application instances.
The prefix used for producer-only transactions must be unique on all application instances.
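One way to satisfy both prefix rules is to derive the two prefixes from different inputs. The sketch below is a hypothetical naming helper: the "-tx-" suffix, the application name, and the instance ID are all assumptions made for illustration, and the value of producerOnlyPrefix is what you would set as transactionIdPrefix on the KafkaTemplate.

```java
// Hypothetical naming helper; the naming scheme is an illustrative assumption.
final class TransactionIdPrefixes {

    private TransactionIdPrefixes() {
    }

    // Container-initiated transactions: the same value on every application instance.
    static String containerPrefix(String appName) {
        return appName + "-tx-";
    }

    // Producer-only transactions: includes an instance ID so the value is unique per instance.
    static String producerOnlyPrefix(String appName, String instanceId) {
        return appName + "-" + instanceId + "-tx-";
    }
}
```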
You can configure a flushExpression, which must resolve to a boolean value.
Flushing after sending several messages might be useful if you are using the linger.ms and batch.size Kafka producer properties; the expression should evaluate to Boolean.TRUE on the last message so that an incomplete batch is sent immediately.
By default, the expression looks for a Boolean value in the KafkaIntegrationHeaders.FLUSH header (kafka_flush).
The flush occurs if the value is true and does not occur if it is false or the header is absent.
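The default flush rule can be modeled in a few lines of plain Java. This is a sketch of the behavior described above, not the handler's code; FlushDecision and its methods are hypothetical names, and headersFor shows one way an upstream component might mark only the last message of a batch.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of the default flush decision (header name taken from the text above).
final class FlushDecision {

    static final String FLUSH_HEADER = "kafka_flush";

    private FlushDecision() {
    }

    // Flush only when the header holds Boolean true; false or a missing header means no flush.
    static boolean shouldFlush(Map<String, Object> headers) {
        return Boolean.TRUE.equals(headers.get(FLUSH_HEADER));
    }

    // Builds headers for the message at 'index' in a batch, flagging only the last one.
    static Map<String, Object> headersFor(int index, int batchSize) {
        Map<String, Object> headers = new HashMap<>();
        headers.put(FLUSH_HEADER, index == batchSize - 1);
        return headers;
    }
}
```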
The KafkaProducerMessageHandler.sendTimeoutExpression default has changed from 10 seconds to the delivery.timeout.ms Kafka producer property + 5000 so that the actual Kafka error after a timeout is propagated to the application, instead of a timeout generated by this framework.
This has been changed for consistency because you may get unexpected behavior (Spring may time out the send, while it is actually, eventually, successful).
IMPORTANT: That timeout is 120 seconds by default so you may wish to reduce it to get more timely failures.
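As a worked example of the arithmetic, the following sketch computes the default send timeout from a map of producer properties. It is a plain-Java illustration, not the framework's code; SendTimeoutDefault and its method are hypothetical names, and the 120000 ms fallback is Kafka's documented default for delivery.timeout.ms.

```java
import java.util.Map;

// Hypothetical helper illustrating "delivery.timeout.ms + 5000".
final class SendTimeoutDefault {

    static final long KAFKA_DEFAULT_DELIVERY_TIMEOUT_MS = 120_000L;

    static final long FRAMEWORK_MARGIN_MS = 5_000L;

    private SendTimeoutDefault() {
    }

    // Uses the configured delivery.timeout.ms when present, else Kafka's default, plus the margin.
    static long defaultSendTimeoutMs(Map<String, Object> producerProps) {
        Object configured = producerProps.get("delivery.timeout.ms");
        long deliveryTimeoutMs = configured == null
                ? KAFKA_DEFAULT_DELIVERY_TIMEOUT_MS
                : Long.parseLong(configured.toString());
        return deliveryTimeoutMs + FRAMEWORK_MARGIN_MS;
    }
}
```

With no override the result is 125000 ms; lowering delivery.timeout.ms to 30000 brings it down to 35000 ms.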
Configuration
The following example shows how to configure the outbound channel adapter for Apache Kafka:
@Bean
public ProducerFactory<Integer, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(KafkaTestUtils.producerProps(embeddedKafka));
}
@Bean
public IntegrationFlow sendToKafkaFlow() {
return f -> f
.splitWith(s -> s.<String>function(p -> Stream.generate(() -> p).limit(101).iterator()))
.publishSubscribeChannel(c -> c
.subscribe(sf -> sf.handle(
kafkaMessageHandler(producerFactory(), TEST_TOPIC1)
.timestampExpression("T(Long).valueOf('1487694048633')"),
e -> e.id("kafkaProducer1")))
.subscribe(sf -> sf.handle(
kafkaMessageHandler(producerFactory(), TEST_TOPIC2)
.timestamp(m -> 1487694048644L),
e -> e.id("kafkaProducer2")))
);
}
@Bean
public DefaultKafkaHeaderMapper mapper() {
return new DefaultKafkaHeaderMapper();
}
private KafkaProducerMessageHandlerSpec<Integer, String, ?> kafkaMessageHandler(
ProducerFactory<Integer, String> producerFactory, String topic) {
return Kafka
.outboundChannelAdapter(producerFactory)
.messageKey(m -> m
.getHeaders()
.get(IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER))
.headerMapper(mapper())
.partitionId(m -> 10)
.topicExpression("headers[kafka_topic] ?: '" + topic + "'")
.configureKafkaTemplate(t -> t.id("kafkaTemplate:" + topic));
}
@Bean
@ServiceActivator(inputChannel = "toKafka")
public MessageHandler handler() throws Exception {
KafkaProducerMessageHandler<String, String> handler =
new KafkaProducerMessageHandler<>(kafkaTemplate());
handler.setTopicExpression(new LiteralExpression("someTopic"));
handler.setMessageKeyExpression(new LiteralExpression("someKey"));
handler.setSuccessChannel(successes());
handler.setFailureChannel(failures());
return handler;
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.brokerAddress);
// set more properties
return new DefaultKafkaProducerFactory<>(props);
}
<int-kafka:outbound-channel-adapter id="kafkaOutboundChannelAdapter"
kafka-template="template"
auto-startup="false"
channel="inputToKafka"
topic="foo"
sync="false"
message-key-expression="'bar'"
send-failure-channel="failures"
send-success-channel="successes"
error-message-strategy="ems"
partition-id-expression="2">
</int-kafka:outbound-channel-adapter>
<bean id="template" class="org.springframework.kafka.core.KafkaTemplate">
<constructor-arg>
<bean class="org.springframework.kafka.core.DefaultKafkaProducerFactory">
<constructor-arg>
<map>
<entry key="bootstrap.servers" value="localhost:9092" />
... <!-- more producer properties -->
</map>
</constructor-arg>
</bean>
</constructor-arg>
</bean>
Message-driven Channel Adapter
The KafkaMessageDrivenChannelAdapter (<int-kafka:message-driven-channel-adapter>) uses a spring-kafka KafkaMessageListenerContainer or ConcurrentMessageListenerContainer.
Also, the mode attribute is available.
It can accept values of record or batch (default: record).
For record mode, each message payload is converted from a single ConsumerRecord.
For batch mode, the payload is a list of objects that are converted from all the ConsumerRecord instances returned by the consumer poll.
As with the batched @KafkaListener, the KafkaHeaders.RECEIVED_KEY, KafkaHeaders.RECEIVED_PARTITION, KafkaHeaders.RECEIVED_TOPIC, and KafkaHeaders.OFFSET headers are also lists, with positions corresponding to the position in the payload.
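To make the positional correspondence concrete, the sketch below walks a batch payload and its list-valued headers by index. It uses plain lists rather than a real Message so that it stands alone; the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration: element i of each header list describes payload element i.
final class BatchHeaderAlignment {

    private BatchHeaderAlignment() {
    }

    // Pairs each payload with the topic, partition, and offset found at the same index.
    static List<String> describe(List<String> payloads, List<String> topics,
            List<Integer> partitions, List<Long> offsets) {
        List<String> lines = new ArrayList<>();
        for (int i = 0; i < payloads.size(); i++) {
            lines.add(topics.get(i) + "-" + partitions.get(i) + "@" + offsets.get(i)
                    + ": " + payloads.get(i));
        }
        return lines;
    }
}
```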
Received messages have certain headers populated.
See the KafkaHeaders class for more information.
The Consumer object (in the kafka_consumer header) is not thread-safe.
You must invoke its methods only on the thread that calls the listener within the adapter.
If you hand off the message to another thread, you must not call its methods.
When a retry-template is provided, delivery failures are retried according to its retry policy.
If an error-channel is also supplied, a default ErrorMessageSendingRecoverer will be used as the recovery callback after retries are exhausted.
You can also use the recovery-callback to specify some other action to take in that case, or set it to null to throw the final exception to the listener container so it is handled there.
When building an ErrorMessage (for use in the error-channel or recovery-callback), you can customize the error message by setting the error-message-strategy property.
By default, a RawRecordHeaderErrorMessageStrategy is used, to provide access to the converted message as well as the raw ConsumerRecord.
This form of retry is blocking and could cause a rebalance if the aggregate retry delays across all polled records exceed the max.poll.interval.ms consumer property.
Instead, consider adding a DefaultErrorHandler to the listener container, configured with a KafkaErrorSendingMessageRecoverer.
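A rough worst-case estimate shows how quickly blocking retries can use up the poll interval. The sketch below assumes a fixed back-off, assumes every polled record fails on every attempt, and ignores processing time; these are simplifying assumptions and RetryBudget is a hypothetical name.

```java
// Hypothetical back-of-the-envelope check, not part of the framework.
final class RetryBudget {

    private RetryBudget() {
    }

    // Total time spent waiting between attempts if every polled record fails every attempt.
    static long worstCaseRetryDelayMs(int polledRecords, int maxAttempts, long backOffMs) {
        return (long) polledRecords * (maxAttempts - 1) * backOffMs;
    }

    // True when the waiting time alone already exceeds max.poll.interval.ms.
    static boolean risksRebalance(int polledRecords, int maxAttempts, long backOffMs,
            long maxPollIntervalMs) {
        return worstCaseRetryDelayMs(polledRecords, maxAttempts, backOffMs) > maxPollIntervalMs;
    }
}
```

With Kafka's defaults of 500 records per poll and a 300000 ms poll interval, three attempts with a one-second back-off give 500 * 2 * 1000 = 1000000 ms in the worst case, well past the limit.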
Configuration
The following example shows how to configure a message-driven channel adapter:
@Bean
public IntegrationFlow topic1ListenerFromKafkaFlow() {
    return IntegrationFlow
            .from(Kafka.messageDrivenChannelAdapter(consumerFactory(),
                    KafkaMessageDrivenChannelAdapter.ListenerMode.record, TEST_TOPIC1)
                    .configureListenerContainer(c ->
                            c.ackMode(ContainerProperties.AckMode.MANUAL)
                                    .id("topic1ListenerContainer"))
                    .recoveryCallback(new ErrorMessageSendingRecoverer(errorChannel(),
                            new RawRecordHeaderErrorMessageStrategy()))
                    .retryTemplate(new RetryTemplate())
                    .filterInRetry(true))
            .filter(Message.class, m ->
                            m.getHeaders().get(KafkaHeaders.RECEIVED_KEY, Integer.class) < 101,
                    f -> f.throwExceptionOnRejection(true))
            .<String, String>transform(String::toUpperCase)
            .channel(c -> c.queue("listeningFromKafkaResults1"))
            .get();
}
@Bean
public KafkaMessageDrivenChannelAdapter<String, String>
adapter(KafkaMessageListenerContainer<String, String> container) {
KafkaMessageDrivenChannelAdapter<String, String> kafkaMessageDrivenChannelAdapter =
new KafkaMessageDrivenChannelAdapter<>(container, ListenerMode.record);
kafkaMessageDrivenChannelAdapter.setOutputChannel(received());
return kafkaMessageDrivenChannelAdapter;
}
@Bean
public KafkaMessageListenerContainer<String, String> container() throws Exception {
ContainerProperties properties = new ContainerProperties(this.topic);
// set more properties
return new KafkaMessageListenerContainer<>(consumerFactory(), properties);
}
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.brokerAddress);
// set more properties
return new DefaultKafkaConsumerFactory<>(props);
}
<int-kafka:message-driven-channel-adapter
id="kafkaListener"
listener-container="container1"
auto-startup="false"
phase="100"
send-timeout="5000"
mode="record"
retry-template="template"
recovery-callback="callback"
error-message-strategy="ems"
channel="someChannel"
error-channel="errorChannel" />
<bean id="container1" class="org.springframework.kafka.listener.KafkaMessageListenerContainer">
<constructor-arg>
<bean class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
<constructor-arg>
<map>
<entry key="bootstrap.servers" value="localhost:9092" />
...
</map>
</constructor-arg>
</bean>
</constructor-arg>
<constructor-arg>
<bean class="org.springframework.kafka.listener.ContainerProperties">
<constructor-arg name="topics" value="foo" />
</bean>
</constructor-arg>
</bean>
You can also use the container factory that is used for @KafkaListener annotations to create ConcurrentMessageListenerContainer instances for other purposes.
See the Spring for Apache Kafka documentation for an example.
With the Java DSL, the container does not have to be configured as a @Bean, because the DSL registers the container as a bean.
The following example shows how to do so:
@Bean
public IntegrationFlow topic2ListenerFromKafkaFlow() {
    return IntegrationFlow
            .from(Kafka.messageDrivenChannelAdapter(kafkaListenerContainerFactory().createContainer(TEST_TOPIC2),
                    KafkaMessageDrivenChannelAdapter.ListenerMode.record)
                    .id("topic2Adapter"))
            ...
            .get();
}
Notice that, in this case, the adapter is given an id (topic2Adapter).
The container is registered in the application context with a name of topic2Adapter.container.
If the adapter does not have an id property, the container’s bean name is the container’s fully qualified class name plus #n, where n is incremented for each container.
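The naming rule can be summarized as a small function. This is a hypothetical sketch of the rule as stated above, not the DSL's registration code; the counter n is passed in because the text does not say where counting starts.

```java
// Hypothetical model of the container bean-naming rule.
final class ContainerBeanNames {

    private ContainerBeanNames() {
    }

    // With an adapter id: "<id>.container"; without one: "<fully qualified class name>#<n>".
    static String containerBeanName(String adapterId, String containerClassName, int n) {
        if (adapterId != null && !adapterId.isEmpty()) {
            return adapterId + ".container";
        }
        return containerClassName + "#" + n;
    }
}
```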
Inbound Channel Adapter
The KafkaMessageSource provides a pollable channel adapter implementation.
Configuration
@Bean
public IntegrationFlow flow(ConsumerFactory<String, String> cf) {
return IntegrationFlow.from(Kafka.inboundChannelAdapter(cf, new ConsumerProperties("myTopic")),
e -> e.poller(Pollers.fixedDelay(5000)))
.handle(System.out::println)
.get();
}
@Bean
fun sourceFlow(cf: ConsumerFactory<String, String>) =
integrationFlow(Kafka.inboundChannelAdapter(cf,
ConsumerProperties(TEST_TOPIC3).also {
it.groupId = "kotlinMessageSourceGroup"
}),
{ poller(Pollers.fixedDelay(100)) }) {
handle { m ->
}
}
@InboundChannelAdapter(channel = "fromKafka", poller = @Poller(fixedDelay = "5000"))
@Bean
public KafkaMessageSource<String, String> source(ConsumerFactory<String, String> cf) {
    ConsumerProperties consumerProperties = new ConsumerProperties("myTopic");
    consumerProperties.setGroupId("myGroupId");
    consumerProperties.setClientId("myClientId");
    return new KafkaMessageSource<>(cf, consumerProperties);
}
<int-kafka:inbound-channel-adapter
id="adapter1"
consumer-factory="consumerFactory"
consumer-properties="consumerProperties1"
ack-factory="ackFactory"
channel="inbound"
message-converter="converter"
payload-type="java.lang.String"
raw-header="true"
auto-startup="false">
<int:poller fixed-delay="5000"/>
</int-kafka:inbound-channel-adapter>
<bean id="consumerFactory" class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
<constructor-arg>
<map>
<entry key="max.poll.records" value="1"/>
</map>
</constructor-arg>
</bean>
<bean id="consumerProperties1" class="org.springframework.kafka.listener.ConsumerProperties">
<constructor-arg name="topics" value="topic1"/>
<property name="groupId" value="group"/>
<property name="clientId" value="client"/>
</bean>
Refer to the javadocs for available properties.
By default, max.poll.records must be either explicitly set in the consumer factory, or it will be forced to 1 if the consumer factory is a DefaultKafkaConsumerFactory.
You can set the property allowMultiFetch to true to override this behavior.
You must poll the consumer within max.poll.interval.ms to avoid a rebalance.
If you set allowMultiFetch to true, you must process all the retrieved records, and poll again, within max.poll.interval.ms.
Messages emitted by this adapter contain a header kafka_remainingRecords with a count of records remaining from the previous poll.
Outbound Gateway
The outbound gateway is for request/reply operations. It differs from most Spring Integration gateways in that the sending thread does not block in the gateway, and the reply is processed on the reply listener container thread. If your code invokes the gateway behind a synchronous Messaging Gateway, the user thread blocks there until the reply is received (or a timeout occurs).
The gateway does not accept requests until the reply container has been assigned its topics and partitions.
It is suggested that you add a ConsumerRebalanceListener to the template’s reply container properties and wait for the onPartitionsAssigned call before sending messages to the gateway.
The KafkaProducerMessageHandler sendTimeoutExpression default is the delivery.timeout.ms Kafka producer property + 5000 so that the actual Kafka error after a timeout is propagated to the application, instead of a timeout generated by this framework.
This has been changed for consistency because you may get unexpected behavior (Spring may time out the send(), while it is actually, eventually, successful).
IMPORTANT: That timeout is 120 seconds by default so you may wish to reduce it to get more timely failures.
Configuration
The following example shows how to configure a gateway:
@Bean
public IntegrationFlow outboundGateFlow(
ReplyingKafkaTemplate<String, String, String> kafkaTemplate) {
return IntegrationFlow.from("kafkaRequests")
.handle(Kafka.outboundGateway(kafkaTemplate))
.channel("kafkaReplies")
.get();
}
@Bean
@ServiceActivator(inputChannel = "kafkaRequests", outputChannel = "kafkaReplies")
public KafkaProducerMessageHandler<String, String> outGateway(
ReplyingKafkaTemplate<String, String, String> kafkaTemplate) {
return new KafkaProducerMessageHandler<>(kafkaTemplate);
}
<int-kafka:outbound-gateway
id="allProps"
error-message-strategy="ems"
kafka-template="template"
message-key-expression="'key'"
order="23"
partition-id-expression="2"
reply-channel="replies"
reply-timeout="43"
request-channel="requests"
requires-reply="false"
send-success-channel="successes"
send-failure-channel="failures"
send-timeout-expression="44"
sync="true"
timestamp-expression="T(System).currentTimeMillis()"
topic-expression="'topic'"/>
Refer to the javadocs for available properties.
Notice that the same class as the outbound channel adapter is used, the only difference being that the KafkaTemplate passed into the constructor is a ReplyingKafkaTemplate.
See the Spring for Apache Kafka documentation for more information.
The outbound topic, partition, key, and so on are determined in the same way as the outbound adapter. The reply topic is determined as follows:
- A message header named KafkaHeaders.REPLY_TOPIC (if present, it must have a String or byte[] value) is validated against the template’s reply container’s subscribed topics.
- If the template’s replyContainer is subscribed to only one topic, it is used.
You can also specify a KafkaHeaders.REPLY_PARTITION header to determine a specific partition to be used for replies.
Again, this is validated against the template’s reply container’s subscriptions.
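The reply-topic rules above can be modeled as follows. This is a plain-Java sketch, not the gateway's implementation: ReplyTopicResolution is a hypothetical name, UTF-8 decoding of a byte[] header is an assumption, and the case the text does not cover (no header and several subscribed topics) is reported as an empty Optional rather than guessed at.

```java
import java.nio.charset.StandardCharsets;
import java.util.Optional;
import java.util.Set;

// Hypothetical model of how the reply topic is chosen.
final class ReplyTopicResolution {

    private ReplyTopicResolution() {
    }

    static Optional<String> determineReplyTopic(Object replyTopicHeader, Set<String> subscribedTopics) {
        if (replyTopicHeader != null) {
            String topic;
            if (replyTopicHeader instanceof byte[]) {
                // Assumption: the header bytes are UTF-8 text.
                topic = new String((byte[]) replyTopicHeader, StandardCharsets.UTF_8);
            }
            else if (replyTopicHeader instanceof String) {
                topic = (String) replyTopicHeader;
            }
            else {
                throw new IllegalArgumentException("Reply topic header must be a String or byte[]");
            }
            // The header value must be one of the reply container's subscribed topics.
            if (!subscribedTopics.contains(topic)) {
                throw new IllegalArgumentException("Reply container is not subscribed to " + topic);
            }
            return Optional.of(topic);
        }
        // No header: fall back to the single subscribed topic, if there is exactly one.
        if (subscribedTopics.size() == 1) {
            return Optional.of(subscribedTopics.iterator().next());
        }
        return Optional.empty();
    }
}
```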
Alternatively, you can also use a configuration similar to the following bean:
@Bean
public IntegrationFlow outboundGateFlow() {
return IntegrationFlow.from("kafkaRequests")
.handle(Kafka.outboundGateway(producerFactory(), replyContainer())
.configureKafkaTemplate(t -> t.replyTimeout(30_000)))
.channel("kafkaReplies")
.get();
}
Inbound Gateway
The inbound gateway is for request/reply operations.
Configuration
The following example shows how to configure an inbound gateway:
@Bean
public IntegrationFlow serverGateway(
ConcurrentMessageListenerContainer<Integer, String> container,
KafkaTemplate<Integer, String> replyTemplate) {
return IntegrationFlow
.from(Kafka.inboundGateway(container, replyTemplate)
.replyTimeout(30_000))
.<String, String>transform(String::toUpperCase)
.get();
}
@Bean
public KafkaInboundGateway<Integer, String, String> inboundGateway(
AbstractMessageListenerContainer<Integer, String> container,
KafkaTemplate<Integer, String> replyTemplate) {
KafkaInboundGateway<Integer, String, String> gateway =
new KafkaInboundGateway<>(container, replyTemplate);
gateway.setRequestChannel(requests);
gateway.setReplyChannel(replies);
gateway.setReplyTimeout(30_000);
return gateway;
}
<int-kafka:inbound-gateway
id="gateway1"
listener-container="container1"
kafka-template="template"
auto-startup="false"
phase="100"
request-timeout="5000"
request-channel="nullChannel"
reply-channel="errorChannel"
reply-timeout="43"
message-converter="messageConverter"
payload-type="java.lang.String"
error-message-strategy="ems"
retry-template="retryTemplate"
recovery-callback="recoveryCallback"/>
Refer to the javadocs for available properties.
When a RetryTemplate is provided, delivery failures are retried according to its retry policy.
If an error-channel is also supplied, a default ErrorMessageSendingRecoverer will be used as the recovery callback after retries are exhausted.
You can also use the recovery-callback to specify some other action to take in that case, or set it to null to throw the final exception to the listener container so it is handled there.
When building an ErrorMessage (for use in the error-channel or recovery-callback), you can customize the error message by setting the error-message-strategy property.
By default, a RawRecordHeaderErrorMessageStrategy is used, to provide access to the converted message as well as the raw ConsumerRecord.
This form of retry is blocking and could cause a rebalance if the aggregate retry delays across all polled records exceed the max.poll.interval.ms consumer property.
Instead, consider adding a DefaultErrorHandler to the listener container, configured with a KafkaErrorSendingMessageRecoverer.
Alternatively, you could configure a simple upper-case converter with the Java DSL by using code similar to the following:
@Bean
public IntegrationFlow serverGateway() {
return IntegrationFlow
.from(Kafka.inboundGateway(consumerFactory(), containerProperties(),
producerFactory())
.replyTimeout(30_000))
.<String, String>transform(String::toUpperCase)
.get();
}
You can also use the container factory that is used for @KafkaListener annotations to create ConcurrentMessageListenerContainer instances for other purposes.
See the Spring for Apache Kafka documentation and Message-driven Channel Adapter for examples.
Channels Backed by Apache Kafka Topics
Spring Integration has MessageChannel implementations backed by an Apache Kafka topic for persistence.
Each channel requires a KafkaTemplate for the sending side and either a listener container factory (for subscribable channels) or a KafkaMessageSource for a pollable channel.
Java DSL Configuration
@Bean
public IntegrationFlow flowWithSubscribable(KafkaTemplate<Integer, String> template,
ConcurrentKafkaListenerContainerFactory<Integer, String> containerFactory) {
return IntegrationFlow.from(...)
...
.channel(Kafka.channel(template, containerFactory, "someTopic1").groupId("group1"))
...
.get();
}
@Bean
public IntegrationFlow flowWithPubSub(KafkaTemplate<Integer, String> template,
ConcurrentKafkaListenerContainerFactory<Integer, String> containerFactory) {
return IntegrationFlow.from(...)
...
.publishSubscribeChannel(pubSub(template, containerFactory),
pubsub -> pubsub
.subscribe(subflow -> ...)
.subscribe(subflow -> ...))
.get();
}
@Bean
public BroadcastCapableChannel pubSub(KafkaTemplate<Integer, String> template,
ConcurrentKafkaListenerContainerFactory<Integer, String> containerFactory) {
return Kafka.publishSubscribeChannel(template, containerFactory, "someTopic2")
.groupId("group2")
.get();
}
@Bean
public IntegrationFlow flowWithPollable(KafkaTemplate<Integer, String> template,
KafkaMessageSource<Integer, String> source) {
return IntegrationFlow.from(...)
...
.channel(Kafka.pollableChannel(template, source, "someTopic3").groupId("group3"))
.handle(..., e -> e.poller(...))
...
.get();
}
/**
 * Channel for a single subscriber.
 **/
@Bean
SubscribableKafkaChannel pointToPoint(KafkaTemplate<String, String> template,
        KafkaListenerContainerFactory<String, String> factory) {

    SubscribableKafkaChannel channel =
            new SubscribableKafkaChannel(template, factory, "topicA");
    channel.setGroupId("group1");
    return channel;
}
/**
 * Channel for multiple subscribers.
 **/
@Bean
SubscribableKafkaChannel pubsub(KafkaTemplate<String, String> template,
        KafkaListenerContainerFactory<String, String> factory) {

    SubscribableKafkaChannel channel =
            new SubscribableKafkaChannel(template, factory, "topicB", true);
    channel.setGroupId("group2");
    return channel;
}
/**
 * Pollable channel (topic is configured on the source)
 **/
@Bean
PollableKafkaChannel pollable(KafkaTemplate<String, String> template,
        KafkaMessageSource<String, String> source) {

    PollableKafkaChannel channel =
            new PollableKafkaChannel(template, source);
    channel.setGroupId("group3");
    return channel;
}
<int-kafka:channel kafka-template="template" id="ptp" topic="ptpTopic" group-id="ptpGroup"
container-factory="containerFactory" />
<int-kafka:pollable-channel kafka-template="template" id="pollable" message-source="source"
group-id = "pollableGroup"/>
<int-kafka:publish-subscribe-channel kafka-template="template" id="pubSub" topic="pubSubTopic"
group-id="pubSubGroup" container-factory="containerFactory" />
Message Conversion
A StringJsonMessageConverter is provided.
See the Spring for Apache Kafka documentation for more information.
When using this converter with a message-driven channel adapter, you can specify the type to which you want the incoming payload to be converted.
This is achieved by setting the payload-type attribute (payloadType property) on the adapter.
The following example shows how to do so in XML configuration:
<int-kafka:message-driven-channel-adapter
id="kafkaListener"
listener-container="container1"
auto-startup="false"
phase="100"
send-timeout="5000"
channel="nullChannel"
message-converter="messageConverter"
payload-type="com.example.Thing"
error-channel="errorChannel" />
<bean id="messageConverter"
class="org.springframework.kafka.support.converter.MessagingMessageConverter"/>
The following example shows how to set the payload-type attribute (payloadType property) on the adapter in Java configuration:
@Bean
public KafkaMessageDrivenChannelAdapter<String, String>
adapter(KafkaMessageListenerContainer<String, String> container) {
KafkaMessageDrivenChannelAdapter<String, String> kafkaMessageDrivenChannelAdapter =
new KafkaMessageDrivenChannelAdapter<>(container, ListenerMode.record);
kafkaMessageDrivenChannelAdapter.setOutputChannel(received());
kafkaMessageDrivenChannelAdapter.setMessageConverter(converter());
kafkaMessageDrivenChannelAdapter.setPayloadType(Thing.class);
return kafkaMessageDrivenChannelAdapter;
}
Null Payloads and Log Compaction 'Tombstone' Records
Spring Messaging Message<?> objects cannot have null payloads.
When you use the endpoints for Apache Kafka, null payloads (also known as tombstone records) are represented by a payload of type KafkaNull.
See the Spring for Apache Kafka documentation for more information.
The POJO methods for Spring Integration endpoints can use a true null value instead of KafkaNull.
To do so, mark the parameter with @Payload(required = false).
The following example shows how to do so:
@ServiceActivator(inputChannel = "fromSomeKafkaInboundEndpoint")
public void in(@Header(KafkaHeaders.RECEIVED_KEY) String key,
@Payload(required = false) Customer customer) {
// customer is null if a tombstone record
...
}
Calling a Spring Integration flow from a KStream
You can use a MessagingTransformer to invoke an integration flow from a KStream:
@Bean
public KStream<byte[], byte[]> kStream(StreamsBuilder kStreamBuilder,
MessagingTransformer<byte[], byte[], byte[]> transformer) {
KStream<byte[], byte[]> stream = kStreamBuilder.stream(STREAMING_TOPIC1);
stream.mapValues((ValueMapper<byte[], byte[]>) String::toUpperCase)
...
.transform(() -> transformer)
.to(streamingTopic2);
stream.print(Printed.toSysOut());
return stream;
}
@Bean
@DependsOn("flow")
public MessagingTransformer<byte[], byte[], String> transformer(
MessagingFunction function) {
MessagingMessageConverter converter = new MessagingMessageConverter();
converter.setHeaderMapper(new SimpleKafkaHeaderMapper("*"));
return new MessagingTransformer<>(function, converter);
}
@Bean
public IntegrationFlow flow() {
return IntegrationFlow.from(MessagingFunction.class)
...
.get();
}
When an integration flow starts with an interface, the proxy that is created has the name of the flow bean, appended with ".gateway", so this bean name can be used as a @Qualifier if needed.
Performance Considerations for read/process/write Scenarios
Many applications consume from a topic, perform some processing and write to another topic.
In most cases, if the write fails, the application would want to throw an exception so the incoming request can be retried and/or sent to a dead letter topic.
This functionality is supported by the underlying message listener container, together with a suitably configured error handler.
However, in order to support this, we need to block the listener thread until the success (or failure) of the write operation so that any exceptions can be thrown to the container.
When consuming single records, this is achieved by setting the sync property on the outbound adapter.
However, when consuming batches, using sync causes a significant performance degradation because the application would wait for the result of each send before sending the next message.
Alternatively, you can perform multiple sends and then wait for the results of those sends afterwards.
This is achieved by adding a futuresChannel to the message handler.
To enable the feature, add KafkaIntegrationHeaders.FUTURE_TOKEN to the outbound messages; this can then be used to correlate a Future to a particular sent message.
Here is an example of how you might use this feature:
@SpringBootApplication
public class FuturesChannelApplication {
public static void main(String[] args) {
SpringApplication.run(FuturesChannelApplication.class, args);
}
@Bean
IntegrationFlow inbound(ConsumerFactory<String, String> consumerFactory, Handler handler) {
return IntegrationFlow.from(Kafka.messageDrivenChannelAdapter(consumerFactory,
ListenerMode.batch, "inTopic"))
.handle(handler)
.get();
}
@Bean
IntegrationFlow outbound(KafkaTemplate<String, String> kafkaTemplate) {
return IntegrationFlow.from(Gate.class)
.enrichHeaders(h -> h
.header(KafkaHeaders.TOPIC, "outTopic")
.headerExpression(KafkaIntegrationHeaders.FUTURE_TOKEN, "headers[id]"))
.handle(Kafka.outboundChannelAdapter(kafkaTemplate)
.futuresChannel("futures"))
.get();
}
@Bean
PollableChannel futures() {
return new QueueChannel();
}
}
@Component
@DependsOn("outbound")
class Handler {
@Autowired
Gate gate;
@Autowired
PollableChannel futures;
public void handle(List<String> input) throws Exception {
System.out.println(input);
input.forEach(str -> this.gate.send(str.toUpperCase()));
for (int i = 0; i < input.size(); i++) {
Message<?> future = this.futures.receive(10000);
((Future<?>) future.getPayload()).get(10, TimeUnit.SECONDS);
}
}
}
interface Gate {
void send(String out);
}
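Stripped of the Kafka and Spring Integration types, the pattern in the example above is "start every send first, then wait for all of them". The sketch below shows only that pattern with JDK classes; sendAsync is a hypothetical stand-in for an asynchronous send and does not call Kafka.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical illustration of "send everything, then await the results".
final class SendThenAwait {

    private SendThenAwait() {
    }

    // Stand-in for an asynchronous send; completes with the upper-cased record.
    static Future<String> sendAsync(String record) {
        return CompletableFuture.supplyAsync(record::toUpperCase);
    }

    static List<String> sendBatch(List<String> batch) {
        // Phase 1: start all sends without waiting on any of them.
        List<Future<String>> pending = new ArrayList<>();
        for (String record : batch) {
            pending.add(sendAsync(record));
        }
        // Phase 2: wait for each result so a failure surfaces on the calling thread.
        List<String> results = new ArrayList<>();
        for (Future<String> future : pending) {
            try {
                results.add(future.get(10, TimeUnit.SECONDS));
            }
            catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while waiting for a send", ex);
            }
            catch (ExecutionException | TimeoutException ex) {
                throw new IllegalStateException("Send failed or timed out", ex);
            }
        }
        return results;
    }
}
```

Because the waits happen only after every send has started, the total latency is closer to that of the slowest send than to the sum of all of them.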