This part of the reference shows how to use the spring-integration-kafka
module of Spring Integration.
This documentation pertains to versions 2.0.0 and above; for documentation for earlier releases, see the 1.3.x README.
Spring Integration Kafka is now based on the Spring for Apache Kafka project. It provides the following components:
- Outbound Channel Adapter
- Message-Driven Channel Adapter
- Outbound Gateway
- Inbound Gateway
These are discussed in the following sections.
The Outbound channel adapter is used to publish messages from a Spring Integration channel to Kafka topics.
The channel is defined in the application context and then wired into the application that sends messages to Kafka.
Sender applications can publish to Kafka via Spring Integration messages, which are internally converted to Kafka messages by the outbound channel adapter, as follows: the payload of the Spring Integration message will be used to populate the payload of the Kafka message, and (by default) the kafka_messageKey header of the Spring Integration message will be used to populate the key of the Kafka message.
The target topic and partition for publishing the message can be customized through the kafka_topic and kafka_partitionId headers, respectively.
In addition, the <int-kafka:outbound-channel-adapter> provides the ability to extract the key, target topic, and target partition by applying SpEL expressions on the outbound message.
To that end, it supports the mutually exclusive pairs of attributes topic/topic-expression, message-key/message-key-expression, and partition-id/partition-id-expression, to allow the specification of topic, message-key, and partition-id respectively as static values on the adapter, or to dynamically evaluate their values at runtime against the request message.
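The default mapping described above can be pictured with a small plain-Java model. This is only an illustrative sketch and not the adapter's code: the class OutboundMappingSketch and its KafkaRecord type are hypothetical, and only the header names kafka_topic, kafka_partitionId, and kafka_messageKey come from the text.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of how an outbound message maps to a Kafka record.
// It is NOT the adapter's implementation; only the header names come from the text.
class OutboundMappingSketch {

    // Minimal stand-in for a Kafka producer record (hypothetical type).
    static final class KafkaRecord {
        final String topic;
        final Integer partition;
        final Object key;
        final Object value;

        KafkaRecord(String topic, Integer partition, Object key, Object value) {
            this.topic = topic;
            this.partition = partition;
            this.key = key;
            this.value = value;
        }
    }

    // The payload becomes the record value; the headers supply topic, partition and key.
    static KafkaRecord toRecord(Object payload, Map<String, Object> headers) {
        String topic = (String) headers.get("kafka_topic");
        Integer partition = (Integer) headers.get("kafka_partitionId");
        Object key = headers.get("kafka_messageKey");
        return new KafkaRecord(topic, partition, key, payload);
    }

    public static void main(String[] args) {
        Map<String, Object> headers = new HashMap<>();
        headers.put("kafka_topic", "orders");
        headers.put("kafka_partitionId", 1);
        headers.put("kafka_messageKey", "order-42");
        KafkaRecord record = toRecord("hello", headers);
        System.out.println(record.topic + "/" + record.partition + " " + record.key + "=" + record.value);
    }
}
```

In a real application the headers would be set on the Spring Integration message before it reaches the adapter's input channel; the model only shows which header feeds which part of the record.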
NOTE: If the adapter is configured with a topic or message key (either with a constant or expression), those are used and the corresponding header is ignored. If you wish the header to override the configuration, you need to configure it in an expression, such as: topic-expression="headers['topic'] != null ? headers['topic'] : 'myTopic'".
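The precedence rule in the note can be restated in plain Java. The sketch below is a hypothetical model (the class and method names are invented for illustration); it does not evaluate SpEL and only mirrors the logic of the two configurations described in the note.

```java
import java.util.Collections;
import java.util.Map;

// Hypothetical model of the precedence rule; not the adapter's implementation.
class TopicResolutionSketch {

    // A topic configured on the adapter is used as-is and the header is ignored.
    static String resolveWithStaticTopic(String configuredTopic, Map<String, Object> headers) {
        return configuredTopic;
    }

    // Plain-Java equivalent of: headers['topic'] != null ? headers['topic'] : 'myTopic'
    static String resolveWithFallbackExpression(Map<String, Object> headers) {
        Object fromHeader = headers.get("topic");
        return fromHeader != null ? fromHeader.toString() : "myTopic";
    }

    public static void main(String[] args) {
        Map<String, Object> withHeader = Collections.<String, Object>singletonMap("topic", "fromHeader");
        Map<String, Object> withoutHeader = Collections.<String, Object>emptyMap();
        System.out.println(resolveWithStaticTopic("foo", withHeader));
        System.out.println(resolveWithFallbackExpression(withHeader));
        System.out.println(resolveWithFallbackExpression(withoutHeader));
    }
}
```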
The adapter requires a KafkaTemplate.
Here is an example of how the Kafka outbound channel adapter is configured with XML:

```xml
<int-kafka:outbound-channel-adapter id="kafkaOutboundChannelAdapter"
                                    kafka-template="template"
                                    auto-startup="false"
                                    channel="inputToKafka"
                                    topic="foo"
                                    sync="false"
                                    message-key-expression="'bar'"
                                    send-failure-channel="failures"
                                    send-success-channel="successes"
                                    error-message-strategy="ems"
                                    partition-id-expression="2">
</int-kafka:outbound-channel-adapter>

<bean id="template" class="org.springframework.kafka.core.KafkaTemplate">
    <constructor-arg>
        <bean class="org.springframework.kafka.core.DefaultKafkaProducerFactory">
            <constructor-arg>
                <map>
                    <entry key="bootstrap.servers" value="localhost:9092" />
                    ... <!-- more producer properties -->
                </map>
            </constructor-arg>
        </bean>
    </constructor-arg>
</bean>
```
As you can see, the adapter requires a KafkaTemplate which, in turn, requires a suitably configured ProducerFactory (a DefaultKafkaProducerFactory in the example above).
When using Java Configuration:
```java
@Bean
@ServiceActivator(inputChannel = "toKafka")
public MessageHandler handler() throws Exception {
    KafkaProducerMessageHandler<String, String> handler =
            new KafkaProducerMessageHandler<>(kafkaTemplate());
    handler.setTopicExpression(new LiteralExpression("someTopic"));
    handler.setMessageKeyExpression(new LiteralExpression("someKey"));
    handler.setFailureChannel(failures());
    return handler;
}

@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
    return new KafkaTemplate<>(producerFactory());
}

@Bean
public ProducerFactory<String, String> producerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.brokerAddress);
    // set more properties
    return new DefaultKafkaProducerFactory<>(props);
}
```
When using Spring Integration Java DSL:
```java
@Bean
public ProducerFactory<Integer, String> producerFactory() {
    return new DefaultKafkaProducerFactory<>(KafkaTestUtils.producerProps(embeddedKafka));
}

@Bean
public IntegrationFlow sendToKafkaFlow() {
    return f -> f
            .<String>split(p -> Stream.generate(() -> p).limit(101).iterator(), null)
            .publishSubscribeChannel(c -> c
                    .subscribe(sf -> sf.handle(
                            kafkaMessageHandler(producerFactory(), TEST_TOPIC1)
                                    .timestampExpression("T(Long).valueOf('1487694048633')"),
                            e -> e.id("kafkaProducer1")))
                    .subscribe(sf -> sf.handle(
                            kafkaMessageHandler(producerFactory(), TEST_TOPIC2)
                                    .timestamp(m -> 1487694048644L),
                            e -> e.id("kafkaProducer2")))
            );
}

@Bean
public DefaultKafkaHeaderMapper mapper() {
    return new DefaultKafkaHeaderMapper();
}

private KafkaProducerMessageHandlerSpec<Integer, String, ?> kafkaMessageHandler(
        ProducerFactory<Integer, String> producerFactory, String topic) {
    return Kafka
            .outboundChannelAdapter(producerFactory)
            .messageKey(m -> m
                    .getHeaders()
                    .get(IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER))
            .headerMapper(mapper())
            .partitionId(m -> 10)
            .topicExpression("headers[kafka_topic] ?: '" + topic + "'")
            .configureKafkaTemplate(t -> t.id("kafkaTemplate:" + topic));
}
```
If a send-failure-channel is provided and a send failure is received (sync or async), an ErrorMessage is sent to the channel.
The payload is a KafkaSendFailureException with properties failedMessage, record (the ProducerRecord) and cause.
The DefaultErrorMessageStrategy can be overridden via the error-message-strategy property.
If a send-success-channel is provided, a message with a payload of type org.apache.kafka.clients.producer.RecordMetadata will be sent after a successful send.
When using Java configuration, use setOutputChannel for this purpose.
The KafkaMessageDrivenChannelAdapter (<int-kafka:message-driven-channel-adapter>) uses a spring-kafka KafkaMessageListenerContainer or ConcurrentMessageListenerContainer.
Starting with spring-integration-kafka version 2.1, the mode attribute is available (record or batch, default record).
For record mode, each message payload is converted from a single ConsumerRecord; for batch mode, the payload is a list of objects which are converted from all the ConsumerRecord instances returned by the consumer poll.
As with the batched @KafkaListener, the KafkaHeaders.RECEIVED_MESSAGE_KEY, KafkaHeaders.RECEIVED_PARTITION_ID, KafkaHeaders.RECEIVED_TOPIC and KafkaHeaders.OFFSET headers are also lists, with positions corresponding to the position in the payload.
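The positional relationship between the batch payload and its headers can be illustrated with a plain-Java model. This is a hypothetical sketch that does not use the real adapter or ConsumerRecord classes: Rec and BatchMessage are invented stand-ins, and the map keys are simply named after the KafkaHeaders constants mentioned above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical plain-Java model of batch mode; it does not use the real adapter classes.
class BatchModeSketch {

    // Minimal stand-in for a ConsumerRecord (hypothetical type).
    static final class Rec {
        final String topic;
        final int partition;
        final long offset;
        final String key;
        final String value;

        Rec(String topic, int partition, long offset, String key, String value) {
            this.topic = topic;
            this.partition = partition;
            this.offset = offset;
            this.key = key;
            this.value = value;
        }
    }

    // One message for the whole poll: a list payload plus header lists of equal length.
    static final class BatchMessage {
        final List<String> payload = new ArrayList<>();
        final Map<String, List<Object>> headers = new HashMap<>();
    }

    static BatchMessage toBatchMessage(List<Rec> polled) {
        BatchMessage message = new BatchMessage();
        // Map keys are stand-ins named after the KafkaHeaders constants in the text.
        for (String name : Arrays.asList(
                "RECEIVED_MESSAGE_KEY", "RECEIVED_PARTITION_ID", "RECEIVED_TOPIC", "OFFSET")) {
            message.headers.put(name, new ArrayList<>());
        }
        for (Rec rec : polled) {
            message.payload.add(rec.value);
            message.headers.get("RECEIVED_MESSAGE_KEY").add(rec.key);
            message.headers.get("RECEIVED_PARTITION_ID").add(rec.partition);
            message.headers.get("RECEIVED_TOPIC").add(rec.topic);
            message.headers.get("OFFSET").add(rec.offset);
        }
        return message;
    }

    public static void main(String[] args) {
        BatchMessage message = toBatchMessage(Arrays.asList(
                new Rec("foo", 0, 10L, "k1", "first"),
                new Rec("foo", 1, 7L, "k2", "second")));
        // Index i of every header list describes payload element i.
        for (int i = 0; i < message.payload.size(); i++) {
            System.out.println(message.payload.get(i)
                    + " <- offset " + message.headers.get("OFFSET").get(i));
        }
    }
}
```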
An example of the XML configuration is shown here:

```xml
<int-kafka:message-driven-channel-adapter
        id="kafkaListener"
        listener-container="container1"
        auto-startup="false"
        phase="100"
        send-timeout="5000"
        mode="record"
        retry-template="template"
        recovery-callback="callback"
        error-message-strategy="ems"
        channel="someChannel"
        error-channel="errorChannel" />

<bean id="container1" class="org.springframework.kafka.listener.KafkaMessageListenerContainer">
    <constructor-arg>
        <bean class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
            <constructor-arg>
                <map>
                    <entry key="bootstrap.servers" value="localhost:9092" />
                    ...
                </map>
            </constructor-arg>
        </bean>
    </constructor-arg>
    <constructor-arg>
        <bean class="org.springframework.kafka.listener.config.ContainerProperties">
            <constructor-arg name="topics" value="foo" />
        </bean>
    </constructor-arg>
</bean>
```
When using Java Configuration:
```java
@Bean
public KafkaMessageDrivenChannelAdapter<String, String>
        adapter(KafkaMessageListenerContainer<String, String> container) {
    KafkaMessageDrivenChannelAdapter<String, String> kafkaMessageDrivenChannelAdapter =
            new KafkaMessageDrivenChannelAdapter<>(container, ListenerMode.record);
    kafkaMessageDrivenChannelAdapter.setOutputChannel(received());
    return kafkaMessageDrivenChannelAdapter;
}

@Bean
public KafkaMessageListenerContainer<String, String> container() throws Exception {
    ContainerProperties properties = new ContainerProperties(this.topic);
    // set more properties
    return new KafkaMessageListenerContainer<>(consumerFactory(), properties);
}

@Bean
public ConsumerFactory<String, String> consumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.brokerAddress);
    // set more properties
    return new DefaultKafkaConsumerFactory<>(props);
}
```
When using Spring Integration Java DSL:
```java
@Bean
public IntegrationFlow topic1ListenerFromKafkaFlow() {
    return IntegrationFlows
            .from(Kafka.messageDrivenChannelAdapter(consumerFactory(),
                    KafkaMessageDrivenChannelAdapter.ListenerMode.record, TEST_TOPIC1)
                    .configureListenerContainer(c ->
                            c.ackMode(AbstractMessageListenerContainer.AckMode.MANUAL)
                                    .id("topic1ListenerContainer"))
                    .recoveryCallback(new ErrorMessageSendingRecoverer(errorChannel(),
                            new RawRecordHeaderErrorMessageStrategy()))
                    .retryTemplate(new RetryTemplate())
                    .filterInRetry(true))
            .filter(Message.class, m ->
                            m.getHeaders().get(KafkaHeaders.RECEIVED_MESSAGE_KEY, Integer.class) < 101,
                    f -> f.throwExceptionOnRejection(true))
            .<String, String>transform(String::toUpperCase)
            .channel(c -> c.queue("listeningFromKafkaResults1"))
            .get();
}
```
Received messages will have certain headers populated.
Refer to the KafkaHeaders class for more information.
When a retry-template is provided, delivery failures will be retried according to its retry policy.
An error-channel is not allowed in this case.
The recovery-callback can be used to handle the error when retries are exhausted.
In most cases, this will be an ErrorMessageSendingRecoverer which will send the ErrorMessage to a channel.
When building the ErrorMessage (for use in the error-channel or recovery-callback), you can customize the error message using the error-message-strategy property.
By default, a RawRecordHeaderErrorMessageStrategy is used, providing access to the converted message as well as the raw ConsumerRecord.
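The interplay between the retry policy and the recovery callback can be sketched as a plain loop. The code below is a deliberately simplified, hypothetical model (the deliver method and its fixed attempt count are assumptions for illustration); it is not how RetryTemplate is implemented and ignores back-off and policy configuration.

```java
import java.util.function.Function;

// Hypothetical, simplified model of "retry, then recover"; not Spring Retry's implementation.
class RetryThenRecoverSketch {

    // Calls the listener up to maxAttempts times (assumed >= 1); when every attempt fails,
    // the recovery callback receives the last failure and produces the final result.
    static <T, R> R deliver(T record, int maxAttempts,
            Function<T, R> listener, Function<RuntimeException, R> recoveryCallback) {
        RuntimeException lastFailure = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return listener.apply(record);
            }
            catch (RuntimeException e) {
                lastFailure = e; // remember the failure and try again
            }
        }
        return recoveryCallback.apply(lastFailure);
    }

    public static void main(String[] args) {
        int[] attempts = {0};
        String result = RetryThenRecoverSketch.<String, String>deliver("payload", 3,
                p -> {
                    attempts[0]++;
                    throw new IllegalStateException("delivery failed");
                },
                failure -> "recovered after " + attempts[0] + " attempts: " + failure.getMessage());
        System.out.println(result);
    }
}
```

In the adapter, the recovery step would typically forward an ErrorMessage to a channel rather than return a value; the return value here only keeps the model self-contained.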
Starting with Spring for Apache Kafka version 2.2 (Spring Integration Kafka 3.1), the container factory used for @KafkaListener annotations can also be used to create ConcurrentMessageListenerContainer instances for other purposes.
See the section called “Container factory” for an example.
With the Java DSL, the container does not have to be configured as a @Bean because the DSL will register the container as a bean.
```java
@Bean
public IntegrationFlow topic2ListenerFromKafkaFlow() {
    return IntegrationFlows
            .from(Kafka.messageDrivenChannelAdapter(
                    kafkaListenerContainerFactory().createContainer(TEST_TOPIC2),
                    KafkaMessageDrivenChannelAdapter.ListenerMode.record)
                    .id("topic2Adapter"))
            ...
            .get();
}
```
Notice that, in this case, the adapter is given an id ("topic2Adapter"); the container will be registered in the application context with the name topic2Adapter.container.
If the adapter does not have an id property, the container’s bean name will be the container’s fully qualified class name + #n where n is incremented for each container.
The outbound gateway is for request/reply operations. It differs from most Spring Integration gateways in that the sending thread does not block in the gateway; the reply is processed on the reply listener container thread. Of course, if user code invokes the gateway behind a synchronous Messaging Gateway, the user thread will block there until the reply is received (or a timeout occurs).
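The threading behaviour described above can be pictured with CompletableFuture. This is a hypothetical plain-Java analogy, not the gateway's implementation: the single-thread executor stands in for the reply listener container thread, and the upper-casing "reply" is invented for the example.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical analogy for the gateway's threading model; not the actual implementation.
class NonBlockingGatewaySketch {

    // Stands in for the reply listener container thread (daemon, so the JVM can exit).
    static final ExecutorService REPLY_THREAD = Executors.newSingleThreadExecutor(runnable -> {
        Thread thread = new Thread(runnable, "reply-container");
        thread.setDaemon(true);
        return thread;
    });

    // Returns immediately; the reply is completed later on the reply thread.
    static CompletableFuture<String> sendAndReceive(String request) {
        CompletableFuture<String> reply = new CompletableFuture<>();
        REPLY_THREAD.submit(() -> reply.complete(request.toUpperCase()));
        return reply;
    }

    // A synchronous facade: the calling thread blocks here until the reply or a timeout.
    static String blockingFacade(String request, long timeoutMillis) {
        try {
            return sendAndReceive(request).get(timeoutMillis, TimeUnit.MILLISECONDS);
        }
        catch (InterruptedException | ExecutionException | TimeoutException e) {
            throw new IllegalStateException("No reply received", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(blockingFacade("ping", 5_000));
    }
}
```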
Important: The gateway will not accept requests until the reply container has been assigned its topics and partitions.
It is suggested that you add a ...
Here is an example of configuring a gateway, with Java Configuration:
```java
@Bean
@ServiceActivator(inputChannel = "kafkaRequests", outputChannel = "kafkaReplies")
public KafkaProducerMessageHandler<String, String> outGateway(
        ReplyingKafkaTemplate<String, String, String> kafkaTemplate) {
    return new KafkaProducerMessageHandler<>(kafkaTemplate);
}
```
Notice that the same class as the outbound channel adapter is used; the only difference is that the Kafka template passed into the constructor is a ReplyingKafkaTemplate. See the section called “ReplyingKafkaTemplate” for more information.
The outbound topic, partition, key, and so on are determined in the same way as for the outbound adapter.
The reply topic is determined as follows:
1. A KafkaHeaders.REPLY_TOPIC message header, if present (must have a String or byte[] value), validated against the template’s reply container subscribed topics.
2. If the template’s replyContainer is subscribed to just one topic, it will be used.
You can also specify a KafkaHeaders.REPLY_PARTITION header to determine a specific partition to be used for replies.
Again, this is validated against the template’s reply container subscriptions.
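The reply topic rules above can be restated as a small decision function. This is a hypothetical plain-Java sketch, not the gateway's code: the method name, the exception types, the UTF-8 decoding of byte[] values, and the failure when no topic can be determined are all assumptions made for illustration.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Hypothetical restatement of the reply topic rules; not the gateway's implementation.
class ReplyTopicSketch {

    static String determineReplyTopic(Object replyTopicHeader, Set<String> subscribedTopics) {
        if (replyTopicHeader != null) {
            String topic;
            if (replyTopicHeader instanceof byte[]) {
                // Assumption: byte[] header values are decoded as UTF-8.
                topic = new String((byte[]) replyTopicHeader, StandardCharsets.UTF_8);
            }
            else if (replyTopicHeader instanceof String) {
                topic = (String) replyTopicHeader;
            }
            else {
                throw new IllegalArgumentException("Reply topic header must be a String or byte[]");
            }
            // The header value is validated against the reply container's subscriptions.
            if (!subscribedTopics.contains(topic)) {
                throw new IllegalStateException("Reply container is not subscribed to " + topic);
            }
            return topic;
        }
        // No header: fall back to the only subscribed topic, if there is exactly one.
        if (subscribedTopics.size() == 1) {
            return subscribedTopics.iterator().next();
        }
        // Assumption: the text does not say what happens here, so the sketch fails.
        throw new IllegalStateException("No reply topic could be determined");
    }

    public static void main(String[] args) {
        Set<String> topics = new HashSet<>(Arrays.asList("replies"));
        System.out.println(determineReplyTopic(null, topics));
        System.out.println(determineReplyTopic("replies".getBytes(StandardCharsets.UTF_8), topics));
    }
}
```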
Configuring with the Java DSL:
```java
@Bean
public IntegrationFlow outboundGateFlow(
        ReplyingKafkaTemplate<String, String, String> kafkaTemplate) {
    return IntegrationFlows.from("kafkaRequests")
            .handle(Kafka.outboundGateway(kafkaTemplate))
            .channel("kafkaReplies")
            .get();
}
```
Or:
```java
@Bean
public IntegrationFlow outboundGateFlow() {
    return IntegrationFlows.from("kafkaRequests")
            .handle(Kafka.outboundGateway(producerFactory(), replyContainer())
                    .configureKafkaTemplate(t -> t.replyTimeout(30_000)))
            .channel("kafkaReplies")
            .get();
}
```
XML configuration is not currently available for this component.
The inbound gateway is for request/reply operations.
Configuring an inbound gateway with Java Configuration:
```java
@Bean
public KafkaInboundGateway<Integer, String, String> inboundGateway(
        AbstractMessageListenerContainer<Integer, String> container,
        KafkaTemplate<Integer, String> replyTemplate) {

    KafkaInboundGateway<Integer, String, String> gateway =
            new KafkaInboundGateway<>(container, replyTemplate);
    gateway.setRequestChannel(requests);
    gateway.setReplyChannel(replies);
    gateway.setReplyTimeout(30_000);
    return gateway;
}
```
Configuring a simple upper case converter with the Java DSL:
```java
@Bean
public IntegrationFlow serverGateway(
        ConcurrentMessageListenerContainer<Integer, String> container,
        KafkaTemplate<Integer, String> replyTemplate) {
    return IntegrationFlows
            // pass the replyTemplate parameter declared above
            .from(Kafka.inboundGateway(container, replyTemplate)
                    .replyTimeout(30_000))
            .<String, String>transform(String::toUpperCase)
            .get();
}
```
Or:
```java
@Bean
public IntegrationFlow serverGateway() {
    return IntegrationFlows
            .from(Kafka.inboundGateway(consumerFactory(), containerProperties(),
                    producerFactory())
                    .replyTimeout(30_000))
            .<String, String>transform(String::toUpperCase)
            .get();
}
```
XML configuration is not currently available for this component.
Starting with Spring for Apache Kafka version 2.2 (Spring Integration Kafka 3.1), the container factory used for @KafkaListener annotations can also be used to create ConcurrentMessageListenerContainer instances for other purposes.
See the section called “Container factory” and Section 5.1.3, “Message Driven Channel Adapter” for examples.
A StringJsonMessageConverter is provided; see Section 4.1.5, “Serialization/Deserialization and Message Conversion” for more information.
When using this converter with a message-driven channel adapter, you can specify the type to which you want the incoming payload to be converted.
This is achieved by setting the payload-type attribute (payloadType property) on the adapter.

```xml
<int-kafka:message-driven-channel-adapter
        id="kafkaListener"
        listener-container="container1"
        auto-startup="false"
        phase="100"
        send-timeout="5000"
        channel="nullChannel"
        message-converter="messageConverter"
        payload-type="com.example.Foo"
        error-channel="errorChannel" />

<bean id="messageConverter"
      class="org.springframework.kafka.support.converter.MessagingMessageConverter"/>
```

```java
@Bean
public KafkaMessageDrivenChannelAdapter<String, String>
        adapter(KafkaMessageListenerContainer<String, String> container) {
    KafkaMessageDrivenChannelAdapter<String, String> kafkaMessageDrivenChannelAdapter =
            new KafkaMessageDrivenChannelAdapter<>(container, ListenerMode.record);
    kafkaMessageDrivenChannelAdapter.setOutputChannel(received());
    kafkaMessageDrivenChannelAdapter.setMessageConverter(converter());
    kafkaMessageDrivenChannelAdapter.setPayloadType(Foo.class);
    return kafkaMessageDrivenChannelAdapter;
}
```
See the Spring for Apache Kafka Project Page for a matrix of compatible spring-kafka and kafka-clients versions.
The 2.1.x branch introduced the following changes:
- Update to spring-kafka 1.1.x, including support of batch payloads
- Support for sync outbound requests via XML configuration
- Support for payload-type for inbound channel adapters
The 2.3.x branch introduced the following changes:
- Update to spring-kafka 1.3.x, including support for transactions and header mapping provided by kafka-clients 0.11.0.0