Skip to content

Commit

Permalink
chore: separate kadmin module into different files and tests
Browse files Browse the repository at this point in the history
  • Loading branch information
jonas-grgt committed Jan 3, 2025
1 parent b9c568d commit 05f004f
Show file tree
Hide file tree
Showing 49 changed files with 1,119 additions and 1,026 deletions.
1 change: 1 addition & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ func (c *Config) flush() {
fmt.Println("Unable to write config file")
os.Exit(-1)
}
log.Debug("flushed config")
}

func (c *Config) SwitchCluster(name string) *Cluster {
Expand Down
81 changes: 81 additions & 0 deletions kadmin/cgroup_lister.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package kadmin

import tea "github.com/charmbracelet/bubbletea"

type CGroupLister interface {
ListConsumerGroups() tea.Msg
}

type ConsumerGroup struct {
Name string
Members []GroupMember
}

type ConsumerGroupListingStartedMsg struct {
Err chan error
ConsumerGroups chan []*ConsumerGroup
}

func (msg *ConsumerGroupListingStartedMsg) AwaitCompletion() tea.Msg {
select {
case groups := <-msg.ConsumerGroups:
return ConsumerGroupsListedMsg{groups}
case err := <-msg.Err:
return ConsumerGroupListingErrorMsg{err}
}
}

type ConsumerGroupsListedMsg struct {
ConsumerGroups []*ConsumerGroup
}

type ConsumerGroupListingErrorMsg struct {
Err error
}

func (ka *SaramaKafkaAdmin) ListConsumerGroups() tea.Msg {
errChan := make(chan error)
groupsChan := make(chan []*ConsumerGroup)

go ka.doListConsumerGroups(groupsChan, errChan)

return ConsumerGroupListingStartedMsg{errChan, groupsChan}
}

func (ka *SaramaKafkaAdmin) doListConsumerGroups(groupsChan chan []*ConsumerGroup, errorChan chan error) {
maybeIntroduceLatency()
if listGroupResponse, err := ka.admin.ListConsumerGroups(); err != nil {
errorChan <- err
} else {
var consumerGroups []*ConsumerGroup
var groupNames []string
var groupByName = make(map[string]*ConsumerGroup)

for name, _ := range listGroupResponse {
consumerGroup := ConsumerGroup{Name: name}
consumerGroups = append(consumerGroups, &consumerGroup)
groupByName[name] = &consumerGroup
groupNames = append(groupNames, name)
}

describeConsumerGroupResponse, err := ka.admin.DescribeConsumerGroups(groupNames)
if err != nil {
errorChan <- err
return
}

for _, groupDescription := range describeConsumerGroupResponse {
group := groupByName[groupDescription.GroupId]
var groupMembers []GroupMember
for _, m := range groupDescription.Members {
member := GroupMember{}
member.MemberId = m.MemberId
member.ClientId = m.ClientId
member.ClientHost = m.ClientHost
groupMembers = append(groupMembers, member)
}
group.Members = groupMembers
}
groupsChan <- consumerGroups
}
}
73 changes: 73 additions & 0 deletions kadmin/cgroup_lister_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package kadmin

import (
"context"
"fmt"
"github.com/IBM/sarama"
"github.com/stretchr/testify/assert"
"testing"
"time"
)

func TestConsumerGroups(t *testing.T) {
t.Run("List groups", func(t *testing.T) {
topic := topicName()
// given
ka.CreateTopic(TopicCreationDetails{
Name: topic,
NumPartitions: 1,
Properties: nil,
})

for i := 0; i < 10; i++ {
ka.PublishRecord(&ProducerRecord{
Key: "key",
Value: "value",
Topic: topic,
Partition: nil,
})
}

expectedGroups := make(map[string]bool)
for i := 0; i < 10; i++ {
groupName := fmt.Sprintf("test-group-%d", i)
expectedGroups[groupName] = false
consumerGroup, err := sarama.NewConsumerGroupFromClient(groupName, ka.client)
if err != nil {
t.Fatal("Unable to create Consumer Group.", err)
}

handler := testConsumer{ExpectedMsgCount: 10}
consumerGroup.Consume(context.WithoutCancel(context.Background()), []string{topic}, &handler)

defer consumerGroup.Close()
}

msg := ka.ListConsumerGroups().(ConsumerGroupListingStartedMsg)

select {
case groups := <-msg.ConsumerGroups:
assert.Len(t, groups, 10, "Expected 10 consumer groups")

// Verify that all expected groups are present
for _, group := range groups {
if _, exists := expectedGroups[group.Name]; exists {
assert.NotEmpty(t, group.Members)
assert.NotEmpty(t, group.Members[0].MemberId)
assert.NotEmpty(t, group.Members[0].ClientId)
assert.NotEmpty(t, group.Members[0].ClientHost)
expectedGroups[group.Name] = true
}
}

// Check that all groups in `expectedGroups` were found
for groupName, found := range expectedGroups {
assert.True(t, found, "Consumer group '%s' was not found", groupName)
}
case err := <-msg.Err:
t.Fatal("Error while listing groups", err)
case <-time.After(5 * time.Second):
t.Fatal("Test timed out waiting for consumer groups")
}
})
}
61 changes: 61 additions & 0 deletions kadmin/config_lister.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package kadmin

import (
"github.com/IBM/sarama"
tea "github.com/charmbracelet/bubbletea"
)

type TopicConfigLister interface {
ListConfigs(topic string) tea.Msg
}

type TopicConfigListingStartedMsg struct {
Err chan error
Configs chan map[string]string
}

type TopicConfigsListedMsg struct {
Configs map[string]string
}

type TopicConfigListingErrorMsg struct {
Err error
}

func (m *TopicConfigListingStartedMsg) AwaitCompletion() tea.Msg {
select {
case e := <-m.Err:
return TopicConfigListingErrorMsg{e}
case c := <-m.Configs:
return TopicConfigsListedMsg{c}
}
}

func (ka *SaramaKafkaAdmin) ListConfigs(topic string) tea.Msg {
errChan := make(chan error)
configsChan := make(chan map[string]string)

go ka.doListConfigs(topic, configsChan, errChan)

return TopicConfigListingStartedMsg{
errChan,
configsChan,
}
}

func (ka *SaramaKafkaAdmin) doListConfigs(topic string, configsChan chan map[string]string, errorChan chan error) {
maybeIntroduceLatency()
configsResp, err := ka.admin.DescribeConfig(sarama.ConfigResource{
Type: TOPIC_RESOURCE_TYPE,
Name: topic,
})
if err != nil {
errorChan <- err
return
}
configs := make(map[string]string)
for _, e := range configsResp {
configs[e.Name] = e.Value
}
configsChan <- configs
}
38 changes: 38 additions & 0 deletions kadmin/config_lister_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package kadmin

import (
kgo "github.com/segmentio/kafka-go"
"github.com/stretchr/testify/assert"
"testing"
)

func TestListTopicConfigs(t *testing.T) {
t.Run("List Topic Configs", func(t *testing.T) {
topic := topicName()
// given
createTopic(t, []kgo.TopicConfig{
{
Topic: topic,
NumPartitions: 2,
ReplicationFactor: 1,
},
})

//when
msg := ka.ListConfigs(topic).(TopicConfigListingStartedMsg)

// then
var configs map[string]string
select {
case c := <-msg.Configs:
configs = c
case e := <-msg.Err:
assert.Fail(t, "Failed to list configs", e)
return
}
assert.Equal(t, "delete", configs["cleanup.policy"])

// clean up
ka.DeleteTopic(topic)
})
}
32 changes: 32 additions & 0 deletions kadmin/config_updater.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package kadmin

import tea "github.com/charmbracelet/bubbletea"

type ConfigUpdater interface {
UpdateConfig(t TopicConfigToUpdate) tea.Msg
}

type TopicConfigUpdatedMsg struct{}

type TopicConfigToUpdate struct {
Topic string
Key string
Value string
}

type UpdateTopicConfigErrorMsg struct {
Reason string
}

func (ka *SaramaKafkaAdmin) UpdateConfig(t TopicConfigToUpdate) tea.Msg {
err := ka.admin.AlterConfig(
TOPIC_RESOURCE_TYPE,
t.Topic,
map[string]*string{t.Key: &t.Value},
false,
)
if err != nil {
return KAdminErrorMsg{err}
}
return TopicConfigUpdatedMsg{}
}
89 changes: 89 additions & 0 deletions kadmin/config_updater_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
package kadmin

import (
kgo "github.com/segmentio/kafka-go"
"github.com/stretchr/testify/assert"
"testing"
)

func TestUpdateTopicConfig(t *testing.T) {
t.Run("Update Topic Config", func(t *testing.T) {
topic := topicName()
// given
createTopic(t, []kgo.TopicConfig{
{
Topic: topic,
NumPartitions: 2,
ReplicationFactor: 1,
},
})

// when
ka.UpdateConfig(TopicConfigToUpdate{
topic,
"delete.retention.ms",
"172800000"},
)

// then

// then
msg := ka.ListConfigs(topic).(TopicConfigListingStartedMsg)
var configs map[string]string
select {
case c := <-msg.Configs:
configs = c
case e := <-msg.Err:
assert.Fail(t, "Failed to list configs", e)
return
}
assert.Equal(t, "172800000", configs["delete.retention.ms"])

// clean up
ka.DeleteTopic(topic)
})

t.Run("Invalid value for update", func(t *testing.T) {
topic := topicName()
// given
createTopic(t, []kgo.TopicConfig{
{
Topic: topic,
NumPartitions: 2,
ReplicationFactor: 1,
},
})

// when
msg := ka.UpdateConfig(TopicConfigToUpdate{
topic,
"delete.retention.ms",
"-172800000"},
)

// then
assert.IsType(t, KAdminErrorMsg{}, msg)
assert.Equal(t, "Invalid value -172800000 for configuration delete.retention.ms: Value must be at least 0", msg.(KAdminErrorMsg).Error.Error())

// clean up
ka.DeleteTopic(topic)
})

//t.Run("Broker not available", func(t *testing.T) {
// topic := topicName()
// // when
// ska := NewSaramaKAdmin(ConnectionDetails{
// BootstrapServers: []string{"localpost:123"},
// SASLConfig: nil,
// })
// msg := ska.UpdateConfig(TopicConfigToUpdate{
// topic,
// "delete.retention.ms",
// "-172800000"},
// )
//
// // then
// assert.IsType(t, KAdminErrorMsg{}, msg)
// assert.Equal(t, "Broker Not Available: not a client facing error and is used mostly by tools when a broker is not alive", msg.(KAdminErrorMsg).Error.Error())
//})
}
Loading

0 comments on commit 05f004f

Please sign in to comment.