Apache Kafka Support

Overview

Spring Integration for Apache Kafka is based on the Spring for Apache Kafka project.

You need to include this dependency in your project:

Maven:

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-kafka</artifactId>
    <version>6.3.7</version>
</dependency>

Gradle:

compile "org.springframework.integration:spring-integration-kafka:6.3.7"

It provides the following components:

  • Outbound Channel Adapter

  • Message-driven Channel Adapter

  • Inbound Channel Adapter

  • Outbound Gateway

  • Inbound Gateway

  • Channels Backed by Apache Kafka Topics

Outbound Channel Adapter

The Outbound channel adapter is used to publish messages from a Spring Integration channel to Apache Kafka topics. The channel is defined in the application context and then wired into the application that sends messages to Apache Kafka. Sender applications can publish to Apache Kafka by using Spring Integration messages, which are internally converted to Kafka records by the outbound channel adapter, as follows:

  • The payload of the Spring Integration message is used to populate the payload of the Kafka record.

  • By default, the kafka_messageKey header of the Spring Integration message is used to populate the key of the Kafka record.

You can customize the target topic and partition for publishing the message through the kafka_topic and kafka_partitionId headers, respectively.

In addition, the <int-kafka:outbound-channel-adapter> provides the ability to extract the key, target topic, and target partition by applying SpEL expressions on the outbound message. To that end, it supports three mutually exclusive pairs of attributes:

  • topic and topic-expression

  • message-key and message-key-expression

  • partition-id and partition-id-expression

These let you specify topic, message-key, and partition-id, respectively, either as static values on the adapter or as expressions evaluated at runtime against the request message.

The KafkaHeaders interface (provided by spring-kafka) contains constants used for interacting with headers. The messageKey and topic default headers now require a kafka_ prefix. When migrating from an earlier version that used the old headers, you need to specify message-key-expression="headers['messageKey']" and topic-expression="headers['topic']" on the <int-kafka:outbound-channel-adapter>. Alternatively, you can change the headers upstream to the new headers from KafkaHeaders by using a <header-enricher> or a MessageBuilder. If you use constant values, you can also configure them on the adapter by using topic and message-key.
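For example, if upstream messages carry the key under the old name, a transformer similar to the following minimal sketch (the legacy messageKey header name and the channel names are illustrative) can move it to the current KafkaHeaders.KEY header:

@Transformer(inputChannel = "legacyOut", outputChannel = "toKafkaAdapter")
public Message<?> mapLegacyKey(Message<?> message) {
    // copy the legacy "messageKey" header to kafka_messageKey (KafkaHeaders.KEY)
    return MessageBuilder.fromMessage(message)
            .setHeader(KafkaHeaders.KEY, message.getHeaders().get("messageKey"))
            .build();
}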

NOTE: If the adapter is configured with a topic or message key (either with a constant or expression), those are used and the corresponding header is ignored. If you wish the header to override the configuration, you need to configure it in an expression, such as the following:

topic-expression="headers['topic'] != null ? headers['topic'] : 'myTopic'"

The adapter requires a KafkaTemplate, which, in turn, requires a suitably configured KafkaProducerFactory.

If a send-failure-channel (sendFailureChannel) is provided and a send() failure (sync or async) is received, an ErrorMessage is sent to the channel. The payload is a KafkaSendFailureException with failedMessage, record (the ProducerRecord) and cause properties. You can override the DefaultErrorMessageStrategy by setting the error-message-strategy property.

If a send-success-channel (sendSuccessChannel) is provided, a message with a payload of type org.apache.kafka.clients.producer.RecordMetadata is sent after a successful send.
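For illustration, consumers of those two channels might look like the following minimal sketch (the successes and failures channel names match the configuration examples below):

@ServiceActivator(inputChannel = "successes")
public void onSuccess(RecordMetadata metadata) {
    // the payload is the Kafka RecordMetadata for the successful send
    System.out.println("Sent to " + metadata.topic() + "-" + metadata.partition()
            + "@" + metadata.offset());
}

@ServiceActivator(inputChannel = "failures")
public void onFailure(KafkaSendFailureException exception) {
    // failedMessage, record (the ProducerRecord), and cause are available
    System.err.println("Failed to send " + exception.getRecord() + ": " + exception.getCause());
}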

If your application uses transactions and the same channel adapter is used to publish messages where the transaction is started by a listener container, as well as publishing where there is no existing transaction, you must configure a transactionIdPrefix on the KafkaTemplate to override the prefix used by the container or transaction manager. The prefix used by container-initiated transactions (the producer factory or transaction manager property) must be the same on all application instances. The prefix used for producer-only transactions must be unique on all application instances.
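The following minimal sketch shows a template dedicated to producer-only transactions; the UUID-based suffix is one illustrative way to get a per-instance unique value:

@Bean
public KafkaTemplate<String, String> producerOnlyTemplate(ProducerFactory<String, String> pf) {
    KafkaTemplate<String, String> template = new KafkaTemplate<>(pf);
    // overrides the producer factory's prefix; must be unique per application instance
    template.setTransactionIdPrefix("producerOnly." + UUID.randomUUID() + ".");
    return template;
}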

You can configure a flushExpression, which must resolve to a boolean value. Flushing after sending several messages might be useful if you are using the linger.ms and batch.size Kafka producer properties; the expression should evaluate to Boolean.TRUE on the last message so that an incomplete batch is sent immediately. By default, the expression looks for a Boolean value in the KafkaIntegrationHeaders.FLUSH header (kafka_flush). The flush occurs if the value is true; it does not if the value is false or the header is absent.
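For example, a sender might mark the last message of a batch as follows (a minimal sketch):

public Message<String> lastInBatch(String payload) {
    return MessageBuilder.withPayload(payload)
            // kafka_flush: the handler flushes the producer after this send
            .setHeader(KafkaIntegrationHeaders.FLUSH, Boolean.TRUE)
            .build();
}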

The KafkaProducerMessageHandler.sendTimeoutExpression default has changed from 10 seconds to the delivery.timeout.ms Kafka producer property + 5000 so that the actual Kafka error after a timeout is propagated to the application, instead of a timeout generated by this framework. This has been changed for consistency because you may get unexpected behavior (Spring may timeout the send, while it is actually, eventually, successful). IMPORTANT: That timeout is 120 seconds by default so you may wish to reduce it to get more timely failures.

Configuration

The following example shows how to configure the outbound channel adapter for Apache Kafka:

Java DSL:

@Bean
public ProducerFactory<Integer, String> producerFactory() {
    return new DefaultKafkaProducerFactory<>(KafkaTestUtils.producerProps(embeddedKafka));
}

@Bean
public IntegrationFlow sendToKafkaFlow() {
    return f -> f
            .splitWith(s -> s.<String>function(p -> Stream.generate(() -> p).limit(101).iterator()))
            .publishSubscribeChannel(c -> c
                    .subscribe(sf -> sf.handle(
                            kafkaMessageHandler(producerFactory(), TEST_TOPIC1)
                                    .timestampExpression("T(Long).valueOf('1487694048633')"),
                            e -> e.id("kafkaProducer1")))
                    .subscribe(sf -> sf.handle(
                            kafkaMessageHandler(producerFactory(), TEST_TOPIC2)
                                   .timestamp(m -> 1487694048644L),
                            e -> e.id("kafkaProducer2")))
            );
}

@Bean
public DefaultKafkaHeaderMapper mapper() {
    return new DefaultKafkaHeaderMapper();
}

private KafkaProducerMessageHandlerSpec<Integer, String, ?> kafkaMessageHandler(
        ProducerFactory<Integer, String> producerFactory, String topic) {
    return Kafka
            .outboundChannelAdapter(producerFactory)
            .messageKey(m -> m
                    .getHeaders()
                    .get(IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER))
            .headerMapper(mapper())
            .partitionId(m -> 10)
            .topicExpression("headers['kafka_topic'] ?: '" + topic + "'")
            .configureKafkaTemplate(t -> t.id("kafkaTemplate:" + topic));
}
Java:

@Bean
@ServiceActivator(inputChannel = "toKafka")
public MessageHandler handler() throws Exception {
    KafkaProducerMessageHandler<String, String> handler =
            new KafkaProducerMessageHandler<>(kafkaTemplate());
    handler.setTopicExpression(new LiteralExpression("someTopic"));
    handler.setMessageKeyExpression(new LiteralExpression("someKey"));
    handler.setSuccessChannel(successes());
    handler.setFailureChannel(failures());
    return handler;
}

@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
    return new KafkaTemplate<>(producerFactory());
}

@Bean
public ProducerFactory<String, String> producerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.brokerAddress);
    // set more properties
    return new DefaultKafkaProducerFactory<>(props);
}
XML:

<int-kafka:outbound-channel-adapter id="kafkaOutboundChannelAdapter"
                                    kafka-template="template"
                                    auto-startup="false"
                                    channel="inputToKafka"
                                    topic="foo"
                                    sync="false"
                                    message-key-expression="'bar'"
                                    send-failure-channel="failures"
                                    send-success-channel="successes"
                                    error-message-strategy="ems"
                                    partition-id-expression="2">
</int-kafka:outbound-channel-adapter>

<bean id="template" class="org.springframework.kafka.core.KafkaTemplate">
    <constructor-arg>
        <bean class="org.springframework.kafka.core.DefaultKafkaProducerFactory">
            <constructor-arg>
                <map>
                    <entry key="bootstrap.servers" value="localhost:9092" />
                    ... <!-- more producer properties -->
                </map>
            </constructor-arg>
        </bean>
    </constructor-arg>
</bean>

Message-driven Channel Adapter

The KafkaMessageDrivenChannelAdapter (<int-kafka:message-driven-channel-adapter>) uses a spring-kafka KafkaMessageListenerContainer or ConcurrentMessageListenerContainer.

The mode attribute is also available; it accepts values of record or batch (default: record). For record mode, each message payload is converted from a single ConsumerRecord. For batch mode, the payload is a list of objects that are converted from all the ConsumerRecord instances returned by the consumer poll. As with the batched @KafkaListener, the KafkaHeaders.RECEIVED_KEY, KafkaHeaders.RECEIVED_PARTITION, KafkaHeaders.RECEIVED_TOPIC, and KafkaHeaders.OFFSET headers are also lists, with positions corresponding to the position in the payload.
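For example, a batch-mode consumer might look like the following minimal sketch (the fromKafkaBatch channel name is illustrative):

@ServiceActivator(inputChannel = "fromKafkaBatch")
public void listen(List<String> values,
        @Header(KafkaHeaders.RECEIVED_TOPIC) List<String> topics,
        @Header(KafkaHeaders.OFFSET) List<Long> offsets) {
    // the payload and the header lists are parallel, position by position
    for (int i = 0; i < values.size(); i++) {
        System.out.println(values.get(i) + " from " + topics.get(i) + "@" + offsets.get(i));
    }
}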

Received messages have certain headers populated. See the KafkaHeaders class for more information.

The Consumer object (in the kafka_consumer header) is not thread-safe. You must invoke its methods only on the thread that calls the listener within the adapter. If you hand off the message to another thread, you must not call its methods.

When a retry-template is provided, delivery failures are retried according to its retry policy. If an error-channel is also supplied, a default ErrorMessageSendingRecoverer will be used as the recovery callback after retries are exhausted. You can also use the recovery-callback to specify some other action to take in that case, or set it to null to throw the final exception to the listener container so it is handled there.

When building an ErrorMessage (for use in the error-channel or recovery-callback), you can customize the error message by setting the error-message-strategy property. By default, a RawRecordHeaderErrorMessageStrategy is used, to provide access to the converted message as well as the raw ConsumerRecord.

This form of retry is blocking and could cause a rebalance if the aggregate retry delays across all polled records might exceed the max.poll.interval.ms consumer property. Instead, consider adding a DefaultErrorHandler to the listener container, configured with a KafkaErrorSendingMessageRecoverer.
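The following minimal sketch (assuming an errorChannel bean and a container created from a @KafkaListener container factory) shows that combination; the back-off of three attempts at one-second intervals is illustrative:

@Bean
public ConcurrentMessageListenerContainer<String, String> container(
        ConcurrentKafkaListenerContainerFactory<String, String> factory,
        MessageChannel errorChannel) {

    ConcurrentMessageListenerContainer<String, String> container =
            factory.createContainer("someTopic");
    container.getContainerProperties().setGroupId("someGroup");
    // retries run in the container; exhausted records are published as ErrorMessages
    container.setCommonErrorHandler(new DefaultErrorHandler(
            new KafkaErrorSendingMessageRecoverer(errorChannel), new FixedBackOff(1000L, 3)));
    return container;
}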

Configuration

The following example shows how to configure a message-driven channel adapter:

Java DSL:

@Bean
public IntegrationFlow topic1ListenerFromKafkaFlow() {
    return IntegrationFlow
            .from(Kafka.messageDrivenChannelAdapter(consumerFactory(),
                    KafkaMessageDrivenChannelAdapter.ListenerMode.record, TEST_TOPIC1)
                    .configureListenerContainer(c ->
                            c.ackMode(AbstractMessageListenerContainer.AckMode.MANUAL)
                                    .id("topic1ListenerContainer"))
                    .recoveryCallback(new ErrorMessageSendingRecoverer(errorChannel(),
                            new RawRecordHeaderErrorMessageStrategy()))
                    .retryTemplate(new RetryTemplate())
                    .filterInRetry(true))
            .filter(Message.class, m ->
                            m.getHeaders().get(KafkaHeaders.RECEIVED_KEY, Integer.class) < 101,
                    f -> f.throwExceptionOnRejection(true))
            .<String, String>transform(String::toUpperCase)
            .channel(c -> c.queue("listeningFromKafkaResults1"))
            .get();
}
Java:

@Bean
public KafkaMessageDrivenChannelAdapter<String, String>
            adapter(KafkaMessageListenerContainer<String, String> container) {
    KafkaMessageDrivenChannelAdapter<String, String> kafkaMessageDrivenChannelAdapter =
            new KafkaMessageDrivenChannelAdapter<>(container, ListenerMode.record);
    kafkaMessageDrivenChannelAdapter.setOutputChannel(received());
    return kafkaMessageDrivenChannelAdapter;
}

@Bean
public KafkaMessageListenerContainer<String, String> container() throws Exception {
    ContainerProperties properties = new ContainerProperties(this.topic);
    // set more properties
    return new KafkaMessageListenerContainer<>(consumerFactory(), properties);
}

@Bean
public ConsumerFactory<String, String> consumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.brokerAddress);
    // set more properties
    return new DefaultKafkaConsumerFactory<>(props);
}
XML:

<int-kafka:message-driven-channel-adapter
        id="kafkaListener"
        listener-container="container1"
        auto-startup="false"
        phase="100"
        send-timeout="5000"
        mode="record"
        retry-template="template"
        recovery-callback="callback"
        error-message-strategy="ems"
        channel="someChannel"
        error-channel="errorChannel" />

<bean id="container1" class="org.springframework.kafka.listener.KafkaMessageListenerContainer">
    <constructor-arg>
        <bean class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
            <constructor-arg>
                <map>
                <entry key="bootstrap.servers" value="localhost:9092" />
                ...
                </map>
            </constructor-arg>
        </bean>
    </constructor-arg>
    <constructor-arg>
        <bean class="org.springframework.kafka.listener.config.ContainerProperties">
            <constructor-arg name="topics" value="foo" />
        </bean>
    </constructor-arg>

</bean>

You can also use the container factory that is used for @KafkaListener annotations to create ConcurrentMessageListenerContainer instances for other purposes. See the Spring for Apache Kafka documentation for an example.

With the Java DSL, the container does not have to be configured as a @Bean, because the DSL registers the container as a bean. The following example shows how to do so:

@Bean
public IntegrationFlow topic2ListenerFromKafkaFlow() {
    return IntegrationFlow
            .from(Kafka.messageDrivenChannelAdapter(kafkaListenerContainerFactory().createContainer(TEST_TOPIC2),
            KafkaMessageDrivenChannelAdapter.ListenerMode.record)
                .id("topic2Adapter"))
            ...
            .get();
}

Notice that, in this case, the adapter is given an id (topic2Adapter). The container is registered in the application context with a name of topic2Adapter.container. If the adapter does not have an id property, the container’s bean name is the container’s fully qualified class name plus #n, where n is incremented for each container.

Inbound Channel Adapter

The KafkaMessageSource provides a pollable channel adapter implementation.

Configuration

Java DSL:

@Bean
public IntegrationFlow flow(ConsumerFactory<String, String> cf)  {
    return IntegrationFlow.from(Kafka.inboundChannelAdapter(cf, new ConsumerProperties("myTopic")),
                          e -> e.poller(Pollers.fixedDelay(5000)))
            .handle(System.out::println)
            .get();
}
Kotlin:

@Bean
fun sourceFlow(cf: ConsumerFactory<String, String>) =
    integrationFlow(Kafka.inboundChannelAdapter(cf,
        ConsumerProperties(TEST_TOPIC3).also {
            it.groupId = "kotlinMessageSourceGroup"
        }),
        { poller(Pollers.fixedDelay(100)) }) {
        handle { m ->

        }
    }
Java:

@InboundChannelAdapter(channel = "fromKafka", poller = @Poller(fixedDelay = "5000"))
@Bean
public KafkaMessageSource<String, String> source(ConsumerFactory<String, String> cf)  {
    ConsumerProperties consumerProperties = new ConsumerProperties("myTopic");
    consumerProperties.setGroupId("myGroupId");
    consumerProperties.setClientId("myClientId");
    return new KafkaMessageSource<>(cf, consumerProperties);
}
XML:

<int-kafka:inbound-channel-adapter
        id="adapter1"
        consumer-factory="consumerFactory"
        consumer-properties="consumerProperties1"
        ack-factory="ackFactory"
        channel="inbound"
        message-converter="converter"
        payload-type="java.lang.String"
        raw-header="true"
        auto-startup="false">
    <int:poller fixed-delay="5000"/>
</int-kafka:inbound-channel-adapter>

<bean id="consumerFactory" class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
    <constructor-arg>
        <map>
            <entry key="max.poll.records" value="1"/>
        </map>
    </constructor-arg>
</bean>

<bean id="consumerProperties1" class="org.springframework.kafka.listener.ConsumerProperties">
    <constructor-arg name="topics" value="topic1"/>
    <property name="groupId" value="group"/>
    <property name="clientId" value="client"/>
</bean>

Refer to the javadocs for available properties.

By default, max.poll.records must be explicitly set in the consumer factory; otherwise it is forced to 1 if the consumer factory is a DefaultKafkaConsumerFactory. You can set the allowMultiFetch property to true to override this behavior.

You must poll the consumer within max.poll.interval.ms to avoid a rebalance. If you set allowMultiFetch to true you must process all the retrieved records, and poll again, within max.poll.interval.ms.

Messages emitted by this adapter contain a header kafka_remainingRecords with a count of records remaining from the previous poll.
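The following minimal sketch (assuming the three-argument constructor that takes the allowMultiFetch flag) shows a source that fetches a full batch per poll; each emitted message then reports what is still buffered in its kafka_remainingRecords header:

@Bean
public KafkaMessageSource<String, String> multiFetchSource(ConsumerFactory<String, String> cf) {
    ConsumerProperties consumerProperties = new ConsumerProperties("myTopic");
    consumerProperties.setGroupId("myGroupId");
    // true = allowMultiFetch: emit all records from a poll, one message at a time
    return new KafkaMessageSource<>(cf, consumerProperties, true);
}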

Starting with version 6.2, the KafkaMessageSource supports an ErrorHandlingDeserializer provided in the consumer properties. A DeserializationException is extracted from the record headers and thrown to the caller. With a SourcePollingChannelAdapter, this exception is wrapped in an ErrorMessage and published to its errorChannel. See the ErrorHandlingDeserializer documentation for more information.
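For example, a consumer factory might delegate value deserialization as follows (a minimal sketch with illustrative properties):

@Bean
public ConsumerFactory<String, String> consumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    // failures surface as a DeserializationException instead of a poison-pill loop
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
    props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, StringDeserializer.class);
    return new DefaultKafkaConsumerFactory<>(props);
}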

Outbound Gateway

The outbound gateway is for request/reply operations. It differs from most Spring Integration gateways in that the sending thread does not block in the gateway, and the reply is processed on the reply listener container thread. If your code invokes the gateway behind a synchronous Messaging Gateway, the user thread blocks there until the reply is received (or a timeout occurs).
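For example, such a synchronous gateway might be declared as follows (a minimal sketch; the channel names match the configuration examples below):

@MessagingGateway(defaultRequestChannel = "kafkaRequests", defaultReplyChannel = "kafkaReplies")
public interface KafkaGateway {

    // the calling thread blocks here until the reply arrives (or times out)
    String sendAndReceive(String request);

}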

The gateway does not accept requests until the reply container has been assigned its topics and partitions. It is suggested that you add a ConsumerRebalanceListener to the template’s reply container properties and wait for the onPartitionsAssigned call before sending messages to the gateway.
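A minimal sketch of that technique (the kReplies topic name is illustrative) might configure the reply container properties as follows:

public ContainerProperties replyContainerProperties(CountDownLatch assigned) {
    ContainerProperties properties = new ContainerProperties("kReplies");
    properties.setConsumerRebalanceListener(new ConsumerRebalanceListener() {

        @Override
        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        }

        @Override
        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            // the reply container is now ready to receive replies
            assigned.countDown();
        }

    });
    return properties;
}

Before sending the first request, await the latch (for example, assigned.await(30, TimeUnit.SECONDS)).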

The KafkaProducerMessageHandler sendTimeoutExpression default is delivery.timeout.ms Kafka producer property + 5000 so that the actual Kafka error after a timeout is propagated to the application, instead of a timeout generated by this framework. This has been changed for consistency because you may get unexpected behavior (Spring may time out the send(), while it is actually, eventually, successful). IMPORTANT: That timeout is 120 seconds by default, so you may wish to reduce it to get more timely failures.

Configuration

The following example shows how to configure a gateway:

Java DSL:

@Bean
public IntegrationFlow outboundGateFlow(
        ReplyingKafkaTemplate<String, String, String> kafkaTemplate) {

    return IntegrationFlow.from("kafkaRequests")
            .handle(Kafka.outboundGateway(kafkaTemplate))
            .channel("kafkaReplies")
            .get();
}
Java:

@Bean
@ServiceActivator(inputChannel = "kafkaRequests", outputChannel = "kafkaReplies")
public KafkaProducerMessageHandler<String, String> outGateway(
        ReplyingKafkaTemplate<String, String, String> kafkaTemplate) {
    return new KafkaProducerMessageHandler<>(kafkaTemplate);
}
XML:

<int-kafka:outbound-gateway
    id="allProps"
    error-message-strategy="ems"
    kafka-template="template"
    message-key-expression="'key'"
    order="23"
    partition-id-expression="2"
    reply-channel="replies"
    reply-timeout="43"
    request-channel="requests"
    requires-reply="false"
    send-success-channel="successes"
    send-failure-channel="failures"
    send-timeout-expression="44"
    sync="true"
    timestamp-expression="T(System).currentTimeMillis()"
    topic-expression="'topic'"/>

Refer to the javadocs for available properties.

Notice that the same class is used as for the outbound channel adapter; the only difference is that the KafkaTemplate passed into the constructor is a ReplyingKafkaTemplate. See the Spring for Apache Kafka documentation for more information.

The outbound topic, partition, key, and so on are determined in the same way as the outbound adapter. The reply topic is determined as follows:

  1. A message header named KafkaHeaders.REPLY_TOPIC (if present, it must have a String or byte[] value) is validated against the template’s reply container’s subscribed topics.

  2. If the template’s replyContainer is subscribed to only one topic, it is used.

You can also specify a KafkaHeaders.REPLY_PARTITION header to determine a specific partition to be used for replies. Again, this is validated against the template’s reply container’s subscriptions.
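For example, the following minimal sketch (the kReplies topic is illustrative and must be among the reply container's subscriptions) sets the reply topic explicitly:

@Bean
public IntegrationFlow gateFlowWithReplyTopic(ReplyingKafkaTemplate<String, String, String> template) {
    return IntegrationFlow.from("kafkaRequests")
            // KafkaHeaders.REPLY_PARTITION could be set the same way
            .enrichHeaders(h -> h.header(KafkaHeaders.REPLY_TOPIC, "kReplies"))
            .handle(Kafka.outboundGateway(template))
            .channel("kafkaReplies")
            .get();
}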

Alternatively, you can also use a configuration similar to the following bean:

@Bean
public IntegrationFlow outboundGateFlow() {
    return IntegrationFlow.from("kafkaRequests")
            .handle(Kafka.outboundGateway(producerFactory(), replyContainer())
                .configureKafkaTemplate(t -> t.replyTimeout(30_000)))
            .channel("kafkaReplies")
            .get();
}

Inbound Gateway

The inbound gateway is for request/reply operations.

Configuration

The following example shows how to configure an inbound gateway:

Java DSL:

@Bean
public IntegrationFlow serverGateway(
        ConcurrentMessageListenerContainer<Integer, String> container,
        KafkaTemplate<Integer, String> replyTemplate) {
    return IntegrationFlow
            .from(Kafka.inboundGateway(container, replyTemplate)
                .replyTimeout(30_000))
            .<String, String>transform(String::toUpperCase)
            .get();
}
Java:

@Bean
public KafkaInboundGateway<Integer, String, String> inboundGateway(
        AbstractMessageListenerContainer<Integer, String> container,
        KafkaTemplate<Integer, String> replyTemplate) {

    KafkaInboundGateway<Integer, String, String> gateway =
        new KafkaInboundGateway<>(container, replyTemplate);
    gateway.setRequestChannel(requests);
    gateway.setReplyChannel(replies);
    gateway.setReplyTimeout(30_000);
    return gateway;
}
XML:

<int-kafka:inbound-gateway
        id="gateway1"
        listener-container="container1"
        kafka-template="template"
        auto-startup="false"
        phase="100"
        request-timeout="5000"
        request-channel="nullChannel"
        reply-channel="errorChannel"
        reply-timeout="43"
        message-converter="messageConverter"
        payload-type="java.lang.String"
        error-message-strategy="ems"
        retry-template="retryTemplate"
        recovery-callback="recoveryCallback"/>

Refer to the javadocs for available properties.

When a RetryTemplate is provided, delivery failures are retried according to its retry policy. If an error-channel is also supplied, a default ErrorMessageSendingRecoverer will be used as the recovery callback after retries are exhausted. You can also use the recovery-callback to specify some other action to take in that case, or set it to null to throw the final exception to the listener container so it is handled there.

When building an ErrorMessage (for use in the error-channel or recovery-callback), you can customize the error message by setting the error-message-strategy property. By default, a RawRecordHeaderErrorMessageStrategy is used, to provide access to the converted message as well as the raw ConsumerRecord.

This form of retry is blocking and could cause a rebalance if the aggregate retry delays across all polled records might exceed the max.poll.interval.ms consumer property. Instead, consider adding a DefaultErrorHandler to the listener container, configured with a KafkaErrorSendingMessageRecoverer.

The earlier Java DSL example shows a simple upper-case converter. Alternatively, you could configure an upper-case converter by using code similar to the following, which creates the listener container and reply template from the supplied factories:

@Bean
public IntegrationFlow serverGateway() {
    return IntegrationFlow
            .from(Kafka.inboundGateway(consumerFactory(), containerProperties(),
                    producerFactory())
                .replyTimeout(30_000))
            .<String, String>transform(String::toUpperCase)
            .get();
}

You can also use the container factory that is used for @KafkaListener annotations to create ConcurrentMessageListenerContainer instances for other purposes. See the Spring for Apache Kafka documentation and Message-driven Channel Adapter for examples.

Channels Backed by Apache Kafka Topics

Spring Integration has MessageChannel implementations backed by an Apache Kafka topic for persistence.

Each channel requires a KafkaTemplate for the sending side and either a listener container factory (for subscribable channels) or a KafkaMessageSource for a pollable channel.

Java DSL Configuration

Java DSL:

@Bean
public IntegrationFlow flowWithSubscribable(KafkaTemplate<Integer, String> template,
        ConcurrentKafkaListenerContainerFactory<Integer, String> containerFactory) {

    return IntegrationFlow.from(...)
            ...
            .channel(Kafka.channel(template, containerFactory, "someTopic1").groupId("group1"))
            ...
            .get();
}

@Bean
public IntegrationFlow flowWithPubSub(KafkaTemplate<Integer, String> template,
        ConcurrentKafkaListenerContainerFactory<Integer, String> containerFactory) {

    return IntegrationFlow.from(...)
            ...
            .publishSubscribeChannel(pubSub(template, containerFactory),
                pubsub -> pubsub
                            .subscribe(subflow -> ...)
                            .subscribe(subflow -> ...))
            .get();
}

@Bean
public BroadcastCapableChannel pubSub(KafkaTemplate<Integer, String> template,
        ConcurrentKafkaListenerContainerFactory<Integer, String> containerFactory) {

    return Kafka.publishSubscribeChannel(template, containerFactory, "someTopic2")
            .groupId("group2")
            .get();
}

@Bean
public IntegrationFlow flowWithPollable(KafkaTemplate<Integer, String> template,
        KafkaMessageSource<Integer, String> source) {

    return IntegrationFlow.from(...)
            ...
            .channel(Kafka.pollableChannel(template, source, "someTopic3").groupId("group3"))
            .handle(...,  e -> e.poller(...))
            ...
            .get();
}
Java:

/**
 * Channel for a single subscriber.
 **/
@Bean
SubscribableKafkaChannel pointToPoint(KafkaTemplate<String, String> template,
    KafkaListenerContainerFactory<?> factory) {

    SubscribableKafkaChannel channel =
        new SubscribableKafkaChannel(template, factory, "topicA");
    channel.setGroupId("group1");
    return channel;
}

/**
 * Channel for multiple subscribers.
 **/
@Bean
SubscribableKafkaChannel pubsub(KafkaTemplate<String, String> template,
    KafkaListenerContainerFactory<?> factory) {

    SubscribableKafkaChannel channel =
        new SubscribableKafkaChannel(template, factory, "topicB", true);
    channel.setGroupId("group2");
    return channel;
}

/**
 * Pollable channel (topic is configured on the source)
 **/
@Bean
PollableKafkaChannel pollable(KafkaTemplate<String, String> template,
    KafkaMessageSource<String, String> source) {

    PollableKafkaChannel channel =
        new PollableKafkaChannel(template, source);
    channel.setGroupId("group3");
    return channel;
}
XML:

<int-kafka:channel kafka-template="template" id="ptp" topic="ptpTopic" group-id="ptpGroup"
    container-factory="containerFactory" />

<int-kafka:pollable-channel kafka-template="template" id="pollable" message-source="source"
    group-id="pollableGroup"/>

<int-kafka:publish-subscribe-channel kafka-template="template" id="pubSub" topic="pubSubTopic"
    group-id="pubSubGroup" container-factory="containerFactory" />

Message Conversion

A StringJsonMessageConverter is provided. See the Spring for Apache Kafka documentation for more information.

When using this converter with a message-driven channel adapter, you can specify the type to which you want the incoming payload to be converted. This is achieved by setting the payload-type attribute (payloadType property) on the adapter. The following example shows how to do so in XML configuration:

<int-kafka:message-driven-channel-adapter
        id="kafkaListener"
        listener-container="container1"
        auto-startup="false"
        phase="100"
        send-timeout="5000"
        channel="nullChannel"
        message-converter="messageConverter"
        payload-type="com.example.Thing"
        error-channel="errorChannel" />

<bean id="messageConverter"
    class="org.springframework.kafka.support.converter.MessagingMessageConverter"/>

The following example shows how to set the payload-type attribute (payloadType property) on the adapter in Java configuration:

@Bean
public KafkaMessageDrivenChannelAdapter<String, String>
            adapter(KafkaMessageListenerContainer<String, String> container) {
    KafkaMessageDrivenChannelAdapter<String, String> kafkaMessageDrivenChannelAdapter =
            new KafkaMessageDrivenChannelAdapter<>(container, ListenerMode.record);
    kafkaMessageDrivenChannelAdapter.setOutputChannel(received());
    kafkaMessageDrivenChannelAdapter.setMessageConverter(converter());
    kafkaMessageDrivenChannelAdapter.setPayloadType(Thing.class);
    return kafkaMessageDrivenChannelAdapter;
}

Null Payloads and Log Compaction 'Tombstone' Records

Spring Messaging Message<?> objects cannot have null payloads. When you use the endpoints for Apache Kafka, null payloads (also known as tombstone records) are represented by a payload of type KafkaNull. See the Spring for Apache Kafka documentation for more information.
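For example, a tombstone for a given key can be published by sending a KafkaNull payload to the outbound adapter's channel (a minimal sketch; the channel and key are illustrative):

public void delete(MessageChannel toKafka, String key) {
    toKafka.send(MessageBuilder.withPayload(KafkaNull.INSTANCE)
            .setHeader(KafkaHeaders.KEY, key)
            .build());
}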

The POJO methods for Spring Integration endpoints can use a true null value instead of KafkaNull. To do so, mark the parameter with @Payload(required = false). The following example shows how to do so:

@ServiceActivator(inputChannel = "fromSomeKafkaInboundEndpoint")
public void in(@Header(KafkaHeaders.RECEIVED_KEY) String key,
               @Payload(required = false) Customer customer) {
    // customer is null if a tombstone record
    ...
}

Calling a Spring Integration flow from a KStream

You can use a MessagingTransformer to invoke an integration flow from a KStream:

@Bean
public KStream<byte[], byte[]> kStream(StreamsBuilder kStreamBuilder,
        MessagingTransformer<byte[], byte[], byte[]> transformer) {
    KStream<byte[], byte[]> stream = kStreamBuilder.stream(STREAMING_TOPIC1);
    stream.mapValues((ValueMapper<byte[], byte[]>) String::toUpperCase)
            ...
            .transform(() -> transformer)
            .to(streamingTopic2);

    stream.print(Printed.toSysOut());

    return stream;
}

@Bean
@DependsOn("flow")
public MessagingTransformer<byte[], byte[], String> transformer(
        MessagingFunction function) {

    MessagingMessageConverter converter = new MessagingMessageConverter();
    converter.setHeaderMapper(new SimpleKafkaHeaderMapper("*"));
    return new MessagingTransformer<>(function, converter);
}

@Bean
public IntegrationFlow flow() {
    return IntegrationFlow.from(MessagingFunction.class)
        ...
        .get();
}

When an integration flow starts with an interface, the proxy that is created has the name of the flow bean, appended with ".gateway", so this bean name can be used as a @Qualifier if needed.

Performance Considerations for read/process/write Scenarios

Many applications consume from a topic, perform some processing, and write to another topic. In most cases, if the write fails, the application would want to throw an exception so the incoming request can be retried and/or sent to a dead-letter topic. This functionality is supported by the underlying message listener container, together with a suitably configured error handler. However, in order to support this, we need to block the listener thread until the success (or failure) of the write operation so that any exceptions can be thrown to the container. When consuming single records, this is achieved by setting the sync property on the outbound adapter. However, when consuming batches, using sync causes a significant performance degradation because the application would wait for the result of each send before sending the next message.

You can also perform multiple sends and then wait for the results of those sends afterwards. This is achieved by adding a futuresChannel to the message handler. To enable the feature, add KafkaIntegrationHeaders.FUTURE_TOKEN to the outbound messages; this can then be used to correlate a Future to a particular sent message. Here is an example of how you might use this feature:

@SpringBootApplication
public class FuturesChannelApplication {

    public static void main(String[] args) {
        SpringApplication.run(FuturesChannelApplication.class, args);
    }

    @Bean
    IntegrationFlow inbound(ConsumerFactory<String, String> consumerFactory, Handler handler) {
        return IntegrationFlow.from(Kafka.messageDrivenChannelAdapter(consumerFactory,
                    ListenerMode.batch, "inTopic"))
                .handle(handler)
                .get();
    }

    @Bean
    IntegrationFlow outbound(KafkaTemplate<String, String> kafkaTemplate) {
        return IntegrationFlow.from(Gate.class)
                .enrichHeaders(h -> h
                        .header(KafkaHeaders.TOPIC, "outTopic")
                        .headerExpression(KafkaIntegrationHeaders.FUTURE_TOKEN, "headers[id]"))
                .handle(Kafka.outboundChannelAdapter(kafkaTemplate)
                        .futuresChannel("futures"))
                .get();
    }

    @Bean
    PollableChannel futures() {
        return new QueueChannel();
    }

}

@Component
@DependsOn("outbound")
class Handler {

    @Autowired
    Gate gate;

    @Autowired
    PollableChannel futures;

    public void handle(List<String> input) throws Exception {
        System.out.println(input);
        input.forEach(str -> this.gate.send(str.toUpperCase()));
        for (int i = 0; i < input.size(); i++) {
            Message<?> future = this.futures.receive(10000);
            ((Future<?>) future.getPayload()).get(10, TimeUnit.SECONDS);
        }
    }

}

interface Gate {

    void send(String out);

}