Protect against user diagnostics, better rebalance events #1102
Diagnostics is a feature of zio-kafka that allows users to listen to key events. Since zio-kafka calls out to the user's implementation of the `Diagnostics` trait, there are no guarantees on how well it behaves. This is even more important inside the rebalance listener where we (soon, with #1098) run on the same-thread-runtime and cannot afford to be switched to another thread by ZIO operations that are normally safe to use.

To protect against these issues:
- the rebalance events are replaced by a single event which is emitted from outside the rebalance listener,
- all invocations of the diagnostics trait are forked (unless they are run from a finalizer).
zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala
What about hiding the diagnostics from the users? What's the purpose of exposing it? Shouldn't it be an internal thing?
Well, we have had some feature requests around observability that are covered by diagnostics. But I don't know the original rationale for introducing this.
I just pushed a new implementation that first emits to a queue and then runs the user's Diagnostics on a separate thread.
Observability indeed. Though it could probably use some more attention at some point.
zio-kafka/src/main/scala/zio/kafka/consumer/diagnostics/Diagnostics.scala
If we use this approach, do we still need to modify the emitted events?
No, we don't. However, IMHO the new event is more useful than the old events. It contains more data (endedStreams), and there is only one event per rebalance. Before, you could get multiple events without knowing which of them happened in the same rebalance.
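To illustrate the point about one event per rebalance, here is a minimal sketch of what such an event could look like. Only `endedStreams` is mentioned in this thread; the `TopicPartition` stand-in, the `Rebalance` case class, the other field names and the `describe` helper are assumptions for illustration, not the actual zio-kafka definitions.

```scala
// Hypothetical stand-in for Kafka's TopicPartition, to keep the sketch self-contained.
final case class TopicPartition(topic: String, partition: Int)

// Hypothetical single event that carries everything that happened in one rebalance.
final case class Rebalance(
  revoked: Set[TopicPartition],
  assigned: Set[TopicPartition],
  lost: Set[TopicPartition],
  endedStreams: Set[TopicPartition]
)

object RebalanceEventSketch {
  // With one event per rebalance, a listener can summarise the whole rebalance at once.
  def describe(r: Rebalance): String =
    s"revoked=${r.revoked.size} assigned=${r.assigned.size} " +
      s"lost=${r.lost.size} endedStreams=${r.endedStreams.size}"
}
```

With separate assigned/revoked/lost events, a listener would have to guess which events belong together; with a shape like the above, that correlation is part of the event itself.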
Some tests depend on diagnostics. They will have to be adapted...
... by using `forkDaemon` instead of `forkScoped`. When the scope closes, the queue is shut down (after 10ms), which leads to an interrupt in the stream when it tries to poll from it (and not while it is still processing the previous item).
Done.
zio-kafka/src/main/scala/zio/kafka/consumer/diagnostics/Diagnostics.scala
... by waiting till the end event is processed (@svroonland's idea).
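For readers following the thread, the queue-plus-fiber idea and the "wait for the end event" trick can be sketched roughly as below. This is not the code from this PR: `Event`, `Listener`, `queued` and `demo` are hypothetical names, and the sketch assumes ZIO 2 with zio-streams on the classpath. It uses `forkScoped` for brevity and avoids the interruption problem by awaiting the final event before the scope closes.

```scala
import zio._
import zio.stream.ZStream

// Hypothetical event type; not the zio-kafka DiagnosticEvent.
sealed trait Event
object Event {
  final case class Polled(records: Int) extends Event
  case object Finalized                 extends Event
}

// Hypothetical stand-in for a user-supplied diagnostics implementation.
trait Listener {
  def emit(event: Event): UIO[Unit]
}

object QueuedDiagnosticsSketch extends ZIOAppDefault {

  // Wraps a user listener: emit only offers to a queue,
  // and a separate fiber feeds the queued events to the user code.
  def queued(user: Listener): ZIO[Scope, Nothing, Listener] =
    for {
      queue <- Queue.unbounded[Event]
      _     <- ZStream.fromQueue(queue).mapZIO(user.emit).runDrain.forkScoped
    } yield new Listener {
      def emit(event: Event): UIO[Unit] = queue.offer(event).unit
    }

  // Emits two events to a deliberately slow listener and waits
  // until the listener has processed the final one.
  val demo: UIO[Vector[Event]] =
    ZIO.scoped {
      for {
        seen <- Ref.make(Vector.empty[Event])
        done <- Promise.make[Nothing, Unit]
        slow = new Listener {
                 def emit(event: Event): UIO[Unit] =
                   ZIO.sleep(50.millis) *>
                     seen.update(_ :+ event) *>
                     ZIO.when(event == Event.Finalized)(done.succeed(())).unit
               }
        diagnostics <- queued(slow)
        _           <- diagnostics.emit(Event.Polled(10)) // returns without waiting for the listener
        _           <- diagnostics.emit(Event.Finalized)
        _           <- done.await // wait until the end event is processed
        events      <- seen.get
      } yield events
    }

  val run = demo.flatMap(events => Console.printLine(s"listener saw: $events"))
}
```

The emitting side never runs user code itself, so a slow or misbehaving listener cannot stall it or move it to another thread. Without the `done.await`, closing the scope could interrupt the draining fiber before the last event has been handled.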
Diagnostics is a feature of zio-kafka that allows users to listen to key events. Since zio-kafka calls out to the user's implementation of the `Diagnostics` trait, there are no guarantees on how well it behaves. This is even more important inside the rebalance listener where we (soon, with #1098) run on the same-thread-runtime and cannot afford to be switched to another thread by ZIO operations that are normally safe to use.

To protect against these issues, the user's diagnostics implementation is run on a separate fiber, feeding from a queue of events.

In addition, the rebalance events are replaced by a single event which is emitted from outside the rebalance listener. The new event gives the full picture of a rebalance, including which streams were ended. Previously it was not clear which rebalance events belonged to the same rebalance.

**Breaking change**

Since the rebalance events are changed, this is a breaking change.