
[Data Quality] Develop Airflow post processing instrumentation to collect and log configurable data metrics
Closed, Resolved · Public · 13 Estimated Story Points

Description

Develop a generic, configurable Airflow operator to collect post processing data metrics.
To monitor post processing results, the operator should be configurable to allow for calculations of arbitrary metrics relevant for the produced data set.
These metrics could include record counts, counts of defined, non-null attributes or counts of unique / distinct attribute values or other arbitrary metric calculations.
A general mechanism should be provided to log these generated metrics to Hive to make them available to Superset.

Event Timeline

I synced with @JAllemandou and started looking into airflow, jobs and internal tooling built around it.

Below is some input for grooming this phab.

Existing WMF operators, and dag practices, assume separation of concerns between airflow tasks and spark job execution. There's no shared state where an operator could access attributes or data (metrics). Moreover, most existing spark jobs act as an execution engine for HQL queries, and their business logic would be hard to introspect.

Given current tooling implementation, I don't think a generic metric collector operator would be immediately viable. We could provide sidecars for post-processed dataset analysis, but that's an orthogonal problem space IMHO.

An alternative approach for instrumenting spark apps with custom metrics (record counts, runtime, gauges) could be to implement custom logic in the spark driver [0]. At that point, we could publish metrics directly from the driver. A use case for this would be to collect pre/post processing aggregates (application metrics at dataframe level) to compare across runs and to spot obvious deviations.
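To make the driver-side idea concrete, here is a minimal, hypothetical sketch (plain Python, no client library) of how pre/post counts collected in the driver could be rendered in the Prometheus text exposition format; metric and label names are made up for illustration:

```python
# Hypothetical sketch: driver-side counts rendered in the Prometheus text
# exposition format, i.e. the payload a pushgateway client would ultimately
# send. Metric name and labels are illustrative, not an existing convention.

def render_metric(name: str, value: float, labels: dict[str, str]) -> str:
    """Render one sample in the Prometheus text exposition format."""
    label_str = ",".join(f'{k}="{v}"' for k, v in sorted(labels.items()))
    return f"{name}{{{label_str}}} {value}"

# e.g. pre/post aggregation record counts collected in the Spark driver
pre_count = render_metric(
    "app_records_total", 387322225, {"stage": "pre", "job": "webrequest"}
)
post_count = render_metric(
    "app_records_total", 110, {"stage": "post", "job": "webrequest"}
)
```

A real implementation would hand these samples to a prometheus client / pushgateway rather than format strings by hand; the sketch only shows the shape of the data leaving the driver.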

If I were to formulate this as a user story, I'd express it along the lines of:
as a user, I want to verify a “group by” operation by computing record counts pre/post aggregation, and report the operation runtime as well.

The Spark metrics subsystem won’t give us that at dataframe level, only raw record / byte sums across tasks and stages. Moreover, Spark system metrics could contain duplicates (in case of task retries on error). We could use this info to inform trends, but not as absolute values to evaluate datasets.

We could think of providing libraries / functions to implementers and promote best practices for application instrumentation wrt data quality guarantees (enforceable in CI). We could also investigate whether Spark's plugin system can be of use for our use cases.

This approach would only work under the assumption that Spark application code is written using the DataFrame API. In practice, Wikimedia devs tend to use Spark as an execution engine for HQL queries (I need to quantify this). That would make instrumentation and custom app metric reporting cumbersome. We might still be able to decorate the query and extract pre/post processing info at a high level, but that _might_ not add much value vs using system metrics.
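For the "decorate the query" idea, a minimal sketch (helper name and wrapping convention are hypothetical) of extracting a record count from an otherwise opaque HQL statement:

```python
def wrap_with_count(hql: str) -> str:
    """Decorate an arbitrary HQL/SQL query so it returns only a record count.

    Hypothetical helper: wraps the original statement in a subquery, so the
    business logic stays opaque but a post-processing row count is recoverable.
    """
    return f"SELECT COUNT(*) AS record_count FROM ({hql.rstrip().rstrip(';')}) q"

# e.g. decorating a grouped query without touching its business logic
wrapped = wrap_with_count(
    "SELECT user_agent, COUNT(1) FROM wmf.webrequest GROUP BY user_agent;"
)
```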

[0] Trivial custom Scala and Python prometheus clients examples.

Ahoelzl renamed this task from [Data Quality] Develop generic Airflow instrumentation to collect and log data metrics to [Data Quality] Develop generic Airflow post processing instrumentation to collect and log data metrics.Oct 30 2023, 6:24 PM
Ahoelzl updated the task description. (Show Details)
Ahoelzl renamed this task from [Data Quality] Develop generic Airflow post processing instrumentation to collect and log data metrics to [Data Quality] Develop Airflow post processing instrumentation to collect and log configurable data metrics.Nov 1 2023, 12:30 AM
Ahoelzl updated the task description. (Show Details)

After investigating the webrequest pipeline and table schema, it does not seem feasible to define a generic, meaningful set of data metrics. Producing all kinds of possible data metrics would explode the cardinality of metrics, while most of the produced metrics would likely go unused.
Hence, the focus should be on a configurable metrics collector operator. The use of Airflow's SQL operator or SQLCheckOperator should be investigated. A hypothesis to evaluate is "are SQL statements a generic way to produce arbitrary metrics?".
Ref: https://airflow.apache.org/docs/apache-airflow-providers-common-sql/stable/operators.html
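For reference, SQLCheckOperator (from the common-sql provider) runs a query expected to return a single row and fails the task if any value in that row bool-casts to False. A pure-Python sketch of just that pass/fail rule (not the operator itself):

```python
# SQLCheckOperator (apache-airflow-providers-common-sql) runs a query that
# should return a single row and fails the task if any value in that row is
# falsy. A minimal pure-Python sketch of that pass/fail rule:

def sql_check_passes(first_row: tuple) -> bool:
    """Mirror SQLCheckOperator semantics: every value must bool-cast to True."""
    return all(bool(v) for v in first_row)

# e.g. a check query like
#   SELECT COUNT(*) > 0, COUNT(DISTINCT hostname) > 100 FROM {table}
# would yield a row of booleans:
assert sql_check_passes((True, True))
assert not sql_check_passes((True, 0))  # a zero count fails the check
```

This suggests SQL statements can encode arbitrary boolean checks, but metric *values* (counts, means) still need to be stored somewhere, which the check operator alone does not do.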

After some more fiddling and exploration of existing dags, I think that the challenge for generalizing a metric generation approach is that we lack uniformity on how dags are implemented and parametrized across airflow instances. Moreover, dags are just a thin layer around (mostly) HQL logic. This means that operators would need to be context aware. However, if we follow current design choices, an operator would be required to only schedule an artifact that implements business logic.

A hypothesis to evaluate is "are SQL statements a generic way to produce arbitrary metrics?".

In principle this should be relatively straightforward to implement using wmf machinery. E.g. we could plug a "sql check" type of task into webrequests with:

[...]
        post_check = SparkSqlOperator(
            task_id="refine_webrequest_check_output",
            sql=var_props.get(
                "hql_gitlab_raw_path",
                "https://gitlab.wikimedia.org/-/snippets/100/raw/main/select_count.hql"
            ),
            query_parameters={
                "table": refined_webrequest_table,
                "year": "{{ data_interval_start.year }}",
                "month": "{{ data_interval_start.month }}",
                "day": "{{ data_interval_start.day }}",
                "hour": "{{ data_interval_start.hour }}"
            },
            launcher="skein",
        )
[...]

        refine_webrequest >> post_check >> mark_done

The post_check hql could store its output in hdfs (the one linked doesn't). Here I'm using hql just as an example; we could allow for scheduling python logic with statistical checks as well (independently from refine).

It might be possible to share state between the airflow task and the metric job via XComs IPC. This could allow retrieving task output (e.g. counts, gauges, histograms) and embedding it in an airflow dag run prometheus timeseries if desired. From the info I gathered so far, I don't think it would work out of the box with wmf_airflow_common operators and conventions.
I am not familiar enough with either airflow or the wmf codebase to estimate how hard this would be (if feasible at all).
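To illustrate the XCom idea without pulling in Airflow, here is a dict-backed stand-in for the push/pull handoff; task ids, keys, and values are hypothetical:

```python
# Sketch of the XCom-style handoff between a processing task and a metrics
# task. Airflow backs XComs with its metadata DB; here a dict stands in so
# the shape of the exchange is visible. Keys and metric names are made up.

class FakeXCom:
    """Stand-in for Airflow's XCom push/pull, keyed by (task_id, key)."""
    def __init__(self):
        self._store = {}

    def push(self, task_id: str, key: str, value):
        self._store[(task_id, key)] = value

    def pull(self, task_id: str, key: str):
        return self._store[(task_id, key)]

xcom = FakeXCom()
# the refine task pushes counts it computed...
xcom.push("refine_webrequest", "record_count", 387322225)
# ...and a downstream metrics task pulls and reports them
count = xcom.pull("refine_webrequest", "record_count")
```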

Alternatives to using operators for metrics collection

@Ottomata this is the idea I was bouncing around when we spoke earlier - not sure if it makes sense.

If we need a generic (= reproducible across dags and airflow instances) way to collect stats, I wonder whether, instead of providing custom operators and requiring changes to existing pipelines, it would be easier to implement sidecar dags that are triggered once a pipeline completes successfully.
Maybe analytics does/did something similar with the Anomaly Detection job?

On the one hand it's not great because we end up with more pipelines to manage (and instrument); on the other hand, I could imagine an API where:

  • a user defines their dag.
  • a user programmatically defines a set of dataset-specific metrics they care about, via hql or python logic (that we can template).
  • under the hood, a data monitoring dag is instantiated and scheduled at each run.

E.g. the API boundary for a client would be implementing the semantics of "metrics" programmatically (hql, conda env, jar); the airflow "platform" would just provide a harness for managing the lifecycle of said implementation. There will be some bookkeeping needed to track which dags require a follow-up data quality check (https://docs.astronomer.io/learn/airflow-datasets could help enforce boundaries).

In this scenario we can provide default (basic) metrics generation, yet allow user-provided (= dataset-specific) metrics reporting logic to be executed. Possibly we could present an API that follows the strategy pattern.
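A strategy-pattern API could look along these lines (pure-Python sketch; all class and function names are made up, not an existing wmf_airflow_common API):

```python
# Sketch of a strategy-pattern API boundary: users supply a metrics
# "strategy" (hql, python, ...); the platform only manages its lifecycle.
# All names are illustrative, not an existing wmf_airflow_common API.
from typing import Protocol


class MetricsStrategy(Protocol):
    def compute(self, dataset: str) -> dict:
        ...


class DefaultCounts:
    """Platform-provided default: basic record counts."""
    def compute(self, dataset: str) -> dict:
        return {"record_count": 0.0}  # placeholder for a real count query


class UserDefinedHql:
    """User-provided strategy wrapping a dataset-specific HQL snippet."""
    def __init__(self, hql: str):
        self.hql = hql

    def compute(self, dataset: str) -> dict:
        return {"custom_metric": 1.0}  # placeholder: run self.hql on dataset


def run_quality_dag(dataset: str, strategy: MetricsStrategy) -> dict:
    """The monitoring dag only calls the strategy, never its internals."""
    return strategy.compute(dataset)

metrics = run_quality_dag("wmf.webrequest", DefaultCounts())
```

The point of the pattern here is that the sidecar dag stays identical across datasets; only the injected strategy varies.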

Another approach to consider, if we initially just focus on summary statistics, may be to piggyback on iceberg. @xcollazo mentioned at some point there was WIP to explore table-level statistics generation (@xcollazo I don't know what I'm talking about, and I might be misrepresenting our conversation :)).

Re airflow datasets as dependencies: it sounds like a good idea to me. I think @mforns and @Milimetric did some thinking around this already, especially around how to structure dependent dags. Better to use dataset dependencies, or better to explicitly make tasks depend on each other?

I think if we can figure out how to do this generically enough, your suggestion would be nicer for users than requiring them to modify their dags. An error in metrics gathering wouldn't cause their dag run to fail.

Another approach to consider, if we initially just focus on summary statistics, may be to piggyback on iceberg. @xcollazo mentioned at some point there was WIP to explore table-level statistics generation (@xcollazo I don't know what I'm talking about, and I might be misrepresenting our conversation :)).

I was referring to the investigation done in T335306. That one is a long thread, so perhaps you could read from T335306#8924568. We left that discussion with an idea to have a table, perhaps implemented as an Iceberg table but not necessarily, in which we would keep state for other tables. That state would include the ingestions we had done so far, so that consumers like Airflow could use it as a source to sense when to start a pipeline (i.e. mimicking Hive Metastore partitions). But the idea later evolved to perhaps also including data quality metrics about each one of those ingestions. More details in the ticket.

I thought there was synergy between this ticket and the work proposed in T335306. But, generally, I just wanted you to be aware of that work for context.

Also, just found this, which may be relevant to this work: https://iceberg.apache.org/docs/latest/metrics-reporting/

Iceberg has a built in metrics facility we could hook in.

Thanks for the pointers and context @xcollazo .

I think @mforns and @Milimetric did some thinking around this already, especially around how to structure dependent dags. Better to use dataset dependencies, or better to explicitly make tasks depend on each other.

Ping @mforns @Milimetric. Would you maybe have some pointers to research you did on this?

We have multiple needs around scheduling Airflow dags & tasks.

Without a definitive solution, an alternative naive approach would be to create metrics_dags holding tasks in charge of generating metrics for the short list of datasets we want to instrument first. Actually, we could have two dags: one triggered every ~20 minutes for hourly datasets, and one every 6 hours for the other schedules. As a downside, we would have a small amount of parameter duplication.

The de-correlation would be similar to a heartbeat mechanism monitoring a process: its scheduling is independent of the process itself.
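The naive two-dag split could be parametrized along these lines (pure-Python sketch; cron expressions and cadence buckets are illustrative):

```python
# Sketch of the naive two-dag split described above: hourly datasets get a
# frequently-triggered metrics dag, everything else a slower one. Dataset
# cadences and cron schedules are illustrative.

METRICS_DAG_SCHEDULES = {
    "hourly": "*/20 * * * *",   # check for new hourly partitions every ~20 min
    "other": "0 */6 * * *",     # all slower cadences, every 6 hours
}

def metrics_schedule(dataset_cadence: str) -> str:
    """Pick the metrics dag schedule for a dataset's cadence."""
    key = "hourly" if dataset_cadence == "hourly" else "other"
    return METRICS_DAG_SCHEDULES[key]
```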

Summary of discussions we had over call / slack / doc. I did a PoC to generate dataset metrics for webrequests. As a starting point, I focused on two kinds of metrics:

  • Summary statistics, calculated on each webrequests partition.
  • Sequence number validation metrics, calculated on a post-processed (aggregated) view of a webrequests partition.

Under the hood, I have been using Amazon Deequ to (try to) provide a generic interface for quality checks. You can find EDA and sample code at: https://gitlab.wikimedia.org/gmodena/webrequests-deequ.

You can access a 24 hour sample of both metric kinds via superset at:

  • gmodena.deequ_webrequests_metrics
  • gmodena.deequ_webrequests_sequence_validation

These tables have been generated via a manually executed jupyter notebook (see webrequests-deequ for more details).

While deequ has some limitations that will require pre-processing, I like the contract it provides (and enforces). In this example, both kinds of metrics are stored in a metrics repository that maps to the two tables above.

The two kinds of metrics are computed on (effectively) distinct datasets, but the resulting tables share the same schema. The schema mixes deequ-specific conventions with dataset-specific metadata (day and hour of the webrequests partition).

While we might want to abstract deequ specifics away (so that for end users it'd be an implementation detail), this is an example of how we could borrow deequ ideas and standardize on a data model.
As a low-hanging fruit, we could take the union of both tables (in their current shape) and generate a single data quality / metrics table.
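Since the two tables share a schema, the union is trivial; a small pure-Python sketch with made-up rows mirroring a subset of the report columns:

```python
# Sketch of the "low hanging fruit": union the two deequ metric tables, which
# already share a schema, into one data quality table. Rows mirror a subset
# of the deequ report columns; values are illustrative.

SHARED_COLUMNS = ["entity", "instance", "name", "value", "metric_type"]

summary_stats = [
    {"entity": "Dataset", "instance": "*", "name": "Size",
     "value": 3.87322225e8, "metric_type": "WebrequestsSummaryStats"},
]
sequence_validation = [
    {"entity": "Column", "instance": "difference", "name": "Mean",
     "value": 0.990909090909091, "metric_type": "WebrequestsSequenceValidation"},
]

# same schema -> a plain union yields the single metrics table
dataquality_metrics = summary_stats + sequence_validation
```

In SQL terms this is just `SELECT * FROM a UNION ALL SELECT * FROM b` into one target table.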

Next steps:

  • f/up with some data modelling for the metrics_repository / tables. @Ottomata warning: chance of bikeshedding :). We can start this in a gdoc.
  • define some threshold and values for constraints.
  • schedule metrics generation as part of the webrequest pipeline.

As a low hanging fruit, we could take the union of both tables (in their current shape) and generate a single data quality / metrics table.

@Ottomata @JAllemandou @tchin deequ's implementation of the metric repository supports heterogeneous metrics and record keys. Meaning, multiple analyzers (applied to distinct datasets) can store data in the same repo (right now: json on hdfs or in memory) and generate a report with the union of all metrics. Example using dataframe conversion:

+-------+----------+-------------------+-----------------+-------------+----+--------------+----+-----------------------------+---+-----+
|entity |instance  |name               |value            |dataset_date |year|source_table  |hour|metric_type                  |day|month|
+-------+----------+-------------------+-----------------+-------------+----+--------------+----+-----------------------------+---+-----+
|Dataset|*         |Size               |3.87322225E8     |1700659586900|2023|wmf.webrequest|00  |WebrequestsSummaryStats      |07 |11   |
|Column |user_agent|ApproxCountDistinct|621964.0         |1700659586900|2023|wmf.webrequest|00  |WebrequestsSummaryStats      |07 |11   |
|Column |hostname  |ApproxCountDistinct|111.0            |1700659586900|2023|wmf.webrequest|00  |WebrequestsSummaryStats      |07 |11   |
|Dataset|*         |Size               |110.0            |1700659676826|2023|wmf.webrequest|00  |WebrequestsSequenceValidation|07 |11   |
|Column |difference|Minimum            |0.0              |1700659676826|2023|wmf.webrequest|00  |WebrequestsSequenceValidation|07 |11   |
|Column |difference|Maximum            |1.0              |1700659676826|2023|wmf.webrequest|00  |WebrequestsSequenceValidation|07 |11   |
|Column |difference|Mean               |0.990909090909091|1700659676826|2023|wmf.webrequest|00  |WebrequestsSequenceValidation|07 |11   |
+-------+----------+-------------------+-----------------+-------------+----+--------------+----+-----------------------------+---+-----+

Quick update on this spike. Right now I have the following running in a dev environment for webrequest:

  • Scala metrics job implemented in refinery-job (SNAPSHOT). This job reports summary stats and sequence number validation (via deequ) into a webrequest specific metric table (gmodena.webrequest_metrics_v1).
  • An airflow dag that generates hourly metric reports.

Work in scope

Work that needs grooming

  • Iceberg backed metrics table
  • Generalize config and parametrization of metrics reporting jobs/airflow operator. Need a f/up with @JAllemandou to see how we can integrate this with config storage.
  • Refactor the webrequest airflow dag to use the create_easy_dag factory, current parametrization best practices, and embed metric reporting.

  • Implement a serializer that gives us some flexibility wrt storing result metadata. E.g. keep result key fields (in progress).

A sample dataset is available in superset at gmodena.webrequest_metrics_v2.

Wikimedia SerDes could be an API boundary for dag implementers. Clients would be responsible for managing a deequ repository (with guidelines / tooling) and eventual data preparation steps. The Wikimedia SerDe would be responsible for writing data in the target format to a global HDFS / Iceberg table.

Change 979359 had a related patch set uploaded (by Gmodena; author: Gmodena):

[analytics/refinery/source@master] refinery-job: add WebrequestMetrics.

https://gerrit.wikimedia.org/r/979359

We are using airflow for dq job orchestration. The actual DQ tooling implementation is in
T352685: [Data Quality] Metrics Alerting and T352688: [Data Quality] Move MetricsExporter to refinery-spark.

Both phabs contain API design and implementation details. But in summary:

  • Pipeline implementers should use deequ to define their metrics and constraint checks.
  • refinery-source provides two public classes that take deequ repositories as input and:
    • persists metrics into a globally available dataquality_metrics iceberg table.
    • persists alerts (constraint check results) into a globally available dataquality_alerts iceberg table, and generates an alert report in HDFS.
  • Airflow dags can be configured to send email alerts using Wikimedia's HdfsEmailOperator (e.g. the same logic used by the anomaly detection job).

refinery-source helper classes will be the public, generic API boundary for Data Quality metric reporting. Users can piggyback on deequ and refinery-source to uniformly instrument pipelines, store metrics, and generate alerts.

Similarly, we rely on existing Wikimedia Airflow operators to orchestrate data quality operations.

Functional spec and design docs are available at WIP: Dataset Metrics. Documentation will be moved to Wikitech once CRs are merged.

Change 979359 merged by Gmodena:

[analytics/refinery/source@master] refinery-job: add WebrequestMetrics.

https://gerrit.wikimedia.org/r/979359

Ahoelzl set the point value for this task to 13.Jan 30 2024, 7:14 PM