Our automated test fixture system for Airflow (tests/test_on_disk_fixtures.py) generates and validates fixtures for SparkSubmitOperator, SkeinOperator/SimpleSkeinOperator, DatahubEmitterOperator, and BashOperator tasks. As KubernetesPodOperator usage grows (24+ DAG files across the test_k8s/ and ml/ instances), we need fixture coverage for these tasks too.
Description
Details
| Title | Reference | Author | Source Branch | Dest Branch |
|---|---|---|---|---|
| global: Add fixture generation for KubernetesPodOperator tasks | repos/data-engineering/airflow-dags!2086 | xcollazo | add-fixtures-for-k8s-tasks | main |
Event Timeline
xcollazo opened https://gitlab.wikimedia.org/repos/data-engineering/airflow-dags/-/merge_requests/2086
global: Add fixture generation for KubernetesPodOperator tasks
KubernetesPodOperator Fixture Coverage — Decision Summary
- Deterministic pod names
  WMFKubernetesPodOperator generates a random 7-character suffix for pod names. We mock random.choice to always return "a" (session-wide, in setup_airflow()), making pod names deterministic across runs.
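A minimal sketch of the effect. random_pod_suffix below is a hypothetical stand-in for the operator's internal suffix helper; the only assumption is that it draws each character with random.choice:

```python
import random
import string
from unittest import mock

ALNUM = string.ascii_lowercase + string.digits

def random_pod_suffix(length: int = 7) -> str:
    # Hypothetical stand-in for WMFKubernetesPodOperator's suffix helper:
    # draws each character with random.choice.
    return "".join(random.choice(ALNUM) for _ in range(length))

# Session-wide patch, as done once in setup_airflow(): every choice is "a".
patcher = mock.patch.object(random, "choice", return_value="a")
patcher.start()
try:
    assert random_pod_suffix() == "aaaaaaa"  # deterministic across runs
finally:
    patcher.stop()
```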
- Dataset-triggered DAGs handled in render_task
  DAGs with schedule=[Dataset(...)] use DatasetTriggeredTimetable, whose iter_dagrun_infos_between() returns an empty iterator. Rather than skipping these tasks, we fixed render_task to catch the resulting StopIteration and fall back to DataInterval.exact(task.start_date).
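The fallback pattern, sketched with plain stand-ins rather than the real Airflow types (DataInterval.exact is represented here by a (start, end) tuple; first_interval_or_exact is a hypothetical name):

```python
from datetime import datetime
from typing import Any, Iterable

def first_interval_or_exact(infos: Iterable[Any], start_date: datetime) -> Any:
    """Mimic the render_task fix: take the first DagRunInfo from the
    timetable iterator, but when the iterator is empty (as it is for
    DatasetTriggeredTimetable.iter_dagrun_infos_between), fall back to
    an exact interval at the DAG's start_date."""
    try:
        return next(iter(infos))
    except StopIteration:
        # Stand-in for DataInterval.exact(task.start_date)
        return (start_date, start_date)

start = datetime(2024, 1, 1)
assert first_interval_or_exact([], start) == (start, start)  # dataset-triggered: empty iterator
assert first_interval_or_exact(["info0"], start) == "info0"  # scheduled DAG: first run info
```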
- XCom-dependent pod_template_dict mocked with a realistic pod spec
  Most dump KubernetesPodOperator tasks get their pod_template_dict from cronjob_pod_spec_fetcher() via XCom, which can't be resolved at test time. Instead of skipping these tasks, we substitute a realistic mock pod spec (tests/test_k8s/fixtures/mock_cronjob_pod_spec.json, based on production data). This lets fixtures capture the rest of the operator configuration: cmds, arguments, resources, volumes, namespace, etc.
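Roughly, the substitution amounts to swapping an unrenderable XCom template for the committed mock spec before the operator is serialized. Everything here is illustrative: resolve_pod_template_dict is a hypothetical helper, and the mock spec is inlined instead of being read from tests/test_k8s/fixtures/mock_cronjob_pod_spec.json:

```python
def resolve_pod_template_dict(pod_template_dict, load_mock):
    """Hypothetical helper: if the value is an unrendered XCom template
    string, substitute the mock pod spec; otherwise keep the real value."""
    if isinstance(pod_template_dict, str) and "xcom_pull" in pod_template_dict:
        # Can't resolve XCom at test time: use the realistic mock instead.
        return load_mock()
    return pod_template_dict

# Inlined stand-in for the committed mock_cronjob_pod_spec.json contents.
mock_spec = {"metadata": {"namespace": "dumps"}, "spec": {"containers": []}}

resolved = resolve_pod_template_dict(
    "{{ ti.xcom_pull(task_ids='fetch_pod_spec') }}", lambda: mock_spec
)
assert resolved["metadata"]["namespace"] == "dumps"
```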
- Serialization approach
  KubernetesPodOperator fixtures serialize: image, name, cmds, arguments, namespace, pod_template_file, env_vars, volumes, volume_mounts, container_resources, and optionally full_pod_spec. Kubernetes model objects are converted to JSON-serializable dicts via ApiClient.sanitize_for_serialization().
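The real code calls kubernetes.client.ApiClient().sanitize_for_serialization(); below is a simplified, stdlib-only stand-in for what that conversion does (FakeVolume is a hypothetical model class standing in for kubernetes.client.V1Volume):

```python
import json

def sanitize(obj):
    """Simplified stand-in for ApiClient.sanitize_for_serialization():
    recursively turn Kubernetes model objects (anything exposing to_dict())
    into JSON-serializable primitives, dropping None-valued fields."""
    if hasattr(obj, "to_dict"):
        return sanitize(obj.to_dict())
    if isinstance(obj, dict):
        return {k: sanitize(v) for k, v in obj.items() if v is not None}
    if isinstance(obj, (list, tuple)):
        return [sanitize(v) for v in obj]
    return obj

class FakeVolume:  # hypothetical stand-in for kubernetes.client.V1Volume
    def to_dict(self):
        return {"name": "dumps-data", "empty_dir": {}, "host_path": None}

# Unset (None) fields are dropped, leaving a clean JSON-serializable dict.
assert json.dumps(sanitize([FakeVolume()])) == '[{"name": "dumps-data", "empty_dir": {}}]'
```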
- sql_xml dump DAGs: open question
  The auto-generated sql_xml dump DAGs (scripts/generate_test_k8s_sql_xml_dump_dags.py) are currently excluded at collection time via exclude_path_segment="sql_xml". All tasks within these DAGs are stamped from a single template, so the coverage value per fixture is low. The numbers:
  | Option | Fixture files added |
  |---|---|
  | Current state (no sql_xml) | 71 |
  | Add one representative DAG (e.g. large_a_to_z_full: 44 wikis × 18 KubernetesPodOperator tasks) | +792 (863 total) |
  | Add all 10 sql_xml DAGs (~1000 wikis × 12–18 KubernetesPodOperator tasks each) | ~+14,000 (14,071 total) |
Do we want to cover sql_xml at all, and if so, at what level?
- Coverage result (current)
  71 KubernetesPodOperator fixture files generated across the test_k8s/, ml/, and search/ instances, covering all non-sql_xml KubernetesPodOperator tasks, including the XCom-dependent dump tasks.
sql_xml Dump DAG KubernetesPodOperator Coverage — Decision Summary
=== Problem ===
The sql_xml dump DAGs are auto-generated from a template (scripts/test_k8s_sql_xml_dump_dag.tpl) and contain ~100 wikis each, producing ~14,000 KubernetesPodOperator tasks in total. Generating fixtures for all of them was impractical and provided little value, since all tasks are stamped from a single template.
=== Solution ===
Added a wiki-limit mechanism to the DAG template. When the env var AIRFLOW_TEST_SQL_XML_WIKI_LIMIT is set, each DAG slices its wiki_list to that many wikis:

    if _wiki_limit := int(os.getenv("AIRFLOW_TEST_SQL_XML_WIKI_LIMIT", "0")):
        wiki_list = wiki_list[:_wiki_limit]
The test suite sets AIRFLOW_TEST_SQL_XML_WIKI_LIMIT=1 in conftest.py, so each of the 10 sql_xml DAGs runs with one wiki during testing. The generated DAG files (under test_k8s/dags/dumps/sql_xml/) are committed to the repo; the template is only re-run when DAGs need to be regenerated, so there is no runtime cost.
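The mechanism end to end, with a hypothetical three-wiki list (the real lists live in the generated DAG files):

```python
import os

wiki_list = ["simplewiki", "enwiki", "dewiki"]  # hypothetical sample

# conftest.py sets this for the whole suite; each generated DAG then
# slices its wiki_list at import time.
os.environ["AIRFLOW_TEST_SQL_XML_WIKI_LIMIT"] = "1"

if _wiki_limit := int(os.getenv("AIRFLOW_TEST_SQL_XML_WIKI_LIMIT", "0")):
    wiki_list = wiki_list[:_wiki_limit]

assert wiki_list == ["simplewiki"]

# When the variable is unset (production), int("0") is falsy and the
# list is left untouched.
```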
=== Additional fix ===
The sql_xml DAGs generate pod names using a Jinja macro random_pod_suffix(), which internally calls random.sample. The existing random.choice mock (used by WMFKubernetesPodOperator) did not cover this, leaving pod names non-deterministic across runs. Added a second mock:

    mock.patch.object(random, "sample", side_effect=lambda population, k: ["a"] * k).start()
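With that patch in place, every sampled suffix collapses to repeated "a"s while preserving the requested length, since the side_effect keeps the k argument:

```python
import random
from unittest import mock

patcher = mock.patch.object(
    random, "sample", side_effect=lambda population, k: ["a"] * k
)
patcher.start()
try:
    # Whatever population the Jinja macro passes, a 7-char suffix is "aaaaaaa".
    assert "".join(random.sample("abcdefghij0123456789", 7)) == "aaaaaaa"
finally:
    patcher.stop()
```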
=== Result ===
155 new fixture files generated: one wiki's worth of tasks per sql_xml DAG (10 DAGs × ~15 tasks each). Combined with the previous commit, the suite now covers all KubernetesPodOperator tasks in the codebase: 226 fixture files total.

xcollazo updated https://gitlab.wikimedia.org/repos/data-engineering/airflow-dags/-/merge_requests/2086
global: Add fixture generation for KubernetesPodOperator tasks
xcollazo merged https://gitlab.wikimedia.org/repos/data-engineering/airflow-dags/-/merge_requests/2086
global: Add fixture generation for KubernetesPodOperator tasks