|
This version is still in development and is not considered stable yet. For the latest stable version, please use Spring Integration 7.0.1! |
gRPC Support
Starting with version 7.1, Spring Integration provides inbound and outbound gateways to communicate via gRPC protocol.
This dependency is required for the project:
-
Maven
-
Gradle
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-grpc</artifactId>
<version>7.1.0-SNAPSHOT</version>
</dependency>
compile "org.springframework.integration:spring-integration-grpc:7.1.0-SNAPSHOT"
Spring Integration components for gRPC are not generated from Protocol buffers, and they are not type-safe as typical gRPC service and stub implementations.
This is mostly due to the generic nature of the Spring Integration framework itself, where the unit of work is a Message abstraction and the payload type of this message is usually out of integration component internal logic scope.
Therefore, gRPC messages for service calls are sent and received as is without conversion assumptions.
For example, if gRCP service methods are like this:
service TestHelloWorld {
// Sends a greeting
rpc SayHello(HelloRequest) returns (HelloReply) {}
// Sends a greeting and something else
rpc StreamSayHello(HelloRequest) returns (stream HelloReply) {}
// Sends a greeting to everyone presenting
rpc HelloToEveryOne(stream HelloRequest) returns (HelloReply) {}
// Streams requests and replies
rpc BidiStreamHello(stream HelloRequest) returns (stream HelloReply) {}
}
The HelloRequest will be a request message payload on the inbound gateway (server) side, and has to be on the outbound gateway (client) side request.
Therefore, the HelloReply has to be a reply message payload on the inbound gateway, and will be received on the outbound gateway.
The GrpcHeaders class contains convenient constants for header names used (and populated) in messages before and after gRPC gateways.
For example, the GrpcHeaders.METHOD_TYPE header contains a io.grpc.MethodDescriptor.MethodType enum value on the server side (inbound gateway) for easier downstream routing.
Another useful header is a GrpcHeaders.SERVICE_METHOD which indicates what gRPC service method was called on the server, or what gRPC service method to call from the client stub.
The GrpcHeaders.SERVICE_METHOD header on the inbound gateway has a value of the gRPC service method name exactly as it is declared in the Protobuf (see .proto example above) and how it is stored into the io.grpc.MethodDescriptor of the service definition.
|
Inbound Gateway for gRPC
The GrpcInboundGateway is a MessagingGatewaySupport implementation to receive gRPC requests, send messages downstream flow and produce gRPC responses.
For initialization, the instance of this gateway requires only an abstract gRPC service class implementing BindableService, usually generated from Protobuf and comes with a *ImplBase class name.
Only standard gRPC services are supported: a generated AsyncService contract is what GrpcInboundGateway logic based on.
The Reactor and Kotlin-based service generation don’t make sense in Spring Integration logic since those types are not exposed from the gateway definition.
|
The gateway uses the mentioned AsyncService interface to create proxy and intercept gRPC service methods.
The following example demonstrates how to configure a GrpcInboundGateway:
@Bean
GrpcInboundGateway helloWorldService() {
return new GrpcInboundGateway(TestHelloWorldGrpc.TestHelloWorldImplBase.class);
}
The GrpcInboundGateway implements a BindableService and exposes a ServerServiceDefinition based on the mentioned proxy for an AsyncService contract of the gRPC service.
Therefore, an instance of this gateway has to be registered into a ServerBuilder and no need in any other *ImplBase implementations in the application.
With Spring gRPC and its auto-discovery for BindableService implementations, the GrpcInboundGateway has to be declared as a top-level bean.
Therefore, Java DSL API like IntegrationFlow.from(new GrpcInboundGateway(TestHelloWorldGrpc.TestHelloWorldImplBase.class)) is not recommended because such a BindableService implementation won’t make it visible for respective Spring gRPC infrastructure.
|
The GrpcInboundGateway uses a sendAndReceiveMessageReactive() API to interact with the downstream flow and adapts a Mono reply to the gRPC StreamObserver.
As mentioned before, the request message payload is exactly a gRPC request message, and it expects a reply in a from of gRPC response message.
The downstream logic can be type-safe and deal with gRPC messages similar way as if *ImplBase would be implemented manually.
The MethodDescriptor.MethodType.UNARY and MethodDescriptor.MethodType.BIDI_STREAMING are same from the downstream handling logic perspective.
In other words, the BIDI_STREAMING is handled as a loop on request items and the gateway produces a response item immediately into the response StreamObserver.
For different BIDI_STREAMING logic, the regular gRPC service implementation is recommended.
The MethodDescriptor.MethodType.CLIENT_STREAMING mode produces a message with a Flux as a payload of gRPC request items.
For the MethodDescriptor.MethodType.SERVER_STREAMING mode a reply payload can be a single gRPC response message or a Flux of them.
The following example demonstrates an IntegrationFlow implementation for the mentioned TestHelloWorldGrpc.TestHelloWorldImplBase service:
@Bean
IntegrationFlow grpcIntegrationFlow(GrpcInboundGateway helloWorldService) {
return IntegrationFlow.from(helloWorldService)
.route(Message.class, message ->
message.getHeaders().get(GrpcHeaders.SERVICE_METHOD, String.class),
router -> router
.subFlowMapping("SayHello", flow -> flow
.transform(this::requestReply))
.subFlowMapping("StreamSayHello", flow -> flow
.transform(this::streamReply))
.subFlowMapping("HelloToEveryOne", flow -> flow
.transformWith(transformSpec -> transformSpec
.transformer(this::streamRequest)
.async(true)))
.subFlowMapping("BidiStreamHello", flow -> flow
.transform(this::requestReply))
)
.get();
}
private HelloReply requestReply(HelloRequest helloRequest) {
return newHelloReply("Hello " + helloRequest.getName());
}
private Flux<HelloReply> streamReply(HelloRequest helloRequest) {
return Flux.just(
newHelloReply("Hello " + helloRequest.getName()),
newHelloReply("Hello again!"));
}
private Mono<HelloReply> streamRequest(Flux<HelloRequest> request) {
return request
.map(HelloRequest::getName)
.collectList()
.map(names -> StringUtils.collectionToDelimitedString(names, ", "))
.map("Hello "::concat)
.map(TestConfig::newHelloReply);
}
private static HelloReply newHelloReply(String message) {
return HelloReply.newBuilder().setMessage(message).build();
}
The routing is done on the GrpcHeaders.SERVICE_METHOD header populated by the GrpcInboundGateway.
All the downstream transformer business methods are type-safe in regard to gRPC messages for the TestHelloWorldGrpc.TestHelloWorldImplBase service.