Seeking to a Specific Offset
In order to seek, your listener must implement ConsumerSeekAware, which has the following methods:
void registerSeekCallback(ConsumerSeekCallback callback);
void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback);
void onPartitionsRevoked(Collection<TopicPartition> partitions);
void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback);
The registerSeekCallback method is called when the container is started and whenever partitions are assigned.
You should use this callback when seeking at some arbitrary time after initialization.
You should save a reference to the callback.
If you use the same listener in multiple containers (or in a ConcurrentMessageListenerContainer), you should store the callback in a ThreadLocal or some other structure keyed by the listener Thread.
When using group management, onPartitionsAssigned is called when partitions are assigned.
You can use this method, for example, for setting initial offsets for the partitions, by calling the callback.
You can also use this method to associate this thread’s callback with the assigned partitions (see the example below).
You must use the callback argument, not the one passed into registerSeekCallback.
Starting with version 2.5.5, this method is called even when using manual partition assignment.
onPartitionsRevoked is called when the container is stopped or Kafka revokes assignments.
You should discard this thread’s callback and remove any associations to the revoked partitions.
The callback has the following methods:
void seek(String topic, int partition, long offset);
void seek(String topic, int partition, Function<Long, Long> offsetComputeFunction);
void seekToBeginning(String topic, int partition);
void seekToBeginning(Collection<TopicPartition> partitions);
void seekToEnd(String topic, int partition);
void seekToEnd(Collection<TopicPartition> partitions);
void seekRelative(String topic, int partition, long offset, boolean toCurrent);
void seekToTimestamp(String topic, int partition, long timestamp);
void seekToTimestamp(Collection<TopicPartition> topicPartitions, long timestamp);
The two variants of the seek method provide a way to seek to an arbitrary offset.
The variant that takes a Function as an argument to compute the offset was added in version 3.2 of the framework.
This function provides access to the current offset (the current position returned by the consumer, which is the next offset to be fetched).
The user can decide what offset to seek to based on the current offset in the consumer as part of the function definition.
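As an illustration of what such a function might look like, here is a minimal plain-Java sketch. The class OffsetFunctions and its two factory methods are hypothetical helpers introduced for this example; they are not part of the framework.

```java
import java.util.function.Function;

// Hypothetical helper class (not part of the framework): factories for
// functions that map the consumer's current position to a target offset.
final class OffsetFunctions {

    private OffsetFunctions() {
    }

    // Rewind by a fixed number of records, never going below offset 0.
    static Function<Long, Long> rewindBy(long records) {
        return current -> Math.max(0L, current - records);
    }

    // Round the current position down to a multiple of blockSize
    // (an invented use case, purely for illustration).
    static Function<Long, Long> alignDown(long blockSize) {
        return current -> current - (current % blockSize);
    }
}
```

With a real callback, such a function would be passed as the third argument of the Function variant listed above, for example callback.seek(topic, partition, OffsetFunctions.rewindBy(10)).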
seekRelative was added in version 2.3 to perform relative seeks.
- offset negative and toCurrent false - seek relative to the end of the partition.
- offset positive and toCurrent false - seek relative to the beginning of the partition.
- offset negative and toCurrent true - seek relative to the current position (rewind).
- offset positive and toCurrent true - seek relative to the current position (fast forward).
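The four combinations can be summarized as a small calculation. The following plain-Java sketch only models the documented semantics; it is not the framework's implementation. The class SeekRelativeSketch, the beginning, end, and current arguments, the handling of a zero offset, and the clamp to zero are all assumptions made for this example.

```java
// Hypothetical model of the seekRelative rules (not framework code).
final class SeekRelativeSketch {

    private SeekRelativeSketch() {
    }

    /**
     * Returns the offset a relative seek would land on, given hypothetical
     * beginning, end, and current positions of a partition.
     */
    static long target(long offset, boolean toCurrent, long beginning, long end, long current) {
        long base;
        if (toCurrent) {
            base = current;   // negative offset rewinds, positive fast forwards
        }
        else if (offset < 0) {
            base = end;       // negative and not relative to current: count back from the end
        }
        else {
            base = beginning; // positive (or zero, by assumption): count forward from the beginning
        }
        // Assumption of this sketch: never produce a negative offset.
        return Math.max(0L, base + offset);
    }
}
```

For a partition with beginning 0, end 100, and current position 40, target(-5, false, 0, 100, 40) yields 95, target(5, false, 0, 100, 40) yields 5, target(-5, true, 0, 100, 40) yields 35, and target(5, true, 0, 100, 40) yields 45.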
The seekToTimestamp methods were also added in version 2.3.
When seeking to the same timestamp for multiple partitions in the onIdleContainer or onPartitionsAssigned methods, the second method is preferred because it is more efficient to find the offsets for the timestamps in a single call to the consumer’s offsetsForTimes method.
When called from other locations, the container will gather all timestamp seek requests and make one call to offsetsForTimes.
You can also perform seek operations from onIdleContainer() when an idle container is detected.
See Detecting Idle and Non-Responsive Consumers for how to enable idle container detection.
The seekToBeginning method that accepts a collection is useful, for example, when processing a compacted topic and you wish to seek to the beginning every time the application is started:
public class MyListener implements ConsumerSeekAware {

    ...

    @Override
    public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
        // Replay every assigned partition from its first available offset.
        callback.seekToBeginning(assignments.keySet());
    }

}
To arbitrarily seek at runtime, use the callback reference obtained from registerSeekCallback for the appropriate thread.
Here is a trivial Spring Boot application that demonstrates how to use the callback.
It sends 10 records to the topic; hitting <Enter> in the console causes all partitions to seek to the beginning.
@SpringBootApplication
public class SeekExampleApplication {

    public static void main(String[] args) {
        SpringApplication.run(SeekExampleApplication.class, args);
    }

    @Bean
    public ApplicationRunner runner(Listener listener, KafkaTemplate<String, String> template) {
        return args -> {
            // Send 10 records, spread across the three partitions.
            IntStream.range(0, 10).forEach(i -> template.send(
                    new ProducerRecord<>("seekExample", i % 3, "foo", "bar")));
            // Each <Enter> on the console rewinds all partitions to the beginning.
            while (true) {
                System.in.read();
                listener.seekToStart();
            }
        };
    }

    @Bean
    public NewTopic topic() {
        return new NewTopic("seekExample", 3, (short) 1);
    }

}
@Component
class Listener implements ConsumerSeekAware {

    private static final Logger logger = LoggerFactory.getLogger(Listener.class);

    // One callback per consumer thread (concurrency = 3 means three threads).
    private final ThreadLocal<ConsumerSeekCallback> callbackForThread = new ThreadLocal<>();

    // Which callback to use for each assigned partition.
    private final Map<TopicPartition, ConsumerSeekCallback> callbacks = new ConcurrentHashMap<>();

    @Override
    public void registerSeekCallback(ConsumerSeekCallback callback) {
        this.callbackForThread.set(callback);
    }

    @Override
    public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
        // Associate this thread's callback with the newly assigned partitions.
        assignments.keySet().forEach(tp -> this.callbacks.put(tp, this.callbackForThread.get()));
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // Drop the associations and this thread's callback.
        partitions.forEach(tp -> this.callbacks.remove(tp));
        this.callbackForThread.remove();
    }

    @Override
    public void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
    }

    @KafkaListener(id = "seekExample", topics = "seekExample", concurrency = "3")
    public void listen(ConsumerRecord<String, String> in) {
        logger.info(in.toString());
    }

    public void seekToStart() {
        this.callbacks.forEach((tp, callback) -> callback.seekToBeginning(tp.topic(), tp.partition()));
    }

}
To make things simpler, version 2.3 added the AbstractConsumerSeekAware class, which keeps track of which callback is to be used for a topic/partition.
The following example shows how to seek to the last record processed, in each partition, each time the container goes idle.
It also has methods that allow arbitrary external calls to rewind partitions by one record.
public class SeekToLastOnIdleListener extends AbstractConsumerSeekAware {

    @KafkaListener(id = "seekOnIdle", topics = "seekOnIdle")
    public void listen(String in) {
        ...
    }

    @Override
    public void onIdleContainer(Map<TopicPartition, Long> assignments,
            ConsumerSeekCallback callback) {

        // Step back one record from the current position in every assigned partition.
        assignments.keySet().forEach(tp -> callback.seekRelative(tp.topic(), tp.partition(), -1, true));
    }

    /**
     * Rewind all partitions one record.
     */
    public void rewindAllOneRecord() {
        getSeekCallbacks()
            .forEach((tp, callback) ->
                callback.seekRelative(tp.topic(), tp.partition(), -1, true));
    }

    /**
     * Rewind one partition one record.
     */
    public void rewindOnePartitionOneRecord(String topic, int partition) {
        getSeekCallbackFor(new TopicPartition(topic, partition))
            .seekRelative(topic, partition, -1, true);
    }

}
Version 2.6 added convenience methods to the abstract class:
- seekToBeginning() - seeks all assigned partitions to the beginning.
- seekToEnd() - seeks all assigned partitions to the end.
- seekToTimestamp(long timestamp) - seeks all assigned partitions to the offset represented by that timestamp.
Example:
public class MyListener extends AbstractConsumerSeekAware {

    @KafkaListener(...)
    void listen(...) {
        ...
    }

}

public class SomeOtherBean {

    MyListener listener;

    ...

    void someMethod() {
        // Seek all assigned partitions to the offsets as of one minute ago.
        this.listener.seekToTimestamp(System.currentTimeMillis() - 60_000);
    }

}
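The timestamp passed to seekToTimestamp is in epoch milliseconds, as the System.currentTimeMillis() arithmetic above suggests. As a readability alternative to raw millisecond arithmetic, the value could be computed with java.time. This is a minimal sketch; the SeekTimestamps class and its ago method are hypothetical names introduced here, not framework API.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical helper (not part of the framework) that turns
// "this long ago" into an epoch-millisecond timestamp.
final class SeekTimestamps {

    private SeekTimestamps() {
    }

    // Returns the epoch-millisecond timestamp lying lookBack before now.
    static long ago(Instant now, Duration lookBack) {
        return now.minus(lookBack).toEpochMilli();
    }
}
```

Under these assumptions, the call in someMethod() could be written as this.listener.seekToTimestamp(SeekTimestamps.ago(Instant.now(), Duration.ofMinutes(1))). Passing the current Instant as an argument keeps the helper easy to test with a fixed clock value.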