Document transactions
Document the transactional producer introduced in #1425.
erikvanoosten committed Jan 9, 2025
1 parent dc5abd6 commit cd107ab
Showing 5 changed files with 382 additions and 25 deletions.
1 change: 1 addition & 0 deletions docs/sidebars.js
@@ -14,6 +14,7 @@ const sidebars = {
"preventing-duplicates",
"sharing-consumer",
"serialization-and-deserialization",
"transactions",
"writing-tests"
]
}
259 changes: 259 additions & 0 deletions docs/transactions.md
@@ -0,0 +1,259 @@
---
id: transactions
title: "Transactions"
---

> This document describes Kafka transactions for zio-kafka version 3.0.0 and later.

## What are Kafka Transactions?

[Kafka transactions](https://www.confluent.io/blog/transactions-apache-kafka/) are different from what you may know from
relational databases. In Kafka, a transaction means that your program consumes some records, then produces other
records (to the same broker), and finally commits the _consumed_ records. Committing the consumed records commits the
transaction and makes the produced records available to other consumers.

## A warning about consuming transactional records

By default, produced records become visible to consumers immediately, even when they were produced in a Kafka
transaction that has not been committed yet! To prevent reading transactional records before they are committed,
configure every consumer with the `isolation.level` property set to `read_committed`, for example:

```scala
val settings = ConsumerSettings(bootstrapServers)
  .withGroupId(groupId)
  .withProperty("isolation.level", "read_committed") // Only read committed records
  // etc.
```

## Producing transactional records

In order to produce records transactionally, we need a `Consumer` and a `TransactionalProducer` that will work together
to commit transactions.

First, we create layers for the `ConsumerSettings` and the `Consumer`.
Note that rebalance-safe-commits must be enabled (see the background section below for more information).
Since the default `maxRebalanceDuration` is quite high, you may also consider setting it to a lower value: one within
which you are sure that all records of a poll can be processed.

```scala
val bootstrapBrokers = List("localhost:9092")
val consumerSettings = ZLayer.succeed {
  ConsumerSettings(bootstrapBrokers)
    .withGroupId("somegroupid")
    .withRebalanceSafeCommits(true) // required!
    .withMaxRebalanceDuration(30.seconds)
}
val consumer = Consumer.live
```

Now we can create layers for the `ProducerSettings` and the `TransactionalProducer`.
Note that the producer connects (and must connect) to the same brokers as the consumer.

DEVELOPER NOTE: here we assume that we no longer have `TransactionalProducerSettings`. TODO: check this assertion

```scala
val producerSettings = ZLayer.succeed {
  ProducerSettings(bootstrapServers = bootstrapBrokers)
}
val producer: ZLayer[TransactionalProducerSettings with Consumer, Throwable, TransactionalProducer] =
  TransactionalProducer.live
```

Notice that the `producer` layer also requires a `Consumer`. This is the consumer that will be used to commit the
consumed messages.

With these in place we can look at the application logic of consuming and producing. In this example we use the
`plainStream` method to consume records with a `Long` value and an `Int` key.

```scala
val consumedRecordsStream = consumer
  .plainStream(Subscription.topics("my-input-topic"), Serde.int, Serde.long)
```

For each consumed record, we then create a `ProducerRecord` with a `String` value containing `Copy of <input value>`.
Note that we also keep the offset of the consumed record so that we can commit it later.

```scala
val producedRecordsStream = consumedRecordsStream
  .mapChunks { records: Chunk[CommittableRecord[Int, Long]] =>
    records.map { record =>
      val key: Int         = record.record.key()
      val value: Long      = record.record.value()
      val newValue: String = "Copy of " + value.toString

      val producerRecord: ProducerRecord[Int, String] =
        new ProducerRecord("my-output-topic", key, newValue)
      (producerRecord, record.offset)
    }
  }
```

Typically, to optimize throughput, we want to produce records in batches. The underlying chunking structure of the
consumer stream is ideal for that because zio-kafka guarantees that each chunk in the stream corresponds to the records
that were fetched together. However, we need to be careful to retain the chunking structure. For example, we should
not use `.mapZIO` because it results in a stream where each chunk contains only a single item. Therefore, we use
`.mapChunks` instead of `.map` and `.mapChunksZIO` instead of `.mapZIO`.
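
As a small stand-alone illustration (plain ZIO streams only, not zio-kafka; the value names are made up for this
sketch), compare a per-element operator with its chunk-level counterpart:

```scala
import zio._
import zio.stream._

val stream: ZStream[Any, Nothing, Int] = ZStream.fromChunks(Chunk(1, 2, 3), Chunk(4, 5))

// `mapZIO` runs its effect per element and re-chunks the stream into
// single-element chunks: the chunk sizes become 1, 1, 1, 1, 1.
val perElementSizes = stream.mapZIO(i => ZIO.succeed(i * 2)).chunks.map(_.size)

// `mapChunksZIO` runs its effect per chunk and keeps the original chunk
// boundaries: the chunk sizes stay 3 and 2.
val perChunkSizes = stream.mapChunksZIO(c => ZIO.succeed(c.map(_ * 2))).chunks.map(_.size)
```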

These new records can now be produced. Let's build it up slowly.

```scala
producedRecordsStream
  .mapChunksZIO { recordsAndOffsets: Chunk[(ProducerRecord[Int, String], Offset)] =>
    ???
  }
```

Let's work on those question marks. We need to create a transaction.
Each zio-kafka transaction runs in a ZIO `Scope`; the transaction commits when the scope closes.

```scala
ZIO.scoped {
  ??? // transaction stuff
}
```

We also want to try to complete the transaction, even when the program is shutting down.
Therefore, we add `.uninterruptible` and get:

```scala
ZIO
  .scoped {
    ??? // transaction stuff
  }
  .uninterruptible
```

Now we can fill in the 'transaction stuff': we create the transaction and use it to produce some records.

```scala
for {
  tx <- transactionalProducer.createTransaction
  _ <- {
    val (records, offsets) = recordsAndOffsets.unzip
    tx.produceChunkBatch(
      records,
      Serde.int,
      Serde.string,
      OffsetBatch(offsets)
    )
  }
  // running inside `mapChunksZIO`, we need to return a Chunk.
  // Since we're not using the result, the empty chunk will do.
} yield Chunk.empty
```

Notice how we pass the offsets of the consumed records. Once the transaction completes, these are the offsets that will
be committed.

Putting it all together we get:

```scala
import org.apache.kafka.clients.producer.ProducerRecord
import zio._
import zio.kafka.consumer.diagnostics.Diagnostics
import zio.kafka.consumer._
import zio.kafka.producer._
import zio.kafka.serde.Serde

import java.util.UUID

object Transactional extends ZIOAppDefault {

  private val topic = "transactional-test-topic"

  val bootstrapBrokers = List("localhost:9092")

  val consumerSettings = ZLayer.succeed {
    ConsumerSettings(bootstrapBrokers)
      .withGroupId("transactional-example-app")
      .withRebalanceSafeCommits(true) // required for transactional producing
      .withMaxRebalanceDuration(30.seconds)
  }

  val producerSettings = ZLayer.succeed {
    TransactionalProducerSettings(
      bootstrapServers = bootstrapBrokers,
      transactionalId = UUID.randomUUID().toString // TODO: do we still need this?
    )
  }

  private val runConsumerStream: ZIO[Consumer & TransactionalProducer, Throwable, Unit] =
    for {
      consumer              <- ZIO.service[Consumer]
      transactionalProducer <- ZIO.service[TransactionalProducer]
      _                     <- ZIO.logInfo(s"Consuming messages from topic $topic...")
      _ <- consumer
             .plainStream(Subscription.topics(topic), Serde.int, Serde.long)
             .mapChunks { records: Chunk[CommittableRecord[Int, Long]] =>
               records.map { record =>
                 val key: Int         = record.record.key()
                 val value: Long      = record.record.value()
                 val newValue: String = "Copy of " + value.toString

                 val producerRecord: ProducerRecord[Int, String] =
                   new ProducerRecord("my-output-topic", key, newValue)
                 (producerRecord, record.offset)
               }
             }
             .mapChunksZIO { recordsAndOffsets: Chunk[(ProducerRecord[Int, String], Offset)] =>
               ZIO
                 .scoped {
                   for {
                     _  <- ZIO.addFinalizer(ZIO.logInfo("Completing transaction"))
                     tx <- transactionalProducer.createTransaction
                     _ <- {
                       val (records, offsets) = recordsAndOffsets.unzip
                       tx.produceChunkBatch(
                         records,
                         Serde.int,
                         Serde.string,
                         OffsetBatch(offsets)
                       )
                     }
                   } yield Chunk.empty
                 }
                 .uninterruptible
             }
             .runDrain
    } yield ()

  override def run: ZIO[Scope, Any, Any] =
    runConsumerStream
      .provide(
        consumerSettings,
        ZLayer.succeed(Diagnostics.NoOp),
        Consumer.live,
        producerSettings,
        TransactionalProducer.live
      )

}
```

## Technical background - How does it all work?

Kafka must ensure that each record is processed only once, even though a partition can be revoked and assigned to
another consumer while the original consumer knows nothing about this. In that situation the original consumer will
happily continue producing records even though it has lost the partition.

Kafka does this by validating the group epoch. The group epoch is a number that increases with every rebalance.
When the transactional producer commits a transaction with the offsets of consumed records, it also includes the group
epoch in which the records were consumed. If the broker detects a commit with a group epoch that is not equal to the
current epoch, it rejects the commit.
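
To make this concrete, here is a hedged sketch with the plain Apache Kafka Java client that zio-kafka wraps (not the
zio-kafka API); the `ConsumerGroupMetadata` obtained from the consumer and passed to `sendOffsetsToTransaction` is what
carries the group epoch (the generation) to the broker:

```scala
import java.util.{ Map => JMap }

import org.apache.kafka.clients.consumer.{ KafkaConsumer, OffsetAndMetadata }
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.common.TopicPartition

// Commit the consumed offsets as part of the transaction. The group metadata
// contains the current generation; the broker rejects the commit when that
// generation is no longer the current one.
def commitWithinTransaction(
  producer: KafkaProducer[Array[Byte], Array[Byte]],
  consumer: KafkaConsumer[Array[Byte], Array[Byte]],
  offsets: JMap[TopicPartition, OffsetAndMetadata]
): Unit = {
  producer.beginTransaction()
  // ... produce the output records with producer.send(...) ...
  producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata())
  producer.commitTransaction()
}
```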

In addition, to prevent an old consumer from continuing to poll into the next epoch without participating in the
rebalance, consumers polling within the wrong group epoch are 'fenced'. A fenced consumer dies with a
`FencedInstanceIdException`.

### How zio-kafka helps

With zio-kafka you can easily couple the transactional producer to a consumer to commit transactions. ZIO scopes are
used to define the transaction boundaries. Within the scope you can produce records one by one, in a single batch, or
in multiple batches (see the sketch below). Once the scope closes, all collected offsets are committed.
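
For example, here is a sketch of producing two batches within one transaction scope, using only the `produceChunkBatch`
method shown earlier; `batchRecords1`, `batchOffsets1`, etc. are placeholders for records and offsets prepared by your
own code:

```scala
ZIO.scoped {
  for {
    tx <- transactionalProducer.createTransaction
    // Both batches belong to the same transaction; nothing is committed yet.
    _  <- tx.produceChunkBatch(batchRecords1, Serde.int, Serde.string, OffsetBatch(batchOffsets1))
    _  <- tx.produceChunkBatch(batchRecords2, Serde.int, Serde.string, OffsetBatch(batchOffsets2))
  } yield ()
  // When this scope closes, the transaction commits and the offsets of both
  // batches are committed together.
}
```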

In addition, zio-kafka prevents failed commits. It does this by holding up every rebalance until all consumed records
are committed. This feature is called rebalance-safe-commits. By holding up the rebalance, records can be processed in
the same group epoch as they were consumed in.

When a zio-kafka consumer misses a rebalance and continues polling, there is nothing that can be done; it will die with
a `FencedInstanceIdException`. In these cases the consumer (or, perhaps simpler, the whole program) should restart.
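
For example, assuming the exception surfaces in the stream's error channel, a sketch that reuses the layer and stream
names from the example above could rebuild everything and retry after being fenced:

```scala
import org.apache.kafka.common.errors.FencedInstanceIdException

val resilientRun: ZIO[Any, Throwable, Unit] =
  runConsumerStream
    .provide(
      consumerSettings,
      ZLayer.succeed(Diagnostics.NoOp),
      Consumer.live,
      producerSettings,
      TransactionalProducer.live
    )
    // Re-providing the layers builds a fresh consumer and producer for every retry.
    .retry(Schedule.recurWhile[Throwable](_.isInstanceOf[FencedInstanceIdException]))
```
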
25 changes: 0 additions & 25 deletions zio-kafka-example/src/main/scala/zio/kafka/example/Main.scala
@@ -1,36 +1,11 @@
package zio.kafka.example

import io.github.embeddedkafka.{ EmbeddedK, EmbeddedKafka, EmbeddedKafkaConfig }
import zio._
import zio.kafka.consumer.diagnostics.Diagnostics
import zio.kafka.consumer.{ Consumer, ConsumerSettings, Subscription }
import zio.kafka.serde.Serde
import zio.logging.backend.SLF4J

trait MyKafka {
  def bootstrapServers: List[String]
  def stop(): UIO[Unit]
}

object MyKafka {
  final case class EmbeddedKafkaService(embeddedK: EmbeddedK) extends MyKafka {
    override def bootstrapServers: List[String] = List(s"localhost:${embeddedK.config.kafkaPort}")
    override def stop(): UIO[Unit] = ZIO.succeed(embeddedK.stop(true))
  }

  val embedded: ZLayer[Any, Throwable, MyKafka] = ZLayer.scoped {
    implicit val embeddedKafkaConfig: EmbeddedKafkaConfig = EmbeddedKafkaConfig(
      customBrokerProperties = Map(
        "group.min.session.timeout.ms" -> "500",
        "group.initial.rebalance.delay.ms" -> "0",
        "authorizer.class.name" -> "kafka.security.authorizer.AclAuthorizer",
        "super.users" -> "User:ANONYMOUS"
      )
    )
    ZIO.acquireRelease(ZIO.attempt(EmbeddedKafkaService(EmbeddedKafka.start())))(_.stop())
  }
}

object Main extends ZIOAppDefault {

/**
28 changes: 28 additions & 0 deletions zio-kafka-example/src/main/scala/zio/kafka/example/MyKafka.scala
@@ -0,0 +1,28 @@
package zio.kafka.example

import io.github.embeddedkafka.{ EmbeddedK, EmbeddedKafka, EmbeddedKafkaConfig }
import zio._

trait MyKafka {
  def bootstrapServers: List[String]
  def stop(): UIO[Unit]
}

object MyKafka {
  final case class EmbeddedKafkaService(embeddedK: EmbeddedK) extends MyKafka {
    override def bootstrapServers: List[String] = List(s"localhost:${embeddedK.config.kafkaPort}")
    override def stop(): UIO[Unit] = ZIO.succeed(embeddedK.stop(true))
  }

  val embedded: ZLayer[Any, Throwable, MyKafka] = ZLayer.scoped {
    implicit val embeddedKafkaConfig: EmbeddedKafkaConfig = EmbeddedKafkaConfig(
      customBrokerProperties = Map(
        "group.min.session.timeout.ms" -> "500",
        "group.initial.rebalance.delay.ms" -> "0",
        "authorizer.class.name" -> "kafka.security.authorizer.AclAuthorizer",
        "super.users" -> "User:ANONYMOUS"
      )
    )
    ZIO.acquireRelease(ZIO.attempt(EmbeddedKafkaService(EmbeddedKafka.start())))(_.stop())
  }
}