gRPC builds on HTTP/2’s long-lived connections which provides a foundation for long-lived, real-time communication streams
and allows gRPC to support multiple communication patterns, one of which is server streaming RPC.
One use case for server streaming RPC could be to implement long-lived streaming of events or notifications from the server to interested clients,
and in this tutorial, we are going to look at how to implement this with gRPC Java using observer pattern.
1. Sample Domain
As an example, we will implement a service that simulates stock price updates and publishes events about it.
We will also create a listener for these events and use it to notify gRPC clients using streaming.
And we will not use any extra dependencies other than gRPC Java and the libraries included in it,
like Guava, which provides us with EventBus that we will use to implement a simple publish-subscribe communication.
First of all, let’s implement a utility class that will allow us to publish events and subscribe to them:
Next, we define a record for the event that will be published:
As well as a domain model that will publish an event on update:
We also implement an in-memory repository that we initialize with several stocks with random prices:
After that, we implement a listener for events, which for now will simply print them:
And we implement a task that will randomly update the price of one of the stocks every second:
Finally, we create the main method where we register the listener and run the task on a separate thread:
Now if we run the application, in the output we will see something like this:
2. RPC Implementation
gRPC, like many RPC systems, is based on the idea of defining a service and its methods that can be called remotely, which is what we need to do first.
2.1. Service definition
The only method in our service will be server streaming RPC that returns prices of a requested stock:
2.2. Server
Let’s move on to implementing this service in our application.
Using server streaming RPC allows us to send one or more messages to the client before completing the call,
but as an initial implementation, we will simply return the current price of the requested stock or an error if it is not found:
Once we’ve implemented our service, we also need to start up a gRPC server, which we will do in the main method:
2.3. Client
Now we can move on to creating a simple client.
We use a blocking stub to request the price of a certain stock and print responses:
By running the server and the client, we can verify that everything works, but there will be only one response in the output of the client,
after which the connection is closed from the server side and the client process is finished.
3. Long-lived Streaming
But after all, our goal was to get updates from the server, otherwise, we could just use unary RPC.
Therefore, we will improve the processing of the request on the server by implementing the observer pattern.
3.1. Observer Pattern
The observer pattern encompasses two types of objects — a subject and an observer.
The subject maintains a list of observers and notifies them of state changes, usually by calling one of their methods.
In our implementation, StreamObserver from gRPC will act as an observer, and we only need to implement a subject.
3.1.1. Registration
Our subject will allow observers to be registered by the stock symbol and will notify the right ones upon request.
We synchronize updates to the collection of observers, and to ensure that we notify only observers registered before the message is published to the subject but avoid holding a lock while notifying all observers, we implement copy-on-read and only synchronize this operation:
Note:StreamObservers are not thread safe, if multiple threads will be writing to a StreamObserver concurrently, you must synchronize those calls as well.
Now let’s update the gRPC service so that instead of completing the call, it registers an observer in the subject:
And also update the listener so that it requests notification of observers from the subject on events:
By restarting the server and the client, we can see that now the client process does not terminate, and it receives all price updates, but we can notice a small problem if we terminate the client manually.
The subject will try to notify the observer, but since the call is canceled, it will endlessly display an error about this:
To fix this, we need to implement the unregistration of observers when canceling a call.
3.1.2. Unregistration
First, we need to add a method for unregistering to our subject:
After that, we need to add a callback to StreamObserver which will unregister it when the call is canceled.
We can do this in the subject itself when registering the observer, or in the service where we register it:
Now if we terminate the client process, we see that the observer has been successfully unregistered and there are no more errors in the server output.
3.2. Client-side cancellation
Currently, we have to completely terminate the client process or at least the thread to cancel a call, otherwise we have to wait until the server completes the call, which may not suit us.
Fortunately, there are other ways to cancel the call from the client side, one of which is using CancellableContext, which will work for all kinds of stubs, both synchronous and asynchronous.
To take advantage of this feature, we need to obtain a context using Context.current().withCancellation() and then use it to start a call with CancellableContext#run(Runnable).
Now having a reference to the context, we can cancel the call at any stage using CancellableContext#cancel(Throwable).
Let’s look at its use in practice, for example, imagine that we want to receive only 5 responses from the server, and then cancel the call.
To do this, we start a call as described above, and also implement a counter that we will increment as each response is processed, then simply end the call when the counter reaches 5:
Running the updated client, we see that it will cancel the call after receiving 5 responses and continue the main method by printing the string we specified.
4. Conclusion
We looked at how to implement long-lived streaming in gRPC Java, reaffirming just how powerful and versatile this framework is.