
Enable Spark data lineage for all Airflow instances
Open, Needs Triage, Public

Description

After successfully delivering Spark based column-level data lineage for Data Engineering, we want to provide data lineage to all other data platform Airflow instances.

https://wikitech.wikimedia.org/wiki/Data_Platform/Systems/Airflow/Instances

Stakeholders

Implementation Steps

  • list out all Airflow instances and stakeholders
  • coordinate with stakeholders (Slack)
  • for each instance list out DAGs and mark suitable ones
  • submit data lineage integration patches for suitable DAGs
  • roll out changes and monitor

Details

Other Assignee
Ahoelzl
Related Changes in Gerrit:
Related Changes in GitLab:
Title | Reference | Author | Source Branch | Dest Branch
Draft: Set DataHub Kafka connection depending on execution environment | repos/data-engineering/airflow-dags!1340 | mforns | fix-spark-lineage-for-kubernetes | main
Add port to spark.datahub.kafka.bootstrap | repos/data-engineering/airflow-dags!1205 | tchin | add-port-spark-lineage | main
Enable spark lineage for search/dags/query_clicks.py | repos/data-engineering/airflow-dags!1190 | tchin | enable-spark-lineage-query-clicks | main
Add spark lineage config to search instance | repos/data-engineering/airflow-dags!1125 | tchin | add-spark-lineage-search | main

Event Timeline

Ahoelzl updated Other Assignee, added: tchin; removed: Ahoelzl.

Fixed k8s DatahubEventEmitter issue and re-deploy on all Airflow instances.

Currently encountering this error with the search instance; I think it's the same issue we encountered when migrating the analytics instance to k8s:

25/04/01 15:44:20 ERROR AsyncEventQueue: Listener DatahubSparkListener threw an exception
datahub.shaded.org.apache.kafka.common.KafkaException: Failed to construct kafka producer
	at datahub.shaded.org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:430)
	at datahub.shaded.org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:298)
	at datahub.client.kafka.KafkaEmitter.<init>(KafkaEmitter.java:55)
	at datahub.spark.DatahubEventEmitter.getEmitter(DatahubEventEmitter.java:85)
	at datahub.spark.DatahubEventEmitter.emitMcps(DatahubEventEmitter.java:401)
	at datahub.spark.DatahubEventEmitter.emitCoalesced(DatahubEventEmitter.java:190)
	at datahub.spark.DatahubSparkListener.onApplicationEnd(DatahubSparkListener.java:279)
	at org.apache.spark.scheduler.SparkListenerBus.doPostEvent(SparkListenerBus.scala:57)
	at org.apache.spark.scheduler.SparkListenerBus.doPostEvent$(SparkListenerBus.scala:28)
	at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37)
	at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37)
	at org.apache.spark.util.ListenerBus.postToAll(ListenerBus.scala:117)
	at org.apache.spark.util.ListenerBus.postToAll$(ListenerBus.scala:101)
	at org.apache.spark.scheduler.AsyncEventQueue.super$postToAll(AsyncEventQueue.scala:105)
	at org.apache.spark.scheduler.AsyncEventQueue.$anonfun$dispatch$1(AsyncEventQueue.scala:105)
	at scala.runtime.java8.JFunction0$mcJ$sp.apply(JFunction0$mcJ$sp.java:23)
	at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
	at org.apache.spark.scheduler.AsyncEventQueue.org$apache$spark$scheduler$AsyncEventQueue$$dispatch(AsyncEventQueue.scala:100)
	at org.apache.spark.scheduler.AsyncEventQueue$$anon$2.$anonfun$run$1(AsyncEventQueue.scala:96)
	at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1381)
	at org.apache.spark.scheduler.AsyncEventQueue$$anon$2.run(AsyncEventQueue.scala:96)
Caused by: datahub.shaded.org.apache.kafka.common.config.ConfigException: Invalid url in bootstrap.servers: kafka-jumbo-eqiad.external-services.svc.cluster.local
	at datahub.shaded.org.apache.kafka.clients.ClientUtils.parseAndValidateAddresses(ClientUtils.java:54)
	at datahub.shaded.org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:408)
	... 20 more

Actually, never mind; the error we had last time was:

Caused by: datahub.shaded.org.apache.kafka.common.config.ConfigException: No resolvable bootstrap urls given in bootstrap.servers
        at datahub.shaded.org.apache.kafka.clients.ClientUtils.parseAndValidateAddresses(ClientUtils.java:84)
        at datahub.shaded.org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:408)
        ... 20 more

@brouberol would you happen to have some insight into this issue?

The previous issue you linked to in https://phabricator.wikimedia.org/T386862#10701370 was fixed by introducing the following patch:

    datahub_kafka_jumbo:
      conn_type: datahub_kafka
-     host: kafka-jumbo-eqiad.external-services.svc.cluster.local:9092
+     host: kafka-jumbo-eqiad.external-services.svc.cluster.local

What I'm reading here is

_extra_default_args = {
    **instance_default_args,
    # Arguments for operators that have artifact dependencies
    "hdfs_tools_shaded_jar_path": artifact("hdfs-tools-0.0.6-shaded.jar"),
    "spark_sql_driver_jar_path": artifact("wmf-sparksqlclidriver-1.0.0.jar"),
    "spark_datahub_lineage_enabled": False,
    "spark_datahub_lineage_jar": artifact("acryl-spark-lineage-0.2.16.jar"),
    "spark_datahub_lineage_conf": {
        "spark.datahub.emitter": "kafka",
        "spark.extraListeners": "datahub.spark.DatahubSparkListener",
        "spark.datahub.kafka.bootstrap": "{{ conn.datahub_kafka_jumbo.host }}", # <--
        "spark.datahub.kafka.schema_registry_url": "{{ conn.datahub_kafka_jumbo.extra_dejson.connection.schema_registry_url }}",  # noqa: E501
        "spark.datahub.flow_name": "airflow_{{ dag.dag_id }}__{{ task.task_id }}",
        "spark.datahub.log.mcps": "false",
    },
}

which is consumed by the Spark config builder. IMO bootstrap.servers should contain the port (cf. my own experience and this answer), so we might be missing :9092 here.
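For illustration, here is a hedged Python sketch of the check that the shaded ClientUtils.parseAndValidateAddresses appears to perform on bootstrap.servers (the function name and exception type are hypothetical; Kafka raises its own ConfigException): an entry without a port fails exactly like the "Invalid url in bootstrap.servers" error above.

```python
def parse_bootstrap_servers(value: str) -> list[tuple[str, int]]:
    """Split a comma-separated bootstrap.servers string into (host, port)
    pairs, rejecting entries that are missing a numeric port (Kafka's
    ClientUtils raises ConfigException in the same situation)."""
    addresses = []
    for entry in value.split(","):
        # rpartition returns ("", "", entry) when there is no colon at all
        host, sep, port = entry.strip().rpartition(":")
        if not sep or not port.isdigit():
            raise ValueError(f"Invalid url in bootstrap.servers: {entry}")
        addresses.append((host, int(port)))
    return addresses
```

With the port appended, the same host string parses cleanly, which is why the missing `:9092` is a plausible culprit.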

What I don't understand is how things work over in airflow-main in the first place.

Hit me up when you're around and we'll investigate.

What I'm thinking atm is that multiple things use datahub_kafka_jumbo.host: the Python Kafka client used by Airflow to interact with DataHub as a hook, which concatenates host and port itself, and the Spark client, which assumes that both hostname and port are found in conn.datahub_kafka_jumbo.host.

My hypothesis is that things might actually be broken in airflow-main (we'll see if I'm right or not). If so, I think a proper fix would be to have

_extra_default_args = {
    **instance_default_args,
    # Arguments for operators that have artifact dependencies
    "hdfs_tools_shaded_jar_path": artifact("hdfs-tools-0.0.6-shaded.jar"),
    "spark_sql_driver_jar_path": artifact("wmf-sparksqlclidriver-1.0.0.jar"),
    "spark_datahub_lineage_enabled": False,
    "spark_datahub_lineage_jar": artifact("acryl-spark-lineage-0.2.16.jar"),
    "spark_datahub_lineage_conf": {
        "spark.datahub.emitter": "kafka",
        "spark.extraListeners": "datahub.spark.DatahubSparkListener",
-       "spark.datahub.kafka.bootstrap": "{{ conn.datahub_kafka_jumbo.host }}", 
+       "spark.datahub.kafka.bootstrap": "{{ conn.datahub_kafka_jumbo.host }}:{{ conn.datahub_kafka_jumbo.port }}", 
        "spark.datahub.kafka.schema_registry_url": "{{ conn.datahub_kafka_jumbo.extra_dejson.connection.schema_registry_url }}",  # noqa: E501
        "spark.datahub.flow_name": "airflow_{{ dag.dag_id }}__{{ task.task_id }}",
        "spark.datahub.log.mcps": "false",
    },
}

I think it actually is broken on main as well and has just been silently failing. I opened a patch to add the port back in.

Ok, so that wasn't it; now we get this error:

25/04/08 19:42:35 ERROR AsyncEventQueue: Listener DatahubSparkListener threw an exception
datahub.shaded.org.apache.kafka.common.KafkaException: Failed to construct kafka producer
	at datahub.shaded.org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:430)
	at datahub.shaded.org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:298)
	at datahub.client.kafka.KafkaEmitter.<init>(KafkaEmitter.java:55)
	at datahub.spark.DatahubEventEmitter.getEmitter(DatahubEventEmitter.java:85)
	at datahub.spark.DatahubEventEmitter.emitMcps(DatahubEventEmitter.java:401)
	at datahub.spark.DatahubEventEmitter.emitCoalesced(DatahubEventEmitter.java:190)
	at datahub.spark.DatahubSparkListener.onApplicationEnd(DatahubSparkListener.java:279)
	at org.apache.spark.scheduler.SparkListenerBus.doPostEvent(SparkListenerBus.scala:57)
	at org.apache.spark.scheduler.SparkListenerBus.doPostEvent$(SparkListenerBus.scala:28)
	at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37)
	at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37)
	at org.apache.spark.util.ListenerBus.postToAll(ListenerBus.scala:117)
	at org.apache.spark.util.ListenerBus.postToAll$(ListenerBus.scala:101)
	at org.apache.spark.scheduler.AsyncEventQueue.super$postToAll(AsyncEventQueue.scala:105)
	at org.apache.spark.scheduler.AsyncEventQueue.$anonfun$dispatch$1(AsyncEventQueue.scala:105)
	at scala.runtime.java8.JFunction0$mcJ$sp.apply(JFunction0$mcJ$sp.java:23)
	at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
	at org.apache.spark.scheduler.AsyncEventQueue.org$apache$spark$scheduler$AsyncEventQueue$$dispatch(AsyncEventQueue.scala:100)
	at org.apache.spark.scheduler.AsyncEventQueue$$anon$2.$anonfun$run$1(AsyncEventQueue.scala:96)
	at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1381)
	at org.apache.spark.scheduler.AsyncEventQueue$$anon$2.run(AsyncEventQueue.scala:96)
Caused by: datahub.shaded.org.apache.kafka.common.config.ConfigException: No resolvable bootstrap urls given in bootstrap.servers
	at datahub.shaded.org.apache.kafka.clients.ClientUtils.parseAndValidateAddresses(ClientUtils.java:84)
	at datahub.shaded.org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:408)
	... 20 more

Ahoelzl updated Other Assignee, added: Ahoelzl; removed: tchin.

Hmm, we might need to iterate on a test environment so we can figure this out.

Throwing out a guess: since where the lineage emitter runs depends on where the driver runs, it needs to somehow be aware of whether or not it's running on k8s and choose the correct Kafka bootstrap URL. I wonder if there's an easy way to figure this out? This will probably require some refactoring.

I think we can make use of the existing methods: k8s_proof_url() or is_running_in_kubernetes().
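For reference, a hedged sketch of how a helper like is_running_in_kubernetes() is commonly implemented; the real helper in airflow-dags may well use a different signal, but the kubelet injects these env vars and serviceaccount files into every pod, so they are the usual markers:

```python
import os

# Assumption: detect Kubernetes via the env var and serviceaccount token
# that the kubelet injects into every pod. The actual airflow-dags helper
# may differ.
def is_running_in_kubernetes() -> bool:
    return (
        "KUBERNETES_SERVICE_HOST" in os.environ
        or os.path.exists("/var/run/secrets/kubernetes.io/serviceaccount/token")
    )
```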

@brouberol, usually when we use them, we transform an external URL like https://meta.wikimedia.org to something like http://envoy:6501.
I could not find any envoy configuration for datahub, though. Does this mean that datahub is not accessible from inside Kubernetes? 🙏

@mforns we should have an entry in the service mesh for datahub. We cut corners at the time and circumvented the mesh in how we deployed Datahub (we were not using the mesh back then). We did define a Kubernetes Service resource for Datahub, though, meaning it is reachable via vanilla K8s service discovery at http://datahub-gms-production.datahub.svc:8080

brouberol@deploy1003:~$ k exec -it airflow-scheduler-695fc84dbb-25hhn -- python3
Defaulted container "airflow-production" out of: airflow-production, statsd-exporter
Python 3.10.15 (main, Mar 31 2025, 16:03:46) [GCC 10.2.1 20210110] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> import socket
>>> socket.gethostbyname('datahub-gms-production.datahub.svc')
'10.67.45.254'

The issue (as I understand it) is that Spark applications running outside of Kubernetes need to talk to either Kafka or Datahub using different DNS names than when they are running in Kubernetes.

This isn't so much a configuration issue as a code issue. When you launch Spark in cluster mode (I think?), the driver runs in YARN, meaning that any Kubernetes-specific service discovery will not work. So your code needs to figure out that when the driver is running in YARN, it should be calling kafka-jumbo100x.eqiad.wmnet and datahub.wikimedia.org instead of kafka-jumbo-eqiad.external-services:9092 and datahub-gms-production.datahub.svc.

I think the solution is to make the code aware of both endpoints, and then pick the correct one inside the SparkSubmitOperator, based on the launcher param, before it sets the rest of the config. Right now the endpoint is set by a Jinja template, but by the time Airflow templates the string it's probably too late?
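Something like this hedged sketch; the two endpoint values come from this thread (the external broker is the one mentioned further down for the proposed datahub_kafka_jumbo_external connection), while the function name and the launcher param values are assumptions:

```python
# Both endpoints are taken from this task; which launcher values exist
# is an assumption about the SparkSubmitOperator wrapper.
K8S_BOOTSTRAP = "kafka-jumbo-eqiad.external-services.svc.cluster.local:9092"
YARN_BOOTSTRAP = "kafka-jumbo1007.eqiad.wmnet:9092"

def kafka_bootstrap_for(launcher: str) -> str:
    """Drivers launched on Kubernetes can use cluster-local service
    discovery; drivers running anywhere else (e.g. YARN) must use the
    externally resolvable brokers."""
    return K8S_BOOTSTRAP if launcher == "kubernetes" else YARN_BOOTSTRAP
```

The operator would call this before assembling spark_datahub_lineage_conf, instead of relying on the Jinja-templated connection host.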

Thanks @brouberol and @tchin!

OK, I think we can indeed switch the desired URL depending on whether we are in Kubernetes or not.
Working on that.

@brouberol I've written some code that I think might work.
But I think that we would need to define another connection in values-analytics-production.yaml, say, datahub_kafka_jumbo_external, that points to kafka-jumbo1007.eqiad.wmnet:9092.
Would that make sense? 🙏

Change #1148408 had a related patch set uploaded (by Brouberol; author: Brouberol):

[operations/deployment-charts@master] Add a connection for kafka-jumbo-eqiad to be used by systems outside Kubernetes

https://gerrit.wikimedia.org/r/1148408

Change #1148408 merged by Brouberol:

[operations/deployment-charts@master] Add a connection for kafka-jumbo-eqiad to be used by systems outside Kubernetes

https://gerrit.wikimedia.org/r/1148408

@mforns I didn't report back, but you should be gtg by the way!