Page Menu · Home · Phabricator
Paste P15954

docker_pulltime.py
Active · Public

Authored by JMeybohm on May 12 2021, 9:59 PM.
#!/usr/bin/env python3
import argparse
import json
from datetime import datetime, timedelta
from dataclasses import dataclass, field, fields
from typing import BinaryIO, TextIO, Dict, Any, List, Tuple, Optional
from pathlib import Path
import pandas as pd
import numpy as np
import plotly.express as px
def sizeof_fmt(num, suffix="B"):
    """Render a byte count as a human-readable string, e.g. ``1.5KiB``.

    Based on the well-known answer at:
    https://stackoverflow.com/questions/1094841/get-human-readable-version-of-file-size
    """
    value = num
    for prefix in ("", "Ki", "Mi", "Gi", "Ti", "Pi", "Ei", "Zi"):
        if abs(value) < 1024.0:
            return "%3.1f%s%s" % (value, prefix, suffix)
        value /= 1024.0
    # Anything past Zi collapses into Yi.
    return "%.1f%s%s" % (value, "Yi", suffix)
@dataclass
class Layer:
    """Timing and size information for a single layer of a docker image pull."""

    id: str
    size_bytes: int = 0
    # Pull is the overall time for this layer: Download + Verify + Extract
    # Note, that extract might start very late (due to it being sequential)
    # so pull_end might be later than you'd expect
    pull_start: Optional[datetime] = None
    pull_end: Optional[datetime] = None
    download_start: Optional[datetime] = None
    download_end: Optional[datetime] = None
    verify_start: Optional[datetime] = None
    verify_end: Optional[datetime] = None
    extract_start: Optional[datetime] = None
    extract_end: Optional[datetime] = None
    # True if the layer was already present locally and was not transferred.
    exists: bool = False

    @classmethod
    def existing(cls, id: str, t: datetime):
        """Alternate constructor for a layer that already exists locally.

        All timestamps are set to t, so every *_time() method returns a
        zero timedelta and validate() passes.
        """
        return cls(
            id=id,
            pull_start=t,
            pull_end=t,
            download_start=t,
            download_end=t,
            verify_start=t,
            verify_end=t,
            extract_start=t,
            extract_end=t,
            exists=True,
        )

    def pull_time(self) -> timedelta:
        """Wall-clock time from the start of the layer pull to its completion."""
        return self.pull_end - self.pull_start

    def download_time(self) -> timedelta:
        """Time spent downloading this layer."""
        return self.download_end - self.download_start

    def verify_time(self) -> timedelta:
        """Time spent verifying this layer's checksum."""
        return self.verify_end - self.verify_start

    def extract_time(self) -> timedelta:
        """Time spent extracting this layer."""
        return self.extract_end - self.extract_start

    def process_time(self) -> timedelta:
        # Time it took to process this layer (Download + Verify + Extract)
        # See above why this is different to pull time
        return self.download_time() + self.verify_time() + self.extract_time()

    def size_str(self) -> str:
        """Human-readable layer size."""
        return sizeof_fmt(self.size_bytes)

    def validate(self):
        """Raise ValueError if any *_start/*_end timestamp is still unset."""
        # Loop variable renamed from "field" to avoid shadowing the
        # dataclasses.field imported at module level.
        for f in fields(self):
            if f.name.endswith("_start") or f.name.endswith("_end"):
                if getattr(self, f.name) is None:
                    raise ValueError(f"{self.id}: {f.name} should not be None")
@dataclass
class PullProcess:
    """Aggregated timing information for one full "docker pull" of an image."""

    image: str = ""
    tag: str = ""
    pull_start: Optional[datetime] = None
    pull_end: Optional[datetime] = None
    # Keyed by the (short) layer id reported by the docker daemon.
    layers: Dict[str, Layer] = field(default_factory=dict)

    def _total_time(self, attr: str) -> timedelta:
        """Sum <attr>_time() over all layers; attr is "download", "verify" or "extract"."""
        total = timedelta()
        for layer in self.layers.values():
            total += getattr(layer, f"{attr}_time")()
        return total

    def pulled_layers(self) -> int:
        """Number of layers actually transferred (i.e. not already present)."""
        # Variable renamed from "l" (ambiguous, easily mistaken for "1").
        return sum(1 for layer in self.layers.values() if not layer.exists)

    def pull_time(self) -> timedelta:
        """Wall-clock duration of the whole pull."""
        return self.pull_end - self.pull_start

    def download_time(self) -> timedelta:
        """Cumulative download time over all layers."""
        return self._total_time("download")

    def verify_time(self) -> timedelta:
        """Cumulative checksum-verification time over all layers."""
        return self._total_time("verify")

    def extract_time(self) -> timedelta:
        """Cumulative extraction time over all layers."""
        return self._total_time("extract")

    def size_str(self) -> str:
        """Human-readable total size of all layers."""
        return sizeof_fmt(sum(layer.size_bytes for layer in self.layers.values()))
