Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tool: validator script for azure NPM to cilium migration #3359

Closed
wants to merge 1 commit into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
149 changes: 149 additions & 0 deletions tools/azure-npm-to-cilium-validator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
# TODO: block comment describing this script/usage

import os
from kubernetes import client, config

# Load Kubernetes configuration
config.load_kube_config()

# Initialize Kubernetes API clients
v1 = client.CoreV1Api()
networking_v1 = client.NetworkingV1Api()

# Get namespaces
namespaces = [ns.metadata.name for ns in v1.list_namespace().items]

# Store policies and services in dictionaries
policies = {}
# FIXME: find services with externaltrafficpolicy=Cluster instead of all services. Modify the variable name and code that touch it
services = {}

# Iterate over namespaces and store policies/services
for ns in namespaces:
print(f"Writing policies and services for namespace {ns}...")
policies[ns] = networking_v1.list_namespaced_network_policy(ns).to_dict()
services[ns] = v1.list_namespaced_service(ns).to_dict()

print("Policies and services have been stored in memory.")

# NetworkPolicy validation for Cilium differences
print("In Cilium, some kinds of NetworkPolicy behave differently. Reviewing NetworkPolicy configuration...")

def check_endport_policies():
for ns in namespaces:
endport_policies = [policy for policy in policies[ns]['items'] if any(egress.get('ports', [{}])[0].get('endPort') for egress in policy['spec'].get('egress', []))]
if endport_policies:
print(f"❌ Found NetworkPolicies with endPort field in namespace {ns}:")
for policy in endport_policies:
print(f"{ns}/{policy['metadata']['name']}")
else:
print(f"✅ No NetworkPolicies with endPort field found in namespace {ns}.")

def check_cidr_policies():
for ns in namespaces:
cidr_policies = [policy for policy in policies[ns]['items'] if any(egress.get('to', [{}])[0].get('ipBlock') for egress in policy['spec'].get('egress', []))]
if cidr_policies:
print(f"❌ Found NetworkPolicies with CIDRs in namespace {ns}:")
for policy in cidr_policies:
print(f"{ns}/{policy['metadata']['name']}")
else:
print(f"✅ No NetworkPolicies with CIDRs found in namespace {ns}.")

check_endport_policies()
check_cidr_policies()

# Service validation for Cilium differences
print("In Cilium, NetworkPolicy behaves differently for some kinds of Service. Reviewing Service configuration...")

SERVICES_AT_RISK = []
NO_SELECTOR_SERVICES = []
SAFE_SERVICES = []

# is the policy guaranteed to target any pod that could be selected by the service selector?
# TODO: eval if this logic is correct/complete
def match_selector(service_selector, policy_selector):
service_labels = service_selector.get('matchLabels', {})
policy_labels = policy_selector.get('matchLabels', {})

if all(item in service_labels.items() for item in policy_labels.items()):
return True

service_expressions = service_selector.get('matchExpressions', [])
policy_expressions = policy_selector.get('matchExpressions', [])

for expr in service_expressions:
key = expr['key']
operator = expr['operator']
values = expr.get('values', [])

matching_expr = next((e for e in policy_expressions if e['key'] == key), None)
if not matching_expr:
return False

matching_operator = matching_expr['operator']
matching_values = matching_expr.get('values', [])

if operator == "In" and not any(value in matching_values for value in values):
return False
if operator == "NotIn" and any(value in matching_values for value in values):
return False

return True

def check_service_risk(service_selector, namespace, service_ports):
policies_ns = policies[namespace]['items']

for policy in policies_ns:
if any(not ingress.get('from') and not ingress.get('ports') for ingress in policy['spec'].get('ingress', [])):
if match_selector(service_selector, policy['spec'].get('podSelector', {})):
SAFE_SERVICES.append(f"{namespace}/{service}")
return

for policy in policies_ns:
if any(not ingress.get('from') and ingress.get('ports') for ingress in policy['spec'].get('ingress', [])):
if match_selector(service_selector, policy['spec'].get('podSelector', {})):
matching_ports = [f"{port['port']}/{port['protocol']}" for ingress in policy['spec']['ingress'] for port in ingress.get('ports', [])]
if any(svc_port in matching_ports for svc_port in service_ports):
SAFE_SERVICES.append(f"{namespace}/{service}")
return

for ns in namespaces:
if not any(ingress.get('from') for policy in policies[ns]['items'] for ingress in policy['spec'].get('ingress', [])):
print(f"Skipping namespace {ns} as it has no ingress NetworkPolicy rules.")
continue

services_ns = services[ns]['items']
print(f"Checking NetworkPolicy targeting services with externalTrafficPolicy=Cluster in namespace {ns}...")

for service in services_ns:
if service['spec']['type'] in ["LoadBalancer", "NodePort"]:
externalTrafficPolicy = service['spec'].get('externalTrafficPolicy')
service_ports = [f"{port['port']}/{port['protocol']}" for port in service['spec'].get('ports', [])]
if externalTrafficPolicy != "Local":
SERVICES_AT_RISK.append(f"{ns}/{service['metadata']['name']}")
selector = service['spec'].get('selector')
if not selector:
NO_SELECTOR_SERVICES.append(f"{ns}/{service['metadata']['name']}")
else:
check_service_risk(selector, ns, service_ports)

unsafe_services = list(set(SERVICES_AT_RISK) - set(SAFE_SERVICES) - set(NO_SELECTOR_SERVICES))

if not SERVICES_AT_RISK:
print("\033[32m✔ No issues with service ingress.\033[0m")
else:
if NO_SELECTOR_SERVICES:
print("\nFound services without selectors which could be impacted by migration. Manual investigation is required to evaluate if ingress is allowed to the service's backend Pods. Please evaluate if these services would be impacted:")
for service in NO_SELECTOR_SERVICES:
print(service)
if unsafe_services:
print("\nFound services with selectors which could be impacted by migration. Manual investigation is required to evaluate if ingress is allowed to the service's backend Pods. Please evaluate if these services would be impacted:")
for service in unsafe_services:
print(service)

if NO_SELECTOR_SERVICES or len(SAFE_SERVICES) < len(SERVICES_AT_RISK) or any("❌" in check_endport_policies()) or any("❌" in check_cidr_policies()):
print("\033[31m✘ Review above issues before migration.\033[0m")
else:
print("\033[32m✔ Safe to migrate this cluster.\033[0m")

print("Warning: This script should be rerun if services or network policies are created, deleted, or edited.")
Loading