Description
Status | Subtype | Assigned | Task
---|---|---|---
Resolved | | BTullis | T296527 Deploy DE Production Instance of Airflow
Resolved | | Ottomata | T296543 Tooling for Deploying Conda Environments
Event Timeline
In today's Airflow sync meeting, Marcel and Fabian and I discussed how we can support deploying anything a DAG might need to run. We've previously mostly just considered conda environments and jars to contain dependencies that are needed to run either SQL queries or Spark jobs.
However, we are really just using conda envs because we don't have the ability to run Docker images in Hadoop. If we could use Docker, we'd package everything the job needs to run inside of that Docker image: both job-specific code (or SQL) and big dependencies like Spark. The Docker image would then be fully usable to run the job, and we'd just use Airflow DAGs to schedule running an executable inside of that Docker image.
Which begs the question...why not do the same with a conda env?
Idea: Instead of figuring out how to deploy job code (SQL, Spark, whatever) separately from dependencies, why not just do it all together in a conda env. This would treat the conda env like a docker image or fat job jar.
A team's job code would live in its own repository, with nothing Airflow-related in it. If this job repo were a python module (setup.py or whatever) with requirements.txt dependencies, then our conda env tooling could just do the following once a commit is merged in the job repo (sketched in code after this list):
- create conda env
- pip install .
- conda pack
- upload to archiva.
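A minimal sketch of what that tooling could do, assuming conda (>= 4.6, for conda run), conda-pack, and pip are available; names are hypothetical and the archiva upload is elided:

import subprocess

def build_conda_dist(project_dir, env_name, python_version='3.7'):
    """Build a packed conda env artifact from a python project directory."""
    # create conda env (with just python in it)
    subprocess.run(
        ['conda', 'create', '-y', '-n', env_name, f'python={python_version}'],
        check=True,
    )
    # pip install . (the job code plus its requirements.txt dependencies)
    subprocess.run(
        ['conda', 'run', '-n', env_name, 'pip', 'install', project_dir],
        check=True,
    )
    # conda pack into a relocatable artifact
    artifact = f'{env_name}.tgz'
    subprocess.run(['conda', 'pack', '-n', env_name, '-o', artifact], check=True)
    # upload to archiva: elided here
    return artifact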
Then, instead of having Spark- or Skein-aware Airflow operator wrappers, we'd have a CondaExecOperator that could do the following:
- Get the conda env artifact
- unpack it locally
- launch an executable inside the conda env
For Spark, the executable would be a special SparkSubmit wrapper (wmfdata?) that would know how to run spark-submit to use the conda env artifact and launch a specific spark job. Or, it would be a special Hive wrapper that would know how to do the same and run some SQL.
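As a very rough, hypothetical sketch (CondaExecOperator doesn't exist yet; the artifact fetching and names here are assumptions):

import subprocess
import tarfile
import urllib.request

from airflow.models import BaseOperator


class CondaExecOperator(BaseOperator):
    """Fetch a packed conda env artifact and run an executable inside it."""

    def __init__(self, artifact_url, executable, args=None, **kwargs):
        super().__init__(**kwargs)
        self.artifact_url = artifact_url
        self.executable = executable  # path relative to the unpacked env root
        self.args = args or []

    def execute(self, context):
        # Get the conda env artifact (e.g. from archiva).
        urllib.request.urlretrieve(self.artifact_url, 'conda_env.tgz')
        # Unpack it locally.
        with tarfile.open('conda_env.tgz') as tar:
            tar.extractall('conda_env')
        # Launch an executable inside the conda env.
        subprocess.run(['conda_env/' + self.executable] + self.args, check=True)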
Pros:
- Abstract away the differences between how we deploy SQL, Spark, or Skein job code.
- Each job could (maybe?) use its own Spark version
Cons:
- Harder to reason about for developers
- Deployed code is always inside of an artifact (this is true of JVM world anyway)
- too crazy?
- Get the conda env artifact
- unpack it locally
- launch an executable inside the conda env
Would this happen in the Airflow instance or in the cluster?
Likely on the airflow instance, as often the executable would probably be a wrapper that runs spark-submit.
I can imagine this could be problematic if the conda envs are big, leading to a lot of different big environments being stored on the airflow machine. If we can make those small, why not :)
So, I'm looking into how to automate generating these conda envs.
I'm trying to figure out how to best integrate conda envs with regular python package dependency systems, and it indeed gets complicated.
A tricky thing I had to deal with when creating the airflow debian package (based on conda), was dealing with both conda packages and pip packages. Conda is really useful for including the python binary with the env, as well as other binary dependencies, which virtualenvs+pip cannot do. However, conda packages are not always 1:1 available with pip packages, and pip seems to be much more common and complete. Perhaps we should adopt the following convention:
- Use conda for python and other binary dependencies
- Use pip inside of a conda environment for all other python dependencies.
Then, the steps to build a conda env (in a docker container?) would be something like:
- install miniconda
- conda env create -f environment.yml # environment.yml is like conda's requirements.txt
- ...conda-pack, etc.
- upload to archiva
- On airflow-dags deploy, sync conda env artifact
- On usage of conda env in YARN, conda-unpack
To create environment.yml, we'll need a clean conda env with dependencies created from either pyproject.toml (poetry) or requirements.txt (setuptools). I'd like to automate this too, but I'm not quite sure how yet. TBD.
Or, hm, if we use pip for all other python dependencies, then a basic python only conda env could just use a requirements.txt file instead of a conda environment.yml file. Oh ho, yes, then conda environment.yml files would not specify much more than the python version to use, and other requirements would just be pip installed as usual.
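For example, a minimal environment.yml under that convention might be just (hypothetical):

name: my_job_env
dependencies:
  - python=3.7
  - pip
  - pip:
      - -r requirements.txt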
Experimental Dockerfile that does this here:
https://gist.github.com/ottomata/2fd842a1b3d323579dc9ebe88be724ef
@gmodena let's sync up and see if something like this would work. I think you have some insights from working on your docker builder too.
Hi @Ottomata,
Apologies for the late reply. I like the multi-stage approach! Also, super neat that you remove the compiler toolchain from the packed env. That should shave some MBs.
Mixing conda and pip packages is tricky, tbh we should see when/where this breaks. Did you maybe have a look at the pip_interop_enabled setting? Seems promising to me, but I've got no experience with it (and it's experimental). FWIW I turned it on in our build toolchain to find out how it works.
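For reference, turning it on is just a one-line .condarc setting:

pip_interop_enabled: true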
Did you consider unifying package management and express pip+conda deps in the same environment.yml file? I did initially but settled on keeping distinct files, because I (maybe wrongly) assume that folks would use requirements.txt anyway. I wonder if that's the right convention to promote though. I'd be curious to hear your thoughts.
let's sync up and see if something like this would work
IMHO this would work provided we can use the image in Gitlab.
Our build toolchain relies on a base miniconda image, but performs conda install ... and conda-pack steps at run time in a container. This is wasteful but allows us to support:
- Builds in a container (e.g. cross-build linux x86 venvs on macOS).
- Builds in a native environment where conda is available (e.g. systems where docker is not available, or that are already containerised. For example, inside a Github Action, a Gitlab CI container or a stat node).
Basically we use docker only to provide a thin conda layer, but the venv build is orthogonal to it. Would you consider moving some of the conda_build_env logic to a script and allow reuse outside of the image?
Did you consider unifying package management and express pip+conda deps in the same environment.yml file?
I did, but then decided we could support it all! If environment.yml contains pip stuff, then it'll be installed too.
Would you consider moving some of the conda_build_env logic to a script and allow reuse outside of the image?
Yeah been thinking about that too.
https://etherpad.wikimedia.org/p/airflow
I think that would be ideal: scripts that could build the conda env outside of docker, and then docker build steps just use those scripts. However, I found that some of the build steps might be OS dependent (unless we figure out how to vary them correctly). E.g. I found I needed to explicitly install the gcc_linux-64 and gxx_linux-64 packages to get gcc and g++ when needed. We'd have to make the conda build scripts know which packages to install (only if no gcc/g++ already existed?). When including wmfdata as a dependency, I got errors about a missing krb5-config for kerberos support. This is just another case of having to make the conda build scripts do the right thing based on the current OS. (I solved this by installing the krb5 anaconda package, but I could have possibly just installed the krb5-config debian package in the docker build too.)
Ok, more findings, this time about how to run a python function in a packaged conda environment as a Spark job without having that conda env and python function locally on the Airflow worker.
For Java/Scala Spark jobs, we launch spark like:
spark-submit ... --class org.wikimedia.MySparkJob my-jobs-0.1.0.jar jobArg1 jobArg2 ...
my-jobs-0.1.0.jar can exist anywhere as long as Spark can access the filesystem it is on. I.e. hdfs:///user/otto/my-jobs-0.1.0.jar works just as well as ./my-jobs-0.1.0.jar. Spark will just use that jar to launch the main --class.
For Python, spark-submit expects a local python file to exist, which will eventually instantiate your SparkSession and run your spark job. It would be nice if spark-submit could work with a conda env like it does with jar files. If we have a packed conda env cached in HDFS, why can't we instruct spark to call a function in it to launch the SparkSession? If we could do this, we wouldn't need any local copy of the actual spark job code on the airflow workers.
I was able to do this today! I had to make a custom python call wrapper script (this was Marcel's idea!) to do this, but once done, we can call any python function we want that is within a PYTHONPATH.
If this lives on the airflow workers, or even better just somewhere in HDFS, then we can launch python spark jobs in yarn cluster modes out of a packed conda environment like this:
PYSPARK_PYTHON=conda_env/bin/python spark2-submit --master yarn --deploy-mode cluster --archives=hdfs:///user/otto/conda_env.tgz#conda_env hdfs:///user/otto/call.py 'test_project.spark:main'
- hdfs:///user/otto/conda_env.tgz is a path to a packed conda env
- hdfs:///user/otto/call.py is the path to call.py that spark-submit will run
- 'test_project.spark:main' is the python callable name à la setuptools entry_points or poetry tool.poetry.scripts. This is the job code that has been installed into the conda_env.
spark-submit will run call.py (it has to end in .py or SparkSubmit will not realize it is python!), and call.py will then import 'test_project.spark' and run the main function, which should instantiate a SparkSession.
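A minimal sketch of such a call.py (illustrative, not the exact script; the --prefix option here anticipates the usage in later examples below):

import argparse
import glob
import importlib
import site

if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='Import and call a python function.')
    parser.add_argument('callable_name', help="e.g. 'test_project.spark:main'")
    parser.add_argument('--prefix', help='path to an unpacked conda env to add to sys.path')
    args = parser.parse_args()

    if args.prefix:
        # Make packages installed in the unpacked conda env importable.
        for site_packages in glob.glob(f'{args.prefix}/lib/python*/site-packages'):
            site.addsitedir(site_packages)

    module_name, _, function_name = args.callable_name.partition(':')
    module = importlib.import_module(module_name)
    # e.g. runs main(), which instantiates the SparkSession
    getattr(module, function_name)()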
Another idea might be to use Skein to launch spark-submit on a YARN worker, rather than running it on the Airflow worker. I haven't tried this yet, but it would look like:
CondaExecOperator(conda_env, command) -> skein unpacking conda_env and running command on a yarn worker, where command is spark-submit (with PYSPARK_PYTHON set appropriately)
PYSPARK_PYTHON=conda_env/bin/python spark2-submit --master yarn --deploy-mode cluster --archives=hdfs:///user/otto/conda_env.tgz#conda_env hdfs:///user/otto/call.py 'test_project.spark:main'
This looks great!
I've been able to build a packed and stacked (stack 'N' pack?) conda env on top of anaconda-wmf. This allows us to build the conda env without including dependencies already available on all of the hadoop workers.
https://gist.github.com/ottomata/2fd842a1b3d323579dc9ebe88be724ef#file-dockerfile-anaconda-wmf
Next up is to unify all this build stuff into a single script and/or templated Dockerfile generator in workflow_utils.
Update. Stuff not quite ready for review, but I've done a lot of bootstrapping work in https://gitlab.wikimedia.org/otto/workflow_utils/.
- conda.py module with a conda-dist CLI that will bootstrap and build a packed conda env. No docker here.
- Dockerfile-conda-dist that builds a docker image with conda and workflow_utils installed
- Will next make a short wrapper that can run the docker image with a project path mounted, call conda-dist, and generate a packed conda_env.tgz file for uploading to archiva or elsewhere.
The Q is if I can upload the docker image somewhere useful. This is def not a blubber/wikimedia-registry compatible docker image. If not, I could make a script that will aid in building the image locally using Dockerfile-conda-dist, and then running it.
So many bootstrapping Qs!
Update: https://gitlab.wikimedia.org/otto/workflow_utils/
- Have been improving and experimenting with conda-dist built envs.
- I can successfully build conda dist envs with and without python, with and without pyspark (different versions) and run them in yarn cluster mode via call.py, with *almost* no local files needed.
- However, to run different spark versions, the launcher (e.g. airflow scheduler) needs to have pyspark locally, e.g. to be able to shell out to spark-submit.
- I don't see how it will be possible to support different versions of spark without those dependencies available and unpacked on the airflow scheduler.
- Unless...we always use skein.
Idea:
- Create an Airflow SkeinShellOperator that can optionally use packed conda envs to launch any shell command.
- To run a custom spark version, we'd launch a SkeinShellOperator to run spark-submit out of an unpacked conda env on the yarn worker.
- Something like:
SkeinShellOperator(exec='my-conda-env/bin/spark-submit', files='hdfs:///archives/cache/my-conda-env.tgz', ...)
We could extend this and make our SparkSubmitOperator always use the SkeinShellOperator to be sure that spark-submit is always run from a worker node.
This should be nicely doable with the Skein Python API.
Example of launching a custom spark version from a packed conda env using skein:
myproject_spark_yarn_cluster = skein.Master(
    resources=skein.Resources(memory='2 GiB', vcores=1),
    files={
        'conda_env': 'hdfs:///user/otto/c5.tgz',
    },
    script=(
        "export SPARK_DIST_CLASSPATH=$(hadoop classpath) && "
        "conda_env/bin/spark-submit --master yarn --deploy-mode cluster "
        "--driver-java-options='-Dhttp.proxyHost=http://webproxy.eqiad.wmnet -Dhttp.proxyPort=8080 "
        "-Dhttps.proxyHost=http://webproxy.eqiad.wmnet -Dhttps.proxyPort=8080' "
        "--conf spark.yarn.maxAppAttempts=1 "
        "--archives hdfs:///user/otto/c5.tgz#conda_env "
        "hdfs:///user/otto/call.py --prefix conda_env 'myproject.spark.main'"
    ),
    env={
        'REQUESTS_CA_BUNDLE': '/etc/ssl/certs',
        'HADOOP_CONF_DIR': '/etc/hadoop/conf',
        'SPARK_HOME': 'conda_env/lib/python3.7/site-packages/pyspark',
        'PYSPARK_PYTHON': 'conda_env/bin/python',
    },
)

app_id = client.submit(skein.ApplicationSpec(master=myproject_spark_yarn_cluster))
This creates 2 Yarn applications. The Skein ApplicationMaster just runs the spark-submit command. spark-submit then creates a new Spark Yarn application in yarn cluster mode.
In hindsight, this is what Oozie always did too. The oozie server process would create Yarn applications that handled launching the oozie actions like hive queries or spark jobs.
End of day update:
I have been successful in automating the generation of different types of conda dist envs that use spark:
- Custom python & pyspark versions
- Custom python versions with pyspark provided
- Custom pyspark versions with python provided
- Just job code and dependencies, with pyspark and python provided.
I believe (although I need to try again more thoroughly to be sure) that I have successfully been able to run spark jobs with all four types of environments in local and yarn client mode, but not all of them yet in yarn cluster mode.
Specifically, the case I'd really like to support is the last of the 4. Much of the time, I expect folks will want to just write jobs that can use anaconda-wmf and perhaps /usr/lib/spark2 to satisfy their python and pyspark dependencies. This will keep their conda dist envs small (hundreds of MB smaller), as they won't need to include python and pyspark. However, I'm trying quite hard to make this work and am getting a little lost. I'm not sure why this works in client mode but not cluster mode. I believe it has to do with the call.py wrapper and how PYTHONPATH and/or sys.path is handled by spark executors. Maybe something is funky between our debian based deployment of spark in /usr/lib/spark2 and how call.py sets up sys.path.
...
While typing ^, I got an invocation of yarn cluster mode with no spark and python in conda env to work! Here's what I did:
PYTHONPATH=/usr/lib/anaconda-wmf/lib/python3.7/:/usr/lib/anaconda-wmf/lib/python3.7/site-packages:/usr/lib/spark2/spark-2.4.4-python3.7.zip PYSPARK_PYTHON=/usr/lib/anaconda-wmf/bin/python REQUESTS_CA_BUNDLE=/etc/ssl/certs spark2-submit --master yarn --deploy-mode cluster --conf spark.yarn.maxAppAttempts=1 --archives hdfs:///user/otto/c3.tgz#conda_env ~/call.py --prefix=conda_env myproject.spark.main
This uses /usr/lib/anaconda-wmf/bin/python and /usr/lib/spark2, but runs job code out of the packed conda dist env in hdfs:///user/otto/c3.tgz.
However, I shouldn't have to hack PYTHONPATH like that. In my job code, I seem to have to propagate PYTHONPATH to spark master and executors, otherwise it can't find myproject.spark.main (in my conda env). call.py with --prefix should take care of that for the master, and it seems to work fine for the workers in client mode, just not in cluster mode. In cluster mode, for some reason I have to mangle PYTHONPATH directly and propagate it to spark via spark confs.
I keep feeling like I'm SO close to getting this all to work...but it is really taking a lot of time...perhaps too much.
Alright, figured some stuff out (thanks Marcel and Joseph for the brain bounces).
The problem is between call.py and the serialization of the spark code for the executors. call.py handles making sure python sys.path will work with the conda dist env in the driver, but the executors know nothing about the sys.path context of the driver.
We tried working around this by setting and propagating PYTHONPATH to the executors via spark.executorEnv.PYTHONPATH, but this caused some other crazy symptom where the executors weren't able to load pyspark anymore. Joseph and I think there might still be a way to make this work, but at this point I think it is too hacky and error-prone to continue trying. call.py is nice and only messes with python load paths; to propagate PYTHONPATH to executors we'd have to control the user's SparkSession somehow, perhaps by forcing everyone to use a custom spark API wrapper, which is going too far IMO :)
So, the conclusion here is that a python-less conda dist environment is just too hard. The size of my test spark job's conda dist env without python and without spark was only about 800K. Once we add python in, each conda dist env grows to at least ~300M. Adding Spark brings them closer to 450M.
We could probably still make provided dependencies like Spark work by stacking the conda dist envs on top of anaconda-wmf. However, to do this, the conda dist envs would have to be prepared in such a way that the stacking is 'baked in'. I.e. we can't control WHERE the conda dist env is stacked at runtime. This makes generated conda dist envs a little brittle and only usable in the analytics cluster. (Stacking is done by adding a .pth file into the conda env; see the sketch below.)
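E.g., a hypothetical build-time sketch of that .pth trick (paths assumed):

# At build time, drop a .pth file into the env's site-packages so the
# packed env 'stacks' on top of anaconda-wmf at a fixed, baked-in path.
with open('conda_env/lib/python3.7/site-packages/anaconda-wmf.pth', 'w') as f:
    f.write('/usr/lib/anaconda-wmf/lib/python3.7/site-packages\n')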
At this point, to simplify, I think we should support only standalone conda envs. Provided dependencies would not be supported. (I suppose this is really more like docker images anyway; the images always have all their dependencies baked in :p).
Not supporting any provided dependencies will make all of the dists totally standalone. This will be much easier from a debugging and maintenance perspective, but comes at the cost of having to deploy large artifacts. This might be a problem if folks try to make new conda dist envs for every little job out there. In practice, I think people will treat them more like we treat refinery: one big repo with jobs to generate a conda dist env, and which job is run out of the conda dist env is configured by airflow.
Okay, got some tests in and created a merge request:
https://gitlab.wikimedia.org/repos/data-engineering/workflow_utils/-/merge_requests/4
It is still a little bit WIP, but is ready to be reviewed.
@mforns check it out, and let's talk tomorrow :)
CC @EChetty too (I wasn't sure how to tag you on gitlab).
Been experimenting with how we might create Airflow + Skein + Spark submit integration:
Here's an example SkeinSparkSubmitOperator that leverages the upstream provided (currently hacked) SparkSubmitHook to get the proper spark-submit command to run.
This works with the provided /usr/lib/spark2, as well as a custom python and pyspark in a conda dist env.
If we use skein, then we don't need call.py.
Okaayyyyyy! I've got a WIP branch going that does some nice stuff with Skein and Spark:
https://gitlab.wikimedia.org/repos/data-engineering/airflow-dags/-/merge_requests/7
This adds 2 new hooks and 2 new operators:
- SparkSubmitHook: Wraps the provided apache SparkSubmitHook to avoid using connections (why would we use connections with spark? this is a little weird, it should be per task, no?) and to also support --driver-java-options and --driver-cores.
- SkeinSubmitHook: Submits a skein application to YARN, waits for it to finish, and gets the ApplicationMaster logs into Airflow logs after the job is done.
- SkeinSubmitOperator: Wraps PythonOperator to use SkeinSubmitHook to submit the Skein job to YARN.
- SkeinSparkSubmitOperator: Uses SparkSubmitOperator to get the spark-submit command to run, and then submits that as a YARN job.
hooks/skein.py also has a handy SkeinAppSpecBuilder with a from_spark_submit_hook method, which allows for auto-generating the Skein ApplicationSpec to run from an instantiated SparkSubmitHook, including nicely inferring resources, files, env vars, and the script to run.
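In spirit, the SkeinSparkSubmitOperator wiring looks roughly like this (a simplified sketch with assumed names, notably build_spark_submit_command(); see the MR for the real code):

import skein
from airflow.models import BaseOperator


class SkeinSparkSubmitOperator(BaseOperator):
    """Sketch: run a spark-submit command in a YARN app master via Skein."""

    def __init__(self, spark_submit_hook, memory='2 GiB', vcores=1, **kwargs):
        super().__init__(**kwargs)
        self.spark_submit_hook = spark_submit_hook
        self.memory = memory
        self.vcores = vcores

    def execute(self, context):
        # Ask the wrapped SparkSubmitHook for the full spark-submit command
        # line (build_spark_submit_command() is an assumed helper), then run
        # that command in a YARN container instead of on the Airflow worker.
        command = ' '.join(self.spark_submit_hook.build_spark_submit_command())
        spec = skein.ApplicationSpec(
            master=skein.Master(
                resources=skein.Resources(memory=self.memory, vcores=self.vcores),
                script=command,
            ),
        )
        app_id = skein.Client().submit(spec)
        self.log.info('Submitted Skein application %s', app_id)
        return app_id

The nice property is that the Airflow worker then never needs a local spark install: spark-submit runs inside the Skein YARN ApplicationMaster instead.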
Still needs more work but the general idea is there. Let me know what you think!
Lots of work been happening. Talked with @JAllemandou about some keytab stuff we still have to figure out:
I can make Skein + spark-submit + keytabs work if I upload the keytab to the Skein job's YARN AppMaster, and then use the --keytab and --principal spark-submit params.
I feel like I should be able to make it work without uploading the keytab though. I can use my cached kerberos ticket on stat1004 to run skein and interact with Hadoop just fine. I can hdfs dfs -ls, etc. I can also do this by providing a keytab to skein. However, I cannot interact with Hive. If I try to talk to Hive from something launched via Skein (like the hive CLI, or spark-submit with a Spark SQL job), I get the good ol' GSSException.
I guess this makes sense, skein provides the kerberos credential via the usual Hadoop APIs, and further interactions with Hadoop from the Yarn AppMaster work fine because Hadoop had my kerberos credentials when the app was started (...right?). However, Hive is separate and needs to somehow be told to use those same credentials?
I guess spark-submit locally handles this somehow, because it has access to the ticket cache in the local filesystem and can forward them when working with Hive? But...how does that work in cluster mode?
@JAllemandou, https://spark.apache.org/docs/latest/security.html#using-a-keytab indicates that the usual way to use keytabs with Spark uploads them to the Yarn app anyway. Skein and Spark use the same Yarn mechanism to do this...so maybe if Spark does it, it is ok for us to do it with skein too?
Beautiful words from Luca re uploading keytab:
ottomata: yes it should be good, everything is encrypted and authenticated
:)
Expanding this to keep archives happy :D
- the Hadoop RPC used to upload the keytab is authenticated and traffic is encrypted in transit
- the yarn cache that receives the keytab is isolated for each user/yarn-app combination (so it is not shared among users etc.. not even shared for subsequent jobs run by the same user)
- Spark uses the same mechanism to upload keytabs to the yarn cache and renew Hadoop tokens if needed (for long running jobs)