Spring Cloud Stream Binder for Apache Pulsar

Spring for Apache Pulsar provides a binder for Spring Cloud Stream that we can use to build event-driven microservices using pub-sub paradigms. In this section, we will go through the basic details of this binder.

Usage

You need to include the following dependency in your application to use the Apache Pulsar binder for Spring Cloud Stream.

Maven:

<dependencies>
    <dependency>
        <groupId>org.springframework.pulsar</groupId>
        <artifactId>spring-pulsar-spring-cloud-stream-binder</artifactId>
    </dependency>
</dependencies>

Gradle:

dependencies {
    implementation 'org.springframework.pulsar:spring-pulsar-spring-cloud-stream-binder'
}

Overview

The Spring Cloud Stream binder for Apache Pulsar allows applications to focus on business logic rather than dealing with the lower-level details of managing and maintaining Pulsar. The binder takes care of all those details for the application developer. Spring Cloud Stream brings a powerful programming model based on Spring Cloud Function that allows the app developer to write complex event-driven applications using a functional style. Applications can start in a middleware-neutral manner and then map Pulsar topics as destinations in Spring Cloud Stream through Spring Boot configuration properties. Spring Cloud Stream is built on top of Spring Boot, and when writing an event-driven microservice using Spring Cloud Stream, you are essentially writing a Boot application. Here is a straightforward Spring Cloud Stream application.

@SpringBootApplication
public class SpringPulsarBinderSampleApp {

	private final Logger logger = LoggerFactory.getLogger(this.getClass());

	public static void main(String[] args) {
		SpringApplication.run(SpringPulsarBinderSampleApp.class, args);
	}

	@Bean
	public Supplier<Time> timeSupplier() {
		return () -> new Time(String.valueOf(System.currentTimeMillis()));
	}

	@Bean
	public Function<Time, EnhancedTime> timeProcessor() {
		return (time) -> {
			EnhancedTime enhancedTime = new EnhancedTime(time, "5150");
			this.logger.info("PROCESSOR: {} --> {}", time, enhancedTime);
			return enhancedTime;
		};
	}

	@Bean
	public Consumer<EnhancedTime> timeLogger() {
		return (time) -> this.logger.info("SINK:      {}", time);
	}

	record Time(String time) {
	}

	record EnhancedTime(Time time, String extra) {
	}

}

The above sample application, a full-blown Spring Boot application, deserves a few explanations. However, on a first pass, you can see that this is just plain Java and a few Spring and Spring Boot annotations. We have three Bean methods here - a java.util.function.Supplier, a java.util.function.Function, and finally, a java.util.function.Consumer. The supplier produces the current time in milliseconds, the function takes this time and enhances it by adding some extra data (the fixed string "5150" in this sample), and then the consumer logs the enhanced time.
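Because the three beans are ordinary java.util.function types, their logic can be exercised without Spring or Pulsar. The following is a minimal sketch that wires them together by hand; the class name TimePipelineSketch and the runOnce helper are hypothetical and exist only for illustration. In the real application, the binder moves the data between the functions through Pulsar topics instead of direct method calls.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

// Plain-Java sketch: no Spring, no Pulsar, only the functional logic of the sample.
class TimePipelineSketch {

    record Time(String time) {
    }

    record EnhancedTime(Time time, String extra) {
    }

    // Runs supplier -> processor -> sink once and returns what the sink received.
    static List<EnhancedTime> runOnce(Supplier<Time> timeSupplier) {
        List<EnhancedTime> received = new ArrayList<>();
        Function<Time, EnhancedTime> timeProcessor = time -> new EnhancedTime(time, "5150");
        Consumer<EnhancedTime> timeLogger = received::add; // collects instead of logging
        timeLogger.accept(timeProcessor.apply(timeSupplier.get()));
        return received;
    }

    public static void main(String[] args) {
        Supplier<Time> timeSupplier = () -> new Time(String.valueOf(System.currentTimeMillis()));
        System.out.println("SINK: " + runOnce(timeSupplier));
    }
}
```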

We omitted all the imports for brevity, but there is nothing Spring Cloud Stream specific in the entire application. How does it become a Spring Cloud Stream application that interacts with Apache Pulsar? You must include the above dependency for the binder in the application. Once that dependency is added, you must provide the following configuration properties.

spring:
  cloud:
    function:
      definition: timeSupplier;timeProcessor;timeLogger;
    stream:
      bindings:
        timeProcessor-in-0:
          destination: timeSupplier-out-0
        timeProcessor-out-0:
          destination: timeProcessor-out-0
        timeLogger-in-0:
          destination: timeProcessor-out-0

With this, the above Spring Boot application has become an end-to-end event-driven application based on Spring Cloud Stream. Because we have the Pulsar binder on the classpath, the application interacts with Apache Pulsar. If there is only one function in the application, then we don’t need to tell Spring Cloud Stream to activate the function for execution since it does that by default. If there is more than one such function in the application, as in our example, we need to instruct Spring Cloud Stream which functions we would like to activate. In our case, we need all of them to be activated, and we do that through the spring.cloud.function.definition property.

The bean name becomes part of the Spring Cloud Stream binding name by default. A binding is a fundamental abstract concept in Spring Cloud Stream, through which the framework communicates with the middleware destination. Almost everything that Spring Cloud Stream does occurs over a concrete binding. A supplier has only an output binding, a function has input and output bindings, and a consumer has only an input binding. Let’s take as an example our supplier bean - timeSupplier. The default binding name for this supplier will be timeSupplier-out-0. Similarly, the default binding names for the timeProcessor function will be timeProcessor-in-0 on the inbound and timeProcessor-out-0 on the outbound. Please refer to the Spring Cloud Stream reference docs for details on how you can change the default binding names. In most situations, using the default binding names is enough.

We set the destination on the binding names, as shown above. If a destination is not provided, the binding name becomes the value for the destination, as in the case of timeSupplier-out-0.
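The naming convention described above can be modeled in a few lines of plain Java. This is only a sketch of the convention for the single-input, single-output case, not the framework's implementation; the class BindingNameSketch, the Kind enum, and both method names are hypothetical.

```java
import java.util.List;

// Models the default binding naming convention only; not Spring Cloud Stream code.
class BindingNameSketch {

    enum Kind { SUPPLIER, FUNCTION, CONSUMER }

    // Single input and single output only, hence the fixed index 0.
    static List<String> defaultBindingNames(String beanName, Kind kind) {
        String in = beanName + "-in-0";
        String out = beanName + "-out-0";
        return switch (kind) {
            case SUPPLIER -> List.of(out);     // output binding only
            case FUNCTION -> List.of(in, out); // input and output bindings
            case CONSUMER -> List.of(in);      // input binding only
        };
    }

    // Without a configured destination, the binding name doubles as the destination.
    static String destinationFor(String bindingName, String configuredDestination) {
        return configuredDestination != null ? configuredDestination : bindingName;
    }

    public static void main(String[] args) {
        System.out.println(defaultBindingNames("timeSupplier", Kind.SUPPLIER));
        System.out.println(defaultBindingNames("timeProcessor", Kind.FUNCTION));
        System.out.println(destinationFor("timeSupplier-out-0", null));
        System.out.println(destinationFor("timeLogger-in-0", "timeProcessor-out-0"));
    }
}
```

Read against the configuration above, timeSupplier-out-0 has no destination configured, so the supplier publishes to a topic named after the binding, which is exactly the destination that timeProcessor-in-0 subscribes to.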

When running the above app, you should see the supplier execute every second. Its output is consumed by the function, which enhances the time, and the enhanced time is then consumed by the logger consumer.

Message Conversion in Binder-based Applications

In the above sample application, we provided no schema information for message conversion. That is because, by default, Spring Cloud Stream uses its own message conversion mechanism, built on the messaging support established in Spring Framework through the Spring Messaging project. Unless specified, Spring Cloud Stream uses application/json as the content-type for message conversion on both inbound and outbound bindings. On the outbound, the data is serialized as byte[], and the Pulsar binder then uses Schema.BYTES to send it over the wire to the Pulsar topic. Similarly, on the inbound, the data is consumed as byte[] from the Pulsar topic and then converted into the target type using the proper message converter.

Using Native Conversion in Pulsar using Pulsar Schema

Although the default is to use the framework-provided message conversion, Spring Cloud Stream allows each binder to determine how the message should be converted. If the application chooses to go this route, Spring Cloud Stream steers clear of using any Spring-provided message conversion facility and passes around the data it receives or produces. This feature in Spring Cloud Stream is known as native encoding on the producer side and native decoding on the consumer side. This means that the encoding and decoding natively occur on the target middleware, in our case, on Apache Pulsar. For the above application, we can use the following configuration to bypass the framework conversion and use native encoding and decoding.

spring:
  cloud:
    stream:
      bindings:
        timeSupplier-out-0:
          producer:
            use-native-encoding: true
        timeProcessor-in-0:
          destination: timeSupplier-out-0
          consumer:
            use-native-decoding: true
        timeProcessor-out-0:
          destination: timeProcessor-out-0
          producer:
            use-native-encoding: true
        timeLogger-in-0:
          destination: timeProcessor-out-0
          consumer:
            use-native-decoding: true
      pulsar:
        bindings:
          timeSupplier-out-0:
            producer:
              schema-type: JSON
              message-type: org.springframework.pulsar.sample.binder.SpringPulsarBinderSampleApp.Time
          timeProcessor-in-0:
            consumer:
              schema-type: JSON
              message-type: org.springframework.pulsar.sample.binder.SpringPulsarBinderSampleApp.Time
          timeProcessor-out-0:
            producer:
              schema-type: AVRO
              message-type: org.springframework.pulsar.sample.binder.SpringPulsarBinderSampleApp.EnhancedTime
          timeLogger-in-0:
            consumer:
              schema-type: AVRO
              message-type: org.springframework.pulsar.sample.binder.SpringPulsarBinderSampleApp.EnhancedTime

The property to enable native encoding on the producer side is a binding level property from the core Spring Cloud Stream. You set it on the producer binding - spring.cloud.stream.bindings.<binding-name>.producer.use-native-encoding and set this to true. Similarly, use - spring.cloud.stream.bindings.<binding-name>.consumer.use-native-decoding for consumer bindings and set it to true. If we decide to use native encoding and decoding, in the case of Pulsar, we need to set the corresponding schema and the underlying message type information. This information is provided as extended binding properties. As you can see above in the configuration, the properties are - spring.cloud.stream.pulsar.bindings.<binding-name>.producer|consumer.schema-type for schema information and spring.cloud.stream.pulsar.bindings.<binding-name>.producer|consumer.message-type for the actual target type. If you have both keys and values on the message, you can use message-key-type and message-value-type to specify their target types.

Any configured custom schema mappings will be consulted when the schema-type property is omitted.

Message Header Conversion

Each message typically has header information that needs to be carried along as the message traverses between Pulsar and Spring Messaging via Spring Cloud Stream input and output bindings. To support this traversal, the framework handles the necessary message header conversion.

Custom Header Mapper

The Pulsar binder is configured with a default header mapper that can be overridden by providing your own PulsarHeaderMapper bean.

In the following example, a JSON header mapper is configured that:

  • maps all inbound headers (except those with keys “top” or “secret”)

  • maps outbound headers (except those with keys “id”, “timestamp”, or “userId”)

  • only trusts objects in the “com.acme” package for outbound deserialization

  • serializes and deserializes any “com.acme.Money” header values with simple toString() encoding

@Bean
public PulsarHeaderMapper customPulsarHeaderMapper() {
    return JsonPulsarHeaderMapper.builder()
            .inboundPatterns("!top", "!secret", "*")
            .outboundPatterns("!id", "!timestamp", "!userId", "*")
            .trustedPackages("com.acme")
            .toStringClasses("com.acme.Money")
            .build();
}
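The pattern lists above place the negated entries before the catch-all "*". The following plain-Java sketch shows why that order matters, under the assumption that patterns are evaluated in order and the first match decides, with a leading "!" turning a match into an exclusion. This is a model for illustration, not the code of JsonPulsarHeaderMapper; the class HeaderPatternSketch and its methods are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

// Models ordered include/exclude header patterns (assumed first-match-wins semantics).
class HeaderPatternSketch {

    // Returns true if the header should be mapped according to the ordered patterns.
    static boolean shouldMap(String header, List<String> patterns) {
        for (String pattern : patterns) {
            boolean negated = pattern.startsWith("!");
            String glob = negated ? pattern.substring(1) : pattern;
            if (globMatches(glob, header)) {
                return !negated; // the first matching pattern decides
            }
        }
        return false; // no pattern matched, so the header is not mapped
    }

    // Supports only '*' as a wildcard for any run of characters.
    static boolean globMatches(String glob, String value) {
        String regex = Arrays.stream(glob.split("\\*", -1))
                .map(Pattern::quote)
                .collect(Collectors.joining(".*"));
        return value.matches(regex);
    }

    public static void main(String[] args) {
        List<String> inbound = List.of("!top", "!secret", "*");
        System.out.println("top mapped:     " + shouldMap("top", inbound));
        System.out.println("traceId mapped: " + shouldMap("traceId", inbound));
    }
}
```

Under this assumption, moving "*" to the front of the list would match every header first and make the exclusions unreachable.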

Using Pulsar Properties in the Binder

The binder uses basic components from the Spring for Apache Pulsar framework to build its producer and consumer bindings. Since binder-based applications are Spring Boot applications, the binder, by default, uses the Spring Boot autoconfiguration for Spring for Apache Pulsar. Therefore, all Pulsar Spring Boot properties available at the core framework level are also available through the binder. For example, you can use properties with the prefix spring.pulsar.producer…, spring.pulsar.consumer… etc. In addition, you can also set these Pulsar properties at the binder level. For instance, this will also work - spring.cloud.stream.pulsar.binder.producer… or spring.cloud.stream.pulsar.binder.consumer….

Either of the above approaches is fine, but properties set this way are applied to the whole application. If you have multiple functions in the application, they all get the same properties. To address this, you can also set these Pulsar properties at the extended binding properties level. Extended binding properties are applied at the binding itself. For instance, if you have an input and output binding, and both require a separate set of Pulsar properties, you must set them on the extended binding. The pattern for producer binding is spring.cloud.stream.pulsar.bindings.<output-binding-name>.producer…. Similarly, for consumer binding, the pattern is spring.cloud.stream.pulsar.bindings.<input-binding-name>.consumer…. This way, you can have a separate set of Pulsar properties applied for different bindings in the same application.

The highest precedence is for extended binding properties. The precedence order of applying the properties in the binder, going from highest to lowest, is extended binding properties → binder properties → Spring Boot properties.
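The precedence order can be pictured as a lookup through three layers, highest first. The sketch below models only that lookup; it is not how the binder merges properties internally. The class PropertyPrecedenceSketch, the resolve method, and the property key "some-setting" are hypothetical.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Models the documented precedence: extended binding -> binder -> Spring Boot.
class PropertyPrecedenceSketch {

    // Checks each layer in order, highest precedence first, and returns the first hit.
    static Optional<String> resolve(String key, List<Map<String, String>> layersHighestFirst) {
        return layersHighestFirst.stream()
                .filter(layer -> layer.containsKey(key))
                .map(layer -> layer.get(key))
                .findFirst();
    }

    public static void main(String[] args) {
        // "some-setting" is a placeholder key, not a real Pulsar property.
        Map<String, String> extendedBinding = Map.of();
        Map<String, String> binder = Map.of("some-setting", "from-binder");
        Map<String, String> springBoot = Map.of("some-setting", "from-boot");

        Optional<String> value = resolve("some-setting", List.of(extendedBinding, binder, springBoot));
        System.out.println(value.orElse("<unset>"));
    }
}
```

In this example the binding layer does not define the key, so the binder-level value is chosen over the Spring Boot one.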

The following resources describe the properties available through the Pulsar binder.

Pulsar producer binding configuration. These properties need the spring.cloud.stream.pulsar.bindings.<binding-name>.producer prefix. All the Spring Boot provided Pulsar producer properties are also available through this configuration class.

Pulsar consumer binding configuration. These properties need the spring.cloud.stream.pulsar.bindings.<binding-name>.consumer prefix. All the Spring Boot provided Pulsar consumer properties are also available through this configuration class.

Common Pulsar binder-specific configuration properties require a prefix of spring.cloud.stream.pulsar.binder. The producer and consumer properties described above (including the Spring Boot ones) can be used at the binder level using the spring.cloud.stream.pulsar.binder.producer or spring.cloud.stream.pulsar.binder.consumer prefix.

Pulsar Topic Provisioner

The Spring Cloud Stream binder for Apache Pulsar comes with an out-of-the-box provisioner for Pulsar topics. When running an application, if the necessary topics are absent, Pulsar will create the topics for you. However, these are basic non-partitioned topics, and if you want advanced features like creating a partitioned topic, you can rely on the topic provisioner in the binder. The Pulsar topic provisioner uses PulsarAdministration from the framework, which uses the PulsarAdminBuilder. For this reason, you need to set the spring.pulsar.administration.service-url property unless you are running Pulsar on the default server and port.

Specifying partition count when creating the topic

When creating the topic, you can set the partition count in two ways. First, you can set it at the binder level using the property spring.cloud.stream.pulsar.binder.partition-count. As we saw above, doing it this way will make all the topics created by the application inherit this property. If you want granular control at the binding level for setting partitions, you can set the partition-count property per binding using the format spring.cloud.stream.pulsar.bindings.<binding-name>.producer|consumer.partition-count. This way, topics created by different functions in the same application can have different partition counts based on the application requirements.