Protobuf Definitions for Chat Stream Service

The Protobuf definitions for the chat stream live in the chat server's proto file:

chat-service/src/main/proto/ChatService.proto

The primary protobuf definition to understand is ChatStreamService:

service ChatStreamService {
    rpc chat(stream ChatMessage) returns (stream ChatMessageFromServer);
}

This defines a bidirectional streaming channel between the client and server: each side can send a stream of messages independently of the other.

Implement Chat Stream Service

Open chat-service/src/main/java/com/example/chat/grpc/ChatStreamServiceImpl.java. ChatStreamServiceImpl.chat(…) is the bidirectional stream stub we need to implement.

On the server side, we need to listen for incoming streamed messages. To do this, the chat method returns a StreamObserver, which gRPC calls for each message the client sends:

@Override
public StreamObserver<ChatMessage> chat(
  StreamObserver<ChatMessageFromServer> responseObserver) {
  final String username = Constant.USER_ID_CTX_KEY.get();

  return new StreamObserver<ChatMessage>() {
    @Override
    public void onNext(ChatMessage chatMessage) {

    }

    @Override
    public void onError(Throwable throwable) {

    }

    @Override
    public void onCompleted() {

    }
  };
}

The responseObserver passed in as the method parameter is what the server uses to stream data back to the client.
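To make the two roles concrete, here is a minimal sketch of the same shape without gRPC on the classpath. The Observer interface is a stand-in that mirrors the three callbacks of io.grpc.stub.StreamObserver, and EchoHandler and RecordingObserver are hypothetical names used only for illustration. The handler receives the outbound observer and returns the inbound one, just as chat(…) does.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;

// Stand-in for io.grpc.stub.StreamObserver, reduced to the same three
// callbacks so the sketch compiles without gRPC (an assumption for brevity).
interface Observer<T> {
  void onNext(T value);
  void onError(Throwable t);
  void onCompleted();
}

// Hypothetical handler: echoes every inbound message back in upper case.
final class EchoHandler {
  // Same shape as chat(...): takes the outbound observer, returns the inbound one.
  static Observer<String> open(Observer<String> responseObserver) {
    return new Observer<String>() {
      @Override
      public void onNext(String message) {
        // Inbound message from the client; reply on the outbound observer.
        responseObserver.onNext(message.toUpperCase(Locale.ROOT));
      }

      @Override
      public void onError(Throwable t) {
        // The client side failed; nothing more can be sent on this stream.
      }

      @Override
      public void onCompleted() {
        // The client finished sending, so close the outbound side too.
        responseObserver.onCompleted();
      }
    };
  }
}

// Plays the role of the client: records whatever the handler sends back.
final class RecordingObserver implements Observer<String> {
  final List<String> received = new ArrayList<>();
  boolean completed = false;

  @Override
  public void onNext(String value) {
    received.add(value);
  }

  @Override
  public void onError(Throwable t) {
  }

  @Override
  public void onCompleted() {
    completed = true;
  }
}
```

Calling EchoHandler.open(recorder) and then pushing messages into the returned observer delivers the upper-cased replies to recorder, which is the flow gRPC drives for a real bidirectional call.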

The ChatMessage being streamed to the server may have different types, such as JOIN a room, LEAVE a room, or simply a TEXT message for a room. Implement the following:

  1. When joining a chat room, add the responseObserver to the room’s set of all currently connected observers.
@Override
public void onNext(ChatMessage chatMessage) {
  Set<StreamObserver<ChatMessageFromServer>> observers =
      getRoomObservers(chatMessage.getRoomName());
  switch (chatMessage.getType()) {
    case JOIN:
      observers.add(responseObserver);
      break;
  }
}
  2. When leaving a chat room, remove the responseObserver from the room’s observers set.
@Override
public void onNext(ChatMessage chatMessage) {
  Set<StreamObserver<ChatMessageFromServer>> observers =
      getRoomObservers(chatMessage.getRoomName());
  switch (chatMessage.getType()) {
    case JOIN:
      observers.add(responseObserver);
      break;
    case LEAVE:
      observers.remove(responseObserver);
      break;
  }
}
  3. When sending a message, first make sure the user is in the room, then send it to all the connected observers in the room:
@Override
public void onNext(ChatMessage chatMessage) {
  Set<StreamObserver<ChatMessageFromServer>> observers =
      getRoomObservers(chatMessage.getRoomName());
  switch (chatMessage.getType()) {
    case JOIN:
      observers.add(responseObserver);
      break;
    case LEAVE:
      observers.remove(responseObserver);
      break;
    case TEXT:
      if (!observers.contains(responseObserver)) {
        responseObserver.onError(
          Status.PERMISSION_DENIED.withDescription("You are not in the room " +
            chatMessage.getRoomName()).asRuntimeException());
        return;
      }
      // Timestamp expects seconds since the epoch; Date.getTime() returns milliseconds.
      Timestamp now = Timestamp.newBuilder()
          .setSeconds(new Date().getTime() / 1000).build();
      ChatMessageFromServer messageFromServer =
          ChatMessageFromServer.newBuilder()
              .setType(chatMessage.getType())
              .setTimestamp(now)
              .setFrom(username)
              .setMessage(chatMessage.getMessage())
              .setRoomName(chatMessage.getRoomName())
          .build();
      observers.forEach(o -> o.onNext(messageFromServer));
      break;
  }
}
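The TEXT branch stamps each message with a protobuf Timestamp, which stores whole seconds since the Unix epoch plus a nanosecond remainder, so a millisecond clock reading has to be split. A minimal sketch of that arithmetic with plain longs follows; EpochSplit is a hypothetical helper name. If protobuf-java-util is on the classpath, Timestamps.fromMillis performs the same conversion.

```java
// Hypothetical helper: splits epoch milliseconds into the (seconds, nanos)
// pair that a protobuf Timestamp stores.
final class EpochSplit {
  private EpochSplit() {
  }

  // Whole seconds since the epoch, rounding toward negative infinity so
  // that the nanosecond remainder is never negative.
  static long seconds(long epochMillis) {
    return Math.floorDiv(epochMillis, 1000L);
  }

  // Leftover milliseconds expressed as nanoseconds, in [0, 999_000_000].
  static int nanos(long epochMillis) {
    return (int) (Math.floorMod(epochMillis, 1000L) * 1_000_000L);
  }
}
```

The two values can then be passed to Timestamp.newBuilder().setSeconds(…).setNanos(…) to keep sub-second precision.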
  4. If there is an error, or when the client closes the connection, remove the responseObserver from all rooms:
@Override
public void onError(Throwable throwable) {
  logger.log(Level.SEVERE, "gRPC error", throwable);
  removeObserverFromAllRooms(responseObserver);
}

@Override
public void onCompleted() {
  removeObserverFromAllRooms(responseObserver);
}
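The handlers above call getRoomObservers and removeObserverFromAllRooms, which this section does not show. One possible way to back them is sketched below, assuming a concurrent map from room name to a concurrent set of observers; RoomRegistry is a hypothetical class name, and the type parameter T stands in for StreamObserver<ChatMessageFromServer> so the sketch compiles without gRPC. The project's actual helpers may differ.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical registry of chat rooms. T stands in for
// StreamObserver<ChatMessageFromServer>.
final class RoomRegistry<T> {
  // Room name -> observers currently joined to that room.
  private final Map<String, Set<T>> rooms = new ConcurrentHashMap<>();

  // Returns the live observer set for a room, creating it on first use.
  Set<T> getRoomObservers(String roomName) {
    return rooms.computeIfAbsent(roomName, name -> ConcurrentHashMap.newKeySet());
  }

  // Removes the observer from every room it has joined.
  void removeObserverFromAllRooms(T observer) {
    rooms.values().forEach(observers -> observers.remove(observer));
  }
}
```

Concurrent collections are used here because gRPC may invoke the callbacks of different client streams on different threads, so several streams can touch the same room at once.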
  5. Run the auth server and the chat server in separate terminals:
# start auth server
$ cd auth-service
$ mvn install exec:java
...
INFO: Server started on port 9091

# start chat server
$ cd chat-service
$ mvn install exec:java