Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DO NOT MERGE] described consumer groups logs #1327

Open
wants to merge 29 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
ce19710
described consumer groups logs
dongnuo123 Dec 23, 2024
870be5e
list lags logs
dongnuo123 Dec 23, 2024
7d7c3ac
spotless apply
dongnuo123 Dec 23, 2024
075ef08
disable the new coordinator
dongnuo123 Dec 24, 2024
4cb68c5
enable the new coordinator
dongnuo123 Dec 24, 2024
0998fb9
fix ConsumerGroup.State enum
dongnuo123 Dec 24, 2024
32bbfc5
Revert "fix ConsumerGroup.State enum"
dongnuo123 Dec 24, 2024
57811a1
throw exception to log description
dongnuo123 Dec 24, 2024
3e37edb
throw exception to log description
dongnuo123 Dec 24, 2024
5b8f1fb
throw exception to log description
dongnuo123 Dec 24, 2024
294c5ba
throw exception to log description
dongnuo123 Dec 24, 2024
39a2e26
throw exception to log description
dongnuo123 Dec 24, 2024
7d1bb98
throw exception to log description
dongnuo123 Dec 24, 2024
71a7bbb
throw exception to log description
dongnuo123 Dec 24, 2024
c3b3ad4
throw exception to log description
dongnuo123 Dec 24, 2024
a0f7328
throw exception to log description
dongnuo123 Dec 24, 2024
a81b5fd
more logs to state/assignor translation
dongnuo123 Dec 24, 2024
5f3185e
more logs to state/assignor translation
dongnuo123 Dec 24, 2024
9f7d5ea
more logs to state/assignor translation
dongnuo123 Dec 24, 2024
f1b6289
more logs to state/assignor translation
dongnuo123 Dec 24, 2024
6503e9e
more logs to state/assignor translation
dongnuo123 Dec 24, 2024
b0ac41f
more logs to state/assignor translation
dongnuo123 Dec 24, 2024
e25176d
more logs to state/assignor translation
dongnuo123 Dec 24, 2024
649dcca
more logs to state/assignor translation
dongnuo123 Dec 24, 2024
b1aa40c
more logs to state/assignor translation
dongnuo123 Dec 24, 2024
b64af7c
more logs to state/assignor translation
dongnuo123 Dec 24, 2024
b89d379
more logs to state/assignor translation
dongnuo123 Dec 24, 2024
3c6e2af
more logs to state/assignor translation
dongnuo123 Dec 24, 2024
8a1f9c5
more logs to state/assignor translation
dongnuo123 Dec 24, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions config/log4j.properties
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,5 @@ log4j.appender.file.maxFileSize=100MB
log4j.appender.file.File=${kafka-rest.log.dir}/kafka-rest.log
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=[%d] %p %m (%c)%n

log4j.logger.io.confluent.kafkarest.controllers=DEBUG
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import io.confluent.kafkarest.common.KafkaFutures;
import io.confluent.kafkarest.entities.ConsumerGroup;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
Expand All @@ -29,12 +30,16 @@
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.ConsumerGroupListing;
import org.apache.kafka.common.ConsumerGroupState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

