
Replace Airflow's HDFS client (snakebite) with pyarrow
Closed, Declined (Public)

Description

Airflow's HDFS integration uses snakebite, a Python HDFS client. The original authors of snakebite never ported it to Python 3. The Internet Archive took over with minimal resources and created a snakebite-py3 version, which Airflow was able to use thanks to work from @elukey. However, encryption and Kerberos support in snakebite-py3 does not fully work. Luca was able to get it close, but the final blocker was never resolved: snakebite doesn't work with HDFS RPC encryption.

The real solution would be to replace Airflow's use of snakebite with a better HDFS client: pyarrow.
We should accomplish https://issues.apache.org/jira/browse/AIRFLOW-2697 and push the changes upstream.

From a quick read of the code, I think this shouldn't be too difficult to do.

Event Timeline

Ottomata updated the task description.

See https://phabricator.wikimedia.org/T284172#7144227 for an example of how to use pyarrow.fs.HadoopFileSystem to connect to HDFS (I did not find the pyarrow docs obvious).
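Since the pyarrow docs are not obvious, here is a minimal connection sketch along those lines. All host, user, and Kerberos ticket values are placeholders, not WMF's actual configuration; this is illustrative, not the exact code from the linked comment:

```python
# Minimal sketch: connecting to HDFS with pyarrow.fs.HadoopFileSystem.
# libhdfs (JNI) needs the Hadoop jars on the classpath first, e.g.:
#   export CLASSPATH=$(hadoop classpath --glob)
from pyarrow import fs

hdfs = fs.HadoopFileSystem(
    host="default",               # "default" reads fs.defaultFS from core-site.xml
    user="analytics",             # placeholder user
    kerb_ticket="/tmp/krb5cc_0",  # placeholder path to a Kerberos ticket cache
)

# Basic operations: list a directory, check whether a path exists.
listing = hdfs.get_file_info(fs.FileSelector("/tmp", recursive=False))
exists = hdfs.get_file_info("/tmp").type != fs.FileType.NotFound
```

Note the `kerb_ticket` argument: the constructor accepts a ticket-cache path directly, which is what snakebite-py3 struggled with.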

odimitrijevic triaged this task as Medium priority.
odimitrijevic moved this task from Incoming to Smart Tools for Better Data on the Analytics board.

A PR already exists in airflow to do this. We could reopen it and follow up:

https://github.com/apache/airflow/pull/3560/files

We've been looking into pyarrow + airflow a little bit in general, and I need a place to park some findings, so here it is!

pyarrow is not thread safe, and Airflow's LocalExecutor uses python multiprocessing Queues to execute tasks. The task itself is executed either by forking (the default) and then calling the relevant airflow function, or by calling subprocess.check_call with an airflow tasks run ... command. I had originally thought that by setting execute_tasks_new_python_interpreter=True, the subprocess.check_call method would be used and we'd avoid the deadlock we sometimes see. It seems this doesn't work either. I believe the fact that multiprocessing is used by LocalExecutor means that we cannot use pyarrow from within the scheduler process at DAG parse time.
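The fork behavior can be illustrated with stdlib-only code (no pyarrow involved; the dict below just stands in for a client object created at DAG parse time):

```python
# Stdlib-only illustration of why parse-time state leaks into forked tasks:
# with the "fork" start method (LocalExecutor's default), anything created in
# the parent process (e.g. a pyarrow client holding JNI threads and locks) is
# inherited by the child, which is where the deadlock risk comes from.
import multiprocessing as mp

PARENT_STATE = {"client": "created-at-parse-time"}  # stands in for a pyarrow client

def child(q):
    # Under fork, the child sees a copy of the parent's module-level state.
    q.put(PARENT_STATE["client"])

if __name__ == "__main__":
    ctx = mp.get_context("fork")
    q = ctx.Queue()
    p = ctx.Process(target=child, args=(q,))
    p.start()
    print(q.get())  # the child inherited the parent's "client"
    p.join()
```

With a real pyarrow client, the inherited copy includes internal thread state that is invalid in the child, so the same inheritance that makes this demo work is what causes the hang.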

We CAN use pyarrow with LocalExecutor inside of tasks. As long as the pyarrow HadoopFileSystem is instantiated inside of the task (i.e. in the forked process, or in the new python process), everything is fine. We get a deadlock if we use pyarrow in the DAG itself and also in tasks. (Strangely we don't seem to get a deadlock if we only use pyarrow in the DAG, but not in tasks...which I don't understand).

So, this means using pyarrow HadoopFileSystem for something like an HDFS Sensor (which is a special type of Operator) should be fine. But using pyarrow to dynamically generate DAGs (like we wanted to do for Refine) won't be possible, at least not with LocalExecutor or SequentialExecutor.
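As a sketch of what such a sensor could look like (hypothetical class name, not an existing Airflow operator; the connection parameters are placeholders):

```python
# Hypothetical sensor sketch: the HadoopFileSystem is created inside poke(),
# i.e. in the forked/new task process, never at DAG parse time.
from airflow.sensors.base import BaseSensorOperator

class PyArrowHdfsSensor(BaseSensorOperator):
    """Waits until a path exists in HDFS (illustrative, not a real operator)."""

    def __init__(self, hdfs_path: str, **kwargs):
        super().__init__(**kwargs)
        self.hdfs_path = hdfs_path

    def poke(self, context) -> bool:
        # Lazy import + connect inside the task process, to avoid the
        # parse-time deadlock described above.
        from pyarrow import fs
        hdfs = fs.HadoopFileSystem(host="default")  # placeholder connection
        return hdfs.get_file_info(self.hdfs_path).type != fs.FileType.NotFound
```

The key point is only where the client is constructed; everything else is ordinary sensor boilerplate.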

So TBD what to do. If we really want to use pyarrow to generate the Refine DAGs, we'll probably need to use CeleryExecutor. This should be easy, as we'd just run celery on the airflow node (not with distributed workers). However, it will make testing Refine DAGs hard, as you won't be able to run them inside an airflow development instance's scheduler.