Implementing Custom Binders
In order to implement a custom Binder
, all you need is to:
-
Add the required dependencies
-
Provide a ProvisioningProvider implementation
-
Provide a MessageProducer implementation
-
Provide a MessageHandler implementation
-
Provide a Binder implementation
-
Create a Binder Configuration
-
Define your binder in META-INF/spring.binders
Add the required dependencies
Add the spring-cloud-stream
dependency to your project (eg. for Maven):
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
<version>${spring.cloud.stream.version}</version>
</dependency>
Provide a ProvisioningProvider implementation
The ProvisioningProvider
is responsible for the provisioning of consumer and producer destinations, and is required to convert the logical destinations included in the application.yml or application.properties file in physical destination references.
Below an example of ProvisioningProvider implementation that simply trims the destinations provided via input/output bindings configuration:
public class FileMessageBinderProvisioner implements ProvisioningProvider<ConsumerProperties, ProducerProperties> {
@Override
public ProducerDestination provisionProducerDestination(
final String name,
final ProducerProperties properties) {
return new FileMessageDestination(name);
}
@Override
public ConsumerDestination provisionConsumerDestination(
final String name,
final String group,
final ConsumerProperties properties) {
return new FileMessageDestination(name);
}
private class FileMessageDestination implements ProducerDestination, ConsumerDestination {
private final String destination;
private FileMessageDestination(final String destination) {
this.destination = destination;
}
@Override
public String getName() {
return destination.trim();
}
@Override
public String getNameForPartition(int partition) {
throw new UnsupportedOperationException("Partitioning is not implemented for file messaging.");
}
}
}
Provide a MessageProducer implementation
The MessageProducer
is responsible for consuming events and handling them as messages to the client application that is configured to consume such events.
Here is an example of MessageProducer implementation that extends the MessageProducerSupport
abstraction in order to poll on a file that matches the trimmed destination name and is located in the project path, while also archiving read messages and discarding consequent identical messages:
public class FileMessageProducer extends MessageProducerSupport {
public static final String ARCHIVE = "archive.txt";
private final ConsumerDestination destination;
private String previousPayload;
public FileMessageProducer(ConsumerDestination destination) {
this.destination = destination;
}
@Override
public void doStart() {
receive();
}
private void receive() {
ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
executorService.scheduleWithFixedDelay(() -> {
String payload = getPayload();
if(payload != null) {
Message<String> receivedMessage = MessageBuilder.withPayload(payload).build();
archiveMessage(payload);
sendMessage(receivedMessage);
}
}, 0, 50, MILLISECONDS);
}
private String getPayload() {
try {
List<String> allLines = Files.readAllLines(Paths.get(destination.getName()));
String currentPayload = allLines.get(allLines.size() - 1);
if(!currentPayload.equals(previousPayload)) {
previousPayload = currentPayload;
return currentPayload;
}
} catch (IOException e) {
throw new RuntimeException(e);
}
return null;
}
private void archiveMessage(String payload) {
try {
Files.write(Paths.get(ARCHIVE), (payload + "\n").getBytes(), CREATE, APPEND);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
When implementing a custom binder, this step is not strictly mandatory as you could always resort to using an already existing MessageProducer implementation! |
Provide a MessageHandler implementation
The MessageHandler
provides the logic required to produce an event.
Here is an example of MessageHandler implementation:
public class FileMessageHandler implements MessageHandler{
@Override
public void handleMessage(Message<?> message) throws MessagingException {
//write message to file
}
}
When implementing a custom binder, this step is not strictly mandatory as you could always resort to using an already existing MessageHandler implementation! |
Provide a Binder implementation
You are now able to provide your own implementation of the Binder
abstraction. This can be easily done by:
-
extending the
AbstractMessageChannelBinder
class -
specifying your ProvisioningProvider as a generic argument of the AbstractMessageChannelBinder
-
overriding the
createProducerMessageHandler
andcreateConsumerEndpoint
methods
eg.:
public class FileMessageBinder extends AbstractMessageChannelBinder<ConsumerProperties, ProducerProperties, FileMessageBinderProvisioner> {
public FileMessageBinder(
String[] headersToEmbed,
FileMessageBinderProvisioner provisioningProvider) {
super(headersToEmbed, provisioningProvider);
}
@Override
protected MessageHandler createProducerMessageHandler(
final ProducerDestination destination,
final ProducerProperties producerProperties,
final MessageChannel errorChannel) throws Exception {
return message -> {
String fileName = destination.getName();
String payload = new String((byte[])message.getPayload()) + "\n";
try {
Files.write(Paths.get(fileName), payload.getBytes(), CREATE, APPEND);
} catch (IOException e) {
throw new RuntimeException(e);
}
};
}
@Override
protected MessageProducer createConsumerEndpoint(
final ConsumerDestination destination,
final String group,
final ConsumerProperties properties) throws Exception {
return new FileMessageProducer(destination);
}
}
Create a Binder Configuration
It is strictly required that you create a Spring Configuration to initialize the bean for your binder implementation (and all other beans that you might need):
@Configuration
public class FileMessageBinderConfiguration {
@Bean
@ConditionalOnMissingBean
public FileMessageBinderProvisioner fileMessageBinderProvisioner() {
return new FileMessageBinderProvisioner();
}
@Bean
@ConditionalOnMissingBean
public FileMessageBinder fileMessageBinder(FileMessageBinderProvisioner fileMessageBinderProvisioner) {
return new FileMessageBinder(null, fileMessageBinderProvisioner);
}
}
Define your binder in META-INF/spring.binders
Finally, you must define your binder in a META-INF/spring.binders
file on the classpath, specifying both the name of the binder and the full qualified name of your Binder Configuration class:
myFileBinder:\
com.example.springcloudstreamcustombinder.config.FileMessageBinderConfiguration