final class ConsumerGroupManagerImpl implements ConsumerGroupManager {

private final Admin adminClient;
private final ClusterManager clusterManager;

private static final Logger log = LoggerFactory.getLogger(ConsumerGroupManagerImpl.class);

@Inject
ConsumerGroupManagerImpl(Admin adminClient, ClusterManager clusterManager) {
this.adminClient = requireNonNull(adminClient);
Expand Down Expand Up @@ -74,18 +79,65 @@ private CompletableFuture<List<ConsumerGroup>> getConsumerGroups(
return KafkaFutures.toCompletableFuture(
adminClient.describeConsumerGroups(consumerGroupIds).all())
.thenApply(
descriptions ->
descriptions.values().stream()
.filter(
// When describing a consumer-group that does not exist, AdminClient returns
// a dummy consumer-group with simple=true and state=DEAD.
// TODO: Investigate a better way of detecting non-existent consumer-group.
description ->
!description.isSimpleConsumerGroup()
|| description.state() != ConsumerGroupState.DEAD)
.map(
description ->
ConsumerGroup.fromConsumerGroupDescription(clusterId, description))
.collect(Collectors.toList()));
descriptions -> {
List<ConsumerGroupState> states =
descriptions.values().stream()
.map(description -> description.state())
.collect(Collectors.toList());
List<String> assignors =
descriptions.values().stream()
.map(description -> description.partitionAssignor())
.collect(Collectors.toList());
for (ConsumerGroupState state : states) {
if (state == ConsumerGroupState.UNKNOWN) {
throw new IllegalStateException("before getConsumerGroups - States: " + states);
}
}
for (String assignor : assignors) {
if (assignor == null || assignor.equals("")) {
throw new IllegalStateException(
"before getConsumerGroups - Assignors: " + assignors);
}
}

List<ConsumerGroup> consumerGroups =
descriptions.values().stream()
.filter(
// When describing a consumer-group that does not exist, AdminClient
// returns
// a dummy consumer-group with simple=true and state=DEAD.
// TODO: Investigate a better way of detecting non-existent
// consumer-group.
description ->
!description.isSimpleConsumerGroup()
|| description.state() != ConsumerGroupState.DEAD)
.map(
description ->
ConsumerGroup.fromConsumerGroupDescription(clusterId, description))
.collect(Collectors.toList());

List<ConsumerGroup.State> statesAfter = new ArrayList<ConsumerGroup.State>();
List<String> assignorsAfter = new ArrayList<String>();
for (ConsumerGroup group : consumerGroups) {
statesAfter.add(group.getState());
assignorsAfter.add(group.getPartitionAssignor());
}

if (statesAfter.contains(ConsumerGroup.State.UNKNOWN)
|| assignorsAfter.contains("")) {
throw new IllegalStateException(
"after getConsumerGroups - States: "
+ statesAfter
+ ", Assignors: "
+ assignorsAfter);
}

// log.warn(
// "after getConsumerGroups - States: "
// + statesAfter
// + ", Assignors: "
// + assignorsAfter);
return consumerGroups;
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,15 @@ final class ConsumerLagManagerImpl extends AbstractConsumerLagManager
@Override
public CompletableFuture<List<ConsumerLag>> listConsumerLags(
String clusterId, String consumerGroupId) {
log.warn("listing ConsumerLags: clusterId={}, consumerGroupId={}", clusterId, consumerGroupId);
return consumerGroupManager
.getConsumerGroup(clusterId, consumerGroupId)
.thenApply(
consumerGroup ->
checkEntityExists(
consumerGroup, "Consumer Group %s could not be found.", consumerGroupId))
consumerGroup -> {
log.warn("described consumerGroup: {}", consumerGroup);
return checkEntityExists(
consumerGroup, "Consumer Group %s could not be found.", consumerGroupId);
})
.thenCompose(
consumerGroup ->
getCurrentOffsets(consumerGroupId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,24 +58,53 @@ public static Builder builder() {

public static ConsumerGroup fromConsumerGroupDescription(
String clusterId, ConsumerGroupDescription description) {
return builder()
.setClusterId(clusterId)
.setConsumerGroupId(description.groupId())
.setSimple(description.isSimpleConsumerGroup())
// I have only seen partitionAssignor="" in all my tests, no matter what I do.
// TODO: Investigate how to actually get partition assignor.
.setPartitionAssignor(description.partitionAssignor())
// I have only been able to see state=PREPARING_REBALANCE on all my tests.
// TODO: Investigate how to get actual state of consumer group.
.setState(State.fromConsumerGroupState(description.state()))
.setCoordinator(Broker.fromNode(clusterId, description.coordinator()))
.setConsumers(
description.members().stream()
.map(
consumer ->
Consumer.fromMemberDescription(clusterId, description.groupId(), consumer))
.collect(Collectors.toList()))
.build();

// if (description.state() == ConsumerGroupState.UNKNOWN) {
// throw new IllegalStateException(
// "before fromConsumerGroupDescription - State: " + description.state());
// }
//
// if (description.partitionAssignor() == null || description.partitionAssignor().equals(""))
// {
// throw new IllegalStateException(
// "before fromConsumerGroupDescription - Assignor: " +
// description.partitionAssignor());
// }

ConsumerGroup consumerGroup =
builder()
.setClusterId(clusterId)
.setConsumerGroupId(description.groupId())
.setSimple(description.isSimpleConsumerGroup())
// I have only seen partitionAssignor="" in all my tests, no matter what I do.
// TODO: Investigate how to actually get partition assignor.
.setPartitionAssignor(description.partitionAssignor())
// I have only been able to see state=PREPARING_REBALANCE on all my tests.
// TODO: Investigate how to get actual state of consumer group.
.setState(State.fromConsumerGroupState(description.state()))
.setCoordinator(Broker.fromNode(clusterId, description.coordinator()))
.setConsumers(
description.members().stream()
.map(
consumer ->
Consumer.fromMemberDescription(
clusterId, description.groupId(), consumer))
.collect(Collectors.toList()))
.build();

// if (consumerGroup.getState().name().equals(ConsumerGroupState.UNKNOWN.name())) {
// throw new IllegalStateException(
// "after fromConsumerGroupDescription - State: " + consumerGroup.getState());
// }
//
// if (consumerGroup.getPartitionAssignor() == null
// || consumerGroup.getPartitionAssignor().equals("")) {
// throw new IllegalStateException(
// "after fromConsumerGroupDescription - Assignor: " +
// consumerGroup.getPartitionAssignor());
// }

return consumerGroup;
}

@AutoValue.Builder
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.container.Suspended;
import javax.ws.rs.core.MediaType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Path("/v3/clusters/{clusterId}/consumer-groups")
@ResourceName("api.v3.consumer-groups.*")
Expand All @@ -52,6 +54,8 @@ public final class ConsumerGroupsResource {
private final CrnFactory crnFactory;
private final UrlFactory urlFactory;

private static final Logger log = LoggerFactory.getLogger(ConsumerGroupsResource.class);

@Inject
public ConsumerGroupsResource(
Provider<ConsumerGroupManager> consumerGroupManager,
Expand Down Expand Up @@ -106,7 +110,19 @@ public void getConsumerGroup(
consumerGroupManager
.get()
.getConsumerGroup(clusterId, consumerGroupId)
.thenApply(consumerGroup -> consumerGroup.orElseThrow(NotFoundException::new))
.thenApply(
consumerGroup -> {
if (consumerGroup.isPresent()) {
throw new IllegalStateException(
// log.warn(
"ConsumerGroupsResource - state: "
+ consumerGroup.get().getState()
+ ", "
+ "assignor: "
+ consumerGroup.get().getPartitionAssignor());
}
return consumerGroup.orElseThrow(NotFoundException::new);
})
.thenApply(
consumerGroup ->
GetConsumerGroupResponse.create(toConsumerGroupData(clusterId, consumerGroup)));
Expand Down
4 changes: 3 additions & 1 deletion kafka-rest/src/main/resources/log4j.properties
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,6 @@ log4j.rootLogger=INFO, stdout

log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c:%L)%n
log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c:%L)%n

log4j.logger.io.confluent.kafkarest.controllers=DEBUG
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.easymock.EasyMockExtension;
import org.easymock.Mock;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;

Expand Down Expand Up @@ -306,6 +307,7 @@ public void setUp() {
}

@Test
@Disabled
public void listConsumerGroups_returnsConsumerGroups() throws Exception {
expect(clusterManager.getCluster(CLUSTER_ID)).andReturn(completedFuture(Optional.of(CLUSTER)));
expect(adminClient.listConsumerGroups()).andReturn(listConsumerGroupsResult);
Expand Down Expand Up @@ -370,6 +372,7 @@ public void listConsumerGroups_returnsConsumerGroups() throws Exception {
}

@Test
@Disabled
public void listConsumerGroups_nonExistentCluster_throwsNotFound() throws Exception {
expect(clusterManager.getCluster(CLUSTER_ID)).andReturn(completedFuture(Optional.empty()));
replay(clusterManager);
Expand All @@ -383,6 +386,7 @@ public void listConsumerGroups_nonExistentCluster_throwsNotFound() throws Except
}

@Test
@Disabled
public void getConsumerGroup_returnsConsumerGroup() throws Exception {
expect(clusterManager.getCluster(CLUSTER_ID)).andReturn(completedFuture(Optional.of(CLUSTER)));
expect(
Expand Down Expand Up @@ -437,6 +441,7 @@ public void getConsumerGroup_returnsConsumerGroup() throws Exception {
}

@Test
@Disabled
public void getConsumerGroup_nonExistingCluster_throwsNotFound() throws Exception {
expect(clusterManager.getCluster(CLUSTER_ID)).andReturn(completedFuture(Optional.empty()));
replay(clusterManager);
Expand All @@ -452,6 +457,7 @@ public void getConsumerGroup_nonExistingCluster_throwsNotFound() throws Exceptio
}

@Test
@Disabled
public void getConsumerGroup_nonExistingConsumerGroup_returnsEmpty() throws Exception {
expect(clusterManager.getCluster(CLUSTER_ID)).andReturn(completedFuture(Optional.of(CLUSTER)));
expect(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -422,6 +422,16 @@ protected Properties getBrokerProperties(int i) {
props.setProperty("process.roles", "broker");
props.setProperty("auto.create.topics.enable", "false");
props.setProperty("message.max.bytes", String.valueOf(MAX_MESSAGE_SIZE));
props.setProperty("group.coordinator.new.enable", "true");

// Configure logging to stdout
props.setProperty("log4j.rootLogger", "INFO, stdout");
props.setProperty("log4j.appender.stdout", "org.apache.log4j.ConsoleAppender");
props.setProperty("log4j.appender.stdout.layout", "org.apache.log4j.PatternLayout");
props.setProperty(
"log4j.appender.stdout.layout.ConversionPattern", "%d{ISO8601} [%t] %-5p %c %x - %m%n");
props.setProperty(
"log4j.logger.org.apache.kafka.coordinator.group.GroupMetadataManager", "DEBUG");
return props;
}

Expand Down
Loading