def parse_json(fileobj: TextIO) -> PullProcess:
    """Parse a docker pull progress log into a PullProcess.

    fileobj yields one JSON object per line: the docker "images/create" API
    progress messages, each augmented with a unix "time" field (see the
    generation snippet in __main__). Raises ValueError (via Layer.validate)
    if any layer ends up with an unset timestamp; the offending file's name
    is printed first.
    """
    pull = PullProcess()
    for line in fileobj:
        j = json.loads(line)
        status = j["status"]
        # Local time; only differences between timestamps are used.
        time = datetime.fromtimestamp(j["time"])
        if status.startswith("Pulling from "):
            # First line
            pull.pull_start = time
            pull.image = status.removeprefix("Pulling from ")
            # On this first line, "id" carries the image tag, not a layer id.
            pull.tag = j["id"]
            continue
        if status.startswith("Status: Downloaded "):
            # Last line
            pull.pull_end = time
            continue
        if not "id" in j:
            # Just the last status lines, nothing important
            continue
        layer_id = j["id"]
        if layer_id not in pull.layers:
            # First message for this layer decides how it is tracked.
            # NOTE(review): if the first message for a layer is neither
            # "Pulling fs layer" nor "Already exists", "layer" is unbound
            # (or stale from a previous iteration) here — presumably the
            # docker daemon always sends one of the two first; confirm.
            if status == "Pulling fs layer":
                layer = Layer(id=layer_id, pull_start=time)
                layer.download_start = time
            elif status == "Already exists":
                layer = Layer.existing(id=layer_id, t=time)
            pull.layers[layer_id] = layer
        else:
            layer = pull.layers[layer_id]
        if status == "Downloading":
            current = j["progressDetail"].get("current", None)
            if current is None:
                # This is a retry case, we will see another download line for this layer later
                continue
            total = j["progressDetail"].get("total", None)
            if layer.size_bytes == 0 and total is not None:
                layer.size_bytes = total
            if current == total:
                layer.download_end = time
                # There might not always be a "Verifying Checksum" status line
                # so store this early and update if the line comes in
                layer.verify_start = time
        elif status == "Verifying Checksum":
            if layer.download_end is None:
                # There might not always be a status line for the completed download
                # If there was none, assume download has ended now
                layer.download_end = time
            layer.verify_start = time
        elif status == "Download complete":
            layer.verify_end = time
            # There might not always be a status line for the completed download
            # and there might not be a Verifying Checksum line either.
            if layer.download_end is None:
                layer.download_end = time
            if layer.verify_start is None:
                layer.verify_start = time
        elif status == "Extracting":
            current = j["progressDetail"]["current"]
            total = j["progressDetail"].get("total", None)
            if layer.extract_start is None:
                layer.extract_start = time
            if current == total:
                layer.extract_end = time
                if layer.verify_end is None:
                    # This makes verify_end very inconsistent
                    layer.verify_end = time
        elif status == "Pull complete":
            layer.pull_end = time
            # Sometimes, there is no total key in Download and Extracting lines
            # so it's unclear when extract was finished. But it has definitely
            # finished now.
            if layer.extract_end is None:
                layer.extract_end = time
    # Every layer must have all timestamps filled in by now.
    for layer in pull.layers.values():
        try:
            layer.validate()
        except ValueError as e:
            print(fileobj.name)
            raise e
    return pull
def avg_times(pulls: List[PullProcess]) -> tuple:
    """Aggregate timing statistics over several pulls.

    Returns a 3-tuple of timedeltas:
    (mean download time, mean extract time, population std-dev of download time).
    """
    download_secs = [p.download_time().total_seconds() for p in pulls]
    extract_secs = [p.extract_time().total_seconds() for p in pulls]
    return (
        timedelta(seconds=np.mean(download_secs)),
        timedelta(seconds=np.mean(extract_secs)),
        timedelta(seconds=np.std(download_secs)),
    )
