Scatter-Gather

Starting with version 4.1, Spring Integration provides an implementation of the scatter-gather enterprise integration pattern. It is a compound endpoint whose goal is to send a message to several recipients and aggregate the results. As noted in Enterprise Integration Patterns, it is a component for scenarios such as “best quote”, where we need to request information from several suppliers and decide which one provides us with the best terms for the requested item.

Previously, the pattern could be configured by using discrete components. This enhancement brings more convenient configuration.

The ScatterGatherHandler is a request-reply endpoint that combines a PublishSubscribeChannel (or a RecipientListRouter) and an AggregatingMessageHandler. The request message is sent to the scatter channel, and the ScatterGatherHandler waits for the reply that the aggregator sends to the outputChannel.
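
To make the “best quote” idea concrete, here is a minimal plain-Java sketch of the pattern itself. It does not use the Spring Integration API: the class name `BestQuoteSketch`, the `bestQuote` helper, and the suppliers modelled as functions are assumptions made only for illustration.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.stream.Collectors;

class BestQuoteSketch {

    // Hypothetical helper: each supplier maps an item name to a quoted price.
    static double bestQuote(String item, List<Function<String, Double>> suppliers) {
        // Scatter: send the same request to every supplier asynchronously.
        List<CompletableFuture<Double>> replies = suppliers.stream()
                .map(supplier -> CompletableFuture.supplyAsync(() -> supplier.apply(item)))
                .collect(Collectors.toList());

        // Gather: wait for every reply and keep the lowest price.
        return replies.stream()
                .mapToDouble(CompletableFuture::join)
                .min()
                .orElseThrow(() -> new IllegalArgumentException("no suppliers"));
    }

    public static void main(String[] args) {
        List<Function<String, Double>> suppliers = List.of(
                item -> 12.5, item -> 9.75, item -> 11.0);
        System.out.println(bestQuote("widget", suppliers)); // prints 9.75
    }
}
```

In the ScatterGatherHandler, the scatter step is delegated to a channel or router and the gather step to an aggregator, so neither has to be hand-written as it is here.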

Functionality

The Scatter-Gather pattern suggests two scenarios: “auction” and “distribution”. In both cases, the aggregation function is the same and provides all the options available for the AggregatingMessageHandler. (Actually, the ScatterGatherHandler requires only an AggregatingMessageHandler as a constructor argument.) See Aggregator for more information.

Auction

The auction Scatter-Gather variant uses “publish-subscribe” logic for the request message, where the “scatter” channel is a PublishSubscribeChannel with apply-sequence="true". However, this channel can be any MessageChannel implementation (as is the case with the request-channel in the ContentEnricher; see Content Enricher). In that case, you should create your own custom correlationStrategy for the aggregation function.
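
As a rough illustration of the auction variant in Java configuration, the following sketch passes a PublishSubscribeChannel with applySequence enabled to the ScatterGatherHandler constructor that takes a scatter channel. The bean names and the channel names `auctionChannel` and `auctionOutput` are hypothetical, and the single-argument aggregator is assumed to rely on its default correlation and release strategies, which work from the sequence headers.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.aggregator.AggregatingMessageHandler;
import org.springframework.integration.aggregator.DefaultAggregatingMessageGroupProcessor;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.PublishSubscribeChannel;
import org.springframework.integration.scattergather.ScatterGatherHandler;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;

@Configuration
public class AuctionScatterGatherConfig {

    // Scatter channel: every subscriber (supplier) receives a copy of the request.
    @Bean
    public MessageChannel auctionScatterChannel() {
        PublishSubscribeChannel channel = new PublishSubscribeChannel();
        // Adds sequence headers so the default aggregator strategies can be used.
        channel.setApplySequence(true);
        return channel;
    }

    // Gatherer: collects the supplier replies into a single list payload.
    @Bean
    public MessageHandler auctionGatherer() {
        return new AggregatingMessageHandler(new DefaultAggregatingMessageGroupProcessor());
    }

    // "auctionChannel" and "auctionOutput" are hypothetical channel names.
    @Bean
    @ServiceActivator(inputChannel = "auctionChannel")
    public MessageHandler scatterGatherAuction() {
        ScatterGatherHandler handler =
                new ScatterGatherHandler(auctionScatterChannel(), auctionGatherer());
        handler.setOutputChannelName("auctionOutput");
        return handler;
    }
}
```

The suppliers themselves would be subscribed to `auctionScatterChannel` elsewhere in the application; they are left out to keep the sketch short.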

Distribution

The distribution Scatter-Gather variant is based on the RecipientListRouter (see RecipientListRouter) with all available options for the RecipientListRouter. The router is passed to the ScatterGatherHandler constructor in place of the scatter channel. If you want to rely on only the default correlationStrategy for the recipient-list-router and the aggregator, you should specify apply-sequence="true". Otherwise, you should supply a custom correlationStrategy for the aggregator. Unlike the PublishSubscribeChannel variant (the auction variant), the recipient-list-router selector option lets you filter target suppliers based on the message. With apply-sequence="true", the default sequenceSize is supplied, and the aggregator can release the group correctly. The distribution option is mutually exclusive with the auction option.
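
The interplay between selectors and sequence details can be modelled in a few lines of plain Java. This is only a conceptual sketch, not the RecipientListRouter implementation: `Recipient`, `ScatteredCopy`, `scatter`, and `canRelease` are hypothetical names, and the payload is assumed to be an integer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

class DistributionSketch {

    // Hypothetical stand-in for a recipient: a channel name plus a selector.
    static final class Recipient {
        final String channel;
        final Predicate<Integer> selector;

        Recipient(String channel, Predicate<Integer> selector) {
            this.channel = channel;
            this.selector = selector;
        }
    }

    // Hypothetical stand-in for one scattered copy and its sequence details.
    static final class ScatteredCopy {
        final String channel;
        final int sequenceNumber;
        final int sequenceSize;

        ScatteredCopy(String channel, int sequenceNumber, int sequenceSize) {
            this.channel = channel;
            this.sequenceNumber = sequenceNumber;
            this.sequenceSize = sequenceSize;
        }
    }

    // Only recipients whose selector accepts the payload get a copy;
    // sequenceSize reflects the number of recipients actually selected.
    static List<ScatteredCopy> scatter(int payload, List<Recipient> recipients) {
        List<String> selected = new ArrayList<>();
        for (Recipient recipient : recipients) {
            if (recipient.selector.test(payload)) {
                selected.add(recipient.channel);
            }
        }
        List<ScatteredCopy> copies = new ArrayList<>();
        for (int i = 0; i < selected.size(); i++) {
            copies.add(new ScatteredCopy(selected.get(i), i + 1, selected.size()));
        }
        return copies;
    }

    // Release rule in the spirit of a sequence-size release strategy.
    static boolean canRelease(int repliesReceived, int sequenceSize) {
        return repliesReceived >= sequenceSize;
    }
}
```

Because the sequence size counts only the selected recipients, the gatherer in this model knows how many replies to expect even when a selector filters some suppliers out.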

For both the auction and the distribution variants, the request (scatter) message is enriched with the gatherResultChannel header to wait for a reply message from the aggregator.

By default, all suppliers should send their result to the replyChannel header (usually by omitting the output-channel from the ultimate endpoint). However, the gatherChannel option is also provided, letting suppliers send their reply to that channel for the aggregation.

Configuring a Scatter-Gather Endpoint

The following example shows Java configuration for the bean definition for Scatter-Gather:

@Bean
public MessageHandler distributor() {
    RecipientListRouter router = new RecipientListRouter();
    router.setApplySequence(true);
    router.setChannels(Arrays.asList(distributionChannel1(), distributionChannel2(),
            distributionChannel3()));
    return router;
}

@Bean
public MessageHandler gatherer() {
    return new AggregatingMessageHandler(
            new ExpressionEvaluatingMessageGroupProcessor("^[payload gt 5] ?: -1D"),
            new SimpleMessageStore(),
            new HeaderAttributeCorrelationStrategy(
                    IntegrationMessageHeaderAccessor.CORRELATION_ID),
            new ExpressionEvaluatingReleaseStrategy("size() == 2"));
}

@Bean
@ServiceActivator(inputChannel = "distributionChannel")
public MessageHandler scatterGatherDistribution() {
    ScatterGatherHandler handler = new ScatterGatherHandler(distributor(), gatherer());
    handler.setOutputChannel(output());
    return handler;
}

In the preceding example, we configure the RecipientListRouter distributor bean with applySequence="true" and the list of recipient channels. The next bean is for an AggregatingMessageHandler. Finally, we inject both those beans into the ScatterGatherHandler bean definition and mark it as a @ServiceActivator to wire the scatter-gather component into the integration flow.

The following example shows how to configure the <scatter-gather> endpoint by using the XML namespace:

<scatter-gather
		id=""  (1)
		auto-startup=""  (2)
		input-channel=""  (3)
		output-channel=""  (4)
		scatter-channel=""  (5)
		gather-channel=""  (6)
		order=""  (7)
		phase=""  (8)
		send-timeout=""  (9)
		gather-timeout=""  (10)
		requires-reply="" > (11)
			<scatterer/>  (12)
			<gatherer/>  (13)
</scatter-gather>
1 The id of the endpoint. The ScatterGatherHandler bean is registered with an alias of id + '.handler'. The RecipientListRouter bean is registered with an alias of id + '.scatterer'. The AggregatingMessageHandler bean is registered with an alias of id + '.gatherer'. Optional. (The BeanFactory generates a default id value.)
2 Lifecycle attribute signaling whether the endpoint should be started during application context initialization. In addition, the ScatterGatherHandler also implements Lifecycle and starts and stops gatherEndpoint, which is created internally if a gather-channel is provided. Optional. (The default is true.)
3 The channel on which to receive request messages to handle them in the ScatterGatherHandler. Required.
4 The channel to which the ScatterGatherHandler sends the aggregation results. Optional. (Incoming messages can specify a reply channel themselves in the replyChannel message header).
5 The channel to which to send the scatter message for the auction scenario. Optional. Mutually exclusive with the <scatterer> sub-element.
6 The channel on which to receive replies from each supplier for the aggregation. It is used as the replyChannel header in the scatter message. Optional. By default, the FixedSubscriberChannel is created.
7 The order of this component when more than one handler is subscribed to the same DirectChannel (use for load balancing purposes). Optional.
8 Specifies the phase in which the endpoint should be started and stopped. The startup order proceeds from lowest to highest, and the shutdown order is from highest to lowest. By default, this value is Integer.MAX_VALUE, meaning that this container starts as late as possible and stops as soon as possible. Optional.
9 The timeout interval to wait when sending a reply Message to the output-channel. By default, the send blocks for one second. It applies only if the output channel has some 'sending' limitations — for example, a QueueChannel with a fixed 'capacity' that is full. In this case, a MessageDeliveryException is thrown. The send-timeout is ignored for AbstractSubscribableChannel implementations. For group-timeout(-expression), the MessageDeliveryException from the scheduled expire task leads this task to be rescheduled. Optional.
10 Lets you specify how long the scatter-gather waits for the reply message before returning. 'null' is returned if the reply times out. Optional. It defaults to -1, meaning to wait indefinitely.
11 Specifies whether the scatter-gather must return a non-null value. This value is true by default. Consequently, a ReplyRequiredException is thrown when the underlying aggregator returns a null value after gather-timeout. Note, if null is a possibility, the gather-timeout should be specified to avoid an indefinite wait.
12 The <recipient-list-router> options. Optional. Mutually exclusive with scatter-channel attribute.
13 The <aggregator> options. Required.
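
The relationship between gather-timeout and requires-reply described in callouts 10 and 11 can be sketched in plain Java. This is a conceptual model only: `awaitReply` is a hypothetical helper, the queue stands in for the channel on which the aggregated reply arrives, and IllegalStateException stands in for the ReplyRequiredException mentioned above.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class GatherTimeoutSketch {

    // Waits for the aggregated reply; a negative timeout means "wait indefinitely".
    static String awaitReply(BlockingQueue<String> gatherResults,
            long gatherTimeoutMillis, boolean requiresReply) {
        String reply;
        try {
            reply = gatherTimeoutMillis < 0
                    ? gatherResults.take()
                    : gatherResults.poll(gatherTimeoutMillis, TimeUnit.MILLISECONDS);
        }
        catch (InterruptedException e) {
            // Restore the interrupt flag and treat the wait as having produced no reply.
            Thread.currentThread().interrupt();
            reply = null;
        }
        if (reply == null && requiresReply) {
            // Stand-in for the ReplyRequiredException thrown by the real endpoint.
            throw new IllegalStateException("No reply within " + gatherTimeoutMillis + " ms");
        }
        return reply;
    }

    public static void main(String[] args) {
        BlockingQueue<String> gatherResults = new LinkedBlockingQueue<>();
        // No reply arrives: with requiresReply=false the caller simply gets null.
        System.out.println(awaitReply(gatherResults, 50, false)); // prints null
    }
}
```

The model shows why a finite timeout matters when a null result is possible: with a negative timeout and no reply, the call to `take()` would never return.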

Error Handling

Since Scatter-Gather is a multi request-reply component, error handling has some extra complexity. In some cases, it is better to just catch and ignore downstream exceptions if the ReleaseStrategy allows the process to finish with fewer replies than requests. In other cases, something like a “compensation message” should be returned from the sub-flow when an error happens.

Every async sub-flow should be configured with an errorChannel header so that the MessagePublishingErrorHandler can send the error message properly. Otherwise, an error will be sent to the global errorChannel with the common error handling logic. See Error Handling for more information about async error processing.

Synchronous flows may use an ExpressionEvaluatingRequestHandlerAdvice for ignoring the exception or returning a compensation message. When an exception is thrown from one of the sub-flows to the ScatterGatherHandler, it is simply re-thrown upstream. As a result, all other sub-flows do their work for nothing, and their replies are ignored in the ScatterGatherHandler. This might be the expected behavior sometimes, but in most cases it would be better to handle the error in the particular sub-flow without impacting all others and the expectations in the gatherer.
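
For a synchronous sub-flow, one possible arrangement is sketched below, assuming a supplier implemented as a service activator. The bean name `compensationAdvice`, the `quote` method, and the literal `'compensation'` payload are hypothetical; `distributionChannel1` is assumed to be one of the recipient channels from the earlier Java configuration.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.handler.advice.ExpressionEvaluatingRequestHandlerAdvice;

@Configuration
public class SupplierErrorHandlingConfig {

    @Bean
    public ExpressionEvaluatingRequestHandlerAdvice compensationAdvice() {
        ExpressionEvaluatingRequestHandlerAdvice advice =
                new ExpressionEvaluatingRequestHandlerAdvice();
        // On failure, evaluate this expression instead of propagating the exception...
        advice.setOnFailureExpressionString("'compensation'");
        // ...and use its result as the reply of the advised handler.
        advice.setReturnFailureExpressionResult(true);
        return advice;
    }

    // Hypothetical supplier that always fails; the advice turns the failure into a reply.
    @ServiceActivator(inputChannel = "distributionChannel1", adviceChain = "compensationAdvice")
    public String quote(String request) {
        throw new IllegalStateException("supplier unavailable");
    }
}
```

With this arrangement, the failing supplier still contributes a reply to the group, so the gatherer can decide how to treat the compensation payload rather than the whole scatter-gather failing.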

Starting with version 5.1.3, the ScatterGatherHandler is supplied with the errorChannelName option. It is populated to the errorChannel header of the scatter message and is used when an async error happens. It can also be used in a regular synchronous sub-flow for directly sending an error message.

The sample configuration below demonstrates async error handling by returning a compensation message:

@Bean
public IntegrationFlow scatterGatherAndExecutorChannelSubFlow(TaskExecutor taskExecutor) {
    return f -> f
            .scatterGather(
                    scatterer -> scatterer
                            .applySequence(true)
                            .recipientFlow(f1 -> f1.transform(p -> "Sub-flow#1"))
                            .recipientFlow(f2 -> f2
                                    .channel(c -> c.executor(taskExecutor))
                                    .transform(p -> {
                                        throw new RuntimeException("Sub-flow#2");
                                    })),
                    null,
                    s -> s.errorChannel("scatterGatherErrorChannel"));
}

@ServiceActivator(inputChannel = "scatterGatherErrorChannel")
public Message<?> processAsyncScatterError(MessagingException payload) {
    return MessageBuilder.withPayload(payload.getCause().getCause())
            .copyHeaders(payload.getFailedMessage().getHeaders())
            .build();
}

To produce a proper reply, we have to copy headers (including replyChannel and errorChannel) from the failedMessage of the MessagingException that has been sent to the scatterGatherErrorChannel by the MessagePublishingErrorHandler. This way the target exception is returned to the gatherer of the ScatterGatherHandler for reply messages group completion. Such an exception payload can be filtered out in the MessageGroupProcessor of the gatherer or processed in another way downstream, after the scatter-gather endpoint.
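
The filtering step mentioned above could look roughly like the following plain-Java sketch. It models only the payload filtering, not an actual MessageGroupProcessor; `CompensationFilterSketch` and `successfulReplies` are hypothetical names, and the gathered payloads are assumed to have already been extracted from the group's messages.

```java
import java.util.List;
import java.util.stream.Collectors;

class CompensationFilterSketch {

    // Keeps successful replies and drops Throwable payloads used as compensation messages.
    static List<Object> successfulReplies(List<Object> gatheredPayloads) {
        return gatheredPayloads.stream()
                .filter(payload -> !(payload instanceof Throwable))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Object> gathered = List.of("Sub-flow#1", new RuntimeException("Sub-flow#2"));
        System.out.println(successfulReplies(gathered)); // prints [Sub-flow#1]
    }
}
```

In a real gatherer, logic of this kind would sit inside the group processor, so that the aggregated result contains only the replies from sub-flows that succeeded.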

Before sending scattering results to the gatherer, ScatterGatherHandler reinstates the request message headers, including reply and error channels if any. This way errors from the AggregatingMessageHandler are propagated to the caller, even if an async hand-off is applied in scatter recipient subflows. For successful operation, the gatherResultChannel, originalReplyChannel and originalErrorChannel headers must be transferred back to replies from scatter recipient subflows. In this case a reasonable, finite gatherTimeout must be configured for the ScatterGatherHandler. Otherwise, by default, it blocks forever waiting for a reply from the gatherer.