Skip to content

Commit

Permalink
fix: reading topics without enough requested records
Browse files Browse the repository at this point in the history
  • Loading branch information
jonas-grgt committed Jan 26, 2025
1 parent 780224a commit e3c7474
Show file tree
Hide file tree
Showing 3 changed files with 466 additions and 82 deletions.
12 changes: 12 additions & 0 deletions .run/go build ktea.go.run.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
<component name="ProjectRunConfigurationManager">
<configuration default="false" name="go build ktea.go" type="GoApplicationRunConfiguration" factoryName="Go Application" nameIsGenerated="true">
<module name="ktea" />
<working_directory value="$PROJECT_DIR$" />
<go_parameters value="-tags prd" />
<kind value="FILE" />
<package value="ktea" />
<directory value="$PROJECT_DIR$" />
<filePath value="$PROJECT_DIR$/ktea.go" />
<method v="2" />
</configuration>
</component>
258 changes: 176 additions & 82 deletions kadmin/record_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,12 @@ type ConsumerRecord struct {
Timestamp time.Time
}

// offsets describes a partition's available offset window as reported by the
// broker: the first offset still retained and the last offset actually used.
type offsets struct {
	// oldest is the first offset still available on the partition.
	oldest int64
	// most recent used offset
	// (the broker's next-offset minus one when non-empty; equal to oldest
	// when the partition is empty — see the augend logic in fetchOffsets)
	newest int64
}

