Error Handling

In this section, we explain the general idea behind the error handling mechanisms provided by the framework. We use the Rabbit binder as an example, because individual binders (such as the Kafka binder) define different sets of properties for mechanisms that are specific to the capabilities of the underlying broker.

Errors happen, and Spring Cloud Stream provides several flexible mechanisms to deal with them. Note that the techniques depend on the binder implementation, the capabilities of the underlying messaging middleware, and the programming model (more on this later).

Whenever a Message handler (function) throws an exception, it is propagated back to the binder. The binder then makes several attempts to process the same message (3 by default) using the RetryTemplate provided by the Spring Retry library. If the retries are unsuccessful, the outcome depends on the error handling mechanism, which may drop the message, re-queue it for re-processing, or send it to a DLQ.
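The flow described above can be pictured with a small, framework-free sketch. It is a simplified model and not the binder's actual implementation: the class name RetryThenRecoverSketch, the process method, and the recoverer callback are hypothetical names introduced only for illustration, and backoff between attempts is left out.

```java
import java.util.function.BiConsumer;
import java.util.function.Function;

// Simplified model of "retry, then recover". All names here are hypothetical;
// the real work is done by the binder and Spring Retry's RetryTemplate.
final class RetryThenRecoverSketch {

    // Calls the handler up to maxAttempts times. If every attempt fails, the
    // message and the last exception are passed to the recoverer, which stands
    // in for the error handling mechanism (drop, re-queue, or send to a DLQ).
    static <I, O> O process(I message, Function<I, O> handler, int maxAttempts,
            BiConsumer<I, RuntimeException> recoverer) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return handler.apply(message);
            }
            catch (RuntimeException ex) {
                last = ex; // remember the failure and try again
            }
        }
        recoverer.accept(message, last);
        return null; // a failed message produces no output
    }
}
```

With maxAttempts set to 3, a handler that always throws is invoked three times before the recoverer sees the message. With maxAttempts set to 1, the recoverer is reached after the first failure.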

Both Rabbit and Kafka support these concepts (especially DLQ). However, other binders may not, so refer to your individual binder’s documentation for details on supported error-handling options.

Keep in mind, however, that a reactive function does NOT qualify as a Message handler, because it does not handle individual messages. Instead, it provides a way to connect the stream (i.e., Flux) provided by the framework with the one provided by the user. Why is this important? Because everything you read later in this section about the RetryTemplate, dropping failed messages, retrying, DLQ, and the configuration properties that assist with all of it applies only to Message handlers (i.e., imperative functions).

The Reactive API provides a very rich library of its own operators and mechanisms to assist you with error handling for a variety of reactive use cases, which are far more complex than simple Message handler cases. Use them. One example is public final Flux<T> retryWhen(Retry retrySpec), which you can find in reactor.core.publisher.Flux.

@Bean
public Function<Flux<String>, Flux<String>> uppercase() {
	return flux -> flux
			.retryWhen(Retry.backoff(3, Duration.ofMillis(1000)))
			.map(v -> v.toUpperCase());
}

Drop Failed Messages

By default, the system provides two error handlers. The first simply logs the error message. The second is a binder-specific error handler, which is responsible for handling the error message in the context of a specific messaging system (e.g., sending it to a DLQ). However, because no additional error handling configuration was provided (in this scenario), this handler does nothing. So, essentially, after being logged, the message is dropped.

While this is acceptable in some cases, in most cases it is not, and we need a recovery mechanism to avoid message loss.

Handle Error Messages

In the previous section, we mentioned that, by default, messages that result in an error are effectively logged and dropped. The framework also exposes a mechanism for you to provide a custom error handler (e.g., to send a notification or write to a database). You can do so by adding a Consumer that is specifically designed to accept an ErrorMessage, which, aside from all the information about the error (e.g., the stack trace), contains the original message (the one that triggered the error). NOTE: A custom error handler is mutually exclusive with the framework-provided error handlers (i.e., the logging and binder error handlers; see the previous section) to ensure that they do not interfere with each other.

@Bean
public Consumer<ErrorMessage> myErrorHandler() {
	return v -> {
		// send SMS notification code
	};
}

To identify such a consumer as an error handler, all you need to do is provide the error-handler-definition property pointing to the function name: spring.cloud.stream.bindings.<binding-name>.error-handler-definition=myErrorHandler.

For example, for binding name uppercase-in-0 the property would look like this:

spring.cloud.stream.bindings.uppercase-in-0.error-handler-definition=myErrorHandler

And if you used a special mapping instruction to map the binding to a more readable name (spring.cloud.stream.function.bindings.uppercase-in-0=upper), then this property would look like this:

spring.cloud.stream.bindings.upper.error-handler-definition=myErrorHandler

If by accident you declare such a handler as a Function, it still works, except that nothing is done with its output. However, given that such a handler still relies on functionality provided by Spring Cloud Function, you can also benefit from function composition in the event that your handler has some complexity that you would like to address through function composition (however unlikely).

Default Error Handler

If you want to have a single error handler for all function beans, you can use the standard spring-cloud-stream mechanism for defining default properties: spring.cloud.stream.default.error-handler-definition=myErrorHandler

DLQ - Dead Letter Queue

Perhaps the most common mechanism, DLQ allows failed messages to be sent to a special destination: the Dead Letter Queue.

When configured, failed messages are sent to this destination for subsequent re-processing or auditing and reconciliation.

Consider the following example:

@SpringBootApplication
public class SimpleStreamApplication {

	public static void main(String[] args) throws Exception {
		SpringApplication.run(SimpleStreamApplication.class,
		  "--spring.cloud.function.definition=uppercase",
		  "--spring.cloud.stream.bindings.uppercase-in-0.destination=uppercase",
		  "--spring.cloud.stream.bindings.uppercase-in-0.group=myGroup",
		  "--spring.cloud.stream.rabbit.bindings.uppercase-in-0.consumer.auto-bind-dlq=true"
		);
	}

	@Bean
	public Function<Person, Person> uppercase() {
		return personIn -> {
			// Always fail so that the message ends up in the DLQ
			throw new RuntimeException("intentional");
		};
	}
}

As a reminder, in this example the uppercase-in-0 segment of the property corresponds to the name of the input destination binding. The consumer segment indicates that it is a consumer property.

When using DLQ, at least the group property must be provided for proper naming of the DLQ destination. However, group is often used together with the destination property, as in our example.

Aside from some standard properties, we also set auto-bind-dlq to instruct the binder to create and configure a DLQ destination for the uppercase-in-0 binding. That binding corresponds to the uppercase destination (see the corresponding property), so the result is an additional Rabbit queue named uppercase.myGroup.dlq (see the Kafka documentation for Kafka-specific DLQ properties).

Once configured, all failed messages are routed to this destination preserving the original message for further actions.
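To make the naming and routing concrete, here is a minimal in-memory sketch. It only illustrates the idea and does not talk to a broker: the class DlqRoutingSketch, its methods, and the "payload" and "error" map keys are all hypothetical and are not part of the binder's API. The only detail taken from the text is the uppercase.myGroup.dlq naming pattern.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

// In-memory illustration only; in a real application the broker and the
// binder do this work. All names in this class are hypothetical.
final class DlqRoutingSketch {

    // Queues keyed by name, standing in for broker queues.
    private final Map<String, Queue<Map<String, Object>>> queues = new HashMap<>();

    // Mirrors the naming shown in the text: <destination>.<group>.dlq
    static String dlqName(String destination, String group) {
        return destination + "." + group + ".dlq";
    }

    // Stores the unchanged payload together with a description of the failure.
    void routeToDlq(String destination, String group, Object payload, Throwable cause) {
        Map<String, Object> failed = new HashMap<>();
        failed.put("payload", payload); // the original message is preserved
        failed.put("error", String.valueOf(cause.getMessage()));
        queues.computeIfAbsent(dlqName(destination, group), k -> new ArrayDeque<>()).add(failed);
    }

    // Returns the named queue, or an empty queue if nothing was routed to it.
    Queue<Map<String, Object>> queue(String name) {
        return queues.getOrDefault(name, new ArrayDeque<>());
    }
}
```

For the example application, dlqName("uppercase", "myGroup") yields uppercase.myGroup.dlq, which matches the queue name mentioned earlier.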

The message in the DLQ also carries additional information about the original error, as follows:

. . . .
x-exception-stacktrace:	org.springframework.messaging.MessageHandlingException: nested exception is
      org.springframework.messaging.MessagingException: has an error, failedMessage=GenericMessage [payload=byte[15],
      headers={amqp_receivedDeliveryMode=NON_PERSISTENT, amqp_receivedRoutingKey=input.hello, amqp_deliveryTag=1,
      deliveryAttempt=3, amqp_consumerQueue=input.hello, amqp_redelivered=false, id=a15231e6-3f80-677b-5ad7-d4b1e61e486e,
      amqp_consumerTag=amq.ctag-skBFapilvtZhDsn0k3ZmQg, contentType=application/json, timestamp=1522327846136}]
      at org.spring...integ...han...MethodInvokingMessageProcessor.processMessage(MethodInvokingMessageProcessor.java:107)
      at. . . . .
Payload: blah

You can also facilitate immediate dispatch to the DLQ (without retries) by setting max-attempts to 1. For example:

--spring.cloud.stream.bindings.uppercase-in-0.consumer.max-attempts=1

Retry Template

In this section, we cover the configuration properties related to retry capabilities.

The RetryTemplate is part of the Spring Retry library. While it is out of scope of this document to cover all of the capabilities of the RetryTemplate, we will mention the following consumer properties that are specifically related to the RetryTemplate:

maxAttempts

The number of attempts to process the message.

Default: 3.

backOffInitialInterval

The backoff initial interval on retry.

Default: 1000 milliseconds.

backOffMaxInterval

The maximum backoff interval.

Default: 10000 milliseconds.

backOffMultiplier

The backoff multiplier.

Default: 2.0.

defaultRetryable

Whether exceptions thrown by the listener that are not listed in the retryableExceptions are retryable.

Default: true.

retryableExceptions

A map with Throwable class names as keys and booleans as values. Specify those exceptions (and subclasses) that will or will not be retried. Also see defaultRetryable. Example: spring.cloud.stream.bindings.input.consumer.retryable-exceptions.java.lang.IllegalStateException=false.

Default: empty.
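To see how the backoff properties interact, the following sketch computes the waits between attempts. It is an approximation written for this section, not Spring Retry code: the class BackoffScheduleSketch and its waits method are hypothetical, and the sketch assumes that the first wait equals the initial interval, that each later wait is the previous one times the multiplier, and that every wait is capped at the maximum interval.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper that approximates an exponential backoff schedule.
final class BackoffScheduleSketch {

    // Returns the waits (in milliseconds) between attempts.
    // maxAttempts attempts leave room for maxAttempts - 1 waits.
    static List<Long> waits(int maxAttempts, long initialInterval, double multiplier, long maxInterval) {
        List<Long> waits = new ArrayList<>();
        double interval = initialInterval;
        for (int retry = 1; retry < maxAttempts; retry++) {
            waits.add((long) Math.min(interval, maxInterval)); // cap at the maximum
            interval = interval * multiplier; // grow for the next retry
        }
        return waits;
    }
}
```

Under these assumptions, the defaults (maxAttempts=3, backOffInitialInterval=1000, backOffMultiplier=2.0, backOffMaxInterval=10000) give waits of 1000 and 2000 milliseconds. Raising maxAttempts to 6 gives 1000, 2000, 4000, 8000, and 10000, where the last wait is capped by backOffMaxInterval.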

While the preceding settings are sufficient for the majority of customization requirements, they may not satisfy certain complex requirements, at which point you may want to provide your own instance of the RetryTemplate. To do so, configure it as a bean in your application configuration. The application-provided instance overrides the one provided by the framework. Also, to avoid conflicts, you must qualify the instance of the RetryTemplate that you want the binder to use as @StreamRetryTemplate. For example:

@StreamRetryTemplate
public RetryTemplate myRetryTemplate() {
    return new RetryTemplate();
}

As you can see from the preceding example, you do not need to annotate it with @Bean, because @StreamRetryTemplate is itself a qualified @Bean.

If you need to be more precise with your RetryTemplate, you can specify the bean by name in your ConsumerProperties to associate a specific retry bean with each binding.

spring.cloud.stream.bindings.<foo>.consumer.retry-template-name=<your-retry-template-bean-name>