Class WakingKafkaConsumerTimingAdjuster
- java.lang.Object
  - org.springframework.kafka.listener.WakingKafkaConsumerTimingAdjuster
- All Implemented Interfaces:
KafkaConsumerTimingAdjuster
public class WakingKafkaConsumerTimingAdjuster extends java.lang.Object implements KafkaConsumerTimingAdjuster
Adjusts timing by creating a thread that will wake up the consumer from polling, taking into account that a paused consumer checks whether to resume consumption only in increments of 'pollTimeout'. This works best if the consumer is handling a single partition.
- Since:
- 2.7
- Author:
- Tomaz Fernandes
- See Also:
KafkaConsumerBackoffManager
-
-
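The wake-up idea described above can be illustrated with a small simulation that uses only the JDK. This is a sketch, not the library's code: the class `WakeUpSketch`, the method `pollWithWaker`, and the use of a `CountDownLatch` as a stand-in for waking a real Kafka consumer are all assumptions made for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class WakeUpSketch {

    /**
     * Simulates one poll that blocks for up to pollTimeoutMs unless a helper
     * thread wakes it after wakeAfterMs. Returns true if the poll was woken
     * before its timeout elapsed.
     */
    public static boolean pollWithWaker(long pollTimeoutMs, long wakeAfterMs) {
        // Hypothetical stand-in for waking a real consumer out of poll().
        CountDownLatch wakeSignal = new CountDownLatch(1);

        Thread waker = new Thread(() -> {
            try {
                Thread.sleep(wakeAfterMs); // wait for the adjustment amount
                wakeSignal.countDown();    // end the simulated poll early
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        waker.setDaemon(true); // do not keep the JVM alive for the helper
        waker.start();

        try {
            // await returns true only if the latch was released before the timeout.
            return wakeSignal.await(pollTimeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("woken early: " + pollWithWaker(2000, 100));
    }
}
```

Without the helper thread the simulated poll would always block for the full timeout, which is the behavior the class description says the adjuster is meant to avoid.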
Constructor Summary
Constructors
WakingKafkaConsumerTimingAdjuster(org.springframework.core.task.TaskExecutor timingAdjustmentTaskExecutor)
WakingKafkaConsumerTimingAdjuster(org.springframework.core.task.TaskExecutor timingAdjustmentTaskExecutor, org.springframework.retry.backoff.Sleeper sleeper)
-
Method Summary
Modifier and Type, Method, Description
long adjustTiming(org.apache.kafka.clients.consumer.Consumer<?,?> consumerToAdjust, org.apache.kafka.common.TopicPartition topicPartition, long pollTimeout, long timeUntilDue)
  Adjusts the timing with the provided parameters.
void setPollTimeoutsForAdjustmentWindow(int pollTimeoutsForAdjustmentWindow)
  Sets how many pollTimeouts prior to the dueTimeout the adjustment will take place.
void setTimingAdjustmentThreshold(java.time.Duration timingAdjustmentThreshold)
  Sets the threshold for the timing adjustment to take place.
-
-
-
Constructor Detail
-
WakingKafkaConsumerTimingAdjuster
public WakingKafkaConsumerTimingAdjuster(org.springframework.core.task.TaskExecutor timingAdjustmentTaskExecutor, org.springframework.retry.backoff.Sleeper sleeper)
-
WakingKafkaConsumerTimingAdjuster
public WakingKafkaConsumerTimingAdjuster(org.springframework.core.task.TaskExecutor timingAdjustmentTaskExecutor)
-
-
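A minimal construction sketch using the single-argument constructor, assuming a Spring Kafka version that ships this class is on the classpath. The wrapper class `TimingAdjusterConfigSketch`, the choice of `SimpleAsyncTaskExecutor`, and the thread-name prefix are assumptions for illustration; the two setter values simply restate the defaults documented on this page.

```java
import java.time.Duration;

import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.kafka.listener.WakingKafkaConsumerTimingAdjuster;

public class TimingAdjusterConfigSketch {

    public static WakingKafkaConsumerTimingAdjuster timingAdjuster() {
        // Executor choice and thread-name prefix are illustrative assumptions.
        SimpleAsyncTaskExecutor executor = new SimpleAsyncTaskExecutor("timing-adjuster-");

        WakingKafkaConsumerTimingAdjuster adjuster =
                new WakingKafkaConsumerTimingAdjuster(executor);

        // Both values below are the defaults stated in the Method Detail section.
        adjuster.setPollTimeoutsForAdjustmentWindow(2);
        adjuster.setTimingAdjustmentThreshold(Duration.ofMillis(100));
        return adjuster;
    }
}
```

The two-argument constructor additionally accepts a `Sleeper`, which may be convenient for substituting the waiting behavior in tests.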
Method Detail
-
setPollTimeoutsForAdjustmentWindow
public void setPollTimeoutsForAdjustmentWindow(int pollTimeoutsForAdjustmentWindow)
Sets how many pollTimeouts prior to the dueTimeout the adjustment will take place. Default is 2.
- Parameters:
pollTimeoutsForAdjustmentWindow - the number of pollTimeouts in the adjustment window.
-
setTimingAdjustmentThreshold
public void setTimingAdjustmentThreshold(java.time.Duration timingAdjustmentThreshold)
Sets the threshold for the timing adjustment to take place. If the difference between the instant at which the message will probably be consumed and the instant at which it is due is lower than this value, no adjustment will be applied. Default is 100ms.
- Parameters:
timingAdjustmentThreshold - the threshold to be set.
-
adjustTiming
public long adjustTiming(org.apache.kafka.clients.consumer.Consumer<?,?> consumerToAdjust, org.apache.kafka.common.TopicPartition topicPartition, long pollTimeout, long timeUntilDue)
Adjusts the timing with the provided parameters.
- Specified by:
adjustTiming in interface KafkaConsumerTimingAdjuster
- Parameters:
consumerToAdjust - the Consumer that will be adjusted
topicPartition - the TopicPartition that will be adjusted
pollTimeout - the pollConfiguration for the consumer's container
timeUntilDue - the amount of time until the message is due for consumption
- Returns:
- the adjusted amount in milliseconds
-
-
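The following JDK-only sketch shows one plausible way the adjustment window and the threshold could combine into the value returned by adjustTiming. It is not the library's implementation: the class `AdjustmentDecisionSketch`, the method `computeAdjustment`, the window rule, and the use of the remainder of `timeUntilDue` over `pollTimeout` as the adjustment amount are all assumptions. Only the two defaults (2 and 100ms) come from this page.

```java
import java.time.Duration;

public class AdjustmentDecisionSketch {

    private int pollTimeoutsForAdjustmentWindow = 2;                      // documented default
    private Duration timingAdjustmentThreshold = Duration.ofMillis(100); // documented default

    public void setPollTimeoutsForAdjustmentWindow(int pollTimeoutsForAdjustmentWindow) {
        this.pollTimeoutsForAdjustmentWindow = pollTimeoutsForAdjustmentWindow;
    }

    public void setTimingAdjustmentThreshold(Duration timingAdjustmentThreshold) {
        this.timingAdjustmentThreshold = timingAdjustmentThreshold;
    }

    /** Returns the assumed adjustment in milliseconds, or 0 if none applies. */
    public long computeAdjustment(long pollTimeout, long timeUntilDue) {
        if (pollTimeout <= 0) {
            throw new IllegalArgumentException("pollTimeout must be positive");
        }
        // Assumed rule: act only when the due time is more than one poll away
        // but no further than the configured number of poll timeouts.
        boolean inWindow = timeUntilDue > pollTimeout
                && timeUntilDue <= pollTimeout * pollTimeoutsForAdjustmentWindow;

        // Assumed rule: the part of the wait that does not fit whole poll cycles.
        long adjustment = timeUntilDue % pollTimeout;

        // Differences at or below the threshold are ignored.
        boolean aboveThreshold = adjustment > timingAdjustmentThreshold.toMillis();
        return (inWindow && aboveThreshold) ? adjustment : 0L;
    }
}
```

Under these assumptions, with a pollTimeout of 5000 and the defaults, a timeUntilDue of 7300 falls inside the window and yields 2300, while 10050 falls outside the window and 5050 falls below the threshold, so both yield 0.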