5. Spring Cloud GCP for Pub/Sub

Spring Cloud GCP provides an abstraction layer for publishing to Google Cloud Pub/Sub topics and subscribing to them, and for creating, listing, and deleting Google Cloud Pub/Sub topics and subscriptions.

A Spring Boot starter is provided to auto-configure the various required Pub/Sub components.

Maven coordinates, using Spring Cloud GCP BOM:

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-gcp-starter-pubsub</artifactId>
</dependency>

Gradle coordinates:

dependencies {
    compile group: 'org.springframework.cloud', name: 'spring-cloud-gcp-starter-pubsub'
}

This starter is also available from Spring Initializr through the GCP Messaging entry.

A sample application is available.

5.1 Pub/Sub operations abstraction

PubSubOperations is an abstraction that allows Spring users to use Google Cloud Pub/Sub without depending on any Google Cloud Pub/Sub API semantics. It provides the common set of operations needed to interact with Google Cloud Pub/Sub. PubSubTemplate is the default implementation of PubSubOperations and it uses the Google Cloud Java Client for Pub/Sub to interact with Google Cloud Pub/Sub.

PubSubTemplate depends on a PublisherFactory and a SubscriberFactory. The PublisherFactory provides a Google Cloud Java Client for Pub/Sub Publisher. The SubscriberFactory provides the Subscriber for asynchronous message pulling, as well as a SubscriberStub for synchronous pulling and an Acknowledger, for the cases where messages are automatically acknowledged. The Spring Boot starter for GCP Pub/Sub auto-configures a PublisherFactory and SubscriberFactory with default settings and uses the GcpProjectIdProvider and CredentialsProvider auto-configured by the Spring Boot GCP starter.

The PublisherFactory implementation provided by Spring Cloud GCP Pub/Sub, DefaultPublisherFactory, caches Publisher instances by topic name, in order to optimize resource utilization.
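The caching idea can be illustrated with a small, library-independent sketch. The TopicKeyedCache class below is hypothetical and is not the code of DefaultPublisherFactory; it only shows one way to create an instance per topic name lazily and then reuse it, here with ConcurrentHashMap.computeIfAbsent().

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical helper (not part of Spring Cloud GCP): keeps one value per topic name.
final class TopicKeyedCache<T> {

    private final Map<String, T> cache = new ConcurrentHashMap<>();

    // Creates the value for a topic on first use, for example a new publisher.
    private final Function<String, T> creator;

    TopicKeyedCache(Function<String, T> creator) {
        this.creator = creator;
    }

    // Returns the cached value for the topic, creating it at most once.
    T get(String topic) {
        return cache.computeIfAbsent(topic, creator);
    }

    public static void main(String[] args) {
        TopicKeyedCache<Object> publishers = new TopicKeyedCache<>(topic -> new Object());
        // The same topic name yields the same instance.
        System.out.println(publishers.get("topic") == publishers.get("topic")); // prints "true"
    }
}
```

Reusing one instance per topic avoids repeating the setup cost of a publisher on every publish call, which is the resource optimization the paragraph above refers to.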

5.1.1 Publishing to a topic

PubSubTemplate provides asynchronous methods to publish messages to a Google Cloud Pub/Sub topic. The publish() method takes in a topic name to post the message to, a payload of a generic type and, optionally, a map with the message headers.

Here is an example of how to publish a message to a Google Cloud Pub/Sub topic:

public void publishMessage() {
    this.pubSubTemplate.publish("topic", "your message payload", ImmutableMap.of("key1", "val1"));
}

By default, the SimplePubSubMessageConverter is used to convert payloads of type byte[], ByteString, ByteBuffer, and String to Pub/Sub messages.

For serialization and deserialization of POJOs using Jackson JSON, configure the PubSubTemplate to use the JacksonPubSubMessageConverter by calling the setMessageConverter() method.

5.1.2 Subscribing to a subscription

Google Cloud Pub/Sub allows many subscriptions to be associated with the same topic. PubSubTemplate allows you to subscribe to subscriptions via the subscribe() method. It relies on a SubscriberFactory object, whose only task is to generate Google Cloud Pub/Sub Subscriber objects. When subscribing to a subscription, messages are pulled from Google Cloud Pub/Sub asynchronously, at a set interval.

The Spring Boot starter for Google Cloud Pub/Sub auto-configures a SubscriberFactory.

5.1.3 Pulling messages from a subscription

Google Cloud Pub/Sub supports synchronous pulling of messages from a subscription. This is different from subscribing to a subscription, in the sense that subscribing is an asynchronous task which polls the subscription on a set interval.

The pullNext() method pulls a single message from a subscription and automatically acknowledges it. The pull() method pulls a number of messages from a subscription, allowing for the retry settings to be configured. Messages received by pull() are not automatically acknowledged. Instead, since they are of type AcknowledgeablePubsubMessage, you can acknowledge them by calling the ack() method, or negatively acknowledge them by calling the nack() method. The pullAndAck() method does the same as the pull() method and, additionally, acknowledges all received messages.
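A common way to work with pull() is to acknowledge a message only after it has been processed, and to negatively acknowledge it if processing fails. The sketch below shows that control flow in plain Java. AckableMessage is a hypothetical stand-in for AcknowledgeablePubsubMessage so that the snippet compiles without the library, and the handler represents application logic; only the ack() and nack() method names are taken from the text above.

```java
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for AcknowledgeablePubsubMessage (assumption, not the library type).
interface AckableMessage {
    String payload();
    void ack();
    void nack();
}

final class PullProcessingSketch {

    private PullProcessingSketch() {
    }

    // Acks each message the handler processes successfully and nacks each
    // message for which the handler throws. Returns the number of acked messages.
    static int processAll(List<? extends AckableMessage> pulled, Consumer<String> handler) {
        int acked = 0;
        for (AckableMessage message : pulled) {
            try {
                handler.accept(message.payload());
                message.ack();
                acked++;
            } catch (RuntimeException e) {
                // Processing failed: nack so that the message can be redelivered.
                message.nack();
            }
        }
        return acked;
    }
}
```

In an application, the list passed to processAll() would be the result of a pull() call, and the handler would contain the business logic for one payload.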

In order to acknowledge or negatively acknowledge the messages received from pull(), you can use the acknowledger provided by PubSubTemplate.getAcknowledger(). The provided acknowledger allows for acknowledging or negatively acknowledging a set of acknowledge IDs, pertaining to a subscription. The subscription name passed to the acknowledger must have the following format: projects/[GCP_PROJECT_ID]/subscriptions/[SUBSCRIPTION_NAME].
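The fully qualified subscription name can be assembled from the project ID and the short subscription name. Pub/Sub resource names begin with the plural segment projects/. The following is a minimal sketch in plain Java; SubscriptionNames and fullyQualified() are hypothetical helper names, not part of Spring Cloud GCP.

```java
// Hypothetical helper (not part of Spring Cloud GCP).
final class SubscriptionNames {

    private SubscriptionNames() {
    }

    // Builds "projects/<projectId>/subscriptions/<subscription>".
    static String fullyQualified(String projectId, String subscription) {
        if (projectId == null || projectId.isEmpty()) {
            throw new IllegalArgumentException("projectId must not be empty");
        }
        if (subscription == null || subscription.isEmpty()) {
            throw new IllegalArgumentException("subscription must not be empty");
        }
        return "projects/" + projectId + "/subscriptions/" + subscription;
    }

    public static void main(String[] args) {
        // prints "projects/my-project/subscriptions/my-subscription"
        System.out.println(fullyQualified("my-project", "my-subscription"));
    }
}
```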

PubSubTemplate uses a special subscriber generated by its SubscriberFactory to synchronously pull messages.

If the message payload contains a serialized POJO, it can be retrieved as an instance of a class compatible with that serialized payload:

this.pubSubTemplate.getMessageConverter().fromMessage(message, MyPojo.class);

5.2 Pub/Sub management

PubSubAdmin is the abstraction provided by Spring Cloud GCP to manage Google Cloud Pub/Sub resources. It allows for the creation, deletion and listing of topics and subscriptions.

PubSubAdmin depends on GcpProjectIdProvider and either a CredentialsProvider or a TopicAdminClient and a SubscriptionAdminClient. If given a CredentialsProvider, it creates a TopicAdminClient and a SubscriptionAdminClient with the Google Cloud Java Library for Pub/Sub default settings. The Spring Boot starter for GCP Pub/Sub auto-configures a PubSubAdmin object using the GcpProjectIdProvider and the CredentialsProvider auto-configured by the Spring Boot GCP Core starter.

5.2.1 Creating a topic

PubSubAdmin implements a method to create topics:

public Topic createTopic(String topicName)

Here is an example of how to create a Google Cloud Pub/Sub topic:

public void newTopic() {
    pubSubAdmin.createTopic("topicName");
}

5.2.2 Deleting a topic

PubSubAdmin implements a method to delete topics:

public void deleteTopic(String topicName)

Here is an example of how to delete a Google Cloud Pub/Sub topic:

public void deleteTopic() {
    pubSubAdmin.deleteTopic("topicName");
}

5.2.3 Listing topics

PubSubAdmin implements a method to list topics:

public List<Topic> listTopics()

Here is an example of how to list every Google Cloud Pub/Sub topic name in a project:

public List<String> listTopics() {
    return pubSubAdmin
        .listTopics()
        .stream()
        .map(Topic::getNameAsTopicName)
        .map(TopicName::getTopic)
        .collect(Collectors.toList());
}

5.2.4 Creating a subscription

PubSubAdmin implements a method to create subscriptions to existing topics:

public Subscription createSubscription(String subscriptionName, String topicName, Integer ackDeadline, String pushEndpoint)

Here is an example of how to create a Google Cloud Pub/Sub subscription:

public void newSubscription() {
    pubSubAdmin.createSubscription("subscriptionName", "topicName", 10, "http://my.endpoint/push");
}

Alternative methods with default settings are provided for ease of use. The default value for ackDeadline is 10 seconds. If pushEndpoint isn’t specified, the subscription uses message pulling instead.

public Subscription createSubscription(String subscriptionName, String topicName)
public Subscription createSubscription(String subscriptionName, String topicName, Integer ackDeadline)
public Subscription createSubscription(String subscriptionName, String topicName, String pushEndpoint)

5.2.5 Deleting a subscription

PubSubAdmin implements a method to delete subscriptions:

public void deleteSubscription(String subscriptionName)

Here is an example of how to delete a Google Cloud Pub/Sub subscription:

public void deleteSubscription() {
    pubSubAdmin.deleteSubscription("subscriptionName");
}

5.2.6 Listing subscriptions

PubSubAdmin implements a method to list subscriptions:

public List<Subscription> listSubscriptions()

Here is an example of how to list every subscription name in a project:

public List<String> listSubscriptions() {
    return pubSubAdmin
        .listSubscriptions()
        .stream()
        .map(Subscription::getNameAsSubscriptionName)
        .map(SubscriptionName::getSubscription)
        .collect(Collectors.toList());
}

5.3 Configuration

The Spring Boot starter for Google Cloud Pub/Sub provides the following configuration options:

| Name | Description | Required | Default value |
| --- | --- | --- | --- |
| spring.cloud.gcp.pubsub.enabled | Enables or disables Pub/Sub auto-configuration | No | true |
| spring.cloud.gcp.pubsub.subscriber.executor-threads | Number of threads used by Subscriber instances created by SubscriberFactory | No | 4 |
| spring.cloud.gcp.pubsub.publisher.executor-threads | Number of threads used by Publisher instances created by PublisherFactory | No | 4 |
| spring.cloud.gcp.pubsub.project-id | GCP project ID where the Google Cloud Pub/Sub API is hosted, if different from the one in the Spring Cloud GCP Core Module | No | |
| spring.cloud.gcp.pubsub.credentials.location | OAuth2 credentials for authenticating with the Google Cloud Pub/Sub API, if different from the ones in the Spring Cloud GCP Core Module | No | |
| spring.cloud.gcp.pubsub.credentials.encoded-key | Base64-encoded contents of OAuth2 account private key for authenticating with the Google Cloud Pub/Sub API, if different from the ones in the Spring Cloud GCP Core Module | No | |
| spring.cloud.gcp.pubsub.credentials.scopes | OAuth2 scope for Spring Cloud GCP Pub/Sub credentials | No | https://www.googleapis.com/auth/pubsub |
| spring.cloud.gcp.pubsub.subscriber.parallel-pull-count | The number of pull workers | No | The available number of processors |
| spring.cloud.gcp.pubsub.subscriber.max-ack-extension-period | The maximum period a message ack deadline will be extended, in seconds | No | 0 |
| spring.cloud.gcp.pubsub.subscriber.pull-endpoint | The endpoint for synchronously pulling messages | No | pubsub.googleapis.com:443 |
| spring.cloud.gcp.pubsub.[subscriber,publisher].retry.total-timeout-seconds | TotalTimeout has ultimate control over how long the logic should keep trying the remote call until it gives up completely. The higher the total timeout, the more retries can be attempted. | No | 0 |
| spring.cloud.gcp.pubsub.[subscriber,publisher].retry.initial-retry-delay-seconds | InitialRetryDelay controls the delay before the first retry. Subsequent retries will use this value adjusted according to the RetryDelayMultiplier. | No | 0 |
| spring.cloud.gcp.pubsub.[subscriber,publisher].retry.retry-delay-multiplier | RetryDelayMultiplier controls the change in retry delay. The retry delay of the previous call is multiplied by the RetryDelayMultiplier to calculate the retry delay for the next call. | No | 1 |
| spring.cloud.gcp.pubsub.[subscriber,publisher].retry.max-retry-delay-seconds | MaxRetryDelay puts a limit on the value of the retry delay, so that the RetryDelayMultiplier can’t increase the retry delay higher than this amount. | No | 0 |
| spring.cloud.gcp.pubsub.[subscriber,publisher].retry.max-attempts | MaxAttempts defines the maximum number of attempts to perform. If this value is greater than 0, and the number of attempts reaches this limit, the logic will give up retrying even if the total retry time is still lower than TotalTimeout. | No | 0 |
| spring.cloud.gcp.pubsub.[subscriber,publisher].retry.jittered | Jitter determines if the delay time should be randomized. | No | true |
| spring.cloud.gcp.pubsub.[subscriber,publisher].retry.initial-rpc-timeout-seconds | InitialRpcTimeout controls the timeout for the initial RPC. Subsequent calls will use this value adjusted according to the RpcTimeoutMultiplier. | No | 0 |
| spring.cloud.gcp.pubsub.[subscriber,publisher].retry.rpc-timeout-multiplier | RpcTimeoutMultiplier controls the change in RPC timeout. The timeout of the previous call is multiplied by the RpcTimeoutMultiplier to calculate the timeout for the next call. | No | 1 |
| spring.cloud.gcp.pubsub.[subscriber,publisher].retry.max-rpc-timeout-seconds | MaxRpcTimeout puts a limit on the value of the RPC timeout, so that the RpcTimeoutMultiplier can’t increase the RPC timeout higher than this amount. | No | 0 |
| spring.cloud.gcp.pubsub.[subscriber,publisher.batching].flow-control.max-outstanding-element-count | Maximum number of outstanding elements to keep in memory before enforcing flow control. | No | unlimited |
| spring.cloud.gcp.pubsub.[subscriber,publisher.batching].flow-control.max-outstanding-request-bytes | Maximum number of outstanding bytes to keep in memory before enforcing flow control. | No | unlimited |
| spring.cloud.gcp.pubsub.[subscriber,publisher.batching].flow-control.limit-exceeded-behavior | The behavior when the specified limits are exceeded. | No | Block |
| spring.cloud.gcp.pubsub.publisher.batching.element-count-threshold | The element count threshold to use for batching. | No | unset (threshold does not apply) |
| spring.cloud.gcp.pubsub.publisher.batching.request-byte-threshold | The request byte threshold to use for batching. | No | unset (threshold does not apply) |
| spring.cloud.gcp.pubsub.publisher.batching.delay-threshold-seconds | The delay threshold to use for batching. After this amount of time has elapsed (counting from the first element added), the elements will be wrapped up in a batch and sent. | No | unset (threshold does not apply) |
| spring.cloud.gcp.pubsub.publisher.batching.enabled | Enables batching. | No | false |
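
To make the interaction of InitialRetryDelay, RetryDelayMultiplier and MaxRetryDelay more concrete, the following sketch computes the sequence of delays that results from those three values as they are described above. RetryDelaySketch is a hypothetical helper written for illustration. It deliberately ignores jitter, the RPC timeouts, MaxAttempts and TotalTimeout, so the client library's actual behavior may differ.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper that models only the three retry delay settings.
final class RetryDelaySketch {

    private RetryDelaySketch() {
    }

    // Returns the delay before each of the given number of retries, in milliseconds.
    static List<Long> delaysMillis(long initialDelayMillis, double multiplier,
            long maxDelayMillis, int retries) {
        List<Long> delays = new ArrayList<>();
        long delay = initialDelayMillis;
        for (int i = 0; i < retries; i++) {
            delays.add(delay);
            // Grow the delay for the next retry, but never beyond the cap.
            delay = Math.min((long) (delay * multiplier), maxDelayMillis);
        }
        return delays;
    }

    public static void main(String[] args) {
        // Initial delay of 1 second, multiplier of 2, maximum delay of 5 seconds.
        System.out.println(delaysMillis(1000, 2.0, 5000, 5));
        // prints [1000, 2000, 4000, 5000, 5000]
    }
}
```

With a multiplier of 1, which is the default listed in the table, every retry would wait for the initial delay.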