In a previous publication, we implemented a gRPC service capable of streaming data. In this article, we will reuse this example as a foundation and focus on creating a client for the service, that will expose the streamed data through a reactive endpoint using Spring WebFlux.
1. Creating Application
We start by creating a Spring Boot application in which we expose one endpoint for retrieving stock prices.
At this stage, it will return an empty
Flux. However, we will soon integrate it with gRPC streaming to populate the stock prices dynamically:
Now we need to set up a connection with our server. While we could utilize the convenience of the gRPC Spring Boot starter, for simplicity, we will configure it manually:
With this setup, we are ready to proceed with integrating the gRPC streaming functionality into our application.
2. Integration of gRPC Streaming with Flux
To facilitate the mapping of the gRPC stream to a reactive stream publisher, we will implement a custom gRPC
This observer will leverage a construct called
Sink, which allows us to programmatically push Reactive Streams signals.
Sinks.many().unicast().onBackpressureBuffer(), we create a
Sink object that will broadcast multiple signals to a single subscriber, and buffer emitted values when subscriber is not ready to receive them, preventing data loss.
While we could also extend
Flux or implement
Publisher to use the same object in the reactive endpoint, we will instead expose the
Sink as a
Flux through a method.
Additionally, we need to implement the mapping from the proto message used in gRPC to a DTO class used in our API, but since we may want to reuse this observer in multiple places, we will keep it abstract, with an abstract method responsible for the mapping:
With the observer in place, we can now proceed to utilize it in the mapping of the gRPC stream to the reactive stream publisher.
We will declare and initialize an instance of the observer specific to our use case using an anonymous class.
Then, we will use this observer in a call to the gRPC service and return the
Flux obtained from it:
If we start both the gRPC server and our Spring application and navigate to http://localhost:8081/stocks/AMZN in the browser, we will start receiving updated prices for the specified stock.
However, there is an issue with the current implementation. When we close the browser, the gRPC call is not canceled, and the server continues publishing data to it, which can be observed in the server logs.
To address this issue, we can use
CancellableContext to cancel the call from the client-side, as discussed in the previous article about long-lived streaming.
We can implement this cancellation in a callback on
Now everything works fine, and the gRPC call is canceled when needed.
3. Encapsulating Cancellation Logic
The only issue is that if we have multiple streaming calls, we would need to repeat this cancellation logic for each one.
To simplify our code and avoid duplication, we can encapsulate this cancellation logic into our custom observer, by introducing an
This method will accept the request and a
BiConsumer representing the RPC we want to execute.
observe method will be responsible for creating a cancellable context, executing the call within that context, and returning a
Flux with a registered callback for canceling the call:
We can now utilize the
observe in all streaming calls, making our code more concise and reusable:
In this article, we explored the mapping of gRPC streams to an external API that uses reactive streams. By leveraging a custom observer and incorporating cancellation logic, we were able to seamlessly integrate gRPC streaming with Spring WebFlux, enabling the reactive handling of data streams. This approach provides a concise and reusable solution for bridging the gap between gRPC and reactive systems.
Full source code can be found on GitHub.