
We should provide DQ integration with Python
Closed, Resolved · Public · 5 Estimated Story Points

Description

In T352688: [Data Quality] Move MetricsExporter to refinery-spark we introduced new Scala classes in refinery-spark that integrate with Amazon Deequ to persist metrics in a generic way.

We should make both Amazon Deequ and refinery-spark classes available and interoperable with pyspark code.

For deequ this means:

  • provide the pydeequ dep in the conda-analytics env.

For refinery-spark we should:

  • document how to interface with it from Python, or provide a lightweight wrapper (the library is already GA on HDFS).

Changelog

Event Timeline

lbowmaker set the point value for this task to 5. Mar 6 2024, 3:32 PM
gmodena renamed this task from [NEEDS GROOMING] we should provide DQ integration with Python to We should provide DQ integration with Python. Mar 7 2024, 8:35 AM
gmodena updated the task description.

We can integrate our DQ framework with Python by piggybacking on pyspark's py4j gateway. Below is a rudimentary example that produces metrics in the data_quality_metrics table format:

#!/bin/env python

# Run the pyspark shell with the refinery jars.
# refinery-job is not strictly required, but since it is shaded it resolves a bunch of dep imports.
# $ pyspark --jars ./refinery-spark/target/refinery-spark-0.2.33-SNAPSHOT.jar,./refinery-core/target/refinery-core-0.2.33-SNAPSHOT.jar,./refinery-job/target/refinery-job-0.2.33-SNAPSHOT-shaded.jar
import os
from py4j.java_gateway import JavaClass, JavaObject

# We need to wrap refinery's HivePartition.
# This approach is inefficient, but dealing with Java conversions
# (dict -> scala Map -> ListMap) with varargs has been fiddly.
# In general, py4j does not like Scala immutable data structures - and Lists in particular.
# An alternative approach could be wrapping the Scala code paths in Java interfaces.

HivePartition: JavaClass = sc._gateway.jvm.org.wikimedia.analytics.refinery.core.HivePartition
Some: JavaClass = sc._gateway.jvm.scala.Some
Tuple2: JavaClass = sc._gateway.jvm.scala.Tuple2

list_map: JavaObject = sc._gateway.jvm.scala.collection.immutable.ListMap()
pmap: dict = {"source_key": "testDataset", "year": "2023", "month": "11", "day": "7", "hour": "0"}
# Build the scala.collection.immutable.ListMap entry by entry.
# getattr(..., "$plus") invokes Scala's `+` method, which is not a valid Python identifier.
for key, value in pmap.items():
    tpl = Tuple2(key, Some(value))
    list_map = getattr(list_map, "$plus")(tpl)

partition: JavaObject = HivePartition("wmf", "webrequest", None, list_map)

# Initialize pydeequ. This code is taken from the library example
# at https://pypi.org/project/pydeequ/.
# We'll set up some analyzers and pass their results to refinery's
# DeequAnalyzersToDataQualityMetrics Scala SerDe.
os.environ['SPARK_VERSION'] = '3.3'
import pydeequ

from pydeequ.analyzers import *
from pydeequ.repository import *
from pyspark.sql import Row

df = spark.sparkContext.parallelize([
            Row(a="foo", b=1, c=5),
            Row(a="bar", b=2, c=6),
            Row(a="baz", b=3, c=None)]).toDF()

repository: InMemoryMetricsRepository = InMemoryMetricsRepository(spark)
resultKey: ResultKey = ResultKey(spark, ResultKey.current_milli_time(), {'tag': 'sometag'})

AnalysisRunner(spark) \
    .onData(df) \
    .addAnalyzer(Size()) \
    .addAnalyzer(Completeness("b")) \
    .useRepository(repository) \
    .saveOrAppendResult(resultKey) \
    .run()

# DeequAnalyzersToDataQualityMetrics takes an instance of AnalyzersResult as input.
# This object is not available from the Python API, so we drop into the JVM with
# a call to repository.repository instead. This also ensures that computed metrics are not
# passed over the python <-> jvm wire (we access the jvm objects directly).
analyzers_result: JavaObject = repository.repository.load().get()

DeequAnalyzersToDataQualityMetrics: JavaClass = sc._jvm.org.wikimedia.analytics.refinery.spark.deequ.DeequAnalyzersToDataQualityMetrics
metrics: JavaObject = DeequAnalyzersToDataQualityMetrics(spark._jsparkSession, analyzers_result, partition, "123")
metrics.getAsDataFrame().show()

# Output: 
#+-------------+----------------+-------+--------+-----+------------+------------------+---------------+--------------------+-------------------+
#| dataset_date|            tags| entity|instance|value|        name|      source_table|pipeline_run_id|        partition_id|       partition_ts|
#+-------------+----------------+-------+--------+-----+------------+------------------+---------------+--------------------+-------------------+
#|1709805513911|{tag -> sometag}|Dataset|       *|  3.0|        Size|`wmf`.`webrequest`|            123|source_key=testDa...|2023-11-07 01:00:00|
#|1709805513911|{tag -> sometag}| Column|       b|  1.0|Completeness|`wmf`.`webrequest`|            123|source_key=testDa...|2023-11-07 01:00:00|
#+-------------+----------------+-------+--------+-----+------------+------------------+---------------+--------------------+-------------------+
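Once the metrics DataFrame is produced, persisting it is plain pyspark. A minimal sketch, assuming a target Hive table that already follows the data_quality_metrics schema (the table name below is a placeholder, not the real target):

# Persist the metrics with the standard DataFrame writer.
# "some_db.data_quality_metrics" is a placeholder; insertInto requires the column
# order to match the target table.
metrics.getAsDataFrame().write.mode("append").insertInto("some_db.data_quality_metrics")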

There are a few things to fiddle with, and I'd like to add a thin Python API layer on top of the refinery classes (e.g. HivePartition) to make them more user friendly. All in all I don't see any blockers to providing Python support. Performance-wise, pydeequ also just wraps the Scala library, and afaik there is no significant data movement (other than JVM bytecode).

@Ottomata @JAllemandou we need to think a bit about how to (eventually) vendor a Python wrapper. We could bundle it with refinery itself, or we could keep a separate repo that bundles the jars (like we do with eventutilities_python). As a next step I'll need to figure out how easy it is to access the jars from Python in Jupyter / conda envs. So far I've only prototyped locally.

IIUC, the need for py4j is only tied to the fact that we developed helper code (like HivePartition and DeequAnalyzersToDataQualityMetrics) that we'd like to reuse, correct?

Correct. We implemented DQ utilities in Scala in the refinery framework, and we want to reuse them from Python instead of reimplementing the logic.

The need to call py4j directly should be an implementation detail, not exposed to end users. A public API could look something like this:

from refinery import HivePartition
from refinery.dq import DeequAnalyzersToDataQualityMetrics

...
partition: HivePartition = HivePartition("wmf", "webrequest", None, list_map)
DeequAnalyzersToDataQualityMetrics(spark, analyzers_result, partition, "123") 
...

With refinery.HivePartition and refinery.dq.DeequAnalyzersToDataQualityMetrics abstracting away all the Java calls. The library itself should be pip installable (and could bundle the jars) with a pip install refinery-python. We get py4j for free by requiring pyspark.
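As a rough illustration, a wrapper along these lines could hide the py4j plumbing from the example above. This is only a sketch: the class layout, constructor signatures, and the choice to take the pydeequ repository (rather than an AnalyzersResult) are assumptions, not the actual refinery-python API.

from typing import Dict, Optional

from pyspark.sql import DataFrame, SparkSession


class HivePartition:
    """Hypothetical facade over org.wikimedia.analytics.refinery.core.HivePartition.

    Hides the dict -> scala.collection.immutable.ListMap conversion shown above.
    """

    def __init__(self, spark: SparkSession, database: str, table: str,
                 location: Optional[str], partitions: Dict[str, str]):
        jvm = spark.sparkContext._gateway.jvm
        list_map = jvm.scala.collection.immutable.ListMap()
        for key, value in partitions.items():
            # getattr(..., "$plus") calls Scala's `+` method to append an entry.
            list_map = getattr(list_map, "$plus")(jvm.scala.Tuple2(key, jvm.scala.Some(value)))
        self._jpartition = jvm.org.wikimedia.analytics.refinery.core.HivePartition(
            database, table, location, list_map)


class DeequAnalyzersToDataQualityMetrics:
    """Hypothetical facade over the refinery-spark SerDe of the same name."""

    def __init__(self, spark: SparkSession, repository, partition: HivePartition, run_id: str):
        self._spark = spark
        jvm = spark.sparkContext._gateway.jvm
        # Load the analyzer results on the JVM side (as in the example above),
        # so computed metrics never cross the python <-> jvm wire.
        analyzers_result = repository.repository.load().get()
        self._jmetrics = jvm.org.wikimedia.analytics.refinery.spark.deequ.DeequAnalyzersToDataQualityMetrics(
            spark._jsparkSession, analyzers_result, partition._jpartition, run_id)

    def get_as_dataframe(self) -> DataFrame:
        return DataFrame(self._jmetrics.getAsDataFrame(), self._spark)

With something like this, the earlier example reduces to building the partition from a plain dict and calling get_as_dataframe(); taking the pydeequ repository in the constructor (a deviation from the signature sketched above) keeps the repository.repository.load().get() detail out of user code.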

tl;dr: same thing we do with eventutilities and its python wrapper.

I have started some work on this front (I'll update on phab); let's maybe pair on it?

let's maybe pair on it?

I'd love to hack on this at the offsite!!

I cleaned up some of the code I started working on last week: https://gitlab.wikimedia.org/gmodena/refinery-python

There's an example of the API at https://gitlab.wikimedia.org/gmodena/refinery-python/-/blob/main/examples/metrics.py

I need to add a wrapper to the Alert generation SerDe, but after that it should be in a testable state.

I need to add a wrapper to the Alert generation SerDe

Done. There's an example at https://gitlab.wikimedia.org/gmodena/refinery-python/-/blob/main/examples/alerts.py?ref_type=heads

  • provide the pydeequ dep in the conda-analytics env.

I added pydeequ as a dep to the Python DQ wrapper. There's no need to add pydeequ to conda-analytics at this stage IMO.