def cmd_files(args: argparse.Namespace):
    """Handle the "files" subcommand: report timings for the given json files.

    Either prints averaged statistics over all files (--average) or a
    per-file (and, with --verbose, per-layer) timing breakdown.
    """
    parsed = []
    for fileobj in args.input:
        # Expect file names like "<hostname>_..._<runid>.json".
        parts = Path(fileobj.name).with_suffix("").name.split("_")
        host = f"{parts[0]} ({parts[-1]})"
        parsed.append((host, parse_json(fileobj)))
    if args.average:
        avg_download_time, avg_extract_time, std_dl_time = avg_times(
            [pull for _, pull in parsed]
        )
        print(f"Avg. download: {avg_download_time}")
        print(f"Std. dev. download: {std_dl_time}")
        print(f"Avg. extract: {avg_extract_time}")
        return
    for host, pull in parsed:
        print(host)
        if args.verbose:
            print(
                f"Pulled {pull.pulled_layers()}/{len(pull.layers)} layers ({pull.size_str()}) of {pull.image}:{pull.tag} in {str(pull.pull_time())}"
            )
            for layer in pull.layers.values():
                try:
                    if layer.exists:
                        print(f"{layer.id}: Already exists")
                    else:
                        print(
                            f"{layer.id}: Size: {layer.size_str()} Download: {str(layer.download_time())} Extract: {str(layer.extract_time())} Verify: {str(layer.verify_time())} Total: {str(layer.process_time())}"
                        )
                except (KeyError, TypeError) as e:
                    # Dump the raw layer for debugging before re-raising.
                    print(layer)
                    raise e
        print(f"Download: {pull.download_time()}")
        print(f"Extract: {pull.extract_time()}")
        print(f"Verify: {pull.verify_time()}")
def parse_testset(directory: Path) -> Tuple[Dict]:
    """Parse a directory tree of pull logs into per-node and per-parallelism stats.

    Assumes file names of the form "<node>_p<parallelism>_<x>_<y>.json"
    (exactly four underscore-separated fields) — TODO confirm against the
    script that generated the testset.

    Returns (results_by_node, results_by_parallelism):
    - results_by_node[node][parallelism] = {"pulls": [...], "avg_dl_time": ...,
      "avg_ex_time": ..., "std_dl_time": ...}
    - results_by_parallelism[parallelism] = {"raw": [(avg_dl, avg_ex), ...],
      "avg_dl_time": ..., "avg_ex_time": ..., "std_dl_time": ...}
    """
    results_by_node = {}
    results_by_parallelism = {}
    for p in directory.glob("**/*.json"):
        node, parallelism, _, _ = p.with_suffix("").name.split("_")
        # "p16" -> 16
        parallelism = int(parallelism.lstrip("p"))
        try:
            pull = parse_json(p.open())
        except KeyError as e:
            # Print the offending file before re-raising for easier debugging.
            print(p)
            raise e
        if node not in results_by_node:
            results_by_node[node] = {}
        if parallelism not in results_by_node[node]:
            results_by_node[node][parallelism] = {"pulls": []}
        results_by_node[node][parallelism]["pulls"].append(pull)
        # NOTE(review): the testset apparently contains exactly 3 runs per
        # (node, parallelism) pair; averages are computed once the third
        # run arrives — confirm this matches the generation script.
        if len(results_by_node[node][parallelism]["pulls"]) == 3:
            avg_dl, avg_ex, std_dl = avg_times(
                results_by_node[node][parallelism]["pulls"]
            )
            results_by_node[node][parallelism]["avg_dl_time"] = avg_dl
            results_by_node[node][parallelism]["avg_ex_time"] = avg_ex
            results_by_node[node][parallelism]["std_dl_time"] = std_dl
            if parallelism not in results_by_parallelism:
                results_by_parallelism[parallelism] = {
                    "raw": [
                        (avg_dl, avg_ex),
                    ]
                }
            else:
                results_by_parallelism[parallelism]["raw"].append((avg_dl, avg_ex))
    # Second pass: collapse the per-node averages into one set of stats
    # per parallelism level.
    for parallelism, values in results_by_parallelism.items():
        dl = []
        ex = []
        for d, e in values["raw"]:
            dl.append(d.total_seconds())
            ex.append(e.total_seconds())
        values["avg_dl_time"] = timedelta(seconds=np.mean(dl))
        values["avg_ex_time"] = timedelta(seconds=np.mean(ex))
        values["std_dl_time"] = timedelta(seconds=np.std(dl))
    return (results_by_node, results_by_parallelism)
