
Test the dbt+skein approach to running dbt Spark jobs in K8s
Closed, Resolved, Public

Description

Perform a proof-of-concept exercise of running a test dbt job in the test-k8s Airflow cluster using a customized SimpleSkeinOperator. We can supply the Skein operator with the conda-analytics Python environment, which comes equipped with a dbt installation.

Success is:

  • A simple SELECT * FROM x dbt model in the dbt-jobs repository
  • dbt-jobs repository files manually uploaded to a location on HDFS
  • A test Airflow DAG consisting of a single SimpleSkeinOperator performing the following:
    1. Download dbt-jobs repository from HDFS to local disk
    2. Activate the conda-analytics environment
    3. Run the new dbt model
  • Observe the Airflow logs of this test DAG and ensure that the operator performed correctly

Event Timeline

The dbt+skein test as outlined in this ticket has been performed successfully: https://airflow-test-k8s.wikimedia.org/dags/test_dbt_skein_dag/

Some observations:

  • At first I thought it would be good to include the dbt update and dbt deps in the SimpleSkeinOperator scripted workflow, to keep things up to date, especially the dbt deps command, which updates the extra packages used by the dbt-jobs project. However, this is not possible due to networking limitations from within Kubernetes pods.
  • Because of the previous point, dbt in conda-analytics needs to be fully up to date on any dbt packages we want to support. Unfortunately, packages are defined in packages.yml, which lives in the dbt-jobs repository, so there’s a bit of a discrepancy: we will be building dbt in conda-analytics using a file from dbt-jobs.
  • As far as the Airflow operator workflow goes, it uses the SimpleSkeinOperator to:
    • Pull the dbt-jobs repository from HDFS down to the execution k8s pod filesystem, into a temporary folder.
    • Run the specified dbt model using dbt run -s model_name.
    • Remove the temporary dbt-jobs folder.
      • NOTE: This requires the latest dbt-jobs state to be up to date in the HDFS cache; we can use Blunderbuss to ensure cache freshness.
  • SimpleSkeinOperator supports overriding the default YARN resource configuration e.g. executor memory, number of vcores etc. However dbt itself also can be configured to request specific spark cores, memory, partitions etc. via profiles.yml file in the dbt-jobs repository. The interplay of these two configs needs to be investigated further, as it’s not clear whether what we define in profiles.yml would be overridden by the SimpleSkeinOperator configuration or vice versa. Also, dbt does not allow individual profile settings to be set via the CLI; we can only pick one of the profiles already defined in profiles.yml.
  • dbt + skein models on K8s Airflow have to be functionally tested on an actual Airflow deployment, as the workflow stalls (for now) when using the “dev environment” script on the deployment server. The culprit is most likely the fact that the dev env (correctly) carries and uses the Kerberos ticket of the dev env owner/user, but that ticket is not available on YARN so dbt can’t run a Spark SQL job properly.
  • Due to the HDFS cache of dbt-jobs requirement and the issue with the dev environment, a dbt developer workflow would likely require three steps: a full development/debugging cycle of the dbt code done from the developer’s own dbt installation on a stat box (dbt comes installed automatically on all stat boxes); merging into the dbt-jobs main branch, which triggers the HDFS cache update; and then developing an Airflow DAG that runs the future DbtSkeinOperator, based on SimpleSkeinOperator, with the desired configuration.
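
For illustration, the pull / run / remove sequence described above could be expressed as a small script builder along the following lines. This is only a sketch under assumptions, not the code used in the test DAG: the function name, the HDFS path, and the conda environment location are hypothetical, and how the resulting script is handed to SimpleSkeinOperator is deliberately left out because that signature is not shown in this ticket. It also assumes the conda-analytics environment can be activated with source <env>/bin/activate.

```python
import shlex


def build_dbt_skein_script(hdfs_repo_path, conda_env_dir, model_name):
    """Return a bash script that fetches dbt-jobs from HDFS, runs one model, and cleans up.

    All three arguments are illustrative; none of the values are taken from the ticket.
    """
    repo = shlex.quote(hdfs_repo_path)
    activate = shlex.quote(f"{conda_env_dir}/bin/activate")
    model = shlex.quote(model_name)
    lines = [
        # Stop at the first failing command.
        "set -eo pipefail",
        # Temporary working folder on the local filesystem.
        'WORKDIR="$(mktemp -d)"',
        # Remove the temporary folder even if a later step fails.
        "trap 'rm -rf \"$WORKDIR\"' EXIT",
        # Step 1: download the dbt-jobs repository from HDFS.
        f'hdfs dfs -get {repo} "$WORKDIR/dbt-jobs"',
        # Step 2: activate the conda environment that ships dbt (assumed layout).
        f"source {activate}",
        # Step 3: run the requested model from the project root.
        'cd "$WORKDIR/dbt-jobs"',
        f"dbt run -s {model}",
    ]
    return "\n".join(lines) + "\n"


if __name__ == "__main__":
    # Hypothetical paths, for demonstration only.
    print(build_dbt_skein_script("/tmp/dbt-jobs", "/opt/conda-analytics", "my_model"))
```

Using a trap for the cleanup means the temporary folder is removed even when dbt run fails, which keeps failed runs from leaving stale copies of dbt-jobs behind.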

Just a couple of comments:

  • About dbt deps: I believe Maya is not using (yet) anything from the extra packages, so if this is a blocker, we could consider removing it from dbt-jobs for now, and create a ticket to find the way of connecting all of this.
  • About profiles.yml, I think we could consider the profiles.yml a default profile for local development, and maybe overwrite it from Airflow before running dbt. That was the plan if dbt was to run from K8s, mainly because the profiles defined in the repo were for Spark Session and we needed a different one for K8s and the Thrift server. Do you think it makes sense to keep the same approach and change it from Airflow? Or is this more complex using Skein?

SimpleSkeinOperator supports overriding the default YARN resource configuration e.g. executor memory, number of vcores etc. However dbt itself also can be configured to request specific spark cores, memory, partitions etc. via profiles.yml file in the dbt-jobs repository. The interplay of these two configs needs to be investigated further

I know close to zero about dbt, but if dbt is launching a spark job, then these are probably two separate settings. SimpleSkeinOperator will only request a single YARN container (the application master) in which it will run a command. For our Spark Operators, launcher=skein will create a single node YARN application in which it will run the spark-submit command.

I'd expect for dbt via SimpleSkeinOperator this is similar: Start a single YARN container, make it run the appropriate dbt commands (which I suppose themselves will end up running spark-submit commands?)

Note that in the Spark launcher=skein case, if Spark is using --master=yarn (it probably is), this results in 2 different YARN applications: the skein launcher that runs spark-submit, plus the Spark YARN application. If dbt is submitting Spark jobs, this will probably be true too!

  • About profiles.yml, I think we could consider the profiles.yml a default profile for local development, and maybe overwrite it from Airflow before running dbt.

This is still a viable option, and one that we should probably implement. I don't think it's much more complex with Skein at play; it boils down to the DbtSkeinOperator writing the profiles.yml contents into a temporary file and referencing the path when invoking dbt.
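
A rough sketch of that idea, under assumptions: the helper names, the profile name, the target name, and the schema below are all hypothetical, and the profile layout follows the session method of the dbt-spark adapter as I understand it (type: spark, method: session, with Spark settings under server_side_parameters). Whether the file gets written on the Airflow side or inside the YARN container is left open here; the only point is that dbt is pointed at the generated file via --profiles-dir.

```python
import json
import tempfile
from pathlib import Path


def render_profiles_yml(profile_name, schema, spark_conf):
    """Render a single-target dbt-spark profile that uses the session method.

    Keys and values are double-quoted via json.dumps, which yields valid
    YAML double-quoted scalars for plain strings.
    """
    lines = [
        f"{profile_name}:",
        "  target: airflow",  # hypothetical target name
        "  outputs:",
        "    airflow:",
        "      type: spark",
        "      method: session",
        f"      schema: {json.dumps(schema)}",
        "      host: NA",  # placeholder value; not used by the session method
    ]
    if spark_conf:
        lines.append("      server_side_parameters:")
        for key, value in sorted(spark_conf.items()):
            lines.append(f"        {json.dumps(key)}: {json.dumps(str(value))}")
    return "\n".join(lines) + "\n"


def write_profiles_and_build_command(profiles_yml, model_name, profiles_dir):
    """Write profiles.yml into profiles_dir and return the dbt command that uses it."""
    path = Path(profiles_dir) / "profiles.yml"
    path.write_text(profiles_yml)
    return ["dbt", "run", "-s", model_name, "--profiles-dir", str(profiles_dir)]


if __name__ == "__main__":
    # Illustrative values only.
    yml = render_profiles_yml("dbt_jobs", "test_db", {"spark.executor.memory": "4g"})
    with tempfile.TemporaryDirectory() as tmp:
        print(yml)
        print(write_profiles_and_build_command(yml, "my_model", tmp))
```

The profile name in the generated file would have to match the profile referenced by the dbt-jobs project, otherwise dbt would not pick it up.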

I know close to zero about dbt, but if dbt is launching a spark job, then these are probably two separate settings. SimpleSkeinOperator will only request a single YARN container (the application master) in which it will run a command. For our Spark Operators, launcher=skein will create a single node YARN application in which it will run the spark-submit command.

The way we run Spark via dbt is through the dbt-spark adapter, which supports 4 different ways of interacting with Spark: ODBC, Thrift, HTTP and session. We are using the session method which effectively spins up a pyspark session to run the SQL commands. I guess in this way it's similar to our Jupyter notebooks.

You are right, the resources config of the SimpleSkeinOperator refers to the YARN application master running the scripts we provide. The Spark configuration in profiles.yml gets communicated to the pyspark session/context for the actual Spark SQL application.