Publisher Confirms

There are two mechanisms to get the result of publishing a message; in each case, the connection factory must have publisherConfirmType set ConfirmType.CORRELATED. The "legacy" mechanism is to set the confirmAckChannel to the bean name of a message channel from which you can retrieve the confirmations asynchronously; negative acks are sent to the error channel (if enabled) - see Error Channels.

The preferred mechanism, added in version 3.1 is to use a correlation data header and wait for the result via its Future<Confirm> property. This is particularly useful with a batch listener because you can send multiple messages before waiting for the result. To use this technique, set the useConfirmHeader property to true The following simple application is an example of using this technique:

spring.cloud.stream.bindings.input-in-0.group=someGroup
spring.cloud.stream.bindings.input-in-0.consumer.batch-mode=true

spring.cloud.stream.source=output
spring.cloud.stream.bindings.output-out-0.producer.error-channel-enabled=true

spring.cloud.stream.rabbit.bindings.output-out-0.producer.useConfirmHeader=true
spring.cloud.stream.rabbit.bindings.input-in-0.consumer.auto-bind-dlq=true
spring.cloud.stream.rabbit.bindings.input-in-0.consumer.batch-size=10

spring.rabbitmq.publisher-confirm-type=correlated
spring.rabbitmq.publisher-returns=true
@SpringBootApplication
public class Application {

	private static final Logger log = LoggerFactory.getLogger(Application.class);

	public static void main(String[] args) {
		SpringApplication.run(Application.class, args);
	}

	@Autowired
	private StreamBridge bridge;

	@Bean
	Consumer<List<String>> input() {
		return list -> {
			List<MyCorrelationData> results = new ArrayList<>();
			list.forEach(str -> {
				log.info("Received: " + str);
				MyCorrelationData corr = new MyCorrelationData(UUID.randomUUID().toString(), str);
				results.add(corr);
				this.bridge.send("output-out-0", MessageBuilder.withPayload(str.toUpperCase())
						.setHeader(AmqpHeaders.PUBLISH_CONFIRM_CORRELATION, corr)
						.build());
			});
			results.forEach(correlation -> {
				try {
					Confirm confirm = correlation.getFuture().get(10, TimeUnit.SECONDS);
					log.info(confirm + " for " + correlation.getPayload());
					if (correlation.getReturnedMessage() != null) {
						log.error("Message for " + correlation.getPayload() + " was returned ");

						// throw some exception to invoke binder retry/error handling

					}
				}
				catch (InterruptedException e) {
					Thread.currentThread().interrupt();
					throw new IllegalStateException(e);
				}
				catch (ExecutionException | TimeoutException e) {
					throw new IllegalStateException(e);
				}
			});
		};
	}


	@Bean
	public ApplicationRunner runner(BatchingRabbitTemplate template) {
		return args -> IntStream.range(0, 10).forEach(i ->
				template.convertAndSend("input-in-0", "input-in-0.rbgh303", "foo" + i));
	}

	@Bean
	public BatchingRabbitTemplate template(CachingConnectionFactory cf, TaskScheduler taskScheduler) {
		BatchingStrategy batchingStrategy = new SimpleBatchingStrategy(10, 1000000, 1000);
		return new BatchingRabbitTemplate(cf, batchingStrategy, taskScheduler);
	}

}

class MyCorrelationData extends CorrelationData {

	private final String payload;

	MyCorrelationData(String id, String payload) {
		super(id);
		this.payload = payload;
	}

	public String getPayload() {
		return this.payload;
	}

}

As you can see, we send each message and then await for the publication results. If the messages can’t be routed, then correlation data is populated with the returned message before the future is completed.

The correlation data must be provided with a unique id so that the framework can perform the correlation.

You cannot set both useConfirmHeader and confirmAckChannel but you can still receive returned messages in the error channel when useConfirmHeader is true, but using the correlation header is more convenient.