
[SPIKE] analytics-airflow jobs development
Closed, Duplicate · Public

Description

An airflow instance named analytics-test is running on an-test-coord1001.eqiad.wmnet. We should start attempting to create airflow jobs for some tasks in this spreadsheet.

For now, dags for airflow-analytics-test can be manually placed in /srv/airflow-analytics-test-dags.
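
For example, a trivial DAG file dropped into that directory should get picked up by the scheduler (the dag_id and schedule below are just illustrative):

from datetime import timedelta

from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.utils.dates import days_ago

with DAG(
    'hello_analytics_test',  # illustrative dag_id
    schedule_interval=timedelta(days=1),
    start_date=days_ago(1),
) as dag:
    DummyOperator(task_id='noop')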

Event Timeline

Steps to create a local conda airflow development environment:

  1. Install Miniconda. We're using Python 3.7 on the cluster, so let's use that for our dev env too.

Download one of the py37 installers from https://repo.anaconda.com/miniconda/ and then install it following the instructions at https://docs.conda.io/en/latest/miniconda.html:

CONDA_ENV=/Users/otto/Projects/wm/analytics/airflow-conda-env
bash ./Miniconda3-py37_4.9.2-MacOSX-x86_64.sh -b -p $CONDA_ENV
# activate the conda env
source $CONDA_ENV/bin/activate

# Set AIRFLOW_HOME
export AIRFLOW_HOME=$(pwd)/airflow-dev
# Create dags folder
mkdir -p $AIRFLOW_HOME/dags
  2. Follow the instructions at https://airflow.apache.org/docs/apache-airflow/stable/start/local.html to install and run Airflow. Configs, dags and the SQLite db will be installed in $AIRFLOW_HOME.

You can avoid creating a user if you set the public role to Admin before starting the airflow webserver.

echo "AUTH_ROLE_PUBLIC = 'Admin'" > $AIRFLOW_HOME/webserver_config.py

Following up from our Airflow hang yesterday, here's a working plugin import (based on https://stackoverflow.com/a/66479399/1636613).

From $AIRFLOW_HOME:
plugins/hdfs_plugin/__init__.py:

from airflow.plugins_manager import AirflowPlugin
from os.path import basename
from pyarrow.fs import FileSelector, HadoopFileSystem


# This is just an hdfs ls helper.
# We can maybe decide to refactor this into our existing Hdfs class?
def hdfs_ls(base_path):
    # 'default' resolves fs.defaultFS from the Hadoop config on the host.
    hdfs = HadoopFileSystem('default')
    return [basename(info.path) for info in hdfs.get_file_info(FileSelector(base_path))]


class HdfsPlugin(AirflowPlugin):

    name = "hdfs_plugin"

    macros = [
        hdfs_ls,
    ]

dags/hdfs_test_dag.py (note that the import leaves out the parent plugins directory):

from hdfs_plugin import hdfs_ls
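
For reference, a minimal DAG that exercises the helper might look something like this (the dag_id, schedule and task_id are illustrative, not the actual contents of the test file):

from datetime import timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago

# The import leaves out the parent plugins/ directory, since Airflow adds it to sys.path.
from hdfs_plugin import hdfs_ls


def print_tmp_listing():
    # Assumes CLASSPATH is already set so pyarrow can load libhdfs
    # (see the parallel-tasks example below for how we set it).
    print(hdfs_ls('/tmp'))


with DAG(
    'hdfs_plugin_test',  # illustrative dag_id
    schedule_interval=timedelta(days=1),
    start_date=days_ago(1),
) as dag:
    PythonOperator(
        task_id='list_tmp',
        python_callable=print_tmp_listing,
    )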

@mforns @razzi and I were able to get a working test of LocalExecutor + pyarrow.fs.HadoopFileSystem in a DAG:

"""
Create 10 tasks that use HadoopFileSystem that should
run in parallel.
"""

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.dummy import DummyOperator
from datetime import datetime
from datetime import timedelta
from airflow.utils.dates import days_ago
from pyarrow.fs import HadoopFileSystem
import time
import random
import os

def hdfs_classpath(hdfs_executable='hdfs'):
    """
    Get hdfs CLASSPATH
    """
    stream = os.popen(f'{hdfs_executable} classpath --glob')
    return stream.read().strip()

def hdfs_test(path):
    """
    Connects to HDFS and gets info for the given path a couple of times
    with random sleeps in between.
    """
    # pyarrow HadoopFileSystem needs us to set CLASSPATH properly.
    os.environ['CLASSPATH'] = hdfs_classpath()

    time.sleep(random.randint(1, 10))
    hdfs = HadoopFileSystem.from_uri('hdfs://analytics-test-hadoop/')
    hdfs.get_file_info(path)
    time.sleep(random.randint(1, 10))
    hdfs.get_file_info(path)

with DAG(
    "hdfs_test01",
    schedule_interval=timedelta(hours=1),
    start_date=days_ago(1),
) as dag:
    dag.doc_md = __doc__

    end = DummyOperator(task_id='end')

    # Declare 10 pyarrow hdfs tasks that run in parallel
    for i in range(10):
        task = PythonOperator(
            task_id="hdfs_test-%s" % i,
            python_callable=hdfs_test,
            op_args=['/tmp'],
        )
        task >> end

That's good news, because in Airflow one of our earlier tests of this didn't work right. See also https://gerrit.wikimedia.org/r/c/analytics/refinery/+/597623

I got ProduceCanaryEvents running nicely in Airflow using JPype and wikimedia-event-utilities; so I didn't have to write an EventStreamConfig client in Python!

https://github.com/ottomata/analytics-airflow-spike

The DAG queries the EventStreamConfig API at parse time, then generates a PythonOperator task for each discovered event stream.

The PythonOperator task then uses wikimedia-event-utilities to produce canary events for the stream it is supposed to handle.

I then get the nice visual representation of the produce canary events job!

Screen Shot 2021-06-10 at 17.19.58.png (screenshot attachment, 167 KB)
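
For anyone skimming, the pattern boils down to roughly the sketch below. The config URL, response shape and produce_canary_events body are placeholders; the real DAG in the spike repo uses JPype and wikimedia-event-utilities instead.

from datetime import timedelta

import requests

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago

# Placeholder endpoint; the real DAG queries the EventStreamConfig API.
STREAM_CONFIG_URL = 'https://example.org/w/api.php?action=streamconfigs&format=json'


def discover_stream_names():
    """Runs at DAG parse time to list the streams to produce canary events for."""
    response = requests.get(STREAM_CONFIG_URL, timeout=10)
    response.raise_for_status()
    # Assumes the response contains a mapping keyed by stream name.
    return sorted(response.json().get('streams', {}).keys())


def produce_canary_events(stream_name):
    # Placeholder for the JPype + wikimedia-event-utilities call.
    print(f'Producing canary events for {stream_name}')


with DAG(
    'produce_canary_events',  # illustrative dag_id
    schedule_interval=timedelta(hours=1),
    start_date=days_ago(1),
) as dag:
    for stream in discover_stream_names():
        PythonOperator(
            task_id='produce_canary_events_' + stream.replace('/', '__').replace('.', '_'),
            python_callable=produce_canary_events,
            op_args=[stream],
        )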

Ottomata added a project: Analytics-Kanban.
Ottomata moved this task from Next Up to In Progress on the Analytics-Kanban board.

Change 709044 had a related patch set uploaded (by Ottomata; author: Ottomata):

[operations/puppet@production] airflow::instance - use force => true when ensuring directories

https://gerrit.wikimedia.org/r/709044

Change 709044 merged by Ottomata:

[operations/puppet@production] airflow::instance - use force => true when ensuring directories

https://gerrit.wikimedia.org/r/709044