Change Streams
As of MongoDB 3.6, Change Streams let applications get notified about changes without having to tail the oplog.
Change Stream support is only possible for replica sets or for a sharded cluster. |
Change Streams can be consumed with both, the imperative and the reactive MongoDB Java driver. It is highly recommended to use the reactive variant, as it is less resource-intensive. However, if you cannot use the reactive API, you can still obtain change events by using the messaging concept that is already prevalent in the Spring ecosystem.
It is possible to watch both on a collection as well as database level, whereas the database level variant publishes
changes from all collections within the database. When subscribing to a database change stream, make sure to use a
suitable type for the event type as conversion might not apply correctly across different entity types.
In doubt, use Document
.
Change Streams with MessageListener
Listening to a Change Stream by using a Sync Driver creates a long running, blocking task that needs to be delegated to a separate component.
In this case, we need to first create a MessageListenerContainer
which will be the main entry point for running the specific SubscriptionRequest
tasks.
Spring Data MongoDB already ships with a default implementation that operates on MongoTemplate
and is capable of creating and running Task
instances for a ChangeStreamRequest
.
The following example shows how to use Change Streams with MessageListener
instances:
MessageListener
instancesMessageListenerContainer container = new DefaultMessageListenerContainer(template);
container.start(); (1)
MessageListener<ChangeStreamDocument<Document>, User> listener = System.out::println; (2)
ChangeStreamRequestOptions options = new ChangeStreamRequestOptions("db", "user", ChangeStreamOptions.empty()); (3)
Subscription subscription = container.register(new ChangeStreamRequest<>(listener, options), User.class); (4)
// ...
container.stop(); (5)
1 | Starting the container initializes the resources and starts Task instances for already registered SubscriptionRequest instances. Requests added after startup are ran immediately. |
2 | Define the listener called when a Message is received. The Message#getBody() is converted to the requested domain type. Use Document to receive raw results without conversion. |
3 | Set the collection to listen to and provide additional options through ChangeStreamOptions . |
4 | Register the request. The returned Subscription can be used to check the current Task state and cancel it to free resources. |
5 | Do not forget to stop the container once you are sure you no longer need it. Doing so stops all running Task instances within the container. |
Errors while processing are passed on to an |
Reactive Change Streams
Subscribing to Change Streams with the reactive API is a more natural approach to work with streams. Still, the essential building blocks, such as ChangeStreamOptions
, remain the same. The following example shows how to use Change Streams emitting ChangeStreamEvent
s:
ChangeStreamEvent
Flux<ChangeStreamEvent<User>> flux = reactiveTemplate.changeStream(User.class) (1)
.watchCollection("people")
.filter(where("age").gte(38)) (2)
.listen(); (3)
1 | The event target type the underlying document should be converted to. Leave this out to receive raw results without conversion. |
2 | Use an aggregation pipeline or just a query Criteria to filter events. |
3 | Obtain a Flux of change stream events. The ChangeStreamEvent#getBody() is converted to the requested domain type from (2). |
Resuming Change Streams
Change Streams can be resumed and resume emitting events where you left. To resume the stream, you need to supply either a resume
token or the last known server time (in UTC). Use ChangeStreamOptions
to set the value accordingly.
The following example shows how to set the resume offset using server time:
Flux<ChangeStreamEvent<User>> resumed = template.changeStream(User.class)
.watchCollection("people")
.resumeAt(Instant.now().minusSeconds(1)) (1)
.listen();
1 | You may obtain the server time of an ChangeStreamEvent through the getTimestamp method or use the resumeToken
exposed through getResumeToken . |
In some cases an Instant might not be a precise enough measure when resuming a Change Stream. Use a MongoDB native
BsonTimestamp for that purpose.
|