From 4a0176f9b42cdfda81449195fdc195da809f435a Mon Sep 17 00:00:00 2001
From: svroonland
Date: Sat, 7 Dec 2024 10:19:07 +0100
Subject: [PATCH] Convert commitAsync callback handling to ZIO sooner (#1404)

KafkaConsumer's `commitAsync` takes a callback, which we program against with complicated follow-up code. This PR attempts to convert everything to ZIO effects earlier on, making the chaining of follow-up effects easier to reason about.

As this changes some functionality around locking and same-thread/single-thread execution, here's a summary of what we need to ensure:

* We have an exclusive lock on the consumer when calling `commitAsync`. In `Runloop.run` this is done using `ConsumerAccess`. In the rebalance coordinator (while rebalancing) we already have the lock because we're calling `poll()`, so no extra locking is needed.
* The consumer is not used from more than one thread at the same time. For use in `Runloop.run` we get this for free by guaranteeing exclusive access. In the rebalance coordinator a `poll()` call is in the middle of being executed, and we need to call `commitAsync` on the same thread on which the rebalance listener is invoked.

Anything that is not calling `commitAsync` is free to run on any thread, as executed by the default ZIO runtime.

---------

Co-authored-by: Erik van Oosten
---
 .../zio/kafka/consumer/ConsumerSpec.scala | 2 +-
 .../consumer/internal/CommitterSpec.scala | 144 ++++++++++--------
 .../internal/RebalanceCoordinatorSpec.scala | 33 ++--
 .../kafka/consumer/internal/Committer.scala | 11 +-
 .../consumer/internal/LiveCommitter.scala | 143 +++++++++--------
 .../internal/RebalanceCoordinator.scala | 15 +-
 .../zio/kafka/consumer/internal/Runloop.scala | 19 ++-
 7 files changed, 198 insertions(+), 169 deletions(-)

diff --git a/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala b/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala index 13d51c789..ce7cbae65 100644 --- a/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala +++ b/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala @@ -1591,6 +1591,6 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { .provideSome[Scope & Kafka](producer) .provideSomeShared[Scope]( Kafka.embedded - ) @@ withLiveClock @@ timeout(2.minutes) + ) @@ withLiveClock @@ timeout(2.minutes) @@ TestAspect.timed }
diff --git a/zio-kafka-test/src/test/scala/zio/kafka/consumer/internal/CommitterSpec.scala b/zio-kafka-test/src/test/scala/zio/kafka/consumer/internal/CommitterSpec.scala index 9e0832201..c505dc237 100644 --- a/zio-kafka-test/src/test/scala/zio/kafka/consumer/internal/CommitterSpec.scala +++ b/zio-kafka-test/src/test/scala/zio/kafka/consumer/internal/CommitterSpec.scala @@ -1,28 +1,26 @@ package zio.kafka.consumer.internal -import org.apache.kafka.clients.consumer.OffsetAndMetadata +import org.apache.kafka.clients.consumer.{ MockConsumer, OffsetAndMetadata, OffsetCommitCallback, OffsetResetStrategy } import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.errors.RebalanceInProgressException import zio.kafka.consumer.diagnostics.Diagnostics import zio.test._ -import zio.{ durationInt, Promise, Ref, ZIO } +import zio.{ durationInt, Promise, Queue, Ref, Task, Unsafe, ZIO } import java.util.{ Map => JavaMap } -import scala.jdk.CollectionConverters.MapHasAsJava +import scala.jdk.CollectionConverters.{ MapHasAsJava, MapHasAsScala } object CommitterSpec extends ZIOSpecDefault { override def spec = suite("Committer")( test("signals that a new commit is available") { for { - runtime <- 
ZIO.runtime[Any] commitAvailable <- Promise.make[Nothing, Unit] committer <- LiveCommitter .make( 10.seconds, Diagnostics.NoOp, new DummyMetrics, - onCommitAvailable = commitAvailable.succeed(()).unit, - sameThreadRuntime = runtime + onCommitAvailable = commitAvailable.succeed(()).unit ) tp = new TopicPartition("topic", 0) _ <- committer.commit(Map(tp -> new OffsetAndMetadata(0))).forkScoped @@ -31,81 +29,82 @@ object CommitterSpec extends ZIOSpecDefault { }, test("handles a successful commit by completing the commit effect") { for { - runtime <- ZIO.runtime[Any] commitAvailable <- Promise.make[Nothing, Unit] committer <- LiveCommitter.make( 10.seconds, Diagnostics.NoOp, new DummyMetrics, - onCommitAvailable = commitAvailable.succeed(()).unit, - sameThreadRuntime = runtime + onCommitAvailable = commitAvailable.succeed(()).unit ) tp = new TopicPartition("topic", 0) commitFiber <- committer.commit(Map(tp -> new OffsetAndMetadata(0))).forkScoped _ <- commitAvailable.await - _ <- committer.processQueuedCommits((offsets, callback) => ZIO.attempt(callback.onComplete(offsets, null))) - _ <- commitFiber.join + consumer <- createMockConsumer(offsets => ZIO.succeed(offsets)) + _ <- committer.processQueuedCommits(consumer) + _ <- commitFiber.join } yield assertCompletes }, test("handles a failed commit by completing the commit effect with a failure") { for { - runtime <- ZIO.runtime[Any] commitAvailable <- Promise.make[Nothing, Unit] committer <- LiveCommitter.make( 10.seconds, Diagnostics.NoOp, new DummyMetrics, - onCommitAvailable = commitAvailable.succeed(()).unit, - sameThreadRuntime = runtime + onCommitAvailable = commitAvailable.succeed(()).unit ) tp = new TopicPartition("topic", 0) commitFiber <- committer.commit(Map(tp -> new OffsetAndMetadata(0))).forkScoped _ <- commitAvailable.await - _ <- committer.processQueuedCommits((offsets, callback) => - ZIO.attempt(callback.onComplete(offsets, new RuntimeException("Commit failed"))) - ) - result <- commitFiber.await + consumer <- createMockConsumer(_ => ZIO.fail(new RuntimeException("Commit failed"))) + _ <- committer.processQueuedCommits(consumer) + result <- commitFiber.await } yield assertTrue(result.isFailure) }, test("retries when rebalancing") { for { - runtime <- ZIO.runtime[Any] - commitAvailable <- Promise.make[Nothing, Unit] + commitAvailable <- Queue.bounded[Unit](1) committer <- LiveCommitter.make( 10.seconds, Diagnostics.NoOp, new DummyMetrics, - onCommitAvailable = commitAvailable.succeed(()).unit, - sameThreadRuntime = runtime + onCommitAvailable = commitAvailable.offer(()).unit ) tp = new TopicPartition("topic", 0) commitFiber <- committer.commit(Map(tp -> new OffsetAndMetadata(0))).forkScoped - _ <- commitAvailable.await - _ <- committer.processQueuedCommits((offsets, callback) => - ZIO.attempt(callback.onComplete(offsets, new RebalanceInProgressException("Rebalance in progress"))) - ) - _ <- committer.processQueuedCommits((offsets, callback) => ZIO.attempt(callback.onComplete(offsets, null))) - result <- commitFiber.await - } yield assertTrue(result.isSuccess) + _ <- commitAvailable.take + callCount <- Ref.make(0) + consumer <- + createMockConsumer { offsets => + callCount.updateAndGet(_ + 1).flatMap { count => + if (count == 1) { + ZIO.fail(new RebalanceInProgressException("Rebalance in progress")) + } else { + ZIO.succeed(offsets) + } + } + } + _ <- committer.processQueuedCommits(consumer) + _ <- commitAvailable.take + _ <- committer.processQueuedCommits(consumer) + _ <- commitFiber.join + } yield assertCompletes }, test("adds 1 
to the committed last offset") { for { - runtime <- ZIO.runtime[Any] commitAvailable <- Promise.make[Nothing, Unit] committer <- LiveCommitter.make( 10.seconds, Diagnostics.NoOp, new DummyMetrics, - onCommitAvailable = commitAvailable.succeed(()).unit, - sameThreadRuntime = runtime + onCommitAvailable = commitAvailable.succeed(()).unit ) tp = new TopicPartition("topic", 0) _ <- committer.commit(Map(tp -> new OffsetAndMetadata(1))).forkScoped _ <- commitAvailable.await committedOffsets <- Promise.make[Nothing, JavaMap[TopicPartition, OffsetAndMetadata]] - _ <- committer.processQueuedCommits((offsets, callback) => - committedOffsets.succeed(offsets) *> ZIO.attempt(callback.onComplete(offsets, null)) - ) + consumer <- createMockConsumer(offsets => committedOffsets.succeed(offsets.asJava).as(offsets)) + _ <- committer.processQueuedCommits(consumer) offsetsCommitted <- committedOffsets.await } yield assertTrue( offsetsCommitted == Map(tp -> new OffsetAndMetadata(2)).asJava @@ -113,7 +112,6 @@ object CommitterSpec extends ZIOSpecDefault { }, test("batches commits from multiple partitions and offsets") { for { - runtime <- ZIO.runtime[Any] commitsAvailable <- Promise.make[Nothing, Unit] nrCommitsDone <- Ref.make(0) committer <- LiveCommitter.make( @@ -121,8 +119,7 @@ object CommitterSpec extends ZIOSpecDefault { Diagnostics.NoOp, new DummyMetrics, onCommitAvailable = - ZIO.whenZIO(nrCommitsDone.updateAndGet(_ + 1).map(_ == 3))(commitsAvailable.succeed(())).unit, - sameThreadRuntime = runtime + ZIO.whenZIO(nrCommitsDone.updateAndGet(_ + 1).map(_ == 3))(commitsAvailable.succeed(())).unit ) tp = new TopicPartition("topic", 0) tp2 = new TopicPartition("topic", 1) @@ -131,9 +128,8 @@ object CommitterSpec extends ZIOSpecDefault { commitFiber3 <- committer.commit(Map(tp2 -> new OffsetAndMetadata(3))).forkScoped _ <- commitsAvailable.await committedOffsets <- Promise.make[Nothing, JavaMap[TopicPartition, OffsetAndMetadata]] - _ <- committer.processQueuedCommits((offsets, callback) => - committedOffsets.succeed(offsets) *> ZIO.attempt(callback.onComplete(offsets, null)) - ) + consumer <- createMockConsumer(offsets => committedOffsets.succeed(offsets.asJava).as(offsets)) + _ <- committer.processQueuedCommits(consumer) _ <- commitFiber1.join zip commitFiber2.join zip commitFiber3.join offsetsCommitted <- committedOffsets.await } yield assertTrue( @@ -142,63 +138,85 @@ object CommitterSpec extends ZIOSpecDefault { }, test("keeps track of pending commits") { for { - runtime <- ZIO.runtime[Any] commitAvailable <- Promise.make[Nothing, Unit] committer <- LiveCommitter.make( 10.seconds, Diagnostics.NoOp, new DummyMetrics, - onCommitAvailable = commitAvailable.succeed(()).unit, - sameThreadRuntime = runtime + onCommitAvailable = commitAvailable.succeed(()).unit ) tp = new TopicPartition("topic", 0) - commitFiber <- committer.commit(Map(tp -> new OffsetAndMetadata(0))).forkScoped - _ <- commitAvailable.await - _ <- committer.processQueuedCommits((offsets, callback) => ZIO.attempt(callback.onComplete(offsets, null))) + commitFiber <- committer.commit(Map(tp -> new OffsetAndMetadata(0))).forkScoped + _ <- commitAvailable.await + consumer <- createMockConsumer(offsets => ZIO.succeed(offsets)) + _ <- committer.processQueuedCommits(consumer) pendingCommitsDuringCommit <- committer.pendingCommitCount + _ <- commitFiber.join _ <- committer.cleanupPendingCommits pendingCommitsAfterCommit <- committer.pendingCommitCount - _ <- commitFiber.join } yield assertTrue(pendingCommitsDuringCommit == 1 && pendingCommitsAfterCommit 
== 0) }, test("keep track of committed offsets") { for { - runtime <- ZIO.runtime[Any] commitAvailable <- Promise.make[Nothing, Unit] committer <- LiveCommitter.make( 10.seconds, Diagnostics.NoOp, new DummyMetrics, - onCommitAvailable = commitAvailable.succeed(()).unit, - sameThreadRuntime = runtime + onCommitAvailable = commitAvailable.succeed(()).unit ) tp = new TopicPartition("topic", 0) - commitFiber <- committer.commit(Map(tp -> new OffsetAndMetadata(0))).forkScoped - _ <- commitAvailable.await - _ <- committer.processQueuedCommits((offsets, callback) => ZIO.attempt(callback.onComplete(offsets, null))) - committedOffsets <- committer.getCommittedOffsets + commitFiber <- committer.commit(Map(tp -> new OffsetAndMetadata(0))).forkScoped + _ <- commitAvailable.await + consumer <- createMockConsumer(offsets => ZIO.succeed(offsets)) + _ <- committer.processQueuedCommits(consumer) _ <- commitFiber.join + committedOffsets <- committer.getCommittedOffsets } yield assertTrue(committedOffsets.offsets == Map(tp -> 0L)) }, test("clean committed offsets of no-longer assigned partitions") { for { - runtime <- ZIO.runtime[Any] commitAvailable <- Promise.make[Nothing, Unit] committer <- LiveCommitter.make( 10.seconds, Diagnostics.NoOp, new DummyMetrics, - onCommitAvailable = commitAvailable.succeed(()).unit, - sameThreadRuntime = runtime + onCommitAvailable = commitAvailable.succeed(()).unit ) tp = new TopicPartition("topic", 0) - commitFiber <- committer.commit(Map(tp -> new OffsetAndMetadata(0))).forkScoped - _ <- commitAvailable.await - _ <- committer.processQueuedCommits((offsets, callback) => ZIO.attempt(callback.onComplete(offsets, null))) - _ <- committer.keepCommitsForPartitions(Set.empty) - committedOffsets <- committer.getCommittedOffsets + commitFiber <- committer.commit(Map(tp -> new OffsetAndMetadata(0))).forkScoped + _ <- commitAvailable.await + consumer <- createMockConsumer(offsets => ZIO.succeed(offsets)) + _ <- committer.processQueuedCommits(consumer) _ <- commitFiber.join + _ <- committer.keepCommitsForPartitions(Set.empty) + committedOffsets <- committer.getCommittedOffsets } yield assertTrue(committedOffsets.offsets.isEmpty) } ) @@ TestAspect.withLiveClock @@ TestAspect.nonFlaky(100) + + private def createMockConsumer( + onCommitAsync: Map[TopicPartition, OffsetAndMetadata] => Task[Map[TopicPartition, OffsetAndMetadata]] + ): ZIO[Any, Nothing, MockConsumer[Array[Byte], Array[Byte]]] = + ZIO.runtime[Any].map { runtime => + new MockConsumer[Array[Byte], Array[Byte]](OffsetResetStrategy.LATEST) { + override def commitAsync( + offsets: JavaMap[TopicPartition, OffsetAndMetadata], + callback: OffsetCommitCallback + ): Unit = + Unsafe.unsafe { implicit unsafe => + runtime.unsafe + .run( + onCommitAsync(offsets.asScala.toMap) + .tapBoth( + e => ZIO.attempt(callback.onComplete(offsets, e.asInstanceOf[Exception])), + offsets => ZIO.attempt(callback.onComplete(offsets.asJava, null)) + ) + .ignore + ) + .getOrThrowFiberFailure() + } + + } + } } diff --git a/zio-kafka-test/src/test/scala/zio/kafka/consumer/internal/RebalanceCoordinatorSpec.scala b/zio-kafka-test/src/test/scala/zio/kafka/consumer/internal/RebalanceCoordinatorSpec.scala index 562db3f2c..514063acf 100644 --- a/zio-kafka-test/src/test/scala/zio/kafka/consumer/internal/RebalanceCoordinatorSpec.scala +++ b/zio-kafka-test/src/test/scala/zio/kafka/consumer/internal/RebalanceCoordinatorSpec.scala @@ -5,6 +5,7 @@ import org.apache.kafka.common.TopicPartition import zio.kafka.ZIOSpecDefaultSlf4j import 
zio.kafka.consumer.diagnostics.Diagnostics import zio.kafka.consumer.internal.Committer.CommitOffsets +import zio.kafka.consumer.internal.ConsumerAccess.ByteArrayKafkaConsumer import zio.kafka.consumer.internal.LiveCommitter.Commit import zio.kafka.consumer.internal.RebalanceCoordinator.RebalanceEvent import zio.kafka.consumer.internal.Runloop.ByteArrayCommittableRecord @@ -96,8 +97,7 @@ object RebalanceCoordinatorSpec extends ZIOSpecDefaultSlf4j { records = createTestRecords(3) recordsPulled <- Promise.make[Nothing, Unit] _ <- streamControl.offerRecords(records) - runtime <- ZIO.runtime[Any] - committer <- LiveCommitter.make(10.seconds, Diagnostics.NoOp, mockMetrics, ZIO.unit, runtime) + committer <- LiveCommitter.make(10.seconds, Diagnostics.NoOp, mockMetrics, ZIO.unit) streamDrain <- streamControl.stream @@ -173,16 +173,19 @@ object RebalanceCoordinatorSpec extends ZIOSpecDefaultSlf4j { settings: ConsumerSettings = ConsumerSettings(List("")).withCommitTimeout(1.second), rebalanceSafeCommits: Boolean = false ): ZIO[Scope, Throwable, RebalanceCoordinator] = - Semaphore.make(1).map(new ConsumerAccess(mockConsumer, _)).map { consumerAccess => - new RebalanceCoordinator( - lastEvent, - settings.withRebalanceSafeCommits(rebalanceSafeCommits), - consumerAccess, - 5.seconds, - ZIO.succeed(assignedStreams), - committer - ) - } + Semaphore + .make(1) + .map(new ConsumerAccess(mockConsumer, _)) + .map { consumerAccess => + new RebalanceCoordinator( + lastEvent, + settings.withRebalanceSafeCommits(rebalanceSafeCommits), + consumerAccess, + 5.seconds, + ZIO.succeed(assignedStreams), + committer + ) + } private def createTestRecords(count: Int): Chunk[ByteArrayCommittableRecord] = Chunk.fromIterable( @@ -205,10 +208,8 @@ object RebalanceCoordinatorSpec extends ZIOSpecDefaultSlf4j { abstract class MockCommitter extends Committer { override val commit: Map[TopicPartition, OffsetAndMetadata] => Task[Unit] = _ => ZIO.unit - override def processQueuedCommits( - commitAsync: (java.util.Map[TopicPartition, OffsetAndMetadata], OffsetCommitCallback) => zio.Task[Unit], - executeOnEmpty: Boolean - ): zio.Task[Unit] = ZIO.unit + override def processQueuedCommits(consumer: ByteArrayKafkaConsumer, executeOnEmpty: Boolean): Task[Unit] = ZIO.unit + override def queueSize: UIO[Int] = ZIO.succeed(0) override def pendingCommitCount: UIO[Int] = ZIO.succeed(0) override def getPendingCommits: UIO[CommitOffsets] = ZIO.succeed(CommitOffsets.empty) diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Committer.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Committer.scala index ab8b98ac5..a40427dcc 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Committer.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Committer.scala @@ -1,13 +1,13 @@ package zio.kafka.consumer.internal -import org.apache.kafka.clients.consumer.{ OffsetAndMetadata, OffsetCommitCallback } +import org.apache.kafka.clients.consumer.OffsetAndMetadata import org.apache.kafka.common.TopicPartition import zio.kafka.consumer.internal.Committer.CommitOffsets +import zio.kafka.consumer.internal.ConsumerAccess.ByteArrayKafkaConsumer import zio.kafka.consumer.internal.LiveCommitter.Commit import zio.{ Chunk, Task, UIO } import java.lang.Math.max -import java.util.{ Map => JavaMap } import scala.collection.mutable private[internal] trait Committer { @@ -21,14 +21,13 @@ private[internal] trait Committer { * WARNING: this method is used during a rebalance from the same-thread-runtime. 
This restricts what ZIO operations * may be used. Please see [[RebalanceCoordinator]] for more information. * - * @param commitAsync - * Function 'commitAsync' on the KafkaConsumer. This is isolated from the whole KafkaConsumer for testing purposes. - * The caller should ensure exclusive access to the KafkaConsumer. + * @param consumer + * KafkaConsumer to use. The caller is responsible for guaranteeing exclusive access. * @param executeOnEmpty * Execute commitAsync() even if there are no commits */ def processQueuedCommits( - commitAsync: (JavaMap[TopicPartition, OffsetAndMetadata], OffsetCommitCallback) => Task[Unit], + consumer: ByteArrayKafkaConsumer, executeOnEmpty: Boolean = false ): Task[Unit]
diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/LiveCommitter.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/LiveCommitter.scala index f976c1f9e..6ababd40f 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/LiveCommitter.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/LiveCommitter.scala @@ -5,10 +5,10 @@ import org.apache.kafka.common.errors.RebalanceInProgressException import zio.kafka.consumer.Consumer.CommitTimeout import zio.kafka.consumer.diagnostics.{ DiagnosticEvent, Diagnostics } import zio.kafka.consumer.internal.Committer.CommitOffsets +import zio.kafka.consumer.internal.ConsumerAccess.ByteArrayKafkaConsumer import zio.kafka.consumer.internal.LiveCommitter.Commit -import zio.{ durationLong, Chunk, Duration, Exit, Promise, Queue, Ref, Runtime, Scope, Task, UIO, Unsafe, ZIO } +import zio.{ durationLong, Cause, Chunk, Duration, Exit, Promise, Queue, Ref, Scope, Task, UIO, Unsafe, ZIO } -import java.util import java.util.{ Map => JavaMap } import scala.collection.mutable import scala.jdk.CollectionConverters._ @@ -20,7 +20,6 @@ private[consumer] final class LiveCommitter( consumerMetrics: ConsumerMetrics, onCommitAvailable: UIO[Unit], committedOffsetsRef: Ref[CommitOffsets], - sameThreadRuntime: Runtime[Any], pendingCommits: Ref.Synchronized[Chunk[Commit]] ) extends Committer { @@ -44,31 +43,32 @@ private[consumer] final class LiveCommitter( * may be used. Please see [[RebalanceCoordinator]] for more information. */ override def processQueuedCommits( - commitAsync: (JavaMap[TopicPartition, OffsetAndMetadata], OffsetCommitCallback) => Task[Unit], + consumer: ByteArrayKafkaConsumer, executeOnEmpty: Boolean = false ): Task[Unit] = for { commits <- commitQueue.takeAll _ <- ZIO.logDebug(s"Processing ${commits.size} commits") - _ <- ZIO.unless(commits.isEmpty && !executeOnEmpty) { - val (offsets, callback, onFailure) = asyncCommitParameters(commits) - pendingCommits.update(_ ++ commits) *> - // We don't wait for the completion of the commit here, because it - // will only complete once we poll again. - commitAsync(offsets, callback) - .catchAll(onFailure) + _ <- ZIO.when(commits.nonEmpty || executeOnEmpty) { + val offsets = mergeCommitOffsets(commits) + val offsetsWithMetaData = offsets.map { case (tp, offset) => + tp -> new OffsetAndMetadata(offset.offset + 1, offset.leaderEpoch, offset.metadata) + } + + for { + _ <- pendingCommits.update(_ ++ commits) + startTime <- ZIO.clockWith(_.nanoTime) + _ <- commitAsyncZIO( + consumer, + offsetsWithMetaData, + doOnComplete = handleCommitCompletion(commits, offsetsWithMetaData, startTime, _) + ) + // We don't wait for the completion of the commit here, because it will only complete once we poll again. 
+ } yield () } } yield () - /** - * Merge commits and prepare parameters for calling `consumer.commitAsync`. - * - * WARNING: this method is used during a rebalance from the same-thread-runtime. This restricts what ZIO operations - * may be used. Please see [[RebalanceCoordinator]] for more information. - */ - private def asyncCommitParameters( - commits: Chunk[Commit] - ): (JavaMap[TopicPartition, OffsetAndMetadata], OffsetCommitCallback, Throwable => UIO[Unit]) = { - val offsets = commits + private def mergeCommitOffsets(commits: Chunk[Commit]): Map[TopicPartition, OffsetAndMetadata] = + commits .foldLeft(mutable.Map.empty[TopicPartition, OffsetAndMetadata]) { case (acc, commit) => commit.offsets.foreach { case (tp, offset) => acc += (tp -> acc @@ -79,44 +79,70 @@ private[consumer] final class LiveCommitter( acc } .toMap - val offsetsWithMetaData = offsets.map { case (tp, offset) => - tp -> new OffsetAndMetadata(offset.offset + 1, offset.leaderEpoch, offset.metadata) - } - val cont = (e: Exit[Throwable, Unit]) => ZIO.foreachDiscard(commits)(_.cont.done(e)) - // We assume the commit is started immediately after returning from this method. - val startTime = java.lang.System.nanoTime() - val onSuccess = { - val endTime = java.lang.System.nanoTime() - val latency = (endTime - startTime).nanoseconds - for { - offsetIncrease <- committedOffsetsRef.modify(_.addCommits(commits)) - _ <- consumerMetrics.observeAggregatedCommit(latency, offsetIncrease).when(commits.nonEmpty) - result <- cont(Exit.unit) - _ <- diagnostics.emit(DiagnosticEvent.Commit.Success(offsetsWithMetaData)) - } yield result - } - val onFailure: Throwable => UIO[Unit] = { - case _: RebalanceInProgressException => + + private def handleCommitCompletion( + commits: Chunk[Commit], + offsets: Map[TopicPartition, OffsetAndMetadata], + startTime: NanoTime, + commitResults: Either[Exception, Map[TopicPartition, OffsetAndMetadata]] + ): UIO[Unit] = + ZIO + .from(commitResults) + .zipLeft( for { - _ <- ZIO.logDebug(s"Rebalance in progress, commit for offsets $offsets will be retried") - _ <- commitQueue.offerAll(commits) - _ <- onCommitAvailable + endTime <- ZIO.clockWith(_.nanoTime) + latency = (endTime - startTime).nanoseconds + offsetIncrease <- committedOffsetsRef.modify(_.addCommits(commits)) + _ <- consumerMetrics.observeAggregatedCommit(latency, offsetIncrease).when(commits.nonEmpty) } yield () - case err: Throwable => - cont(Exit.fail(err)) <* diagnostics.emit(DiagnosticEvent.Commit.Failure(offsetsWithMetaData, err)) - } - val callback = - new OffsetCommitCallback { - override def onComplete(offsets: util.Map[TopicPartition, OffsetAndMetadata], exception: Exception): Unit = - Unsafe.unsafe { implicit u => - sameThreadRuntime.unsafe.run { - if (exception eq null) onSuccess else onFailure(exception) - } - .getOrThrowFiberFailure() - } + ) + .zipLeft(ZIO.foreachDiscard(commits)(_.cont.done(Exit.unit))) + .tap(offsetsWithMetaData => diagnostics.emit(DiagnosticEvent.Commit.Success(offsetsWithMetaData))) + .catchAllCause { + case Cause.Fail(_: RebalanceInProgressException, _) => + for { + _ <- ZIO.logDebug(s"Rebalance in progress, commit for offsets $offsets will be retried") + _ <- commitQueue.offerAll(commits) + _ <- onCommitAvailable + } yield () + case c => + ZIO.foreachDiscard(commits)(_.cont.done(Exit.fail(c.squash))) <* diagnostics.emit( + DiagnosticEvent.Commit.Failure(offsets, c.squash) + ) } - (offsetsWithMetaData.asJava, callback, onFailure) - } + .ignore + + /** + * Wrapper that converts KafkaConsumer#commitAsync to ZIO + * 
+ * @return + * Task whose completion indicates completion of the commitAsync call. + */ + private def commitAsyncZIO( + consumer: ByteArrayKafkaConsumer, + offsets: Map[TopicPartition, OffsetAndMetadata], + doOnComplete: Either[Exception, Map[TopicPartition, OffsetAndMetadata]] => UIO[Unit] + ): Task[Unit] = + for { + runtime <- ZIO.runtime[Any] + _ <- ZIO.attempt { + consumer.commitAsync( + offsets.asJava, + new OffsetCommitCallback { + override def onComplete( + offsets: JavaMap[TopicPartition, OffsetAndMetadata], + exception: Exception + ): Unit = + Unsafe.unsafe { implicit unsafe => + runtime.unsafe.run { + if (exception == null) doOnComplete(Right(offsets.asScala.toMap)) + else doOnComplete(Left(exception)) + }.getOrThrowFiberFailure() + } + } + ) + } + } yield () override def queueSize: UIO[Int] = commitQueue.size @@ -139,8 +165,7 @@ private[internal] object LiveCommitter { commitTimeout: Duration, diagnostics: Diagnostics, consumerMetrics: ConsumerMetrics, - onCommitAvailable: UIO[Unit], - sameThreadRuntime: Runtime[Any] + onCommitAvailable: UIO[Unit] ): ZIO[Scope, Nothing, LiveCommitter] = for { pendingCommits <- Ref.Synchronized.make(Chunk.empty[Commit]) commitQueue <- ZIO.acquireRelease(Queue.unbounded[Commit])(_.shutdown) @@ -152,7 +177,6 @@ private[internal] object LiveCommitter { consumerMetrics, onCommitAvailable, committedOffsetsRef, - sameThreadRuntime, pendingCommits ) @@ -163,5 +187,4 @@ private[internal] object LiveCommitter { ) { @inline def isPending: UIO[Boolean] = cont.isDone.negate } - } diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RebalanceCoordinator.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RebalanceCoordinator.scala index 43be57980..3aa87af3b 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RebalanceCoordinator.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RebalanceCoordinator.scala @@ -1,13 +1,7 @@ package zio.kafka.consumer.internal import org.apache.kafka.common.TopicPartition import zio.kafka.consumer.internal.ConsumerAccess.ByteArrayKafkaConsumer -import zio.kafka.consumer.internal.RebalanceCoordinator.{ - EndOffsetCommitPending, - EndOffsetCommitted, - EndOffsetNotCommitted, - RebalanceEvent, - StreamCompletionStatus -} +import zio.kafka.consumer.internal.RebalanceCoordinator._ import zio.kafka.consumer.{ ConsumerSettings, RebalanceListener } import zio.stream.ZStream import zio.{ durationInt, Chunk, Duration, Ref, Task, UIO, ZIO } @@ -164,12 +158,7 @@ private[internal] class RebalanceCoordinator( // Even if there is nothing to commit, continue to drive communication with the broker // so that commits can complete and the streams can make progress, by setting // executeOnEmpty = true - .tap { _ => - committer.processQueuedCommits( - (offsets, callback) => ZIO.attempt(consumer.commitAsync(offsets, callback)), - executeOnEmpty = true - ) - } + .tap(_ => committer.processQueuedCommits(consumer, executeOnEmpty = true)) .takeWhile(_ => java.lang.System.nanoTime() <= deadline) .mapZIO(_ => endingStreamsCompletedAndCommitsExist) .takeUntil(completed => completed) diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala index b4ceddbb2..3c277d8c0 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala @@ -9,9 +9,9 @@ import zio.kafka.consumer._ import zio.kafka.consumer.diagnostics.DiagnosticEvent.{ 
Finalization, Rebalance } import zio.kafka.consumer.diagnostics.{ DiagnosticEvent, Diagnostics } import zio.kafka.consumer.internal.ConsumerAccess.ByteArrayKafkaConsumer +import zio.kafka.consumer.internal.RebalanceCoordinator.RebalanceEvent import zio.kafka.consumer.internal.Runloop._ import zio.kafka.consumer.internal.RunloopAccess.PartitionAssignment -import zio.kafka.consumer.internal.RebalanceCoordinator.RebalanceEvent import zio.stream._ import scala.jdk.CollectionConverters._ @@ -206,10 +206,11 @@ private[consumer] final class Runloop private ( private def handlePoll(state: State): Task[State] = { for { - partitionsToFetch <- settings.fetchStrategy.selectPartitionsToFetch(state.assignedStreams) + partitionsToFetch <- settings.fetchStrategy.selectPartitionsToFetch(state.assignedStreams) + pendingCommitCount <- committer.pendingCommitCount _ <- ZIO.logDebug( s"Starting poll with ${state.pendingRequests.size} pending requests and" + - s" ${committer.pendingCommitCount} pending commits," + + s" ${pendingCommitCount} pending commits," + s" resuming $partitionsToFetch partitions" ) _ <- currentStateRef.set(state) @@ -482,6 +483,8 @@ private[consumer] final class Runloop private ( * - Poll continuously when there are (still) unfulfilled requests or pending commits * - Poll periodically when we are subscribed but do not have assigned streams yet. This happens after * initialization and rebalancing + * + * Note that this method is executed on a dedicated single-thread Executor */ private def run(initialState: State): ZIO[Scope, Throwable, Any] = { import Runloop.StreamOps @@ -492,11 +495,7 @@ private[consumer] final class Runloop private ( .runFoldChunksDiscardZIO(initialState) { (state, commands) => for { _ <- ZIO.logDebug(s"Processing ${commands.size} commands: ${commands.mkString(",")}") - _ <- consumer.runloopAccess { consumer => - committer.processQueuedCommits((offsets, callback) => - ZIO.attempt(consumer.commitAsync(offsets, callback)) - ) - } + _ <- consumer.runloopAccess(committer.processQueuedCommits(_)) streamCommands = commands.collect { case cmd: RunloopCommand.StreamCommand => cmd } stateAfterCommands <- ZIO.foldLeft(streamCommands)(state)(handleCommand) @@ -594,8 +593,7 @@ object Runloop { settings.commitTimeout, diagnostics, metrics, - commandQueue.offer(RunloopCommand.CommitAvailable).unit, - sameThreadRuntime + commandQueue.offer(RunloopCommand.CommitAvailable).unit ) rebalanceCoordinator = new RebalanceCoordinator( lastRebalanceEvent, @@ -624,6 +622,7 @@ object Runloop { _ <- runloop.observeRunloopMetrics(settings.runloopMetricsSchedule).forkScoped // Run the entire loop on a dedicated thread to avoid executor shifts + executor <- RunloopExecutor.newInstance fiber <- ZIO.onExecutor(executor)(runloop.run(initialState)).forkScoped waitForRunloopStop = fiber.join.orDie
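
For reference, the callback-to-ZIO conversion that this patch introduces in `LiveCommitter.commitAsyncZIO` boils down to the pattern sketched below. This is a minimal, standalone sketch rather than code from this PR: the object name `CommitAsyncSketch` and the exact `doOnComplete` parameter shape are illustrative, and it assumes ZIO 2 plus the plain Apache Kafka consumer API. Note that the returned `Task` completes as soon as the `commitAsync` call itself returns; the callback only fires on a later `poll()`, which is why the committer does not await it here.

import org.apache.kafka.clients.consumer.{ Consumer, OffsetAndMetadata, OffsetCommitCallback }
import org.apache.kafka.common.TopicPartition
import zio._

import java.util.{ Map => JavaMap }
import scala.jdk.CollectionConverters._

object CommitAsyncSketch {

  // Calls commitAsync on the given consumer and runs `doOnComplete` as a ZIO effect once
  // Kafka invokes the callback. The caller must guarantee exclusive, single-threaded
  // access to `consumer` while this effect runs (ConsumerAccess in Runloop.run, or the
  // poll() thread inside the rebalance listener).
  def commitAsyncZIO(
    consumer: Consumer[Array[Byte], Array[Byte]],
    offsets: Map[TopicPartition, OffsetAndMetadata],
    doOnComplete: Either[Exception, Map[TopicPartition, OffsetAndMetadata]] => UIO[Unit]
  ): Task[Unit] =
    ZIO.runtime[Any].flatMap { runtime =>
      ZIO.attempt {
        consumer.commitAsync(
          offsets.asJava,
          new OffsetCommitCallback {
            override def onComplete(
              committed: JavaMap[TopicPartition, OffsetAndMetadata],
              exception: Exception
            ): Unit =
              // The callback is invoked from the thread that calls poll(); hand the
              // result back to the ZIO runtime captured above.
              Unsafe.unsafe { implicit unsafe =>
                runtime.unsafe
                  .run(
                    if (exception == null) doOnComplete(Right(committed.asScala.toMap))
                    else doOnComplete(Left(exception))
                  )
                  .getOrThrowFiberFailure()
              }
          }
        )
      }
    }
}

The test suite drives the same callback path without a broker by overriding `commitAsync` on a `MockConsumer`, as shown in `createMockConsumer` above.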