Spring Cloud Stream Binder for Apache Pulsar
Spring for Apache Pulsar provides a binder for Spring Cloud Stream that we can use to build event-driven microservices using pub-sub paradigms. In this section, we will go through the basic details of this binder.
Usage
We need to include the following dependency on your application to use Apache Pulsar binder for Spring Cloud Stream.
-
Maven
-
Gradle
<dependencies>
<dependency>
<groupId>org.springframework.pulsar</groupId>
<artifactId>spring-pulsar-spring-cloud-stream-binder</artifactId>
</dependency>
</dependencies>
dependencies {
implementation 'org.springframework.pulsar:spring-pulsar-spring-cloud-stream-binder'
}
Overview
The Spring Cloud Stream binder for Apache Pulsar allows the applications to focus on business logic rather than dealing with the lower-level details of managing and maintaining Pulsar. The binder takes care of all those details for the application developer. Spring Cloud Stream brings a powerful programming model based on Spring Cloud Function that allows the app developer to write complex event-driven applications using a functional style. Applications can start from a middleware-neutral manner and then map Pulsar topics as destinations in Spring Cloud Stream through Spring Boot configuration properties. Spring Cloud Stream is built on top of Spring Boot, and when writing an event-driven microservice using Spring Cloud Stream, you are essentially writing a Boot application. Here is a straightforward Spring Cloud Stream application.
@SpringBootApplication
public class SpringPulsarBinderSampleApp {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
public static void main(String[] args) {
SpringApplication.run(SpringPulsarBinderSampleApp.class, args);
}
@Bean
public Supplier<Time> timeSupplier() {
return () -> new Time(String.valueOf(System.currentTimeMillis()));
}
@Bean
public Function<Time, EnhancedTime> timeProcessor() {
return (time) -> {
EnhancedTime enhancedTime = new EnhancedTime(time, "5150");
this.logger.info("PROCESSOR: {} --> {}", time, enhancedTime);
return enhancedTime;
};
}
@Bean
public Consumer<EnhancedTime> timeLogger() {
return (time) -> this.logger.info("SINK: {}", time);
}
record Time(String time) {
}
record EnhancedTime(Time time, String extra) {
}
}
The above sample application, a full-blown Spring Boot application, deserves a few explanations. However, on a first pass, you can see that this is just plain Java and a few Spring and Spring Boot annotations.
We have three Bean
methods here - a java.util.function.Supplier
, a java.util.function.Function
, and finally, a java.util.function.Consumer
.
The supplier produces the current time in milliseconds, the function takes this time and then enhances it by adding some random data, and then the consumer logs the enhanced time.
We omitted all the imports for brevity, but nothing Spring Cloud Stream specific in the entire application. How does it become a Spring Cloud Stream application that interacts with Apache Pulsar? You must include the above dependency for the binder in the application. Once that dependency is added, you must provide the following configuration properties.
spring:
cloud:
function:
definition: timeSupplier;timeProcessor;timeLogger;
stream:
bindings:
timeProcessor-in-0:
destination: timeSupplier-out-0
timeProcessor-out-0:
destination: timeProcessor-out-0
timeLogger-in-0:
destination: timeProcessor-out-0
With this, the above Spring Boot application has become an end-to-end event-driven application based on Spring Cloud Stream.
Because we have the Pulsar binder on the classpath, the application interacts with Apache Pulsar.
If there is only one function in the application, then we don’t need to tell Spring Cloud Stream to activate the function for execution since it does that by default.
If there is more than one such function in the application, as in our example, we need to instruct Spring Cloud Stream which functions we would like to activate.
In our case, we need all of them to be activated, and we do that through the spring.cloud.function.definition
property.
The bean name becomes part of the Spring Cloud Stream binding name by default.
A binding is a fundamentally abstract concept in Spring Cloud Stream, using which the framework communicates with the middleware destination.
Almost everything that Spring Cloud Stream does occurs over a concrete binding.
A supplier has only an output binding; functions have input and output bindings, and consumers have only input binding.
Let’s take as an example our supplier bean - timeSupplier.
The default binding name for this supplier will be timeSupplier-out-0
.
Similarly, the default binding names for the timeProcessor
function will be timeProcessor-in-0
on the inbound and timeProcessor-out-0
on the outbound.
Please refer to the Spring Cloud Stream reference docs for details on how you can change the default binding names.
In most situations, using the default binding names is enough.
We set the destination on the binding names, as shown above.
If a destination is not provided, the binding name becomes the value for the destination as in the case of timeSupplier-out-0
.
When running the above app, you should see that the supplier executes every second, which is then consumed by the function and enhances the time consumed by the logger consumer.
Message Conversion in Binder-based Applications
In the above sample application, we provided no schema information for message conversion.
That is because, by default, Spring Cloud Stream uses its message conversion mechanism using the messaging support established in Spring Framework through the Spring Messaging project.
Unless specified, Spring Cloud Stream uses application/json
as the content-type
for message conversion on both inbound and outbound bindings.
On the outbound, the data is serialized as byte[],
and the Pulsar binder then uses Schema.BYTES
to send it over the wire to the Pulsar topic.
Similarly, on the inbound, the data is consumed as byte[]
from the Pulsar topic and then converted into the target type using the proper message converter.
Using Native Conversion in Pulsar using Pulsar Schema
Although the default is to use the framework-provided message conversion, Spring Cloud Stream allows each binder to determine how the message should be converted. Suppose the application chooses to go this route. In that case, Spring Cloud Stream steers clear of using any Spring-provided message conversion facility and passes around the data it receives or produces. This feature in Spring Cloud Stream is known as native encoding on the producer side and native decoding on the consumer side. This means that the encoding and decoding natively occur on the target middleware, in our case, on Apache Pulsar. For the above application, we can use the following configuration to bypass the framework conversion and uses native encoding and decoding.
spring:
cloud:
stream:
bindings:
timeSupplier-out-0:
producer:
use-native-encoding: true
timeProcessor-in-0:
destination: timeSupplier-out-0
consumer:
use-native-decoding: true
timeProcessor-out-0:
destination: timeProcessor-out-0
producer:
use-native-encoding: true
timeLogger-in-0:
destination: timeProcessor-out-0
consumer:
use-native-decoding: true
pulsar:
bindings:
timeSupplier-out-0:
producer:
schema-type: JSON
message-type: org.springframework.pulsar.sample.binder.SpringPulsarBinderSampleApp.Time
timeProcessor-in-0:
consumer:
schema-type: JSON
message-type: org.springframework.pulsar.sample.binder.SpringPulsarBinderSampleApp.Time
timeProcessor-out-0:
producer:
schema-type: AVRO
message-type: org.springframework.pulsar.sample.binder.SpringPulsarBinderSampleApp.EnhancedTime
timeLogger-in-0:
consumer:
schema-type: AVRO
message-type: org.springframework.pulsar.sample.binder.SpringPulsarBinderSampleApp.EnhancedTime
The property to enable native encoding on the producer side is a binding level property from the core Spring Cloud Stream.
You set it on the producer binding - spring.cloud.stream.bindings.<binding-name>.producer.use-native-encoding
and set this to true.
Similarly, use - spring.cloud.stream.bindings.<binding-name>.consumer.user-native-decoding
for consumer bindings and set it to true.
If we decide to use native encoding and decoding, in the case of Pulsar, we need to set the corresponding schema and the underlying message type information.
This information is provided as extended binding properties.
As you can see above in the configuration, the properties are - spring.cloud.stream.pulsar.bindings.<binding-name>.producer|consumer.schema-type
for schema information and spring.cloud.stream.pulsar.bindings.<binding-name>.producer|consumer.message-type
for the actual target type.
If you have both keys and values on the message, you can use message-key-type
and message-value-type
to specify their target types.
Any configured custom schema mappings will be consulted when the schema-type property is omitted.
|
Message Header Conversion
Each message typically has header information that needs to be carried along as the message traverses between Pulsar and Spring Messaging via Spring Cloud Stream input and output bindings. To support this traversal, the framework handles the necessary message header conversion.
Custom Header Mapper
The Pulsar binder is configured with a default header mapper that can be overridden by providing your own PulsarHeaderMapper
bean.
In the following example, a JSON header mapper is configured that:
-
maps all inbound headers (except those with keys “top” or “secret”)
-
maps outbound headers (except those with keys “id”, “timestamp”, or “userId”)
-
only trusts objects in the “com.acme” package for outbound deserialization
-
de/serializes any “com.acme.Money” header values w/ simple
toString()
encoding
@Bean
public PulsarHeaderMapper customPulsarHeaderMapper() {
return JsonPulsarHeaderMapper.builder()
.inboundPatterns("!top", "!secret", "*")
.outboundPatterns("!id", "!timestamp", "!userId", "*")
.trustedPackages("com.acme")
.toStringClasses("com.acme.Money")
.build();
}
Using Pulsar Properties in the Binder
The binder uses basic components from Spring for Apache Pulsar framework to build its producer and consumer bindings.
Since binder-based applications are Spring Boot applications, binder, by default, uses the Spring Boot autoconfiguration for Spring for Apache Pulsar.
Therefore, all Pulsar Spring Boot properties available at the core framework level are also available through the binder.
For example, you can use properties with the prefix spring.pulsar.producer…
, spring.pulsar.consumer…
etc.
In addition, you can also set these Pulsar properties at the binder level.
For instance, this will also work - spring.cloud.stream.pulsar.binder.producer…
or spring.cloud.stream.pulsar.binder.consumer…
.
Either of the above approaches is fine, but when using properties like these, it is applied to the whole application.
If you have multiple functions in the application, they all get the same properties.
You can also set these Pulsar properties at the extended binding properties level to address this.
Extended binding properties are applied at the binding itself.
For instance, if you have an input and output binding, and both require a separate set of Pulsar properties, you must set them on the extended binding.
The pattern for producer binding is spring.cloud.stream.pulsar.bindings.<output-binding-name>.producer…
.
Similarly, for consumer binding, the pattern is spring.cloud.stream.pulsar.bindings.<input-binding-name>.consumer…
.
This way, you can have a separate set of Pulsar properties applied for different bindings in the same application.
The highest precedence is for extended binding properties.
The precedence order of applying the properties in the binder is extended binding properties → binder properties → Spring Boot properties.
(going from highest to lowest).
Resources for Pulsar Binder Properties
Following are some resources to rely upon for finding more about the properties available through the Pulsar binder.
Pulsar producer binding configuration.
These properties need the spring.cloud.stream.bindings.<binding-name>.producer
prefix.
All the Spring Boot provided Pulsar producer properties are also available through this configuration class.
Pulsar consumer binding configuration.
These properties need the spring.cloud.stream.bindings.<binding-name>.consumer
prefix.
All the Spring Boot provided Pulsar consumer properties are also available through this configuration class.
For common Pulsar binder specific configuration properties, see this. These properties require a prefix of spring.cloud.stream.pulsar.binder
.
The above specified producer and consumer properties (including the Spring Boot ones) can be used at the binder using the spring.cloud.stream.pulsar.binder.producer
or spring.cloud.stream.pulsar.binder.consumer
prefix.
Pulsar Topic Provisioner
Spring Cloud Stream binder for Apache Pulsar comes with an out-of-the-box provisioner for Pulsar topics.
When running an application, if the necessary topics are absent, Pulsar will create the topics for you.
However, this is a basic non-partitioned topic, and if you want advanced features like creating a partitioned topic, you can rely on the topic provisioner in the binder.
Pulsar topic provisioner uses PulsarAdministration
from the framework, which uses the PulsarAdminBuilder.
For this reason, you need to set the spring.pulsar.administration.service-url
property unless you are running Pulsar on the default server and port.
Specifying partition count when creating the topic
When creating the topic, you can set the partition count in two ways.
First, you can set it at the binder level using the property spring.cloud.stream.pulsar.binder.partition-count
.
As we saw above, doing this way will make all the topics created by the application inherit this property.
Suppose you want granular control at the binding level for setting partitions.
In that case, you can set the partition-count
property per binding using the format spring.cloud.stream.pulsar.bindings.<binding-name>.producer|consumer.partition-count
.
This way, various topics created by different functions in the same application will have different partitions based on the application requirements.