gRPC Streaming – Grape Up

Earlier articles offered what Protobuf is and the way it may be mixed with gRPC to implement easy synchronous API. Nevertheless, it didn’t current the true energy of gRPC, which is streaming, totally using the capabilities of HTTP/2.0.

Contract definition

We should outline the tactic with enter and output parameters just like the earlier service. To comply with the separation of considerations, let’s create a devoted service for GPS monitoring functions. Our current proto must be prolonged with the next snippet.

message SubscribeRequest {
  string vin = 1;

service GpsTracker {
  rpc Subscribe(SubscribeRequest) returns (stream Geolocation);

Essentially the most essential half right here of enabling streaming is specifying it in enter or output kind. To do this, a key phrase stream is used. It signifies that the server will preserve the connection open, and we are able to anticipate Geolocation messages to be despatched by it.


public void subscribe(SubscribeRequest request, StreamObserver<Geolocation> responseObserver) {

The easy implementation of the tactic doesn’t differ from the implementation of a unary name. The one distinction is in how onNext the tactic behaves; in common synchronous implementation, the tactic can’t be invoked greater than as soon as. Nevertheless, for methodology working on stream, onNext could also be invoked as many instances as you need.

As you might discover on the hooked up screenshot, the geolocation place was returned however the connection continues to be established and the consumer awaits extra information to be despatched within the stream. If the server desires to tell the consumer that there isn’t a extra information, it ought to invoke: the onCompleted methodology; nevertheless, sending single messages shouldn’t be why we wish to use stream.

Use instances for streaming capabilities are primarily transferring vital responses as streams of knowledge chunks or real-time occasions. I’ll attempt to reveal the second use case with this service. Implementation can be primarily based on the reactor ( ) as it really works nicely for the offered use case.

Let’s put together a easy implementation of the service. To make it work, internet flux dependency can be required.

implementation 'org.springframework.boot:spring-boot-starter-webflux'

We should put together a service for publishing geolocation occasions for a selected automobile.

import com.grapeup.grpc.instance.mannequin.GeolocationEvent;
import org.springframework.stereotype.Service;
import reactor.core.writer.Flux;
import reactor.core.writer.Sinks;

public class InMemoryGeolocationService implements GeolocationService {

    non-public remaining Sinks.Many<GeolocationEvent> sink = Sinks.many().multicast().directAllOrNothing();

    public void publish(GeolocationEvent occasion) {

    public Flux<GeolocationEvent> getRealTimeEvents(String vin) {
        return sink.asFlux().filter(occasion ->;


Let’s modify the GRPC service ready within the earlier article to insert the tactic and use our new service to publish occasions.

public void insert(Geolocation request, StreamObserver<Empty> responseObserver) {
    GeolocationEvent geolocationEvent = convertToGeolocationEvent(request);;


Lastly, let’s transfer to our GPS tracker implementation; we are able to exchange the earlier dummy implementation with the next one:

public void subscribe(SubscribeRequest request, StreamObserver<Geolocation> responseObserver) {
        .subscribe(occasion -> responseObserver.onNext(toProto(occasion)),

Right here we make the most of utilizing Reactor, as we not solely can subscribe for incoming occasions but in addition deal with errors and completion of stream in the identical approach.

To map our inner mannequin to response, the next helper methodology is used:

non-public static Geolocation toProto(GeolocationEvent occasion) {
    return Geolocation.newBuilder()


As you might be observed, we despatched the next requests with GPS place and obtained them in real-time from our open stream connection. Streaming information utilizing gRPC or one other device like Kafka is broadly utilized in many IoT programs, together with Automotive.

Bidirectional stream

What if our consumer wish to obtain information for a number of autos however with out preliminary information about all autos they’re thinking about? Creating new connections for every automobile isn’t the perfect strategy. However fear no extra! Whereas utilizing gRPC, the consumer might reuse the identical connection because it helps bidirectional streaming, which implies that each consumer and server might ship messages utilizing open channels.

rpc SubscribeMany(stream SubscribeRequest) returns (stream Geolocation);

Sadly, IntelliJ doesn’t permit us to check this performance with their built-in consumer, so we have now to develop one ourselves.

localhost:9090/com. grapeup.geolocation.GpsTracker/SubscribeMany

com.intellij.grpc.requests.RejectedRPCException: Unsupported methodology is known as

Our dummy consumer may look one thing like that, primarily based on generated courses from the protobuf contract:

var channel = ManagedChannelBuilder.forTarget("localhost:9090")
var observer = GpsTrackerGrpc.newStub(channel)
    .subscribeMany(new StreamObserver<>() {
        public void onNext(Geolocation worth) {

        public void onError(Throwable t) {
            System.err.println("Error " + t.getMessage());

        public void onCompleted() {
whereas (true) {} // to maintain consumer subscribing for demo functions :)

For those who ship the updates for the next random VINs: JF2SJAAC1EH511148, 1YVGF22C3Y5152251, you must be capable of see the output within the console. Test it out!

Tip of the iceberg

Introduced examples are simply gRPC fundamentals; there’s way more to it, like disconnecting from the channel from each ends and reconnecting to the server in case of community failure. The next articles had been meant to share with YOU that gRPC structure has a lot to supply, and there are many prospects for the way it may be utilized in programs. Particularly in programs requiring low latency or the power to offer consumer code with strict contract validation.


Leave a Reply

    Your Cart
    Your cart is emptyReturn to Shop