diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala index 843087c5c..434816f96 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala @@ -23,6 +23,7 @@ private[consumer] final class Runloop private ( sameThreadRuntime: Runtime[Any], consumer: ConsumerAccess, commandQueue: Queue[RunloopCommand], + commitAvailableQueue: Queue[Boolean], partitionsHub: Hub[Take[Throwable, PartitionAssignment]], diagnostics: Diagnostics, maxStreamPullInterval: Duration, @@ -491,6 +492,7 @@ private[consumer] final class Runloop private ( ZStream .fromQueue(commandQueue) + .merge(ZStream.fromQueue(commitAvailableQueue).as(RunloopCommand.CommitAvailable)) .takeWhile(_ != RunloopCommand.StopRunloop) .runFoldChunksDiscardZIO(initialState) { (state, commands) => for { @@ -583,9 +585,11 @@ object Runloop { partitionsHub: Hub[Take[Throwable, PartitionAssignment]] ): URIO[Scope, Runloop] = for { - _ <- ZIO.addFinalizer(diagnostics.emit(Finalization.RunloopFinalized)) - commandQueue <- ZIO.acquireRelease(Queue.unbounded[RunloopCommand])(_.shutdown) - lastRebalanceEvent <- Ref.Synchronized.make[RebalanceEvent](RebalanceEvent.None) + _ <- ZIO.addFinalizer(diagnostics.emit(Finalization.RunloopFinalized)) + commandQueue <- ZIO.acquireRelease(Queue.unbounded[RunloopCommand])(_.shutdown) + // A one-element dropping queue used to signal between two fibers that new commits are pending and we should poll + commitAvailableQueue <- ZIO.acquireRelease(Queue.dropping[Boolean](1))(_.shutdown) + lastRebalanceEvent <- Ref.Synchronized.make[RebalanceEvent](RebalanceEvent.None) initialState = State.initial currentStateRef <- Ref.make(initialState) sameThreadRuntime <- ZIO.runtime[Any].provideLayer(SameThreadRuntimeLayer) @@ -596,7 +600,7 @@ object Runloop { settings.commitTimeout, diagnostics, metrics, - commandQueue.offer(RunloopCommand.CommitAvailable).unit + commitAvailableQueue.offer(true).unit ) rebalanceCoordinator = new RebalanceCoordinator( lastRebalanceEvent, @@ -612,6 +616,7 @@ object Runloop { sameThreadRuntime = sameThreadRuntime, consumer = consumer, commandQueue = commandQueue, + commitAvailableQueue = commitAvailableQueue, partitionsHub = partitionsHub, diagnostics = diagnostics, maxStreamPullInterval = maxStreamPullInterval,