
Emit lineage information about Airflow jobs to DataHub
Open, Medium, Public

Description

Following up on T311882, this task would implement emitting lineage from Airflow, if the team agrees.

Details

Other Assignee
Ahoelzl

Event Timeline

Milimetric triaged this task as Medium priority. Jul 7 2022, 3:57 PM
Milimetric created this task.

This Gitlab MR https://gitlab.wikimedia.org/repos/data-engineering/airflow-dags/-/merge_requests/204 is:

  • adding the datahub dependency
  • adding the configurations in:
    • dev_instance/airflow_datahub.cfg
    • dev_instance/connections.yaml

The configuration needed for production (and on the test cluster) will be here https://gerrit.wikimedia.org/r/c/operations/puppet/+/878128 .

Two options were chosen:

  • emitting lineage explicitly from the DAGs (rather than relying on automatic extraction)
  • using Kafka connections to Datahub (not the rest API) because events are the future 😄

lmkwyt

lmkwyt

First of all, thank you!!

+1, we should be explicit about emitting lineage until everyone using Airflow as a platform understands what's going on. I think magic at this phase of a platform like this would be confusing.

  • using Kafka connections to Datahub (not the rest API) because events are the future 😄

+1, in this case sending events this way seems supported without any custom code from us. I think in the general ingestion case we would have to build the events ourselves so I continue to choose batch there out of laziness.
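
For reference, a minimal sketch of what the Kafka side looks like from Airflow: a connection whose type is registered by the Datahub plugin. The conn_id and broker host below are placeholders, not our real values.

# Sketch only; assumes the Datahub Airflow plugin registers the
# "datahub_kafka" connection type. All values are placeholders.
from airflow.models import Connection

datahub_kafka_conn = Connection(
    conn_id="datahub_kafka_default",    # the id the lineage backend config points at
    conn_type="datahub_kafka",
    host="kafka-broker.example:9092",   # Kafka bootstrap servers
)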

This Gitlab MR https://gitlab.wikimedia.org/repos/data-engineering/airflow-dags/-/merge_requests/204

Thx, the datahub bits look good. I just left a comment about the config, but you can ignore it and we can fiddle with it later when we start instrumenting.

Yes, thank you @Antoine_Quhen! This is very exciting. Will lineage be emitted with the upgrade, or will there be another step to configure it? Doing the latter would minimize the risk and give us some time to understand what the lineage looks like and how to explain it... which leads to the question of the best way to test it out.

Quick recap for anyone looking to implement lineage.

First, a note regarding lineage as part of centralized configuration. I think this would be very useful, and I'm in no way suggesting that we slow down on the work that @JAllemandou and @lbowmaker are leading on that front. The reality is that a centralized config may take a few more months to get implemented. In the meantime, we could instrument lineage in the Airflow DAGs in a few minutes per DAG. Done in a standard way, this would be very easy to migrate to the centralized config, and as we implement it we may find exceptions and edge cases that would inform that config.

If anyone disagrees with anything here, you are very welcome to; please don't take this as a "decision", it's just a thought. If we agree on this and there's some slow-down in migrating to the centralized config later, I hereby promise that I'll do it myself on all DAGs.

Ok, so, general code cleanup needed before we do lineage:

  1. Update the config. Currently the test cluster has the connection to Datahub configured here, and the production cluster does not. Installing v2 of the plugin seems like a good idea as well.
  2. Make some utility code to deal with lineage. The docs on Datahub's side talk about manual lineage and point to how one would do that in the Airflow docs. So essentially, study this and maybe disagree with my suggestions in point 3 below.
  3. Suggestion: just factor out common things and boilerplate, keep it simple. For example, lineage for mediawiki_history_denormalize would look like this:
inlets=[
    Dataset(platform="hive", name="wmf_raw.mediawiki_project_namespace_map", env="PROD"),

    Dataset(platform="hive", name="wmf_raw.mediawiki_archive", env="PROD"),
    Dataset(platform="hive", name="wmf_raw.mediawiki_change_tag", env="PROD"),
    Dataset(platform="hive", name="wmf_raw.mediawiki_change_tag_def", env="PROD"),
    Dataset(platform="hive", name="wmf_raw.mediawiki_logging", env="PROD"),
    Dataset(platform="hive", name="wmf_raw.mediawiki_page", env="PROD"),
    Dataset(platform="hive", name="wmf_raw.mediawiki_revision", env="PROD"),
    Dataset(platform="hive", name="wmf_raw.mediawiki_user", env="PROD"),
    Dataset(platform="hive", name="wmf_raw.mediawiki_user_groups", env="PROD"),

    Dataset(platform="hive", name="wmf_raw.mediawiki_private_actor", env="PROD"),
    Dataset(platform="hive", name="wmf_raw.mediawiki_private_comment", env="PROD"),
],
outlets=[
    Dataset(platform="hive", name="wmf.mediawiki_user_history", env="PROD"),
    Dataset(platform="hive", name="wmf.mediawiki_page_history", env="PROD"),
    Dataset(platform="hive", name="wmf.mediawiki_history", env="PROD"),
]

And we could factor out the common stuff to make it look like this:

inlets, outlets = lineage_with_context("hive", "PROD", inlets=[
    "wmf_raw.mediawiki_project_namespace_map",

    "wmf_raw.mediawiki_archive",
    "wmf_raw.mediawiki_change_tag",
    "wmf_raw.mediawiki_change_tag_def",
    "wmf_raw.mediawiki_logging",
    "wmf_raw.mediawiki_page",
    "wmf_raw.mediawiki_revision",
    "wmf_raw.mediawiki_user",
    "wmf_raw.mediawiki_user_groups",

    "wmf_raw.mediawiki_private_actor",
    "wmf_raw.mediawiki_private_comment",
],
outlets=[
    "wmf.mediawiki_user_history",
    "wmf.mediawiki_page_history",
    "wmf.mediawiki_history",
])
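
For concreteness, here is one possible shape for that helper. This is only a sketch: lineage_with_context doesn't exist yet, and it assumes Dataset is the entity shipped with the Datahub Airflow plugin (datahub_provider.entities.Dataset), which may not be the class we end up using.

# Hypothetical helper, not in the repo yet.
from typing import List, Tuple

from datahub_provider.entities import Dataset  # assumption: the plugin's Dataset entity


def lineage_with_context(
    platform: str, env: str, inlets: List[str], outlets: List[str]
) -> Tuple[List[Dataset], List[Dataset]]:
    """Expand bare table names into Dataset entities sharing one platform and env."""

    def to_datasets(names: List[str]) -> List[Dataset]:
        return [Dataset(platform=platform, name=name, env=env) for name in names]

    return to_datasets(inlets), to_datasets(outlets)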

And that's basically it: passing those to the SparkSubmitOperator would let Airflow use the configured Datahub lineage backend and send the lineage information. Then we can look into the other, fancier features like keeping track of individual DAG runs, how that ties in with data quality, all that.
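
Roughly, the wiring would look like this. A sketch only: everything except inlets/outlets (the task_id, application jar, and the surrounding DAG) is a placeholder, not the real mediawiki_history_denormalize definition.

# Inside a DAG file; assumes the helper sketched above and the Datahub
# lineage backend configured in airflow.cfg.
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator

inlets, outlets = lineage_with_context(
    "hive", "PROD",
    inlets=["wmf_raw.mediawiki_page", "wmf_raw.mediawiki_revision"],
    outlets=["wmf.mediawiki_history"],
)

denormalize = SparkSubmitOperator(
    task_id="mediawiki_history_denormalize",
    application="hdfs:///path/to/refinery-job.jar",  # placeholder
    # inlets/outlets are standard BaseOperator arguments; the lineage backend
    # sends them to Datahub when the task runs.
    inlets=inlets,
    outlets=outlets,
)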

NOTE: since Antoine's work in T312566#8542940, the Datahub plugin does support automatic lineage instrumentation in a non-experimental fashion. We can't really use this because our logic is so removed from the Airflow DAGs themselves. Even if we passed the HQL to the parser, we have Scala jobs that wouldn't be covered.

automatic lineage instrumentation [...] We can't really use this because our logic is so removed from the airflow DAGs themselves.

QQ, if we do it manually, do we still get the column-level lineage? Or just table-level? I guess no column stats, as the Datasets are not really schema-aware.

Dataset(platform="hive", name="wmf_raw.mediawiki_user", env="PROD"),

Is this our custom Dataset or something from airflow lineage entities?

Anyway, +1 to this idea, sounds great and not that hard to do!

I'm in no way suggesting that we slow down on the work that @JAllemandou and @lbowmaker are leading on that front

Wait, how is this relevant? Because iceberg sensors need the centralized config?

All of this is just table > table lineage though, right? In the one example we have in Datahub now, we show webrequest > aqs_hourly (not the transformation).

Seems like we could add Airflow/Spark jobs as Datahub entities/assets and have that too, which would help people know what is doing the transformation. Here is an example I found of this:

https://blog.datahubproject.io/harnessing-the-power-of-data-lineage-with-datahub-ad086358dec4
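
For what it's worth, Datahub models this with DataFlow/DataJob entities, and job-level lineage can be emitted with its Python SDK. A rough sketch using the webrequest > aqs_hourly example; the GMS endpoint and the flow/job ids are placeholders, and a Kafka emitter could be swapped in for the REST one.

# Sketch of the API shape, not our setup; all ids and endpoints are placeholders.
from datahub.emitter.mce_builder import make_data_job_urn, make_dataset_urn
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.metadata.schema_classes import DataJobInputOutputClass

emitter = DatahubRestEmitter("http://datahub-gms.example:8080")

job_urn = make_data_job_urn(
    orchestrator="airflow", flow_id="aqs_hourly", job_id="compute_aqs_hourly"
)
lineage = DataJobInputOutputClass(
    inputDatasets=[make_dataset_urn(platform="hive", name="wmf.webrequest", env="PROD")],
    outputDatasets=[make_dataset_urn(platform="hive", name="wmf.aqs_hourly", env="PROD")],
)
emitter.emit(MetadataChangeProposalWrapper(entityUrn=job_urn, aspect=lineage))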

What do you think?