
Implement Airflow Dataset class for RestExternalTaskSensor
Open, Needs Triage, Public

Description

Currently, users depend on datasets in Airflow via our custom dataset abstraction in wmf_airflow_common. This abstraction allows users to declare and configure datasets in datasets config files, and then refer to them by name when instantiating sensors in their DAGs.
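As a reference point, here is a minimal sketch of that usage pattern. The import path, the dataset() helper, and the entry name are assumptions inferred from this thread (and from the get_sensor_for usage quoted below), not the library's exact API:

from datetime import datetime
from airflow import DAG
# Assumed import path; the thread only tells us the abstraction lives in wmf_airflow_common.
from wmf_airflow_common.dataset import dataset

with DAG(dag_id="example_consumer", start_date=datetime(2024, 1, 1), schedule="@hourly") as dag:
    # "hive_wmf_webrequest" is a made-up entry name declared in a datasets.yaml config file;
    # get_sensor_for() returns a sensor appropriate to the dataset's datastore.
    wait_for_webrequest = dataset("hive_wmf_webrequest").get_sensor_for(dag)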

RestExternalTaskSensor was primarily developed to make it possible to sense readiness of Iceberg data.

It would be nice if users could continue to use the same dataset config and library to depend on Iceberg tables. However, this may be a little awkward with DatasetRegistry as it is, since DatasetRegistry expects datasets to live in 'datastores', and RestExternalTaskSensor has no knowledge of the data output of the tasks it can sense on; it only knows when a referenced task is complete. On a cursory read, this limitation appears to be only one of naming (e.g. datastore_to_dataset_map), not a functional one.

It seems possible to implement a 'RestExternalTaskSensorDataset' class (even though it isn't a dataset sensor) that would provide the desired functionality: using our dataset library to sense on readiness of Iceberg tables. If we wanted, we could abstract away the 'RestExternalTaskSensor' part of this Dataset implementation and call it 'IcebergDataset', but this might be awkward given that the configuration for an Iceberg table dataset will have to target an Airflow instance and task.

We should explore our options here and implement a solution.


Also: currently, datasets.yaml config files are isolated in each Airflow instance. We do not have a global datasets.yaml file (or files). We'll need this if we want to sense on Iceberg tables created by a different Airflow instance, e.g. as done here.

To do this, we could make DatasetRegistry read every datasets.yaml file, or we could move dataset configuration up to a global-level config file in the airflow-dags repo.

Event Timeline

Ottomata updated the task description.

Maybe just change the name of the file to data_dependencies.yaml and the module to DataDependency?
And then the user would use it like data_dependency("data-dependency-name").get_sensor_for(dag)?
With this, the semantic weirdness would be solved, no?

Ya probably some renaming like you suggest would remove the awkwardness. Looking forward to the naming bikeshed session!

Or, just implement IcebergDataset, and have get_sensor_for use RestExternalTaskSensor somehow. I suppose to do this the IcebergDataset would need to be configured with the source Airflow instance and DAG?

Or, just implement IcebergDataset, and have get_sensor_for use RestExternalTaskSensor somehow. I suppose to do this the IcebergDataset would need to be configured with the source Airflow instance and DAG?

But this gets us back into the semantic weirdness. RestExternalTaskSensor does not sense on a dataset; it senses on a particular DAG (or task) being successful. There could be many DAGs that write to an Iceberg dataset (as is the case for wmf_dumps.wikitext_raw). We can choose one 'main' DAG to sense on, but I think we should be explicit about this: it is a DAG we sense, not the availability of data in a dataset.

Maybe just change the name of the file to data_dependencies.yaml and the module to DataDependency?
And then the user would use it like data_dependency("data-dependency-name").get_sensor_for(dag)?
With this, the semantic weirdness would be solved, no?

I like this idea a lot!

  1. Move to the concept of data dependencies
  2. Make it universal across instances

Then we could have (at least) two kinds of data dependencies - datasets and DAGs/tasks, and getting sensors for any of them would be more natural too.
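As a strawman, a unified data_dependencies.yaml could hold both kinds side by side. Every name below is invented, and the "kind" key is just one possible way to distinguish the two flavours:

import yaml

# Invented example of a unified data_dependencies.yaml holding both kinds of
# dependencies: a datastore-backed dataset and a DAG/task dependency on
# another Airflow instance. Entry names, instance and DAG ids are made up.
DATA_DEPENDENCIES_YAML = """
hive_wmf_webrequest:
  kind: dataset
  datastore: hive
  table_name: wmf.webrequest

iceberg_wmf_dumps_wikitext_raw:
  kind: dag
  instance: analytics
  dag: dumps_wikitext_raw
  task: publish
"""

data_dependencies = yaml.safe_load(DATA_DEPENDENCIES_YAML)
# data_dependency("iceberg_wmf_dumps_wikitext_raw").get_sensor_for(dag) would then
# return a RestExternalTaskSensor, while the hive entry keeps its current sensor.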

If we made it universal across instances, how would that affect the current deployment method? AFAIK we're using scap on a specific instance's folder, but making the config universal would move the config out of those folders.

If we made it universal across instances, how would that affect the current deployment method? AFAIK we're using scap

scap is used to deploy artifacts, so that is more relevant if we wanted to globalize the artifact.yaml config files, which I think we don't need to do? If we did, it would fall somewhere under T322690: Add support for repository artifacts in Airflow.

I think the dataset/dependencies/sensor stuff isn't affected by deployment process changes.

I've posted a draft proposal for the implementation design of ideas/needs described in this ticket: https://docs.google.com/document/d/1lapUHpWY2rm9si1iupRYHTcsDY-_QSbLcL03MXOSOlE/edit

OK, so the schema I proposed in the Google Doc looks like this:

iceberg_wmf_dummy_dataset:
  datastore: iceberg
  table_name: wmf_dummy_db.some_table_name
  produced_by:
    airflow:
      instance: search
      dag: dummy_dag
      task_group: dummy_grouped_tasks
      task: dummy_standalone_task
      kwargs:
        execution_delta: timedelta(hours=-1)

Summarized:

Key name     Required?  Description
instance     Y          Airflow instance label, e.g. “analytics”
dag          Y          DAG ID
task_group   N*         Task group ID
task         N*         Task ID
kwargs       N          A dictionary-like structure of additional sensor settings to be provided to the sensor constructor

  • * Either none or only one of the “task_group” and “task” keys may be provided.
NOTE: @mforns correctly noted that having kwargs in the datasets.yaml file itself is bad, as it locks configuration parameters to the dataset definition. A much better solution would be to modify the get_sensor_for library method to accept custom configuration parameters in the form of kwargs, or something similar.
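A sketch of what that alternative could look like on the library side; the class layout and names here are assumptions based on this thread, not the actual implementation:

class Dataset:
    """Sketch of the dataset abstraction; only the parts relevant to the
    kwargs discussion are shown, and all names are assumed."""

    def __init__(self, name, config, producer=None):
        self.name = name
        self.config = config
        self.producer = producer  # parsed from produced_by, if present

    def get_sensor_for(self, dag, **sensor_kwargs):
        # Sensor settings such as execution_delta are supplied at the call site
        # and passed straight to the sensor constructor, instead of being locked
        # into the dataset definition in datasets.yaml.
        if self.producer is not None:
            return self.producer.get_sensor_for(dag, **sensor_kwargs)
        raise NotImplementedError("datastore-based sensors not shown in this sketch")

# Usage in a DAG file:
#   dataset("iceberg_wmf_dummy_dataset").get_sensor_for(dag, execution_delta=timedelta(hours=-1))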

Unification of all the instance-specific datasets.yaml files

Currently, each instance specifies its own datasets.yaml files in its “config” directory. This approach is problematic for a couple of reasons:

  • Datasets exist independently outside of Airflow instances and their configuration needs to be the same across all instances. Having multiple places where the same dataset is configured is unnecessary, leading to code bloat and user errors.
  • A single location (file or folder) where all the datasets are configured is much more developer friendly.
  • A single location with all the dataset configurations can easily be deployed to all Airflow instances through the same mechanism that already exists and deploys the wmf_airflow_common folder for example.

Proposed solutions

Solution 1

Put all the existing dataset configurations into a single datasets.yaml file under the “wmf_airflow_common/config” folder.

Pro: The location is practical, logical, and easy to remember. All existing dataset configurations add up to a little over 300 lines of YAML, so the file would be relatively small for the foreseeable future.

Contra: The file will grow over time and become unmanageable.

Solution 2

Put all the existing dataset configurations into multiple files under a new folder, “wmf_airflow_common/config/datasets”. During bootstrap, the config library would indiscriminately read all YAML files under that folder.

Pro: The location is practical, logical, and easy to remember. Future-proof against a huge config file.

Contra: There might be some confusion over file naming and over choosing the appropriate file in which to introduce a new dataset configuration.
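For Solution 2, the loading side could be as simple as the sketch below (the folder name comes from the proposal; merging into one dict and erroring on duplicates are assumptions, not part of the proposal):

from pathlib import Path
import yaml

def load_dataset_configs(config_dir: str = "wmf_airflow_common/config/datasets") -> dict:
    """Read every YAML file under the datasets config folder and merge the
    entries into a single dict keyed by dataset name."""
    datasets: dict = {}
    for yaml_file in sorted(Path(config_dir).glob("*.yaml")):
        entries = yaml.safe_load(yaml_file.read_text()) or {}
        for name, config in entries.items():
            if name in datasets:
                raise ValueError(f"Dataset {name!r} is defined in more than one file")
            datasets[name] = config
    return datasets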

Seems like the proposed mechanism is closely related to an existing Airflow feature: https://airflow.apache.org/docs/apache-airflow/stable/authoring-and-scheduling/datasets.html#data-aware-scheduling.

Granted, our datasets.yaml and the associated Dataset.py implementation cover other use cases, but in the specific case of sensing what a particular DAG has produced, Airflow's datasets do cover that use case, by annotating DAGs with inlets and outlets. Details at the link.

I think we should consider that implementation to understand its pros and cons. Even if we are not interested in their implementation, we could reuse the concepts they've found useful.

IIRC @mforns considered Airflow datasets when originally implementing our Datasets library, and decided against it for some reasons.

@Ottomata I think we decided against it precisely because we were aiming to use the Hive metastore and HDFS files as the keepers of state, as opposed to the Airflow database.
But now, that has changed, so maybe we could reconsider?

It might be worth waiting before investing in Airflow's management of this, though.

https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-74+Introducing+Data+Assets

I think things are not settled :)

Things are not settled for sure. The version of Airflow we use already supports the notion of Dataset (Data Asset is basically a rename+upgrade of a Dataset) and I think even as of right now we could use that for "event-driven pipelines" that react to Datasets being updated.

Some things to keep in mind when it comes to this approach to building pipelines (a minimal example follows the list):

  1. A Dataset is uniquely identified by what is really just a case-sensitive string, which is recommended to take the form of a URI but does not have to.
  2. What Airflow is actually depending on behind the scenes is the run status of the DAG/task that is marked as having a Dataset as its outlet - it does not care about whether there's an actual file in the s3 bucket, or whether the contents of the file are correct.
  3. Airflow supports both fixed extra information attached to the dataset and metadata emitted upon producing the dataset. This feature is very useful IMHO, as one could emit the number of rows affected by the operator, and similar information.
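To make points 1-3 concrete, a minimal example of the upstream feature as documented for Airflow 2.x (the DAG ids and the URI are invented; the run-time metadata emission from point 3 is a newer addition and not shown):

from datetime import datetime
from airflow import DAG
from airflow.datasets import Dataset
from airflow.operators.python import PythonOperator

# Point 1: the URI is just an opaque, case-sensitive identifier to Airflow.
# Point 3 (the fixed part): `extra` attaches static information to the dataset.
wikitext_raw = Dataset("iceberg://wmf_dumps/wikitext_raw", extra={"owner": "data-engineering"})

with DAG("producer_dag", start_date=datetime(2024, 1, 1), schedule="@daily") as producer:
    # Point 2: declaring the outlet is all Airflow looks at. When this task
    # succeeds, the dataset counts as updated, regardless of whether anything
    # was actually written anywhere.
    PythonOperator(task_id="write", python_callable=lambda: None, outlets=[wikitext_raw])

with DAG("consumer_dag", start_date=datetime(2024, 1, 1), schedule=[wikitext_raw]) as consumer:
    # Runs whenever the producer's "write" task succeeds; no time-based schedule.
    PythonOperator(task_id="read", python_callable=lambda: None)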

it does not care about whether there's an actual file in the s3 bucket, or whether the contents of the file are correct.

Ah! This was the bit I was missing. I was trying to understand how a Hive dataset would work here.

So, there is no data aware sensing using Airflow Datasets, it is 100% airflow task state.

it does not care about whether there's an actual file in the s3 bucket, or whether the contents of the file are correct.

Ah! This was the bit I was missing. I was trying to understand how a Hive dataset would work here.

So, there is no data aware sensing using Airflow Datasets, it is 100% airflow task state.

Yes, I'm fairly certain that's the case. The documentation never states this clearly and explicitly, but it leaves cues like these:

In addition to scheduling DAGs based on time, you can also schedule DAGs to run based on when a task updates a dataset.

Airflow makes no assumptions about the content or location of the data represented by the URI, and treats the URI like a string.

Airflow marks a dataset as updated only if the task completes successfully. If the task fails or if it is skipped, no update occurs, and Airflow doesn’t schedule the consumer DAG.

https://airflow.apache.org/docs/apache-airflow/stable/authoring-and-scheduling/datasets.html

I've published a draft MR that, as far as I can tell, implements most of the support for the produced_by configuration of a dataset. The code correctly recognizes, parses, and models the produced_by section, and inserts a DatasetProducer object into a Dataset object. This DatasetProducer object provides its own implementation of the get_sensor_for method, which takes precedence over Dataset's normal method when a DatasetProducer is available.
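For readers who don't want to open the MR yet, this is roughly what the description implies. Class and field names come from this thread, but the sketch is not the MR's actual code:

from dataclasses import dataclass, field
from typing import Optional

@dataclass
class DatasetProducer:
    """Models the produced_by.airflow section of a dataset config entry."""
    instance: str                     # Airflow instance label, e.g. "analytics"
    dag: str                          # DAG ID on that instance
    task_group: Optional[str] = None  # at most one of task_group / task
    task: Optional[str] = None
    kwargs: dict = field(default_factory=dict)  # may go away per the note above

    @classmethod
    def from_config(cls, produced_by: dict) -> "DatasetProducer":
        producer = cls(**produced_by["airflow"])
        if producer.task_group and producer.task:
            raise ValueError("Specify at most one of task_group and task")
        return producer

    def get_sensor_for(self, dag, **sensor_kwargs):
        # Would build a RestExternalTaskSensor pointing at (instance, dag,
        # task_group or task); omitted here because the sensor's constructor is
        # WMF-specific and not described in this ticket.
        raise NotImplementedError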

One notable thing missing is the code that analyzes the difference between the schedules of the origin DAG and the target (remote) DAG and automatically adjusts the execution_delta config param of the ExternalTaskSensor. This parameter is absolutely needed for the external task sensor to operate as we would expect. The code does currently support a manual override of that parameter, though.
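One possible shape for that missing piece, purely as a sketch: assume both DAGs use cron schedules, find the remote schedule's latest tick at or before the origin's logical date, and take the difference. The real implementation would also have to deal with non-cron schedules, timezones, and data intervals; croniter is used here because Airflow already depends on it.

from datetime import datetime, timedelta
from croniter import croniter

def compute_execution_delta(origin_logical_date: datetime, remote_schedule: str) -> timedelta:
    """Return the execution_delta that makes an ExternalTaskSensor created for
    origin_logical_date point at the most recent run of a DAG scheduled with
    the cron expression remote_schedule.

    ExternalTaskSensor looks for a run whose logical date equals
    origin_logical_date - execution_delta, so the delta is the distance back
    to the remote schedule's latest tick at or before the origin's logical date.
    """
    if croniter.match(remote_schedule, origin_logical_date):
        return timedelta(0)  # both DAGs tick at the same moment
    remote_logical_date = croniter(remote_schedule, origin_logical_date).get_prev(datetime)
    return origin_logical_date - remote_logical_date

# e.g. an hourly consumer at 2024-01-01T05:00 sensing a daily (midnight) producer:
# compute_execution_delta(datetime(2024, 1, 1, 5), "0 0 * * *") == timedelta(hours=5)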

Airflow instances and their REST API URLs are hard-coded for now; most likely we will eventually want to change that into something more dynamic.

https://gitlab.wikimedia.org/repos/data-engineering/airflow-dags/-/merge_requests/861