
[SPIKE] analytics-airflow jobs development
Open, High, Public


An airflow instance named analytics-test is running on an-test-coord1001.eqiad.wmnet. We should start attempting to create airflow jobs for some tasks in this spreadsheet.

For now, dags for airflow-analytics-test can be manually placed in /srv/airflow-analytics-test-dags.

Event Timeline

Steps to create a local conda airflow development environment:

  1. Install miniconda. We're using python 3.7 on the cluster, so let's use that for our dev env too.

Download one of the py37 installers from and then install it following instructions at

# Run the downloaded installer non-interactively into $CONDA_ENV
bash ./ -b -p $CONDA_ENV
# Activate the conda env
source $CONDA_ENV/bin/activate

# Keep Airflow's configs, db and dags in a local directory
export AIRFLOW_HOME=$(pwd)/airflow-dev
# Create dags folder
mkdir -p $AIRFLOW_HOME/dags
  2. Follow the instructions at to install and run airflow. Configs, dags and the sqlite db will be installed in $AIRFLOW_HOME.

You can avoid creating a user if you set the public role to Admin before starting the airflow webserver.
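Assuming an Airflow 2.x webserver (which uses Flask-AppBuilder for auth), one way to do this is in $AIRFLOW_HOME/webserver_config.py, e.g.:

```python
# $AIRFLOW_HOME/webserver_config.py
# Give unauthenticated (public) visitors the Admin role so no login is needed.
# Fine for a throwaway local dev environment, not for anything shared.
AUTH_ROLE_PUBLIC = 'Admin'
```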


Following up from our airflow hang yesterday, here's a working plugin import (based off


from airflow.plugins_manager import AirflowPlugin
from os.path import basename
from pyarrow.fs import FileSelector, HadoopFileSystem

# This is just an hdfs ls helper.
# We can maybe decide to refactor this into our existing Hdfs class?
def hdfs_ls(base_path):
    # Connect to the default namenode (fs.defaultFS).
    hdfs = HadoopFileSystem('default')
    return [
        basename(info.path)
        for info in hdfs.get_file_info(FileSelector(base_path))
    ]

class HdfsPlugin(AirflowPlugin):

    name = "hdfs_plugin"

    macros = [hdfs_ls]

dags/ (note that the import leaves out the parent plugins directory):

from hdfs_plugin import hdfs_ls

@mforns, @razzi and I got a working test of LocalExecutor + pyarrow.fs.HadoopFileSystem in a DAG:

"""
Create 10 tasks that use HadoopFileSystem that should
run in parallel.
"""

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.dummy import DummyOperator
from datetime import datetime
from datetime import timedelta
from airflow.utils.dates import days_ago
from pyarrow.fs import HadoopFileSystem
import time
import random
import os

def hdfs_classpath(hdfs_executable='hdfs'):
    """Get the hdfs CLASSPATH."""
    stream = os.popen(f'{hdfs_executable} classpath --glob')
    return stream.read().strip()

def hdfs_test(p):
    """
    Connects to HDFS and gets info for /tmp a couple of times
    with random sleeps in between.
    """
    # pyarrow HadoopFileSystem needs us to set CLASSPATH properly.
    os.environ['CLASSPATH'] = hdfs_classpath()

    time.sleep(random.randint(1, 10))
    hdfs = HadoopFileSystem.from_uri('hdfs://analytics-test-hadoop/')
    hdfs.get_file_info('/tmp')
    time.sleep(random.randint(1, 10))

with DAG(
    # dag_id and schedule here are placeholders
    dag_id='hdfs_parallel_test',
    start_date=days_ago(1),
    schedule_interval=None,
) as dag:
    dag.doc = __doc__

    end = DummyOperator(task_id='end')

    # Declare 10 pyarrow hdfs tasks that run in parallel
    for i in range(10):
        task = PythonOperator(
            task_id="hdfs_test-%s" % i,
            python_callable=hdfs_test,
            op_args=[i],
        )
        task >> end
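The fan-out here is what LocalExecutor handles for us: it runs each task in its own worker process. A rough standalone sketch of the same pattern, using a thread pool and a stand-in task instead of the HDFS check (names are illustrative, and LocalExecutor actually forks processes, not threads):

```python
import random
import time
from concurrent.futures import ThreadPoolExecutor

def stand_in_task(i):
    # Simulate a bit of work, like the random sleeps in hdfs_test.
    time.sleep(random.uniform(0, 0.05))
    return i

# Fan out 10 tasks to parallel workers, loosely analogous to how
# LocalExecutor runs the 10 PythonOperator tasks above.
with ThreadPoolExecutor(max_workers=10) as pool:
    results = sorted(pool.map(stand_in_task, range(10)))
```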

That's good news, because in Airflow 1, our tests of this didn't work right. See also

I got ProduceCanaryEvents running nicely in Airflow using JPype and wikimedia-event-utilities; so I didn't have to write an EventStreamConfig client in Python!

The dag queries EventStreamConfig API at parse time, then generates a PythonOperator task for each discovered event stream.
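A minimal sketch of that parse-time fan-out, with a hard-coded list standing in for the EventStreamConfig API response (stream names and the task-id scheme here are hypothetical):

```python
# Hypothetical stand-in for the stream list that the real DAG
# fetches from the EventStreamConfig API at parse time.
streams = ['mediawiki.page-create', 'mediawiki.revision-create']

def make_task_id(stream):
    # Each discovered stream gets its own task, named after the stream.
    return 'produce_canary_events-' + stream.replace('.', '_')

# In the real DAG each id would become a PythonOperator task.
task_ids = [make_task_id(s) for s in streams]
```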

The PythonOperator task then uses wikimedia-event-utilities to produce canary events for the stream it is supposed to handle.

I then get the nice visual representation of the produce canary events job!

Screen Shot 2021-06-10 at 17.19.58.png (662×1 px, 167 KB)

Ottomata added a project: Analytics-Kanban.
Ottomata moved this task from Next Up to In Progress on the Analytics-Kanban board.

Change 709044 had a related patch set uploaded (by Ottomata; author: Ottomata):

[operations/puppet@production] airflow::instance - use force => true when ensuring directories

Change 709044 merged by Ottomata:

[operations/puppet@production] airflow::instance - use force => true when ensuring directories