diff --git a/docs/sidebars.js b/docs/sidebars.js
index a6d28fb87..b8dcf4f1f 100644
--- a/docs/sidebars.js
+++ b/docs/sidebars.js
@@ -14,6 +14,7 @@ const sidebars = {
         "preventing-duplicates",
         "sharing-consumer",
         "serialization-and-deserialization",
+        "transactions",
         "writing-tests"
       ]
     }
diff --git a/docs/transactions.md b/docs/transactions.md
new file mode 100644
index 000000000..ea4e2118d
--- /dev/null
+++ b/docs/transactions.md
@@ -0,0 +1,259 @@
---
id: transactions
title: "Transactions"
---

> This document describes Kafka transactions for zio-kafka version 3.0.0 and later.

## What are Kafka Transactions?

[Kafka transactions](https://www.confluent.io/blog/transactions-apache-kafka/) are different from what you may know from
relational databases. In Kafka, a transaction means that your program consumes some records and then produces some other
records (to the same broker); by committing the _consumed_ records the transaction is committed, and the produced records
become available to other consumers.

## A warning about consuming transactional records

By default, produced records become immediately visible to consumers, even if they were produced in a Kafka transaction
that has not yet been committed! To prevent reading transactional records before they are committed, you need to
configure every consumer with the `isolation.level` property set to `read_committed`. For example:

```scala
val settings = ConsumerSettings(bootstrapServers)
  .withGroupId(groupId)
  .withProperty("isolation.level", "read_committed") // Only read committed records
  // ... etc.
```

## Producing transactional records

In order to produce records transactionally, we need a `Consumer` and a `TransactionalProducer` that work together
to commit transactions.

First we create layers for the `ConsumerSettings` and the `Consumer`.
Note that rebalance-safe-commits must be enabled (see the background section below for more information).
Since the default `maxRebalanceDuration` is quite high, you may also consider setting it to a lower duration, one in
which you are sure all records of a poll can be processed.

```scala
val bootstrapBrokers = List("localhost:9092")
val consumerSettings = ZLayer.succeed {
  ConsumerSettings(bootstrapBrokers)
    .withGroupId("somegroupid")
    .withRebalanceSafeCommits(true) // required!
    .withMaxRebalanceDuration(30.seconds)
}
val consumer = Consumer.live
```

Now we can create layers for the `ProducerSettings` and the `TransactionalProducer`.
Note that the producer connects (and must connect) to the same brokers as the consumer.

DEVELOPER NOTE: here we assume that we no longer have `TransactionalProducerSettings`. TODO: check this assertion

```scala
val producerSettings = ZLayer.succeed {
  ProducerSettings(bootstrapServers = bootstrapBrokers)
}
val producer: ZLayer[TransactionalProducerSettings with Consumer, Throwable, TransactionalProducer] =
  TransactionalProducer.live
```

Notice that the `producer` layer also requires a `Consumer`. This is the consumer that will be used to commit the
consumed messages.

With these in place we can look at the application logic of consuming and producing. In this example we use the
`plainStream` method to consume records with a `Long` value and an `Int` key.

```scala
val consumedRecordsStream = consumer
  .plainStream(Subscription.topics("my-input-topic"), Serde.int, Serde.long)
```

For each consumed record, we then create a `ProducerRecord` with a `String` value containing `Copy of ` followed by the
original value. Note that we also keep the offset of the consumed record so that we can commit it later.

```scala
val producedRecordsStream = consumedRecordsStream
  .mapChunks { records: Chunk[CommittableRecord[Int, Long]] =>
    records.map { record =>
      val key: Int = record.record.key()
      val value: Long = record.record.value()
      val newValue: String = "Copy of " + value.toString

      val producerRecord: ProducerRecord[Int, String] =
        new ProducerRecord("my-output-topic", key, newValue)
      (producerRecord, record.offset)
    }
  }
```

Typically, to optimize throughput, we want to produce records in batches. The underlying chunking structure of the
consumer stream is ideal for that because zio-kafka guarantees that each chunk in the stream corresponds to the records
that were fetched together. However, we need to be careful to retain the chunking structure. For example, `.mapZIO`
processes records one by one and results in a stream where each chunk contains only a single record. Therefore, we use
`.mapChunks` instead of `.map` and `.mapChunksZIO` instead of `.mapZIO`.

These new records can now be produced. Let's build it up slowly.

```scala
producedRecordsStream
  .mapChunksZIO { recordsAndOffsets: Chunk[(ProducerRecord[Int, String], Offset)] =>
    ???
  }
```

Let's work on those question marks. We need to create a transaction.
Each zio-kafka transaction runs in a ZIO `Scope`; the transaction commits when the scope closes.

```scala
ZIO.scoped {
  ??? // transaction stuff
}
```

We also want to try to complete the transaction, even when the program is shutting down.
Therefore, we add `.uninterruptible` and get:

```scala
ZIO
  .scoped {
    ??? // transaction stuff
  }
  .uninterruptible
```

Now we can fill in the 'transaction stuff': we create the transaction and use it to produce some records.

```scala
for {
  tx <- transactionalProducer.createTransaction
  _ <- {
         val (records, offsets) = recordsAndOffsets.unzip
         tx.produceChunkBatch(
           records,
           Serde.int,
           Serde.string,
           OffsetBatch(offsets)
         )
       }
  // Running inside `mapChunksZIO`, we need to return a Chunk.
  // Since we're not using the result, the empty chunk will do.
} yield Chunk.empty
```

Notice how we pass the offsets of the consumed records. Once the transaction completes, these are the offsets that will
be committed.

Putting it all together we get:

```scala
import org.apache.kafka.clients.producer.ProducerRecord
import zio._
import zio.kafka.consumer.diagnostics.Diagnostics
import zio.kafka.consumer._
import zio.kafka.producer._
import zio.kafka.serde.Serde

import java.util.UUID

object Transactional extends ZIOAppDefault {

  private val topic = "transactional-test-topic"

  val bootstrapBrokers = List("localhost:9092")

  val consumerSettings = ZLayer.succeed {
    ConsumerSettings(bootstrapBrokers)
      .withGroupId("transactional-example-app")
      .withRebalanceSafeCommits(true) // required for transactional producing
      .withMaxRebalanceDuration(30.seconds)
  }

  val producerSettings = ZLayer.succeed {
    TransactionalProducerSettings(
      bootstrapServers = bootstrapBrokers,
      transactionalId = UUID.randomUUID().toString // TODO: do we still need this?
    )
  }

  private val runConsumerStream: ZIO[Consumer & TransactionalProducer, Throwable, Unit] =
    for {
      consumer              <- ZIO.service[Consumer]
      transactionalProducer <- ZIO.service[TransactionalProducer]
      _                     <- ZIO.logInfo(s"Consuming messages from topic $topic...")
      _ <- consumer
             .plainStream(Subscription.topics(topic), Serde.int, Serde.long)
             .mapChunks { records: Chunk[CommittableRecord[Int, Long]] =>
               records.map { record =>
                 val key: Int = record.record.key()
                 val value: Long = record.record.value()
                 val newValue: String = "Copy of " + value.toString

                 val producerRecord: ProducerRecord[Int, String] =
                   new ProducerRecord("my-output-topic", key, newValue)
                 (producerRecord, record.offset)
               }
             }
             .mapChunksZIO { recordsAndOffsets: Chunk[(ProducerRecord[Int, String], Offset)] =>
               ZIO
                 .scoped {
                   for {
                     _  <- ZIO.addFinalizer(ZIO.logInfo("Completing transaction"))
                     tx <- transactionalProducer.createTransaction
                     _ <- {
                            val (records, offsets) = recordsAndOffsets.unzip
                            tx.produceChunkBatch(
                              records,
                              Serde.int,
                              Serde.string,
                              OffsetBatch(offsets)
                            )
                          }
                   } yield Chunk.empty
                 }
                 .uninterruptible
             }
             .runDrain
    } yield ()

  override def run: ZIO[Scope, Any, Any] =
    runConsumerStream
      .provide(
        consumerSettings,
        ZLayer.succeed(Diagnostics.NoOp),
        Consumer.live,
        producerSettings,
        TransactionalProducer.live
      )

}
```

## Technical background - How does it all work?

Kafka must ensure that each record is processed only once, even though a partition can be revoked and assigned to
another consumer while the original consumer knows nothing about this. In that situation the original consumer will
happily continue producing records even though the partition was lost.

Kafka does this by validating the group epoch. The group epoch is a number that is increased after every rebalance.
When the transactional producer commits a transaction with the offsets of consumed records, it also includes the group
epoch in which the records were consumed. If the broker detects a commit with a group epoch that is not equal to the
current epoch, it will reject the commit.

In addition, to prevent an old consumer from continuing to poll into the next epoch without participating in the
rebalance, consumers polling within the wrong group epoch are 'fenced'. A fenced consumer dies with a
`FencedInstanceIdException`.

### How zio-kafka helps

With zio-kafka you can easily couple the transactional producer to a consumer to commit transactions. ZIO scopes are
used to define the transaction boundaries. Within the scope you can produce records one by one, all in one batch, or in
multiple batches. Once the scope closes, all collected offsets are committed.

In addition, zio-kafka prevents failed commits. It does this by holding up every rebalance until all consumed records
are committed. This feature is called rebalance-safe-commits. By holding up the rebalance, records can be processed in
the same group epoch as they were consumed in.

When a zio-kafka consumer misses a rebalance and continues polling, nothing can be done: it will die with a
`FencedInstanceIdException`. In that case the consumer (or, more simply, the whole program) should be restarted.
\ No newline at end of file
diff --git a/zio-kafka-example/src/main/scala/zio/kafka/example/Main.scala b/zio-kafka-example/src/main/scala/zio/kafka/example/Main.scala
index a4baa348c..8019cec7a 100644
--- a/zio-kafka-example/src/main/scala/zio/kafka/example/Main.scala
+++ b/zio-kafka-example/src/main/scala/zio/kafka/example/Main.scala
@@ -1,36 +1,11 @@
 package zio.kafka.example

-import io.github.embeddedkafka.{ EmbeddedK, EmbeddedKafka, EmbeddedKafkaConfig }
 import zio._
 import zio.kafka.consumer.diagnostics.Diagnostics
 import zio.kafka.consumer.{ Consumer, ConsumerSettings, Subscription }
 import zio.kafka.serde.Serde
 import zio.logging.backend.SLF4J

-trait MyKafka {
-  def bootstrapServers: List[String]
-  def stop(): UIO[Unit]
-}
-
-object MyKafka {
-  final case class EmbeddedKafkaService(embeddedK: EmbeddedK) extends MyKafka {
-    override def bootstrapServers: List[String] = List(s"localhost:${embeddedK.config.kafkaPort}")
-    override def stop(): UIO[Unit]              = ZIO.succeed(embeddedK.stop(true))
-  }
-
-  val embedded: ZLayer[Any, Throwable, MyKafka] = ZLayer.scoped {
-    implicit val embeddedKafkaConfig: EmbeddedKafkaConfig = EmbeddedKafkaConfig(
-      customBrokerProperties = Map(
-        "group.min.session.timeout.ms"     -> "500",
-        "group.initial.rebalance.delay.ms" -> "0",
-        "authorizer.class.name"            -> "kafka.security.authorizer.AclAuthorizer",
-        "super.users"                      -> "User:ANONYMOUS"
-      )
-    )
-    ZIO.acquireRelease(ZIO.attempt(EmbeddedKafkaService(EmbeddedKafka.start())))(_.stop())
-  }
-}
-
 object Main extends ZIOAppDefault {

   /**
diff --git a/zio-kafka-example/src/main/scala/zio/kafka/example/MyKafka.scala b/zio-kafka-example/src/main/scala/zio/kafka/example/MyKafka.scala
new file mode 100644
index 000000000..c9c02ba1c
--- /dev/null
+++ b/zio-kafka-example/src/main/scala/zio/kafka/example/MyKafka.scala
@@ -0,0 +1,28 @@
package zio.kafka.example

import io.github.embeddedkafka.{ EmbeddedK, EmbeddedKafka, EmbeddedKafkaConfig }
import zio._

trait MyKafka {
  def bootstrapServers: List[String]
  def stop(): UIO[Unit]
}

object MyKafka {
  final case class EmbeddedKafkaService(embeddedK: EmbeddedK) extends MyKafka {
    override def bootstrapServers: List[String] = List(s"localhost:${embeddedK.config.kafkaPort}")
    override def stop(): UIO[Unit]              = ZIO.succeed(embeddedK.stop(true))
  }

  val embedded: ZLayer[Any, Throwable, MyKafka] = ZLayer.scoped {
    implicit val embeddedKafkaConfig: EmbeddedKafkaConfig = EmbeddedKafkaConfig(
      customBrokerProperties = Map(
        "group.min.session.timeout.ms"     -> "500",
        "group.initial.rebalance.delay.ms" -> "0",
        "authorizer.class.name"            -> "kafka.security.authorizer.AclAuthorizer",
        "super.users"                      -> "User:ANONYMOUS"
      )
    )
    ZIO.acquireRelease(ZIO.attempt(EmbeddedKafkaService(EmbeddedKafka.start())))(_.stop())
  }
}
diff --git a/zio-kafka-example/src/main/scala/zio/kafka/example/Transactional.scala b/zio-kafka-example/src/main/scala/zio/kafka/example/Transactional.scala
new file mode 100644
index 000000000..cf0d2802c
--- /dev/null
+++ b/zio-kafka-example/src/main/scala/zio/kafka/example/Transactional.scala
@@ -0,0 +1,94 @@
package zio.kafka.example

import org.apache.kafka.clients.producer.ProducerRecord
import zio._
import zio.kafka.consumer.diagnostics.Diagnostics
import zio.kafka.consumer._
import zio.kafka.producer._
import zio.kafka.serde.Serde
import zio.logging.backend.SLF4J

import java.util.UUID

object Transactional extends ZIOAppDefault {

  /**
   * See `zio-logging` documentation: https://zio.github.io/zio-logging/docs/overview/overview_slf4j
   */
  override val bootstrap: ZLayer[ZIOAppArgs, Any, Any] =
    zio.Runtime.removeDefaultLoggers >>> SLF4J.slf4j

  private val topic = "transactional-test-topic"

  private def consumerSettings: ZLayer[MyKafka, Throwable, ConsumerSettings] = ZLayer {
    for {
      kafka <- ZIO.service[MyKafka]
    } yield ConsumerSettings(kafka.bootstrapServers)
      .withGroupId("transactional-example-app")
      .withRebalanceSafeCommits(true) // required for transactional producing
      .withMaxRebalanceDuration(30.seconds)
  }

  private val producerSettings: ZLayer[MyKafka, Throwable, TransactionalProducerSettings] = ZLayer {
    for {
      kafka <- ZIO.service[MyKafka]
    } yield TransactionalProducerSettings(
      bootstrapServers = kafka.bootstrapServers,
      transactionalId = UUID.randomUUID().toString
    )
  }

  private val runConsumerStream: ZIO[Consumer & TransactionalProducer, Throwable, Unit] =
    for {
      consumer              <- ZIO.service[Consumer]
      transactionalProducer <- ZIO.service[TransactionalProducer]
      _                     <- ZIO.logInfo(s"Consuming messages from topic $topic...")
      _ <- consumer
             .plainStream(Subscription.topics(topic), Serde.int, Serde.long)
             .mapChunks { records: Chunk[CommittableRecord[Int, Long]] =>
               records.map { record =>
                 val key: Int = record.record.key()
                 val value: Long = record.record.value()
                 val newValue: String = "Copy of " + value.toString

                 val producerRecord: ProducerRecord[Int, String] =
                   new ProducerRecord("my-output-topic", key, newValue)
                 (producerRecord, record.offset)
               }
             }
             .mapChunksZIO { recordsAndOffsets: Chunk[(ProducerRecord[Int, String], Offset)] =>
               ZIO
                 .scoped {
                   for {
                     _  <- ZIO.addFinalizer(ZIO.logInfo("Completing transaction"))
                     tx <- transactionalProducer.createTransaction
                     _ <- {
                            val (records, offsets) = recordsAndOffsets.unzip
                            tx.produceChunkBatch(
                              records,
                              Serde.int,
                              Serde.string,
                              OffsetBatch(offsets)
                            )
                          }
                   } yield Chunk.empty
                 }
                 .uninterruptible
             }
             .runDrain
    } yield ()

  override def run: ZIO[Scope, Any, Any] =
    ZIO.logInfo("Starting app") *>
      ZIO.addFinalizer(ZIO.logInfo("Stopping app")) *>
      runConsumerStream
        .provide(
          consumerSettings,
          ZLayer.succeed(Diagnostics.NoOp),
          Consumer.live,
          producerSettings,
          TransactionalProducer.live,
          MyKafka.embedded
        )

}