Page Menu
Home
Phabricator
Search
Configure Global Search
Log In
Paste
P89953
audit-deployments-T420565
Active
Public
Actions
Authored by
fgiunchedi
on Mar 27 2026, 11:27 AM.
Edit Paste
Archive Paste
View Raw File
Subscribe
Mute Notifications
Tags
None
Referenced Files
F73818092: audit-deployments-T420565
Mar 27 2026, 11:27 AM
2026-03-27 11:27:01 (UTC+0)
Subscribers
None
#!/usr/bin/env python3
"""
Audit all Kubernetes deployments across namespaces against known defaults.
Defaults are keyed by app.kubernetes.io/managed-by label value.
Resource values must be specified in the same format kubectl returns them.
"""
import json
import subprocess
import sys
from collections import Counter
from dataclasses import dataclass, field
from typing import Optional
# ── Defaults ────────────────────────────────────────────────────────────────
# Values must be exact strings as kubectl returns them.
# Set a field to None to skip checking it.
# Outer keys are app.kubernetes.io/managed-by label values; inner keys name the
# audited fields that are compared against each deployment.
# NOTE(review): for "toolforge-jobs-framework" the mem_request ("512Mi") is
# larger than the mem_limit ("256Mi"), the reverse of the "webservice" entry.
# Kubernetes does not accept a request above its limit, so these two values
# look swapped -- confirm against the real jobs-framework defaults.
DEFAULTS: dict[str, dict] = {
    "toolforge-jobs-framework": {
        "replicas": 1,
        "mem_request": "512Mi",
        "cpu_request": "100m",
        "mem_limit": "256Mi",
        "cpu_limit": "500m",
    },
    "webservice": {
        "replicas": 1,
        "mem_request": "256Mi",
        "cpu_request": "125m",
        "mem_limit": "512Mi",
        "cpu_limit": "500m",
    },
}
# ── Data model ───────────────────────────────────────────────────────────────
@dataclass
class DeploymentAudit:
    """Audit result for a single Kubernetes deployment.

    Holds the values read for the deployment together with the verdict
    reached when comparing them against the DEFAULTS entry of its manager.
    """

    # Namespace and name taken from the deployment metadata.
    namespace: str
    name: str
    # Value of the app.kubernetes.io/managed-by label, or None when unset.
    manager: Optional[str]
    # Replica count from the deployment spec (1 when the spec omits it).
    replicas: Optional[int]
    # Resource values of the first container, as the strings kubectl reports;
    # None when the corresponding request/limit is not set.
    cpu_request: Optional[str]
    cpu_limit: Optional[str]
    mem_request: Optional[str]
    mem_limit: Optional[str]
    # status values: "default" | "customized" | "not-set" | "no-containers"
    #                | "no-manager" | "unknown-manager"
    status: str
    # Where the resource values were read from.
    resource_source: str  # "template" | "pods" | "-"
    # Names of the audited fields whose value differs from the expected default.
    diff_fields: list[str] = field(default_factory=list)
# ── kubectl helpers ──────────────────────────────────────────────────────────
def fetch_deployments() -> list[dict]:
    """Return every deployment in the cluster as parsed kubectl JSON objects.

    Runs ``kubectl get deployments --all-namespaces -o json`` and returns the
    ``items`` list of its output. If kubectl exits with a non-zero status,
    its stderr is echoed to stderr and the process exits with status 1.
    """
    command = ["kubectl", "get", "deployments", "--all-namespaces", "-o", "json"]
    proc = subprocess.run(command, capture_output=True, text=True)
    if proc.returncode == 0:
        return json.loads(proc.stdout)["items"]

    # Without the deployment list there is nothing to audit, so give up.
    print(f"kubectl error:\n{proc.stderr}", file=sys.stderr)
    sys.exit(1)
