Sender Result Channel
Starting with version 4.0.3, you can configure the resultMetadataChannel to receive SenderResult<?>s to determine success/failure of sends.
The SenderResult contains correlationMetadata to allow you to correlate results with sends; it also contains RecordMetadata, which indicates the TopicPartition and offset of the sent record.
The resultMetadataChannel must be a FluxMessageChannel instance.
Here is an example of how to use this feature, with correlation metadata of type Integer:
@Bean
FluxMessageChannel sendResults() {
    return new FluxMessageChannel();
}

@ServiceActivator(inputChannel = "sendResults")
void handleResults(SenderResult<Integer> result) {
    if (result.exception() != null) {
        failureFor(result);
    }
    else {
        successFor(result);
    }
}
To set the correlation metadata on an output record, set the CORRELATION_ID header:
streamBridge.send("words1", MessageBuilder.withPayload("foobar")
        .setCorrelationId(42)
        .build());
When using the feature with a Function, the function output type must be a Message<?> with the correlation id header set to the desired value.
The correlation metadata should be unique, at least for the duration of the send, so that each SenderResult can be matched to exactly one sent record.
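One way to satisfy the uniqueness requirement is to draw correlation ids from a counter and remember each in-flight payload until its result arrives. The following is a minimal sketch using only the JDK. The class InFlightSends and its methods are hypothetical names introduced for illustration; they are not part of Spring Cloud Stream or Reactor Kafka.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper (an assumption for illustration, not a binder API):
// hands out Integer correlation ids and tracks the payloads still awaiting a result.
class InFlightSends {

    // Monotonic counter; ids stay unique until the int range wraps around.
    private final AtomicInteger nextId = new AtomicInteger();

    // Payloads whose send result has not been seen yet, keyed by correlation id.
    private final Map<Integer, String> pending = new ConcurrentHashMap<>();

    // Records the payload and returns the id to use as correlation metadata.
    int register(String payload) {
        int id = nextId.incrementAndGet();
        pending.put(id, payload);
        return id;
    }

    // Looks up and forgets the payload for a result's correlation id.
    // Returns null if the id is unknown or was already completed.
    String complete(int correlationId) {
        return pending.remove(correlationId);
    }

    // Number of sends that have not yet been completed.
    int pendingCount() {
        return pending.size();
    }
}
```

Under these assumptions, the value returned by register(...) would be passed to setCorrelationId(...) when building the outgoing message, and the result handler would call complete(result.correlationMetadata()) to recover the payload that a given SenderResult<Integer> refers to.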