
Setup config to allow lineage instrumentation
Closed, Resolved · Public · 3 Estimated Story Points

Description

User Story
As a Data Engineer, I need to set up the Airflow config so that data pipeline producers can easily include lineage information about their jobs.
Why?

So that we can use the lineage information in DataHub to inform those interested in how data was created or transformed. Examples from DataHub (ex 1, ex 2)

Success Criteria

  • Config is set up to allow instrumentation
  • Simple test job runs to prove it is working as expected

Open Questions

  • Is this just the PA instance, or do we need to do something for the other instances?

Event Timeline

JArguello-WMF set the point value for this task to 3.

Update: I'm emitting metadata to Kafka from an ad-hoc Airflow data lineage task. The configuration sets up the communication with Kafka and the schema registry, Karapace. The metadata is then correctly picked up by the mce-consumer service on the DataHub side. Now I'm looking to use the detailed version of the data lineage event, which contains more information than just the upstream<>downstream link.
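
For reference, a minimal sketch of what that emission looks like with the acryl-datahub Python client (the broker, Karapace URL and dataset URN below are placeholders, not our actual configuration):

from datahub.emitter.kafka_emitter import DatahubKafkaEmitter, KafkaEmitterConfig
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.metadata.schema_classes import StatusClass

# Placeholder connection details: point the emitter at the Kafka brokers and
# the Karapace schema registry used to serialize the metadata events.
emitter = DatahubKafkaEmitter(
    KafkaEmitterConfig.parse_obj({
        "connection": {
            "bootstrap": "kafka-broker.example.org:9092",
            "schema_registry_url": "https://karapace.example.org:8081",
        }
    })
)

# Emit a trivial aspect for one dataset; the mce-consumer picks it up on the
# DataHub side and applies it to the entity.
mcp = MetadataChangeProposalWrapper(
    entityUrn="urn:li:dataset:(urn:li:dataPlatform:hive,wmf.aqs_hourly,PROD)",
    aspect=StatusClass(removed=False),
)
emitter.emit(mcp, callback=lambda err, _msg: print(err or "emitted"))
emitter.flush()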

Change 919019 had a related patch set uploaded (by Aqu; author: Aqu):

[operations/puppet@production] Add Airflow configuration to connect to DataHub

https://gerrit.wikimedia.org/r/919019

Some proposals for an immediate and more useful next step:

1/ Declare simple lineage between datasets in our Airflow datasets listing

  • Simple lineage (e.g. 2 hive tables -> 1 hive table)
  • Add dependencies in .../config/datasets.yaml e.g.
hive_aqs_hourly:
    datastore: hive
    table_name: wmf.aqs_hourly
    partitioning: "@hourly"
    upstream:
      - hive_webrequest_hourly
  • Create a daily Airflow job that parses the list of Airflow datasets and feeds the lineage to DataHub (see the sketch after this list)
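
A rough sketch of what that daily job could emit, using the acryl-datahub client (the endpoint, file path and helper function are made up for illustration):

import yaml

from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.metadata.schema_classes import (
    DatasetLineageTypeClass,
    UpstreamClass,
    UpstreamLineageClass,
)

def hive_urn(table_name: str) -> str:
    # Helper made up for this sketch: build a DataHub dataset URN for a Hive table.
    return f"urn:li:dataset:(urn:li:dataPlatform:hive,{table_name},PROD)"

# Placeholder endpoint; the Kafka emitter from the earlier sketch would work the same way.
emitter = DatahubRestEmitter("https://datahub-gms.example.org:8080")

with open("config/datasets.yaml") as f:
    datasets = yaml.safe_load(f)

for name, spec in datasets.items():
    upstream_names = spec.get("upstream", [])
    if not upstream_names:
        continue
    upstreams = [
        UpstreamClass(
            dataset=hive_urn(datasets[up]["table_name"]),
            type=DatasetLineageTypeClass.TRANSFORMED,
        )
        for up in upstream_names
    ]
    # One upstreamLineage aspect per downstream dataset, listing its upstream tables.
    emitter.emit(
        MetadataChangeProposalWrapper(
            entityUrn=hive_urn(spec["table_name"]),
            aspect=UpstreamLineageClass(upstreams=upstreams),
        )
    )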

2/ Add fine-grained lineage to each job

https://gitlab.wikimedia.org/repos/data-engineering/airflow-dags/-/blob/main/analytics/config/datasets.yaml

  • Fine-grained lineage contains relations between fields, not only tables (see the sketch after this list)
  • By launching the lineage operator from the actual job, we could get more metadata (job duration, timestamp, partition/snapshot logical date, the job entity itself)
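
For illustration, a hedged sketch of what the field-level version of the same upstreamLineage aspect could look like (URNs and column names are invented; it would be emitted the same way as the table-level aspect above):

from datahub.metadata.schema_classes import (
    DatasetLineageTypeClass,
    FineGrainedLineageClass,
    FineGrainedLineageDownstreamTypeClass,
    FineGrainedLineageUpstreamTypeClass,
    UpstreamClass,
    UpstreamLineageClass,
)

upstream_urn = "urn:li:dataset:(urn:li:dataPlatform:hive,wmf.webrequest,PROD)"
downstream_urn = "urn:li:dataset:(urn:li:dataPlatform:hive,wmf.aqs_hourly,PROD)"

# Same upstreamLineage aspect as before, but with fineGrainedLineages mapping
# individual source fields to the fields they produce.
aspect = UpstreamLineageClass(
    upstreams=[
        UpstreamClass(dataset=upstream_urn, type=DatasetLineageTypeClass.TRANSFORMED),
    ],
    fineGrainedLineages=[
        FineGrainedLineageClass(
            upstreamType=FineGrainedLineageUpstreamTypeClass.FIELD_SET,
            upstreams=[f"urn:li:schemaField:({upstream_urn},uri_host)"],
            downstreamType=FineGrainedLineageDownstreamTypeClass.FIELD,
            downstreams=[f"urn:li:schemaField:({downstream_urn},project)"],
        ),
    ],
)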

3/ We could generate fine-grained lineage by running an HQL parser on refinery/hql

https://github.com/sqlparser/sqlflow_public

Oh, wow! 3/ looks magical. It has an on-premise version that we could install as a server. We could call it from Airflow from either 1/ or 2/, no? That said, it seems like something that would need some work.
I would go with either 1/ or 2/ (since we need one of them in any case IIUC), and later expand it with 3/.

Sadly, I believe that the on-premise version of 3/ is incompatible from a licensing perspective :-(
https://github.com/sqlparser/sqlflow_public/blob/master/install_sqlflow.md#gudu-sqlflow-license-file

I prefer the sound of option 2/ personally, but that's just my opinion. Are there any downsides to 2/ over 1/?

Here is a standardized version of the first iteration for easy use by people without knowledge of DataHub: https://gitlab.wikimedia.org/repos/data-engineering/airflow-dags/-/merge_requests/386

I agree with Marcel that we shouldn't mess with SQL parsing right now. But if/when we do, it seems to me the analyzers built into Spark can give us all the info we need. I haven't looked into it, but the execution plan has information about the columns involved and what's happening to them. It might be enough for our purposes here, and others might already have played with it.
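
For example (exploratory only, not something we run today): the analyzed logical plan resolves every output column to the input attributes it is derived from, so in PySpark one could start from something like the following (table and column names invented):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

df = spark.sql("""
    SELECT uri_host AS project, COUNT(1) AS view_count
    FROM wmf.webrequest
    GROUP BY uri_host
""")

# Prints the parsed, analyzed, optimized and physical plans; the analyzed plan
# shows each output column together with the attributes it is built from.
df.explain(extended=True)

# The plan object itself is reachable through the (internal) JVM gateway if we
# ever want to walk it programmatically instead of parsing the printed text.
print(df._jdf.queryExecution().analyzed().treeString())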

Thanks all for the reviews. Even though the DAG is working, deciding on a single source of truth for our dataset metadata would be great. Right now, it is located in:

  • airflow-dags/../dataset.yml
  • airflow-dags/../..._dag.py
  • DataHub

We could eventually create a new repo centralizing our metadata.

Change 931683 had a related patch set uploaded (by Btullis; author: Btullis):

[operations/puppet@production] Add a datahub_kafka_jumbo connection to the analytics airflow instance

https://gerrit.wikimedia.org/r/931683

Change 931683 merged by Btullis:

[operations/puppet@production] Add a datahub_kafka_jumbo connection to the analytics airflow instance

https://gerrit.wikimedia.org/r/931683

Change 931690 had a related patch set uploaded (by Aqu; author: Aqu):

[operations/puppet@production] Fix datahub connections

https://gerrit.wikimedia.org/r/931690

Change 931690 merged by Stevemunene:

[operations/puppet@production] Fix datahub connections

https://gerrit.wikimedia.org/r/931690

Change 919019 merged by Btullis:

[operations/puppet@production] Add Airflow configuration to connect to DataHub

https://gerrit.wikimedia.org/r/919019

Change 936792 had a related patch set uploaded (by Btullis; author: Btullis):

[operations/puppet@production] Configure the test datahub jobs to use the staging schema registry

https://gerrit.wikimedia.org/r/936792

Change 936792 merged by Btullis:

[operations/puppet@production] Configure the test datahub jobs to use the staging schema registry

https://gerrit.wikimedia.org/r/936792