Thread Safety
When using a concurrent message listener container, a single listener instance is invoked on all consumer threads. Listeners, therefore, need to be thread-safe, and it is preferable to use stateless listeners. If it is not possible to make your listener thread-safe or adding synchronization would significantly reduce the benefit of adding concurrency, you can use one of a few techniques:
- Use n containers with concurrency=1 with a prototype-scoped MessageListener bean so that each container gets its own instance (this is not possible when using @KafkaListener).
- Keep the state in ThreadLocal<?> instances.
- Have the singleton listener delegate to a bean that is declared in SimpleThreadScope (or a similar scope).
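The second technique (state kept in a ThreadLocal) can be illustrated without any Kafka dependency. The following is a minimal plain-Java sketch, not Spring for Apache Kafka code: CountingListener, onMessage, clearThreadState, and the "consumer-n" thread names are hypothetical names chosen for illustration, and ordinary threads stand in for the container's consumer threads.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class ThreadLocalListenerSketch {

    /** Hypothetical listener: one shared instance, but each thread has its own counter. */
    static final class CountingListener {
        private final ThreadLocal<Integer> processed = ThreadLocal.withInitial(() -> 0);

        /** Increments and returns the calling thread's own counter. */
        int onMessage(String record) {
            int count = processed.get() + 1;
            processed.set(count);
            return count;
        }

        /** Drops the calling thread's state; must be called on that same thread. */
        void clearThreadState() {
            processed.remove();
        }
    }

    /** Runs one shared listener on several threads and returns each thread's final count. */
    static Map<String, Integer> run(int threads, int messagesPerThread) {
        CountingListener listener = new CountingListener();
        Map<String, Integer> finalCounts = new ConcurrentHashMap<>();
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            String name = "consumer-" + t;
            workers[t] = new Thread(() -> {
                int last = 0;
                try {
                    for (int i = 0; i < messagesPerThread; i++) {
                        last = listener.onMessage("record-" + i);
                    }
                    finalCounts.put(name, last);
                } finally {
                    // Clean up on the thread that owns the state.
                    listener.clearThreadState();
                }
            }, name);
            workers[t].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return finalCounts;
    }

    public static void main(String[] args) {
        // Each of the three threads ends with a count of 5, not a shared 15.
        System.out.println(run(3, 5));
    }
}
```

Because every thread reads and writes only its own copy of the counter, the listener needs no synchronization even though a single instance is shared.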
To facilitate cleaning up thread state (for the second and third items in the preceding list), starting with version 2.2, the listener container publishes a ConsumerStoppedEvent when each thread exits.
You can consume these events with an ApplicationListener or @EventListener method to remove ThreadLocal<?> instances or remove() thread-scoped beans from the scope.
Note that SimpleThreadScope does not destroy beans that have a destruction interface (such as DisposableBean), so you should destroy() the instance yourself.
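By default, the application context’s event multicaster invokes event listeners on the calling thread. If you change the multicaster to use an async executor, thread cleanup is not effective, because the event listener then runs on a different thread from the consumer thread whose state it needs to clear.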
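The effect of running cleanup on the wrong thread can be reproduced in plain Java. The sketch below is an illustration only and does not use Spring's event multicaster: CleanupThreadSketch and stateAfterCleanup are hypothetical names, and a single-thread ExecutorService stands in for an async executor.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CleanupThreadSketch {

    static final ThreadLocal<String> STATE = new ThreadLocal<>();

    /**
     * Sets per-thread state, runs the cleanup either on the calling thread or on
     * an executor thread, and returns what the calling thread still sees afterwards.
     */
    static String stateAfterCleanup(boolean async) {
        STATE.set("per-thread state");
        Runnable cleanup = STATE::remove;
        if (async) {
            ExecutorService executor = Executors.newSingleThreadExecutor();
            try {
                // remove() runs on the executor thread, so it clears that thread's
                // (empty) slot and leaves the caller's value untouched.
                executor.submit(cleanup).get();
            } catch (Exception e) {
                throw new IllegalStateException(e);
            } finally {
                executor.shutdown();
            }
        } else {
            // remove() runs on the thread that owns the value.
            cleanup.run();
        }
        String remaining = STATE.get();
        STATE.remove(); // leave the calling thread clean for the next call
        return remaining;
    }

    public static void main(String[] args) {
        System.out.println("same thread: " + stateAfterCleanup(false));    // same thread: null
        System.out.println("async executor: " + stateAfterCleanup(true));  // async executor: per-thread state
    }
}
```

ThreadLocal.remove() only ever affects the thread that calls it, which is why the cleanup listener has to be invoked on the exiting consumer thread.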