Add support for client and server message buffering #28

Open
wants to merge 5 commits into base: master

Conversation

@jvican (Collaborator) commented Jun 4, 2021

(work-in-progress)

One of the current problems with our implementation is that `request` is
only called when the downstream consumer finishes processing a message
received from the server.

That is, if the server sends 10 messages on the same pipe while the
client consumer is busy processing the first message, the other nine
messages are not pushed into the observable and might not even reach
the client's grpc netty buffer. Only when the client finishes
processing the first message does it call `request` and process another
message, and the pattern repeats: every message is pulled right before
it is processed, never earlier. This incurs an obvious performance
penalty, and it can change how the consumer handles messages that could
otherwise be sent or processed in parallel, resulting in unexpected
user semantics.
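
To make the current behavior concrete, here is a minimal sketch of a
grpc-java `ClientCall.Listener` wired to a Monix subscriber in this
one-message-at-a-time fashion. The class name and wiring are
illustrative assumptions, not this PR's code:

```scala
import io.grpc.{ClientCall, Metadata, Status}
import monix.execution.{Ack, Scheduler}
import monix.reactive.observers.Subscriber

// Sketch only: the grpc runtime is asked for the next message only
// after the downstream Monix subscriber has acked the previous one.
final class OneByOneListener[R](
    call: ClientCall[_, R],
    out: Subscriber[R]
) extends ClientCall.Listener[R] {
  private implicit val scheduler: Scheduler = out.scheduler

  override def onMessage(message: R): Unit =
    out.onNext(message).foreach {
      case Ack.Continue => call.request(1) // pull exactly one more message
      case Ack.Stop     => call.cancel("downstream stopped", null)
    }

  override def onClose(status: Status, trailers: Metadata): Unit =
    if (status.isOk) out.onComplete()
    else out.onError(status.asException(trailers))
}
```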

This commit uses `asyncBoundary` to fix this problem and have monix-grpc
populate an internal buffer with as many messages as fit, while still
allowing consumers to process these messages individually without
waiting for the buffer to fill up completely. If the server sends more
messages than fit in the buffer, then `asyncBoundary` stops consuming
messages from the grpc source and consequently stops issuing `request`
calls, so the grpc runtime doesn't call `onMessage` anymore until the
messages in the buffer are processed downstream.

This behavior is great because the messages processed by `onMessage` are
added to a hot observable, and `asyncBoundary` gives us all the benefits
of a backpressured source without actually using one. As a result, our
hot observable never grows larger than our buffer, because `onMessage`
is only called when `request` is called.
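
As a rough illustration of the operator (a sketch under assumed names;
`bufferSize` and the stand-in source are not from this PR):

```scala
import monix.reactive.{Observable, OverflowStrategy}

val bufferSize = 32 // hypothetical capacity

// Stand-in for the hot stream fed by the grpc runtime's onMessage calls.
val fromGrpc: Observable[Long] = Observable.range(0L, 100L)

// asyncBoundary inserts a concurrent buffer between producer and consumer.
// With the BackPressure strategy, upstream consumption pauses once
// bufferSize messages are queued, so no further `request` calls would
// reach grpc until the downstream drains part of the buffer.
val buffered: Observable[Long] =
  fromGrpc.asyncBoundary(OverflowStrategy.BackPressure(bufferSize))
```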

So this simple solution actually gives us the semantics we want. The
rest of this PR builds on top of this simple mechanism.

This is a work-in-progress PR; here are the things I'm currently
working on adding:

  • Replicate the same implementation in the server call options
  • Add a test battery similar to the one found in reactive-grpc
  • Add a low tide argument to the client and server options
  • Consider calling `request` with N messages instead of 1 if needed

I'm actively working on this PR so I'll add these changes shortly.

It has been renamed to `ClientCallOptionsApi` so it's no longer needed.

This commit applies the `asyncBoundary` buffering mechanism described
above. However, there are still some things to improve, namely:

- Users might want to specify a "low tide" for this buffer. That is, a
  threshold below which we always rehydrate the buffer up to capacity,
  but above which we don't rehydrate it at all. This argument gives the
  client the flexibility to optimize for memory if it doesn't want the
  buffer filled up to its limit.
- We might want to `request` several messages at once instead of one at
  a time when repopulating the buffer. It's not clear whether this
  actually gives us any benefit, as grpc's internal engine is likely
  already buffering things in Netty byte buffers, and calling `request`
  for each message might not pose any performance penalty. A sketch of
  both ideas follows this list.
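
A minimal sketch of both ideas (the `LowTideRequester` name,
`bufferSize`, and `lowTide` are illustrative assumptions, not this
PR's API):

```scala
import io.grpc.ClientCall

final class LowTideRequester(
    call: ClientCall[_, _],
    bufferSize: Int,
    lowTide: Int
) {
  // Messages requested from grpc but not yet consumed downstream.
  private[this] var outstanding = 0

  def start(): Unit = synchronized {
    outstanding = bufferSize
    call.request(bufferSize)
  }

  // Called each time the downstream consumer finishes one message.
  def onConsumed(): Unit = synchronized {
    outstanding -= 1
    // Rehydrate only after occupancy drops below the low tide, and then
    // top up to full capacity with one request(n) call instead of n
    // separate request(1) calls.
    if (outstanding < lowTide) {
      val n = bufferSize - outstanding
      outstanding += n
      call.request(n)
    }
  }
}
```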
@jvican added the enhancement label on Jun 4, 2021
A similar adjustment to the one made for the client call options API.
I've taken the liberty to redesign how server call options are passed
in, regardless of binary compatibility, as we don't promise backwards
compatibility until we make a stable release >= 1.0.

I have added some more documentation to the server call to make it
clear that we are trying to replicate the API of the underlying
grpc-java library.
```scala
 * @param call is the instance of the wrapped grpc server call.
 * @param options is a field for custom user-defined server call options.
 */
final class ServerCall[Request, Response] private (
```
One thing that is not clear to me yet is how you set this
`ServerCallOptions`. Is it meant to be set while registering the API
with the `GrpcServer`?

```scala
 * expected behavior of a configuration, check the grpc-java defaults.
 */
final class ServerCallOptions private (
    val compressor: Option[grpc.Compressor],
```

I think using default values instead of `Option` would make the code
easier to follow. I.e.:

- `bufferSize`: `None` can just be `0`, meaning no buffer.
- No compression is `Compressor.Identity`, and the default could be
  `Compressor.Gzip`.
- `enabledMessageCompression` should be either true or false; having
  true, false, and `None` is a bit weird.

See the sketch below.
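
For illustration, a hypothetical constructor following that suggestion
(the defaults and the extra field names are assumptions, not this PR's
code):

```scala
import io.grpc.{Codec, Compressor}

final class ServerCallOptions private (
    // Codec.Identity.NONE is grpc-java's "no compression" codec; per the
    // suggestion above, the default could instead be a Codec.Gzip instance.
    val compressor: Compressor = Codec.Identity.NONE,
    val bufferSize: Int = 32, // 0 would mean "no buffer"
    val enabledMessageCompression: Boolean = true
)
```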
