Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Protect agains user diagnostics, better rebalance events #1102

Merged
merged 7 commits into from
Nov 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 13 additions & 4 deletions zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import org.apache.kafka.common._
import zio._
import zio.kafka.consumer.diagnostics.DiagnosticEvent.Finalization
import zio.kafka.consumer.diagnostics.Diagnostics
import zio.kafka.consumer.diagnostics.Diagnostics.ConcurrentDiagnostics
import zio.kafka.consumer.internal.{ ConsumerAccess, RunloopAccess }
import zio.kafka.serde.{ Deserializer, Serde }
import zio.kafka.utils.SslHelper
Expand Down Expand Up @@ -175,15 +176,23 @@ object Consumer {
} yield consumer
}

/**
* A new consumer.
*
* @param diagnostics
* an optional callback for key events in the consumer life-cycle. The callbacks will be executed in a separate
* fiber. Since the events are queued, failure to handle these events leads to out of memory errors
*/
def make(
settings: ConsumerSettings,
diagnostics: Diagnostics = Diagnostics.NoOp
): ZIO[Scope, Throwable, Consumer] =
for {
_ <- ZIO.addFinalizer(diagnostics.emit(Finalization.ConsumerFinalized))
_ <- SslHelper.validateEndpoint(settings.driverSettings)
consumerAccess <- ConsumerAccess.make(settings)
runloopAccess <- RunloopAccess.make(settings, consumerAccess, diagnostics)
wrappedDiagnostics <- ConcurrentDiagnostics.make(diagnostics)
_ <- ZIO.addFinalizer(wrappedDiagnostics.emit(Finalization.ConsumerFinalized))
_ <- SslHelper.validateEndpoint(settings.driverSettings)
consumerAccess <- ConsumerAccess.make(settings)
runloopAccess <- RunloopAccess.make(settings, consumerAccess, wrappedDiagnostics)
} yield new ConsumerLive(consumerAccess, runloopAccess)

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,12 @@ object DiagnosticEvent {
final case class Failure(offsets: Map[TopicPartition, OffsetAndMetadata], cause: Throwable) extends Commit
}

sealed trait Rebalance extends DiagnosticEvent
object Rebalance {
final case class Revoked(partitions: Set[TopicPartition]) extends Rebalance
final case class Assigned(partitions: Set[TopicPartition]) extends Rebalance
final case class Lost(partitions: Set[TopicPartition]) extends Rebalance
}
final case class Rebalance(
revoked: Set[TopicPartition],
assigned: Set[TopicPartition],
lost: Set[TopicPartition],
ended: Set[TopicPartition]
) extends DiagnosticEvent

