
[Airflow] Automate sync'ing archiva packages to HDFS
Closed, Resolved · Public

Description

As part of Airflow's dependency management system,
we want an automated process
that grabs all packages in a given Archiva repository and syncs them to a predetermined HDFS location.
That would allow teams to reference their dependencies in spark-submit calls via HDFS paths,
with no deployment step needed.
The only requirement would be to package the dependency and upload it to Archiva.
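For reference, the mapping from a Maven coordinate to its Archiva source URL and its flat HDFS cache file name (the shape visible in the artifact-cache status output further down) can be sketched like this. The function names are illustrative, not the actual workflow_utils API:

```python
def maven_coordinate_to_cache_name(coordinate: str) -> str:
    """Flatten a group:artifact:version Maven coordinate into a single
    file name, like the entries under hdfs:///tmp/airflow_artifact_cache."""
    group_id, artifact_id, version = coordinate.split(":")
    return f"{group_id}_{artifact_id}_{version}"


def maven_coordinate_to_source_url(base_uri: str, coordinate: str) -> str:
    """Build the standard Maven repository download URL for a jar
    at this coordinate (group dots become path separators)."""
    group_id, artifact_id, version = coordinate.split(":")
    group_path = group_id.replace(".", "/")
    return f"{base_uri}/{group_path}/{artifact_id}/{version}/{artifact_id}-{version}.jar"
```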

Details

Related Changes in GitLab:
Title: wmf_airflow_lib with artifact syncing support
Reference: repos/data-engineering/airflow-dags!4
Author: otto
Source branch: wmf_airflow_lib → Dest branch: main

Event Timeline

odimitrijevic moved this task from Incoming to Airflow on the Analytics board.
mforns renamed this task from [Airflow] Implement DAG that syncs archiva packages to HDFS to [Airflow] Automate sync'ing archiva packages to HDFS. Oct 22 2021, 4:57 PM
mforns added a project: Data-Engineering.
mforns updated the task description. (Show Details)
mforns removed mforns as the assignee of this task. Nov 5 2021, 9:25 PM
mforns moved this task from Backlog to Discussed (Radar) on the Data Pipelines board.

I had an idea, then realized it won't work easily enough. Writing it here just for my reference.

fsspec supports filesystem abstractions for local, HTTP, HDFS (via pyarrow), etc. I wondered if we could use its built-in file caching to cache HTTP files to HDFS.

Unfortunately, the file caching only caches to the local file system. We could probably copy a lot of their CachingFileSystem and make a version that works specifically with the fsspec HDFS filesystem, or better yet, make CachingFileSystem use any fsspec filesystem as cache storage... but I think that is probably overkill for this task.

ALRIGHT! Got some stuff working on the hadoop-test instance.

https://gitlab.wikimedia.org/repos/data-engineering/airflow-dags/-/blob/main/conf/artifact_config.yaml

# Declare available artifact_sources
artifact_sources:

  # Simple URL based artifact source.  The id of
  # artifacts that use this source should just be a full URL.
  url:
    class_name: workflow_utils.artifact.FsArtifactSource

  # Archiva mirror-maven-central repository
  wmf_archiva_central:
    class_name: workflow_utils.artifact.MavenArtifactSource
    base_uri: https://archiva.wikimedia.org/repository/mirror-maven-central

  # Archiva releases repository
  wmf_archiva_releases:
    class_name: workflow_utils.artifact.MavenArtifactSource
    base_uri: https://archiva.wikimedia.org/repository/releases

  # Archiva python repository
  wmf_archiva_python:
    class_name: workflow_utils.artifact.MavenArtifactSource
    base_uri: https://archiva.wikimedia.org/repository/python


# Declare available artifact caches.
artifact_caches:
  hdfs:
    class_name: workflow_utils.artifact.FsArtifactCache
    base_uri: hdfs:///tmp/airflow_artifact_cache/files

  # TODO: do we want to use a FsMavenArtifactCache as a default?
  # It will make the cache directory look nice! But will be strange
  # for non maven artifacts.

# This will be used if no source is defined for your artifact
default_artifact_source: url

# This will be used if no caches are defined for your artifact
default_artifact_caches: [hdfs]

Test deploying refinery-job.jar as an artifact to hdfs caches.

https://gitlab.wikimedia.org/repos/data-engineering/airflow-dags/-/blob/main/analytics/conf/artifacts.yaml

# Declare all artifacts that might be needed by jobs in the analytics airflow instance.

artifacts:
  # Test syncing this artifact to caches
  refinery-job-0.1.20:
    id: org.wikimedia.analytics.refinery.job:refinery-job:0.1.20
    source: wmf_archiva_releases

scap's checks.yaml will run artifact-cache warm during the promote stage of deployment:

https://gitlab.wikimedia.org/repos/data-engineering/airflow-dags-scap-analytics/-/blob/main/checks.yaml

checks:
  artifacts_sync:
    type: command
    stage: promote
    # artifact-cache is a CLI script installed into the
    # /usr/lib/airflow conda environment via the
    # data-engineering/workflow_utils python library.
    # It uses artifact config files in the airflow-dags
    # repository to sync artifacts from their source locations
    # to defined caches (mainly HDFS).
    command: /usr/lib/airflow/bin/artifact-cache warm /srv/deployment/airflow-dags/analytics/conf/artifact_config.yaml /srv/deployment/airflow-dags/analytics/analytics/conf/artifacts.yaml
    timeout: 300 # default is 30 seconds

Deploy to hadoop-test cluster (an-test-client1001).

22:14:54 [@deploy1002:/srv/deployment/airflow-dags/analytics] (main)[2f59257] ± scap deploy -e hadoop-test --force
22:15:02 Started deploy [airflow-dags/analytics@2f59257] (hadoop-test)
22:15:02 Deploying Rev: HEAD = 2f59257f6be1651fc295bf14ba2a35221e1588d3
22:15:02 Started deploy [airflow-dags/analytics@2f59257] (hadoop-test): (no justification provided)
22:15:02
== DEFAULT ==
:* an-test-client1001.eqiad.wmnet
airflow-dags/analytics: fetch stage(s): 100% (ok: 1; fail: 0; left: 0)
airflow-dags/analytics: config_deploy stage(s): 100% (ok: 1; fail: 0; left: 0)
airflow-dags/analytics: promote stage(s): 100% (ok: 1; fail: 0; left: 0)
22:15:09
== DEFAULT ==
:* an-test-client1001.eqiad.wmnet
airflow-dags/analytics: finalize stage(s): 100% (ok: 1; fail: 0; left: 0)
22:15:10 Finished deploy [airflow-dags/analytics@2f59257] (hadoop-test): (no justification provided) (duration: 00m 07s)
22:15:10 Finished deploy [airflow-dags/analytics@2f59257] (hadoop-test) (duration: 00m 07s)

Check that defined artifacts are synced to caches:

22:14:42 [@an-test-client1001:/srv/deployment/airflow-dags/analytics] (2f59257)[2f59257] ± sudo -u analytics /usr/lib/airflow/bin/artifact-cache status ./conf/artifact_config.yaml ./analytics/conf/artifacts.yaml
# ... (some warnings here; fsspec needs a newer version of pyarrow...TBD.)
Artifact(refinery-job-0.1.20):
	hdfs:///tmp/airflow_artifact_cache/files/org.wikimedia.analytics.refinery.job_refinery-job_0.1.20	(exists=True)
	https://archiva.wikimedia.org/repository/releases/org/wikimedia/analytics/refinery/job/refinery-job/0.1.20/refinery-job-0.1.20.jar	(exists=True)