Skip to content

Commit

Permalink
Improve connect cluster vault error handling (#494)
Browse files Browse the repository at this point in the history
  • Loading branch information
ThomasCAI-mlv authored Dec 20, 2024
1 parent 95f399b commit b37432f
Show file tree
Hide file tree
Showing 6 changed files with 325 additions and 475 deletions.
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package com.michelin.ns4kafka.controller.connect;

import static com.michelin.ns4kafka.util.FormatErrorUtils.invalidConnectClusterDeleteOperation;
import static com.michelin.ns4kafka.util.FormatErrorUtils.invalidConnectClusterNotAllowed;
import static com.michelin.ns4kafka.util.FormatErrorUtils.invalidOwner;
import static com.michelin.ns4kafka.util.enumation.Kind.CONNECT_CLUSTER;
import static io.micronaut.core.util.StringUtils.EMPTY_STRING;
Expand Down Expand Up @@ -246,7 +245,8 @@ public HttpResponse<List<ConnectCluster>> bulkDelete(String namespace,
public List<ConnectCluster> listVaults(final String namespace) {
return connectClusterService.findAllForNamespaceWithWritePermission(getNamespace(namespace))
.stream()
.filter(connectCluster -> StringUtils.hasText(connectCluster.getSpec().getAes256Key()))
.filter(connectCluster -> StringUtils.hasText(connectCluster.getSpec().getAes256Key())
&& StringUtils.hasText(connectCluster.getSpec().getAes256Salt()))
.toList();
}

Expand All @@ -264,12 +264,7 @@ public List<VaultResponse> vaultPassword(final String namespace,
@Body final List<String> passwords) {
final Namespace ns = getNamespace(namespace);

final var validationErrors = new ArrayList<String>();
if (!connectClusterService.isNamespaceAllowedForConnectCluster(ns, connectCluster)) {
validationErrors.add(invalidConnectClusterNotAllowed(connectCluster));
}

validationErrors.addAll(connectClusterService.validateConnectClusterVault(ns, connectCluster));
List<String> validationErrors = connectClusterService.validateConnectClusterVault(ns, connectCluster);

if (!validationErrors.isEmpty()) {
throw new ResourceValidationException(CONNECT_CLUSTER, connectCluster, validationErrors);
Expand Down
3 changes: 2 additions & 1 deletion src/main/java/com/michelin/ns4kafka/service/AclService.java
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,8 @@ public boolean topicAclsCollide(AccessControlEntry topicAclA, AccessControlEntry
*/
public boolean isOwnerOfTopLevelAcl(AccessControlEntry accessControlEntry, Namespace namespace) {
// Grantor Namespace is OWNER of Resource + ResourcePattern ?
return findAllGrantedToNamespace(namespace).stream()
return findAllGrantedToNamespace(namespace)
.stream()
.filter(ace -> ace.getSpec().getResourceType() == accessControlEntry.getSpec().getResourceType()
&& ace.getSpec().getPermission() == AccessControlEntry.Permission.OWNER)
.anyMatch(ace -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import static com.michelin.ns4kafka.util.FormatErrorUtils.invalidConnectClusterEncryptionConfig;
import static com.michelin.ns4kafka.util.FormatErrorUtils.invalidConnectClusterMalformedUrl;
import static com.michelin.ns4kafka.util.FormatErrorUtils.invalidConnectClusterNameAlreadyExistGlobally;
import static com.michelin.ns4kafka.util.FormatErrorUtils.invalidConnectClusterNoEncryptionConfig;
import static com.michelin.ns4kafka.util.FormatErrorUtils.invalidConnectClusterNotHealthy;
import static com.michelin.ns4kafka.util.FormatErrorUtils.invalidNotFound;

Expand All @@ -17,7 +18,6 @@
import com.michelin.ns4kafka.service.client.connect.KafkaConnectClient;
import com.michelin.ns4kafka.service.client.connect.entities.ServerInfo;
import com.michelin.ns4kafka.util.EncryptionUtils;
import com.michelin.ns4kafka.util.FormatErrorUtils;
import com.michelin.ns4kafka.util.RegexUtils;
import io.micronaut.core.util.StringUtils;
import io.micronaut.http.HttpRequest;
Expand All @@ -32,7 +32,6 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
Expand Down Expand Up @@ -190,7 +189,8 @@ public Optional<ConnectCluster> findByNameWithOwnerPermission(Namespace namespac
public List<ConnectCluster> findAllForNamespaceWithWritePermission(Namespace namespace) {
return Stream.concat(
findByWildcardNameWithOwnerPermission(namespace, "*").stream(),
findAllForNamespaceByPermissions(namespace, List.of(AccessControlEntry.Permission.WRITE)).stream()
findAllForNamespaceByPermissions(namespace, List.of(AccessControlEntry.Permission.WRITE))
.stream()
.map(connectCluster -> ConnectCluster.builder()
.metadata(connectCluster.getMetadata())
.spec(ConnectCluster.ConnectClusterSpec.builder()
Expand Down Expand Up @@ -281,41 +281,28 @@ public Mono<List<String>> validateConnectClusterCreation(ConnectCluster connectC
}

/**
* Validate the given connect worker has configuration for vaults.
* Validate the given connect worker has configuration for vault.
*
* @param connectCluster The Kafka connect worker to validate
* @param namespace The namespace
* @param connectClusterName The Kafka connect worker to validate
* @return A list of validation errors
*/
public List<String> validateConnectClusterVault(final Namespace namespace, final String connectCluster) {
final var errors = new ArrayList<String>();

final List<ConnectCluster> kafkaConnects = findAllForNamespaceByPermissions(namespace,
List.of(AccessControlEntry.Permission.OWNER, AccessControlEntry.Permission.WRITE));
public List<String> validateConnectClusterVault(final Namespace namespace, final String connectClusterName) {
final List<String> errors = new ArrayList<>();

if (kafkaConnects.isEmpty()) {
errors.add(invalidNotFound(connectCluster));
return errors;
}
Optional<ConnectCluster> connectClusters = findAllForNamespaceWithWritePermission(namespace)
.stream()
.filter(connectCluster -> connectCluster.getMetadata().getName().equals(connectClusterName))
.findFirst();

if (kafkaConnects.stream().noneMatch(cc -> StringUtils.hasText(cc.getSpec().getAes256Key())
&& StringUtils.hasText(cc.getSpec().getAes256Salt()))) {
errors.add(invalidConnectClusterEncryptionConfig());
if (connectClusters.isEmpty()) {
errors.add(invalidNotFound(connectClusterName));
return errors;
}

final Optional<ConnectCluster> kafkaConnect = kafkaConnects.stream()
.filter(cc -> cc.getMetadata().getName().equals(connectCluster)
&& StringUtils.hasText(cc.getSpec().getAes256Key())
&& StringUtils.hasText(cc.getSpec().getAes256Salt()))
.findFirst();

if (kafkaConnect.isEmpty()) {
final String allowedConnectClusters = kafkaConnects.stream()
.filter(cc -> StringUtils.hasText(cc.getSpec().getAes256Key())
&& StringUtils.hasText(cc.getSpec().getAes256Salt()))
.map(cc -> cc.getMetadata().getName())
.collect(Collectors.joining(", "));
errors.add(FormatErrorUtils.invalidConnectClusterMustBeOneOf(connectCluster, allowedConnectClusters));
if (!StringUtils.hasText(connectClusters.get().getSpec().getAes256Key())
|| !StringUtils.hasText(connectClusters.get().getSpec().getAes256Salt())) {
errors.add(invalidConnectClusterNoEncryptionConfig());
return errors;
}

Expand Down Expand Up @@ -343,19 +330,6 @@ public boolean isNamespaceOwnerOfConnectCluster(Namespace namespace, String conn
AccessControlEntry.ResourceType.CONNECT_CLUSTER, connectCluster);
}

/**
* Is given namespace allowed (Owner or Writer) for the given connect worker.
*
* @param namespace The namespace
* @param connectCluster The Kafka connect cluster
* @return true if it is, false otherwise
*/
public boolean isNamespaceAllowedForConnectCluster(Namespace namespace, String connectCluster) {
return findAllForNamespaceWithWritePermission(namespace)
.stream()
.anyMatch(kafkaConnect -> kafkaConnect.getMetadata().getName().equals(connectCluster));
}

/**
* Vault a password for a specific namespace and a kafka connect cluster.
*
Expand Down
Loading

0 comments on commit b37432f

Please sign in to comment.