Change Streams

As of MongoDB 3.6, Change Streams let applications get notified about changes without having to tail the oplog.

Change Streams are only available for replica sets and sharded clusters.

Change Streams can be consumed with both the imperative and the reactive MongoDB Java driver. The reactive variant is highly recommended because it is less resource-intensive. However, if you cannot use the reactive API, you can still obtain change events by using the messaging concept that is already prevalent in the Spring ecosystem.

You can watch at either the collection level or the database level. The database-level variant publishes changes from all collections within the database. When subscribing to a database change stream, make sure to use a suitable type for the event type, as conversion might not apply correctly across different entity types. When in doubt, use Document.

Change Streams with MessageListener

Listening to a Change Stream with the sync driver creates a long-running, blocking task that needs to be delegated to a separate component. In this case, you first need to create a MessageListenerContainer, which is the main entry point for running the specific SubscriptionRequest tasks. Spring Data MongoDB already ships with a default implementation that operates on MongoTemplate and is capable of creating and running Task instances for a ChangeStreamRequest.

The following example shows how to use Change Streams with MessageListener instances:

Example 1. Change Streams with MessageListener instances
MessageListenerContainer container = new DefaultMessageListenerContainer(template);
container.start();                                                                                              (1)

MessageListener<ChangeStreamDocument<Document>, User> listener = System.out::println;                           (2)
ChangeStreamRequestOptions options = new ChangeStreamRequestOptions("db", "user", ChangeStreamOptions.empty()); (3)

Subscription subscription = container.register(new ChangeStreamRequest<>(listener, options), User.class);       (4)

// ...

container.stop();                                                                                               (5)
1 Starting the container initializes the resources and starts Task instances for already registered SubscriptionRequest instances. Requests added after startup are run immediately.
2 Define the listener called when a Message is received. The Message#getBody() is converted to the requested domain type. Use Document to receive raw results without conversion.
3 Set the collection to listen to and provide additional options through ChangeStreamOptions.
4 Register the request. The returned Subscription can be used to check the current Task state and cancel it to free resources.
5 Do not forget to stop the container once you are sure you no longer need it. Doing so stops all running Task instances within the container.

Errors while processing are passed on to an org.springframework.util.ErrorHandler. Unless stated otherwise, a log-appending ErrorHandler is applied by default.
Use register(request, body, errorHandler) to provide additional functionality.
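
As a rough illustration of the delegation described in this section, the following JDK-only sketch runs a blocking read loop on a worker thread, hands listener failures to an error handler, and lets the caller cancel the task. It is an analogy, not Spring Data MongoDB's implementation: ListenerContainerSketch, its register method, and the BlockingQueue used as the event source are all hypothetical stand-ins.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/** JDK-only analogy of a listener container; every name here is hypothetical. */
final class ListenerContainerSketch implements AutoCloseable {

    private final ExecutorService executor = Executors.newCachedThreadPool();

    /**
     * Runs a blocking read loop on a worker thread so the caller is not blocked.
     * Exceptions thrown by the listener go to the error handler and do not end the loop.
     */
    Future<?> register(BlockingQueue<String> source, Consumer<String> listener,
            Consumer<Throwable> errorHandler) {
        return executor.submit(() -> {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    String event = source.take(); // blocks until an event is available
                    try {
                        listener.accept(event);
                    } catch (RuntimeException e) {
                        errorHandler.accept(e);
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // cancelled: leave the loop
            }
        });
    }

    /** Stops all running tasks by interrupting their worker threads. */
    @Override
    public void close() {
        executor.shutdownNow();
    }

    public static void main(String[] args) throws Exception {
        BlockingQueue<String> events = new LinkedBlockingQueue<>();
        CountDownLatch handled = new CountDownLatch(2);
        Consumer<String> listener = event -> {
            try {
                if (event.isEmpty()) {
                    throw new IllegalArgumentException("empty event");
                }
                System.out.println("received " + event);
            } finally {
                handled.countDown();
            }
        };
        try (ListenerContainerSketch container = new ListenerContainerSketch()) {
            Future<?> subscription = container.register(events, listener,
                    error -> System.out.println("error: " + error.getMessage()));
            events.put("");       // the listener rejects this one
            events.put("insert"); // still delivered after the failure
            handled.await(1, TimeUnit.SECONDS);
            subscription.cancel(true); // interrupts the blocked worker
        }
    }
}
```

In this sketch the returned Future plays the role of a Subscription (cancelling it frees the worker thread), and close() corresponds to stopping the container.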

Reactive Change Streams

Subscribing to Change Streams with the reactive API is a more natural approach to working with streams. Still, the essential building blocks, such as ChangeStreamOptions, remain the same. The following example shows how to use Change Streams emitting ChangeStreamEvent instances:

Example 2. Change Streams emitting ChangeStreamEvent
Flux<ChangeStreamEvent<User>> flux = reactiveTemplate.changeStream(User.class) (1)
    .watchCollection("people")
    .filter(where("age").gte(38))                                              (2)
    .listen();                                                                 (3)
1 The event target type the underlying document should be converted to. Leave this out to receive raw results without conversion.
2 Use an aggregation pipeline or just a query Criteria to filter events.
3 Obtain a Flux of change stream events. The ChangeStreamEvent#getBody() is converted to the requested domain type from (1).

Resuming Change Streams

Change Streams can be resumed so that they continue emitting events from where you left off. To resume the stream, you need to supply either a resume token or the last known server time (in UTC). Use ChangeStreamOptions to set the value accordingly.

The following example shows how to set the resume offset using server time:

Example 3. Resume a Change Stream
Flux<ChangeStreamEvent<User>> resumed = reactiveTemplate.changeStream(User.class)
    .watchCollection("people")
    .resumeAt(Instant.now().minusSeconds(1)) (1)
    .listen();
1 You may obtain the server time of a ChangeStreamEvent through the getTimestamp method or use the resumeToken exposed through getResumeToken.
In some cases an Instant might not be a precise enough measure when resuming a Change Stream. Use a MongoDB native BsonTimestamp for that purpose.
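
To see why an Instant can fall short, it helps to recall that a BSON timestamp combines seconds since the epoch with an increment that orders operations within the same second. The following sketch models that layout with plain long arithmetic instead of the driver's BsonTimestamp class. The class name, helper methods, and sample values are illustrative assumptions.

```java
import java.time.Instant;

/** JDK-only model of a BSON timestamp: epoch seconds plus an ordinal within that second. */
final class BsonTimestampSketch {

    /** Packs the seconds into the high 32 bits and the increment into the low 32 bits. */
    static long pack(int seconds, int increment) {
        return ((long) seconds << 32) | (increment & 0xFFFFFFFFL);
    }

    /** Extracts the seconds since the epoch. */
    static int seconds(long packed) {
        return (int) (packed >> 32);
    }

    /** Extracts the ordinal of the operation within its second. */
    static int increment(long packed) {
        return (int) packed;
    }

    /** Converting to an Instant in this sketch keeps the seconds and drops the increment. */
    static Instant toInstant(long packed) {
        return Instant.ofEpochSecond(seconds(packed) & 0xFFFFFFFFL);
    }

    public static void main(String[] args) {
        // Two operations applied within the same second (hypothetical values).
        long first = pack(1_700_000_000, 1);
        long second = pack(1_700_000_000, 2);

        // Both map to the same Instant, so the Instant cannot tell them apart.
        System.out.println(toInstant(first).equals(toInstant(second))); // prints true
        // The packed values still preserve the order of the two operations.
        System.out.println(first < second); // prints true
    }
}
```

An offset that carries both parts can therefore point between two operations that share a second, which an offset built from seconds alone cannot do.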