
platform_eng Airflow instance Spark jobs failing after Iceberg changes
Closed, Resolved · Public

Description

Getting the following stack trace:

23/05/16 16:24:36 WARN SparkSession: Cannot use org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions to configure session extensions.
java.lang.ClassNotFoundException: org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
	at java.net.URLClassLoader.findClass(URLClassLoader.java:387)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
	at java.lang.Class.forName0(Native Method)
	at java.lang.Class.forName(Class.java:348)
	at org.apache.spark.util.Utils$.classForName(Utils.scala:207)
	at org.apache.spark.sql.SparkSession$.$anonfun$applyExtensions$1(SparkSession.scala:1191)
	at org.apache.spark.sql.SparkSession$.$anonfun$applyExtensions$1$adapted(SparkSession.scala:1189)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.sql.SparkSession$.org$apache$spark$sql$SparkSession$$applyExtensions(SparkSession.scala:1189)
	at org.apache.spark.sql.SparkSession.<init>(SparkSession.scala:103)
	at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
	at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
	at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
	at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:238)
	at py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
	at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:750)

Spark jobs on the analytics Airflow instance are working fine.

They do use the for_virtualenv() helper function, though:

gather_commons_dataset = SparkSubmitOperator.for_virtualenv(
    task_id="commons_index",
    virtualenv_archive=conda_env,
    entry_point='lib/python3.10/site-packages/image_suggestions/commonswiki_file.py',
    launcher='skein',
    application_args=args
)

However, they do not override the Spark version, so they should be picking up the Iceberg jar?
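
For context, the warning above is raised at SparkSession construction time: the driver tries to load the class named in spark.sql.extensions, and that class has to already be on the driver's classpath. A minimal sketch of the session setup that hits this path (the extensions value is taken from the stack trace; everything else is illustrative):

# Minimal illustration of the failing path: if the Iceberg runtime jar is not on
# the driver's classpath, Spark logs the ClassNotFoundException warning above,
# silently skips the extension, and later Iceberg DDL/DML fails.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("iceberg-extensions-example")
    .config(
        "spark.sql.extensions",
        "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
    )
    .getOrCreate()
)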

Details

Reference | Source Branch | Dest Branch | Author | Title
repos/data-engineering/airflow-dags!398 | T336800-set-spark-home | main | xcollazo | Always set SPARK_HOME on Spark jobs
repos/data-engineering/airflow-dags!389 | hotfix-platform-eng-dont-fail-on-iceberg | main | xcollazo | platform_eng: move all for_virtualenv() to cluster mode to avoid iceberg regression.

Event Timeline


Debugging this with @JAllemandou, we compared the failed job with another for_virtualenv() job that was running fine. We noticed that the working job also runs with the deploy_mode='cluster' parameter.

We tried this on the image_suggestions commons_index job and were able to move forward.

We are not sure, however, what the root cause is, since in both cases (deploy_mode='cluster' or the default deploy_mode='client') we should be using the cluster Spark assembly located at hdfs:/user/spark/share/lib/spark-3.1.2-assembly.jar.
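
One way to double-check that assumption is to look inside the assembly itself. A rough sketch, assuming local hdfs CLI access and that the assembly is a zip archive of the cluster's Spark jars (which is what spark.yarn.archive expects):

# Rough sanity check: fetch the cluster assembly and list any Iceberg jars
# bundled inside it. Assumes the `hdfs` CLI is available locally and that the
# assembly is a zip archive of jars.
import subprocess
import zipfile

ASSEMBLY = "hdfs:///user/spark/share/lib/spark-3.1.2-assembly.jar"
LOCAL_COPY = "/tmp/spark-3.1.2-assembly.jar"

subprocess.run(["hdfs", "dfs", "-get", ASSEMBLY, LOCAL_COPY], check=True)

with zipfile.ZipFile(LOCAL_COPY) as assembly:
    iceberg_jars = [name for name in assembly.namelist() if "iceberg" in name.lower()]

print(iceberg_jars or "no iceberg jars bundled in the assembly")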

Resources being uploaded on a failing job:

23/05/16 18:58:08 INFO Client: Source and destination file systems are the same. Not copying hdfs:/user/spark/share/lib/spark-3.1.2-assembly.jar
23/05/16 18:58:08 INFO Client: Source and destination file systems are the same. Not copying hdfs://analytics-hadoop/wmf/cache/artifacts/airflow/platform_eng/image-suggestions-0.11.0-v0.11.0.conda.tgz#venv
23/05/16 18:58:08 INFO Client: Uploading resource file:/var/lib/hadoop/data/d/yarn/local/usercache/analytics-platform-eng/appcache/application_1678266962370_401586/container_e75_1678266962370_401586_01_000001/venv/lib/python3.10/site-packages/pyspark/python/lib/pyspark.zip -> hdfs://analytics-hadoop/user/analytics-platform-eng/.sparkStaging/application_1678266962370_401587/pyspark.zip
23/05/16 18:58:08 INFO Client: Uploading resource file:/var/lib/hadoop/data/d/yarn/local/usercache/analytics-platform-eng/appcache/application_1678266962370_401586/container_e75_1678266962370_401586_01_000001/venv/lib/python3.10/site-packages/pyspark/python/lib/py4j-0.10.9-src.zip -> hdfs://analytics-hadoop/user/analytics-platform-eng/.sparkStaging/application_1678266962370_401587/py4j-0.10.9-src.zip
23/05/16 18:58:08 INFO Client: Uploading resource file:/var/lib/hadoop/data/h/yarn/local/usercache/analytics-platform-eng/appcache/application_1678266962370_401586/spark-16f67cd8-aa65-4cc2-93b2-b9277e7900c0/__spark_conf__4737332238272077799.zip -> hdfs://analytics-hadoop/user/analytics-platform-eng/.sparkStaging/application_1678266962370_401587/__spark_conf__.zip

Resources being uploaded on a successful job:

23/05/16 19:15:00 INFO Client: Uploading resource file:/var/lib/hadoop/data/e/yarn/local/usercache/analytics-platform-eng/appcache/application_1678266962370_401661/container_e75_1678266962370_401661_01_000001/analytics-platform-eng.keytab -> hdfs://analytics-hadoop/user/analytics-platform-eng/.sparkStaging/application_1678266962370_401666/analytics-platform-eng.keytab
23/05/16 19:15:00 INFO Client: Source and destination file systems are the same. Not copying hdfs:/user/spark/share/lib/spark-3.1.2-assembly.jar
23/05/16 19:15:01 INFO Client: Source and destination file systems are the same. Not copying hdfs://analytics-hadoop/wmf/cache/artifacts/airflow/platform_eng/image-suggestions-0.11.0-v0.11.0.conda.tgz#venv
23/05/16 19:15:01 INFO Client: Uploading resource file:/var/lib/hadoop/data/e/yarn/local/usercache/analytics-platform-eng/appcache/application_1678266962370_401661/container_e75_1678266962370_401661_01_000001/venv/lib/python3.10/site-packages/image_suggestions/commonswiki_file.py -> hdfs://analytics-hadoop/user/analytics-platform-eng/.sparkStaging/application_1678266962370_401666/commonswiki_file.py
23/05/16 19:15:01 INFO Client: Uploading resource file:/var/lib/hadoop/data/e/yarn/local/usercache/analytics-platform-eng/appcache/application_1678266962370_401661/container_e75_1678266962370_401661_01_000001/venv/lib/python3.10/site-packages/pyspark/python/lib/pyspark.zip -> hdfs://analytics-hadoop/user/analytics-platform-eng/.sparkStaging/application_1678266962370_401666/pyspark.zip
23/05/16 19:15:01 INFO Client: Uploading resource file:/var/lib/hadoop/data/e/yarn/local/usercache/analytics-platform-eng/appcache/application_1678266962370_401661/container_e75_1678266962370_401661_01_000001/venv/lib/python3.10/site-packages/pyspark/python/lib/py4j-0.10.9-src.zip -> hdfs://analytics-hadoop/user/analytics-platform-eng/.sparkStaging/application_1678266962370_401666/py4j-0.10.9-src.zip
23/05/16 19:15:01 INFO Client: Uploading resource file:/var/lib/hadoop/data/l/yarn/local/usercache/analytics-platform-eng/appcache/application_1678266962370_401661/spark-7c0cf18e-a9ef-4ea7-9741-bc72b187f7c8/__spark_conf__6520906018389548258.zip -> hdfs://analytics-hadoop/user/analytics-platform-eng/.sparkStaging/application_1678266962370_401666/__spark_conf__.zip

So it looks like it is normal behavior for PySpark jobs to upload pyspark.zip.

Mentioned in SAL (#wikimedia-operations) [2023-05-17T14:34:32Z] <xcollazo@deploy1002> Started deploy [airflow-dags/platform_eng@ad1cc7c]: deploying hotfix for T336800

Mentioned in SAL (#wikimedia-operations) [2023-05-17T14:34:41Z] <xcollazo@deploy1002> Finished deploy [airflow-dags/platform_eng@ad1cc7c]: deploying hotfix for T336800 (duration: 00m 09s)

xcollazo updated https://gitlab.wikimedia.org/repos/data-engineering/airflow-dags/-/merge_requests/389

platform_eng: move all for_virtualenv() to cluster mode to avoid iceberg regression.

Merge Request !389 is a hotfix that sets deploy_mode='cluster' on all for_virtualenv() calls.
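
Applied to the commons_index task from the description, the hotfix amounts to something like the following sketch (only the deploy_mode argument is new; the rest mirrors the snippet above):

# Hotfix sketch: force cluster mode so the driver runs on YARN alongside the
# cluster's Spark jars (including Iceberg) instead of in the bare client venv.
gather_commons_dataset = SparkSubmitOperator.for_virtualenv(
    task_id="commons_index",
    virtualenv_archive=conda_env,
    entry_point="lib/python3.10/site-packages/image_suggestions/commonswiki_file.py",
    launcher="skein",
    application_args=args,
    deploy_mode="cluster",  # was the default "client"
)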

Still, we need to better understand the root cause.

After discussions with @Antoine_Quhen, we speculate that the issue is that with deploy_mode='client' the Skein master uses the provided packed conda environment as is, instead of augmenting it with spark.yarn.archive (i.e. the assembly file). That would explain the behavior, since environments with just pyspark would necessarily fail to find the Iceberg jar. I will try to repro this independently of the Structured Data jobs.
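
A quick way to probe that hypothesis from inside a client-mode driver is sketched below; it only assumes that, without SPARK_HOME, the driver falls back to the pyspark install packed into the venv:

# Probe for the suspected root cause: report which jar directory the driver is
# actually resolving against and whether it contains an Iceberg jar. A pip/conda
# pyspark install ships its own jars/ directory, which has no Iceberg runtime.
import glob
import os

spark_home = os.environ.get("SPARK_HOME")
if spark_home is None:
    import pyspark
    spark_home = os.path.dirname(pyspark.__file__)  # the venv's own pyspark

jar_dir = os.path.join(spark_home, "jars")
iceberg_jars = [
    os.path.basename(j)
    for j in glob.glob(os.path.join(jar_dir, "*.jar"))
    if "iceberg" in os.path.basename(j).lower()
]

print("jar dir:", jar_dir)
print("iceberg jars:", iceberg_jars or "none")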

xcollazo changed the task status from Open to In Progress. Wed, May 17, 8:40 PM

xcollazo opened https://gitlab.wikimedia.org/repos/data-engineering/airflow-dags/-/merge_requests/398

Revert "platform_eng: move all for_virtualenv() to cluster mode to avoid iceberg regression."

xcollazo updated https://gitlab.wikimedia.org/repos/data-engineering/airflow-dags/-/merge_requests/398

Revert "platform_eng: move all for_virtualenv() to cluster mode to avoid iceberg regression."

I unfortunately followed a couple of red herrings here, mostly because I was convinced that PYSPARK_DRIVER_PYTHON in the diff below was the culprit. Not true.

--- untitled 2
+++ (clipboard)
@@ -9,7 +9,6 @@
 file_systems: []
 master:
   env:
-    PYSPARK_DRIVER_PYTHON: venv/bin/python
     PYSPARK_PYTHON: venv/bin/python
     SPARK_CONF_DIR: /etc/spark3/conf
   files:
@@ -29,19 +28,18 @@
   resources:
     fpgas: 0
     gpus: 0
-    memory: 11445
-    vcores: 2
-  script: spark3-submit --driver-cores 2 --conf spark.executorEnv.PYSPARK_DRIVER_PYTHON=venv/bin/python
-    --conf spark.executorEnv.PYSPARK_PYTHON=venv/bin/python --conf spark.executorEnv.SPARK_CONF_DIR=/etc/spark3/conf
-    --master yarn --conf spark.dynamicAllocation.enabled=true --conf spark.dynamicAllocation.maxExecutors=64
+    memory: 4096
+    vcores: 1
+  script: spark3-submit --driver-cores 2 --conf spark.executorEnv.PYSPARK_PYTHON=venv/bin/python
+    --conf spark.executorEnv.SPARK_CONF_DIR=/etc/spark3/conf --master yarn --conf
+    spark.dynamicAllocation.enabled=true --conf spark.dynamicAllocation.maxExecutors=64
     --conf spark.shuffle.service.enabled=true --conf spark.yarn.maxAppAttempts=1 --conf
     spark.sql.shuffle.partitions=256 --conf spark.sql.sources.partitionOverwriteMode=dynamic
     --conf spark.yarn.appMasterEnv.SPARK_CONF_DIR=/etc/spark3/conf --conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=venv/bin/python
-    --conf spark.yarn.appMasterEnv.PYSPARK_DRIVER_PYTHON=venv/bin/python --archives
-    'hdfs:///wmf/cache/artifacts/airflow/platform_eng/image-suggestions-0.11.0-v0.11.0.conda.tgz#venv'
+    --archives 'hdfs:///wmf/cache/artifacts/airflow/platform_eng/image-suggestions-0.11.0-v0.11.0.conda.tgz#venv'
     --executor-cores 4 --executor-memory 20G --driver-memory 12G --keytab analytics-platform-eng.keytab
     --principal analytics-platform-eng/an-airflow1004.eqiad.wmnet@WIKIMEDIA --name
-    image_suggestions__commons_index__20230501 --queue production --deploy-mode client
+    image_suggestions__commons_index__20230501 --queue production --deploy-mode cluster
     venv/lib/python3.10/site-packages/image_suggestions/commonswiki_file.py analytics_platform_eng
     2023-05-01 2023-04-24 4
 max_attempts: 1

When running Skein Spark jobs in client mode, if we do not set SPARK_HOME, the driver cannot find custom jars that are available on the cluster. In our repro, it could not find the Iceberg jar. This was not an issue before because there were no custom jars to find.

We now set SPARK_HOME as a default env var for all Spark jobs. This is a bit overkill, but harmless.

We can still override this on for_virtualenv() if we wish to use a custom Spark distribution, but testing this is left as a future exercise.
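
For illustration, setting or overriding SPARK_HOME on a single for_virtualenv() job could look like the sketch below. The env_vars parameter name and the Spark install path are assumptions for the sketch, not the actual API or implementation of MR !398:

# Sketch of the idea behind the fix, not the actual change in MR !398.
CLUSTER_SPARK_HOME = "/usr/lib/spark3"  # assumed path of the cluster's Spark 3 install

gather_commons_dataset = SparkSubmitOperator.for_virtualenv(
    task_id="commons_index",
    virtualenv_archive=conda_env,
    entry_point="lib/python3.10/site-packages/image_suggestions/commonswiki_file.py",
    launcher="skein",
    application_args=args,
    # With SPARK_HOME pointing at the cluster install, a client-mode driver picks
    # up the cluster's jars (including Iceberg) instead of the venv's bare pyspark.
    env_vars={"SPARK_HOME": CLUSTER_SPARK_HOME},  # hypothetical parameter name
)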

Mentioned in SAL (#wikimedia-operations) [2023-05-24T16:54:07Z] <xcollazo@deploy1002> Started deploy [airflow-dags/platform_eng@1603ecf]: Deploying T336800 on platform_eng Airflow instance

Mentioned in SAL (#wikimedia-operations) [2023-05-24T16:54:16Z] <xcollazo@deploy1002> Finished deploy [airflow-dags/platform_eng@1603ecf]: Deploying T336800 on platform_eng Airflow instance (duration: 00m 09s)

I was going to try this out first on the analytics_test instance, but that one is currently blocked as we are testing the Airflow 2.6.0 changes there (T336286).

Thus I deployed directly to the platform_eng Airflow instance. Spark jobs are happily running there. We should be good. Will monitor for a bit before closing this.