
Make it easier to run custom Spark versions via for_virtualenv()
Closed, Resolved · Public · 5 Estimated Story Points

Description

Our Hadoop cluster currently supports Spark 3.1.2. In T340861, a lot of debugging was needed to get a Spark 3.3.2 conda environment to work.

For reference, here is a working call to SparkSubmitOperator.for_virtualenv() for Spark 3.3.2:

merge_into = SparkSubmitOperator.for_virtualenv(
    task_id="spark_merge_into",
    virtualenv_archive=props.conda_env,
    entry_point="bin/events_merge_into.py",
    driver_memory=props.driver_memory,
    driver_cores=props.driver_cores,
    executor_memory=props.executor_memory,
    executor_cores=props.executor_cores,
    num_executors=props.num_executors,
    conf={
        "spark.driver.maxResultSize": props.spark_driver_maxResultSize,
        "spark.shuffle.service.enabled": props.spark_shuffle_service_enabled,
        "spark.dynamicAllocation.enabled": props.spark_dynamicAllocation_enabled,
        "spark.jars.packages": "org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.2.1",
        "spark.driver.extraJavaOptions": "-Divy.cache.dir=/tmp/ivy_spark3/cache -Divy.home=/tmp/ivy_spark3/home",  # fix jar pulling  # noqa
        "spark.jars.ivySettings": "/etc/maven/ivysettings.xml",  # fix jar pulling
        "spark.yarn.archive": "hdfs:///user/xcollazo/artifacts/spark-3.3.2-assembly.zip",  # override 3.1's assembly
    },
    launcher="skein",
    application_args=args,
    use_virtualenv_spark=True,
    default_env_vars={
        "SPARK_HOME": "venv/lib/python3.10/site-packages/pyspark",  # point to the packaged Spark
        "SPARK_CONF_DIR": "/etc/spark3/conf",
    },
)

Some of the issues:

(1) Incompatibility between Spark YARN assemblies: When running Spark on top of YARN, Spark requires all of its jars to be available to every executor. If spark.yarn.archive is not defined, Spark automatically generates and distributes this archive. If spark.yarn.archive is set, YARN blindly distributes that archive, regardless of which Spark version is actually being run.

  • Option 1: Although there is a minor performance cost to not setting spark.yarn.archive, we could leave it unset in our default configuration so that folks can run whatever Spark version they want.
  • Option 2: Make a set of assemblies 'officially' available; right now that set could be 3.1.2, 3.3.2, and 3.4.1. This is problematic, though, as it requires SRE cycles every time someone wants to run a different version. (A rough sketch of how such an assembly could be built and published follows this list.)
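
For illustration only, here is a rough sketch of how such an assembly could be built from a conda/virtualenv that bundles PySpark and then published to HDFS. The helper name and all paths are hypothetical, not existing tooling; the only hard requirement from Spark's side is that spark.yarn.archive points at an archive with the jar files at its root.

# Hypothetical helper (not part of our tooling): zip the jars shipped with a
# packaged PySpark and upload the archive to HDFS, so that jobs can point
# spark.yarn.archive at it. All paths below are illustrative.
import subprocess
import zipfile
from pathlib import Path


def publish_spark_assembly(spark_home: str, local_zip: str, hdfs_dest: str) -> None:
    jars_dir = Path(spark_home) / "jars"
    with zipfile.ZipFile(local_zip, "w", zipfile.ZIP_DEFLATED) as zf:
        for jar in sorted(jars_dir.glob("*.jar")):
            # spark.yarn.archive expects the jar files at the root of the archive.
            zf.write(jar, arcname=jar.name)
    # Assumes the hdfs CLI is available on the host running this.
    subprocess.run(["hdfs", "dfs", "-put", "-f", local_zip, hdfs_dest], check=True)


publish_spark_assembly(
    spark_home="venv/lib/python3.10/site-packages/pyspark",  # unpacked conda env
    local_zip="/tmp/spark-3.3.2-assembly.zip",
    hdfs_dest="hdfs:///user/xcollazo/artifacts/spark-3.3.2-assembly.zip",
)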

(2) Incompatibility between Spark shuffle services: An external shuffle service allows dynamic allocation (resizing a job at runtime) and takes the responsibility of serving shuffle data away from executors. Although shuffle services are usually compatible across versions, Spark 3.1.2's shuffle service is not forward compatible with 3.3.2. We should consider running multiple shuffle services; this is being taken care of by T344910. (A possible per-job workaround is sketched below.)
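
As an aside, and purely as a hypothetical workaround (not what T344910 implements): a job on a newer Spark could sidestep the incompatible external shuffle service by letting its executors serve their own shuffle files, at the cost of losing dynamic allocation. A minimal sketch of the conf involved:

# Possible per-job workaround (assumption, not the T344910 fix): disable the
# cluster's external shuffle service and, consequently, dynamic allocation.
no_external_shuffle_conf = {
    "spark.shuffle.service.enabled": "false",
    "spark.dynamicAllocation.enabled": "false",
}

These keys could be merged into the conf dict passed to for_virtualenv() above.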

(3) Bugs in for_virtualenv(): The current code clears SPARK_CONF_DIR when use_virtualenv_spark=True; it should not. Similarly, SPARK_HOME is complicated to figure out; perhaps we could derive it automatically when use_virtualenv_spark=True. (A sketch of what that could look like follows.)
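
A minimal sketch of that derivation, assuming the environment archive is unpacked by the launcher under the alias venv and bundles PySpark in its site-packages. The function name and defaults are hypothetical, not existing for_virtualenv() behaviour:

# Hypothetical derivation of SPARK_HOME when use_virtualenv_spark=True.
# Assumes the env archive is unpacked under the alias "venv" and that PySpark
# is installed inside its site-packages.
def guess_spark_home(venv_alias: str = "venv", python_version: str = "3.10") -> str:
    return f"{venv_alias}/lib/python{python_version}/site-packages/pyspark"


default_env_vars = {
    "SPARK_HOME": guess_spark_home(),
    # Keep pointing at the cluster-wide Spark config; it should not be cleared.
    "SPARK_CONF_DIR": "/etc/spark3/conf",
}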

(4) Figure out why copy-pasting the Airflow Spark script doesn't work for one-off runs: @Milimetric reports that ad-hoc runs fail. Let's investigate, as we should be able to debug this.

Event Timeline

@BTullis, regarding point (1) in the description above, what is your opinion on options 1 and 2 (or other options I didn't think of)?

xcollazo set the point value for this task to 5. Oct 13 2023, 5:02 PM
xcollazo claimed this task.

A summary of the original issues, for closure:

(1) Incompatibility between Spark YARN assemblies: When running Spark on top of YARN, Spark requires all of its jars to be available to every executor. If spark.yarn.archive is not defined, Spark automatically generates and distributes this archive. If spark.yarn.archive is set, YARN blindly distributes that archive, regardless of which Spark version is actually being run.

  • Option 1: Although there is a minor performance cost to not setting spark.yarn.archive, we could leave it unset in our default configuration so that folks can run whatever Spark version they want.
  • Option 2: Make a set of assemblies 'officially' available; right now that set could be 3.1.2, 3.3.2, and 3.4.1. This is problematic, though, as it requires SRE cycles every time someone wants to run a different version.

We decided on option 2. In the end, it is not onerous to generate the assembly and make it available to all via HDFS.

(2) Incompatibility between Spark shuffle services: An external shuffle service allows dynamic allocation (resizing a job at runtime) and takes the responsibility of serving shuffle data away from executors. Although shuffle services are usually compatible across versions, Spark 3.1.2's shuffle service is not forward compatible with 3.3.2. We should consider running multiple shuffle services; this is being taken care of by T344910.

T344910 fixed this.

(3) Bugs in for_virtualenv(): The current code clears SPARK_CONF_DIR when use_virtualenv_spark=True; it should not. Similarly, SPARK_HOME is complicated to figure out; perhaps we could derive it automatically when use_virtualenv_spark=True.

This is still an issue, but I have decided not to pursue it further, as there is little gain to be had considering that we have now documented working code.

(4) Figure out why copy-pasting the Airflow Spark script doesn't work for one-off runs: @Milimetric reports that ad-hoc runs fail. Let's investigate, as we should be able to debug this.

I have not hit this issue myself, and have been able to run code manually.