def fetch_pod_resources(namespace: str, match_labels: dict) -> Optional[dict]:
    """Return the resources dict of the first container of a live pod.

    Pods are looked up with ``kubectl get pods`` in ``namespace`` using an
    equality label selector built from ``match_labels``. A pod whose phase is
    "Running" is preferred; if none is running, the first pod returned by
    kubectl is used instead.

    Args:
        namespace: namespace to query.
        match_labels: label name -> value pairs that select the pods.

    Returns:
        The ``resources`` mapping of the chosen pod's first container, or
        None when ``match_labels`` is empty, kubectl fails, no pod matches,
        the pod has no containers, or neither requests nor limits are set.
    """
    if not match_labels:
        # An empty "-l" selector would no longer restrict the query to this
        # deployment's pods, so an unrelated pod could be reported.
        return None

    selector = ",".join(f"{k}={v}" for k, v in match_labels.items())
    result = subprocess.run(
        ["kubectl", "get", "pods", "-n", namespace, "-l", selector, "-o", "json"],
        capture_output=True,
        text=True,
    )
    if result.returncode != 0:
        # Best effort: a failed lookup is reported but does not abort the audit.
        print(
            f" warning: kubectl get pods failed for {namespace} selector={selector}:\n"
            f"{result.stderr.strip()}",
            file=sys.stderr,
        )
        return None

    pods = json.loads(result.stdout).get("items", [])
    if not pods:
        return None

    # Prefer a pod that is actually running; fall back to the first pod so a
    # deployment whose pods are all pending still yields its resource values.
    pod = next(
        (p for p in pods if p.get("status", {}).get("phase") == "Running"),
        pods[0],
    )
    containers = pod.get("spec", {}).get("containers", [])
    if not containers:
        return None

    resources = containers[0].get("resources", {})
    if not resources.get("requests") and not resources.get("limits"):
        return None
    return resources
# ── Audit logic ──────────────────────────────────────────────────────────────
def extract_fields(deployment: dict) -> dict:
    """Flatten one deployment object into the fields the audit works with.

    Args:
        deployment: a deployment as parsed from kubectl JSON; it must contain
            "metadata" (with "namespace" and "name") and "spec".

    Returns:
        A dict with namespace, name, manager, replicas and the cpu/memory
        request and limit of the first container in the pod template (None
        where unset), plus two helper entries: "_containers" (the template's
        container list) and "_match_labels" (the selector's matchLabels).
    """
    metadata = deployment["metadata"]
    dep_spec = deployment["spec"]

    pod_spec = dep_spec.get("template", {}).get("spec", {})
    container_list = pod_spec.get("containers", [])

    # Only the first container of the template is inspected.
    first_resources = {}
    if container_list:
        first_resources = container_list[0].get("resources", {})
    req = first_resources.get("requests", {})
    lim = first_resources.get("limits", {})

    selector = dep_spec.get("selector", {})

    return {
        "namespace": metadata["namespace"],
        "name": metadata["name"],
        "manager": metadata.get("labels", {}).get("app.kubernetes.io/managed-by"),
        # k8s defaults unset replicas to 1
        "replicas": dep_spec.get("replicas", 1),
        "cpu_request": req.get("cpu"),
        "cpu_limit": lim.get("cpu"),
        "mem_request": req.get("memory"),
        "mem_limit": lim.get("memory"),
        "_containers": container_list,
        "_match_labels": selector.get("matchLabels", {}),
    }
def audit_deployment(deployment: dict) -> DeploymentAudit:
    """Classify one deployment against the defaults of its manager.

    The verdict is decided in this order:
      * "no-containers": the pod template lists no containers;
      * "not-set": neither the template nor a live pod has requests/limits;
      * "no-manager": the managed-by label is missing;
      * "unknown-manager": the manager has no entry in DEFAULTS;
      * "default" / "customized": all checked fields match / some differ.

    Args:
        deployment: a deployment as parsed from kubectl JSON.

    Returns:
        The populated DeploymentAudit for the deployment.
    """
    info = extract_fields(deployment)
    containers = info.pop("_containers")
    match_labels = info.pop("_match_labels")

    def has_values(res: dict) -> bool:
        # True when at least one of requests/limits is present and non-empty.
        return bool(res.get("requests") or res.get("limits"))

    if not containers:
        return DeploymentAudit(**info, status="no-containers", resource_source="-")

    source = "template"
    if not has_values(containers[0].get("resources", {})):
        # fall back to inspecting live pods
        live = fetch_pod_resources(info["namespace"], match_labels) or {}
        if not has_values(live):
            return DeploymentAudit(**info, status="not-set", resource_source="-")
        source = "pods"
        live_requests = live.get("requests", {})
        live_limits = live.get("limits", {})
        info["cpu_request"] = live_requests.get("cpu")
        info["cpu_limit"] = live_limits.get("cpu")
        info["mem_request"] = live_requests.get("memory")
        info["mem_limit"] = live_limits.get("memory")

    manager = info["manager"]
    if manager is None:
        return DeploymentAudit(**info, status="no-manager", resource_source=source)

    expected_values = DEFAULTS.get(manager)
    if expected_values is None:
        return DeploymentAudit(
            **info, status="unknown-manager", resource_source=source
        )

    # A default of None means "do not check this field".
    mismatched = [
        key
        for key, expected in expected_values.items()
        if expected is not None and info[key] != expected
    ]
    verdict = "customized" if mismatched else "default"
    return DeploymentAudit(
        **info,
        status=verdict,
        resource_source=source,
        diff_fields=mismatched,
    )
