Our Hadoop cluster currently supports Spark 3.1.2. In T340861, a lot of debugging was needed to get a Spark 3.3.2 conda environment to work.
For reference, here is a working call to for_virtualenv() for Spark 3.3.2:
```
merge_into = SparkSubmitOperator.for_virtualenv(
    task_id="spark_merge_into",
    virtualenv_archive=props.conda_env,
    entry_point="bin/events_merge_into.py",
    driver_memory=props.driver_memory,
    driver_cores=props.driver_cores,
    executor_memory=props.executor_memory,
    executor_cores=props.executor_cores,
    num_executors=props.num_executors,
    conf={
        "spark.driver.maxResultSize": props.spark_driver_maxResultSize,
        "spark.shuffle.service.enabled": props.spark_shuffle_service_enabled,
        "spark.dynamicAllocation.enabled": props.spark_dynamicAllocation_enabled,
        "spark.jars.packages": "org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.2.1",
        "spark.driver.extraJavaOptions": "-Divy.cache.dir=/tmp/ivy_spark3/cache -Divy.home=/tmp/ivy_spark3/home",  # fix jar pulling  # noqa
        "spark.jars.ivySettings": "/etc/maven/ivysettings.xml",  # fix jar pulling
        "spark.yarn.archive": "hdfs:///user/xcollazo/artifacts/spark-3.3.2-assembly.zip",  # override 3.1's assembly
    },
    launcher="skein",
    application_args=args,
    use_virtualenv_spark=True,
    default_env_vars={
        "SPARK_HOME": "venv/lib/python3.10/site-packages/pyspark",  # point to the packaged Spark
        "SPARK_CONF_DIR": "/etc/spark3/conf",
    },
)
```
Some of the issues:
(1) Incompatibility between Spark YARN assemblies: When running Spark on top of YARN, all of Spark's jars must be available to every executor. If spark.yarn.archive is not defined, Spark automatically builds an archive of its jars and uploads it to the distributed cache at submission time. If spark.yarn.archive is set, YARN distributes that archive as-is, regardless of which Spark version the job is actually running.
- Option 1: Although there is a minor performance cost to not setting spark.yarn.archive (each job then uploads its own jars at submission time), we should consider not setting it in our default configuration so that folks can run whatever Spark version they want.
- Option 2: Make a set of assemblies 'officially' available: right now, that set could be 3.1.2, 3.3.2 and 3.4.1. But this is problematic, as it requires SRE cycles every time someone wants to run a different version. (A conf sketch contrasting the two options follows below.)
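To make the trade-off concrete, here is a minimal sketch of the two conf variants, assuming the same SparkSubmitOperator call as above; the dict names are illustrative only:
```
# Option 1: leave spark.yarn.archive unset. Spark zips the jars under its own
# SPARK_HOME and uploads them to the distributed cache on each submission,
# so the archive always matches the Spark version shipped in the conda env.
conf_option_1 = {
    "spark.jars.packages": "org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.2.1",
    # no "spark.yarn.archive" key: per-job upload, minor submission-time cost
}

# Option 2: point spark.yarn.archive at a pre-published assembly that exactly
# matches the job's Spark version (here, the 3.3.2 assembly from the call above).
conf_option_2 = {
    "spark.jars.packages": "org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.2.1",
    "spark.yarn.archive": "hdfs:///user/xcollazo/artifacts/spark-3.3.2-assembly.zip",
}
```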
(2) Incompatibility between Spark Shuffle Service versions: An external shuffle service enables dynamic allocation of executors and offloads shuffle file serving from the executors themselves. Although shuffle services are typically compatible across nearby versions, the 3.1.2 shuffle service is not forward compatible with 3.3.2. We should consider running multiple shuffle services; this is being taken care of in T344910.
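Until T344910 lands, one possible per-job workaround is to disable the external shuffle service and rely on Spark 3's shuffle tracking for dynamic allocation. This is only a sketch: the conf keys below are standard Spark 3 settings, but whether this fits our cluster policy is an open question.
```
# Sketch: keep dynamic allocation without depending on the cluster's 3.1.2
# external shuffle service, by tracking shuffle data on the executors instead.
shuffle_workaround_conf = {
    "spark.shuffle.service.enabled": "false",
    "spark.dynamicAllocation.enabled": "true",
    "spark.dynamicAllocation.shuffleTracking.enabled": "true",
}
```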
(3) Bugs in for_virtualenv(): The current code clears SPARK_CONF_DIR when use_virtualenv_spark=True. It should not. Similarly, SPARK_HOME is awkward for callers to figure out by hand. Perhaps we could derive both automatically when use_virtualenv_spark=True (see the sketch below).
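A minimal sketch of what that derivation could look like. The helper name and the assumption that pyspark lives inside the unpacked conda env under venv/lib/python<X.Y>/site-packages are illustrative; this is not the actual wmf_airflow_common implementation.
```
def virtualenv_spark_env_vars(python_version: str = "3.10", venv_root: str = "venv") -> dict:
    """Default env vars for use_virtualenv_spark=True.

    SPARK_HOME points at the pyspark package inside the unpacked virtualenv
    archive (path is relative to the YARN container working directory), and
    SPARK_CONF_DIR keeps the cluster's config instead of being cleared.
    """
    return {
        "SPARK_HOME": f"{venv_root}/lib/python{python_version}/site-packages/pyspark",
        "SPARK_CONF_DIR": "/etc/spark3/conf",
    }
```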
(4) Figure out why copy-pasting the Airflow Spark script doesn't seem to work for one-off runs: @Milimetric reports that ad-hoc runs fail. Let's investigate; we should be able to reproduce and debug this.