
Process to check approximate correctness of analytics pipeline outputs
Open, High, Public

Description

As a consumer of analytics datasets I want invalid data to be quickly recognized so that I have good data to support deployed features.

We have a number of datasets, both intermediate and final, being generated in the analytics cluster. Today we don't have any particular way of verifying that something dramatic hasn't changed week-to-week. We should review best practices around validating data correctness and come up with a process that can be somewhat generically applied to our pipelines.

General idea:

  • Apply heuristics such as comparing data size against the previous week. The expectation should vary by data source: something like glent.m0prep should grow every week, while something like mjolnir.query_clicks_ltr should be within a few percent of the previous week's run.
  • Have the ability to check data size across multiple dimensions, for example counts per wiki.
  • Should be configuration driven, such that a single script can be configured to support the needs of different datasets (see the sketch after this list).
  • Figure out how to organize attaching these jobs at each step of the Airflow DAG.
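For illustration, a rough sketch of what such a configuration-driven check could look like. Everything here is hypothetical: the check_weekly_size function, the config layout, and the date/wikiid column names are placeholders, not an existing script.

from pyspark.sql import functions as F

# Hypothetical per-dataset configuration: how each dataset is expected
# to behave from one weekly run to the next.
CHECKS = {
    'glent.m0prep': {'expect': 'grows'},
    'mjolnir.query_clicks_ltr': {'expect': 'stable', 'tolerance': 0.05},
}

def check_weekly_size(spark, table, conf, current_date, previous_date):
    """Compare per-wiki row counts between two weekly partitions."""
    counts = (
        spark.read.table(table)
        .where(F.col('date').isin([current_date, previous_date]))
        .groupBy('wikiid', 'date')
        .count()
        .collect()
    )
    current = {r['wikiid']: r['count'] for r in counts if r['date'] == current_date}
    previous = {r['wikiid']: r['count'] for r in counts if r['date'] == previous_date}
    problems = []
    for wiki, prev_count in previous.items():
        cur_count = current.get(wiki, 0)
        if conf['expect'] == 'grows' and cur_count < prev_count:
            problems.append((wiki, prev_count, cur_count))
        elif conf['expect'] == 'stable' and abs(cur_count - prev_count) > conf.get('tolerance', 0.05) * prev_count:
            problems.append((wiki, prev_count, cur_count))
    return problems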

Bonus points:

For privacy reasons we don't keep old datasets around. It would be nice if, in addition to the automated process, the various metrics checked were recorded somewhere. We could split the checking process into two parts: the first part creates a document describing the dataset and inserts it into Elasticsearch (soon™ relforge will live in the analytics network). The second part could either query Elasticsearch, or pull the documents for the most recent run(s), to evaluate its heuristics. This has the side benefit that we can build Kibana dashboards on relforge that display these dataset heuristics.
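As a sketch of that two-part split (the host, index name, and document shape are placeholders, not an agreed-upon schema; it also assumes the index maps dataset as a keyword and recorded_at as a date):

from datetime import datetime, timezone
from elasticsearch import Elasticsearch

# Placeholder host; the point is only that relforge would hold the index.
es = Elasticsearch(['https://relforge.example:9243'])

def record_metrics(dataset, partition, metrics):
    """Part one: persist a small document describing this run of the dataset."""
    es.index(index='dataset_metrics', body={
        'dataset': dataset,
        'partition': partition,
        'recorded_at': datetime.now(timezone.utc).isoformat(),
        'metrics': metrics,  # e.g. {'row_count': 1234, 'rows_per_wiki': {...}}
    })

def previous_runs(dataset, n=4):
    """Part two: fetch the most recent run documents to evaluate heuristics against."""
    result = es.search(index='dataset_metrics', body={
        'query': {'term': {'dataset': dataset}},
        'sort': [{'recorded_at': {'order': 'desc'}}],
        'size': n,
    })
    return [hit['_source'] for hit in result['hits']['hits']]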

Event Timeline

This is sort of a follow-up to T260305. In that ticket data was being dropped in the middle of the job, but nothing was able to automatically recognize that the outputs were incomplete.

EBernhardson renamed this task from "Script to check approximate correctness of analytics pipeline outputs" to "Process to check approximate correctness of analytics pipeline outputs". Sep 17 2020, 5:18 PM

deequ is a Spark application described as "Unit Tests for Data". It supports both constraint checking and constraint suggestion. It has various affordances for the differences between kinds of datasets (growing, time series, etc.). One thing deequ does not support is nested attributes (structs and arrays), which we use in a few places, but there are workarounds. It is configuration driven, although the configuration is specified ahead of time in code and must be compiled into a jar. It can also emit a variety of metrics that it calculated about the data while performing the verifications.
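For a flavour of what a deequ constraint check looks like, here is a minimal sketch using PyDeequ, the Python wrapper around the same library (the comment above refers to the Scala API; the table and column names below are placeholders):

from pyspark.sql import SparkSession
import pydeequ
from pydeequ.checks import Check, CheckLevel
from pydeequ.verification import VerificationSuite, VerificationResult

spark = (SparkSession.builder
    .config('spark.jars.packages', pydeequ.deequ_maven_coord)
    .config('spark.jars.excludes', pydeequ.f2j_maven_coord)
    .getOrCreate())

df = spark.read.table('glent.m0prep')

# A check is a named bundle of constraints evaluated in a single pass.
check = (Check(spark, CheckLevel.Error, 'weekly sanity checks')
    .hasSize(lambda n: n > 0)   # dataset is not empty
    .isComplete('query'))       # placeholder column contains no nulls

result = VerificationSuite(spark).onData(df).addCheck(check).run()
VerificationResult.checkResultsAsDataFrame(spark, result).show(truncate=False)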

I see two aspects this task covers:

  • correctness of ETL algorithms (unit-testing using real data) - deequ fits under this category. AFAIK Analytics has not yet worked with any library in that space.
  • anomaly detection over data - the work done by @mforns and the research team on the data-quality pipeline and its RSVD algo for anomaly detection

@EBernhardson : How would you like us to help?

@EBernhardson (cc @gmodena since he's asked about this before)

I'm currently migrating the anomaly detection job that @JAllemandou mentions to Airflow.
It will still take a bit to be productionized, but it already works in Oozie, and the idea is the following:

The user will create a DAG from a template, more or less like this:

from refinery.airflow.anomaly_detection import AnomalyDetectionDAG
from refinery.airflow.datasets.event import navigationTiming
from datetime import datetime

dag = AnomalyDetectionDAG(
    'useragent_distribution',
    start_date=datetime(2021, 7, 10),
    source_dataset=navigationTiming,
    metrics_query='your_query.hql',
    anomaly_threshold=4,
    anomaly_email='some.email@wikimedia.org',
)

The user will then specify a query (your_query.hql above) that should return a single data point. For instance, this one would monitor the row count of your dataset:

-- This snippet should be copied as is for any query.
INSERT OVERWRITE TABLE {{ params.table }} PARTITION (
    source = '{{ params.source }}',
    year = {{ execution_date.year }},
    month = {{ execution_date.month }},
    day = {{ execution_date.day }}
)
-- End of snippet.

SELECT
    '{{ ds }}' AS dt,
    'row_count' AS metric,
    count(*) AS value
FROM event.navigationtiming
WHERE
    year = {{ execution_date.year }} AND
    month = {{ execution_date.month }} AND
    day = {{ execution_date.day }}

And that would be all. The anomaly detection DAG template automatically creates a DAG that waits for the source dataset to be present in Hive, applies the specified query, then runs an anomaly detection job on the timeseries the query generates day by day, and alerts if it finds a deviation higher than the specified threshold. The generated timeseries is stored in Hive in a dedicated table, but only the last N readings are kept, for performance reasons.
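Purely to illustrate the shape of that last decision: the production job uses the RSVD-based anomaly detection mentioned above, but a naive stand-in over the stored timeseries would look something like this.

from statistics import mean, stdev

def is_anomalous(readings, latest, anomaly_threshold=4):
    """Flag `latest` if it deviates from the recent readings by more than
    `anomaly_threshold` standard deviations."""
    if len(readings) < 2:
        return False
    mu, sigma = mean(readings), stdev(readings)
    if sigma == 0:
        return latest != mu
    return abs(latest - mu) / sigma > anomaly_threshold

# e.g. is_anomalous([101, 98, 103, 99, 100], latest=57) -> True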

The query can be as simple as a row count, or as complicated as necessary. For instance, for traffic anomalies, we're using an entropy calculation on top of the traffic distribution.
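For the curious, that entropy metric is presumably Shannon entropy (or a close variant) over the distribution. A rough Python equivalent of what such a metrics query computes, with a placeholder per-wiki mapping:

import math

def shannon_entropy(counts):
    """Shannon entropy (bits) of a {dimension: count} distribution."""
    total = sum(counts.values())
    return -sum((c / total) * math.log2(c / total) for c in counts.values() if c > 0)

# A collapse of the distribution (e.g. traffic concentrating on one wiki)
# shows up as a drop in entropy even when the total row count looks normal.
shannon_entropy({'enwiki': 9000, 'dewiki': 3000, 'frwiki': 2000})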