forked from manub/scalatest-embedded-kafka
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathExampleKafkaStreamsSpec.scala
75 lines (61 loc) · 2.57 KB
/
ExampleKafkaStreamsSpec.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
package net.manub.embeddedkafka.streams
import net.manub.embeddedkafka.Codecs._
import net.manub.embeddedkafka.ConsumerExtensions._
import net.manub.embeddedkafka.EmbeddedKafkaConfig
import org.apache.kafka.common.serialization.{Serde, Serdes}
import org.apache.kafka.streams.kstream.{KStream, KStreamBuilder}
import org.scalatest.{Matchers, WordSpec}
/** Example-driven spec showing how a Kafka Streams topology can be tested with
  * the helpers mixed in through `EmbeddedKafkaStreamsAllInOne`.
  *
  * Every test builds the same pass-through topology: records read from
  * `inTopic` are written, unchanged, to `outTopic`. The tests differ only in
  * how the consumer used for verification is obtained:
  *
  *  - `withConsumer` hands a consumer to the supplied function,
  *  - `newConsumer` returns a consumer that the test must close itself,
  *  - `runStreamsWithStringConsumer` passes a consumer straight to the test body.
  */
class ExampleKafkaStreamsSpec
    extends WordSpec
    with Matchers
    with EmbeddedKafkaStreamsAllInOne {

  // Explicitly typed: implicit definitions should not rely on type inference,
  // which can make implicit resolution order-dependent and is rejected by
  // newer compilers. The ports are fixed, non-default values chosen for this spec.
  implicit val config: EmbeddedKafkaConfig =
    EmbeddedKafkaConfig(kafkaPort = 7000, zooKeeperPort = 7001)

  // Input and output topics of the pass-through topology used by every test.
  val (inTopic, outTopic) = ("in", "out")

  // Serde used for both keys and values on both topics.
  val stringSerde: Serde[String] = Serdes.String()

  "A Kafka streams test" should {

    "be easy to run with streams and consumer lifecycle management" in {
      val streamBuilder = new KStreamBuilder
      val stream: KStream[String, String] =
        streamBuilder.stream(stringSerde, stringSerde, inTopic)
      stream.to(stringSerde, stringSerde, outTopic)

      runStreams(Seq(inTopic, outTopic), streamBuilder) {
        publishToKafka(inTopic, "hello", "world")
        publishToKafka(inTopic, "foo", "bar")
        publishToKafka(inTopic, "baz", "yaz")

        withConsumer[String, String, Unit] { consumer =>
          // The stream is lazy: records are only fetched as far as the
          // assertions below force it.
          val consumedMessages: Stream[(String, String)] =
            consumer.consumeLazily(outTopic)

          consumedMessages.take(2) should be(
            Seq("hello" -> "world", "foo" -> "bar"))
          consumedMessages.drop(2).head should be("baz" -> "yaz")
        }
      }
    }

    "allow support creating custom consumers" in {
      val streamBuilder = new KStreamBuilder
      val stream: KStream[String, String] =
        streamBuilder.stream(stringSerde, stringSerde, inTopic)
      stream.to(stringSerde, stringSerde, outTopic)

      runStreams(Seq(inTopic, outTopic), streamBuilder) {
        publishToKafka(inTopic, "hello", "world")
        publishToKafka(inTopic, "foo", "bar")

        val consumer = newConsumer[String, String]()
        // A manually created consumer is owned by the test. Close it in a
        // `finally` so that a failing assertion (which throws) cannot leak it.
        try {
          consumer.consumeLazily(outTopic).take(2) should be(
            Seq("hello" -> "world", "foo" -> "bar"))
        } finally {
          consumer.close()
        }
      }
    }

    "allow for easy string based testing" in {
      val streamBuilder = new KStreamBuilder
      val stream: KStream[String, String] =
        streamBuilder.stream(stringSerde, stringSerde, inTopic)
      stream.to(stringSerde, stringSerde, outTopic)

      runStreamsWithStringConsumer(Seq(inTopic, outTopic), streamBuilder) {
        consumer =>
          publishToKafka(inTopic, "hello", "world")
          consumer.consumeLazily(outTopic).head should be("hello" -> "world")
      }
    }
  }
}