
Build model training pipeline for tone check using WMF ML Airflow instance
Open, Needs Triage, Public

Description

As part of T391940: FY2024-25 Q4 Goal: Productionize tone check model, the ML team would like to create a reproducible and maintainable training pipeline using WMF's ML Airflow instance.

This will make model training iterations easier and the process less error-prone, by version-controlling the code and putting changes through code review.

We'll tackle the following:

  • answer initial planning questions based on the ML team's discussions:
    1. Why do we use Airflow and what are the main benefits of it?
    2. What is the repo that we're going to use as a codebase? What are the practices that are followed in WMF?
    3. What are the data accesses that we need to tackle this work?
    4. What are the steps that we need to perform in each DAG?
    5. What is the status of the Airflow instances? What is the difference between the old vs k8s Airflow instances? Which Airflow instance are we going to use?
    6. Which Airflow operators shall we use for each of the pipeline steps?
    7. How do we ship the code? Do we package everything in a Docker image? How is the DAG logic shipped to the Airflow instance?
  • restructure the training notebook and consider the retraining/fine-tuning workflow
  • figure out using WMFKubernetesPodOperator for model training

Details

Other Assignee
brouberol
Related Changes in Gerrit:
Related Changes in GitLab:
  • ml: add DAG to copy tone-check training data from HDFS to PVC (repos/data-engineering/airflow-dags!1707, by kevinbazira, tone_check_data_copy → main)
  • ml: add tone-check retraining DAG (repos/data-engineering/airflow-dags!1662, by kevinbazira, tone_check_scheduling_job_logic → main)
  • common/kubernetes: add node_anti_affinity_for_hostnames utility to kubernetes helpers (repos/data-engineering/airflow-dags!1660, by kevinbazira, node_anti_affinity_for_hostnames → main)
  • Add pre-commit and ruff configuration for code QA (repos/machine-learning/ml-pipelines!3, by kevinbazira, add_pre-commit_checks → main)
  • machine-learning: Add ml-pipelines repo (repos/releng/gitlab-trusted-runner!117, by gkyziridis, main → main)
  • machine-learning: Add ml-pipelines repo (repos/releng/gitlab-trusted-runner!116, by gkyziridis, main → main)

Related Objects

Event Timeline

isarantopoulos renamed this task from Build model training pipeline using WMF ML Airflow instance to Build model training pipeline for tone check using WMF ML Airflow instance. (Jul 15 2025, 2:41 PM)

Thanks for your excellent summary @gkyziridis.

Solutions & Brainstorming
  • Passing data/model/files into the container:
    • Either include the boto3 library inside the Docker image, together with the code for initialising an S3 session and downloading the model from S3 inside the container:
      • This needs some work from DE to configure the Docker container running on the pod with access to the S3 buckets (Thanos, Swift), similar to the work they did to set up S3 access at the Airflow level.
    • Or build and set up a PVC, or a common space in Airflow, that all the components (both pure Airflow and Kubernetes components) can access:
      • This needs to be set up by the DE team.

My suggestion is that we go for your second option here, which is to create a persistent volume that you can use to hold the model. You will be able to make this same volume available to any of your tasks.

This means that you will be able to download the model in one task (using boto3) and then in a subsequent task, feed this model file to the retraining task.

When that task succeeds, you will be able to have another task that pushes the updated model back to your s3/swift bucket.
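The download/upload steps described above could be sketched roughly as below. This is a minimal illustration only: the bucket layout, object key scheme, and function names are assumptions, not the actual WMF setup, and endpoint/credential configuration would come from the pod environment.

```python
"""Sketch of S3 download/upload helpers a task container could run.
The object key layout and names here are illustrative assumptions."""
from pathlib import Path


def model_object_key(model_name: str, version: str) -> str:
    # Hypothetical bucket layout: models/<name>/<version>/model.tar.gz
    return f"models/{model_name}/{version}/model.tar.gz"


def download_model(bucket: str, model_name: str, version: str, dest: Path) -> None:
    # boto3 is imported lazily so that importing this module stays cheap
    import boto3

    s3 = boto3.client("s3")  # endpoint/credentials assumed to come from the pod env
    dest.parent.mkdir(parents=True, exist_ok=True)
    s3.download_file(bucket, model_object_key(model_name, version), str(dest))


def upload_model(bucket: str, model_name: str, version: str, src: Path) -> None:
    import boto3

    s3 = boto3.client("s3")
    s3.upload_file(str(src), bucket, model_object_key(model_name, version))
```

The retraining task would then read the downloaded model from the shared volume, and a final task would call the upload helper once training succeeds.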


However, we haven't set up anything exactly like this before, so it needs a little thinking about before we rush to implementation. Please excuse me for thinking out loud here...

I have a few ideas, so we can decide which idea we like the best.

Idea 1

In each of our Airflow instances, we currently have two persistent volume claims.

  • airflow-dags-pvc - this is where the DAGs are distributed between pods
  • airflow-kerberos-token-pvc - this is where the kerberos credential cache is distributed between pods

We could update the airflow chart to make it possible to deploy additional PVCs alongside the airflow production deployment.

Idea 2

We could create another release within the helmfile.d/dse-k8s-services/airflow-ml/helmfile.yaml file that only deploys a PVC

Idea 3

What we do with the dumps on Airflow is to use an additional Kubernetes namespace.

  • In this case, we use a namespace called mediawiki-dumps-legacy.
  • A PVC is created in this namespace by the mediawiki chart.
  • The dumps DAGs all launch their tasks using the KubernetesPodOperator into this namespace.

Now, this is probably a bit more complicated than we would need it to be for this case, but there could be advantages.


For simplicity's sake, I think that idea 2 is probably the most appealing. It doesn't require any update to the Airflow chart and I think that all of the permissions should be in place to be able to make it work already.

When you come to use the volume, you would modify your DAG to cause the pods to mount the volume by using the techniques outlined here: https://airflow.apache.org/docs/apache-airflow-providers-cncf-kubernetes/stable/operators.html#how-to-use-cluster-configmaps-secrets-and-volumes-with-pod

There are some examples in the test_k8s/dumps DAGs and we will be able to help you to get these working.

So I'll proceed with idea 2 now and get a patch ready. It doesn't preclude us from implementing either of the other two ideas at a later date, if that turns out to be a useful approach.

How large a volume do you think you would like to begin with?


Hey @BTullis thnx a lot for your response and for stating your ideas here.

I think we can go for Idea 2, which looks nice and efficient.

We have discussed two main points with my team:

  1. Is it possible to create a PVC whose size we can adjust in the future? If we use this PVC as a central space for the retraining jobs of all models, we will need to grow its space accordingly, so it will get larger and larger over time.
  2. For the tone-check model retraining we need the following folder structure:
    • Data/
      • retraining_data.csv (around 500 MB)
    • BaseModels/
      • bert-base-multilingual-cased_peacock_512 (2.2 GB)
    • TrainedModels/
      • retrained_bert_peacock (2.3 GB)

So, all in all, is it possible to start with a 5-6 GB volume, with the option to upgrade it easily in the future?
I am tagging @AikoChou since we discussed it together.

Sorry for the delay on this one. Tagging it so that we pick it up ASAP and make progress.

Hey @BTullis thnx for working on this.
I am exploring Idea 3:

What we do with the dumps on airflow, is to use an additional Kubernetes namespace.

  • In this case, we use a namespace called mediawiki-dumps-legacy.
  • A PVC is created in this namespace by the mediawiki chart.
  • The dumps DAGs all launch their tasks using the KubernetesPodOperator into this namespace.

Now, this is probably a bit more complicated than we would need it to be for this case, but there could be advantages.

I have some questions:
I see the mediawiki-dumps-legacy configurations, which are deployed as dumps in deployment-charts, have a mount path, e.g. "/mnt/dumpsdata".
Where does this path actually exist, and how can I get access to it?
How can I store the base model and the training data in that mount directory?

@gkyziridis I'm going to take this over from @BTullis as he's going to be OOO for a couple of days.

I see the mediawiki-dumps-legacy configurations which are deployed as dumps in deployment-charts having a mount path, e.g., "/mnt/dumpsdata".

That'd be here, aka in the override values we inject into the charts from the release helmfile.d directory.

However, there's a lot more to unpack in that project than it seems, and much more than (I think) you need. Let me walk you through it.

The helmfile.d/dse-k8s-services/mediawiki-dumps-legacy production release actually renders 2 charts:

  • mediawiki # rendered to generate a CronJob resource
  • mediawiki-dumps-legacy # rendered to generate ConfigMap resources specific to the dumps themselves

Running helmfile -e dse-k8s-eqiad template | grep -C 10 /mnt we see the following:

# Source: mediawiki/templates/cronjob.yaml.tpl
apiVersion: batch/v1
kind: CronJob
metadata:
  name: mediawiki-production-dumps-job-template
  labels:
    app: mediawiki
    chart: mediawiki-0.10.7
    release: production
    heritage: Helm

spec:
...
            # GeoIP data
            - name: mediawiki-production-geoip
              mountPath: /usr/share/GeoIP/
              readOnly: true
            - name: mediawiki-production-geoipinfo
              mountPath: /usr/share/GeoIPInfo/
              readOnly: true
            - name: mediawiki-production-dumps
              mountPath: /mnt/dumpsdata

          - name: mediawiki-production-tls-proxy
            image: docker-registry.discovery.wmnet/envoy:1.23.10-3
            imagePullPolicy: IfNotPresent
            env:
              - name: SERVICE_NAME
                value: production
              - name: SERVICE_ZONE
                value: "default"
              - name: CONCURRENCY
                value: "12"
              - name: ADMIN_PORT
...
---
...
# Source: mediawiki-dumps-legacy/templates/configmap.yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: mediawiki-dumps-legacy-configs
  labels:
    app: mediawiki-dumps-legacy
    chart: mediawiki-dumps-legacy-0.0.32
    release: resources
    heritage: Helm
  namespace: mediawiki-dumps-legacy
data:
  wikidump.conf.dumps: |
    [wiki]
    dblist=/srv/mediawiki/dblists/all.dblist
    privatelist=/srv/mediawiki/dblists/private.dblist
    closedlist=/srv/mediawiki/dblists/closed.dblist
    skipdblist=/etc/dumps/dblists/skip.dblist
    flowlist=/srv/mediawiki/dblists/flow.dblist
    dir=/srv/mediawiki
    adminsettings=private/PrivateSettings.php
    tablejobs=/etc/dumps/confs/table_jobs.yaml
    multiversion=/srv/mediawiki/multiversion

    [output]
    public=/mnt/dumpsdata/xmldatadumps/public
    private=/mnt/dumpsdata/xmldatadumps/private
    temp=/mnt/dumpsdata/xmldatadumps/temp
    templatedir=/etc/dumps/templs
    index=backup-index.html
    webroot=http://download.wikimedia.org
    fileperms=0o644
...

Now, the real work actually happens in airflow-dags. If you look at the mediawiki_adds_changes_dump DAG definition, you'll see that it is composed of 3 steps:

  1. fetch pod spec from cronjob
  2. run the dumps using the pod spec (+ commands and arguments) fetched from step 1.
  3. sync the dump artifacts to some other hosts

The first step queries the Kubernetes API to get the pod spec within the CronJob resource I pasted earlier, which contains /mnt/dumpsdata (source).
The second step then builds a Pod spec using the result from the first step, while also mounting some ConfigMap resources in various places. (See volumes and volumemounts definition.)

Your problem is actually easier, because it involves way less moving pieces.

We'd define a PersistentVolumeClaim giving you (say) 50 GB of disk space in Ceph, under deployment-charts/helmfile.d/dse-k8s-services/airflow-ml/helmfile.yaml. Let's call that PersistentVolumeClaim resource airflow-ml-model-training. Assuming you want that volume to be mounted in /mnt/model-retrain, you'd basically add the following arguments to your KubernetesPodOperator task:

from airflow import DAG
from wmf_airflow_common.kubernetes import make_persistent_volume_claim_volume, make_volumemount

PVC_NAME = "airflow-ml-model-training"

with DAG(...) as dag:
    task = KubernetesPodOperator(
        ...,
        volumes=[make_persistent_volume_claim_volume(pvc_name=PVC_NAME, read_only=False)],
        volume_mounts=[make_volumemount(volume_name=PVC_NAME, mount_path="/mnt/model-retrain")],
        ...
    )

By doing this for all tasks in your DAG that need access to the model, you'd be able to mount that volume at every step, and thus share data between the steps.

Does that make sense?

Thank you for the detailed explanation above @brouberol, it is much appreciated.

Your problem is actually easier, because it involves way less moving pieces.
We'd define a PersistentVolumeClaim giving you (say) 50GB of disk space in Ceph, under deployment-charts/helmfile.d/dse-k8s-services/airflow-ml/helmfile.yaml. Let's call that PersistentVolumeClaim resource airflow-ml-model-retrain-scratch-space. Assuming you want that volume to be mounted in /mnt/model-retrain, you'd basically add the following arguments to your KubernetesPodOperator task

By doing this for all tasks in your DAG that need access to the model, you'd be able to mount that volume at every step, and thus share data between the steps.

Does that make sense?

Yes it makes total sense. When do you expect the PVC to be ready?

I can probably get to it by tomorrow!

Change #1181641 had a related patch set uploaded (by Brouberol; author: Brouberol):

[operations/deployment-charts@master] airflow-ml: define an RBD volume claim used as a model training scratch space

https://gerrit.wikimedia.org/r/1181641

Change #1181641 merged by Brouberol:

[operations/deployment-charts@master] airflow-ml: define an RBD volume claim used as a model training scratch space

https://gerrit.wikimedia.org/r/1181641

Change #1181656 had a related patch set uploaded (by Brouberol; author: Brouberol):

[operations/deployment-charts@master] airflow-ml: fix typo in storage quantity and storage class name

https://gerrit.wikimedia.org/r/1181656

Change #1181656 merged by Brouberol:

[operations/deployment-charts@master] airflow-ml: fix typo in storage quantity and storage class name

https://gerrit.wikimedia.org/r/1181656

Change #1181658 had a related patch set uploaded (by Brouberol; author: Brouberol):

[operations/deployment-charts@master] airflow-ml: drop -pvc suffix in the PVC name

https://gerrit.wikimedia.org/r/1181658

Change #1181658 merged by Brouberol:

[operations/deployment-charts@master] airflow-ml: drop -pvc suffix in the PVC name

https://gerrit.wikimedia.org/r/1181658

brouberol@deploy1003:~$ k get pvc airflow-ml-model-training
NAME                        STATUS   VOLUME                                     CAPACITY   ACCESS MODES   STORAGECLASS   AGE
airflow-ml-model-training   Bound    pvc-d324c87c-e0ad-41de-8ed9-ad7095b4c4e7   20Gi       RWO            ceph-rbd-ssd   2s

The PVC is now available.

Thnx for deploying this @brouberol.
Where does this space actually exist?
I see that this is ceph-rbd-ssd storage, but how can I access it and copy files into it?

This exists in our Ceph cluster. If you want to read/write to it, you need to mount it in your airflow task pod, like I mentioned at the end of https://phabricator.wikimedia.org/T396495#11101114

This exists in our Ceph cluster. If you want to read/write to it, you need to mount it in your airflow task pod, like I mentioned at the end of https://phabricator.wikimedia.org/T396495#11101114

Thank you for the quick response and apologies for the too many questions.
So in the following scenario: I have a base model in an S3 bucket and a CSV locally on my machine (the CSV will actually be generated by an Airflow pipeline, and it needs to exist in that PVC space before the retraining starts).
Should we always run an Airflow job to copy those files into PVC_NAME = "airflow-ml-model-training" and then fire the retraining job with this mount attached?
For example:

from airflow import DAG
from wmf_airflow_common.operators.kubernetes import KubernetesPodOperator
from wmf_airflow_common.clients.s3 import get_s3_client
from wmf_airflow_common.kubernetes import make_persistent_volume_claim_volume, make_volumemount

PVC_NAME = "airflow-ml-model-training"

with DAG(...) as dag:

    # Step 1. Download model from S3, add model and data in PVC
    download_model_docker_image = "docker-registry.wikimedia.org/repos/machine-learning/ml-pipelines"
    model_docker_tag = "job-5" 
    task_1 = KubernetesPodOperator(
        task_id="download_model_from_s3",
        name="download_model_from_s3",
        image=f"{download_model_docker_image}:{model_docker_tag}",
        volumes=[make_persistent_volume_claim_volume(pvc_name=PVC_NAME, read_only=False)],
        volume_mounts=[make_volumemount(volume_name=PVC_NAME, mount_path="/mnt/model-retrain")],
        ...
    )

    # Step 2. Train model
    training_docker_image = "docker-registry.wikimedia.org/repos/machine-learning/ml-pipelines"
    training_docker_tag = "job-588276"
    run_k8s_pod = KubernetesPodOperator(
        task_id="tonecheck_retrain",
        name="tonecheck_retrain",
        image=f"{training_docker_image}:{training_docker_tag}",
        volumes=[make_persistent_volume_claim_volume(pvc_name=PVC_NAME, read_only=False)],
        volume_mounts=[make_volumemount(volume_name=PVC_NAME, mount_path="/mnt/model-retrain")],
        ...
    )

If the above scenario is correct:

  1. Should we always run the job that copies these files into the PVC whenever we fire the retraining DAG?
  2. Should the download_model_from_s3 job exist inside a Docker image executed by the KubernetesPodOperator, or are there other ways (a PythonOperator, for instance)?
  3. Can I use get_s3_client from wmf_airflow_common.clients.s3 inside a Docker container?

I guess it depends on whether the csv is expected to change regularly. If not, we could create a one-off pod mounting the PVC and copy the csv file into the pod and from there into the volume.

If the CSV is expected to change regularly, I would probably define an airflow Dataset representing the file, and set it as an outlet of the airflow pipeline responsible for generating it in the first place, and an inlet of the model training DAG (see https://airflow.apache.org/docs/apache-airflow/2.10.5/authoring-and-scheduling/datasets.html).

This way, every time the first pipeline runs and updates the CSV file, Airflow knows that it needs to re-run the model training DAG.

WDYT?

If the CSV is expected to change regularly, I would probably define an airflow Dataset representing the file, and set it as an outlet of the airflow pipeline responsible for generating it in the first place, and an inlet of the model training DAG (see https://airflow.apache.org/docs/apache-airflow/2.10.5/authoring-and-scheduling/datasets.html).

This way, every time the first pipeline runs and updates the CSV file, Airflow knows that it needs to re-run the model training DAG.

This is a nice idea. Basically the whole retraining pipeline exists in two steps:

  1. Generating data pipeline
  2. Retraining model

We haven't yet concluded on the frequency of the tone-check retraining.

I guess it depends on whether the csv is expected to change regularly. If not, we could create a one-off pod mounting the PVC and copy the csv file into the pod and from there into the volume.

The one-off pod mounting seems like a nice idea for the current state. We shall do a one-off copy of both the current CSV file and the base model into the PVC.
The base model will never change, so it always needs to exist in the PVC (the one-off logic fits perfectly).
The CSV generation will be part of the pipeline, so I will discuss the Airflow Dataset idea with @AikoChou, but for now it would be perfect to do a one-off copy of the current CSV and the base model in order to test the retraining Docker image along with the PVC mount.

Tell me where I can find that CSV (on which host and which path) and where you would like it to be located in the volume. I'll deal with the rest while documenting the steps.

Model and Dataset:

The PVC structure looks like this:

In order to be mounted like this:

docker run --rm \
  -v $(pwd)/data:/srv/edit_check/training/tone_check/data \
  -v $(pwd)/base_model:/srv/edit_check/training/tone_check/base_model \
  -v $(pwd)/output:/srv/edit_check/training/tone_check/output \
  retrain:slim

More info for docker run in this ticket: https://phabricator.wikimedia.org/T401007#11054192

Both drive links give me an access denied. Do you only need me to upload the CSV, or both the model and the CSV?

This link works better. It would be good to add both the data and the base model into the PVC, if that would be easy for you. Please create an empty folder called output/ for the exported retrained model, following the structure:

.
├── base_model/
│   ├── config.json
│   ├── model.safetensors
│   ├── optimizer.pt
│   ├── rng_state.pth
│   ├── scheduler.pt
│   ├── special_tokens_map.json
│   ├── tokenizer_config.json
│   ├── tokenizer.json
│   ├── trainer_state.json
│   ├── training_args.bin
│   └── vocab.txt
├── data/
│   └── peacock_detection_dataset.csv
└── output/

The retraining docker image reads this config:

"""Tonecheck model configuration script."""
from pathlib import Path

# Automatically resolve the absolute project root
PROJECT_ROOT = Path(__file__).resolve().parents[3]

# Paths
BASE_MODEL_PATH = f"{PROJECT_ROOT}/training/tone_check/base_model"
OUTPUT_MODEL_PATH = f"{PROJECT_ROOT}/training/tone_check/output"
TRAINING_DATA_PATH = (
    f"{PROJECT_ROOT}/training/tone_check/data/peacock_detection_dataset.csv"
)
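As a sanity check on the parents[3] arithmetic in the config above: assuming the config file lives four directory levels below the project root (its exact location inside the image is an assumption here), resolving upward lands on /srv/edit_check:

```python
from pathlib import PurePosixPath

# Hypothetical location of the config file inside the image; the real path may
# differ, but parents[3] always climbs exactly four directory levels.
config_file = PurePosixPath("/srv/edit_check/training/tone_check/retrain/config.py")

# parents[0] = .../retrain, [1] = .../tone_check, [2] = .../training, [3] = /srv/edit_check
project_root = config_file.parents[3]
print(project_root)  # /srv/edit_check
```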

And it was running during local testing like:

docker run --rm \
  -v $(pwd)/data:/srv/edit_check/training/tone_check/data \
  -v $(pwd)/base_model:/srv/edit_check/training/tone_check/base_model \
  -v $(pwd)/output:/srv/edit_check/training/tone_check/output \
  retrain:slim

I first copied the files onto the deployment server:

$ scp Downloads/peacock_detection_dataset.csv deployment.eqiad.wmnet:~/
$ scp -r Downloads/base_model deployment.eqiad.wmnet:~/

I then created a pod with the Ceph volume mounted:

brouberol@deploy1003:~$ cat ml-pod.yaml
apiVersion: v1
kind: Pod
metadata:
  name: ml-copy-data
  namespace: airflow-ml
spec:
  containers:
  - args:
    - infinity
    command:
    - sleep
    image: docker-registry.discovery.wmnet/repos/data-engineering/airflow:2025-06-23-122527-798105179248020e79ffe44a8cb442d8675e9db7
    name: base
    resources:
      limits:
        cpu: "1"
        memory: 2Gi
      requests:
        cpu: "1"
        memory: 2Gi
    securityContext:
      allowPrivilegeEscalation: false
      capabilities:
        drop:
        - ALL
      runAsNonRoot: true
      seccompProfile:
        type: RuntimeDefault
    volumeMounts:
    - mountPath: /mnt/model-training
      name: airflow-ml-model-training
  securityContext:
    fsGroup: 900
    fsGroupChangePolicy: OnRootMismatch
  volumes:
  - name: airflow-ml-model-training
    persistentVolumeClaim:
      claimName: airflow-ml-model-training
brouberol@deploy1003:~$ kubectl create -f ml-pod.yaml
pod/ml-copy-data created

I then exec-ed into the pod to create the file structure:

brouberol@deploy1003:~$ kubectl exec -it ml-copy-data -- bash
airflow@ml-copy-data:/opt/airflow$ cd /mnt/model-training/
airflow@ml-copy-data:/mnt/model-training$ mkdir data output

and finally copied the files into the pod:

brouberol@deploy1003:~/ml-data$ kubectl cp ./base_model/ ml-copy-data:/mnt/model-training/
brouberol@deploy1003:~/ml-data$ kubectl cp peacock_detection_dataset.csv ml-copy-data:/mnt/model-training/data/

We can see that we have the requested file structure:

airflow@ml-copy-data:/opt/airflow$ ls -R /mnt/model-training/
/mnt/model-training/:
base_model  data  lost+found  output

/mnt/model-training/base_model:
config.json   rng_state.pth  special_tokens_map.json  tokenizer_config.json  training_args.bin
optimizer.pt  scheduler.pt   tokenizer.json           trainer_state.json     vocab.txt

/mnt/model-training/data:
peacock_detection_dataset.csv

/mnt/model-training/lost+found:

/mnt/model-training/output:

I then simply deleted the pod.

Oh, should these have been located under /mnt/model-training/tone-check/ instead of /mnt/model-training/ ?

Oh, should these have been located under /mnt/model-training/tone-check/ instead of /mnt/model-training/ ?

The retraining docker-image lives in /srv/edit_check/.
Since it is reading this config:

from pathlib import Path

# Automatically resolve the absolute project root
PROJECT_ROOT = Path(__file__).resolve().parents[3]

BASE_MODEL_PATH = f"{PROJECT_ROOT}/training/tone_check/base_model"
OUTPUT_MODEL_PATH = f"{PROJECT_ROOT}/training/tone_check/output"
TRAINING_DATA_PATH = (
    f"{PROJECT_ROOT}/training/tone_check/data/peacock_detection_dataset.csv"
)

The PROJECT_ROOT in this case is: /srv/edit_check/ and the mount needs a structure like this:

# /srv/edit_check/training/tone_check/base_model/
# /srv/edit_check/training/tone_check/data/
# /srv/edit_check/training/tone_check/output

somebody@e4bea63ad207:/srv/edit_check/training/tone_check$ ls -l
total 4
drwxr-xr-x 13 somebody somebody  416 Jul 31 14:56 base_model
drwxr-xr-x  3 somebody somebody   96 Aug  1 12:11 data
drwxr-xr-x  4 somebody somebody  128 Aug 25 15:39 output
drwxr-xr-x  1 somebody somebody 4096 Aug 25 15:37 retrain

I was mounting local folders during the docker-run like this (if this helps):

docker run --rm \
  -v $(pwd)/data:/srv/edit_check/training/tone_check/data \
  -v $(pwd)/base_model:/srv/edit_check/training/tone_check/base_model \
  -v $(pwd)/output:/srv/edit_check/training/tone_check/output \
  retrain:slim

Sure, I mounted the volume under /mnt/model-training, and you seem to want to mount it under /srv/edit_check, which you will be free to do in your DAG, no problem. What I need to know is how you need the file tree to be organized in the volume itself. Do you need the training/tone_check/ directories to exist in the Ceph data itself, to accommodate multiple models' training data stored in the same volume?

Aka, are you happy with

. # Ceph volume mountpoint
├── base_model
├── data
└── output

or do you need

. # Ceph volume mountpoint
└── training
    └── tone_check
        ├── base_model
        ├── data
        └── output

Let's keep the second option:

. # Ceph volume mountpoint
└── training
    └── tone_check
        ├── base_model
        ├── data
        └── output

So when we execute the Docker image with the mounted volume, we would see:

somebody@e4bea63ad207:/srv/edit_check/training/tone_check$ ls -l
total 4
drwxr-xr-x 13 somebody somebody  416 Jul 31 14:56 base_model
drwxr-xr-x  3 somebody somebody   96 Aug  1 12:11 data
drwxr-xr-x  4 somebody somebody  128 Aug 25 15:39 output

airflow@ml-copy-data:/mnt/model-training$ find .
.
./training
./training/tone_check
./training/tone_check/data
./training/tone_check/data/peacock_detection_dataset.csv
./training/tone_check/base_model
./training/tone_check/base_model/training_args.bin
./training/tone_check/base_model/rng_state.pth
./training/tone_check/base_model/special_tokens_map.json
./training/tone_check/base_model/scheduler.pt
./training/tone_check/base_model/vocab.txt
./training/tone_check/base_model/trainer_state.json
./training/tone_check/base_model/config.json
./training/tone_check/base_model/optimizer.pt
./training/tone_check/base_model/tokenizer_config.json
./training/tone_check/base_model/tokenizer.json
./training/tone_check/output
./lost+found

All done!

Thank you for building the PVC and copying the files onto it so that we can start testing. Much appreciated @brouberol.

I am running the DAG on airflow and I am having this error:

ml-training-pipeline-tonecheck-retrain-bnd693q1
[2025-08-26, 13:25:35 UTC] {local_task_job_runner.py:123} ▶ Pre task execution logs
[2025-08-26, 13:25:35 UTC] {pod.py:1228} INFO - Building pod tonecheck-retrain-tj9hob6 with labels: {'dag_id': 'ml_training_pipeline', 'task_id': 'tonecheck_retrain', 'run_id': 'manual__2025-08-26T130225.6190750000-debf98cff', 'kubernetes_pod_operator': 'True', 'try_number': '2'}
[2025-08-26, 13:25:35 UTC] {pod.py:567} INFO - Found matching pod tonecheck-retrain-tj9hob6 with labels {'airflow_kpo_in_cluster': 'True', 'airflow_version': '2.10.5', 'app': 'airflow', 'component': 'task-pod', 'dag_id': 'ml_training_pipeline', 'kubernetes_pod_operator': 'True', 'release': 'dev-gkyziridis', 'routed_via': 'dev-gkyziridis', 'run_id': 'manual__2025-08-26T130225.6190750000-debf98cff', 'task_id': 'tonecheck_retrain', 'try_number': '2'}
[2025-08-26, 13:25:35 UTC] {pod.py:568} INFO - `try_number` of task_instance: 2
[2025-08-26, 13:25:35 UTC] {pod.py:569} INFO - `try_number` of pod: 2
[2025-08-26, 13:25:35 UTC] {pod_manager.py:394} ▶ Waiting until 1300s to get the POD scheduled...
[2025-08-26, 13:47:18 UTC] {pod.py:1076} INFO - Deleting pod: tonecheck-retrain-tj9hob6
[2025-08-26, 13:47:18 UTC] {taskinstance.py:3313} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/tmp/pyenv/versions/3.10.15/lib/python3.10/site-packages/airflow/providers/cncf/kubernetes/operators/pod.py", line 647, in execute_sync
    self.await_pod_start(pod=self.pod)
  File "/tmp/pyenv/versions/3.10.15/lib/python3.10/site-packages/airflow/providers/cncf/kubernetes/operators/pod.py", line 582, in await_pod_start
    self.pod_manager.await_pod_start(
  File "/tmp/pyenv/versions/3.10.15/lib/python3.10/site-packages/airflow/providers/cncf/kubernetes/utils/pod_manager.py", line 425, in await_pod_start
    raise PodLaunchFailedException(
airflow.providers.cncf.kubernetes.utils.pod_manager.PodLaunchFailedException: Pod took too long to be scheduled on the cluster, giving up. More than 1300s. Check the pod events in kubernetes.

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/tmp/pyenv/versions/3.10.15/lib/python3.10/site-packages/airflow/models/taskinstance.py", line 768, in _execute_task
    result = _execute_callable(context=context, **execute_callable_kwargs)
  File "/tmp/pyenv/versions/3.10.15/lib/python3.10/site-packages/airflow/models/taskinstance.py", line 734, in _execute_callable
    return ExecutionCallableRunner(
  File "/tmp/pyenv/versions/3.10.15/lib/python3.10/site-packages/airflow/utils/operator_helpers.py", line 252, in run
    return self.func(*args, **kwargs)
  File "/tmp/pyenv/versions/3.10.15/lib/python3.10/site-packages/airflow/models/baseoperator.py", line 424, in wrapper
    return func(self, *args, **kwargs)
  File "/tmp/pyenv/versions/3.10.15/lib/python3.10/site-packages/airflow/providers/cncf/kubernetes/operators/pod.py", line 607, in execute
    return self.execute_sync(context)
  File "/tmp/pyenv/versions/3.10.15/lib/python3.10/site-packages/airflow/providers/cncf/kubernetes/operators/pod.py", line 688, in execute_sync
    self.cleanup(
  File "/tmp/pyenv/versions/3.10.15/lib/python3.10/site-packages/airflow/providers/cncf/kubernetes/operators/pod.py", line 1007, in cleanup
    raise AirflowException(
airflow.exceptions.AirflowException: Pod tonecheck-retrain-tj9hob6 returned a failure.
remote_pod: {'api_version': None,
 'kind': None,
 'metadata': {'annotations': {'container.seccomp.security.alpha.kubernetes.io/base': 'runtime/default'},
 'creation_timestamp': datetime.datetime(2025, 8, 26, 13, 25, 35, tzinfo=tzlocal()),
 'deletion_grace_period_seconds': None,
 'deletion_timestamp': None,
 'finalizers': None,
 'generate_name': None,
 'generation': None,
 'labels': {'airflow_kpo_in_cluster': 'True',
 'airflow_version': '2.10.5',
 'app': 'airflow',
 'component': 'task-pod',
 'dag_id': 'ml_training_pipeline',
 'kubernetes_pod_operator': 'True',
 'release': 'dev-gkyziridis',
 'routed_via': 'dev-gkyziridis',
 'run_id': 'manual__2025-08-26T130225.6190750000-debf98cff',
 'task_id': 'tonecheck_retrain',
 'try_number': '2'},
 'managed_fields': [{'api_version': 'v1',
 'fields_type': 'FieldsV1',
 'fields_v1': {'f:metadata': {'f:labels': {'.': {},
 'f:airflow_kpo_in_cluster': {},
 'f:airflow_version': {},
 'f:app': {},
 'f:component': {},
 'f:dag_id': {},
 'f:kubernetes_pod_operator': {},
 'f:release': {},
 'f:routed_via': {},
 'f:run_id': {},
 'f:task_id': {},
 'f:try_number': {}}},
 'f:spec': {'f:affinity': {},
 'f:containers': {'k:{"name":"base"}': {'.': {},
 'f:env': {'.': {},
 'k:{"name":"AWS_REQUEST_CHECKSUM_CALCULATION"}': {'.': {},
 'f:name': {},
 'f:value': {}},
 'k:{"name":"AWS_RESPONSE_CHECKSUM_VALIDATION"}': {'.': {},
 'f:name': {},
 'f:value': {}},
 'k:{"name":"REQUESTS_CA_BUNDLE"}': {'.': {},
 'f:name': {},
 'f:value': {}}},
 'f:image': {},
 'f:imagePullPolicy': {},
 'f:name': {},
 'f:resources': {'.': {},
 'f:limits': {'.': {},
 'f:cpu': {},
 'f:memory': {}},
 'f:requests': {'.': {},
90 'f:cpu': {},
91 'f:memory': {}}},
92 'f:securityContext': {'.': {},
93 'f:allowPrivilegeEscalation': {},
94 'f:capabilities': {'.': {},
95 'f:drop': {}},
96 'f:runAsNonRoot': {},
97 'f:seccompProfile': {'.': {},
98 'f:type': {}}},
99 'f:terminationMessagePath': {},
100 'f:terminationMessagePolicy': {},
101 'f:volumeMounts': {'.': {},
102 'k:{"mountPath":"/srv/edit_check"}': {'.': {},
103 'f:mountPath': {},
104 'f:name': {},
105 'f:readOnly': {}}}}},
106 'f:dnsPolicy': {},
107 'f:enableServiceLinks': {},
108 'f:priorityClassName': {},
109 'f:restartPolicy': {},
110 'f:schedulerName': {},
111 'f:securityContext': {},
112 'f:terminationGracePeriodSeconds': {},
113 'f:volumes': {'.': {},
114 'k:{"name":"airflow-ml-model-training-volume"}': {'.': {},
115 'f:name': {},
116 'f:persistentVolumeClaim': {'.': {},
117 'f:claimName': {}}}}}},
118 'manager': 'OpenAPI-Generator',
119 'operation': 'Update',
120 'subresource': None,
121 'time': datetime.datetime(2025, 8, 26, 13, 25, 35, tzinfo=tzlocal())},
122 {'api_version': 'v1',
123 'fields_type': 'FieldsV1',
124 'fields_v1': {'f:status': {'f:conditions': {'.': {},
125 'k:{"type":"PodScheduled"}': {'.': {},
126 'f:lastProbeTime': {},
127 'f:lastTransitionTime': {},
128 'f:message': {},
129 'f:reason': {},
130 'f:status': {},
131 'f:type': {}}}}},
132 'manager': 'kube-scheduler',
133 'operation': 'Update',
134 'subresource': 'status',
135 'time': datetime.datetime(2025, 8, 26, 13, 25, 35, tzinfo=tzlocal())}],
136 'name': 'tonecheck-retrain-tj9hob6',
137 'namespace': 'airflow-dev',
138 'owner_references': None,
139 'resource_version': '728355394',
140 'self_link': None,
141 'uid': 'ae55396c-9473-4fee-8c7b-1ebbe228485b'},
142 'spec': {'active_deadline_seconds': None,
143 'affinity': {'node_affinity': None,
144 'pod_affinity': None,
145 'pod_anti_affinity': None},
146 'automount_service_account_token': None,
147 'containers': [{'args': None,
148 'command': None,
149 'env': [{'name': 'REQUESTS_CA_BUNDLE',
150 'value': '/etc/ssl/certs/ca-certificates.crt',
151 'value_from': None},
152 {'name': 'AWS_REQUEST_CHECKSUM_CALCULATION',
153 'value': 'WHEN_REQUIRED',
154 'value_from': None},
155 {'name': 'AWS_RESPONSE_CHECKSUM_VALIDATION',
156 'value': 'WHEN_REQUIRED',
157 'value_from': None}],
158 'env_from': None,
159 'image': 'docker-registry.wikimedia.org/repos/machine-learning/ml-pipelines:job-591470',
160 'image_pull_policy': 'IfNotPresent',
161 'lifecycle': None,
162 'liveness_probe': None,
163 'name': 'base',
164 'ports': None,
165 'readiness_probe': None,
166 'resize_policy': None,
167 'resources': {'claims': None,
168 'limits': {'cpu': '2', 'memory': '3Gi'},
169 'requests': {'cpu': '1',
170 'memory': '1500Mi'}},
171 'restart_policy': None,
172 'security_context': {'allow_privilege_escalation': False,
173 'app_armor_profile': None,
174 'capabilities': {'add': None,
175 'drop': ['ALL']},
176 'privileged': None,
177 'proc_mount': None,
178 'read_only_root_filesystem': None,
179 'run_as_group': None,
180 'run_as_non_root': True,
181 'run_as_user': None,
182 'se_linux_options': None,
183 'seccomp_profile': {'localhost_profile': None,
184 'type': 'RuntimeDefault'},
185 'windows_options': None},
186 'startup_probe': None,
187 'stdin': None,
188 'stdin_once': None,
189 'termination_message_path': '/dev/termination-log',
190 'termination_message_policy': 'File',
191 'tty': None,
192 'volume_devices': None,
193 'volume_mounts': [{'mount_path': '/srv/edit_check',
194 'mount_propagation': None,
195 'name': 'airflow-ml-model-training-volume',
196 'read_only': True,
197 'recursive_read_only': None,
198 'sub_path': None,
199 'sub_path_expr': None},
200 {'mount_path': '/var/run/secrets/kubernetes.io/serviceaccount',
201 'mount_propagation': None,
202 'name': 'kube-api-access-l5kt5',
203 'read_only': True,
204 'recursive_read_only': None,
205 'sub_path': None,
206 'sub_path_expr': None}],
207 'working_dir': None}],
208 'dns_config': None,
209 'dns_policy': 'ClusterFirst',
210 'enable_service_links': True,
211 'ephemeral_containers': None,
212 'host_aliases': None,
213 'host_ipc': None,
214 'host_network': None,
215 'host_pid': None,
216 'host_users': None,
217 'hostname': None,
218 'image_pull_secrets': None,
219 'init_containers': None,
220 'node_name': None,
221 'node_selector': None,
222 'os': None,
223 'overhead': None,
224 'preemption_policy': 'PreemptLowerPriority',
225 'priority': -100,
226 'priority_class_name': 'low-priority-pod',
227 'readiness_gates': None,
228 'resource_claims': None,
229 'restart_policy': 'Never',
230 'runtime_class_name': None,
231 'scheduler_name': 'default-scheduler',
232 'scheduling_gates': None,
233 'security_context': {'app_armor_profile': None,
234 'fs_group': None,
235 'fs_group_change_policy': None,
236 'run_as_group': None,
237 'run_as_non_root': None,
238 'run_as_user': None,
239 'se_linux_options': None,
240 'seccomp_profile': None,
241 'supplemental_groups': None,
242 'supplemental_groups_policy': None,
243 'sysctls': None,
244 'windows_options': None},
245 'service_account': 'default',
246 'service_account_name': 'default',
247 'set_hostname_as_fqdn': None,
248 'share_process_namespace': None,
249 'subdomain': None,
250 'termination_grace_period_seconds': 30,
251 'tolerations': [{'effect': 'NoExecute',
252 'key': 'node.kubernetes.io/not-ready',
253 'operator': 'Exists',
254 'toleration_seconds': 300,
255 'value': None},
256 {'effect': 'NoExecute',
257 'key': 'node.kubernetes.io/unreachable',
258 'operator': 'Exists',
259 'toleration_seconds': 300,
260 'value': None}],
261 'topology_spread_constraints': None,
262 'volumes': [{'aws_elastic_block_store': None,
263 'azure_disk': None,
264 'azure_file': None,
265 'cephfs': None,
266 'cinder': None,
267 'config_map': None,
268 'csi': None,
269 'downward_api': None,
270 'empty_dir': None,
271 'ephemeral': None,
272 'fc': None,
273 'flex_volume': None,
274 'flocker': None,
275 'gce_persistent_disk': None,
276 'git_repo': None,
277 'glusterfs': None,
278 'host_path': None,
279 'image': None,
280 'iscsi': None,
281 'name': 'airflow-ml-model-training-volume',
282 'nfs': None,
283 'persistent_volume_claim': {'claim_name': 'airflow-ml-model-training',
284 'read_only': None},
285 'photon_persistent_disk': None,
286 'portworx_volume': None,
287 'projected': None,
288 'quobyte': None,
289 'rbd': None,
290 'scale_io': None,
291 'secret': None,
292 'storageos': None,
293 'vsphere_volume': None},
294 {'aws_elastic_block_store': None,
295 'azure_disk': None,
296 'azure_file': None,
297 'cephfs': None,
298 'cinder': None,
299 'config_map': None,
300 'csi': None,
301 'downward_api': None,
302 'empty_dir': None,
303 'ephemeral': None,
304 'fc': None,
305 'flex_volume': None,
306 'flocker': None,
307 'gce_persistent_disk': None,
308 'git_repo': None,
309 'glusterfs': None,
310 'host_path': None,
311 'image': None,
312 'iscsi': None,
313 'name': 'kube-api-access-l5kt5',
314 'nfs': None,
315 'persistent_volume_claim': None,
316 'photon_persistent_disk': None,
317 'portworx_volume': None,
318 'projected': {'default_mode': 420,
319 'sources': [{'cluster_trust_bundle': None,
320 'config_map': None,
321 'downward_api': None,
322 'secret': None,
323 'service_account_token': {'audience': None,
324 'expiration_seconds': 3607,
325 'path': 'token'}},
326 {'cluster_trust_bundle': None,
327 'config_map': {'items': [{'key': 'ca.crt',
328 'mode': None,
329 'path': 'ca.crt'}],
330 'name': 'kube-root-ca.crt',
331 'optional': None},
332 'downward_api': None,
333 'secret': None,
334 'service_account_token': None},
335 {'cluster_trust_bundle': None,
336 'config_map': None,
337 'downward_api': {'items': [{'field_ref': {'api_version': 'v1',
338 'field_path': 'metadata.namespace'},
339 'mode': None,
340 'path': 'namespace',
341 'resource_field_ref': None}]},
342 'secret': None,
343 'service_account_token': None}]},
344 'quobyte': None,
345 'rbd': None,
346 'scale_io': None,
347 'secret': None,
348 'storageos': None,
349 'vsphere_volume': None}]},
350 'status': {'conditions': [{'last_probe_time': None,
351 'last_transition_time': datetime.datetime(2025, 8, 26, 13, 25, 35, tzinfo=tzlocal()),
352 'message': '0/20 nodes are available: 20 '
353 'persistentvolumeclaim '
354 '"airflow-ml-model-training" not found.',
355 'reason': 'Unschedulable',
356 'status': 'False',
357 'type': 'PodScheduled'}],
358 'container_statuses': None,
359 'ephemeral_container_statuses': None,
360 'host_i_ps': None,
361 'host_ip': None,
362 'init_container_statuses': None,
363 'message': None,
364 'nominated_node_name': None,
365 'phase': 'Pending',
366 'pod_i_ps': None,
367 'pod_ip': None,
368 'qos_class': 'Burstable',
369 'reason': None,
370 'resize': None,
371 'resource_claim_statuses': None,
372 'start_time': None}}
373[2025-08-26, 13:47:18 UTC] {taskinstance.py:1226} INFO - Marking task as FAILED. dag_id=ml_training_pipeline, task_id=tonecheck_retrain, run_id=manual__2025-08-26T13:02:25.619075+00:00, execution_date=20250826T130225, start_date=20250826T132535, end_date=20250826T134718
374[2025-08-26, 13:47:18 UTC] {taskinstance.py:341} ▶ Post task execution logs

What seems strange to me:

  • Line 350 in logs:
'status': {'conditions': [{'last_probe_time': None,
                            'last_transition_time': datetime.datetime(2025, 8, 26, 13, 25, 35, tzinfo=tzlocal()),
                            'message': '0/20 nodes are available: 20 '
                                       'persistentvolumeclaim '
                                       '"airflow-ml-model-training" not found.',
                            'reason': 'Unschedulable',
                            'status': 'False',
                            'type': 'PodScheduled'}],
            'container_statuses': None,
            'ephemeral_container_statuses': None,
            'host_i_ps': None,
            'host_ip': None,
            'init_container_statuses': None,
            'message': None,
            'nominated_node_name': None,
            'phase': 'Pending',
            'pod_i_ps': None,
            'pod_ip': None,
            'qos_class': 'Burstable',
            'reason': None,
            'resize': None,
            'resource_claim_statuses': None,
            'start_time': None}}

I am not sure if this could be an issue. What do you think about it?

The PVC does not exist in the airflow-dev namespace. It exists in the airflow-ml namespace (see the pod spec in https://phabricator.wikimedia.org/T396495#11115415).

root@deploy1003:~# kubectl get pvc -n airflow-ml airflow-ml-model-training
NAME                        STATUS   VOLUME                                     CAPACITY   ACCESS MODES   STORAGECLASS   AGE
airflow-ml-model-training   Bound    pvc-d324c87c-e0ad-41de-8ed9-ad7095b4c4e7   20Gi       RWO            ceph-rbd-ssd   2d22h
root@deploy1003:~# kubectl get pvc -n airflow-dev airflow-ml-model-training
Error from server (NotFound): persistentvolumeclaims "airflow-ml-model-training" not found
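For context, the relevant volume wiring from the remote_pod dump above boils down to the fragment below. A `claimName` lookup is namespace-local, so a pod scheduled in airflow-dev can only bind a PVC of that name that exists in airflow-dev (this is a partial spec reconstructed from the log, not the full pod manifest):

```yaml
# Reconstructed from the remote_pod dump above (partial spec).
# claimName is resolved in the pod's own namespace: a pod in airflow-dev
# cannot bind the airflow-ml-model-training PVC that lives in airflow-ml.
spec:
  containers:
    - name: base
      volumeMounts:
        - name: airflow-ml-model-training-volume
          mountPath: /srv/edit_check
          readOnly: true
  volumes:
    - name: airflow-ml-model-training-volume
      persistentVolumeClaim:
        claimName: airflow-ml-model-training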

If you wanted to replicate the same setup in airflow-dev, I'd need to create another PVC in that namespace, and copy the same files around.

While I can do that, I also want to stress that copying these base files to the PVC should be automated at some point, via some DAG.

So, a couple of things are going to happen in parallel.
Basically, we first need to test that the retraining Docker image works correctly together with the PVC.
Then we will add an extra step that copies the CSV file into this PVC as the last part of the data_generation_pipeline.
The base model will stay in this PVC and will not be moved.

Ok, so if I create an equivalent PVC in the airflow-dev namespace with the same files in it, you should be gtg?

I think yes! Otherwise I would need to merge my branch into airflow-dags main and then test it, which I don't think is a good way to go, right?
I need a way to test the current setup first; then I will open an MR to merge the retraining DAG into main.
Question: if you create an equivalent PVC in the airflow-dev namespace, I can test the current setup. If we afterwards merge it into main (airflow-dags), will it work as well? Or should we create another PVC?

Question: if you create an equivalent PVC in the airflow-dev namespace, I can test the current setup. If we afterwards merge it into main (airflow-dags), will it work as well? Or should we create another PVC?

I think so yes, it's just that the DAG in main / airflow-ml will be using the PVC in the airflow-ml namespace.

Alright, perfect. So, whenever you have time, please create a PVC with the same files on it under airflow-dev so I can test the logic. Then, once we merge the DAG into main, it will use the PVC under the airflow-ml namespace.

brouberol@deploy1003:~$ cat pvc.yaml
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: airflow-ml-model-training
spec:
  accessModes:
    - ReadWriteOnce
  volumeMode: Filesystem
  resources:
    requests:
      storage: 20Gi
  storageClassName: ceph-rbd-ssd
brouberol@deploy1003:~$ kube_env airflow-dev
brouberol@deploy1003:~$ kubectl create -n airflow-dev -f pvc.yaml
persistentvolumeclaim/airflow-ml-model-training created
brouberol@deploy1003:~$ kubectl get -n airflow-dev pvc airflow-ml-model-training
NAME                        STATUS   VOLUME                                     CAPACITY   ACCESS MODES   STORAGECLASS   AGE
airflow-ml-model-training   Bound    pvc-8a6a2920-8d7e-4616-8ab6-a6a70b26d116   20Gi       RWO            ceph-rbd-ssd   15s

We don't have a helmfile.yaml deployment for airflow-dev, so I had to create this PVC manifest in an ad-hoc fashion.

I've copied all data and base_model files into that airflow-dev PVC:

airflow@ml-copy-data:/mnt/model-training$ find .
.
./lost+found
./training
./training/tone_check
./training/tone_check/base_model
./training/tone_check/base_model/config.json
./training/tone_check/base_model/model.safetensors
./training/tone_check/base_model/special_tokens_map.json
./training/tone_check/base_model/rng_state.pth
./training/tone_check/base_model/tokenizer_config.json
./training/tone_check/base_model/vocab.txt
./training/tone_check/base_model/tokenizer.json
./training/tone_check/base_model/training_args.bin
./training/tone_check/base_model/scheduler.pt
./training/tone_check/base_model/trainer_state.json
./training/tone_check/base_model/optimizer.pt
./training/tone_check/output
./training/tone_check/data
./training/tone_check/data/peacock_detection_dataset.csv

Following T396495#10941826 and the improvements @brouberol has helped add to ml-airflow, I have worked on a third iteration of the example ml training pipeline (job logic, scheduling logic) that uses the flow below:

etl_data_generation_dag -> HDFS -> copy_hdfs_to_pvc_dag -> PVC -> ml_training_dag

1. ETL Data Generation: Uses the SparkSubmitOperator to run a Spark job that accesses a table from the data lake, generates training data, and writes the output as a parquet file to HDFS.

ETL Data Generation Screenshot from 2025-09-05 10-43-48.png (1×1 px, 367 KB)

2. Copy HDFS to PVC: Uses the WMFKubernetesPodOperator to run a pod that copies the parquet file from HDFS into a shared PVC accessible to downstream tasks. The pod is configured with the correct security context and permissions to mount and write to the PVC. (thanks to Balthazar's bug fix in T403687#11146727)

Copy HDFS to PVC Screenshot from 2025-09-05 10-48-32.png (1×1 px, 493 KB)

3. ML Training: Uses the WMFKubernetesPodOperator to run a pod that mounts the same PVC and runs a training script. The script reads the parquet data from the PVC, confirms GPU access, and writes the trained model artifact to the PVC. The pod requests GPU resources and is configured with appropriate CPU and memory limits. (thanks to Balthazar, Tobias, and Luca in P82559)

ML Training Screenshot from 2025-09-05 10-57-13.png (1×1 px, 513 KB)
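As a sketch, the training pod's resource stanza might look like the fragment below. The cpu/memory values match the pod dump earlier in this task; the GPU resource key is an assumption (it depends on the cluster's device plugin, e.g. amd.com/gpu on ROCm nodes) and may differ from what the production DAG uses:

```yaml
# Illustrative only: cpu/memory match the pod dump earlier in this task;
# the GPU resource key (amd.com/gpu) is an assumption based on the
# cluster's device plugin and may differ in practice.
resources:
  requests:
    cpu: "1"
    memory: 1500Mi
  limits:
    cpu: "2"
    memory: 3Gi
    amd.com/gpu: "1"
```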

Now that this example ml pipeline runs end-to-end using airflow-devenv, we are going to apply a similar pattern to the tone-check data generation and model training pipelines.

kevinbazira opened https://gitlab.wikimedia.org/repos/data-engineering/airflow-dags/-/merge_requests/1660

common/kubernetes: add node_anti_affinity_for_hostnames utility to kubernetes helpers

kevinbazira merged https://gitlab.wikimedia.org/repos/data-engineering/airflow-dags/-/merge_requests/1660

common/kubernetes: add node_anti_affinity_for_hostnames utility to kubernetes helpers
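The merged helper's actual implementation lives in airflow-dags; as a hedged sketch (the function body below is an assumption, not the merged code), a utility like this typically builds a standard Kubernetes anti-affinity stanza keyed on the `kubernetes.io/hostname` label:

```python
def node_anti_affinity_for_hostnames(hostnames):
    """Sketch (assumed behavior, not the merged implementation): build an
    affinity stanza that keeps a pod off the given nodes by requiring the
    kubernetes.io/hostname label to be NotIn the supplied hostnames."""
    return {
        "nodeAffinity": {
            "requiredDuringSchedulingIgnoredDuringExecution": {
                "nodeSelectorTerms": [
                    {
                        "matchExpressions": [
                            {
                                "key": "kubernetes.io/hostname",
                                "operator": "NotIn",
                                "values": list(hostnames),
                            }
                        ]
                    }
                ]
            }
        }
    }
```

The resulting dict can then be passed to a KubernetesPodOperator-style task via its affinity setting.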

In T407212#11288359, we finally have a tone-check model training pipeline that runs end-to-end in the airflow-ml production instance.

Huge thanks to everyone who contributed to this effort, and special thanks to DPE SRE (@brouberol, @BTullis, @Gehel), who are always so kind and helpful whenever we ask them to enable features in Airflow or grant permissions to access the data infrastructure.

With this, we have achieved the main goals of this task: a reproducible, maintainable ML pipeline leveraging WMF's Airflow platform. We have also documented the process for future reference: https://wikitech.wikimedia.org/wiki/Machine_Learning/Airflow_ML_Pipelines

Thanks again to all involved!