Class ListenerUtils
java.lang.Object
org.springframework.kafka.listener.ListenerUtils
Listener utilities.
- Since:
- 2.0
- Author:
- Gary Russell, Francois Rosiere, Antonio Tomac, Wang Zhiyang, Sanghyeok An
-
Method Summary
Modifier and Type, Method, Description
- static void conditionalSleep(Supplier<Boolean> shouldSleepCondition, long interval)
  Sleep for the desired timeout, as long as shouldSleepCondition supplies true.
- static void conditionalSleepWithPoll(Supplier<Boolean> shouldSleepCondition, long interval, org.apache.kafka.clients.consumer.Consumer<?, ?> consumer)
  Sleep for the desired timeout, as long as shouldSleepCondition supplies true.
- static org.apache.kafka.clients.consumer.OffsetAndMetadata createOffsetAndMetadata(@Nullable MessageListenerContainer container, long offset)
  Create a new OffsetAndMetadata using the given container and offset.
- static ListenerType determineListenerType(Object listener)
  Determine the type of the listener.
- static void stoppableSleep(@Nullable MessageListenerContainer container, long interval)
  Sleep for the desired timeout, as long as the container continues to run.
- static void unrecoverableBackOff(BackOff backOff, Map<Thread, BackOffExecution> executions, Map<Thread, Long> lastIntervals, @Nullable MessageListenerContainer container)
  Sleep according to the BackOff; when the BackOffExecution returns BackOffExecution.STOP, sleep for the previous backOff.
-
Method Details
-
determineListenerType
public static ListenerType determineListenerType(Object listener)
Determine the type of the listener.
- Parameters:
listener - the listener.
- Returns:
- the ListenerType.
-
unrecoverableBackOff
public static void unrecoverableBackOff(BackOff backOff, Map<Thread, BackOffExecution> executions, Map<Thread, Long> lastIntervals, @Nullable MessageListenerContainer container) throws InterruptedException
Sleep according to the BackOff; when the BackOffExecution returns BackOffExecution.STOP, sleep for the previous backOff.
- Parameters:
backOff - the BackOff to create a new BackOffExecution.
executions - a thread local containing the BackOffExecution for this thread.
lastIntervals - a thread local containing the previous BackOff interval for this thread.
container - the container or parent container.
- Throws:
InterruptedException - if the thread is interrupted.
- Since:
- 3.1
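The fallback to the previous interval is easier to see in code. The following is a minimal sketch of that bookkeeping only, not the library's implementation: the nested BackOff and BackOffExecution interfaces are reduced stand-ins for the Spring types, twoSteps() is a hypothetical back-off invented for the demonstration, and the sketch returns the interval instead of sleeping for it.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class UnrecoverableBackOffSketch {

    // Stand-ins (assumptions) for Spring's BackOffExecution and BackOff,
    // reduced to the members this sketch needs.
    interface BackOffExecution {
        long STOP = -1;
        long nextBackOff();
    }

    interface BackOff {
        BackOffExecution start();
    }

    /** Returns the interval the calling thread should sleep for next. */
    static long nextInterval(BackOff backOff, Map<Thread, BackOffExecution> executions,
            Map<Thread, Long> lastIntervals) {
        Thread current = Thread.currentThread();
        // One execution per thread, created lazily from the shared BackOff.
        BackOffExecution execution = executions.computeIfAbsent(current, t -> backOff.start());
        long interval = execution.nextBackOff();
        if (interval == BackOffExecution.STOP) {
            // Exhausted: reuse the previous interval (0 if there was none).
            interval = lastIntervals.getOrDefault(current, 0L);
        }
        lastIntervals.put(current, interval);
        return interval;
    }

    /** Hypothetical back-off yielding 100 ms, then 200 ms, then STOP. */
    static BackOff twoSteps() {
        return () -> new BackOffExecution() {
            private int calls;

            @Override
            public long nextBackOff() {
                calls++;
                return calls == 1 ? 100 : calls == 2 ? 200 : STOP;
            }
        };
    }

    public static void main(String[] args) {
        Map<Thread, BackOffExecution> executions = new ConcurrentHashMap<>();
        Map<Thread, Long> lastIntervals = new ConcurrentHashMap<>();
        BackOff backOff = twoSteps();
        for (int i = 0; i < 4; i++) {
            // Once STOP is reached, the last real interval keeps being reused.
            System.out.println(nextInterval(backOff, executions, lastIntervals));
        }
    }
}
```

Keying both maps by Thread lets several consumer threads share one BackOff while each keeps its own execution state and last interval.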
-
stoppableSleep
public static void stoppableSleep(@Nullable MessageListenerContainer container, long interval) throws InterruptedException
Sleep for the desired timeout, as long as the container continues to run.
- Parameters:
container - the container.
interval - the timeout.
- Throws:
InterruptedException - if the thread is interrupted.
- Since:
- 2.7
-
conditionalSleep
public static void conditionalSleep(Supplier<Boolean> shouldSleepCondition, long interval) throws InterruptedException
Sleep for the desired timeout, as long as shouldSleepCondition supplies true.
- Parameters:
shouldSleepCondition - supplies true while the sleep should continue.
interval - the timeout.
- Throws:
InterruptedException - if the thread is interrupted.
- Since:
- 3.0.9
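A condition-bounded sleep of this kind can be sketched in plain Java as a loop of short sleeps that re-checks the supplier after each one. This is an illustration under stated assumptions, not the library's code: the class name, the sleepWhile method, the 10 ms slice length, and the returned slice count are all hypothetical.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

public class ConditionalSleepSketch {

    // Hypothetical slice length; the real utility chooses its own value.
    private static final long SLICE_MS = 10;

    /**
     * Sleeps in short slices until the interval elapses or the condition
     * supplies false. Returns the number of slices slept, for illustration.
     */
    static int sleepWhile(Supplier<Boolean> shouldSleep, long intervalMs) throws InterruptedException {
        long deadline = System.currentTimeMillis() + intervalMs;
        int slices = 0;
        do {
            Thread.sleep(SLICE_MS);
            slices++;
            if (!shouldSleep.get()) {
                break; // condition withdrawn: wake up early
            }
        } while (System.currentTimeMillis() < deadline);
        return slices;
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicInteger checks = new AtomicInteger();
        // The condition turns false on the third check, long before 5 s elapse.
        int slices = sleepWhile(() -> checks.incrementAndGet() < 3, 5_000);
        System.out.println("slices slept: " + slices);
    }
}
```

Slicing the sleep bounds how long the caller keeps waiting after the condition changes to roughly one slice, at the cost of a few extra wake-ups.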
-
conditionalSleepWithPoll
public static void conditionalSleepWithPoll(Supplier<Boolean> shouldSleepCondition, long interval, org.apache.kafka.clients.consumer.Consumer<?, ?> consumer) throws InterruptedException
Sleep for the desired timeout, as long as shouldSleepCondition supplies true. This method requires that the consumer is paused; otherwise, ConsumerRecords may be lost. Periodically calls Consumer.poll(Duration.ZERO) to prevent a paused consumer from being rebalanced.
- Parameters:
shouldSleepCondition - supplies true while the sleep should continue.
interval - the timeout.
consumer - the Kafka consumer on which to call poll().
- Throws:
InterruptedException - if the thread is interrupted.
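The periodic poll can be pictured as a keep-alive action run between sleep slices. The sketch below assumes a plain Runnable in place of consumer.poll(Duration.ZERO), so it runs without a Kafka client; the class name, method name, and 10 ms slice length are hypothetical, and the real method may schedule its polls differently.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

public class SleepWithPollSketch {

    // Hypothetical slice length; the real utility chooses its own value.
    private static final long SLICE_MS = 10;

    /**
     * Sleeps in slices while the condition holds, running keepAlive after
     * each slice. keepAlive stands in for consumer.poll(Duration.ZERO) on a
     * paused consumer.
     */
    static void sleepWithKeepAlive(Supplier<Boolean> shouldSleep, long intervalMs, Runnable keepAlive)
            throws InterruptedException {
        long deadline = System.currentTimeMillis() + intervalMs;
        do {
            Thread.sleep(SLICE_MS);
            if (!shouldSleep.get()) {
                break; // stop sleeping without a further keep-alive call
            }
            keepAlive.run();
        } while (System.currentTimeMillis() < deadline);
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicInteger checks = new AtomicInteger();
        AtomicInteger polls = new AtomicInteger();
        // The condition holds for three checks and fails on the fourth.
        sleepWithKeepAlive(() -> checks.incrementAndGet() < 4, 5_000, polls::incrementAndGet);
        System.out.println("keep-alive calls: " + polls.get());
    }
}
```

With a real consumer, the paused state is what makes the zero-duration poll safe: a paused consumer returns no records from poll, so nothing is fetched and then dropped.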
-
createOffsetAndMetadata
public static org.apache.kafka.clients.consumer.OffsetAndMetadata createOffsetAndMetadata(@Nullable MessageListenerContainer container, long offset)
Create a new OffsetAndMetadata using the given container and offset.
- Parameters:
container - a container.
offset - an offset.
- Returns:
- an offset and metadata.
- Since:
- 2.8.6
-