# ── Output ───────────────────────────────────────────────────────────────────
def print_table(audits: list[DeploymentAudit]) -> None:
    """Print the audit results to stdout as a left-aligned text table.

    A header row and a dashed rule are printed first, followed by one row per
    audit. Rows are ordered by status (no-containers, not-set, customized,
    unknown-manager, no-manager, default, then any other status), then by
    namespace and name. Missing values are shown as "-". Every column except
    the last one (DIFF) is padded to the widest value it contains.

    An empty ``audits`` list prints only the header and the rule.

    Args:
        audits: the audit results to display.
    """
    headers = (
        "NAMESPACE",
        "DEPLOYMENT",
        "MANAGER",
        "REPLICAS",
        "CPU_REQ",
        "CPU_LIM",
        "MEM_REQ",
        "MEM_LIM",
        "SOURCE",
        "STATUS",
        "DIFF",
    )
    sort_order = {
        "no-containers": 0,
        "not-set": 1,
        "customized": 2,
        "unknown-manager": 3,
        "no-manager": 4,
        "default": 5,
    }
    placeholder = "-"
    gap = " "

    def cells(a) -> tuple[str, ...]:
        # Render one audit as display strings, in header order.
        return (
            a.namespace,
            a.name,
            a.manager or placeholder,
            # 0 is a legitimate replica count (deployment scaled down), so
            # only a missing value is replaced by the placeholder.
            placeholder if a.replicas is None else str(a.replicas),
            a.cpu_request or placeholder,
            a.cpu_limit or placeholder,
            a.mem_request or placeholder,
            a.mem_limit or placeholder,
            a.resource_source,
            a.status,
            ",".join(a.diff_fields) or placeholder,
        )

    ordered = sorted(
        audits,
        key=lambda x: (sort_order.get(x.status, 9), x.namespace, x.name),
    )
    rows = [cells(a) for a in ordered]

    # Seeding each column with its header length keeps max() valid when there
    # are no rows at all.
    widths = [
        max([len(header)] + [len(r[i]) for r in rows])
        for i, header in enumerate(headers)
    ]

    def render(values) -> str:
        # Pad every column but the last, which is printed as-is.
        *padded, last = values
        return gap.join([v.ljust(w) for v, w in zip(padded, widths)] + [last])

    print(render(headers))
    print(gap.join("-" * w for w in widths))
    for r in rows:
        print(render(r))
def print_summary(audits: list[DeploymentAudit]) -> None:
    """Print a one-line summary: the total and the number of audits per status.

    The line is preceded by a blank line and lists the statuses in
    alphabetical order, e.g. ``Total: 3 | customized: 1 | default: 2``.

    Args:
        audits: the audit results to summarise.
    """
    by_status = Counter(a.status for a in audits)
    breakdown = " | ".join(
        f"{status}: {count}" for status, count in sorted(by_status.items())
    )
    print(f"\nTotal: {len(audits)} | " + breakdown)
# ── Entry point ──────────────────────────────────────────────────────────────
def main() -> None:
    """Audit every deployment and print the result table and the summary.

    Prints "No deployments found." instead when the cluster returns none.
    """
    audits = list(map(audit_deployment, fetch_deployments()))
    if audits:
        print_table(audits)
        print_summary(audits)
    else:
        print("No deployments found.")


if __name__ == "__main__":
    main()
Event Timeline
fgiunchedi
created this paste.
Mar 27 2026, 11:27 AM
2026-03-27 11:27:01 (UTC+0)
fgiunchedi
mentioned this in
T420565: Audit tools memory requests vs actual usage
.
Mar 27 2026, 11:29 AM
2026-03-27 11:29:03 (UTC+0)
Log In to Comment