
Export retrained Tone-check model to an S3 bucket
Closed, ResolvedPublic

Description

The current status of the end-to-end model retraining is:

In order to proceed with the next steps, we need to export the retrained version of the model to storage outside the PVC (e.g. an S3 bucket).
Once the new version of the retrained model is exported, we can evaluate and test it, and then move on to the remaining steps for model productionisation.

Decisions need to be made:

  • Where it would be best to build the model-export functionality
  • Permissions and infra-related actions:
    • Does the pod running the retraining image have R/W access to S3?
    • Is it possible to export the model directly from the PVC to the bucket?
  • Actions on S3:
    • Create a generic bucket to which all models are exported

Details

Related Changes in GitLab:
Title: Copy model from the mounted PVC to dedicated S3 bucket
Reference: repos/data-engineering/airflow-dags!1961
Author: gkyziridis
Source branch: export_model
Dest branch: main

Event Timeline

Hey @brouberol, we are currently working on this task and there are some open questions.
Here is the idea:
We will build a kokkuri image which will include the logic for copying the trained model from the PVC to an S3 bucket.
This logic will be implemented in the ml-pipelines repo, following the same structure and CI/CD processes as the retraining image. More precisely, this code will be containerised via kokkuri pipelines during the gitlab-ci process, and the generated image will be pushed to the machine-learning/ml-pipelines docker registry.
This image will be run via WMFKubernetesPodOperator in the tone-check retraining DAG as the last step of the pipeline.

Questions
  • Do we need specific permissions for copying files from the PVC to an S3 bucket on the POD side and/or in the container itself?
  • Do we need something specific when creating the S3 bucket that we will use for exporting the models?
  • How do you find this architecture of building the model_export functionality on the side of the ml-pipelines repo and not in the Airflow-DAGs repo?

The branch that we are working on is this one.

Do we need specific permissions for copying files from the PVC to an S3 bucket on the POD side and/or in the container itself?

No. The only things you'd need are a network policy allowing egress traffic to S3, as well as credentials stored in the airflow-ml private values, injected into the Airflow connections, to authenticate access to S3.
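As a sketch of that credentials flow: the connection's login/password would hold the S3 keypair, and its extra JSON would carry anything else (e.g. the endpoint URL). The helper below is illustrative, not an existing function; boto3 itself is omitted to keep it self-contained.

```python
import json

def s3_client_kwargs(login, password, extra):
    """Assemble boto3 S3-client kwargs from Airflow-connection-style fields.

    `login`/`password` map to the S3 access/secret keys; `extra` is the
    connection's JSON blob, e.g. '{"endpoint_url": "..."}'.
    """
    kwargs = {
        "service_name": "s3",
        "aws_access_key_id": login,
        "aws_secret_access_key": password,
    }
    kwargs.update(json.loads(extra or "{}"))
    return kwargs

# boto3.session.Session().client(**s3_client_kwargs(...)) would then
# build the actual client.
```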

Do we need something specific when creating the S3 bucket that we will use for exporting the models?

You'll need a specific keypair created to own the bucket, and that keypair distributed to the Airflow private values, as previously stated. We (DPE SRE) can do that.

How do you find this architecture of building the model_export functionality on the side of the ml-pipelines repo and not in the Airflow-DAGs repo?

If it works for you, then all the better. If you think that this is something that could benefit everyone, then wmf_airflow_common might be a better place for it, but if it is ml-related and you're happy with it, then by all means.


Initially I was planning to implement it directly on the airflow-dags side, in order to use the get_s3_client() method via from wmf_airflow_common.clients.s3 import get_s3_client.
Since get_s3_client() runs via a PythonOperator, can I attach the PVC to it and run the whole thing without the WMFKubernetesPodOperator? That option would be easier, since it avoids:

  1. Building the job logic in ml-pipelines
  2. Building and publishing the image via kokkuri
  3. Setting up the connections/keypairs, etc.

So, if we can attach the PVC using the PythonOperator and use get_s3_client(conn_id="thanos_swift"), that would be easier, I think.
I remember we created this "thanos_swift" connection for a previous task.

Questions
  • Can I attach the PVC using the PythonOperator in order to use the wmf_airflow_common.clients.s3 and do the job using the "thanos_swift" connection?
  • Can I do the opposite? Use the wmf_airflow_common.clients.s3 directly in WMFKubernetesPodOperator ?

Can I attach the PVC using the PythonOperator in order to use the wmf_airflow_common.clients.s3 and do the job using the "thanos_swift" connection?

No, a PVC is a Kubernetes notion. A PythonOperator is just a way to execute a Python function in your task pod; it has no way to attach and mount a Persistent Volume onto its runtime. If you need to read from Ceph, then you need a KubernetesPodOperator, with a volume mounted from the Ceph PVC.
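Concretely, that means passing the operator a volume and a volume mount referencing the Ceph-backed PVC. A minimal sketch using plain dicts (with the kubernetes client these would be V1Volume / V1VolumeMount; the claim and mount names are taken from the pod spec that appears later in this task):

```python
# PVC-backed volume plus its mount point for the task pod.
pvc_volume = {
    "name": "airflow-ml-model-training-volume",
    "persistentVolumeClaim": {"claimName": "airflow-ml-model-training"},
}
pvc_volume_mount = {
    "name": "airflow-ml-model-training-volume",  # must match the volume name
    "mountPath": "/mnt/model-training",
}

# The operator would then receive volumes=[pvc_volume] and
# volume_mounts=[pvc_volume_mount], making the trained model visible
# under /mnt/model-training inside the pod.
```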

Can I do the opposite? Use the wmf_airflow_common.clients.s3 directly in WMFKubernetesPodOperator ?

To do this, you'd need to have your KubernetesPodOperator run a pod that uses the airflow image itself, which should give you access to the wmf_airflow_common package. That being said, it would then require the logic executed by your DAG to be fully defined in airflow-dags, I think.
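Sketching that suggestion: point the pod at the airflow image and run a small Python snippet that imports the shared package. These are hypothetical operator arguments (the image tag is copied from the pod dump later in this task, purely for illustration):

```python
# Hypothetical (WMF)KubernetesPodOperator arguments for reusing the
# airflow image so that wmf_airflow_common is importable in the pod.
upload_pod_args = {
    "name": "upload-model-to-s3",
    "image": ("docker-registry.wikimedia.org/repos/data-engineering/"
              "airflow-dags:airflow-2.10.5-py3.11-2025-10-08-171529-"
              "e8280ad4b26af3cc41fd00af1ee6840463c2e0f4"),
    "cmds": ["python3", "-c"],
    "arguments": [
        "from wmf_airflow_common.clients.s3 import get_s3_client; "
        "print(get_s3_client)"
    ],
}
```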


This is perfect!
So if we use an image from docker-registry/repos/data-engineering/airflow/ via the WMFKubernetesPodOperator, then we can just build the logic directly in the DAGs.
Thank you very much for your time @brouberol!

Update

I moved all the code and logic into the Airflow-DAGs repo. I tried three different airflow images and ran into different issues:

  1. Using airflow:ebd2d6ee4509445c05572ab5fa6e6afb588fb285-production or airflow:latest produced a ModuleNotFoundError: No module named 'wmf_airflow_common':
[2025-10-20, 13:35:51 UTC] {pod_manager.py:536} INFO - [base] === First list all elements in /mnt/model-training/tone_check/20251014T132011/output_model ===
[2025-10-20, 13:35:51 UTC] {pod_manager.py:536} INFO - [base] total 4
[2025-10-20, 13:35:51 UTC] {pod_manager.py:536} INFO - [base] drwxrwsr-x 2 app runuser 4096 Oct 15 16:59 checkpoint-63618
[2025-10-20, 13:35:51 UTC] {pod_manager.py:536} INFO - [base] === Checking model output path ===
[2025-10-20, 13:35:51 UTC] {pod_manager.py:536} INFO - [base] ✅ Model directory found. Uploading to S3...
[2025-10-20, 13:35:51 UTC] {pod_manager.py:536} INFO - [base] Traceback (most recent call last):
[2025-10-20, 13:35:51 UTC] {pod_manager.py:536} INFO - [base]   File "<stdin>", line 3, in <module>
[2025-10-20, 13:35:51 UTC] {pod_manager.py:555} INFO - [base] ModuleNotFoundError: No module named 'wmf_airflow_common'
[2025-10-20, 13:35:51 UTC] {pod.py:1122} INFO - Deleting pod: upload-model-to-s3-ebodqj6
  2. Using the airflow:2025-06-23-122527-798105179248020e79ffe44a8cb442d8675e9db7 tag, the DAG never loaded in the airflow UI (it was parsing the code forever).
  3. Using airflow-2.10.5-py3.11-2025-10-08-171529-e8280ad4b26af3cc41fd00af1ee6840463c2e0f4, it threw the exact same error.

From the above logs, we can see that the container starts correctly, prints the statements, and runs until the wmf_airflow_common import.
So it seems that it cannot find the wmf_airflow_common files. Am I missing something?
I tried building the image locally and running it interactively, but I cannot see the wmf code. Do I need to pip install it or something?

I tried to avoid importing the S3 client via from wmf_airflow_common.clients.s3 import get_s3_client and instead implemented the logic directly in the container like this.
Now I am getting this error:

[2025-10-22, 11:56:44 UTC] {pod_manager.py:536} INFO - [base] Unable to load the config, contains a configuration error.
[2025-10-22, 11:56:44 UTC] {pod_manager.py:536} INFO - [base] Traceback (most recent call last):
[2025-10-22, 11:56:44 UTC] {pod_manager.py:536} INFO - [base]   File "/usr/lib/python3.11/pathlib.py", line 1117, in mkdir
[2025-10-22, 11:56:44 UTC] {pod_manager.py:536} INFO - [base]     os.mkdir(self, mode)
[2025-10-22, 11:56:44 UTC] {pod_manager.py:536} INFO - [base] FileNotFoundError: [Errno 2] No such file or directory: '/opt/airflow/logs/scheduler/2025-10-22'
[2025-10-22, 11:56:44 UTC] {pod_manager.py:536} INFO - [base] During handling of the above exception, another exception occurred:

You can see the full logs in the following paste:

1tone-check-training-dag-upload-model-to-s3-iq5wn62q
2Log message source details
3[2025-10-22, 11:56:28 UTC] {local_task_job_runner.py:123}Pre task execution logs
4[2025-10-22, 11:56:28 UTC] {crypto.py:82} WARNING - empty cryptography key - values will not be stored encrypted.
5[2025-10-22, 11:56:28 UTC] {pod.py:1276} INFO - Building pod upload-model-to-s3-rwlvnf0 with labels: {'dag_id': 'tone_check_training_dag', 'task_id': 'upload_model_to_s3', 'run_id': 'manual__2025-10-22T115614.8011990000-a93871998', 'kubernetes_pod_operator': 'True', 'try_number': '1'}
6[2025-10-22, 11:56:29 UTC] {pod.py:573} INFO - Found matching pod upload-model-to-s3-rwlvnf0 with labels {'airflow_kpo_in_cluster': 'True', 'airflow_version': '2.10.5', 'app': 'airflow', 'component': 'task-pod', 'dag_id': 'tone_check_training_dag', 'kubernetes_pod_operator': 'True', 'release': 'dev-gkyziridis', 'routed_via': 'dev-gkyziridis', 'run_id': 'manual__2025-10-22T115614.8011990000-a93871998', 'task_id': 'upload_model_to_s3', 'try_number': '1'}
7[2025-10-22, 11:56:29 UTC] {pod.py:574} INFO - `try_number` of task_instance: 1
8[2025-10-22, 11:56:29 UTC] {pod.py:575} INFO - `try_number` of pod: 1
9[2025-10-22, 11:56:29 UTC] {pod_manager.py:390} INFO - The Pod has an Event: Successfully assigned airflow-dev/upload-model-to-s3-rwlvnf0 to dse-k8s-worker1012.eqiad.wmnet from None
10[2025-10-22, 11:56:29 UTC] {pod_manager.py:410}Waiting until 120s to get the POD scheduled...
11[2025-10-22, 11:56:44 UTC] {pod_manager.py:536} INFO - [base] === First list all elements in /mnt/model-training/tone_check/20251014T132011/output_model ===
12[2025-10-22, 11:56:44 UTC] {pod_manager.py:536} INFO - [base] total 4
13[2025-10-22, 11:56:44 UTC] {pod_manager.py:536} INFO - [base] drwxrwsr-x 2 app runuser 4096 Oct 15 16:59 checkpoint-63618
14[2025-10-22, 11:56:44 UTC] {pod_manager.py:536} INFO - [base] Current WD: /opt/airflow
15[2025-10-22, 11:56:44 UTC] {pod_manager.py:536} INFO - [base] === ls -l dags/ ===
16[2025-10-22, 11:56:44 UTC] {pod_manager.py:536} INFO - [base] total 0
17[2025-10-22, 11:56:44 UTC] {pod_manager.py:536} INFO - [base] === ls -l ../ ===
18[2025-10-22, 11:56:44 UTC] {pod_manager.py:536} INFO - [base] total 8
19[2025-10-22, 11:56:44 UTC] {pod_manager.py:536} INFO - [base] drwxr-xr-x 1 app app 4096 Oct 8 17:31 airflow
20[2025-10-22, 11:56:44 UTC] {pod_manager.py:536} INFO - [base] drwxr-xr-x 2 app app 4096 Oct 8 17:24 lib
21[2025-10-22, 11:56:44 UTC] {pod_manager.py:536} INFO - [base] === Checking model output path ===
22[2025-10-22, 11:56:44 UTC] {pod_manager.py:536} INFO - [base]Model directory found. Uploading to S3...
23[2025-10-22, 11:56:44 UTC] {pod_manager.py:536} INFO - [base] Unable to load the config, contains a configuration error.
24[2025-10-22, 11:56:44 UTC] {pod_manager.py:536} INFO - [base] Traceback (most recent call last):
25[2025-10-22, 11:56:44 UTC] {pod_manager.py:536} INFO - [base] File "/usr/lib/python3.11/pathlib.py", line 1117, in mkdir
26[2025-10-22, 11:56:44 UTC] {pod_manager.py:536} INFO - [base] os.mkdir(self, mode)
27[2025-10-22, 11:56:44 UTC] {pod_manager.py:536} INFO - [base] FileNotFoundError: [Errno 2] No such file or directory: '/opt/airflow/logs/scheduler/2025-10-22'
28[2025-10-22, 11:56:44 UTC] {pod_manager.py:536} INFO - [base] During handling of the above exception, another exception occurred:
29[2025-10-22, 11:56:44 UTC] {pod_manager.py:536} INFO - [base] Traceback (most recent call last):
30[2025-10-22, 11:56:44 UTC] {pod_manager.py:536} INFO - [base] File "/usr/lib/python3.11/logging/config.py", line 562, in configure
31[2025-10-22, 11:56:44 UTC] {pod_manager.py:536} INFO - [base] handler = self.configure_handler(handlers[name])
32[2025-10-22, 11:56:44 UTC] {pod_manager.py:536} INFO - [base] ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
33[2025-10-22, 11:56:44 UTC] {pod_manager.py:536} INFO - [base] File "/usr/lib/python3.11/logging/config.py", line 747, in configure_handler
34[2025-10-22, 11:56:44 UTC] {pod_manager.py:536} INFO - [base] result = factory(**kwargs)
35[2025-10-22, 11:56:44 UTC] {pod_manager.py:536} INFO - [base] ^^^^^^^^^^^^^^^^^
36[2025-10-22, 11:56:44 UTC] {pod_manager.py:536} INFO - [base] File "/home/app/.local/lib/python3.11/site-packages/airflow/utils/log/file_processor_handler.py", line 53, in __init__
37[2025-10-22, 11:56:44 UTC] {pod_manager.py:536} INFO - [base] Path(self._get_log_directory()).mkdir(parents=True, exist_ok=True)
38[2025-10-22, 11:56:44 UTC] {pod_manager.py:536} INFO - [base] File "/usr/lib/python3.11/pathlib.py", line 1121, in mkdir
39[2025-10-22, 11:56:44 UTC] {pod_manager.py:536} INFO - [base] self.parent.mkdir(parents=True, exist_ok=True)
40[2025-10-22, 11:56:44 UTC] {pod_manager.py:536} INFO - [base] File "/usr/lib/python3.11/pathlib.py", line 1117, in mkdir
41[2025-10-22, 11:56:44 UTC] {pod_manager.py:536} INFO - [base] os.mkdir(self, mode)
42[2025-10-22, 11:56:44 UTC] {pod_manager.py:536} INFO - [base] PermissionError: [Errno 13] Permission denied: '/opt/airflow/logs/scheduler'
43[2025-10-22, 11:56:44 UTC] {pod_manager.py:536} INFO - [base] The above exception was the direct cause of the following exception:
44[2025-10-22, 11:56:44 UTC] {pod_manager.py:536} INFO - [base] Traceback (most recent call last):
45[2025-10-22, 11:56:44 UTC] {pod_manager.py:536} INFO - [base] File "<stdin>", line 4, in <module>
46[2025-10-22, 11:56:44 UTC] {pod_manager.py:536} INFO - [base] File "/home/app/.local/lib/python3.11/site-packages/airflow/__init__.py", line 74, in <module>
47[2025-10-22, 11:56:44 UTC] {pod_manager.py:536} INFO - [base] settings.initialize()
48[2025-10-22, 11:56:44 UTC] {pod_manager.py:536} INFO - [base] File "/home/app/.local/lib/python3.11/site-packages/airflow/settings.py", line 785, in initialize
49[2025-10-22, 11:56:44 UTC] {pod_manager.py:536} INFO - [base] LOGGING_CLASS_PATH = configure_logging()
50[2025-10-22, 11:56:44 UTC] {pod_manager.py:536} INFO - [base] ^^^^^^^^^^^^^^^^^^^
51[2025-10-22, 11:56:44 UTC] {pod_manager.py:536} INFO - [base] File "/home/app/.local/lib/python3.11/site-packages/airflow/logging_config.py", line 74, in configure_logging
52[2025-10-22, 11:56:44 UTC] {pod_manager.py:536} INFO - [base] raise e
53[2025-10-22, 11:56:44 UTC] {pod_manager.py:536} INFO - [base] File "/home/app/.local/lib/python3.11/site-packages/airflow/logging_config.py", line 69, in configure_logging
54[2025-10-22, 11:56:44 UTC] {pod_manager.py:536} INFO - [base] dictConfig(logging_config)
55[2025-10-22, 11:56:44 UTC] {pod_manager.py:536} INFO - [base] File "/usr/lib/python3.11/logging/config.py", line 812, in dictConfig
56[2025-10-22, 11:56:44 UTC] {pod_manager.py:536} INFO - [base] dictConfigClass(config).configure()
57[2025-10-22, 11:56:44 UTC] {pod_manager.py:536} INFO - [base] File "/usr/lib/python3.11/logging/config.py", line 569, in configure
58[2025-10-22, 11:56:44 UTC] {pod_manager.py:536} INFO - [base] raise ValueError('Unable to configure handler '
59[2025-10-22, 11:56:44 UTC] {pod_manager.py:555} INFO - [base] ValueError: Unable to configure handler 'processor'
60[2025-10-22, 11:56:44 UTC] {pod.py:1122} INFO - Deleting pod: upload-model-to-s3-rwlvnf0
61[2025-10-22, 11:56:44 UTC] {taskinstance.py:3313} ERROR - Task failed with exception
62Traceback (most recent call last):
63 File "/tmp/pyenv/versions/3.10.15/lib/python3.10/site-packages/airflow/models/taskinstance.py", line 768, in _execute_task
64 result = _execute_callable(context=context, **execute_callable_kwargs)
65 File "/tmp/pyenv/versions/3.10.15/lib/python3.10/site-packages/airflow/models/taskinstance.py", line 734, in _execute_callable
66 return ExecutionCallableRunner(
67 File "/tmp/pyenv/versions/3.10.15/lib/python3.10/site-packages/airflow/utils/operator_helpers.py", line 252, in run
68 return self.func(*args, **kwargs)
69 File "/tmp/pyenv/versions/3.10.15/lib/python3.10/site-packages/airflow/models/baseoperator.py", line 424, in wrapper
70 return func(self, *args, **kwargs)
71 File "/tmp/pyenv/versions/3.10.15/lib/python3.10/site-packages/airflow/providers/cncf/kubernetes/operators/pod.py", line 640, in execute
72 return self.execute_sync(context)
73 File "/tmp/pyenv/versions/3.10.15/lib/python3.10/site-packages/airflow/providers/cncf/kubernetes/operators/pod.py", line 721, in execute_sync
74 self.cleanup(
75 File "/tmp/pyenv/versions/3.10.15/lib/python3.10/site-packages/airflow/providers/cncf/kubernetes/operators/pod.py", line 1053, in cleanup
76 raise AirflowException(
77airflow.exceptions.AirflowException: Pod upload-model-to-s3-rwlvnf0 returned a failure.
78remote_pod: {'api_version': 'v1',
79 'kind': 'Pod',
80 'metadata': {'annotations': {'cni.projectcalico.org/containerID': '37fb57f310851111c487732ff072d58bad0d062fd2512ff96595dd3c8a53806b',
81 'cni.projectcalico.org/podIP': '',
82 'cni.projectcalico.org/podIPs': '',
83 'container.seccomp.security.alpha.kubernetes.io/base': 'runtime/default'},
84 'creation_timestamp': datetime.datetime(2025, 10, 22, 11, 56, 29, tzinfo=tzlocal()),
85 'deletion_grace_period_seconds': None,
86 'deletion_timestamp': None,
87 'finalizers': None,
88 'generate_name': None,
89 'generation': None,
90 'labels': {'airflow_kpo_in_cluster': 'True',
91 'airflow_version': '2.10.5',
92 'app': 'airflow',
93 'component': 'task-pod',
94 'dag_id': 'tone_check_training_dag',
95 'kubernetes_pod_operator': 'True',
96 'release': 'dev-gkyziridis',
97 'routed_via': 'dev-gkyziridis',
98 'run_id': 'manual__2025-10-22T115614.8011990000-a93871998',
99 'task_id': 'upload_model_to_s3',
100 'try_number': '1'},
101 'managed_fields': [{'api_version': 'v1',
102 'fields_type': 'FieldsV1',
103 'fields_v1': {'f:metadata': {'f:labels': {'.': {},
104 'f:airflow_kpo_in_cluster': {},
105 'f:airflow_version': {},
106 'f:app': {},
107 'f:component': {},
108 'f:dag_id': {},
109 'f:kubernetes_pod_operator': {},
110 'f:release': {},
111 'f:routed_via': {},
112 'f:run_id': {},
113 'f:task_id': {},
114 'f:try_number': {}}},
115 'f:spec': {'f:affinity': {'.': {},
116 'f:nodeAffinity': {'.': {},
117 'f:requiredDuringSchedulingIgnoredDuringExecution': {}}},
118 'f:containers': {'k:{"name":"base"}': {'.': {},
119 'f:args': {},
120 'f:command': {},
121 'f:env': {'.': {},
122 'k:{"name":"AWS_REQUEST_CHECKSUM_CALCULATION"}': {'.': {},
123 'f:name': {},
124 'f:value': {}},
125 'k:{"name":"AWS_RESPONSE_CHECKSUM_VALIDATION"}': {'.': {},
126 'f:name': {},
127 'f:value': {}},
128 'k:{"name":"REQUESTS_CA_BUNDLE"}': {'.': {},
129 'f:name': {},
130 'f:value': {}}},
131 'f:image': {},
132 'f:imagePullPolicy': {},
133 'f:name': {},
134 'f:resources': {'.': {},
135 'f:limits': {'.': {},
136 'f:cpu': {},
137 'f:memory': {}},
138 'f:requests': {'.': {},
139 'f:cpu': {},
140 'f:memory': {}}},
141 'f:securityContext': {'.': {},
142 'f:allowPrivilegeEscalation': {},
143 'f:capabilities': {'.': {},
144 'f:drop': {}},
145 'f:runAsNonRoot': {},
146 'f:seccompProfile': {'.': {},
147 'f:type': {}}},
148 'f:terminationMessagePath': {},
149 'f:terminationMessagePolicy': {},
150 'f:volumeMounts': {'.': {},
151 'k:{"mountPath":"/mnt/model-training"}': {'.': {},
152 'f:mountPath': {},
153 'f:name': {}}}}},
154 'f:dnsPolicy': {},
155 'f:enableServiceLinks': {},
156 'f:priorityClassName': {},
157 'f:restartPolicy': {},
158 'f:schedulerName': {},
159 'f:securityContext': {'.': {},
160 'f:fsGroup': {}},
161 'f:terminationGracePeriodSeconds': {},
162 'f:volumes': {'.': {},
163 'k:{"name":"airflow-ml-model-training-volume"}': {'.': {},
164 'f:name': {},
165 'f:persistentVolumeClaim': {'.': {},
166 'f:claimName': {}}}}}},
167 'manager': 'OpenAPI-Generator',
168 'operation': 'Update',
169 'subresource': None,
170 'time': datetime.datetime(2025, 10, 22, 11, 56, 29, tzinfo=tzlocal())},
171 {'api_version': 'v1',
172 'fields_type': 'FieldsV1',
173 'fields_v1': {'f:metadata': {'f:annotations': {'f:cni.projectcalico.org/containerID': {},
174 'f:cni.projectcalico.org/podIP': {},
175 'f:cni.projectcalico.org/podIPs': {}}}},
176 'manager': 'Go-http-client',
177 'operation': 'Update',
178 'subresource': 'status',
179 'time': datetime.datetime(2025, 10, 22, 11, 56, 34, tzinfo=tzlocal())},
180 {'api_version': 'v1',
181 'fields_type': 'FieldsV1',
182 'fields_v1': {'f:status': {'f:conditions': {'k:{"type":"ContainersReady"}': {'.': {},
183 'f:lastProbeTime': {},
184 'f:lastTransitionTime': {},
185 'f:reason': {},
186 'f:status': {},
187 'f:type': {}},
188 'k:{"type":"Initialized"}': {'.': {},
189 'f:lastProbeTime': {},
190 'f:lastTransitionTime': {},
191 'f:status': {},
192 'f:type': {}},
193 'k:{"type":"Ready"}': {'.': {},
194 'f:lastProbeTime': {},
195 'f:lastTransitionTime': {},
196 'f:reason': {},
197 'f:status': {},
198 'f:type': {}}},
199 'f:containerStatuses': {},
200 'f:hostIP': {},
201 'f:phase': {},
202 'f:podIP': {},
203 'f:podIPs': {'.': {},
204 'k:{"ip":"10.67.28.239"}': {'.': {},
205 'f:ip': {}},
206 'k:{"ip":"2620:0:861:302:7e7d:804d:2c9b:f66f"}': {'.': {},
207 'f:ip': {}}},
208 'f:startTime': {}}},
209 'manager': 'kubelet',
210 'operation': 'Update',
211 'subresource': 'status',
212 'time': datetime.datetime(2025, 10, 22, 11, 56, 36, tzinfo=tzlocal())}],
213 'name': 'upload-model-to-s3-rwlvnf0',
214 'namespace': 'airflow-dev',
215 'owner_references': None,
216 'resource_version': '853719349',
217 'self_link': None,
218 'uid': '60fe59cf-fc50-4148-baac-a6e92451dfac'},
219 'spec': {'active_deadline_seconds': None,
220 'affinity': {'node_affinity': {'preferred_during_scheduling_ignored_during_execution': None,
221 'required_during_scheduling_ignored_during_execution': {'node_selector_terms': [{'match_expressions': [{'key': 'kubernetes.io/hostname',
222 'operator': 'NotIn',
223 'values': ['dse-k8s-worker1001.eqiad.wmnet']}],
224 'match_fields': None}]}},
225 'pod_affinity': None,
226 'pod_anti_affinity': None},
227 'automount_service_account_token': None,
228 'containers': [{'args': ['\n'
229 'set -e\n'
230 'echo "=== First list all elements in '
231 '/mnt/model-training/tone_check/20251014T132011/output_model '
232 '==="\n'
233 'ls -l '
234 '/mnt/model-training/tone_check/20251014T132011/output_model\n'
235 'echo "Current WD: $(pwd)"\n'
236 '\n'
237 'echo "=== ls -l dags/ ==="\n'
238 'ls -l dags/\n'
239 '\n'
240 'echo "=== ls -l ../ ==="\n'
241 'ls -l ../\n'
242 '\n'
243 'echo "=== Checking model output path ==="\n'
244 'if [ ! -d '
245 '"/mnt/model-training/tone_check/20251014T132011/output_model" '
246 ']; then\n'
247 ' echo "❌ Model directory does not '
248 'exist: '
249 '/mnt/model-training/tone_check/20251014T132011/output_model"\n'
250 ' exit 1\n'
251 'fi\n'
252 '\n'
253 'if [ -z "$(ls -A '
254 '/mnt/model-training/tone_check/20251014T132011/output_model)" '
255 ']; then\n'
256 ' echo "⚠️ Model directory is empty: '
257 '/mnt/model-training/tone_check/20251014T132011/output_model"\n'
258 ' exit 1\n'
259 'fi\n'
260 '\n'
261 'echo "✅ Model directory found. Uploading '
262 'to S3..."\n'
263 "python3 - <<'PYCODE'\n"
264 'import os\n'
265 'from pathlib import Path\n'
266 'import boto3\n'
267 'from airflow.configuration import '
268 'get_custom_secret_backend\n'
269 '\n'
270 'def get_s3_client(conn_id):\n'
271 ' secret_backend = '
272 'get_custom_secret_backend()\n'
273 ' s3_connection = '
274 'secret_backend.get_connection(conn_id=conn_id)\n'
275 ' session = boto3.session.Session()\n'
276 ' return session.client(\n'
277 ' service_name="s3",\n'
278 ' '
279 'aws_access_key_id=s3_connection.login,\n'
280 ' '
281 'aws_secret_access_key=s3_connection.password,\n'
282 ' **s3_connection.extra_dejson,\n'
283 ' )\n'
284 '\n'
285 'def upload_directory_to_s3(local_dir, '
286 'bucket, s3_prefix, conn_id=thanos_swift):\n'
287 ' s3 = get_s3_client(conn_id=conn_id)\n'
288 ' local_path = Path(local_dir)\n'
289 '\n'
290 ' for root, _, files in '
291 'os.walk(local_path):\n'
292 ' for f in files:\n'
293 ' full_path = Path(root) / f\n'
294 ' rel_path = '
295 'full_path.relative_to(local_path)\n'
296 ' s3_key = '
297 'f"{s3_prefix}/{rel_path.as_posix()}"\n'
298 ' print(f"⬆️ Uploading: '
299 '{full_path} → s3://{bucket}/{s3_key}")\n'
300 ' s3.upload_file(str(full_path), '
301 'bucket, s3_key)\n'
302 '\n'
303 ' print("✅ Upload complete.")\n'
304 '\n'
305 'upload_directory_to_s3(\n'
306 ' '
307 'local_dir="/mnt/model-training/tone_check/20251014T132011/output_model",\n'
308 ' bucket="wmf-ml-models",\n'
309 ' '
310 's3_prefix="retrained-models/tone-check/"\n'
311 ')\n'
312 'PYCODE'],
313 'command': ['bash', '-c'],
314 'env': [{'name': 'REQUESTS_CA_BUNDLE',
315 'value': '/etc/ssl/certs/ca-certificates.crt',
316 'value_from': None},
317 {'name': 'AWS_REQUEST_CHECKSUM_CALCULATION',
318 'value': 'WHEN_REQUIRED',
319 'value_from': None},
320 {'name': 'AWS_RESPONSE_CHECKSUM_VALIDATION',
321 'value': 'WHEN_REQUIRED',
322 'value_from': None}],
323 'env_from': None,
324 'image': 'docker-registry.wikimedia.org/repos/data-engineering/airflow-dags:airflow-2.10.5-py3.11-2025-10-08-171529-e8280ad4b26af3cc41fd00af1ee6840463c2e0f4',
325 'image_pull_policy': 'IfNotPresent',
326 'lifecycle': None,
327 'liveness_probe': None,
328 'name': 'base',
329 'ports': None,
330 'readiness_probe': None,
331 'resize_policy': None,
332 'resources': {'claims': None,
333 'limits': {'cpu': '2', 'memory': '3Gi'},
334 'requests': {'cpu': '1',
335 'memory': '1500Mi'}},
336 'restart_policy': None,
337 'security_context': {'allow_privilege_escalation': False,
338 'app_armor_profile': None,
339 'capabilities': {'add': None,
340 'drop': ['ALL']},
341 'privileged': None,
342 'proc_mount': None,
343 'read_only_root_filesystem': None,
344 'run_as_group': None,
345 'run_as_non_root': True,
346 'run_as_user': None,
347 'se_linux_options': None,
348 'seccomp_profile': {'localhost_profile': None,
349 'type': 'RuntimeDefault'},
350 'windows_options': None},
351 'startup_probe': None,
352 'stdin': None,
353 'stdin_once': None,
354 'termination_message_path': '/dev/termination-log',
355 'termination_message_policy': 'File',
356 'tty': None,
357 'volume_devices': None,
358 'volume_mounts': [{'mount_path': '/mnt/model-training',
359 'mount_propagation': None,
360 'name': 'airflow-ml-model-training-volume',
361 'read_only': None,
362 'recursive_read_only': None,
363 'sub_path': None,
364 'sub_path_expr': None},
365 {'mount_path': '/var/run/secrets/kubernetes.io/serviceaccount',
366 'mount_propagation': None,
367 'name': 'kube-api-access-6l58h',
368 'read_only': True,
369 'recursive_read_only': None,
370 'sub_path': None,
371 'sub_path_expr': None}],
372 'working_dir': None}],
373 'dns_config': None,
374 'dns_policy': 'ClusterFirst',
375 'enable_service_links': True,
376 'ephemeral_containers': None,
377 'host_aliases': None,
378 'host_ipc': None,
379 'host_network': None,
380 'host_pid': None,
381 'host_users': None,
382 'hostname': None,
383 'image_pull_secrets': None,
384 'init_containers': None,
385 'node_name': 'dse-k8s-worker1012.eqiad.wmnet',
386 'node_selector': None,
387 'os': None,
388 'overhead': None,
389 'preemption_policy': 'PreemptLowerPriority',
390 'priority': -100,
391 'priority_class_name': 'low-priority-pod',
392 'readiness_gates': None,
393 'resource_claims': None,
394 'resources': None,
395 'restart_policy': 'Never',
396 'runtime_class_name': None,
397 'scheduler_name': 'default-scheduler',
398 'scheduling_gates': None,
399 'security_context': {'app_armor_profile': None,
400 'fs_group': 900,
401 'fs_group_change_policy': None,
402 'run_as_group': None,
403 'run_as_non_root': None,
404 'run_as_user': None,
405 'se_linux_change_policy': None,
406 'se_linux_options': None,
407 'seccomp_profile': None,
408 'supplemental_groups': None,
409 'supplemental_groups_policy': None,
410 'sysctls': None,
411 'windows_options': None},
412 'service_account': 'default',
413 'service_account_name': 'default',
414 'set_hostname_as_fqdn': None,
415 'share_process_namespace': None,
416 'subdomain': None,
417 'termination_grace_period_seconds': 30,
418 'tolerations': [{'effect': 'NoExecute',
419 'key': 'node.kubernetes.io/not-ready',
420 'operator': 'Exists',
421 'toleration_seconds': 300,
422 'value': None},
423 {'effect': 'NoExecute',
424 'key': 'node.kubernetes.io/unreachable',
425 'operator': 'Exists',
426 'toleration_seconds': 300,
427 'value': None}],
428 'topology_spread_constraints': None,
429 'volumes': [{'aws_elastic_block_store': None,
430 'azure_disk': None,
431 'azure_file': None,
432 'cephfs': None,
433 'cinder': None,
434 'config_map': None,
435 'csi': None,
436 'downward_api': None,
437 'empty_dir': None,
438 'ephemeral': None,
439 'fc': None,
440 'flex_volume': None,
441 'flocker': None,
442 'gce_persistent_disk': None,
443 'git_repo': None,
444 'glusterfs': None,
445 'host_path': None,
446 'image': None,
447 'iscsi': None,
448 'name': 'airflow-ml-model-training-volume',
449 'nfs': None,
450 'persistent_volume_claim': {'claim_name': 'airflow-ml-model-training',
451 'read_only': None},
452 'photon_persistent_disk': None,
453 'portworx_volume': None,
454 'projected': None,
455 'quobyte': None,
456 'rbd': None,
457 'scale_io': None,
458 'secret': None,
459 'storageos': None,
460 'vsphere_volume': None},
461 {'aws_elastic_block_store': None,
462 'azure_disk': None,
463 'azure_file': None,
464 'cephfs': None,
465 'cinder': None,
466 'config_map': None,
467 'csi': None,
'downward_api': None,
'empty_dir': None,
'ephemeral': None,
'fc': None,
'flex_volume': None,
'flocker': None,
'gce_persistent_disk': None,
'git_repo': None,
'glusterfs': None,
'host_path': None,
'image': None,
'iscsi': None,
'name': 'kube-api-access-6l58h',
'nfs': None,
'persistent_volume_claim': None,
'photon_persistent_disk': None,
'portworx_volume': None,
'projected': {'default_mode': 420,
'sources': [{'cluster_trust_bundle': None,
'config_map': None,
'downward_api': None,
'secret': None,
'service_account_token': {'audience': None,
'expiration_seconds': 3607,
'path': 'token'}},
{'cluster_trust_bundle': None,
'config_map': {'items': [{'key': 'ca.crt',
'mode': None,
'path': 'ca.crt'}],
'name': 'kube-root-ca.crt',
'optional': None},
'downward_api': None,
'secret': None,
'service_account_token': None},
{'cluster_trust_bundle': None,
'config_map': None,
'downward_api': {'items': [{'field_ref': {'api_version': 'v1',
'field_path': 'metadata.namespace'},
'mode': None,
'path': 'namespace',
'resource_field_ref': None}]},
'secret': None,
'service_account_token': None}]},
'quobyte': None,
'rbd': None,
'scale_io': None,
'secret': None,
'storageos': None,
'vsphere_volume': None}]},
'status': {'conditions': [{'last_probe_time': None,
'last_transition_time': datetime.datetime(2025, 10, 22, 11, 56, 29, tzinfo=tzlocal()),
'message': None,
'reason': None,
'status': 'True',
'type': 'Initialized'},
{'last_probe_time': None,
'last_transition_time': datetime.datetime(2025, 10, 22, 11, 56, 36, tzinfo=tzlocal()),
'message': None,
'reason': 'PodFailed',
'status': 'False',
'type': 'Ready'},
{'last_probe_time': None,
'last_transition_time': datetime.datetime(2025, 10, 22, 11, 56, 36, tzinfo=tzlocal()),
'message': None,
'reason': 'PodFailed',
'status': 'False',
'type': 'ContainersReady'},
{'last_probe_time': None,
'last_transition_time': datetime.datetime(2025, 10, 22, 11, 56, 29, tzinfo=tzlocal()),
'message': None,
'reason': None,
'status': 'True',
'type': 'PodScheduled'}],
'container_statuses': [{'allocated_resources': None,
'allocated_resources_status': None,
'container_id': 'containerd://f5d371d56d5a93b7d2af35a8b17497d6ff122cbd0ca6df3c5cd151793c24b05a',
'image': 'docker-registry.wikimedia.org/repos/data-engineering/airflow-dags:airflow-2.10.5-py3.11-2025-10-08-171529-e8280ad4b26af3cc41fd00af1ee6840463c2e0f4',
'image_id': 'docker-registry.wikimedia.org/repos/data-engineering/airflow-dags@sha256:1e6019bc2ce825a02b3b2fe0878248d726bf8caf37ea1b2d33c2748f3be50a90',
'last_state': {'running': None,
'terminated': None,
'waiting': None},
'name': 'base',
'ready': False,
'resources': None,
'restart_count': 0,
'started': False,
'state': {'running': None,
'terminated': {'container_id': 'containerd://f5d371d56d5a93b7d2af35a8b17497d6ff122cbd0ca6df3c5cd151793c24b05a',
'exit_code': 1,
'finished_at': datetime.datetime(2025, 10, 22, 11, 56, 35, tzinfo=tzlocal()),
'message': None,
'reason': 'Error',
'signal': None,
'started_at': datetime.datetime(2025, 10, 22, 11, 56, 34, tzinfo=tzlocal())},
'waiting': None},
'user': None,
'volume_mounts': None}],
'ephemeral_container_statuses': None,
'host_i_ps': None,
'host_ip': '10.64.0.115',
'init_container_statuses': None,
'message': None,
'nominated_node_name': None,
'phase': 'Failed',
'pod_i_ps': [{'ip': '10.67.28.239'},
{'ip': '2620:0:861:302:7e7d:804d:2c9b:f66f'}],
'pod_ip': '10.67.28.239',
'qos_class': 'Burstable',
'reason': None,
'resize': None,
'resource_claim_statuses': None,
'start_time': datetime.datetime(2025, 10, 22, 11, 56, 29, tzinfo=tzlocal())}}
[2025-10-22, 11:56:44 UTC] {taskinstance.py:1226} INFO - Marking task as UP_FOR_RETRY. dag_id=tone_check_training_dag, task_id=upload_model_to_s3, run_id=manual__2025-10-22T11:56:14.801199+00:00, execution_date=20251022T115614, start_date=20251022T115628, end_date=20251022T115644
[2025-10-22, 11:56:44 UTC] {taskinstance.py:341} ▶ Post task execution logs

Update

After some discussions with people from the DE team, I am pasting here some ideas and good practices that address the comments above.

Ideas from DE

The main problem (in the above comment) seems to be that the volumes and volumeMounts sections are missing from the Airflow task pod spec.
We could add these to the task pod, but I am not sure that this is the most sustainable or most efficient way to achieve it.
Screenshots from the output of kubectl describe pod airflow-scheduler-7c6464cbdd-w8hzp:

Mounts (422×1 px, 349 KB)
Volumes (1×1 px, 651 KB)

We would probably be interested in the discussion happening at: T405360: Implement an Airflow operator for moving data from point A to B
In our case, we are trying to move data from a locally mounted CephFS PVC, which just looks like a local filesystem, to an S3 bucket on Swift/Thanos.
We might find that using our data-engineering/sync-utils image is efficient, and we could use rclone or s3cmd to push the file(s) to Swift.

We shouldn't have to re-add fundamental Airflow infrastructure in the tasks, which is why I think that using an Airflow image in a KubernetesPodOperator-based task is probably over-complicated.

However, maybe there is another way: just as you can customise the pod that is launched by the KubernetesPodOperator, you can also customise the executor pod that runs the task, via its executor_config.

This means that we can mount the PVC within the pod that runs the task.

We have some examples of where we modify the pod that is used to run the task:
Code Examples:

Both of the above examples modify the resources associated with the executor pod that launches a KubernetesPodOperator-based task, so there are two pods in this kind of setup.
In the current case, you could override the executor_config for the pod to mount the volume, then use the standard PythonOperator and get the S3 credentials with wmf_airflow_common.clients.s3 as you had originally intended. No need to use a KubernetesPodOperator at all.
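To make this concrete, here is a minimal sketch of the volumes/volumeMounts fragment the executor pod is missing. It uses plain dicts so it runs standalone; in a real DAG this would be expressed with kubernetes.client models (V1Pod, V1Volume, V1VolumeMount) and passed via executor_config={"pod_override": ...}. The claim, volume, and mount names are taken from the surrounding comments and logs; the helper name is hypothetical.

```python
def pvc_pod_patch(claim_name: str = "airflow-ml-model-training",
                  volume_name: str = "airflow-ml-model-training-volume",
                  mount_path: str = "/mnt/model-training") -> dict:
    """Build the volumes/volumeMounts fragment to merge into the executor
    pod spec so the task pod can see the model-training PVC."""
    return {
        "spec": {
            "volumes": [{
                "name": volume_name,
                "persistentVolumeClaim": {"claimName": claim_name},
            }],
            "containers": [{
                # "base" is the main container name in Airflow task pods
                "name": "base",
                "volumeMounts": [{
                    "name": volume_name,
                    "mountPath": mount_path,
                }],
            }],
        }
    }
```

The key point is that the volume name referenced in volumeMounts must match the entry in volumes, and the claimName must match the PVC bound in the namespace.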

Update

I configured the executor_pod_override to mount the PVC so we can use it in our DAG with all the components (pure Airflow operators and the WMFKubernetesPodOperator).
I tested the final step move_model_to_s3_task using get_s3_client() from wmf_airflow_common/clients/ and it worked perfectly.
The model was copied to the dedicated S3 bucket for exported retrained models: s3://wmf-ml-models/retrained-models.
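A minimal sketch of what the upload step does, assuming get_s3_client() returns a boto3-style client exposing upload_file(filename, bucket, key). The bucket and prefix come from the task logs; the helper name and the injected-client structure are illustrative, not the exact DAG code.

```python
import os


def upload_model_dir(s3_client, local_dir: str,
                     bucket: str = "wmf-ml-models",
                     prefix: str = "retrained-models/tone-check") -> list:
    """Walk the training output directory on the mounted PVC and upload
    every file to s3://<bucket>/<prefix>/, preserving relative paths."""
    uploaded = []
    for root, _dirs, files in os.walk(local_dir):
        for name in files:
            path = os.path.join(root, name)
            # e.g. checkpoint-21530/config.json, relative to the output dir
            rel = os.path.relpath(path, local_dir)
            key = "{}/{}".format(prefix, rel)
            s3_client.upload_file(path, bucket, key)  # boto3-style signature
            uploaded.append(key)
    return uploaded
```

Injecting the client keeps the function testable with a fake S3 client, while in the DAG it would simply receive the object returned by get_s3_client().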

$ s3cmd -c /etc/s3cmd/cfg.d/ml-team.cfg ls -H --recursive s3://wmf-ml-models/retrained-models/tone-check/checkpoint-63618/
2026-01-20 13:33   865   s3://wmf-ml-models/retrained-models/tone-check/checkpoint-63618/config.json
2026-01-20 13:33   678M  s3://wmf-ml-models/retrained-models/tone-check/checkpoint-63618/model.safetensors
2026-01-20 13:33  1357M  s3://wmf-ml-models/retrained-models/tone-check/checkpoint-63618/optimizer.pt
2026-01-20 13:33    13K  s3://wmf-ml-models/retrained-models/tone-check/checkpoint-63618/rng_state.pth
2026-01-20 13:33  1064   s3://wmf-ml-models/retrained-models/tone-check/checkpoint-63618/scheduler.pt
2026-01-20 13:33   695   s3://wmf-ml-models/retrained-models/tone-check/checkpoint-63618/special_tokens_map.json
2026-01-20 13:33     2M  s3://wmf-ml-models/retrained-models/tone-check/checkpoint-63618/tokenizer.json
2026-01-20 13:33  1330   s3://wmf-ml-models/retrained-models/tone-check/checkpoint-63618/tokenizer_config.json
2026-01-20 13:33    24K  s3://wmf-ml-models/retrained-models/tone-check/checkpoint-63618/trainer_state.json
2026-01-20 13:33     5K  s3://wmf-ml-models/retrained-models/tone-check/checkpoint-63618/training_args.bin
2026-01-20 13:33   972K  s3://wmf-ml-models/retrained-models/tone-check/checkpoint-63618/vocab.txt

You can find the logs of the move_model_to_s3_task in the following paste:

tone-check-training-dag-move-model-to-s3-p56ysy2n
/home/app/.local/lib/python3.11/site-packages/airflow/metrics/statsd_logger.py:184 RemovedInAirflow3Warning: The basic metric validator will be deprecated in the future in favor of pattern-matching. You can try this now by setting config option metrics_use_pattern_match to True.
/opt/airflow/airflow_local_settings.py:14 DeprecationWarning: The `airflow.kubernetes.pod_generator.PodDefaults` class is deprecated. Please use `'airflow.kubernetes.pre_7_4_0_compatibility.pod_generator_deprecated.PodDefaults'`. The `cncf.kubernetes` provider must be >= 7.4.0 for that..
[2026-01-20, 14:14:57 UTC] {dagbag.py:588} INFO - Filling up the DagBag from /opt/airflow/dags/airflow_dags/ml/dags/tone_check_training_dag.py
[2026-01-20, 14:14:57 UTC] {dag_default_args.py:273} INFO - Dag-default-args set is `dev_wmf`.
/home/app/.local/lib/python3.11/site-packages/conda_pack/core.py:15 UserWarning: pkg_resources is deprecated as an API. See https://setuptools.pypa.io/en/latest/pkg_resources.html. The pkg_resources package is slated for removal as early as 2025-11-30. Refrain from using this package or pin to Setuptools<81.
[2026-01-20, 14:14:57 UTC] {task_command.py:467} INFO - Running <TaskInstance: tone_check_training_dag.move_model_to_s3 manual__2026-01-20T14:14:29.438216+00:00 [queued]> on host tone-check-training-dag-move-model-to-s3-p56ysy2n
[2026-01-20, 14:14:57 UTC] {local_task_job_runner.py:123} ▶ Pre task execution logs
[2026-01-20, 14:14:57 UTC] {crypto.py:82} WARNING - empty cryptography key - values will not be stored encrypted.
[2026-01-20, 14:14:58 UTC] {tone_check_training_dag.py:97} INFO - [+] S3 client loaded !
[2026-01-20, 14:14:58 UTC] {tone_check_training_dag.py:99} INFO - Searching files in /mnt/model-training/tone_check/20251014T132011/output_model:
[2026-01-20, 14:14:58 UTC] {tone_check_training_dag.py:105} INFO - - File: /mnt/model-training/tone_check/20251014T132011/output_model/checkpoint-63618/config.json | wmf-ml-models | retrained-models/tone-check/checkpoint-63618/config.json
[2026-01-20, 14:14:58 UTC] {tone_check_training_dag.py:105} INFO - - File: /mnt/model-training/tone_check/20251014T132011/output_model/checkpoint-63618/model.safetensors | wmf-ml-models | retrained-models/tone-check/checkpoint-63618/model.safetensors
[2026-01-20, 14:15:05 UTC] {tone_check_training_dag.py:105} INFO - - File: /mnt/model-training/tone_check/20251014T132011/output_model/checkpoint-63618/special_tokens_map.json | wmf-ml-models | retrained-models/tone-check/checkpoint-63618/special_tokens_map.json
[2026-01-20, 14:15:05 UTC] {tone_check_training_dag.py:105} INFO - - File: /mnt/model-training/tone_check/20251014T132011/output_model/checkpoint-63618/rng_state.pth | wmf-ml-models | retrained-models/tone-check/checkpoint-63618/rng_state.pth
[2026-01-20, 14:15:06 UTC] {tone_check_training_dag.py:105} INFO - - File: /mnt/model-training/tone_check/20251014T132011/output_model/checkpoint-63618/tokenizer_config.json | wmf-ml-models | retrained-models/tone-check/checkpoint-63618/tokenizer_config.json
[2026-01-20, 14:15:06 UTC] {tone_check_training_dag.py:105} INFO - - File: /mnt/model-training/tone_check/20251014T132011/output_model/checkpoint-63618/vocab.txt | wmf-ml-models | retrained-models/tone-check/checkpoint-63618/vocab.txt
[2026-01-20, 14:15:06 UTC] {tone_check_training_dag.py:105} INFO - - File: /mnt/model-training/tone_check/20251014T132011/output_model/checkpoint-63618/tokenizer.json | wmf-ml-models | retrained-models/tone-check/checkpoint-63618/tokenizer.json
[2026-01-20, 14:15:07 UTC] {tone_check_training_dag.py:105} INFO - - File: /mnt/model-training/tone_check/20251014T132011/output_model/checkpoint-63618/training_args.bin | wmf-ml-models | retrained-models/tone-check/checkpoint-63618/training_args.bin
[2026-01-20, 14:15:07 UTC] {tone_check_training_dag.py:105} INFO - - File: /mnt/model-training/tone_check/20251014T132011/output_model/checkpoint-63618/scheduler.pt | wmf-ml-models | retrained-models/tone-check/checkpoint-63618/scheduler.pt
[2026-01-20, 14:15:07 UTC] {tone_check_training_dag.py:105} INFO - - File: /mnt/model-training/tone_check/20251014T132011/output_model/checkpoint-63618/trainer_state.json | wmf-ml-models | retrained-models/tone-check/checkpoint-63618/trainer_state.json
[2026-01-20, 14:15:07 UTC] {tone_check_training_dag.py:105} INFO - - File: /mnt/model-training/tone_check/20251014T132011/output_model/checkpoint-63618/optimizer.pt | wmf-ml-models | retrained-models/tone-check/checkpoint-63618/optimizer.pt
[2026-01-20, 14:15:22 UTC] {tone_check_training_dag.py:108} INFO - [+] Files uploded correctly at: s3://wmf-ml-models/retrained-models/tone-check//
[2026-01-20, 14:15:22 UTC] {python.py:240} INFO - Done. Returned value was: None
[2026-01-20, 14:15:22 UTC] {taskinstance.py:341} ▶ Post task execution logs
[2026-01-20, 14:15:22 UTC] {crypto.py:82} WARNING - empty cryptography key - values will not be stored encrypted.
[2026-01-20, 14:15:22 UTC] {base.py:84} INFO - Retrieving connection 's3_dpe'
[2026-01-20, 14:15:22 UTC] {connection_wrapper.py:331} INFO - AWS Connection (conn_id='s3_dpe', conn_type='aws') credentials retrieved from login and password.

I will test the whole tone-check retraining pipeline end-to-end and paste the updates.

Update

I am facing some difficulties testing the final component, move_model_to_s3, since the pipeline fails after the copy_hdfs_to_pvc step, during the start of the train_tone_check step.
The copy_hdfs_to_pvc step finishes successfully (it copies the data correctly) and its pod is deleted after the job, but when the next component, train_tone_check, starts, it fails with the following error: Multi-Attach error for volume "pvc-8a6a2920-8d7e-4616-8ab6-a6a70b26d116" Volume is already used by pod(s) tone-check-training-dag-train-tone-check-xbg6fmsv from None:

[2026-01-23, 14:38:45 UTC] {pod_manager.py:153} ▼ Waiting until 600s to get the POD scheduled...
[2026-01-23, 14:38:45 UTC] {pod_manager.py:177} INFO - Waiting 600s to get the POD running...
[2026-01-23, 14:38:45 UTC] {pod_manager.py:127} INFO - The Pod has an Event: Successfully assigned airflow-dev/train-tone-check-nxies9u to dse-k8s-worker1009.eqiad.wmnet from None
[2026-01-23, 14:38:50 UTC] {pod_manager.py:127} INFO - The Pod has an Event: Multi-Attach error for volume "pvc-8a6a2920-8d7e-4616-8ab6-a6a70b26d116" Volume is already used by pod(s) tone-check-training-dag-train-tone-check-xbg6fmsv from None
[2026-01-23, 14:40:50 UTC] {pod_manager.py:127} INFO - The Pod has an Event: Unable to attach or mount volumes: unmounted volumes=[airflow-ml-model-training-volume], unattached volumes=[airflow-dev-gkyziridis-worker--etc-refinery airflow-ml-model-training-volume kube-api-access-s2wgk]: timed out waiting for the condition from None
[2026-01-23, 14:43:06 UTC] {pod_manager.py:127} INFO - The Pod has an Event: Unable to attach or mount volumes: unmounted volumes=[airflow-ml-model-training-volume], unattached volumes=[airflow-ml-model-training-volume kube-api-access-s2wgk airflow-dev-gkyziridis-worker--etc-refinery]: timed out waiting for the condition from None
[2026-01-23, 14:45:21 UTC] {pod_manager.py:127} INFO - The Pod has an Event: Unable to attach or mount volumes: unmounted volumes=[airflow-ml-model-training-volume], unattached volumes=[kube-api-access-s2wgk airflow-dev-gkyziridis-worker--etc-refinery airflow-ml-model-training-volume]: timed out waiting for the condition from None
[2026-01-23, 14:48:47 UTC] {pod_manager.py:180} ▲▲▲ Log group end
[2026-01-23, 14:48:47 UTC] {pod.py:1171} INFO - Deleting pod: train-tone-check-nxies9u

I also checked the PVC using kubectl and I see that its access mode is "RWO" (ReadWriteOnce); I am not sure whether this is causing the problem:

$ kube_env airflow-ml-deploy dse-k8s-eqiad
$ kubectl get pvc airflow-ml-model-training -n airflow-dev
NAME                        STATUS   VOLUME                                     CAPACITY   ACCESS MODES   STORAGECLASS   AGE
airflow-ml-model-training   Bound    pvc-8a6a2920-8d7e-4616-8ab6-a6a70b26d116   20Gi       RWO            ceph-rbd-ssd   151d

Thoughts:

  • I followed the suggestions from the DE team in the above comments (T406217#11306562) and configured the executor_pod_override in order to make the PVC accessible inside the DAG to all the Airflow/Kubernetes operators. The strange thing is that if that were the problem, the first WMFKubernetesPodOperator in the pipeline, copy_hdfs_to_pvc, should fail; instead, the pipeline fails at the second WMFKubernetesPodOperator, train_tone_check_model.
  • Could it be that the PVC is RWO (ReadWriteOnce) and not RWX (ReadWriteMany)?
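The RWO question matters because a ReadWriteOnce volume can only be attached to a single node at a time: if the previous task's pod still holds the node attachment, a new pod scheduled on a different node gets exactly the Multi-Attach error above. A small illustrative check over the JSON returned by `kubectl get pvc <name> -o json` (the field paths follow the Kubernetes API; the helper name is hypothetical):

```python
def can_attach_from_second_node(pvc: dict) -> bool:
    """Return True if the PVC's access modes allow attachment from more
    than one node at a time (ReadWriteMany).

    `pvc` is a dict as produced by `kubectl get pvc <name> -o json`.
    ReadWriteOnce allows many pods to use the volume, but only on one
    node; pods landing on a second node hit a Multi-Attach error.
    """
    modes = pvc.get("spec", {}).get("accessModes", [])
    return "ReadWriteMany" in modes


# Shape matching the airflow-ml-model-training PVC shown above:
rwo_pvc = {"spec": {"accessModes": ["ReadWriteOnce"]}}
```

For an RWO PVC the function returns False, consistent with the train_tone_check pod failing once it was scheduled on a node other than the one where copy_hdfs_to_pvc ran.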

Update

The end-to-end tone-check retraining pipeline succeeded; we solved the PVC Multi-Attach issues.

image.png (946×2 px, 112 KB)

The new version of the retrained tone-check model was successfully copied to the dedicated S3 bucket under s3://wmf-ml-models/retrained-models/tone-check/. Here are the logs of the export step:
tone-check-training-dag-move-model-to-s3-nv8wgsew
[2026-01-28, 22:24:03 UTC] {local_task_job_runner.py:123} ▶ Pre task execution logs
[2026-01-28, 22:24:04 UTC] {crypto.py:82} WARNING - empty cryptography key - values will not be stored encrypted.
[2026-01-28, 22:24:05 UTC] {tone_check_training_dag.py:101} INFO - [+] S3 client loaded !
[2026-01-28, 22:24:05 UTC] {tone_check_training_dag.py:103} INFO - Searching files in /mnt/model-training/tone_check/20260128T134152/output_model:
[2026-01-28, 22:24:05 UTC] {tone_check_training_dag.py:109} INFO - - File: /mnt/model-training/tone_check/20260128T134152/output_model/checkpoint-21530/config.json | wmf-ml-models | retrained-models/tone-check/checkpoint-21530/config.json
[2026-01-28, 22:24:05 UTC] {tone_check_training_dag.py:109} INFO - - File: /mnt/model-training/tone_check/20260128T134152/output_model/checkpoint-21530/model.safetensors | wmf-ml-models | retrained-models/tone-check/checkpoint-21530/model.safetensors
[2026-01-28, 22:24:12 UTC] {tone_check_training_dag.py:109} INFO - - File: /mnt/model-training/tone_check/20260128T134152/output_model/checkpoint-21530/special_tokens_map.json | wmf-ml-models | retrained-models/tone-check/checkpoint-21530/special_tokens_map.json
[2026-01-28, 22:24:12 UTC] {tone_check_training_dag.py:109} INFO - - File: /mnt/model-training/tone_check/20260128T134152/output_model/checkpoint-21530/rng_state.pth | wmf-ml-models | retrained-models/tone-check/checkpoint-21530/rng_state.pth
[2026-01-28, 22:24:12 UTC] {tone_check_training_dag.py:109} INFO - - File: /mnt/model-training/tone_check/20260128T134152/output_model/checkpoint-21530/tokenizer_config.json | wmf-ml-models | retrained-models/tone-check/checkpoint-21530/tokenizer_config.json
[2026-01-28, 22:24:13 UTC] {tone_check_training_dag.py:109} INFO - - File: /mnt/model-training/tone_check/20260128T134152/output_model/checkpoint-21530/vocab.txt | wmf-ml-models | retrained-models/tone-check/checkpoint-21530/vocab.txt
[2026-01-28, 22:24:13 UTC] {tone_check_training_dag.py:109} INFO - - File: /mnt/model-training/tone_check/20260128T134152/output_model/checkpoint-21530/tokenizer.json | wmf-ml-models | retrained-models/tone-check/checkpoint-21530/tokenizer.json
[2026-01-28, 22:24:13 UTC] {tone_check_training_dag.py:109} INFO - - File: /mnt/model-training/tone_check/20260128T134152/output_model/checkpoint-21530/training_args.bin | wmf-ml-models | retrained-models/tone-check/checkpoint-21530/training_args.bin
[2026-01-28, 22:24:14 UTC] {tone_check_training_dag.py:109} INFO - - File: /mnt/model-training/tone_check/20260128T134152/output_model/checkpoint-21530/scheduler.pt | wmf-ml-models | retrained-models/tone-check/checkpoint-21530/scheduler.pt
[2026-01-28, 22:24:14 UTC] {tone_check_training_dag.py:109} INFO - - File: /mnt/model-training/tone_check/20260128T134152/output_model/checkpoint-21530/trainer_state.json | wmf-ml-models | retrained-models/tone-check/checkpoint-21530/trainer_state.json
[2026-01-28, 22:24:14 UTC] {tone_check_training_dag.py:109} INFO - - File: /mnt/model-training/tone_check/20260128T134152/output_model/checkpoint-21530/optimizer.pt | wmf-ml-models | retrained-models/tone-check/checkpoint-21530/optimizer.pt
[2026-01-28, 22:24:29 UTC] {tone_check_training_dag.py:112} INFO - [+] Files uploded correctly at: s3://wmf-ml-models/retrained-models/tone-check//
[2026-01-28, 22:24:29 UTC] {python.py:240} INFO - Done. Returned value was: None

Here is the content of the S3 bucket:

$ s3cmd -c /etc/s3cmd/cfg.d/ml-team.cfg ls -H s3://wmf-ml-models/retrained-models/tone-check/checkpoint-21530/
2026-01-28 22:24   865   s3://wmf-ml-models/retrained-models/tone-check/checkpoint-21530/config.json
2026-01-28 22:24   678M  s3://wmf-ml-models/retrained-models/tone-check/checkpoint-21530/model.safetensors
2026-01-28 22:24  1357M  s3://wmf-ml-models/retrained-models/tone-check/checkpoint-21530/optimizer.pt
2026-01-28 22:24    13K  s3://wmf-ml-models/retrained-models/tone-check/checkpoint-21530/rng_state.pth
2026-01-28 22:24  1064   s3://wmf-ml-models/retrained-models/tone-check/checkpoint-21530/scheduler.pt
2026-01-28 22:24   695   s3://wmf-ml-models/retrained-models/tone-check/checkpoint-21530/special_tokens_map.json
2026-01-28 22:24     2M  s3://wmf-ml-models/retrained-models/tone-check/checkpoint-21530/tokenizer.json
2026-01-28 22:24  1330   s3://wmf-ml-models/retrained-models/tone-check/checkpoint-21530/tokenizer_config.json
2026-01-28 22:24     9K  s3://wmf-ml-models/retrained-models/tone-check/checkpoint-21530/trainer_state.json
2026-01-28 22:24     5K  s3://wmf-ml-models/retrained-models/tone-check/checkpoint-21530/training_args.bin
2026-01-28 22:24   972K  s3://wmf-ml-models/retrained-models/tone-check/checkpoint-21530/vocab.txt