
Deploy additional yarn shuffler services to support several versions of spark in parallel
Closed, Resolved (Public)

Assigned To
Authored By
BTullis
Aug 24 2023, 11:57 AM
Referenced Files
F39781560: image.png
Oct 24 2023, 10:44 AM
F39777572: image.png
Oct 23 2023, 10:55 PM
F39679219: image.png
Oct 23 2023, 2:03 PM
F38924461: image.png
Oct 20 2023, 3:28 PM

Description

In T338057 we discussed performing a full migration of our production Spark deployment from its current version 3.1.2 to either 3.3.x or 3.4.x, complete with the yarn shuffler service.
This work was proposed in order to support ongoing development work on T333013: [Iceberg Migration] Apache Iceberg Migration and on the Dumps 2.0 work in T330296.

However, in T340861 @xcollazo was able to validate a workaround which meant that we could use a different version of spark from that deployed for production jobs, by supplying a custom conda environment.

The most significant remaining challenge for this method is that our YARN NodeManagers only run one spark shuffler service, which is that of Spark version 3.1.2.

However, it is possible to run multiple versions of the spark shuffler service for YARN in parallel:
https://spark.apache.org/docs/latest/running-on-yarn.html#running-multiple-versions-of-the-spark-shuffle-service
...and select which one to use at the time of a spark job submission.
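For example, a job could opt in to a specific shuffler instance at submission time by naming it in its Spark configuration. A minimal sketch (the service name and port here are assumptions, matching the naming scheme adopted later in this task):

import wmfdata

# Hypothetical example: 'spark_shuffle_3_3' and port 7338 must match an
# auxiliary service actually configured on the YARN NodeManagers.
spark = wmfdata.spark.create_custom_session(
    master='yarn',
    spark_config={
        "spark.shuffle.service.name": "spark_shuffle_3_3",
        "spark.shuffle.service.port": "7338",
    },
)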

This seems like it would be a useful mechanism for us to employ, to help speed up development work.

In addition, it will likely make future upgrades of the production Spark version easier too, since we would not have to coordinate code changes to the spark jobs with a big-bang upgrade of the shuffler service.

Done is:

  • Spark 3.3.2 Shuffle service available in the cluster
  • Spark 3.4.1 Shuffle service available in the cluster

  • Corresponding spark assembly files available in the cluster (T345440 will take care of this)

Details

Title | Reference | Author | Source Branch | Dest Branch
Downgrade spark 3.1 to version 3.1.2 | repos/data-engineering/spark!7 | btullis | downgrade_spark_3_1 | main
Remove the shuffler jar file from conda-analytics | repos/data-engineering/conda-analytics!36 | btullis | spark_3_1_3 | main
Fix the postinst script with the minor verion symlinks | repos/data-engineering/spark!6 | btullis | use_minor_versions | main
Use minor versions for symlinks created | repos/data-engineering/spark!5 | btullis | use_minor_versions | main
Fix the postinst and prerm scripts | repos/data-engineering/spark!4 | btullis | fix_postinst | main
Fix typo in control.template | repos/data-engineering/spark!3 | btullis | fix_dependency | main
Set the debian/rules file to be executable | repos/data-engineering/spark!2 | btullis | fix_rules_executable | main
Create a script for packaging multiple spark shufflers for yarn | repos/data-engineering/spark!1 | btullis | rewrite_script | main

Event Timeline


I decided to pause work on the GitLab-CI based build of spark itself.
I got the build to work with blubber/buildkit, but I realised that I was missing a feature of the current production-images setup: with my approach, all versions of spark were going to be built on each push to any feature branch.
In order to make it more efficient I would have had to find a way to check whether an image had already been published, and only run those jobs if the image didn't exist on the registry.
Since this was getting a bit more complicated, I decided to merge the existing CR to production-images and proceed that way for the moment.

Now I have built the production images as explained here: https://wikitech.wikimedia.org/wiki/Kubernetes/Images#Production_images and the new images have been published.

Successfully published image docker-registry.discovery.wmnet/spark3.1:3.1.3-1
Successfully published image docker-registry.discovery.wmnet/spark3.1-build:3.1.3-1
Successfully published image docker-registry.discovery.wmnet/spark3.1-operator:1.3.7-3.1.3-1
Successfully published image docker-registry.discovery.wmnet/spark3.4-build:3.4.1-1
Successfully published image docker-registry.discovery.wmnet/spark3.3-build:3.3.2-1
Successfully published image docker-registry.discovery.wmnet/spark3.4-operator:1.3.8-3.4.1-1
Successfully published image docker-registry.discovery.wmnet/spark3.3:3.3.2-1
Successfully published image docker-registry.discovery.wmnet/spark3.3-operator:1.3.8-3.3.2-1
Successfully published image docker-registry.discovery.wmnet/spark3.4:3.4.1-1

I'll get on with thinking about the distribution mechanism now, whether that's via:

  • Debian packages
  • Publishing to archiva and deploying with scap
  • Adding multiple versions of the jar to conda-analytics

My work on the spark build pipeline is here: https://gitlab.wikimedia.org/repos/data-engineering/spark
I may carry on with it as part of the work on packaging the jar files.

I've gone with the option of making a separate Debian package for each of the yarn shufflers that we wish to have installed, and the merge request to enable it is: https://gitlab.wikimedia.org/repos/data-engineering/spark/-/merge_requests/1

At present, it will require a manual build step on build2001 and then rsyncing to apt1001 and adding the packages with reprepro.

Each package includes a postinst script that sets up the symlink and a prerm script that removes it. Therefore, before deploying the packages we will need to change this script, which currently does it for the conda-analytics based version.
https://github.com/wikimedia/operations-puppet/blob/production/modules/profile/files/hadoop/spark3/spark3_yarn_shuffle_jar_install.sh

We will also want to remove the jar files from conda-analytics, as we currently ship them via the Dockerfile:
https://gitlab.wikimedia.org/repos/data-engineering/conda-analytics/-/blob/main/docker/Dockerfile#L98-100

I'm awaiting reviews on the approach and I've requested that @MoritzMuehlenhoff make sure that the packaging method is sound, before I proceed.

Change 963281 had a related patch set uploaded (by Btullis; author: Btullis):

[operations/puppet@production] [WIP] Support multiple spark yarn shufflers in parallel

https://gerrit.wikimedia.org/r/963281

Change 963304 had a related patch set uploaded (by Btullis; author: Btullis):

[operations/puppet@production] [WIP] Deploy multiple spark shuffler services to the test cluster

https://gerrit.wikimedia.org/r/963304

Change 963989 had a related patch set uploaded (by Btullis; author: Btullis):

[operations/puppet@production] Configure the spark3 defaults with the default yarn shuffler

https://gerrit.wikimedia.org/r/963989

Change 964008 had a related patch set uploaded (by Btullis; author: Btullis):

[operations/puppet@production] [WIP] Deploy multiple spark shufflers for yarn to production

https://gerrit.wikimedia.org/r/964008

I believe that the chain of patches for deploying multiple spark shuffler versions is ready for review.

As they know the bigtop module well and this is potentially disruptive if it goes wrong, I have requested a review from @elukey and @Ottomata, in addition to @xcollazo, @brouberol and @Stevemunene.

The first two are intended to be no-ops, with the third deploying the configuration to the test hadoop cluster.

It's dependent on getting the jar files packaged in https://gitlab.wikimedia.org/repos/data-engineering/spark/-/merge_requests/1 and subsequently hosted on apt.wikimedia.org, but I don't think that there is any blocker there.

Hopefully we will be able to try this on the test cluster next week and tweak if necessary.

Change 963281 merged by Btullis:

[operations/puppet@production] Support multiple spark yarn shufflers in parallel

https://gerrit.wikimedia.org/r/963281

Change 963989 merged by Btullis:

[operations/puppet@production] Support configuring the spark3 defaults with the default shuffler

https://gerrit.wikimedia.org/r/963989

I've deployed the first two patches:

These should be no-ops, so I'm going to monitor things until next week, when we can enable the multiple shufflers on the test cluster.

I have added the yarn shuffler jars to the apt repository.

btullis@apt1001:~/spark$ sudo -i reprepro include bullseye-wikimedia `pwd`/spark-3.1-yarn-shuffle_3.1.3-1_amd64.changes
Exporting indices...
btullis@apt1001:~/spark$ sudo -i reprepro include bullseye-wikimedia `pwd`/spark-3.3-yarn-shuffle_3.3.2-1_amd64.changes
Exporting indices...
btullis@apt1001:~/spark$ sudo -i reprepro include bullseye-wikimedia `pwd`/spark-3.4-yarn-shuffle_3.4.1-1_amd64.changes
Exporting indices...

Copied them to the buster distribution:

btullis@apt1001:~/spark$ sudo -i reprepro copy buster-wikimedia bullseye-wikimedia spark-3.1-yarn-shuffle
Exporting indices...
btullis@apt1001:~/spark$ sudo -i reprepro copy buster-wikimedia bullseye-wikimedia spark-3.3-yarn-shuffle
Exporting indices...
btullis@apt1001:~/spark$ sudo -i reprepro copy buster-wikimedia bullseye-wikimedia spark-3.4-yarn-shuffle
Exporting indices...

Verified that they are present.

btullis@apt1001:~/spark$ sudo -i reprepro ls spark-3.1-yarn-shuffle
spark-3.1-yarn-shuffle | 3.1.3-1 |   buster-wikimedia | amd64, source
spark-3.1-yarn-shuffle | 3.1.3-1 | bullseye-wikimedia | amd64, source
btullis@apt1001:~/spark$ sudo -i reprepro ls spark-3.3-yarn-shuffle
spark-3.3-yarn-shuffle | 3.3.2-1 |   buster-wikimedia | amd64, source
spark-3.3-yarn-shuffle | 3.3.2-1 | bullseye-wikimedia | amd64, source
btullis@apt1001:~/spark$ sudo -i reprepro ls spark-3.4-yarn-shuffle
spark-3.4-yarn-shuffle | 3.4.1-1 |   buster-wikimedia | amd64, source
spark-3.4-yarn-shuffle | 3.4.1-1 | bullseye-wikimedia | amd64, source

Change 963304 merged by Btullis:

[operations/puppet@production] Deploy multiple spark shuffler services to the test cluster

https://gerrit.wikimedia.org/r/963304

Change 966533 had a related patch set uploaded (by Btullis; author: Btullis):

[operations/puppet@production] Fix typo on yarn-spark-shuffle package name

https://gerrit.wikimedia.org/r/966533

Change 966533 merged by Btullis:

[operations/puppet@production] Fix typo on yarn-spark-shuffle package name

https://gerrit.wikimedia.org/r/966533

Change 966534 had a related patch set uploaded (by Btullis; author: Btullis):

[operations/puppet@production] Fix the name of the spark yarn shuffler packages

https://gerrit.wikimedia.org/r/966534

Change 966534 merged by Btullis:

[operations/puppet@production] Fix the name of the spark yarn shuffler packages

https://gerrit.wikimedia.org/r/966534

The first restart of the hadoop-yarn-nodemanager service on an-test-worker1001 was unsuccessful.

This is shown in /var/log/hadoop-yarn/yarn-yarn-nodemanager-an-test-worker1001.log

2023-10-17 16:17:31,927 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServices: Adding auxiliary service mapreduce_shuffle, "mapreduce_shuffle"
2023-10-17 16:17:32,063 ERROR org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServices: Failed to initialize spark_shuffle_3_1
java.lang.RuntimeException: No class defined for spark_shuffle_3_1

Change 966853 had a related patch set uploaded (by Btullis; author: Btullis):

[operations/puppet@production] Set the class for each of the spark shuffle services

https://gerrit.wikimedia.org/r/966853

Change 966853 merged by Btullis:

[operations/puppet@production] Set the class for each of the spark shuffle services

https://gerrit.wikimedia.org/r/966853

The nodemanager isn't correctly loading the shuffler services.

2023-10-18 13:26:15,615 INFO org.apache.hadoop.util.ApplicationClassLoader: classpath: [file:/usr/lib/hadoop-yarn/lib/spark-3.1-yarn-shuffle.jar]
2023-10-18 13:26:15,708 INFO org.apache.spark.network.yarn.YarnShuffleService: Initializing YARN shuffle service for Spark
2023-10-18 13:26:15,708 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServices: The aux service:spark_shuffle_3_1 are using the custom classloader
2023-10-18 13:26:15,708 WARN org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServices: The Auxiliary Service named 'spark_shuffle_3_1' in the configuration is for class org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxiliaryServiceWithCustomClassLoader which has a name of 'org.apache.spark.network.yarn.YarnShuffleService with custom class loader'. Because these are not the same tools trying to send ServiceData and read Service Meta Data may have issues unless the refer to the name in the config.
2023-10-18 13:26:15,708 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServices: Adding auxiliary service org.apache.spark.network.yarn.YarnShuffleService with custom class loader, "spark_shuffle_3_1"
2023-10-18 13:26:15,906 INFO org.apache.spark.network.yarn.YarnShuffleService: Recovery location is: /tmp/hadoop-yarn/yarn-nm-recovery/nm-aux-services/spark_shuffle_3_1/sparkShuffleRecovery.ldb
2023-10-18 13:26:15,906 INFO org.apache.spark.network.yarn.YarnShuffleService: Going to reload spark shuffle data
2023-10-18 13:26:16,009 INFO org.apache.spark.network.yarn.YarnShuffleService: Started YARN shuffle service for Spark on port 7337. Authentication is enabled.  Registered executor file is /tmp/hadoop-yarn/yarn-nm-recovery/nm-aux-services/spark_shuffle_3_1/registeredExecutors.ldb



2023-10-18 13:26:16,009 INFO org.apache.hadoop.util.ApplicationClassLoader: classpath: [file:/usr/lib/hadoop-yarn/lib/spark-3.3-yarn-shuffle.jar]
2023-10-18 13:26:16,104 INFO org.apache.spark.network.yarn.YarnShuffleService: Initializing YARN shuffle service for Spark
2023-10-18 13:26:16,104 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServices: The aux service:spark_shuffle_3_3 are using the custom classloader
2023-10-18 13:26:16,104 WARN org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServices: The Auxiliary Service named 'spark_shuffle_3_3' in the configuration is for class org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxiliaryServiceWithCustomClassLoader which has a name of 'org.apache.spark.network.yarn.YarnShuffleService with custom class loader'. Because these are not the same tools trying to send ServiceData and read Service Meta Data may have issues unless the refer to the name in the config.
2023-10-18 13:26:16,104 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServices: Adding auxiliary service org.apache.spark.network.yarn.YarnShuffleService with custom class loader, "spark_shuffle_3_3"
2023-10-18 13:26:16,262 INFO org.apache.spark.network.yarn.YarnShuffleService: Recovery location is: /tmp/hadoop-yarn/yarn-nm-recovery/nm-aux-services/spark_shuffle_3_3/sparkShuffleRecovery.ldb
2023-10-18 13:26:16,262 INFO org.apache.spark.network.yarn.YarnShuffleService: Going to reload spark shuffle data
2023-10-18 13:26:16,514 INFO org.apache.hadoop.service.AbstractService: Service spark_shuffle failed in state INITED; cause: java.net.BindException: Address already in use



2023-10-18 13:26:16,518 INFO org.apache.hadoop.util.ApplicationClassLoader: classpath: [file:/usr/lib/hadoop-yarn/lib/spark-3.4-yarn-shuffle.jar]
2023-10-18 13:26:16,624 INFO org.apache.spark.network.yarn.YarnShuffleService: Initializing YARN shuffle service for Spark
2023-10-18 13:26:16,624 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServices: The aux service:spark_shuffle_3_4 are using the custom classloader
2023-10-18 13:26:16,624 WARN org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServices: The Auxiliary Service named 'spark_shuffle_3_4' in the configuration is for class org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxiliaryServiceWithCustomClassLoader which has a name of 'org.apache.spark.network.yarn.YarnShuffleService with custom class loader'. Because these are not the same tools trying to send ServiceData and read Service Meta Data may have issues unless the refer to the name in the config.
2023-10-18 13:26:16,624 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServices: Adding auxiliary service org.apache.spark.network.yarn.YarnShuffleService with custom class loader, "spark_shuffle_3_4"
2023-10-18 13:26:16,626 INFO org.apache.spark.network.yarn.YarnShuffleService: Use LEVELDB as the implementation of spark.shuffle.service.db.backend
2023-10-18 13:26:16,777 INFO org.apache.spark.network.shuffle.ExternalShuffleBlockResolver: Use LEVELDB as the implementation of spark.shuffle.service.db.backend
2023-10-18 13:26:16,796 INFO org.apache.spark.network.yarn.YarnShuffleService: Recovery location is: /tmp/hadoop-yarn/yarn-nm-recovery/nm-aux-services/spark_shuffle_3_4/sparkShuffleRecovery.ldb
2023-10-18 13:26:16,796 INFO org.apache.spark.network.yarn.YarnShuffleService: Going to reload spark shuffle data
2023-10-18 13:26:16,924 INFO org.apache.hadoop.service.AbstractService: Service spark_shuffle failed in state INITED; cause: java.net.BindException: Address already in use

For one thing the port numbers aren't correct, so perhaps the configuration files aren't being correctly loaded.

I have got the latter two shufflers to load correctly, with the custom port numbers, but the first one seems to be ignoring the overlay.

2023-10-18 14:20:47,943 INFO org.apache.spark.network.yarn.YarnShuffleService: Initializing YARN shuffle service for Spark
2023-10-18 14:20:48,204 INFO org.apache.spark.network.yarn.YarnShuffleService: Started YARN shuffle service for Spark on port 7337. Authentication is enabled.  Registered executor file is /tmp/hadoop-yarn/yarn-nm-recovery/nm-aux-services/spark_shuffle_3_1/registeredExecutors.ldb

2023-10-18 14:20:48,303 INFO org.apache.spark.network.yarn.YarnShuffleService: Initializing Spark YARN shuffle service with configuration overlay from file:/etc/hadoop/conf/spark_shuffle_3_3_config/spark-shuffle-site.xml
2023-10-18 14:20:48,660 INFO org.apache.spark.network.yarn.YarnShuffleService: Started YARN shuffle service for Spark on port 7002. Authentication is enabled.  Registered executor file is /tmp/hadoop-yarn/yarn-nm-recovery/nm-aux-services/spark_shuffle_3_3/registeredExecutors.ldb

2023-10-18 14:20:48,759 INFO org.apache.spark.network.yarn.YarnShuffleService: Initializing Spark YARN shuffle service with configuration overlay from file:/etc/hadoop/conf/spark_shuffle_3_4_config/spark-shuffle-site.xml
2023-10-18 14:20:49,061 INFO org.apache.spark.network.yarn.YarnShuffleService: Started YARN shuffle service for Spark on port 7003. Authentication is enabled.  Registered executor file is /tmp/hadoop-yarn/yarn-nm-recovery/nm-aux-services/spark_shuffle_3_4/registeredExecutors.ldb

The first one should be port 7001.

Change 966889 had a related patch set uploaded (by Btullis; author: Btullis):

[operations/puppet@production] Partial fix for multiple spark shufflers

https://gerrit.wikimedia.org/r/966889

Change 966892 had a related patch set uploaded (by Btullis; author: Btullis):

[operations/puppet@production] Change the first spark shuffler service to use the default port

https://gerrit.wikimedia.org/r/966892

Change 966889 merged by Btullis:

[operations/puppet@production] Partial fix for multiple spark shufflers

https://gerrit.wikimedia.org/r/966889

Change 966892 merged by Btullis:

[operations/puppet@production] Change the first spark shuffler service to use the default port

https://gerrit.wikimedia.org/r/966892

Change 966905 had a related patch set uploaded (by Btullis; author: Btullis):

[operations/puppet@production] Disable the multiple spark shufflers on the test cluster temporarily

https://gerrit.wikimedia.org/r/966905

Change 966905 merged by Btullis:

[operations/puppet@production] Disable the multiple spark shufflers on the test cluster temporarily

https://gerrit.wikimedia.org/r/966905

Change 967434 had a related patch set uploaded (by Btullis; author: Btullis):

[operations/puppet@production] Fix issues with multiple spark shufflers specific to version 3.1

https://gerrit.wikimedia.org/r/967434

Change 967448 had a related patch set uploaded (by Btullis; author: Btullis):

[operations/puppet@production] Enable the multiple spark shufflers on the test cluster

https://gerrit.wikimedia.org/r/967448

We discovered a small problem in testing:

The spark.shuffle.service.name configuration option was only introduced in version 3.2.0 of spark
From this page: https://spark.apache.org/docs/latest/configuration.html

image.png (432×1 px, 79 KB)

Therefore, when we tried to use the shuffler service named spark_shuffle_3_1, all of our spark jobs failed.

I reverted that change, but I have now prepared another patch to fix this issue by using the default name of spark_shuffle for the 3.1 shuffler.
We also use the default port number of 7337 for this version, since it did not want to load the overlay configuration file.
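In practice this means the client-side configuration differs by Spark version. A rough sketch (names and ports as used later in this task; the custom session also needs a matching spark.yarn.archive, as discussed further down):

import wmfdata

# Spark 3.1.2 (the conda-analytics default) cannot set spark.shuffle.service.name,
# so it simply uses the default service name (spark_shuffle) on the default port 7337.
spark_31 = wmfdata.spark.create_session(type='yarn-large')

# Spark 3.3.x from a custom conda environment selects the versioned shuffler explicitly.
spark_33 = wmfdata.spark.create_custom_session(
    master='yarn',
    spark_config={
        "spark.shuffle.service.name": "spark_shuffle_3_3",
        "spark.shuffle.service.port": "7338",
    },
)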

Change 967434 merged by Btullis:

[operations/puppet@production] Fix issues with multiple spark shufflers specific to version 3.1

https://gerrit.wikimedia.org/r/967434

Change 967448 merged by Btullis:

[operations/puppet@production] Enable the multiple spark shufflers on the test cluster

https://gerrit.wikimedia.org/r/967448

Mentioned in SAL (#wikimedia-analytics) [2023-10-23T10:11:37Z] <btullis> deploying multiple spark shufflers to the test cluster for T344910

Well, I've done something rather silly: the version 3.1 spark shuffler is actually version 3.1.3, whereas the version of spark that we ship with conda-analytics is version 3.1.2.

When I tested running a spark job against version 3.1.3 of the shuffler, I got errors.

I have managed to fix it for now by manually changing the symlink as shown.

image.png (99×1 px, 35 KB)

So either I could downgrade the version of the 3.1 jar file or I could upgrade the version of spark in conda-analytics to version 3.1.3.

I'm currently working on a draft MR to upgrade to spark 3.1.3 https://gitlab.wikimedia.org/repos/data-engineering/conda-analytics/-/merge_requests/36 and I'll see if this works with the correct symlinks.

or I could upgrade the version of spark in conda-analytics to version 3.1.3

+1 to that. We might as well benefit from the bug fixes from the patch release.

Also, I am amazed that the yarn shuffler is incompatible between patches? Would we need to do the same alignment for 3.3 and 3.4 lines? I hope not!

Change 967986 had a related patch set uploaded (by Btullis; author: Btullis):

[operations/docker-images/production-images@master] Downgrade spark 3.1 to version 3.1.2

https://gerrit.wikimedia.org/r/967986

Change 967986 merged by Btullis:

[operations/docker-images/production-images@master] Downgrade spark 3.1 to version 3.1.2

https://gerrit.wikimedia.org/r/967986

OK, I did a little experimentation with a 0.0.24-dev version of conda-analytics, but in the interest of being methodical about it, I decided to stick with the 3.1.2 shuffler service.
I can come back to the version 3.1.3 upgrade soon.

For now, we have three shuffler services running on the test cluster. These are the jar files providing those services.

btullis@an-test-worker1001:~$ ls -l /usr/lib/hadoop-yarn/lib/spark*
lrwxrwxrwx 1 root root 60 Oct 23 22:05 /usr/lib/hadoop-yarn/lib/spark-3.1-yarn-shuffle.jar -> /usr/lib/spark-3.1-yarn-shuffle/spark-3.1.2-yarn-shuffle.jar
lrwxrwxrwx 1 root root 60 Oct 23 22:05 /usr/lib/hadoop-yarn/lib/spark-3.3-yarn-shuffle.jar -> /usr/lib/spark-3.3-yarn-shuffle/spark-3.3.2-yarn-shuffle.jar
lrwxrwxrwx 1 root root 60 Oct 23 22:05 /usr/lib/hadoop-yarn/lib/spark-3.4-yarn-shuffle.jar -> /usr/lib/spark-3.4-yarn-shuffle/spark-3.4.1-yarn-shuffle.jar

I've verified that it's working with the default shuffler by re-running the failed refine tasks from airflow and then allowing the aqs_hourly DAG runs to complete. We can see here that the SLA was missed, but that they eventually worked.

image.png (1×1 px, 150 KB)

The next step is to see whether we can select either the spark_shuffle_3_3 or spark_shuffle_3_4 services.

I'll try making a custom conda environment and getting a simple test case working, but hopefully @xcollazo will be able to help test this as well.

I can't create a new conda environment with pyspark 3.3.2 without getting an OOM error on the machine.
I was trying this:

conda create -n spark33 python=3.10.8 pyspark=3.3.2 conda-pack=0.7.0 ipython jupyterlab=3.4.8 jupyterhub-singleuser=1.5.0

...as well as just trying conda install pyspark=3.3.2 within an existing environment.

Whatever I do, it eats all of the remaining RAM and gets killed in a few seconds. I tried with both the new libmamba solver and with the classic solver, but there was no real difference.
I think that I will have to add more RAM to an-test-client1002 and try again.

an-test-client1002 is running on ganeti1010.

btullis@ganeti1027:~$ sudo gnt-instance list an-test-client1002.eqiad.wmnet
Instance                       Hypervisor OS                  Primary_node           Status  Memory
an-test-client1002.eqiad.wmnet kvm        debootstrap+default ganeti1010.eqiad.wmnet running   4.0G

ganeti1010 currently has 32 GB of RAM available.

btullis@ganeti1027:~$ sudo gnt-node list ganeti1010.eqiad.wmnet
Node                   DTotal DFree MTotal MNode MFree Pinst Sinst
ganeti1010.eqiad.wmnet   2.1T  1.5T  62.5G 28.9G 32.1G     5     4

Having checked the other hosts, I think it's pretty reasonable to increase this by 4 GB, to 8 GB of RAM.

image.png (263×295 px, 15 KB)

btullis@ganeti1027:~$ sudo gnt-instance modify -B memory=8g an-test-client1002.eqiad.wmnet
Modified instance an-test-client1002.eqiad.wmnet
 - be/memory -> 8192
Please don't forget that most parameters take effect only at the next (re)start of the instance initiated by ganeti; restarting from within the instance will not be enough.

Cold booting the VM to pick up the additional RAM.

btullis@ganeti1027:~$ sudo gnt-instance shutdown an-test-client1002.eqiad.wmnet
Waiting for job 2268928 for an-test-client1002.eqiad.wmnet ...
btullis@ganeti1027:~$ sudo gnt-instance startup an-test-client1002.eqiad.wmnet
Waiting for job 2268929 for an-test-client1002.eqiad.wmnet ...

Great! I can now build a conda environment for testing this. e.g.

(base) btullis@an-test-client1002:~$ conda create -n spark34 python=3.10.8 pyspark=3.4.1 conda-pack=0.7.0 ipython jupyterlab=3.4.8 jupyterhub-singleuser=1.5.0
 conda activate spark34
(spark34) btullis@an-test-client1002:~$ pyspark --version
Picked up JAVA_TOOL_OPTIONS: -Dfile.encoding=UTF-8
Picked up JAVA_TOOL_OPTIONS: -Dfile.encoding=UTF-8
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.4.1
      /_/
                        
Using Scala version 2.12.17, OpenJDK 64-Bit Server VM, 1.8.0_372
Branch HEAD
Compiled by user centos on 2023-06-19T23:01:01Z
Revision 6b1ff22dde1ead51cbf370be6e48a802daae58b6
Url https://github.com/apache/spark
Type --help for more information.

Install wmfdata-python into my new environment.

pip install git+https://github.com/wikimedia/wmfdata-python.git@v2.0.1

Now starting a jupyterlab notebook and trying something like this:

import wmfdata
spark = wmfdata.spark.create_custom_session(
    master='yarn',
    spark_config={
        "spark.shuffle.service.name": 'spark_shuffle_3_4',
        "spark.shuffle.service.port": '7339'
    }
)

Not quite working yet, but maybe not far away.

TL;DR: I was able to verify that Spark 3.1.2 and Spark 3.3.2 work as expected on the test cluster 🎉 . I ran out of time, but happy to test Spark 3.4.X tomorrow.

Longer version:

We will have fun via a Jupyter notebook:

ssh -N an-test-client1002.eqiad.wmnet -L 8880:127.0.0.1:8880

Check whether the default Spark 3.1.2 is working fine:

conda env:

xcollazo@an-test-client1002:~$ conda-analytics-list 
# conda environments:
#
2023-10-18T16.31.33_xcollazo     /home/xcollazo/.conda/envs/2023-10-18T16.31.33_xcollazo
base                     /opt/conda-analytics

xcollazo@an-test-client1002:~$ ls /home/xcollazo/.conda/envs/2023-10-18T16.31.33_xcollazo/lib/python3.10/site-packages/ | grep pyspark | grep dist
pyspark-3.1.2.dist-info

Quick notebook test (and sorry for the fragmented markdown but that is what the notebook Save As is generating... ):

import wmfdata
spark = wmfdata.spark.create_session(type='yarn-large')
SPARK_HOME: /usr/lib/spark3
Using Hadoop client lib jars at 3.2.0, provided by Spark.
PYSPARK_PYTHON=/opt/conda-analytics/bin/python3


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/10/24 18:27:36 WARN Utils: Service 'sparkDriver' could not bind on port 12000. Attempting port 12001.
23/10/24 18:27:36 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
23/10/24 18:27:48 WARN Utils: Service 'org.apache.spark.network.netty.NettyBlockTransferService' could not bind on port 13000. Attempting port 13001.
23/10/24 18:27:48 WARN YarnSchedulerBackend$YarnSchedulerEndpoint: Attempted to request executors before the AM has registered!
spark.version
'3.1.2'
spark.sql("""
SELECT count(1) as count
FROM wmf.webrequest
WHERE year = 2023
  AND month = 10
  AND day = 13
""").show(100)
[Stage 16:=======================================================>(95 + 1) / 96]

+------+
| count|
+------+
|325526|
+------+
spark.sql("""
SELECT count(1) as count, referer_class
FROM wmf.webrequest
WHERE year = 2023
  AND month = 10
  AND day = 13
GROUP BY referer_class
ORDER BY count DESC
""").show(100, truncate=False)
[Stage 19:=====================================================>(508 + 4) / 512]

+------+------------------------+
|count |referer_class           |
+------+------------------------+
|244532|internal                |
|63247 |none                    |
|15181 |external (search engine)|
|2374  |external                |
|110   |unknown                 |
|82    |external (media sites)  |
+------+------------------------+

Now let's try Spark 3.3.2:

create conda env:

export http_proxy=http://webproxy:8080
export https_proxy=http://webproxy:8080
no_proxy=127.0.0.1,::1,localhost,.wmnet,.wikimedia.org,.wikipedia.org,.wikibooks.org,.wikiquote.org,.wiktionary.org,.wikisource.org,.wikispecies.org,.wikiversity.org,.wikidata.org,.mediawiki.org,.wikinews.org,.wikivoyage.org
export HTTP_PROXY=$http_proxy
export HTTPS_PROXY=$https_proxy
export NO_PROXY=$no_proxy

source /opt/conda-analytics/etc/profile.d/conda.sh
conda create -n spark33 python=3.10.8 pyspark=3.3.2 conda-pack=0.7.0 ipython jupyterlab=3.4.8 jupyterhub-singleuser=1.5.0 urllib3=1.26.11
conda activate spark33
pip install git+https://github.com/wikimedia/wmfdata-python.git@v2.0.1

Looks like the Spark assembly file is missing from the HDFS test cluster:

xcollazo@an-test-client1002:~$ hdfs dfs -ls /user/spark/share/lib
Picked up JAVA_TOOL_OPTIONS: -Dfile.encoding=UTF-8
Found 3 items
-rw-r--r--   2 hdfs hadoop  195258057 2021-02-19 15:49 /user/spark/share/lib/spark-2.4.4-assembly.zip
-rw-r--r--   2 hdfs hadoop  255798851 2023-06-13 10:43 /user/spark/share/lib/spark-3.1.2-assembly.jar
-rw-r-----   2 hdfs hadoop  228506202 2023-06-13 10:33 /user/spark/share/lib/spark-3.1.2-assembly.jar.bak

Ideally this file would be generated on the fly, but because it is specified in our spark-defaults.conf, we have to override it to make other Spark versions work on yarn:

xcollazo@an-test-client1002:~$ cat /etc/spark3/conf/spark-defaults.conf | grep yarn.archive
spark.yarn.archive                                  hdfs:///user/spark/share/lib/spark-3.1.2-assembly.jar

So let's copy over my assembly jar for Spark 3.3.2:

xcollazo@an-test-client1002:~/artifacts$ hdfs dfs -mkdir /user/xcollazo/artifacts
xcollazo@an-test-client1002:~/artifacts$ hdfs dfs -copyFromLocal ./spark-3.3.2-assembly.zip /user/xcollazo/artifacts

And now let's run a similar notebook as for Spark 3.1, but configured so that we pick up Spark from the conda env rather than the cluster (and sorry for the fragmented markdown but that is what the notebook Save As is generating... ):

%env SPARK_HOME=/home/xcollazo/.conda/envs/spark33/lib/python3.10/site-packages/pyspark
env: SPARK_HOME=/home/xcollazo/.conda/envs/spark33/lib/python3.10/site-packages/pyspark
%env SPARK_CONF_DIR=/etc/spark3/conf
env: SPARK_CONF_DIR=/etc/spark3/conf
!echo $SPARK_HOME
!echo $SPARK_CONF_DIR
/home/xcollazo/.conda/envs/spark33/lib/python3.10/site-packages/pyspark
/etc/spark3/conf
import wmfdata

spark = wmfdata.spark.create_custom_session(
    master='yarn',
    spark_config={
        "spark.shuffle.service.name": 'spark_shuffle_3_3',
        "spark.shuffle.service.port": '7338',
        "spark.yarn.archive": "hdfs:///user/xcollazo/artifacts/spark-3.3.2-assembly.zip",
        ##
        # extras to make Iceberg work on 3.3.2:
        ##
        "spark.jars.packages": "org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.2.1",
        "spark.jars.ivySettings": "/etc/maven/ivysettings.xml",  # fix jar pulling
    }
)
SPARK_HOME: /home/xcollazo/.conda/envs/spark33/lib/python3.10/site-packages/pyspark
Using Hadoop client lib jars at hadoop-client-api-3.3.2.jar
hadoop-client-runtime-3.3.2.jar, provided by Spark.
PYSPARK_PYTHON=/opt/conda-analytics/bin/python3
:: loading settings :: file = /etc/maven/ivysettings.xml
:: loading settings :: url = jar:file:/home/xcollazo/.conda/envs/spark33/lib/python3.10/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/xcollazo/.ivy2/cache
The jars for the packages stored in: /home/xcollazo/.ivy2/jars
org.apache.iceberg#iceberg-spark-runtime-3.3_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-13419751-b56d-434f-b3b6-bebc029f1305;1.0
	confs: [default]
	found org.apache.iceberg#iceberg-spark-runtime-3.3_2.12;1.2.1 in mirrored
downloading https://archiva.wikimedia.org/repository/mirrored/org/apache/iceberg/iceberg-spark-runtime-3.3_2.12/1.2.1/iceberg-spark-runtime-3.3_2.12-1.2.1.jar ...
	[SUCCESSFUL ] org.apache.iceberg#iceberg-spark-runtime-3.3_2.12;1.2.1!iceberg-spark-runtime-3.3_2.12.jar (3371ms)
:: resolution report :: resolve 6520ms :: artifacts dl 3376ms
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   1   |   1   |   1   |   0   ||   1   |   1   |
	---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent-13419751-b56d-434f-b3b6-bebc029f1305
	confs: [default]
	1 artifacts copied, 0 already retrieved (27248kB/38ms)
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/10/24 20:03:04 WARN Utils: Service 'sparkDriver' could not bind on port 12000. Attempting port 12001.
23/10/24 20:03:04 WARN Utils: Service 'sparkDriver' could not bind on port 12001. Attempting port 12002.
23/10/24 20:03:05 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
23/10/24 20:03:05 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.
23/10/24 20:03:11 WARN Client: Same path resource file:///home/xcollazo/.ivy2/jars/org.apache.iceberg_iceberg-spark-runtime-3.3_2.12-1.2.1.jar added multiple times to distributed cache.
23/10/24 20:03:15 WARN Utils: Service 'org.apache.spark.network.netty.NettyBlockTransferService' could not bind on port 13000. Attempting port 13001.
23/10/24 20:03:15 WARN Utils: Service 'org.apache.spark.network.netty.NettyBlockTransferService' could not bind on port 13001. Attempting port 13002.
23/10/24 20:03:16 WARN YarnSchedulerBackend$YarnSchedulerEndpoint: Attempted to request executors before the AM has registered!
spark.version
'3.3.2'
spark.sql("""
SELECT count(1) as count
FROM wmf.webrequest
WHERE year = 2023
  AND month = 10
  AND day = 13
""").show(100)
                                                                                

+------+
| count|
+------+
|325526|
+------+
spark.sql("""
SELECT count(1) as count, referer_class
FROM wmf.webrequest
WHERE year = 2023
  AND month = 10
  AND day = 13
GROUP BY referer_class
ORDER BY count DESC
""").show(100, truncate=False)
[Stage 3:========================================================>(95 + 1) / 96]

+------+------------------------+
|count |referer_class           |
+------+------------------------+
|244532|internal                |
|63247 |none                    |
|15181 |external (search engine)|
|2374  |external                |
|110   |unknown                 |
|82    |external (media sites)  |
+------+------------------------+

BTW, I just fixed the permissions on my Spark 3.3.2 assembly file to be readable by all, in case other folks want to repro the above notebook runs:

$ hdfs dfs -ls /user/xcollazo/artifacts/
Picked up JAVA_TOOL_OPTIONS: -Dfile.encoding=UTF-8
Found 1 items
-rw-r--r--   2 xcollazo xcollazo  278485306 2023-10-24 19:45 /user/xcollazo/artifacts/spark-3.3.2-assembly.zip

Great! Thanks so much @xcollazo - It was the assembly file that I had overlooked in my brief testing.
I'll mark this ticket as ready to be deployed to production and we can work out when is best to roll it out.

I'll also have a think about the options that you have outlined in T345440: Make it easier to run custom Spark versions via for_virtual_env() regarding the different assembly files.
Maybe there is a neat solution to automate keeping a set of assembly files up to date that match the shufflers.
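As a rough illustration only, here is a minimal sketch of what such automation could look like: it zips the jars shipped with a per-version pyspark install into an assembly and publishes it to HDFS, mirroring the manual zip / copyFromLocal / chmod steps used elsewhere in this task. The install paths and version list are hypothetical, not an agreed design.

import subprocess
import zipfile
from pathlib import Path

# Assumed inputs: local pyspark installs for each shuffler version we ship.
PYSPARK_INSTALLS = {
    "3.3.2": Path("/srv/spark/3.3.2/site-packages/pyspark"),  # hypothetical path
    "3.4.1": Path("/srv/spark/3.4.1/site-packages/pyspark"),  # hypothetical path
}
HDFS_TARGET = "/user/spark/share/lib"

def build_assembly(version: str, pyspark_home: Path, out_dir: Path = Path("/tmp")) -> Path:
    """Zip the jars bundled with a pyspark install into spark-<version>-assembly.zip."""
    archive = out_dir / f"spark-{version}-assembly.zip"
    with zipfile.ZipFile(archive, "w") as zf:
        for jar in sorted((pyspark_home / "jars").glob("*.jar")):
            zf.write(jar, arcname=jar.name)  # jars at the top level, like `zip -r ... .`
    return archive

def publish_to_hdfs(archive: Path) -> None:
    """Upload the assembly and make it world-readable, like the existing ones."""
    subprocess.run(["hdfs", "dfs", "-copyFromLocal", "-f", str(archive), HDFS_TARGET], check=True)
    subprocess.run(["hdfs", "dfs", "-chmod", "+r", f"{HDFS_TARGET}/{archive.name}"], check=True)

if __name__ == "__main__":
    for version, home in PYSPARK_INSTALLS.items():
        publish_to_hdfs(build_assembly(version, home))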

@xcollazo I'm happy to be guided by you as to when this should be deployed to production.

I believe that this patch is ready to go:

So we could deploy it tomorrow if you think that there is value in doing so, or we could leave it until I'm back from leave on November 6th, or someone else could deploy it while I am out next week.

Once the patch is merged and puppet has run on all hadoop workers to install the new shuffler packages, we will need to run the following:

sudo cumin A:hadoop-worker 'rm /usr/lib/hadoop-yarn/lib/spark3-yarn-shuffle.jar'

This removes the symlink that was previously created to the version of the jar shipped in conda-analytics, leaving the three remaining symlinks to the packaged versions of the shuffler jar.

Then we will need to do a restart of all nodemanagers with:

sudo cumin A:hadoop-worker 'systemctl restart hadoop-yarn-nodemanager.service'

...or similar.

Finally, we will need to make the spark assembly files available, as you have described above.

TL;DR: Confirmed that Spark 3.4.1 works as well! 🎉

(I also wanted to check Spark 3.3.3 on top of our Spark 3.3.2 shuffler, but that version is not available yet on pyspark.)

First, build conda env:

export http_proxy=http://webproxy:8080
export https_proxy=http://webproxy:8080
no_proxy=127.0.0.1,::1,localhost,.wmnet,.wikimedia.org,.wikipedia.org,.wikibooks.org,.wikiquote.org,.wiktionary.org,.wikisource.org,.wikispecies.org,.wikiversity.org,.wikidata.org,.mediawiki.org,.wikinews.org,.wikivoyage.org
export HTTP_PROXY=$http_proxy
export HTTPS_PROXY=$https_proxy
export NO_PROXY=$no_proxy

source /opt/conda-analytics/etc/profile.d/conda.sh
conda create -n spark34 python=3.10.8 pyspark=3.4.1 conda-pack=0.7.0 ipython jupyterlab=3.4.8 jupyterhub-singleuser=1.5.0 urllib3=1.26.11
conda activate spark34
pip install git+https://github.com/wikimedia/wmfdata-python.git@v2.0.1

There is no spark-assembly yet for this one, so let's create it off of the pyspark bundle (similar to T340861#9136895):

cd /home/xcollazo/.conda/envs/spark34/lib/python3.10/site-packages/pyspark/jars
zip -r ~/artifacts/spark-3.4.1-assembly.zip .
hdfs dfs -copyFromLocal ~/artifacts/spark-3.4.1-assembly.zip /user/xcollazo/artifacts
hdfs dfs -chmod +r /user/xcollazo/artifacts/spark-3.4.1-assembly.zip

Note the above assembly *does not include Iceberg*, so we have to pull it from ivy, just like we have been doing elsewhere. I think this is preferable anyway, so that we can pick and choose the Iceberg version more easily, as Iceberg should be backward and forward compatible; also, our current prod Iceberg version of 1.2.1 is simply not available for Spark 3.4.X ( https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-spark-runtime-3.4_2.12/ ).

And now, notebook (and again excuse the formatting from exporting notebook to markdown):

%env SPARK_HOME=/home/xcollazo/.conda/envs/spark34/lib/python3.10/site-packages/pyspark
env: SPARK_HOME=/home/xcollazo/.conda/envs/spark34/lib/python3.10/site-packages/pyspark
%env SPARK_CONF_DIR=/etc/spark3/conf
env: SPARK_CONF_DIR=/etc/spark3/conf
!echo $SPARK_HOME
!echo $SPARK_CONF_DIR
/home/xcollazo/.conda/envs/spark34/lib/python3.10/site-packages/pyspark
/etc/spark3/conf
import wmfdata

spark = wmfdata.spark.create_custom_session(
    master='yarn',
    spark_config={
        "spark.shuffle.service.name": 'spark_shuffle_3_4',
        "spark.shuffle.service.port": '7339',
        "spark.yarn.archive": "hdfs:///user/xcollazo/artifacts/spark-3.4.1-assembly.zip",
        ##
        # extras to make Iceberg work on 3.4.1:
        ##
        "spark.jars.packages": "org.apache.iceberg:iceberg-spark-runtime-3.4_2.12:1.4.1",
        "spark.jars.ivySettings": "/etc/maven/ivysettings.xml",  # fix jar pulling
    }
)
SPARK_HOME: /home/xcollazo/.conda/envs/spark34/lib/python3.10/site-packages/pyspark
Using Hadoop client lib jars at hadoop-client-api-3.3.4.jar
hadoop-client-runtime-3.3.4.jar, provided by Spark.
PYSPARK_PYTHON=/opt/conda-analytics/bin/python3
:: loading settings :: file = /etc/maven/ivysettings.xml
:: loading settings :: url = jar:file:/home/xcollazo/.conda/envs/spark34/lib/python3.10/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/xcollazo/.ivy2/cache
The jars for the packages stored in: /home/xcollazo/.ivy2/jars
org.apache.iceberg#iceberg-spark-runtime-3.4_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-53ab8a32-8d89-4d35-8783-06ee57271c73;1.0
	confs: [default]
	found org.apache.iceberg#iceberg-spark-runtime-3.4_2.12;1.4.1 in mirrored
downloading https://archiva.wikimedia.org/repository/mirrored/org/apache/iceberg/iceberg-spark-runtime-3.4_2.12/1.4.1/iceberg-spark-runtime-3.4_2.12-1.4.1.jar ...
	[SUCCESSFUL ] org.apache.iceberg#iceberg-spark-runtime-3.4_2.12;1.4.1!iceberg-spark-runtime-3.4_2.12.jar (3719ms)
:: resolution report :: resolve 7773ms :: artifacts dl 3723ms
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   1   |   1   |   1   |   0   ||   1   |   1   |
	---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent-53ab8a32-8d89-4d35-8783-06ee57271c73
	confs: [default]
	1 artifacts copied, 0 already retrieved (28642kB/54ms)
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/10/26 17:50:08 WARN Utils: Service 'sparkDriver' could not bind on port 12000. Attempting port 12001.
23/10/26 17:50:09 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
23/10/26 17:50:14 WARN Client: Same path resource file:///home/xcollazo/.ivy2/jars/org.apache.iceberg_iceberg-spark-runtime-3.4_2.12-1.4.1.jar added multiple times to distributed cache.
23/10/26 17:50:23 WARN Utils: Service 'org.apache.spark.network.netty.NettyBlockTransferService' could not bind on port 13000. Attempting port 13001.
spark.version
'3.4.1'
spark.sql("""
SELECT count(1) as count
FROM wmf.webrequest
WHERE year = 2023
  AND month = 10
  AND day = 13
""").show(100)
[Stage 0:========================================================>(95 + 1) / 96]

+------+
| count|
+------+
|325526|
+------+
spark.sql("""
SELECT count(1) as count, referer_class
FROM wmf.webrequest
WHERE year = 2023
  AND month = 10
  AND day = 13
GROUP BY referer_class
ORDER BY count DESC
""").show(100, truncate=False)
[Stage 3:=======================================================> (94 + 2) / 96]

+------+------------------------+
|count |referer_class           |
+------+------------------------+
|244532|internal                |
|63247 |none                    |
|15181 |external (search engine)|
|2374  |external                |
|110   |unknown                 |
|82    |external (media sites)  |
+------+------------------------+

TL;DR: Confirmed that Spark 3.4.1 also works against the shuffler from Spark 3.3.2 🎉. This is good since folks who want bleeding edge (say Spark 3.5.0, which was just released) may just use a compatible shuffler without waiting on us to deliver it. I speculate that the shuffle incompatibility is isolated to Spark 3.1, due to their switch from log4j 1.x to 2.x.

Notebook:

%env SPARK_HOME=/home/xcollazo/.conda/envs/spark34/lib/python3.10/site-packages/pyspark
env: SPARK_HOME=/home/xcollazo/.conda/envs/spark34/lib/python3.10/site-packages/pyspark
%env SPARK_CONF_DIR=/etc/spark3/conf
env: SPARK_CONF_DIR=/etc/spark3/conf
!echo $SPARK_HOME
!echo $SPARK_CONF_DIR
/home/xcollazo/.conda/envs/spark34/lib/python3.10/site-packages/pyspark
/etc/spark3/conf
import wmfdata

spark = wmfdata.spark.create_custom_session(
    master='yarn',
    spark_config={
        "spark.shuffle.service.name": 'spark_shuffle_3_3',
        "spark.shuffle.service.port": '7338',
        "spark.yarn.archive": "hdfs:///user/xcollazo/artifacts/spark-3.4.1-assembly.zip",
        ##
        # extras to make Iceberg work on 3.4.1:
        ##
        "spark.jars.packages": "org.apache.iceberg:iceberg-spark-runtime-3.4_2.12:1.4.1",
        "spark.jars.ivySettings": "/etc/maven/ivysettings.xml",  # fix jar pulling
    }
)
SPARK_HOME: /home/xcollazo/.conda/envs/spark34/lib/python3.10/site-packages/pyspark
Using Hadoop client lib jars at hadoop-client-api-3.3.4.jar
hadoop-client-runtime-3.3.4.jar, provided by Spark.
PYSPARK_PYTHON=/opt/conda-analytics/bin/python3
:: loading settings :: file = /etc/maven/ivysettings.xml
:: loading settings :: url = jar:file:/home/xcollazo/.conda/envs/spark34/lib/python3.10/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/xcollazo/.ivy2/cache
The jars for the packages stored in: /home/xcollazo/.ivy2/jars
org.apache.iceberg#iceberg-spark-runtime-3.4_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-4d3617dd-6cb4-412c-bf80-23e5461d2acf;1.0
	confs: [default]
	found org.apache.iceberg#iceberg-spark-runtime-3.4_2.12;1.4.1 in mirrored
:: resolution report :: resolve 174ms :: artifacts dl 3ms
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   1   |   0   |   0   |   0   ||   1   |   0   |
	---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent-4d3617dd-6cb4-412c-bf80-23e5461d2acf
	confs: [default]
	0 artifacts copied, 1 already retrieved (0kB/6ms)
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/10/26 18:03:13 WARN Utils: Service 'sparkDriver' could not bind on port 12000. Attempting port 12001.
23/10/26 18:03:14 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
23/10/26 18:03:18 WARN Client: Same path resource file:///home/xcollazo/.ivy2/jars/org.apache.iceberg_iceberg-spark-runtime-3.4_2.12-1.4.1.jar added multiple times to distributed cache.
23/10/26 18:03:24 WARN Utils: Service 'org.apache.spark.network.netty.NettyBlockTransferService' could not bind on port 13000. Attempting port 13001.
spark.version
'3.4.1'
spark.sql("""
SELECT count(1) as count
FROM wmf.webrequest
WHERE year = 2023
  AND month = 10
  AND day = 13
""").show(100)
                                                                                

+------+
| count|
+------+
|325526|
+------+
spark.sql("""
SELECT count(1) as count, referer_class
FROM wmf.webrequest
WHERE year = 2023
  AND month = 10
  AND day = 13
GROUP BY referer_class
ORDER BY count DESC
""").show(100, truncate=False)
[Stage 3:=======================================================> (94 + 2) / 96]

+------+------------------------+
|count |referer_class           |
+------+------------------------+
|244532|internal                |
|63247 |none                    |
|15181 |external (search engine)|
|2374  |external                |
|110   |unknown                 |
|82    |external (media sites)  |
+------+------------------------+

@BTullis :

Maybe there is a neat solution to automate keeping a set of assembly files up-to-date, which match the shufflers.

Dug out T336513, which we had opened a while ago for the same issue. As we can see above, generating an assembly file is easy. However, for the sake of folks not linking to their own hand-rolled files in their personal folders, I still think we could try to automate generating official ones that live under /user/spark/share/lib. Perhaps a couple of Dockerfiles and some GitLab CI could take care of this?

So we could deploy it tomorrow if you think that there is value in doing so, or we could leave it until I'm back from leave on November 6th, or someone else could deploy it while I am out next week.

This can wait until you come back. No rush. I wouldn't be able to benefit from it immediately anyway as we are prioritizing other work on this sprint.

Finally, thanks for the effort and the great documentation in this phab! I learnt a lot!

Change 964008 merged by Btullis:

[operations/puppet@production] Deploy multiple spark shufflers for yarn to production

https://gerrit.wikimedia.org/r/964008

Mentioned in SAL (#wikimedia-analytics) [2023-11-09T11:14:43Z] <btullis> deploying multiple spark shufflers to production for T344910

Deploying this change with: sudo cumin P:hadoop::common run-puppet-agent

I will remove the existing symlink /usr/lib/hadoop-yarn/lib/spark3-yarn-shuffle.jar with cumin before initiating a rolling restart of the nodemanager processes.

Puppet has now run on all nodes.
All of the symlinks are present.

btullis@cumin1001:~$ sudo cumin A:hadoop-worker 'find /usr/lib/hadoop-yarn/lib/ -type l -name spark*|sort'
86 hosts will be targeted:
an-worker[1078-1095,1097-1156].eqiad.wmnet,analytics[1070-1077].eqiad.wmnet
OK to proceed on 86 hosts? Enter the number of affected hosts to confirm or "q" to quit: 86
===== NODE GROUP =====                                                                                                                                                                                             
(86) an-worker[1078-1095,1097-1156].eqiad.wmnet,analytics[1070-1077].eqiad.wmnet                                                                                                                                   
----- OUTPUT of 'find /usr/lib/ha...name spark*|sort' -----                                                                                                                                                        
/usr/lib/hadoop-yarn/lib/spark-3.1-yarn-shuffle.jar                                                                                                                                                                
/usr/lib/hadoop-yarn/lib/spark-3.3-yarn-shuffle.jar                                                                                                                                                                
/usr/lib/hadoop-yarn/lib/spark-3.4-yarn-shuffle.jar
/usr/lib/hadoop-yarn/lib/spark3-yarn-shuffle.jar
================                                                                                                                                                                                                   
PASS |███████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 100% (86/86) [00:01<00:00, 51.17hosts/s]
FAIL |                                                                                                                                                                            |   0% (0/86) [00:01<?, ?hosts/s]
100.0% (86/86) success ratio (>= 100.0% threshold) for command: 'find /usr/lib/ha...name spark*|sort'.
100.0% (86/86) success ratio (>= 100.0% threshold) of nodes successfully executed all commands.

I will now remove the old symlink.

btullis@cumin1001:~$ sudo cumin A:hadoop-worker 'rm /usr/lib/hadoop-yarn/lib/spark3-yarn-shuffle.jar'
86 hosts will be targeted:
an-worker[1078-1095,1097-1156].eqiad.wmnet,analytics[1070-1077].eqiad.wmnet
OK to proceed on 86 hosts? Enter the number of affected hosts to confirm or "q" to quit: 86
===== NO OUTPUT =====                                                                                                                                                                                              
PASS |███████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 100% (86/86) [00:01<00:00, 68.92hosts/s]
FAIL |                                                                                                                                                                            |   0% (0/86) [00:01<?, ?hosts/s]
100.0% (86/86) success ratio (>= 100.0% threshold) for command: 'rm /usr/lib/hado...yarn-shuffle.jar'.
100.0% (86/86) success ratio (>= 100.0% threshold) of nodes successfully executed all commands.

Next I will restart the yarn-nodemanager on a canary worker host.

Mentioned in SAL (#wikimedia-analytics) [2023-11-09T11:47:31Z] <btullis> restarting yarn-nodemanager service on an-worker1100.eqiad.wmnet as a canary for T344910

The startup log for the hadoop-yarn-nodemanager service looks clean.

btullis@an-worker1100:~$ tail -f /var/log/hadoop-yarn/yarn-yarn-nodemanager-an-worker1100.log
2023-11-09 11:47:02,624 INFO org.apache.hadoop.http.HttpServer2: Jetty bound to port 8042
2023-11-09 11:47:02,624 INFO org.mortbay.log: jetty-6.1.26
2023-11-09 11:47:02,688 INFO org.mortbay.log: Extract jar:file:/usr/lib/hadoop-yarn/hadoop-yarn-common-2.10.2.jar!/webapps/node to /tmp/Jetty_0_0_0_0_8042_node____19tj0x/webapp
2023-11-09 11:47:03,525 INFO org.mortbay.log: Started HttpServer2$SelectChannelConnectorWithSafeStartup@0.0.0.0:8042
2023-11-09 11:47:03,528 INFO org.apache.hadoop.yarn.webapp.WebApps: Web app node started at 8042
2023-11-09 11:47:03,532 INFO org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdaterImpl: Node ID assigned is : an-worker1100.eqiad.wmnet:8041
2023-11-09 11:47:03,533 INFO org.apache.hadoop.util.JvmPauseMonitor: Starting JVM pause monitor
2023-11-09 11:47:03,572 INFO org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdaterImpl: Sending out 0 NM container statuses: []
2023-11-09 11:47:03,576 INFO org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdaterImpl: Registering with RM using containers :[]
2023-11-09 11:47:03,875 INFO org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdaterImpl: Registered with ResourceManager as an-worker1100.eqiad.wmnet:8041 with total resource of <memory:108518, vCores:71>

Waiting to see what happens when a container gets scheduled or a job comes in for the shuffler.

Mentioned in SAL (#wikimedia-analytics) [2023-11-09T12:29:16Z] <btullis> Proceeding to roll-restart yarn nodemanagers with sudo cumin A:hadoop-worker -b 1 -s 30 'systemctl restart hadoop-yarn-nodemanager.service' for T344910

All hadoop nodemanagers have been restarted.

btullis@cumin1001:~$ sudo cumin -b 1 -s 30 A:hadoop-worker 'systemctl restart hadoop-yarn-nodemanager.service'
86 hosts will be targeted:
an-worker[1078-1095,1097-1156].eqiad.wmnet,analytics[1070-1077].eqiad.wmnet
OK to proceed on 86 hosts? Enter the number of affected hosts to confirm or "q" to quit: 86
===== NO OUTPUT =====                                                                                                                                                                                              
PASS |█████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 100% (86/86) [1:00:03<00:00, 41.90s/hosts]
FAIL |                                                                                                                                                                          |   0% (0/86) [1:00:03<?, ?hosts/s]
100.0% (86/86) success ratio (>= 100.0% threshold) for command: 'systemctl restar...emanager.service'.
100.0% (86/86) success ratio (>= 100.0% threshold) of nodes successfully executed all commands.
btullis@cumin1001:~$

I think we should be good to go. @xcollazo - if you have some time to test, that would be great.
The next steps in this project, I suppose, are these:

...and then the upgrade of the production version of Spark in:

At least in the meantime, these three shufflers should now be available for use.
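For reference, opting into one of the additional shufflers at submission time only requires overriding the shuffle service name and port, and pointing spark.yarn.archive at the matching assembly. A minimal sketch with plain spark-submit from a Spark 3.3.2 environment, using a hypothetical job script name:

# Submit a job against the Spark 3.3 shuffler instead of the default 3.1 one
spark-submit \
  --master yarn \
  --conf spark.shuffle.service.name=spark_shuffle_3_3 \
  --conf spark.shuffle.service.port=7338 \
  --conf spark.yarn.archive=hdfs:///user/spark/share/lib/spark-3.3.2-assembly.zip \
  my_job.py  # hypothetical script name

The notebook-based tests below achieve the same thing by passing these keys through wmfdata's spark_config.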

> I think we should be good to go. @xcollazo - if you have some time to test, that would be great.

Hey @BTullis, haven't had a chance to test this yet, but intend to do it by Nov 14 EOD.

TL;DR: Spark 3.3.2 test successful on Production cluster.

Longer:

On stat1007.eqiad.wmnet:

create conda env:

export http_proxy=http://webproxy:8080
export https_proxy=http://webproxy:8080
no_proxy=127.0.0.1,::1,localhost,.wmnet,.wikimedia.org,.wikipedia.org,.wikibooks.org,.wikiquote.org,.wiktionary.org,.wikisource.org,.wikispecies.org,.wikiversity.org,.wikidata.org,.mediawiki.org,.wikinews.org,.wikivoyage.org
export HTTP_PROXY=$http_proxy
export HTTPS_PROXY=$https_proxy
export NO_PROXY=$no_proxy

source /opt/conda-analytics/etc/profile.d/conda.sh
conda create -n spark33 python=3.10.8 pyspark=3.3.2 conda-pack=0.7.0 ipython jupyterlab=3.4.8 jupyterhub-singleuser=1.5.0 urllib3=1.26.11
conda activate spark33
pip install git+https://github.com/wikimedia/wmfdata-python.git@v2.0.1

Spark assembly is available:

xcollazo@stat1007:~$ hdfs dfs -ls /user/spark/share/lib | grep 3.3.2
Picked up JAVA_TOOL_OPTIONS: -Dfile.encoding=UTF-8
-rw-r--r--   3 hdfs  spark  278485306 2023-10-19 19:55 /user/spark/share/lib/spark-3.3.2-assembly.zip

Let's now run the usual notebook test with two SQL queries:

%env SPARK_HOME=/home/xcollazo/.conda/envs/spark33/lib/python3.10/site-packages/pyspark
env: SPARK_HOME=/home/xcollazo/.conda/envs/spark33/lib/python3.10/site-packages/pyspark
%env SPARK_CONF_DIR=/etc/spark3/conf
env: SPARK_CONF_DIR=/etc/spark3/conf
!echo $SPARK_HOME
!echo $SPARK_CONF_DIR
/home/xcollazo/.conda/envs/spark33/lib/python3.10/site-packages/pyspark
/etc/spark3/conf
import wmfdata

spark = wmfdata.spark.create_custom_session(
    master='yarn',
    spark_config={
        "spark.shuffle.service.name": 'spark_shuffle_3_3',
        "spark.shuffle.service.port": '7338',
        "spark.yarn.archive": "hdfs:///user/spark/share/lib/spark-3.3.2-assembly.zip",
        "spark.dynamicAllocation.maxExecutors": 128,
        ##
        # extras to make Iceberg work on 3.3.2:
        ##
        "spark.jars.packages": "org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.2.1",
        "spark.jars.ivySettings": "/etc/maven/ivysettings.xml",  # fix jar pulling
    }
)
SPARK_HOME: /home/xcollazo/.conda/envs/spark33/lib/python3.10/site-packages/pyspark
Using Hadoop client lib jars at hadoop-client-api-3.3.2.jar
hadoop-client-runtime-3.3.2.jar, provided by Spark.
PYSPARK_PYTHON=/home/xcollazo/.conda/envs/spark33/bin/python
:: loading settings :: file = /etc/maven/ivysettings.xml
:: loading settings :: url = jar:file:/srv/home/xcollazo/.conda/envs/spark33/lib/python3.10/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/xcollazo/.ivy2/cache
The jars for the packages stored in: /home/xcollazo/.ivy2/jars
org.apache.iceberg#iceberg-spark-runtime-3.3_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-929a577d-1d4c-46a1-8d3f-5a27243889fc;1.0
	confs: [default]
	found org.apache.iceberg#iceberg-spark-runtime-3.3_2.12;1.2.1 in wmf-archiva
:: resolution report :: resolve 191ms :: artifacts dl 8ms
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   1   |   0   |   0   |   0   ||   1   |   0   |
	---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent-929a577d-1d4c-46a1-8d3f-5a27243889fc
	confs: [default]
	0 artifacts copied, 1 already retrieved (0kB/7ms)
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/11/14 18:13:14 WARN SparkConf: Note that spark.local.dir will be overridden by the value set by the cluster manager (via SPARK_LOCAL_DIRS in mesos/standalone/kubernetes and LOCAL_DIRS in YARN).
23/11/14 18:13:18 WARN Client: Same path resource file:///srv/home/xcollazo/.ivy2/jars/org.apache.iceberg_iceberg-spark-runtime-3.3_2.12-1.2.1.jar added multiple times to distributed cache.
23/11/14 18:13:24 WARN YarnSchedulerBackend$YarnSchedulerEndpoint: Attempted to request executors before the AM has registered!
spark.version
'3.3.2'
spark.sql("""
SELECT count(1) as count
FROM wmf.webrequest
WHERE year = 2023
  AND month = 10
  AND day = 13
""").show(100)
[Stage 3:>                                                          (0 + 1) / 1]

+-----------+
|      count|
+-----------+
|11522526744|
+-----------+
spark.sql("""
SELECT count(1) as count, referer_class
FROM wmf.webrequest
WHERE year = 2023
  AND month = 10
  AND day = 13
GROUP BY referer_class
ORDER BY count DESC
""").show(100)
[Stage 6:>                                                          (0 + 3) / 3]

+----------+--------------------+
|     count|       referer_class|
+----------+--------------------+
|9068891148|            internal|
|1869245312|                none|
| 431587995|external (search ...|
| 146270138|            external|
|   3888571|             unknown|
|   2643580|external (media s...|
+----------+--------------------+

TL;DR: Spark 3.4.1 test successful on Production cluster.

Longer:

Spark 3.4.1:

On stat1007.eqiad.wmnet:

create conda env:

export http_proxy=http://webproxy:8080
export https_proxy=http://webproxy:8080
no_proxy=127.0.0.1,::1,localhost,.wmnet,.wikimedia.org,.wikipedia.org,.wikibooks.org,.wikiquote.org,.wiktionary.org,.wikisource.org,.wikispecies.org,.wikiversity.org,.wikidata.org,.mediawiki.org,.wikinews.org,.wikivoyage.org
export HTTP_PROXY=$http_proxy
export HTTPS_PROXY=$https_proxy
export NO_PROXY=$no_proxy

source /opt/conda-analytics/etc/profile.d/conda.sh
conda create -n spark34 python=3.10.8 pyspark=3.4.1 conda-pack=0.7.0 ipython jupyterlab=3.4.8 jupyterhub-singleuser=1.5.0 urllib3=1.26.11
conda activate spark34
pip install git+https://github.com/wikimedia/wmfdata-python.git@v2.0.1

There is no Spark assembly in the production cluster yet for this version, so let's create one from the pyspark bundle (similar to T340861#9136895):

cd /home/xcollazo/.conda/envs/spark34/lib/python3.10/site-packages/pyspark/jars
zip -r ~/artifacts/spark-3.4.1-assembly.zip .
hdfs dfs -copyFromLocal ~/artifacts/spark-3.4.1-assembly.zip /user/xcollazo/artifacts
hdfs dfs -chmod +r /user/xcollazo/artifacts/spark-3.4.1-assembly.zip

Note that the above assembly *does not include Iceberg*, so we have to pull it from Ivy, just as we have been doing elsewhere. I think this is preferable anyway: it lets us pick and choose the Iceberg version more easily (Iceberg should be backward and forward compatible), and in any case our current production Iceberg version, 1.2.1, is simply not available for Spark 3.4.x ( https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-spark-runtime-3.4_2.12/ ).

And now, the notebook (again, please excuse the formatting from exporting the notebook to markdown):

%env SPARK_HOME=/home/xcollazo/.conda/envs/spark34/lib/python3.10/site-packages/pyspark
env: SPARK_HOME=/home/xcollazo/.conda/envs/spark34/lib/python3.10/site-packages/pyspark
%env SPARK_CONF_DIR=/etc/spark3/conf
env: SPARK_CONF_DIR=/etc/spark3/conf
!echo $SPARK_HOME
!echo $SPARK_CONF_DIR
/home/xcollazo/.conda/envs/spark34/lib/python3.10/site-packages/pyspark
/etc/spark3/conf
import wmfdata

spark = wmfdata.spark.create_custom_session(
    master='yarn',
    spark_config={
        "spark.shuffle.service.name": 'spark_shuffle_3_4',
        "spark.shuffle.service.port": '7339',
        "spark.yarn.archive": "hdfs:///user/xcollazo/artifacts/spark-3.4.1-assembly.zip",
        "spark.dynamicAllocation.maxExecutors": 128,
        ##
        # extras to make Iceberg work on 3.4.1:
        ##
        "spark.jars.packages": "org.apache.iceberg:iceberg-spark-runtime-3.4_2.12:1.4.1",
        "spark.jars.ivySettings": "/etc/maven/ivysettings.xml",  # fix jar pulling
    }
)
SPARK_HOME: /home/xcollazo/.conda/envs/spark34/lib/python3.10/site-packages/pyspark
Using Hadoop client lib jars at hadoop-client-api-3.3.4.jar
hadoop-client-runtime-3.3.4.jar, provided by Spark.
PYSPARK_PYTHON=/home/xcollazo/.conda/envs/spark34/bin/python
:: loading settings :: file = /etc/maven/ivysettings.xml
:: loading settings :: url = jar:file:/srv/home/xcollazo/.conda/envs/spark34/lib/python3.10/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/xcollazo/.ivy2/cache
The jars for the packages stored in: /home/xcollazo/.ivy2/jars
org.apache.iceberg#iceberg-spark-runtime-3.4_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-b86bcb8b-3788-49f6-b91d-3fcc6ca15b68;1.0
	confs: [default]
	found org.apache.iceberg#iceberg-spark-runtime-3.4_2.12;1.4.1 in mirrored
downloading https://archiva.wikimedia.org/repository/mirrored/org/apache/iceberg/iceberg-spark-runtime-3.4_2.12/1.4.1/iceberg-spark-runtime-3.4_2.12-1.4.1.jar ...
	[SUCCESSFUL ] org.apache.iceberg#iceberg-spark-runtime-3.4_2.12;1.4.1!iceberg-spark-runtime-3.4_2.12.jar (3035ms)
:: resolution report :: resolve 6807ms :: artifacts dl 3041ms
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   1   |   1   |   1   |   0   ||   1   |   1   |
	---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent-b86bcb8b-3788-49f6-b91d-3fcc6ca15b68
	confs: [default]
	1 artifacts copied, 0 already retrieved (28642kB/48ms)
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/11/14 18:30:03 WARN SparkConf: Note that spark.local.dir will be overridden by the value set by the cluster manager (via SPARK_LOCAL_DIRS in mesos/standalone/kubernetes and LOCAL_DIRS in YARN).
23/11/14 18:30:07 WARN Client: Same path resource file:///srv/home/xcollazo/.ivy2/jars/org.apache.iceberg_iceberg-spark-runtime-3.4_2.12-1.4.1.jar added multiple times to distributed cache.
23/11/14 18:30:15 WARN YarnSchedulerBackend$YarnSchedulerEndpoint: Attempted to request executors before the AM has registered!
spark.version
'3.4.1'
spark.sql("""
SELECT count(1) as count
FROM wmf.webrequest
WHERE year = 2023
  AND month = 10
  AND day = 13
""").show(100)
[Stage 3:>                                                          (0 + 1) / 1]

+-----------+
|      count|
+-----------+
|11522526744|
+-----------+
spark.sql("""
SELECT count(1) as count, referer_class
FROM wmf.webrequest
WHERE year = 2023
  AND month = 10
  AND day = 13
GROUP BY referer_class
ORDER BY count DESC
""").show(100)
                                                                                

+----------+--------------------+
|     count|       referer_class|
+----------+--------------------+
|9068891148|            internal|
|1869245312|                none|
| 431587995|external (search ...|
| 146270138|            external|
|   3888571|             unknown|
|   2643580|external (media s...|
+----------+--------------------+

Thanks for all the work to make this one happen, @BTullis!

This unblocks a path to production for the Dumps 2.0 effort! 🎉