Page MenuHomePhabricator
Paste P89953

audit-deployments-T420565
ActivePublic

Authored by fgiunchedi on Mar 27 2026, 11:27 AM.
Tags
None
Referenced Files
F73818092: audit-deployments-T420565
Mar 27 2026, 11:27 AM
Subscribers
None
#!/usr/bin/env python3
"""
Audit all Kubernetes deployments across namespaces against known defaults.
Defaults are keyed by app.kubernetes.io/managed-by label value.
Resource values must be specified in the same format kubectl returns them.
"""
import json
import subprocess
import sys
from dataclasses import dataclass, field
from typing import Optional
# ── Defaults ────────────────────────────────────────────────────────────────
# Expected values per manager, keyed by the app.kubernetes.io/managed-by label.
# Resource quantities must be the exact strings kubectl returns them as (the
# audit compares with a plain != and does no unit normalization); replicas is
# an int.
# Set a field to None to skip checking it.
DEFAULTS: dict[str, dict] = {
    "toolforge-jobs-framework": {
        "replicas": 1,
        # A request may never exceed its limit (Kubernetes rejects such a
        # container), so the request has to be the smaller of the two values.
        "mem_request": "256Mi",
        "cpu_request": "100m",
        "mem_limit": "512Mi",
        "cpu_limit": "500m",
    },
    "webservice": {
        "replicas": 1,
        "mem_request": "256Mi",
        "cpu_request": "125m",
        "mem_limit": "512Mi",
        "cpu_limit": "500m",
    },
}
# ── Data model ───────────────────────────────────────────────────────────────
@dataclass
class DeploymentAudit:
    """Audit outcome for a single Deployment; one instance per report row."""

    # Identity of the deployment.
    namespace: str
    name: str
    # Value of the app.kubernetes.io/managed-by label, None when absent.
    manager: Optional[str]
    replicas: Optional[int]

    # Resource quantities of the first container, None when unset.
    cpu_request: Optional[str]
    cpu_limit: Optional[str]
    mem_request: Optional[str]
    mem_limit: Optional[str]

    # One of: "default" | "customized" | "not-set" | "no-containers"
    #         | "no-manager" | "unknown-manager"
    status: str
    # Where the resource values were read from: "template" | "pods" | "-"
    resource_source: str
    # Names of the fields whose value differs from the manager's defaults.
    diff_fields: list[str] = field(default_factory=list)
# ── kubectl helpers ──────────────────────────────────────────────────────────
def fetch_deployments() -> list[dict]:
    """Return every Deployment object in the cluster as parsed JSON dicts.

    Runs ``kubectl get deployments --all-namespaces -o json``.  When kubectl
    exits non-zero, its stderr is echoed to stderr and the process exits with
    status 1.
    """
    command = ["kubectl", "get", "deployments", "--all-namespaces", "-o", "json"]
    proc = subprocess.run(command, capture_output=True, text=True)
    if proc.returncode == 0:
        payload = json.loads(proc.stdout)
        return payload["items"]
    # Without the deployment list there is nothing to audit: bail out.
    print(f"kubectl error:\n{proc.stderr}", file=sys.stderr)
    sys.exit(1)
def fetch_pod_resources(namespace: str, match_labels: dict) -> Optional[dict]:
    """Return the resources dict of the first container of the first Running pod.

    Pods are looked up with ``kubectl get pods`` in *namespace*, using
    *match_labels* as an equality label selector.

    Returns None when:
      * *match_labels* is empty (no usable selector),
      * kubectl fails (a warning is written to stderr),
      * no pod in the Running phase matches,
      * the selected pod has no containers, or
      * its first container sets neither requests nor limits.
    """
    if not match_labels:
        # An empty selector would make kubectl return every pod in the
        # namespace, and an unrelated pod's resources would be reported.
        return None
    selector = ",".join(f"{k}={v}" for k, v in match_labels.items())
    result = subprocess.run(
        ["kubectl", "get", "pods", "-n", namespace, "-l", selector, "-o", "json"],
        capture_output=True,
        text=True,
    )
    if result.returncode != 0:
        print(
            f" warning: kubectl get pods failed for {namespace} selector={selector}:\n"
            f" {result.stderr.strip()}",
            file=sys.stderr,
        )
        return None
    pods = json.loads(result.stdout).get("items", [])
    for pod in pods:
        # Skip pods that are not Running (e.g. Pending, Failed, Succeeded):
        # only a live pod reflects what the deployment currently gets.
        if pod.get("status", {}).get("phase") != "Running":
            continue
        containers = pod.get("spec", {}).get("containers", [])
        if not containers:
            return None
        resources = containers[0].get("resources", {})
        if not resources.get("requests") and not resources.get("limits"):
            return None
        return resources
    return None
