
Integrate Spark with DataHub with lineage (Data-Engineering)
Closed, Resolved · Public · 13 Estimated Story Points

Description

Spike Goal
Determine what the user experience is when integrating DataHub with Spark
Key Questions:
  • What do we get when we integrate Spark with DataHub? Is this something we want to support?
  • Evaluate the creation of
    • Pipelines
    • Tasks
    • Lineage between source and destination datasets
  • Can this play a part in the broader Data-Platform strategy?
  • Can we just do this for one Airflow/Spark job and see what we can visualize in DataHub?

Deprecated: https://datahubproject.io/docs/metadata-integration/java/spark-lineage/
New version: https://datahubproject.io/docs/metadata-integration/java/spark-lineage-beta
Newest version: https://datahubproject.io/docs/metadata-integration/java/acryl-spark-lineage

Spike results:
The Spike was successfully completed in Q1, demonstrating the use of the DatahubSparkListener in practice and surfacing column-level lineage information in DataHub.

Completion Requirements
  • Get Spark lineage working in DataHub
  • Make Spark-based lineage configurable
  • Enable Spark-based lineage for a suitable test Spark job (Hive)
  • Enable Spark-based lineage for all suitable Spark jobs in the analytics airflow instance (Hive)
Follow-ups
  • Upgrade Spark, Iceberg, and the connector to support Iceberg
  • Enable Spark-based lineage for all remaining Spark jobs using Iceberg tables
  • Enable for other airflow instances.

Details

Related Changes in Gerrit:
Related Changes in GitLab:
Title | Reference | Author | Source Branch | Dest Branch
Enable Spark Lineage on even more dags | repos/data-engineering/airflow-dags!1059 | tchin | enable-even-more-spark-lineage | main
Disable Spark Lineage on questionable Spark operations | repos/data-engineering/airflow-dags!991 | tchin | disable-spark-lineage | main
Enable Spark Lineage on a DAG that generates temp files | repos/data-engineering/airflow-dags!976 | tchin | temp-file-lineage | main
Enable spark lineage on more dags | repos/data-engineering/airflow-dags!935 | tchin | enable-more-lineage | main
Add Datahub Spark lineage support (no iceberg) | repos/data-engineering/airflow-dags!820 | tchin | support-spark-lineage | main

Event Timeline

lbowmaker removed the point value 2 for this task. Mar 26 2024, 12:15 AM

I ran a simple Spark SQL job on a stat box with:

sudo -u analytics-privatedata spark3-sql \
--jars ./acryl-spark-lineage-0.2.16.jar \
--conf "spark.extraListeners=datahub.spark.DatahubSparkListener" \
--master local[12] --driver-memory 8G \
--conf "spark.datahub.emitter=file" \
--conf "spark.datahub.file.filename=./il_lineage" \
-f il_test.hql

il_test.hql

INSERT OVERWRITE TABLE tchin.interlanguage_navigation  
    SELECT /*+ COALESCE(1) */  
        *
    FROM wmf_traffic.interlanguage_navigation
    WHERE day="2024-08-14"
;

Note that these are both Iceberg tables.
It emitted these URNs:

[
{
    "aspectName" : "dataFlowInfo",
    "entityUrn" : "urn:li:dataFlow:(spark,SparkSQL::10.64.21.17,default)",
    "entityType" : "dataFlow",
    "changeType" : "UPSERT",
    "aspect" : {
        "contentType" : "application/json",
        "value" : "{\"customProperties\":{\"spark.master\":\"local[12]\",\"jobId\":\"0\",\"processingEngineVersion\":\"3.1.2\",\"appId\":\"local-1723723383169\",\"startedAt\":\"2024-08-15T12:03:02.435Z\",\"spark.app.name\":\"SparkSQL::10.64.21.17\",\"sparkUser\":\"analytics-privatedata\",\"jobDescription\":\"INSERT OVERWRITE TABLE tchin.interlanguage_navigation  \\n    SELECT /*+ COALESCE(1) */  \\n        *\\n    FROM wmf_traffic.interlanguage_navigation\\n    WHERE day=\\\"2024-08-14\\\"\\n\",\"processingEngine\":\"spark\",\"finishedAt\":\"2024-08-15T12:03:12.220Z\"},\"name\":\"SparkSQL::10.64.21.17\"}"
    }
},
{
    "aspectName" : "status",
    "entityUrn" : "urn:li:dataFlow:(spark,SparkSQL::10.64.21.17,default)",
    "entityType" : "dataFlow",
    "changeType" : "UPSERT",
    "aspect" : {
        "contentType" : "application/json",
        "value" : "{\"removed\":false}"
    }
},
{
    "aspectName" : "dataJobInfo",
    "entityUrn" : "urn:li:dataJob:(urn:li:dataFlow:(spark,SparkSQL::10.64.21.17,default),SparkSQL::10.64.21.17)",
    "entityType" : "dataJob",
    "changeType" : "UPSERT",
    "aspect" : {
        "contentType" : "application/json",
        "value" : "{\"customProperties\":{\"spark.master\":\"local[12]\",\"jobId\":\"0\",\"processingEngineVersion\":\"3.1.2\",\"spark.app.name\":\"SparkSQL::10.64.21.17\",\"jobDescription\":\"INSERT OVERWRITE TABLE tchin.interlanguage_navigation  \\n    SELECT /*+ COALESCE(1) */  \\n        *\\n    FROM wmf_traffic.interlanguage_navigation\\n    WHERE day=\\\"2024-08-14\\\"\\n\",\"processingEngine\":\"spark\"},\"created\":{\"time\":1723723382435},\"name\":\"SparkSQL::10.64.21.17\",\"flowUrn\":\"urn:li:dataFlow:(spark,SparkSQL::10.64.21.17,default)\",\"type\":{\"string\":\"spark\"}}"
    }
},
{
    "aspectName" : "status",
    "entityUrn" : "urn:li:dataJob:(urn:li:dataFlow:(spark,SparkSQL::10.64.21.17,default),SparkSQL::10.64.21.17)",
    "entityType" : "dataJob",
    "changeType" : "UPSERT",
    "aspect" : {
        "contentType" : "application/json",
        "value" : "{\"removed\":false}"
    }
},
{
    "aspectName" : "dataJobInputOutput",
    "entityUrn" : "urn:li:dataJob:(urn:li:dataFlow:(spark,SparkSQL::10.64.21.17,default),SparkSQL::10.64.21.17)",
    "entityType" : "dataJob",
    "changeType" : "UPSERT",
    "aspect" : {
        "contentType" : "application/json",
        "value" : "{\"inputDatajobs\":[],\"inputDatasetEdges\":[],\"outputDatasetEdges\":[],\"inputDatasets\":[],\"outputDatasets\":[]}"
    }
},
{
    "aspectName" : "dataProcessInstanceInput",
    "entityUrn" : "urn:li:dataProcessInstance:019155eb-8b42-79f7-a071-0d10165278b2",
    "entityType" : "dataProcessInstance",
    "changeType" : "UPSERT",
    "aspect" : {
        "contentType" : "application/json",
        "value" : "{\"inputs\":[]}"
    }
},
{
    "aspectName" : "dataProcessInstanceOutput",
    "entityUrn" : "urn:li:dataProcessInstance:019155eb-8b42-79f7-a071-0d10165278b2",
    "entityType" : "dataProcessInstance",
    "changeType" : "UPSERT",
    "aspect" : {
        "contentType" : "application/json",
        "value" : "{\"outputs\":[]}"
    }
},
{
    "aspectName" : "dataProcessInstanceProperties",
    "entityUrn" : "urn:li:dataProcessInstance:019155eb-8b42-79f7-a071-0d10165278b2",
    "entityType" : "dataProcessInstance",
    "changeType" : "UPSERT",
    "aspect" : {
        "contentType" : "application/json",
        "value" : "{\"name\":\"019155eb-8b42-79f7-a071-0d10165278b2\",\"created\":{\"actor\":\"urn:li:corpuser:datahub\",\"time\":1723723392200}}"
    }
},
{
    "aspectName" : "dataProcessInstanceRunEvent",
    "entityUrn" : "urn:li:dataProcessInstance:019155eb-8b42-79f7-a071-0d10165278b2",
    "entityType" : "dataProcessInstance",
    "changeType" : "UPSERT",
    "aspect" : {
        "contentType" : "application/json",
        "value" : "{\"result\":{\"type\":\"SUCCESS\",\"nativeResultType\":\"COMPLETE\"},\"timestampMillis\":1723723392146,\"status\":\"COMPLETE\"}"
    }
},
{
    "aspectName" : "dataProcessInstanceRelationships",
    "entityUrn" : "urn:li:dataProcessInstance:019155eb-8b42-79f7-a071-0d10165278b2",
    "entityType" : "dataProcessInstance",
    "changeType" : "UPSERT",
    "aspect" : {
        "contentType" : "application/json",
        "value" : "{\"parentTemplate\":\"urn:li:dataJob:(urn:li:dataFlow:(spark,SparkSQL::10.64.21.17,default),SparkSQL::10.64.21.17)\",\"upstreamInstances\":[]}"
    }
}
]

I also ran it inserting into DataHub so you can see what it looks like:
https://datahub.wikimedia.org/tasks/urn:li:dataJob:(urn:li:dataFlow:(spark,SparkSQL::10.64.21.17,default),SparkSQL::10.64.21.17)/Documentation?is_lineage_mode=false

For some reason, it seems like it's not linking the lineage of wmf_traffic.interlanguage_navigation, even though the documentation says it defaults to looking up the tables in the hive data platform for lineage.

When I look at the logs, some things jump out at me:

12:03:12.183 [spark-listener-group-shared] DEBUG io.openlineage.spark.agent.lifecycle.plan.StreamingDataSourceV2RelationVisitor - The result of checking whether org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation is an instance of org.apache.spark.sql.execution.datasources.v2.StreamingDataSourceV2Relation is false
12:03:12.184 [spark-listener-group-shared] DEBUG io.openlineage.spark.agent.lifecycle.OpenLineageRunEventBuilder - Visiting query plan Optional[== Parsed Logical Plan ==
'InsertIntoStatement 'UnresolvedRelation [tchin, interlanguage_navigation], [], false, true, false
+- 'UnresolvedHint COALESCE, [1]
   +- 'Project [*]
      +- 'Filter ('day = 2024-08-14)
         +- 'UnresolvedRelation [wmf_traffic, interlanguage_navigation], [], false

== Analyzed Logical Plan ==

OverwriteByExpression RelationV2[project_family#5, current_project#6, previous_project#7, navigation_count#8L, day#9] spark_catalog.tchin.interlanguage_navigation, true, false
+- Repartition 1, false
   +- Project [project_family#0, current_project#1, previous_project#2, navigation_count#3L, day#4]
      +- Filter (day#4 = cast(2024-08-14 as date))
         +- SubqueryAlias spark_catalog.wmf_traffic.interlanguage_navigation
            +- RelationV2[project_family#0, current_project#1, previous_project#2, navigation_count#3L, day#4] spark_catalog.wmf_traffic.interlanguage_navigation

== Optimized Logical Plan ==
OverwriteByExpression RelationV2[project_family#5, current_project#6, previous_project#7, navigation_count#8L, day#9] spark_catalog.tchin.interlanguage_navigation, true, false
+- Repartition 1, false
   +- Filter (isnotnull(day#4) AND (day#4 = 19949))
      +- RelationV2[project_family#0, current_project#1, previous_project#2, navigation_count#3L, day#4] spark_catalog.wmf_traffic.interlanguage_navigation

== Physical Plan ==
OverwriteByExpression spark_catalog.tchin.interlanguage_navigation, [AlwaysTrue()], [], org.apache.spark.sql.execution.datasources.v2.DataSourceV2Strategy$$Lambda$1983/1139659498@7202009e
+- Coalesce 1
   +- *(1) Filter (isnotnull(day#4) AND (day#4 = 19949))
      +- *(1) ColumnarToRow
         +- BatchScan[project_family#0, current_project#1, previous_project#2, navigation_count#3L, day#4] spark_catalog.wmf_traffic.interlanguage_navigation [filters=day IS NOT NULL, day = 19949]
] with output dataset builders [<function1>, <function1>, <function1>, <function1>, <function1>, <function1>, <function1>, <function1>, <function1>]
12:03:12.187 [spark-listener-group-shared] DEBUG io.openlineage.spark.agent.lifecycle.plan.WriteToDataSourceV2Visitor - The supplied logical plan IS NOT an instance of org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2
12:03:12.188 [spark-listener-group-shared] INFO io.openlineage.spark.agent.util.PlanUtils - apply method failed with
java.lang.NoSuchMethodError: org.apache.iceberg.spark.SparkSessionCatalog.icebergCatalog()Lorg/apache/iceberg/catalog/Catalog;
	at io.openlineage.spark3.agent.lifecycle.plan.catalog.IcebergHandler.getIcebergTable(IcebergHandler.java:171)
	at io.openlineage.spark3.agent.lifecycle.plan.catalog.IcebergHandler.getDatasetVersion(IcebergHandler.java:156)
	at io.openlineage.spark3.agent.lifecycle.plan.catalog.CatalogUtils3.getDatasetVersion(CatalogUtils3.java:100)
	at io.openlineage.spark3.agent.utils.DatasetVersionDatasetFacetUtils.extractVersionFromDataSourceV2Relation(DatasetVersionDatasetFacetUtils.java:46)
	at io.openlineage.spark3.agent.utils.DatasetVersionDatasetFacetUtils.includeDatasetVersion(DatasetVersionDatasetFacetUtils.java:86)
	at io.openlineage.spark3.agent.lifecycle.plan.TableContentChangeDatasetBuilder.apply(TableContentChangeDatasetBuilder.java:83)
	at io.openlineage.spark3.agent.lifecycle.plan.TableContentChangeDatasetBuilder.apply(TableContentChangeDatasetBuilder.java:30)
	at io.openlineage.spark.api.AbstractQueryPlanDatasetBuilder$1.apply(AbstractQueryPlanDatasetBuilder.java:97)
	at io.openlineage.spark.api.AbstractQueryPlanDatasetBuilder$1.apply(AbstractQueryPlanDatasetBuilder.java:86)
	at io.openlineage.spark.agent.util.PlanUtils.safeApply(PlanUtils.java:295)
	at io.openlineage.spark.api.AbstractQueryPlanDatasetBuilder.lambda$apply$0(AbstractQueryPlanDatasetBuilder.java:76)
	at java.util.Optional.map(Optional.java:215)
	at io.openlineage.spark.api.AbstractQueryPlanDatasetBuilder.apply(AbstractQueryPlanDatasetBuilder.java:68)
	at io.openlineage.spark.api.AbstractQueryPlanDatasetBuilder.apply(AbstractQueryPlanDatasetBuilder.java:40)
	at io.openlineage.spark.agent.util.PlanUtils.safeApply(PlanUtils.java:295)
	at io.openlineage.spark.agent.lifecycle.OpenLineageRunEventBuilder.lambda$null$29(OpenLineageRunEventBuilder.java:381)
	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
	at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175)
	at java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948)
	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
	at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
	at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
	at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:485)
	at java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:272)
	at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384)
	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
	at java.util.stream.StreamSpliterators$WrappingSpliterator.forEachRemaining(StreamSpliterators.java:313)
	at java.util.stream.Streams$ConcatSpliterator.forEachRemaining(Streams.java:742)
	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
	at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
	at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:566)
	at io.openlineage.spark.agent.lifecycle.OpenLineageRunEventBuilder.buildOutputDatasets(OpenLineageRunEventBuilder.java:335)
	at io.openlineage.spark.agent.lifecycle.OpenLineageRunEventBuilder.buildRun(OpenLineageRunEventBuilder.java:196)
	at io.openlineage.spark.agent.lifecycle.SparkSQLExecutionContext.end(SparkSQLExecutionContext.java:139)
	at io.openlineage.spark.agent.OpenLineageSparkListener.lambda$sparkSQLExecEnd$4(OpenLineageSparkListener.java:131)
	at io.openlineage.client.circuitBreaker.NoOpCircuitBreaker.run(NoOpCircuitBreaker.java:27)
	at io.openlineage.spark.agent.OpenLineageSparkListener.sparkSQLExecEnd(OpenLineageSparkListener.java:128)
	at io.openlineage.spark.agent.OpenLineageSparkListener.onOtherEvent(OpenLineageSparkListener.java:103)
	at datahub.spark.DatahubSparkListener.onOtherEvent(DatahubSparkListener.java:339)
	at org.apache.spark.scheduler.SparkListenerBus.doPostEvent(SparkListenerBus.scala:100)
	at org.apache.spark.scheduler.SparkListenerBus.doPostEvent$(SparkListenerBus.scala:28)
	at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37)
	at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37)
	at org.apache.spark.util.ListenerBus.postToAll(ListenerBus.scala:117)
	at org.apache.spark.util.ListenerBus.postToAll$(ListenerBus.scala:101)
	at org.apache.spark.scheduler.AsyncEventQueue.super$postToAll(AsyncEventQueue.scala:105)
	at org.apache.spark.scheduler.AsyncEventQueue.$anonfun$dispatch$1(AsyncEventQueue.scala:105)
	at scala.runtime.java8.JFunction0$mcJ$sp.apply(JFunction0$mcJ$sp.java:23)
	at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
	at org.apache.spark.scheduler.AsyncEventQueue.org$apache$spark$scheduler$AsyncEventQueue$$dispatch(AsyncEventQueue.scala:100)
	at org.apache.spark.scheduler.AsyncEventQueue$$anon$2.$anonfun$run$1(AsyncEventQueue.scala:96)
	at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1381)
	at org.apache.spark.scheduler.AsyncEventQueue$$anon$2.run(AsyncEventQueue.scala:96)
12:03:12.191 [spark-listener-group-shared] DEBUG io.openlineage.spark.agent.lifecycle.SparkSQLExecutionContext - Posting event for end 0: {"eventTime":"2024-08-15T12:03:12.146Z","producer":"https://github.com/OpenLineage/OpenLineage/tree/1.19.0/integration/spark","schemaURL":"https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunEvent","eventType":"COMPLETE","run":{"runId":"019155eb-9955-7a97-8414-885778bc84e6","facets":{"parent":{"_producer":"https://github.com/OpenLineage/OpenLineage/tree/1.19.0/integration/spark","_schemaURL":"https://openlineage.io/spec/facets/1-0-1/ParentRunFacet.json#/$defs/ParentRunFacet","run":{"runId":"019155eb-8b42-79f7-a071-0d10165278b2"},"job":{"namespace":"default","name":"spark_sql::10.64.21.17"}},"spark_properties":{"_producer":"https://github.com/OpenLineage/OpenLineage/tree/1.19.0/integration/spark","_schemaURL":"https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunFacet","properties":{"spark.master":"local[12]","spark.app.name":"SparkSQL::10.64.21.17"}},"processing_engine":{"_producer":"https://github.com/OpenLineage/OpenLineage/tree/1.19.0/integration/spark","_schemaURL":"https://openlineage.io/spec/facets/1-1-1/ProcessingEngineRunFacet.json#/$defs/ProcessingEngineRunFacet","version":"3.1.2","name":"spark"},"environment-properties":{"_producer":"https://github.com/OpenLineage/OpenLineage/tree/1.19.0/integration/spark","_schemaURL":"https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunFacet","environment-properties":{}}}},"job":{"namespace":"default","name":"spark_sql::10.64.21.17.overwrite_by_expression.spark_catalog_tchin_interlanguage_navigation","facets":{"jobType":{"_producer":"https://github.com/OpenLineage/OpenLineage/tree/1.19.0/integration/spark","_schemaURL":"https://openlineage.io/spec/facets/2-0-3/JobTypeJobFacet.json#/$defs/JobTypeJobFacet","processingType":"BATCH","integration":"SPARK","jobType":"SQL_JOB"}}},"inputs":[],"outputs":[]}

Unsure if the Spark datasource class mismatches matter, but it seems like it can't find some method in the Iceberg catalog. It might be that we need a different version of Iceberg to get working lineage. I'll test with non-Iceberg tables and see if we get anything.

I can see that Iceberg for Spark 3.1 does not in fact have an icebergCatalog method, but the runtimes for Spark 3.3 and later do. Going to see if I can use the Spark 3.3 configs from the airflow-dags repo.
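
A quick way to verify which runtime jar ships that method (a sketch, assuming a JDK's javap is on the PATH and the candidate jar is in the current directory):

# does this Iceberg runtime expose SparkSessionCatalog.icebergCatalog(),
# the accessor that OpenLineage's IcebergHandler calls?
unzip -o iceberg-spark-runtime-3.3_2.12-1.2.1.jar 'org/apache/iceberg/spark/SparkSessionCatalog.class' -d /tmp/iceberg-probe
javap -p -cp /tmp/iceberg-probe org.apache.iceberg.spark.SparkSessionCatalog | grep icebergCatalog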

sudo -u analytics-privatedata spark3-sql --jars ./acryl-spark-lineage-0.2.16.jar...

I might be confused, but are you working with the correct jar here?

Maybe try this jar instead? https://central.sonatype.com/artifact/io.acryl/datahub-spark-lineage

I was reading here: https://datahubproject.io/docs/0.13.1/metadata-integration/java/spark-lineage/#configuration-instructions-standalone-java-applications

We're currently on version 0.13.3 of datahub, so I would perhaps try these: https://repo1.maven.org/maven2/io/acryl/datahub-spark-lineage/0.13.3-6/
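
For example, fetching that build onto a stat box (standard Maven repository layout; this is the same jar tried further below):

wget https://repo1.maven.org/maven2/io/acryl/datahub-spark-lineage/0.13.3-6/datahub-spark-lineage-0.13.3-6.jar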

Oh yes, I see. Sorry, I got that the wrong way around :-)

Update: Tried using Spark 3.3.2 with this:

sudo -u analytics-privatedata spark3-sql \
--master local[12] --driver-memory 8G \
--jars ./iceberg-spark-runtime-3.3_2.12-1.2.1.jar,./acryl-spark-lineage-0.2.16.jar \
--conf "spark.extraListeners=datahub.spark.DatahubSparkListener" \
--conf "spark.datahub.emitter=file" \
--conf "spark.datahub.file.filename=./il_lineage" \
--conf "spark.yarn.archive=hdfs:///user/spark/share/lib/spark-3.3.2-assembly.zip" \
--conf "spark.jars.ivySettings=/etc/maven/ivysettings.xml" \
-f il_test.hql

Got an error:

24/08/16 21:56:56 ERROR Utils: uncaught error in thread spark-listener-group-shared, stopping SparkContext
java.lang.IncompatibleClassChangeError: Implementing class
	at java.lang.ClassLoader.defineClass1(Native Method)
	at java.lang.ClassLoader.defineClass(ClassLoader.java:756)
	at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
	at java.net.URLClassLoader.defineClass(URLClassLoader.java:473)
	at java.net.URLClassLoader.access$100(URLClassLoader.java:74)
	at java.net.URLClassLoader$1.run(URLClassLoader.java:369)
	at java.net.URLClassLoader$1.run(URLClassLoader.java:363)
	at java.security.AccessController.doPrivileged(Native Method)
	at java.net.URLClassLoader.findClass(URLClassLoader.java:362)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
	at java.lang.ClassLoader.defineClass1(Native Method)
	at java.lang.ClassLoader.defineClass(ClassLoader.java:756)
	at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
	at java.net.URLClassLoader.defineClass(URLClassLoader.java:473)
	at java.net.URLClassLoader.access$100(URLClassLoader.java:74)
	at java.net.URLClassLoader$1.run(URLClassLoader.java:369)
	at java.net.URLClassLoader$1.run(URLClassLoader.java:363)
	at java.security.AccessController.doPrivileged(Native Method)
	at java.net.URLClassLoader.findClass(URLClassLoader.java:362)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
	at io.openlineage.spark.agent.util.ReflectionUtils.hasClass(ReflectionUtils.java:47)
	at io.openlineage.spark34.agent.lifecycle.plan.column.MergeIntoIceberg13ColumnLineageVisitor.hasClasses(MergeIntoIceberg13ColumnLineageVisitor.java:32)
	at io.openlineage.spark.agent.lifecycle.Spark3DatasetBuilderFactory.getColumnLevelLineageVisitors(Spark3DatasetBuilderFactory.java:104)
	at io.openlineage.spark.agent.lifecycle.InternalEventHandlerFactory.createColumnLevelLineageVisitors(InternalEventHandlerFactory.java:243)
	at io.openlineage.spark.agent.lifecycle.InternalEventHandlerFactory.createColumnLevelLineageVisitors(InternalEventHandlerFactory.java:55)
	at io.openlineage.spark.agent.lifecycle.OpenLineageRunEventBuilder.<init>(OpenLineageRunEventBuilder.java:175)
	at io.openlineage.spark.agent.lifecycle.ContextFactory.createSparkApplicationExecutionContext(ContextFactory.java:60)
	at io.openlineage.spark.agent.OpenLineageSparkListener.getSparkApplicationExecutionContext(OpenLineageSparkListener.java:231)
	at io.openlineage.spark.agent.OpenLineageSparkListener.lambda$onApplicationStart$20(OpenLineageSparkListener.java:300)
	at io.openlineage.client.circuitBreaker.NoOpCircuitBreaker.run(NoOpCircuitBreaker.java:27)
	at io.openlineage.spark.agent.OpenLineageSparkListener.onApplicationStart(OpenLineageSparkListener.java:298)
	at datahub.spark.DatahubSparkListener.onApplicationStart(DatahubSparkListener.java:94)
	at org.apache.spark.scheduler.SparkListenerBus.doPostEvent(SparkListenerBus.scala:55)
	at org.apache.spark.scheduler.SparkListenerBus.doPostEvent$(SparkListenerBus.scala:28)
	at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37)
	at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37)
	at org.apache.spark.util.ListenerBus.postToAll(ListenerBus.scala:117)
	at org.apache.spark.util.ListenerBus.postToAll$(ListenerBus.scala:101)
	at org.apache.spark.scheduler.AsyncEventQueue.super$postToAll(AsyncEventQueue.scala:105)
	at org.apache.spark.scheduler.AsyncEventQueue.$anonfun$dispatch$1(AsyncEventQueue.scala:105)
	at scala.runtime.java8.JFunction0$mcJ$sp.apply(JFunction0$mcJ$sp.java:23)
	at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
	at org.apache.spark.scheduler.AsyncEventQueue.org$apache$spark$scheduler$AsyncEventQueue$$dispatch(AsyncEventQueue.scala:100)
	at org.apache.spark.scheduler.AsyncEventQueue$$anon$2.$anonfun$run$1(AsyncEventQueue.scala:96)
	at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1381)
	at org.apache.spark.scheduler.AsyncEventQueue$$anon$2.run(AsyncEventQueue.scala:96)

It seems like in versions >0.13.1 they introduced support for OpenLineage, and this line jumps out at me: io.openlineage.spark34.agent.lifecycle.plan.column.MergeIntoIceberg13ColumnLineageVisitor.hasClasses(MergeIntoIceberg13ColumnLineageVisitor.java:32). Maybe it thinks it's on Spark 3.4? I don't know where to get the Spark 3.4 assembly zip to test this theory.

Just for fun I also tried using the older connector:

sudo -u analytics-privatedata spark3-sql \
--master local[12] --driver-memory 8G \
--jars ./iceberg-spark-runtime-3.3_2.12-1.2.1.jar,./datahub-spark-lineage-0.13.3-6.jar \
--conf "spark.extraListeners=datahub.spark.DatahubSparkListener" \
--conf "spark.datahub.emitter=file" \
--conf "spark.datahub.file.filename=./il_lineage" \
--conf "spark.yarn.archive=hdfs:///user/spark/share/lib/spark-3.3.2-assembly.zip" \
--conf "spark.jars.ivySettings=/etc/maven/ivysettings.xml" \
-f il_test.hql

Exception in thread "main" java.lang.UnsupportedClassVersionError: datahub/spark/DatahubSparkListener has been compiled by a more recent version of the Java Runtime (class file version 55.0), this version of the Java Runtime only recognizes class file versions up to 52.0
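
Class file version 55.0 is Java 11 bytecode and 52.0 is Java 8, so this build of the listener was compiled for Java 11 while spark3-sql here runs on a Java 8 JVM. The target bytecode level can be read out of the jar itself (a sketch, assuming xxd is available; the last two of the first eight bytes of a class file hold the major version):

# print the class file header; the final two bytes shown are the major version
# (0x0037 = 55 = Java 11, 0x0034 = 52 = Java 8)
unzip -p datahub-spark-lineage-0.13.3-6.jar datahub/spark/DatahubSparkListener.class | head -c 8 | xxd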

I don't know if it is useful, but @xcollazo did some work on using custom spark assemblies and there are some notes in T340861#9136895


Unfortunately, using Spark 3.3+ only works from a PySpark context, as we need a conda artifact that bundles the whole of Spark. See here for a production example, and take note of L149:

"SPARK_HOME": "venv/lib/python3.10/site-packages/pyspark",  # point to the packaged Spark

So we are telling Skein to run spark3-sql out of the conda environment (3.3) rather than the production one (3.1).

If we want to use Spark 3.3 generally in production, we have to upgrade the production Spark.
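
A minimal sketch of that wiring (paths follow the conda venv layout from the production example above):

# run the packaged Spark 3.3 out of the conda env instead of the production 3.1 install
export SPARK_HOME="venv/lib/python3.10/site-packages/pyspark"
"$SPARK_HOME/bin/spark-sql" -f il_test.hql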

I ran a job using our regular prod configs, just without Iceberg tables. It ran successfully and output this:

[
{
    "aspectName" : "dataFlowInfo",
    "entityUrn" : "urn:li:dataFlow:(spark,SparkSQL::10.64.21.17,default)",
    "entityType" : "dataFlow",
    "changeType" : "UPSERT",
    "aspect" : {
        "contentType" : "application/json",
        "value" : "{\"customProperties\":{\"spark.master\":\"local[12]\",\"jobId\":\"0\",\"processingEngineVersion\":\"3.1.2\",\"appId\":\"local-1724123844455\",\"startedAt\":\"2024-08-20T03:17:23.782Z\",\"spark.app.name\":\"SparkSQL::10.64.21.17\",\"sparkUser\":\"analytics-privatedata\",\"jobDescription\":\"INSERT INTO tchin.lineage_test  \\n    SELECT\\n        *\\n    FROM wmf.interlanguage_navigation\\n    WHERE date=\\\"2024-08-14\\\"\\n\",\"processingEngine\":\"spark\",\"finishedAt\":\"2024-08-20T03:17:31.513Z\"},\"name\":\"SparkSQL::10.64.21.17\"}"
    }
},
{
    "aspectName" : "status",
    "entityUrn" : "urn:li:dataFlow:(spark,SparkSQL::10.64.21.17,default)",
    "entityType" : "dataFlow",
    "changeType" : "UPSERT",
    "aspect" : {
        "contentType" : "application/json",
        "value" : "{\"removed\":false}"
    }
},
{
    "aspectName" : "dataJobInfo",
    "entityUrn" : "urn:li:dataJob:(urn:li:dataFlow:(spark,SparkSQL::10.64.21.17,default),SparkSQL::10.64.21.17)",
    "entityType" : "dataJob",
    "changeType" : "UPSERT",
    "aspect" : {
        "contentType" : "application/json",
        "value" : "{\"customProperties\":{\"spark.master\":\"local[12]\",\"jobId\":\"0\",\"processingEngineVersion\":\"3.1.2\",\"spark.app.name\":\"SparkSQL::10.64.21.17\",\"jobDescription\":\"INSERT INTO tchin.lineage_test  \\n    SELECT\\n        *\\n    FROM wmf.interlanguage_navigation\\n    WHERE date=\\\"2024-08-14\\\"\\n\",\"processingEngine\":\"spark\"},\"created\":{\"time\":1724123843782},\"name\":\"SparkSQL::10.64.21.17\",\"flowUrn\":\"urn:li:dataFlow:(spark,SparkSQL::10.64.21.17,default)\",\"type\":{\"string\":\"spark\"}}"
    }
},
{
    "aspectName" : "status",
    "entityUrn" : "urn:li:dataJob:(urn:li:dataFlow:(spark,SparkSQL::10.64.21.17,default),SparkSQL::10.64.21.17)",
    "entityType" : "dataJob",
    "changeType" : "UPSERT",
    "aspect" : {
        "contentType" : "application/json",
        "value" : "{\"removed\":false}"
    }
},
{
    "aspectName" : "upstreamLineage",
    "entityUrn" : "urn:li:dataset:(urn:li:dataPlatform:hive,tchin.lineage_test,PROD)",
    "entityType" : "dataset",
    "changeType" : "UPSERT",
    "aspect" : {
        "contentType" : "application/json",
        "value" : "{\"fineGrainedLineages\":[{\"confidenceScore\":0.5,\"downstreams\":[\"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,tchin.lineage_test,PROD),project_family)\"],\"upstreams\":[\"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,wmf.interlanguage_navigation,PROD),date)\",\"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,wmf.interlanguage_navigation,PROD),project_family)\"],\"downstreamType\":\"FIELD_SET\",\"upstreamType\":\"FIELD_SET\"},{\"confidenceScore\":0.5,\"downstreams\":[\"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,tchin.lineage_test,PROD),current_project)\"],\"upstreams\":[\"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,wmf.interlanguage_navigation,PROD),current_project)\",\"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,wmf.interlanguage_navigation,PROD),date)\"],\"downstreamType\":\"FIELD_SET\",\"upstreamType\":\"FIELD_SET\"},{\"confidenceScore\":0.5,\"downstreams\":[\"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,tchin.lineage_test,PROD),previous_project)\"],\"upstreams\":[\"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,wmf.interlanguage_navigation,PROD),date)\",\"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,wmf.interlanguage_navigation,PROD),previous_project)\"],\"downstreamType\":\"FIELD_SET\",\"upstreamType\":\"FIELD_SET\"},{\"confidenceScore\":0.5,\"downstreams\":[\"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,tchin.lineage_test,PROD),navigation_count)\"],\"upstreams\":[\"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,wmf.interlanguage_navigation,PROD),date)\",\"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,wmf.interlanguage_navigation,PROD),navigation_count)\"],\"downstreamType\":\"FIELD_SET\",\"upstreamType\":\"FIELD_SET\"},{\"confidenceScore\":0.5,\"downstreams\":[\"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,tchin.lineage_test,PROD),date)\"],\"upstreams\":[\"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,wmf.interlanguage_navigation,PROD),date)\"],\"downstreamType\":\"FIELD_SET\",\"upstreamType\":\"FIELD_SET\"}],\"upstreams\":[{\"type\":\"TRANSFORMED\",\"dataset\":\"urn:li:dataset:(urn:li:dataPlatform:hive,wmf.interlanguage_navigation,PROD)\"}]}"
    }
},
{
    "aspectName" : "dataJobInputOutput",
    "entityUrn" : "urn:li:dataJob:(urn:li:dataFlow:(spark,SparkSQL::10.64.21.17,default),SparkSQL::10.64.21.17)",
    "entityType" : "dataJob",
    "changeType" : "UPSERT",
    "aspect" : {
        "contentType" : "application/json",
        "value" : "{\"inputDatajobs\":[],\"inputDatasetEdges\":[{\"destinationUrn\":\"urn:li:dataset:(urn:li:dataPlatform:hive,wmf.interlanguage_navigation,PROD)\",\"lastModified\":{\"actor\":\"urn:li:corpuser:datahub\",\"time\":1724123851500}}],\"outputDatasetEdges\":[{\"destinationUrn\":\"urn:li:dataset:(urn:li:dataPlatform:hive,tchin.lineage_test,PROD)\",\"lastModified\":{\"actor\":\"urn:li:corpuser:datahub\",\"time\":1724123851500}}],\"inputDatasets\":[],\"outputDatasets\":[]}"
    }
},
{
    "aspectName" : "dataProcessInstanceInput",
    "entityUrn" : "urn:li:dataProcessInstance:01916dca-1750-7480-b365-a311ff5ad22d",
    "entityType" : "dataProcessInstance",
    "changeType" : "UPSERT",
    "aspect" : {
        "contentType" : "application/json",
        "value" : "{\"inputs\":[\"urn:li:dataset:(urn:li:dataPlatform:hive,wmf.interlanguage_navigation,PROD)\"]}"
    }
},
{
    "aspectName" : "dataProcessInstanceOutput",
    "entityUrn" : "urn:li:dataProcessInstance:01916dca-1750-7480-b365-a311ff5ad22d",
    "entityType" : "dataProcessInstance",
    "changeType" : "UPSERT",
    "aspect" : {
        "contentType" : "application/json",
        "value" : "{\"outputs\":[\"urn:li:dataset:(urn:li:dataPlatform:hive,tchin.lineage_test,PROD)\"]}"
    }
},
{
    "aspectName" : "dataProcessInstanceProperties",
    "entityUrn" : "urn:li:dataProcessInstance:01916dca-1750-7480-b365-a311ff5ad22d",
    "entityType" : "dataProcessInstance",
    "changeType" : "UPSERT",
    "aspect" : {
        "contentType" : "application/json",
        "value" : "{\"name\":\"01916dca-1750-7480-b365-a311ff5ad22d\",\"created\":{\"actor\":\"urn:li:corpuser:datahub\",\"time\":1724123851500}}"
    }
},
{
    "aspectName" : "dataProcessInstanceRunEvent",
    "entityUrn" : "urn:li:dataProcessInstance:01916dca-1750-7480-b365-a311ff5ad22d",
    "entityType" : "dataProcessInstance",
    "changeType" : "UPSERT",
    "aspect" : {
        "contentType" : "application/json",
        "value" : "{\"result\":{\"type\":\"SUCCESS\",\"nativeResultType\":\"COMPLETE\"},\"timestampMillis\":1724123851448,\"status\":\"COMPLETE\"}"
    }
},
{
    "aspectName" : "dataProcessInstanceRelationships",
    "entityUrn" : "urn:li:dataProcessInstance:01916dca-1750-7480-b365-a311ff5ad22d",
    "entityType" : "dataProcessInstance",
    "changeType" : "UPSERT",
    "aspect" : {
        "contentType" : "application/json",
        "value" : "{\"parentTemplate\":\"urn:li:dataJob:(urn:li:dataFlow:(spark,SparkSQL::10.64.21.17,default),SparkSQL::10.64.21.17)\",\"upstreamInstances\":[]}"
    }
}
]

Note the addition of the upstreamLineage object. This means that lineage is being emitted, so the earlier failure is purely down to Iceberg.

You can see the working pipeline in DataHub now:

Screenshot 2024-08-19 at 11.50.47 PM.png (598×2 px, 95 KB)

It seems like right now, unless we upgrade to at least Spark 3.4 and Iceberg 1.4, we will not be able to use DataHub's Spark lineage connector on Iceberg tables.

Ottomata renamed this task from "Spike: Integrate Spark with DataHub" to "Spike: Integrate Spark with DataHub with lineage". Aug 20 2024, 2:25 PM

Yeah I think we should prioritize that.

I also tested joins and they work:

image.png (352×1 px, 39 KB)


That is pretty cool!

I don't know how/where Spark's appName is autogenerated, but for DAGs to use Spark lineage we should require them to also define a static appName; otherwise there will be a new pipeline + task(s) for every DAG run.

Screenshot 2024-09-12 at 8.16.44 AM.png (714×1 px, 87 KB)

This looks like it is happening because of the way the default SparkSubmitOperator name is set.

https://gitlab.wikimedia.org/repos/data-engineering/airflow-dags/-/blob/main/wmf_airflow_common/operators/spark.py#L51

We should change that to be named after the dag+task name, but not necessarily the task instance.

We had set the name with date information on purpose, to facilitate identifying tasks in Yarn (there is a bug: hourly tasks are named with day data, and this should be corrected).
I understand it would make things better for lineage to name them without the date-moving part. But looking in the DataHub Spark doc, I found that we can set the flow name explicitly using spark.datahub.flow_name.

Whatever direction we prefer to take, we should make and log an official decision on this, probably involving Data Product.
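
For example, the spark.datahub.flow_name setting mentioned above would look like this (a sketch; the flow name value is hypothetical):

spark3-sql \
--jars ./acryl-spark-lineage-0.2.16.jar \
--conf "spark.extraListeners=datahub.spark.DatahubSparkListener" \
--conf "spark.datahub.flow_name=interlanguage_daily" \
-f il_test.hql

so every run maps onto the same DataHub flow regardless of the date in the Yarn app name.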

Nice, that sounds good! That is going to be a job-specific setting then, so hm. That will make the discussion around how to parameterize it more annoying.

tchin renamed this task from "Spike: Integrate Spark with DataHub with lineage" to "Integrate Spark with DataHub with lineage". Oct 21 2024, 2:26 PM
tchin updated the task description.

Change #1085449 had a related patch set uploaded (by Ottomata; author: Ottomata):

[operations/puppet@production] Add airflow connection conf for datahub

https://gerrit.wikimedia.org/r/1085449

Ahoelzl changed the point value for this task from 8 to 13.

Change #1085449 abandoned by Ottomata:

[operations/puppet@production] Add airflow connection conf for datahub

https://gerrit.wikimedia.org/r/1085449

I went through all the DAGs in the analytics instance for a surface-level evaluation of whether we can apply Spark lineage to each:

DAG | Uses easy_dag? | Uses Spark? | Uses Iceberg? | Other Notes | Can apply Spark Lineage?
AnomalyDetectionDAG | No | Yes | No | Special Custom DAG class | No
apis_metrics_to_graphite_hourly | No | Yes | No | | No
aqs_hourly | Yes | Yes | Yes, for one operation | Has manual lineage from DatahubLineageEmitterOperator | Done (manually)
browser_general_daily | Yes | Yes | Yes | | No
browser_metrics_weekly | Yes | Yes | Yes | Operator generates a temp file. Unsure how that appears in DataHub | No
canary_events | Yes | No | - | | No
load_commons_impact_metrics | Yes | Yes | Yes | Uses create_easy_cassandra_loading_dag | No
cassandra_load_editors_by_country_monthly | Yes | Yes | No | Uses create_easy_cassandra_loading_dag | Cassandra tables aren't in datahub, should probably not enable until they are
cassandra_load_mediarequest_per_file_daily | Yes | Yes | No | Uses create_easy_cassandra_loading_dag |
cassandra_load_mediarequest_per_referer_daily | Yes | Yes | No | Uses create_easy_cassandra_loading_dag |
cassandra_load_mediarequest_top_files_daily | Yes | Yes | No | Uses create_easy_cassandra_loading_dag |
cassandra_load_pageview_per_article_daily | Yes | Yes | No | Uses create_easy_cassandra_loading_dag |
cassandra_load_pageview_per_project_hourly | Yes | Yes | No | Uses create_easy_cassandra_loading_dag |
cassandra_load_pageview_top_articles_daily | Yes | Yes | No | Uses create_easy_cassandra_loading_dag |
cassandra_load_pageview_top_by_country_monthly | Yes | Yes | No | Uses create_easy_cassandra_loading_dag |
cassandra_load_pageview_top_per_country_daily | Yes | Yes | No | Uses create_easy_cassandra_loading_dag |
cassandra_load_unique_devices_daily | Yes | Yes | No | Uses create_easy_cassandra_loading_dag |
create_easy_cassandra_loading_dag | Yes | Yes | No | Uses create_easy_cassandra_loading_dag |
clickstream_monthly | Yes | Yes | No | SQL dynamically generated using a java class. It also creates files. | Maybe?
commons_impact_metrics_dumps_monthly | Yes | Yes | Yes | Operator generates a temp file. | No
commons_impact_metrics_monthly | Yes | Yes | Yes | | No
metadata_ingest_daily | No | No | - | Datahub ingestion dag | No
druid_load_banner_activity_minutely_aggregated_daily | Yes | Yes | No | Uses an intermediate temp table and creates files in a temp directory | Needs mechanism to handle temp files
druid_load_edit_hourly | No | No | - | | No
druid_load_editattemptstep | No | No | - | | No
druid_load_geoeditors_monthly | Yes | Yes | No | Uses an intermediate temp table and creates files in a temp directory | Needs mechanism to handle temp files
druid_load_navigationtiming | No | No | - | | No
druid_load_netflow | No | No | - | | No
druid_load_network_flows_internal | No | No | - | | No
druid_load_pageviews_daily_aggregated_monthly | Yes | Yes | No | Uses an intermediate temp table and creates files in a temp directory | Needs mechanism to handle temp files
druid_load_pageviews_hourly | Yes | Yes | No | Uses an intermediate temp table and creates files in a temp directory | Needs mechanism to handle temp files
druid_load_prefupdate | No | No | - | | No
druid_load_unique_devices_per_domain_daily | Yes | Yes | Yes | | No
druid_load_unique_devices_per_domain_monthly | Yes | Yes | Yes | | No
druid_load_unique_devices_per_project_family_daily | Yes | Yes | Yes | | No
druid_load_unique_devices_per_project_family_monthly | Yes | Yes | Yes | | No
druid_load_virtualpageview_daily | Yes | Yes | No | Uses an intermediate temp table and creates files in a temp directory | Needs mechanism to handle temp files
druid_load_webrequest_sampled_128_hourly | Yes | Yes | No | Uses an intermediate temp table and creates files in a temp directory | Needs mechanism to handle temp files
commons_structured_data_dump_to_hive_weekly | No | Yes | No(?) | Uses a java class to read from a file and put into a table. Unsure how it would be reflected in lineage | No
wikidata_dump_to_hive_weekly | No | Yes | No(?) | See above | No
dumps_merge_backfill_to_wikitext_raw | Yes | Yes | Yes | Dag still in development. Uses python script, creates and deletes intermediate tables and files | No
dumps_merge_events_to_wikitext_raw_daily | Yes | Yes | Yes | Dag still in development | Not yet
dumps_publish_wikitext_raw_to_xml | Yes | Yes | Yes | Dag still in development | Not yet
dumps_reconcile_wikitext_raw_daily | Yes | Yes | Yes | Dag still in development | Not yet
edit_hourly | No | Yes | No | | Potentially in the future
editors_daily_monthly | No | Yes | No | | Potentially in the future
geoeditors_edits_monthly | No | Yes | No | | Potentially in the future
geoeditors_monthly | No | Yes | No | | Potentially in the future
geoeditors_public_monthly | No | Yes | No | Creates/deletes files | Needs mechanism to handle temp files
geoeditors_yearly | No | Yes | No | Creates/deletes files | Needs mechanism to handle temp files
unique_editors_by_country_monthly | No | Yes | No | | Potentially in the future
hdfs_usage_weekly | Yes | Yes | No | Uses a java class | Potentially in the future
interlanguage_daily | Yes | Yes | Yes, for one operation | | Done
mediacounts_archive_daily | No | Yes | No | Generates temp files | Needs mechanism to handle temp files
mediacounts_load_hourly | No | Yes | No | | Potentially in the future
mediarequest_hourly | No | Yes | No | | Potentially in the future
mediawiki_history_check_denormalize | No | Yes | No | |
mediawiki_history_denormalize | No | Yes | No | Generates temp files | Needs mechanism to handle temp files
mediawiki_history_dumps | No | Yes | No | Generates files | Needs mechanism to handle temp files
mediawiki_history_load | No | Yes | No | Creating partitions over files |
mediawiki_history_metrics_monthly | Yes | Yes | Yes (metrics) | Outputs metrics into tables |
mediawiki_history_reduced | Yes | Yes | No | | Yes, partially
mediawiki_history_shapshot_config | Yes | Yes | No | Uses create_easy_cassandra_loading_dag | Potentially in the future
dump_day_of_hourly_pageviews | No | Yes | No | Generates temp files | Needs mechanism to handle temp files
dump_month_of_daily_pageviews | No | Yes | No | Generates temp files | Needs mechanism to handle temp files
pageview_actor_hourly | No | Yes | No | | Potentially in the future
pageview_allowlist_check | No | Yes | No | | Potentially in the future
pageview_hourly | No | Yes | No | Generates temp files | Potentially in the future
pingback_report_weekly_v2 | Yes | Yes | No | Generates temp files |
projectview_geo | No | Yes | No | Generates temp files |
projectview_hourly | No | Yes | No | Generates temp files |
referrer_daily | Yes | Yes | Yes | | No
refine_hourly | Yes | Yes | Unsure | |
refine_to_hive_hourly | Yes | Yes | Unsure | |
session_length_daily | Yes | Yes | Yes, for one operation | | Yes, partially
table_maintenance_iceberg_ | No | Yes | Yes | | No
unique_devices_ | Yes | Yes | Yes, for one operation | Generates temp files | Yes for one operation
virtualpageview_hourly | No | Yes | No | | Potentially in the future
webrequest_actor_label_hourly | No | Yes | No | | Potentially in the future
webrequest_actor_metrics_analyzer_hourly | Yes | Yes | No | | Yes
webrequest_actor_metrics_hourly | No | Yes | No | | Potentially in the future
webrequest_actor_metrics_rollup_hourly | No | Yes | No | | Potentially in the future
webrequest_analyzer | Yes | Yes | No | Think this dag is still in development |
wikidata_coeditors_metrics_to_graphite_monthly | No | Yes | No | Graphite is deprecated |
wikidata_metrics_to_graphite_daily | No | Yes | No | Graphite is deprecated |
wmcs_report_monthly | Yes | Yes | No | Generates temp files | Needs mechanism to handle temp files

Thomas and I just checked a few things to see how we should move forward. tl;dr:

  • Spark jobs that write to files (instead of tables) might end up with some incomplete lineage info, but nothing will break.

Screenshot 2024-12-16 at 12.17.01.png (262×886 px, 19 KB)

  • Iceberg jobs shouldn't break, but there will be error logs in the job, and maybe incomplete lineage info.

Next steps:

  • Enable lineage for a Spark DAG that uses both Hive tables and writes to output files, and verify that everything is okay
  • Enable lineage by default for all analytics-instance DAGs in dag_config, but manually disable lineage for any DAG (or operator?) that uses Iceberg.

How we verified file lineage doesn't break:

$ cat ./spark_datahub_otto_test2.hql

INSERT INTO otto.p1
select
    meta.domain as name,
    1 as val,
    null as s
from event.navigationtiming
limit 10
;

INSERT OVERWRITE DIRECTORY "./spark_datahub_otto_test1"
    USING csv
    OPTIONS ('compression' 'uncompressed', 'sep' ' ')
SELECT * FROM otto.p1
limit 10
;

$ spark3-sql \
--jars hdfs://analytics-hadoop/wmf/cache/artifacts/airflow/analytics/acryl-spark-lineage-0.2.16.jar \
--conf "spark.extraListeners=datahub.spark.DatahubSparkListener" \
--conf "spark.datahub.emitter=rest" \
--conf "spark.datahub.rest.server=https://datahub-gms.discovery.wmnet:30443" \
-f ./spark_datahub_otto_test2.hql

Mentioned in SAL (#wikimedia-operations) [2025-02-13T03:39:09Z] <tchin@deploy2002> Started deploy [airflow-dags/analytics@aaba3ff]: Deploying airflow for T306896

Mentioned in SAL (#wikimedia-operations) [2025-02-13T03:40:04Z] <tchin@deploy2002> Finished deploy [airflow-dags/analytics@aaba3ff]: Deploying airflow for T306896 (duration: 01m 07s)

Column-level data lineage of the newly deployed Spark jobs can be queried and navigated in DataHub:
https://datahub.wikimedia.org/dataset/urn:li:dataset:(urn:li:dataPlatform:hive,wmf.webrequest,PROD)/Lineage?end_time_millis&is_lineage_mode=true&separate_siblings=false&show_columns=false&start_time_millis

More enabled jobs will become visible as they execute and emit data on their upcoming runs.

Ahoelzl renamed this task from "Integrate Spark with DataHub with lineage" to "Integrate Spark with DataHub with lineage (Data-Engineering)". Feb 14 2025, 6:26 PM
Ahoelzl updated the task description.
Ahoelzl triaged this task as High priority.