@KafkaListener @Payload Validation

Starting with version 2.2, it is easier to add a Validator to validate @KafkaListener @Payload arguments. Previously, you had to configure a custom DefaultMessageHandlerMethodFactory and add it to the registrar. Now, you can add the validator to the registrar itself. The following code shows how to do so:

@Configuration
@EnableKafka
public class Config implements KafkaListenerConfigurer {

    ...

    @Override
    public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) {
        registrar.setValidator(new MyValidator());
    }

}

When you use Spring Boot with the validation starter, a LocalValidatorFactoryBean is auto-configured and can be injected into the configuration class, as the following example shows:

@Configuration
@EnableKafka
public class Config implements KafkaListenerConfigurer {

    @Autowired
    private LocalValidatorFactoryBean validator;
    ...

    @Override
    public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) {
        registrar.setValidator(this.validator);
    }
}

The following examples show how to validate:

public static class ValidatedClass {

  @Max(10)
  private int bar;

  public int getBar() {
    return this.bar;
  }

  public void setBar(int bar) {
    this.bar = bar;
  }

}

@KafkaListener(id = "validated", topics = "annotated35", errorHandler = "validationErrorHandler",
        containerFactory = "kafkaJsonListenerContainerFactory")
public void validatedListener(@Payload @Valid ValidatedClass val) {
    ...
}

@Bean
public KafkaListenerErrorHandler validationErrorHandler() {
    return (m, e) -> {
        ...
    };
}
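
To make the constraint in the example above concrete, the following dependency-free sketch spells out the rule that @Max(10) expresses on bar: values up to and including 10 pass, and anything larger is a violation. It deliberately uses neither Bean Validation nor Spring. ValidatedPayload, MaxCheckSketch, and violations are hypothetical names introduced only for illustration, and the message text is an assumption rather than the output of any real validator.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative copy of the ValidatedClass payload shape, without the annotation.
class ValidatedPayload {

    private int bar;

    int getBar() {
        return this.bar;
    }

    void setBar(int bar) {
        this.bar = bar;
    }
}

// Hypothetical helper: a hand-written equivalent of the @Max(10) rule on bar.
final class MaxCheckSketch {

    static final int MAX_BAR = 10;

    private MaxCheckSketch() {
    }

    // Returns one message per violated rule; an empty list means the payload passes.
    static List<String> violations(ValidatedPayload payload) {
        List<String> result = new ArrayList<>();
        // @Max is inclusive: 10 passes, 11 does not.
        if (payload.getBar() > MAX_BAR) {
            result.add("bar must be less than or equal to " + MAX_BAR);
        }
        return result;
    }
}
```

With the real validator in place, a payload that breaks this rule is not expected to reach the body of validatedListener; the failure should instead surface through the configured validationErrorHandler.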

Starting with version 2.5.11, validation also works on payloads for @KafkaHandler methods in a class-level listener. See @KafkaListener on a Class.

Starting with version 3.1, you can perform validation in an ErrorHandlingDeserializer instead. See Using ErrorHandlingDeserializer for more information.