def plot_by_parallelism(by_parallelism: Dict, output: Optional[BinaryIO]):
    """Plot avg/std-dev download time against pull parallelism.

    Shows the figure interactively when output is None, otherwise writes
    it as HTML to the given file object.
    """
    frame = pd.DataFrame.from_dict(by_parallelism, orient="index")
    frame = frame.sort_index().drop("raw", axis=1)
    # Plotly does not work with timedelta objects, so shift every column
    # onto the epoch and pretend the values are datetimes.
    epoch = pd.to_datetime("1970/01/01")
    for column in frame:
        frame[column] = frame[column] + epoch
    fig = px.line(frame, y=["avg_dl_time", "std_dl_time"], x=frame.index)
    fig.update_layout(
        yaxis_title="Time (read as HH:MM:SS)",
        xaxis_title="# of nodes pulling images in parallel",
    )
    if output is None:
        fig.show()
    else:
        fig.write_html(output)
def cmd_testset(args: argparse.Namespace):
    """Handle the "testset" subcommand: summarize a testset directory and plot it."""
    by_node, by_parallelism = parse_testset(args.directory)
    # Per-node breakdown first ...
    for node, per_parallelism in by_node.items():
        for parallelism, values in per_parallelism.items():
            print(
                f"{node}: {parallelism}: {values['avg_dl_time']} {values['avg_ex_time']}"
            )
    # ... then the aggregate per parallelism level.
    for parallelism, values in by_parallelism.items():
        print(f"{parallelism}: {values['avg_dl_time']} {values['avg_ex_time']}")
    plot_by_parallelism(by_parallelism, args.out)
if __name__ == "__main__":
    # The string below is not a docstring (we are inside an "if" body); it is
    # kept as inline documentation on how to generate the input files this
    # script consumes.
    """
    Generate the input file with something like:
    #!/bin/bash
    REPO=docker-registry.discovery.wmnet
    IMAGE=restricted/mediawiki-multiversion
    TAG=2021-05-14-185433-publish
    repo_uri="https://${REPO}"
    # Craft the AuthConfig object needed to authenticate to docker-registry.discovery.wmnet
    basicauth=$(sudo cat /var/lib/kubelet/config.json | jq -r ".auths.\"${repo_uri}\".auth" | base64 -d)
    arr=(${basicauth//:/ })
    auth=$(echo -n "{\"username\": \"${arr[0]}\",\"password\": \"${arr[1]}\",\"serveraddress\": \"${repo_uri}\"}" | base64 -w 0)
    sudo docker rmi "${REPO}/${IMAGE}:${TAG}"
    outfile="${HOSTNAME}_$(date +%s).json"
    sudo curl -s --unix-socket /var/run/docker.sock -XPOST \
    -d "fromImage=${REPO}/${IMAGE}&tag=${TAG}" \
    -H "X-Registry-Auth: ${auth}" \
    'http://docker/v1.18/images/create' | \
    jq -c --unbuffered '. + {time: now}' > ${outfile}
    """
    # Description fixed: the previous text ("Plot docker stats CPU usage data")
    # was copied from a different script; this tool analyzes image pull timings.
    parser = argparse.ArgumentParser(
        description="Parse and plot docker image pull timing data"
    )
    subparsers = parser.add_subparsers()
    files = subparsers.add_parser(
        "files", help="Parse one or more json files for timings"
    )
    files.add_argument("input", type=argparse.FileType("r"), nargs="+")
    files.add_argument("--average", action="store_true")
    files.add_argument("-v", "--verbose", action="store_true")
    files.set_defaults(func=cmd_files)
    testset = subparsers.add_parser("testset", help="Parse a testset directory")
    testset.add_argument("directory", type=Path)
    testset.add_argument("--out", type=argparse.FileType("w"), required=False)
    testset.set_defaults(func=cmd_testset)
    args = parser.parse_args()
    try:
        func = args.func
    except AttributeError:
        # No subcommand given: non-required subparsers don't error on their
        # own, so mimic the old argparse behavior. parser.error() exits.
        parser.error("too few arguments")
    func(args)

Event Timeline

JMeybohm changed the title of this paste from docker-pulltime.py to docker_pulltime.py. May 20 2021, 9:26 AM
JMeybohm edited the content of this paste. (Show Details)