# ── Audit logic ──────────────────────────────────────────────────────────────
def extract_fields(deployment: dict) -> dict:
    """Flatten a Deployment object into the fields the audit works with.

    The returned dict holds the DeploymentAudit identity/resource fields
    (resource values taken from the first container of the pod template)
    plus two private keys, ``_containers`` and ``_match_labels``, which the
    caller is expected to pop before building a DeploymentAudit.
    """
    metadata, spec = deployment["metadata"], deployment["spec"]
    pod_spec = spec.get("template", {}).get("spec", {})
    containers = pod_spec.get("containers", [])

    # Only the first container of the template is inspected.
    if containers:
        first_resources = containers[0].get("resources", {})
    else:
        first_resources = {}
    requested = first_resources.get("requests", {})
    capped = first_resources.get("limits", {})

    fields_out = {
        "namespace": metadata["namespace"],
        "name": metadata["name"],
        "manager": metadata.get("labels", {}).get("app.kubernetes.io/managed-by"),
        # k8s defaults unset replicas to 1
        "replicas": spec.get("replicas", 1),
    }
    fields_out.update(
        cpu_request=requested.get("cpu"),
        cpu_limit=capped.get("cpu"),
        mem_request=requested.get("memory"),
        mem_limit=capped.get("memory"),
    )
    fields_out["_containers"] = containers
    fields_out["_match_labels"] = spec.get("selector", {}).get("matchLabels", {})
    return fields_out
def audit_deployment(deployment: dict) -> DeploymentAudit:
    """Classify one Deployment against the defaults of its manager.

    Resource values come from the first container of the pod template; when
    the template sets neither requests nor limits, a live pod is inspected
    instead.  The resulting status is one of "no-containers", "not-set",
    "no-manager", "unknown-manager", "customized" or "default".
    """

    def is_unset(res: dict) -> bool:
        # True when neither requests nor limits carry any value.
        return not (res.get("requests") or res.get("limits"))

    record = extract_fields(deployment)
    containers = record.pop("_containers")
    selector_labels = record.pop("_match_labels")

    if not containers:
        return DeploymentAudit(**record, status="no-containers", resource_source="-")

    source = "template"
    effective = containers[0].get("resources", {})
    if is_unset(effective):
        # fall back to inspecting live pods
        effective = fetch_pod_resources(record["namespace"], selector_labels) or {}
        if is_unset(effective):
            return DeploymentAudit(**record, status="not-set", resource_source="-")
        source = "pods"

    requested = effective.get("requests", {})
    capped = effective.get("limits", {})
    record.update(
        cpu_request=requested.get("cpu"),
        cpu_limit=capped.get("cpu"),
        mem_request=requested.get("memory"),
        mem_limit=capped.get("memory"),
    )

    manager = record["manager"]
    if manager is None:
        return DeploymentAudit(**record, status="no-manager", resource_source=source)

    expected_values = DEFAULTS.get(manager)
    if expected_values is None:
        return DeploymentAudit(**record, status="unknown-manager", resource_source=source)

    # A default of None means "do not check this field".
    mismatched = [
        key
        for key, expected in expected_values.items()
        if expected is not None and record[key] != expected
    ]
    verdict = "customized" if mismatched else "default"
    return DeploymentAudit(
        **record, status=verdict, resource_source=source, diff_fields=mismatched
    )
# ── Output ───────────────────────────────────────────────────────────────────
def print_table(audits: list[DeploymentAudit]) -> None:
    """Print the audits as a left-aligned, space-separated table on stdout.

    Rows are ordered by status (statuses needing attention first, "default"
    last), then by namespace and deployment name.  Missing values are shown
    as "-".  An empty *audits* list prints only the header and separator.
    """
    headers = (
        "NAMESPACE", "DEPLOYMENT", "MANAGER", "REPLICAS",
        "CPU_REQ", "CPU_LIM", "MEM_REQ", "MEM_LIM", "SOURCE", "STATUS", "DIFF",
    )
    status_rank = {
        "no-containers": 0,
        "not-set": 1,
        "customized": 2,
        "unknown-manager": 3,
        "no-manager": 4,
        "default": 5,
    }

    def cell(value) -> str:
        # Only None and "" count as missing: a deployment scaled down to
        # 0 replicas must be reported as "0", not "-".
        return "-" if value is None or value == "" else str(value)

    ordered = sorted(
        audits,
        key=lambda a: (status_rank.get(a.status, 9), a.namespace, a.name),
    )
    rows = [
        (
            a.namespace, a.name, cell(a.manager), cell(a.replicas),
            cell(a.cpu_request), cell(a.cpu_limit),
            cell(a.mem_request), cell(a.mem_limit),
            a.resource_source, a.status, ",".join(a.diff_fields) or "-",
        )
        for a in ordered
    ]

    # Each column is as wide as its header or its longest cell; building a
    # list first keeps max() valid when there are no rows at all.
    widths = [
        max([len(header)] + [len(row[i]) for row in rows])
        for i, header in enumerate(headers)
    ]

    def render(values) -> str:
        # Every column except the last is padded; the last is printed as-is
        # so lines carry no trailing whitespace.
        padded = [v.ljust(w) for v, w in zip(values[:-1], widths)]
        return " ".join(padded + [values[-1]])

    print(render(headers))
    print(" ".join("-" * w for w in widths))
    for row in rows:
        print(render(row))
def print_summary(audits: list[DeploymentAudit]) -> None:
    """Print a one-line total followed by per-status counts, sorted by status."""
    tally: dict[str, int] = {}
    for status in (audit.status for audit in audits):
        tally[status] = tally.setdefault(status, 0) + 1

    breakdown = " | ".join(
        f"{status}: {count}" for status, count in sorted(tally.items())
    )
    print(f"\nTotal: {len(audits)} | " + breakdown)
# ── Entry point ──────────────────────────────────────────────────────────────
def main() -> None:
    """Audit every deployment in the cluster and print the table and summary."""
    audits = [audit_deployment(item) for item in fetch_deployments()]
    if audits:
        print_table(audits)
        print_summary(audits)
    else:
        print("No deployments found.")


if __name__ == "__main__":
    main()