Deequ provides a VerificationSuite for validating assumptions about a DataFrame. We can apply verification checks either to a mediawiki_content_history_v1 DataFrame directly or to its computed data quality metrics.
We can do this on a per-wiki basis or across the entire dataset. For example:
```python
from pydeequ.checks import Check, CheckLevel
from pydeequ.verification import VerificationSuite

# Select the Completeness metrics computed for mediawiki_content_history
# at the given partition timestamp.
df = (
    spark.table("wmf_data_ops.data_quality_metrics")
    .where(f"partition_dt = CAST('{args.partition_dt}' AS TIMESTAMP)")
    .where("name = 'Completeness'")
    .where("tags['project'] = 'mediawiki_content_history'")
)

# Fail the check unless the Completeness metric equals 1 for every row.
check = (
    Check(spark, CheckLevel.Error, "mediawiki_content_history completeness")
    .hasMin("value", lambda x: x == 1, "mediawiki_content_history is not complete")
)

checkResult = (
    VerificationSuite(spark)
    .onData(df)
    .addCheck(check)
    .run()
)
```
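The verification result can be turned back into a DataFrame to inspect which constraints failed. A minimal sketch using PyDeequ's VerificationResult helper (the `failed_checks` name is ours):

```python
from pydeequ.verification import VerificationResult

# One row per constraint, including a constraint_status column
# with values like 'Success' or 'Failure'.
check_result_df = VerificationResult.checkResultsAsDataFrame(spark, checkResult)

# Keep only the failed constraints; these become our alerts.
failed_checks = check_result_df.where("constraint_status != 'Success'")
```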
Once the checks are in place, we need to write a file with these alerts to HDFS and then pick it up in Airflow with the HdfsEmailOperator.
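A sketch of the HDFS write, assuming the `failed_checks` DataFrame from above; the output path is hypothetical and should follow the pipeline's partitioning conventions. The HdfsEmailOperator would then be pointed at this path:

```python
# Hypothetical alerts location; align with the pipeline's layout.
alerts_path = f"/wmf/data_quality/alerts/partition_dt={args.partition_dt}"

# Only produce the file when there is something to alert on, so the
# downstream HdfsEmailOperator can treat "file exists" as "send an email".
if failed_checks.take(1):
    failed_checks.coalesce(1).write.mode("overwrite").json(alerts_path)
```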