
Optimize metrics computation for the MW Content Pipeline
Open, MediumPublic

Description

The compute_metrics task of the mw_content_reconcile_mw_content_history_daily DAG takes, on average, 5.4 hours to complete:

Screenshot 2025-08-01 at 11.40.14 AM.png (448×896 px, 32 KB)

Additionally, we have had multiple incidents in which the Spark driver OOMs (see T400830, T387033).

In this task, we should:

  • Investigate why the metrics take so long, and what optimizations could make them run faster.
  • Investigate why the driver keeps needing more memory, and what optimizations could make it use less memory. This is a priority.
  • Presumably any improvements we find for the daily run can be applied to the monthly run as well.

Event Timeline

xcollazo updated https://gitlab.wikimedia.org/repos/data-engineering/airflow-dags/-/merge_requests/1622

hotfix: analytics: Bump driver memory for MW Content related metrics. Again.

xcollazo merged https://gitlab.wikimedia.org/repos/data-engineering/airflow-dags/-/merge_requests/1622

hotfix: analytics: Bump driver memory for MW Content related metrics. Again.

xcollazo triaged this task as Medium priority.Aug 22 2025, 5:24 PM
xcollazo updated the task description.

Even with driver_memory = 32G, the metrics continue to OOM, with the most recent failure being today's run (Airflow link).

Bumping priority of this.

I looked at implementing vanilla SQL equivalents of the metrics we compute, and found the following:

For equivalent to:

#        .addAnalyzer(Size())
#        .addAnalyzer(Completeness("page_id"))
#        .addAnalyzer(Completeness("revision_id"))
#        .addAnalyzer(Completeness("revision_content_slots_main_content_body"))
#        .addAnalyzer(Completeness("revision_content_slots_main_content_format"))

We could do:

spark.sql("""

WITH completeness_metrics AS (
  SELECT 
         page_id                                       IS NOT NULL AS page_id_not_null,
         revision_id                                   IS NOT NULL AS revision_id_not_null,
         revision_content_slots['main'].content_body   IS NOT NULL AS content_body_not_null,
         revision_content_slots['main'].content_format IS NOT NULL AS content_format_not_null,
         wiki_id
  FROM wmf_content.mediawiki_content_history_v1
  WHERE revision_dt >= NOW() - INTERVAL 48 HOURS
)

SELECT
       count(1) as count,
       sum(CASE WHEN page_id_not_null THEN 1 ELSE 0 END) AS page_ids,
       sum(CASE WHEN revision_id_not_null THEN 1 ELSE 0 END) AS revision_ids,
       sum(CASE WHEN content_body_not_null THEN 1 ELSE 0 END) AS content_bodies,
       sum(CASE WHEN content_format_not_null THEN 1 ELSE 0 END) AS content_formats,
       wiki_id
FROM completeness_metrics
GROUP BY wiki_id
""").show(300, truncate=False)

+------+--------+------------+--------------+---------------+-----------------+
|count |page_ids|revision_ids|content_bodies|content_formats|wiki_id          |
+------+--------+------------+--------------+---------------+-----------------+
|16831 |16831   |16831       |16713         |16825          |ruwiki           |
|3210  |3210    |3210        |3202          |3210           |viwiki           |
|1116  |1116    |1116        |1115          |1116           |elwiktionary     |
|5443  |5443    |5443        |5423          |5443           |idwiki           |
|572784|572784  |572784      |572782        |572784         |wikidatawiki     |
|550   |550     |550         |544           |550            |bewiki           |
|199977|199977  |199977      |198441        |199974         |enwiki           |
|16375 |16375   |16375       |16199         |16375          |itwiki           |
|394700|394700  |394700      |393453        |394700         |commonswiki      |
...

This takes 4.2 minutes with 64 executors.

For equivalent to:

#        .addAnalyzer(Uniqueness(["page_id", "revision_id"]))

We could do:

spark.sql("""
WITH counts AS (
  SELECT 
         count(1) as total_count,
         wiki_id
  FROM wmf_content.mediawiki_content_history_v1
  WHERE revision_dt >= NOW() - INTERVAL 48 HOURS
  GROUP BY wiki_id
),

unique_counts AS (
  SELECT count(1) as uniques_count,
         wiki_id
  FROM (
    SELECT
           page_id,
           revision_id,
           wiki_id
    FROM wmf_content.mediawiki_content_history_v1
    WHERE revision_dt >= NOW() - INTERVAL 48 HOURS
    GROUP BY wiki_id, page_id, revision_id
  )
  GROUP BY wiki_id
)

SELECT total_count,
       uniques_count,
       total_count == uniques_count AS all_uniques,
       c.wiki_id
FROM counts c
JOIN unique_counts uc ON (c.wiki_id = uc.wiki_id)
ORDER BY total_count DESC
""").show(300, truncate=False)

+-----------+-------------+-----------+-----------------+
|total_count|uniques_count|all_uniques|wiki_id          |
+-----------+-------------+-----------+-----------------+
|411994     |411994       |true       |wikidatawiki     |
|334643     |334643       |true       |commonswiki      |
|194714     |194714       |true       |enwiki           |
|56767      |56767        |true       |mgwiktionary     |
|32235      |32235        |true       |frwiki           |
|25528      |25528        |true       |dewiki           |
|20438      |20438        |true       |ukwiki           |
...

This takes 45 seconds with 64 executors.

Finally, for an equivalent to the inconsistent rows ratio:

"inconsistent_rows_ratio",
# If content_size is 0 and inconsistent_size isn't then we have a problem
inconsistent_size / content_size if content_size else inconsistent_size

We could do:

spark.sql("""
WITH content_counts AS (
  SELECT 
         count(1) as total_count,
         wiki_id
  FROM wmf_content.mediawiki_content_history_v1
  WHERE revision_dt >= NOW() - INTERVAL 48 HOURS
  GROUP BY wiki_id
),

inconsistencies_counts AS (
  SELECT 
         count(1) as inconsistencies_count,
         wiki_id
  FROM wmf_content.inconsistent_rows_of_mediawiki_content_history_v1
  WHERE computation_class = 'last-24h'
    AND computation_dt = '2025-08-27'
  GROUP BY wiki_id
)

SELECT total_count,
       COALESCE(inconsistencies_count, 0) as inconsistencies_count,
       COALESCE(inconsistencies_count, 0) / total_count AS inconsistencies_ratio,
       cc.wiki_id
FROM content_counts cc
LEFT JOIN inconsistencies_counts ic ON (cc.wiki_id = ic.wiki_id)
ORDER BY inconsistencies_ratio DESC
""").show(300, truncate=False)

+-----------+---------------------+---------------------+-----------------+
|total_count|inconsistencies_count|inconsistencies_ratio|wiki_id          |
+-----------+---------------------+---------------------+-----------------+
|3370       |2124                 |0.6302670623145401   |mediawikiwiki    |
|2          |1                    |0.5                  |gotwiki          |
|5          |1                    |0.2                  |tumwiki          |
|68         |11                   |0.16176470588235295  |knwiki           |
|160        |24                   |0.15                 |newiki           |
|364        |35                   |0.09615384615384616  |zh_yuewiki       |
|584        |22                   |0.03767123287671233  |dewikisource     |
|27         |1                    |0.037037037037037035 |ruwikibooks      |
|34         |1                    |0.029411764705882353 |lmowiki          |
|35         |1                    |0.02857142857142857  |gnwiki           |
|84         |2                    |0.023809523809523808 |zh_classicalwiki |
|44         |1                    |0.022727272727272728 |frwikiversity    |
|1226       |26                   |0.021207177814029365 |mswiki           |
...

This takes 14 seconds with 64 executors.

We would then have to manipulate the output to fit the metrics table, but as we can see, the total computation time here could easily be less than 10 minutes. Compare that to 5+ hours with the pydeequ framework.
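As a rough sketch of that output manipulation (the metric names and target schema here are hypothetical, not the actual metrics table schema), each wide per-wiki row from the GROUP BY could be unpivoted into long-format (wiki_id, metric, value) records:

```python
# Sketch: unpivot one wide aggregation row per wiki into long-format
# metric records. Metric names and the target schema are hypothetical.

def to_metric_records(rows, key="wiki_id"):
    """Turn [{'wiki_id': ..., 'count': ..., ...}, ...] into
    (wiki_id, metric, value) tuples, one per metric column."""
    records = []
    for row in rows:
        for metric, value in row.items():
            if metric != key:
                records.append((row[key], metric, value))
    return records

wide = [
    {"wiki_id": "ruwiki", "count": 16831, "page_ids": 16831},
    {"wiki_id": "viwiki", "count": 3210, "page_ids": 3210},
]
print(to_metric_records(wide))
```

The same unpivot could of course be done in Spark itself (e.g. with `stack()`), but the reshaping is mechanical either way.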

oo very nice!! I wonder how it'd compare to a pure java version of the deequ code. Maybe if we switch to SQL we can take the opportunity to revamp the metrics table schema?

> I wonder how it'd compare to a pure java version of the deequ code.

I suspect most of the improvement comes from the fact that we compute all metrics in one shot by leveraging GROUP BY, while deequ iterates over the wikis and launches multiple Spark jobs, while at the same time doing 1500+ Iceberg commits, for some reason (see T384744). Here we do not explore the penalty of Iceberg commits, but we would do at most 3 of them (one for each SQL query).
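A toy sketch of that difference in shape (plain Python standing in for Spark, not the actual deequ internals): the per-wiki approach scans the data once per wiki, while a single GROUP BY-style pass computes every wiki's metric at once.

```python
from collections import defaultdict

rows = [
    {"wiki_id": "enwiki", "page_id": 1},
    {"wiki_id": "enwiki", "page_id": None},
    {"wiki_id": "dewiki", "page_id": 2},
]

# Per-wiki loop (deequ-style): one full scan of the data per wiki.
wikis = sorted({r["wiki_id"] for r in rows})
per_wiki = {
    w: sum(1 for r in rows if r["wiki_id"] == w and r["page_id"] is not None)
    for w in wikis
}

# One-shot GROUP BY: every wiki's metric computed in a single scan.
one_shot = defaultdict(int)
for r in rows:
    if r["page_id"] is not None:
        one_shot[r["wiki_id"]] += 1

assert per_wiki == dict(one_shot)  # same result, len(wikis)x fewer scans
```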

> Maybe if we switch to SQL we can take the opportunity to revamp the metrics table schema?

I think we should do that work, but I'm hesitant to go down that rabbit hole right now. Revamping DQ completely should be its own hypothesis.

xcollazo updated https://gitlab.wikimedia.org/repos/data-engineering/airflow-dags/-/merge_requests/1642

analytics: Disable MW Content Alerts until we improve the alerting mechanism.

xcollazo merged https://gitlab.wikimedia.org/repos/data-engineering/airflow-dags/-/merge_requests/1642

analytics: Disable MW Content Alerts until we improve the alerting mechanism.

Elsewhere we are seeing pressure from this job causing ~1700 commits per day on the wmf_data_ops.data_quality_alerts table:

spark.sql("""
SELECT count(1) as count,
       DATE_TRUNC('day', committed_at) AS day
FROM wmf_data_ops.data_quality_alerts.snapshots
GROUP BY day
ORDER BY DAY DESC
""").show(truncate=False)

+-----+-------------------+
|count|day                |
+-----+-------------------+
|22   |2025-09-17 00:00:00|
|1787 |2025-09-16 00:00:00|
|1788 |2025-09-15 00:00:00|
|1786 |2025-09-14 00:00:00|
|1787 |2025-09-13 00:00:00|
|1788 |2025-09-12 00:00:00|
|1786 |2025-09-11 00:00:00|
|1787 |2025-09-10 00:00:00|
|1787 |2025-09-09 00:00:00|
|1787 |2025-09-08 00:00:00|
|5311 |2025-09-07 00:00:00|
|4484 |2025-09-06 00:00:00|
|855  |2025-09-05 00:00:00|
|24   |2025-09-04 00:00:00|
|2757 |2025-09-03 00:00:00|
|815  |2025-09-02 00:00:00|
|26   |2025-09-01 00:00:00|
|1787 |2025-08-31 00:00:00|
|1787 |2025-08-30 00:00:00|
|1787 |2025-08-29 00:00:00|
+-----+-------------------+
only showing top 20 rows
SELECT count(1) FROM wmf_data_ops.data_quality_alerts.snapshots;

101 318 snapshots

# Count files under the alerts metadata dir via the Hadoop FileSystem API
jvm = sc._jvm
path_str = "/wmf/data/data_quality/alerts/metadata"
fs = jvm.org.apache.hadoop.fs.FileSystem.get(sc._jsc.hadoopConfiguration())
jpath = jvm.org.apache.hadoop.fs.Path(path_str)
cs = fs.getContentSummary(jpath)
cs.getFileCount()

237096 files in dir

import heapq

jvm = sc._jvm
fs = jvm.org.apache.hadoop.fs.FileSystem.get(sc._jsc.hadoopConfiguration())

def top_n_files(path_str, n=10):
    jpath = jvm.org.apache.hadoop.fs.Path(path_str)
    it = fs.listLocatedStatus(jpath)   # streaming iterator
    heap = []  # min-heap of (size, path)
    m = 0
    while it.hasNext():
        m += 1
        if m % 1000 == 0:
            print(".", end="", flush=True)
        status = it.next()
        if status.isFile():
            size = int(status.getLen())
            path = status.getPath().toString()
            # push into heap, pop smallest if more than n
            if len(heap) < n:
                heapq.heappush(heap, (size, path))
            else:
                heapq.heappushpop(heap, (size, path))
    return sorted(heap, key=lambda x: -x[0])

for size, path in top_n_files("/wmf/data/data_quality/alerts/metadata", n=10):
    print(f"{size/1024/1024:.1f} MB  {path}")
# 281.0 MB  hdfs://analytics-hadoop/wmf/data/data_quality/alerts/metadata/274278-b5bc5b4a-49c2-456f-9ab8-8e121859435f.metadata.json
# 281.0 MB  hdfs://analytics-hadoop/wmf/data/data_quality/alerts/metadata/274277-a7163e41-d255-4714-b0b0-76ae39d9d8d7.metadata.json
hdfs dfs -get hdfs://analytics-hadoop/wmf/data/data_quality/alerts/metadata/274278-b5bc5b4a-49c2-456f-9ab8-8e121859435f.metadata.json
hdfs dfs -get hdfs://analytics-hadoop/wmf/data/data_quality/alerts/metadata/274277-a7163e41-d255-4714-b0b0-76ae39d9d8d7.metadata.json
jq '.snapshots | length' 274278-b5bc5b4a-49c2-456f-9ab8-8e121859435f.metadata.json
# 274 276
jq '.snapshots | length' 274277-a7163e41-d255-4714-b0b0-76ae39d9d8d7.metadata.json
# 274 275

Each Iceberg snapshot generates a metadata file that includes a reference to each previous snapshot.

With 274k snapshots we can estimate the space needed to store them:

weight_of_1_reference = 280MB / 274,000
number_of_references = sum(k-1, k=1..274,000) = 274,000 × 273,999 / 2
=> weight_of_all_references ≈ 38TB

This matches the size of the metadata dir on HDFS as we found it before beginning cleanup (36TB).
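The back-of-the-envelope estimate can be checked directly (decimal units, using the numbers quoted above):

```python
# Each metadata file carries references to all previous snapshots,
# so total reference storage grows quadratically with snapshot count.
n_snapshots = 274_000
metadata_file_bytes = 280e6  # ~280 MB observed for the latest metadata file
bytes_per_reference = metadata_file_bytes / n_snapshots

# snapshot k's metadata file references the k-1 snapshots before it
total_references = n_snapshots * (n_snapshots - 1) // 2

total_tb = total_references * bytes_per_reference / 1e12
print(f"{total_tb:.1f} TB")  # ≈ 38 TB, matching the estimate above
```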

I think we should:

  • monitor snapshot counts on Iceberg tables
  • expire old snapshots more aggressively and keep a single snapshot per day by providing a snapshot_ids param to expire_snapshots
  • generate fewer commits, and thus fewer snapshots
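For the second point, a sketch of the Iceberg `expire_snapshots` Spark procedure (the timestamp and snapshot IDs below are placeholders, not real values from this table):

```sql
-- Expire everything older than a cutoff, keeping at least one snapshot:
CALL spark_catalog.system.expire_snapshots(
  table => 'wmf_data_ops.data_quality_alerts',
  older_than => TIMESTAMP '2025-09-01 00:00:00',
  retain_last => 1
);

-- Or expire an explicit list (e.g. all but one snapshot per day)
-- via the snapshot_ids param:
CALL spark_catalog.system.expire_snapshots(
  table => 'wmf_data_ops.data_quality_alerts',
  snapshot_ids => ARRAY(12345678901234)
);
```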

Thanks for the numbers @Antoine_Quhen.

> monitor snapshot counts on Iceberg tables

This is interesting. We could collect this as part of table maintenance.

> expire old snapshots more aggressively and keep a single snapshot per day by providing a snapshot_ids param to expire_snapshots

We are partially addressing this via https://gitlab.wikimedia.org/repos/data-engineering/airflow-dags/-/merge_requests/1685

> generate fewer commits, and thus fewer snapshots

Agreed. The SQL approach above would solve the commits issue (and the computation time issue), but it would also put us in a situation where some code uses deequ while other code uses custom SQL for metrics.