func (ka *SaramaKafkaAdmin) ReadRecords(ctx context.Context, rd ReadDetails) ReadingStartedMsg {
ctx, cancelFunc := context.WithCancel(ctx)
startedMsg := ReadingStartedMsg{
Expand All @@ -65,94 +71,94 @@ func (ka *SaramaKafkaAdmin) ReadRecords(ctx context.Context, rd ReadDetails) Rea
mu sync.Mutex
closeOnce sync.Once
wg sync.WaitGroup
offsets map[int]int64
offsets map[int]offsets
ok bool
partitions []int
)

partitions = ka.determineReadPartitions(rd)

if rd.StartPoint != Beginning {
offsets, ok = ka.fetchFirstAvailableOffsets(partitions, rd, startedMsg)
if !ok {
close(startedMsg.Err)
cancelFunc()
return startedMsg
}
offsets, ok = ka.fetchOffsets(partitions, rd, startedMsg)
if !ok {
close(startedMsg.Err)
cancelFunc()
return startedMsg
}

wg.Add(len(partitions))

for _, partition := range partitions {
go func(partition int) {
defer wg.Done()

startingOffset := ka.determineStartingOffset(partition, rd, offsets)
consumer, err := client.ConsumePartition(rd.Topic.Name, int32(partition), startingOffset)
if err != nil {
log.Error(err)
startedMsg.Err <- err
cancelFunc()
return
}
defer consumer.Close()

msgChan := consumer.Messages()

for {
select {
case err := <-consumer.Errors():
if offsets[partition].newest != offsets[partition].oldest {
go func(partition int) {
defer wg.Done()

readingOffsets := ka.determineReadingOffsets(rd, offsets[partition])
consumer, err := client.ConsumePartition(
rd.Topic.Name,
int32(partition),
readingOffsets.start,
)
if err != nil {
log.Error(err)
startedMsg.Err <- err
cancelFunc()
return
case <-ctx.Done():
return
case msg := <-msgChan:
var headers []Header
for _, h := range msg.Headers {
headers = append(headers, Header{
string(h.Key),
string(h.Value),
})
}

deserializer := serdes.NewAvroDeserializer(ka.sra)
var payload string
payload, err = deserializer.Deserialize(msg.Value)
if err != nil {
payload = err.Error()
}

consumerRecord := ConsumerRecord{
Key: string(msg.Key),
Value: payload,
Partition: int64(msg.Partition),
Offset: msg.Offset,
Headers: headers,
Timestamp: msg.Timestamp,
}

var shouldClose bool
}
defer consumer.Close()

mu.Lock()
msgCount++
if msgCount >= rd.Limit {
shouldClose = true
}
mu.Unlock()
msgChan := consumer.Messages()

for {
select {
case startedMsg.ConsumerRecord <- consumerRecord:
case <-ctx.Done():
case err := <-consumer.Errors():
startedMsg.Err <- err
return
}

if shouldClose {
cancelFunc() // Cancel the context to stop other goroutines
case <-ctx.Done():
return
case msg := <-msgChan:
var headers []Header
for _, h := range msg.Headers {
headers = append(headers, Header{
string(h.Key),
string(h.Value),
})
}

consumerRecord := ConsumerRecord{
Key: string(msg.Key),
Value: ka.deserialize(err, msg),
Partition: int64(msg.Partition),
Offset: msg.Offset,
Headers: headers,
Timestamp: msg.Timestamp,
}

var shouldClose bool

mu.Lock()
msgCount++
if msgCount >= rd.Limit {
shouldClose = true
}
mu.Unlock()

select {
case startedMsg.ConsumerRecord <- consumerRecord:
case <-ctx.Done():
return
}

if shouldClose {
cancelFunc() // Cancel the context to stop other goroutines
return
}
if msg.Offset == readingOffsets.end {
return
}
}
}
}
}(partition)
}(partition)
}
}

go func() {
Expand All @@ -165,6 +171,19 @@ func (ka *SaramaKafkaAdmin) ReadRecords(ctx context.Context, rd ReadDetails) Rea
return startedMsg
}

// deserialize decodes msg.Value as Avro via the admin's schema-registry
// client. On a decoding failure the error text itself is returned as the
// payload, so the record still surfaces to the reader with the problem
// inlined instead of being dropped.
//
// The first parameter is unused; it is kept only so the signature stays
// compatible with the existing call site, which passes its local err through.
// The original reassigned that incoming err as a scratch variable, which is
// misleading — a fresh local is used instead.
func (ka *SaramaKafkaAdmin) deserialize(
	_ error,
	msg *sarama.ConsumerMessage,
) string {
	payload, err := serdes.NewAvroDeserializer(ka.sra).Deserialize(msg.Value)
	if err != nil {
		return err.Error()
	}
	return payload
}

func (ka *SaramaKafkaAdmin) determineReadPartitions(rd ReadDetails) []int {
var partitions []int
if len(rd.Partitions) == 0 {
Expand All @@ -178,22 +197,74 @@ func (ka *SaramaKafkaAdmin) determineReadPartitions(rd ReadDetails) []int {
return partitions
}

func (ka *SaramaKafkaAdmin) determineStartingOffset(partition int, rd ReadDetails, partByOffset map[int]int64) int64 {
var startingOffset int64
// readingOffsets delimits the offset range a partition reader consumes.
// Both bounds are inclusive: consumption begins at start and the reader
// stops once it has delivered the message at offset end.
type readingOffsets struct {
	// start is the first offset to consume (inclusive).
	start int64
	// end is the last offset to consume (inclusive).
	end int64
}

// determineReadingOffsets computes the inclusive offset window to consume for
// one partition, honouring the requested start point (from the beginning of
// the log, or the most recent records) and spreading rd.Limit evenly across
// the topic's partitions.
//
// This reconstructs the function without the stale pre-change lines that were
// interleaved into it (references to the removed startingOffset/partByOffset
// variables and a duplicate trailing return), which made it non-compiling.
//
// NOTE(review): integer division means rd.Limit < rd.Topic.Partitions yields
// zero records per partition — confirm callers guard against that.
func (ka *SaramaKafkaAdmin) determineReadingOffsets(
	rd ReadDetails,
	offsets offsets,
) readingOffsets {
	var startOffset int64
	var endOffset int64
	// Divide the total record limit evenly over all partitions.
	numberOfRecordsPerPart := int64(float64(int64(rd.Limit)) / float64(rd.Topic.Partitions))
	if rd.StartPoint == Beginning {
		startOffset, endOffset = ka.determineOffsetsFromBeginning(
			startOffset,
			offsets,
			numberOfRecordsPerPart,
			endOffset,
		)
	} else {
		startOffset, endOffset = ka.determineMostRecentOffsets(
			startOffset,
			offsets,
			numberOfRecordsPerPart,
			endOffset,
		)
	}
	return readingOffsets{
		start: startOffset,
		end:   endOffset,
	}
}

func (ka *SaramaKafkaAdmin) fetchFirstAvailableOffsets(partitions []int, rd ReadDetails, rsm ReadingStartedMsg) (map[int]int64, bool) {
offsets := make(map[int]int64)
// determineMostRecentOffsets returns the window covering the last
// numberOfRecordsPerPart records of the partition, ending at the most recent
// used offset.
//
// Fix: the lower bound is clamped to offsets.oldest rather than to 0. When
// retention or compaction has removed the head of the log, the first
// available offset is greater than zero; starting below it would make the
// subsequent ConsumePartition call fail with an offset-out-of-range error.
// Since offsets.oldest >= 0, this clamp also covers the original
// negative-start check.
func (ka *SaramaKafkaAdmin) determineMostRecentOffsets(
	startOffset int64,
	offsets offsets,
	numberOfRecordsPerPart int64,
	endOffset int64,
) (int64, int64) {
	startOffset = offsets.newest - numberOfRecordsPerPart
	endOffset = offsets.newest
	if startOffset < offsets.oldest {
		startOffset = offsets.oldest
	}
	return startOffset, endOffset
}

// determineOffsetsFromBeginning returns the window that starts at the first
// available offset of the partition. When the partition holds more records
// than requested, the end is capped so exactly numberOfRecordsPerPart
// records fall inside the inclusive window; otherwise the whole partition
// (up to the most recent used offset) is consumed.
func (ka *SaramaKafkaAdmin) determineOffsetsFromBeginning(
	startOffset int64,
	offsets offsets,
	numberOfRecordsPerPart int64,
	endOffset int64,
) (int64, int64) {
	startOffset = offsets.oldest
	// Default to reading everything; shrink the window only when the
	// partition contains more records than the per-partition budget.
	endOffset = offsets.newest
	if offsets.oldest+numberOfRecordsPerPart < offsets.newest {
		endOffset = startOffset + numberOfRecordsPerPart - 1
	}
	return startOffset, endOffset
}

func (ka *SaramaKafkaAdmin) fetchOffsets(
partitions []int,
rd ReadDetails,
rsm ReadingStartedMsg,
) (map[int]offsets, bool) {
offsetsByPartition := make(map[int]offsets)
var wg sync.WaitGroup
var mu sync.Mutex
errorsChan := make(chan error, len(partitions))
Expand All @@ -203,14 +274,37 @@ func (ka *SaramaKafkaAdmin) fetchFirstAvailableOffsets(partitions []int, rd Read
go func(partition int) {
defer wg.Done()

offset, err := ka.client.GetOffset(rd.Topic.Name, int32(partition), sarama.OffsetNewest)
newestOffset, err := ka.client.GetOffset(
rd.Topic.Name,
int32(partition),
sarama.OffsetNewest,
)
if err != nil {
errorsChan <- err
return
}

oldestOffset, err := ka.client.GetOffset(
rd.Topic.Name,
int32(partition),
sarama.OffsetOldest,
)
if err != nil {
errorsChan <- err
return
}

mu.Lock()
offsets[partition] = offset
var augend int64
if oldestOffset == newestOffset {
augend = 0
} else {
augend = 1
}
offsetsByPartition[partition] = offsets{
oldestOffset,
newestOffset - augend,
}
mu.Unlock()
}(partition)
}
Expand All @@ -224,6 +318,6 @@ func (ka *SaramaKafkaAdmin) fetchFirstAvailableOffsets(partitions []int, rd Read
close(rsm.Err)
return nil, false
default:
return offsets, true
return offsetsByPartition, true
}
}
Loading

0 comments on commit e3c7474

Please sign in to comment.