sealed trait Finalization extends DiagnosticEvent
object Finalization {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package zio.kafka.consumer.diagnostics

import zio.{ Queue, Scope, UIO, ZIO }
import zio.stream.ZStream
import zio._
import zio.kafka.consumer.diagnostics.DiagnosticEvent.Finalization.ConsumerFinalized

trait Diagnostics {
def emit(event: => DiagnosticEvent): UIO[Unit]
Expand All @@ -10,11 +12,32 @@ object Diagnostics {
override def emit(event: => DiagnosticEvent): UIO[Unit] = ZIO.unit
}

final case class SlidingQueue private (queue: Queue[DiagnosticEvent]) extends Diagnostics {
final case class SlidingQueue private[Diagnostics] (queue: Queue[DiagnosticEvent]) extends Diagnostics {
override def emit(event: => DiagnosticEvent): UIO[Unit] = queue.offer(event).unit
}

object SlidingQueue {
def make(queueSize: Int = 16): ZIO[Scope, Nothing, SlidingQueue] =
ZIO.acquireRelease(Queue.sliding[DiagnosticEvent](queueSize))(_.shutdown).map(SlidingQueue(_))
}

object ConcurrentDiagnostics {

/**
* @return
* a `Diagnostics` that runs the wrapped `Diagnostics` concurrently in a separate fiber. Events are emitting to
* the fiber via an unbounded queue
*/
def make(wrapped: Diagnostics): ZIO[Scope, Nothing, Diagnostics] =
if (wrapped == Diagnostics.NoOp) ZIO.succeed(Diagnostics.NoOp)
else {
for {
queue <- ZIO.acquireRelease(Queue.unbounded[DiagnosticEvent])(_.shutdown)
fib <- ZStream.fromQueue(queue).tap(wrapped.emit(_)).takeUntil(_ == ConsumerFinalized).runDrain.forkScoped
_ <- ZIO.addFinalizer(queue.offer(ConsumerFinalized) *> fib.await)
} yield new Diagnostics {
override def emit(event: => DiagnosticEvent): UIO[Unit] = queue.offer(event).unit
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import org.apache.kafka.common.errors.RebalanceInProgressException
import zio._
import zio.kafka.consumer.Consumer.{ CommitTimeout, OffsetRetrieval }
import zio.kafka.consumer._
import zio.kafka.consumer.diagnostics.DiagnosticEvent.Finalization
import zio.kafka.consumer.diagnostics.DiagnosticEvent.{ Finalization, Rebalance }
import zio.kafka.consumer.diagnostics.{ DiagnosticEvent, Diagnostics }
import zio.kafka.consumer.fetch.FetchStrategy
import zio.kafka.consumer.internal.ConsumerAccess.ByteArrayKafkaConsumer
Expand Down Expand Up @@ -91,7 +91,6 @@ private[consumer] final class Runloop private (
onAssigned = (assignedTps, _) =>
for {
_ <- ZIO.logDebug(s"${assignedTps.size} partitions are assigned")
_ <- diagnostics.emit(DiagnosticEvent.Rebalance.Assigned(assignedTps))
rebalanceEvent <- lastRebalanceEvent.get
state <- currentStateRef.get
streamsToEnd = if (restartStreamsOnRebalancing && !rebalanceEvent.wasInvoked) state.assignedStreams
Expand All @@ -103,7 +102,6 @@ private[consumer] final class Runloop private (
onRevoked = (revokedTps, _) =>
for {
_ <- ZIO.logDebug(s"${revokedTps.size} partitions are revoked")
_ <- diagnostics.emit(DiagnosticEvent.Rebalance.Revoked(revokedTps))
rebalanceEvent <- lastRebalanceEvent.get
state <- currentStateRef.get
streamsToEnd = if (restartStreamsOnRebalancing && !rebalanceEvent.wasInvoked) state.assignedStreams
Expand All @@ -115,7 +113,6 @@ private[consumer] final class Runloop private (
onLost = (lostTps, _) =>
for {
_ <- ZIO.logDebug(s"${lostTps.size} partitions are lost")
_ <- diagnostics.emit(DiagnosticEvent.Rebalance.Lost(lostTps))
rebalanceEvent <- lastRebalanceEvent.get
state <- currentStateRef.get
lostStreams = state.assignedStreams.filter(control => lostTps.contains(control.tp))
Expand Down Expand Up @@ -307,7 +304,7 @@ private[consumer] final class Runloop private (
_ <- ZIO.logDebug(
s"Starting poll with ${state.pendingRequests.size} pending requests and" +
s" ${state.pendingCommits.size} pending commits," +
s" resuming ${partitionsToFetch} partitions"
s" resuming $partitionsToFetch partitions"
)
_ <- currentStateRef.set(state)
pollResult <-
Expand Down Expand Up @@ -387,6 +384,14 @@ private[consumer] final class Runloop private (
_ <-
committedOffsetsRef.update(_.keepPartitions(updatedAssignedStreams.map(_.tp).toSet)): Task[Unit]

_ <- diagnostics.emit(
Rebalance(
revoked = revokedTps,
assigned = assignedTps,
lost = lostTps,
ended = endedStreams.map(_.tp).toSet
)
)
} yield Runloop.PollResult(
records = polledRecords,
ignoreRecordsForTps = ignoreRecordsForTps,
Expand Down
Loading