As part of Airflow's dependency management system,
we want an automated process in place
that grabs all packages in a given Archiva repository and syncs them to a predetermined HDFS location.
That would allow teams to reference their dependencies in spark-submit calls with HDFS paths,
with no deployment step needed.
The only requirement would be to package the dependency and upload it to Archiva.
Details
| Title | Reference | Author | Source Branch | Dest Branch |
|---|---|---|---|---|
| wmf_airflow_lib with artifact syncing support | repos/data-engineering/airflow-dags!4 | otto | wmf_airflow_lib | main |
| Status | Subtype | Assigned | Task |
|---|---|---|---|
| Resolved | | odimitrijevic | T282033 Airflow collaborations |
| Resolved | | Ottomata | T294024 [Airflow] Automate sync'ing archiva packages to HDFS |
| Declined | | None | T288271 Make it possible to use anaconda + stacked conda envs for Airflow executors |
| Open | | amastilovic | T322690 Add support for repository artifacts in Airflow |
| Resolved | Spike | amastilovic | T360968 [Developer Experience] [SPIKE] Investigate process to automate deployment of folders and artifacts to HDFS |
| Resolved | | amastilovic | T365659 Implement automated deployment of refinery HQL files to HDFS (via blunderbuss) |
| Resolved | | amastilovic | T371994 Deploy the HDFS synchronizer (blunderbuss) service to the dse-k8s cluster |
| Resolved | | amastilovic | T392244 Facilitate automatic artifact cache warming for airflow-dags artifacts |
Event Timeline
I had an idea, then realized it won't work easily enough. Writing it here just for my reference.
fsspec supports filesystem abstractions over local, HTTP, HDFS (via pyarrow), etc. I wondered if we could use its built-in file caching to cache HTTP files to HDFS.
Unfortunately, the file caching only caches on the local file system. We could probably copy a lot of their CachingFileSystem and make a version that works either specifically with the fsspec HDFS file system, or even better, make their CachingFileSystem use any fsspec filesystem as cache storage...but I think that is probably overkill for this task.
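To make the limitation concrete, here is a minimal sketch (assuming fsspec is installed) of its built-in `filecache` wrapper. The `file` target protocol and the temp paths are stand-ins for illustration; the point is that `cache_storage` must be a local directory, so it can't point at `hdfs://`:

```python
import os
import tempfile

import fsspec  # assumes fsspec is installed

# Stand-in "remote" file (would be an https:// or hdfs:// URL in practice).
src_dir = tempfile.mkdtemp()
cache_dir = tempfile.mkdtemp()
src_path = os.path.join(src_dir, "artifact.jar")
with open(src_path, "wb") as f:
    f.write(b"fake jar bytes")

# "filecache" wraps another filesystem and caches whole files on first
# read -- but cache_storage must be a LOCAL directory path; there is no
# way to hand it an fsspec filesystem such as HDFS as the cache backend.
fs = fsspec.filesystem(
    "filecache",
    target_protocol="file",  # stand-in here for https/hdfs
    cache_storage=cache_dir,
)
with fs.open(src_path, "rb") as f:
    data = f.read()

assert data == b"fake jar bytes"
assert os.listdir(cache_dir)  # a cached copy now exists on local disk
```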
ALRIGHT! Got some stuff working on the hadoop-test instance.
```yaml
# Declare available artifact_sources
artifact_sources:
  # Simple URL based artifact source. The id of
  # artifacts that use this source should just be a full URL.
  url:
    class_name: workflow_utils.artifact.FsArtifactSource

  # Archiva mirror-maven-central repository
  wmf_archiva_central:
    class_name: workflow_utils.artifact.MavenArtifactSource
    base_uri: https://archiva.wikimedia.org/repository/mirror-maven-central

  # Archiva releases repository
  wmf_archiva_releases:
    class_name: workflow_utils.artifact.MavenArtifactSource
    base_uri: https://archiva.wikimedia.org/repository/releases

  # Archiva python repository
  wmf_archiva_python:
    class_name: workflow_utils.artifact.MavenArtifactSource
    base_uri: https://archiva.wikimedia.org/repository/python

# Declare available artifact caches.
artifact_caches:
  hdfs:
    class_name: workflow_utils.artifact.FsArtifactCache
    base_uri: hdfs:///tmp/airflow_artifact_cache/files
    # TODO: do we want to use a FsMavenArtifactCache as a default?
    # It will make the cache directory look nice! But will be strange
    # for non maven artifacts.

# This will be used if no source is defined for your artifact
default_artifact_source: url

# This will be used if no caches are defined for your artifact
default_artifact_caches: [hdfs]
```
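For illustration only (this is not the workflow_utils implementation): the cache flattens a Maven coordinate into a single file name under the cache's `base_uri`, matching the layout visible in the `artifact-cache status` output later in this task.

```python
def cache_path(base_uri: str, artifact_id: str) -> str:
    """Sketch of the cache naming convention: a Maven id
    "group:artifact:version" becomes one flat file name,
    joined with underscores, under the cache base_uri."""
    return f"{base_uri}/{artifact_id.replace(':', '_')}"

print(cache_path(
    "hdfs:///tmp/airflow_artifact_cache/files",
    "org.wikimedia.analytics.refinery.job:refinery-job:0.1.20",
))
# hdfs:///tmp/airflow_artifact_cache/files/org.wikimedia.analytics.refinery.job_refinery-job_0.1.20
```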
Test deploying refinery-job.jar as an artifact to hdfs caches.
```yaml
# Declare all artifacts that might be needed by jobs
# in the analytics airflow instance.
artifacts:
  # Test syncing this artifact to caches
  refinery-job-0.1.20:
    id: org.wikimedia.analytics.refinery.job:refinery-job:0.1.20
    source: wmf_archiva_releases
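A sketch of how a Maven coordinate like the `id` above resolves to a source URL under the repository's `base_uri`, following the standard Maven repository layout (illustrative; the actual resolution lives in workflow_utils' MavenArtifactSource):

```python
def maven_url(base_uri: str, artifact_id: str) -> str:
    """Resolve "group:artifact:version" against a Maven repository:
    dots in the group become path segments, then artifact/version/
    artifact-version.jar, per the standard repository layout."""
    group, artifact, version = artifact_id.split(":")
    return (
        f"{base_uri}/{group.replace('.', '/')}"
        f"/{artifact}/{version}/{artifact}-{version}.jar"
    )

print(maven_url(
    "https://archiva.wikimedia.org/repository/releases",
    "org.wikimedia.analytics.refinery.job:refinery-job:0.1.20",
))
# https://archiva.wikimedia.org/repository/releases/org/wikimedia/analytics/refinery/job/refinery-job/0.1.20/refinery-job-0.1.20.jar
```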
scap's checks.yaml will run `artifact-cache warm` during the promote stage of deployment:
```yaml
checks:
  artifacts_sync:
    type: command
    stage: promote
    # artifact-cache is a CLI script installed into the
    # /usr/lib/airflow conda environment via the
    # data-engineering/workflow_utils python library.
    # It uses artifact config files in the airflow-dags
    # repository to sync artifacts from their source locations
    # to defined caches (mainly HDFS).
    command: /usr/lib/airflow/bin/artifact-cache warm /srv/deployment/airflow-dags/analytics/conf/artifact_config.yaml /srv/deployment/airflow-dags/analytics/analytics/conf/artifacts.yaml
    timeout: 300 # default is 30 seconds
```
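Conceptually, the warm step just copies any declared artifact that is missing from a cache, and is a no-op for artifacts already present. A toy sketch of that idea (hypothetical classes, not the workflow_utils code):

```python
class DictCache:
    """Toy in-memory stand-in for an artifact cache (e.g. the hdfs one)."""
    def __init__(self):
        self.files = {}

    def contains(self, name: str) -> bool:
        return name in self.files

    def put(self, name: str, data: bytes) -> None:
        self.files[name] = data


def warm(sources: dict, caches: list) -> list:
    """Copy every artifact missing from a cache; skip ones already
    present, so re-running after a deploy is cheap and idempotent."""
    copied = []
    for name, data in sources.items():
        for cache in caches:
            if not cache.contains(name):
                cache.put(name, data)
                copied.append(name)
    return copied


cache = DictCache()
sources = {"refinery-job-0.1.20": b"jar bytes"}
print(warm(sources, [cache]))  # first run copies the artifact
print(warm(sources, [cache]))  # second run finds it cached: []
```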
Deploy to hadoop-test cluster (an-test-client1001).
```
22:14:54 [@deploy1002:/srv/deployment/airflow-dags/analytics] (main)[2f59257] ± scap deploy -e hadoop-test --force
22:15:02 Started deploy [airflow-dags/analytics@2f59257] (hadoop-test)
22:15:02 Deploying Rev: HEAD = 2f59257f6be1651fc295bf14ba2a35221e1588d3
22:15:02 Started deploy [airflow-dags/analytics@2f59257] (hadoop-test): (no justification provided)
22:15:02 == DEFAULT ==
:* an-test-client1001.eqiad.wmnet
airflow-dags/analytics: fetch stage(s): 100% (ok: 1; fail: 0; left: 0)
airflow-dags/analytics: config_deploy stage(s): 100% (ok: 1; fail: 0; left: 0)
airflow-dags/analytics: promote stage(s): 100% (ok: 1; fail: 0; left: 0)
22:15:09 == DEFAULT ==
:* an-test-client1001.eqiad.wmnet
airflow-dags/analytics: finalize stage(s): 100% (ok: 1; fail: 0; left: 0)
22:15:10 Finished deploy [airflow-dags/analytics@2f59257] (hadoop-test): (no justification provided) (duration: 00m 07s)
22:15:10 Finished deploy [airflow-dags/analytics@2f59257] (hadoop-test) (duration: 00m 07s)
```
Check that defined artifacts are synced to caches:
```
22:14:42 [@an-test-client1001:/srv/deployment/airflow-dags/analytics] (2f59257)[2f59257] ± sudo -u analytics /usr/lib/airflow/bin/artifact-cache status ./conf/artifact_config.yaml ./analytics/conf/artifacts.yaml
# ... (some warnings here; fsspec needs a newer version of pyarrow...TBD.)
Artifact(refinery-job-0.1.20):
  hdfs:///tmp/airflow_artifact_cache/files/org.wikimedia.analytics.refinery.job_refinery-job_0.1.20 (exists=True)
  https://archiva.wikimedia.org/repository/releases/org/wikimedia/analytics/refinery/job/refinery-job/0.1.20/refinery-job-0.1.20.jar (exists=True)
```