
Fix reconcile bug where user_id is not being populated correctly.
Open, Needs Triage, Public

Description

On T411484#11432242, we found that the reconcile process of mediawiki_content_history_v1 (and now mediawiki_revision_history_v1) seems to not be generating the proper user_id (and, presumably, the user_text).

After discussions with @JAllemandou, we decided that we will:

  • Modify consistency_check.py to consider user_id and user_text.
  • The main bug: modify emit_reconcile_events_to_kafka.py to populate the performer.user_* fields.
  • Figure out whether we can point reconciliation at mediawiki_revision_history_v1 instead of mediawiki_content_history_v1.
  • Rerun a full reconcile.

These changes are expected to produce lots of events. Although mediawiki_revision_history_v1 should have no issues ingesting, mediawiki_content_history_v1 will likely struggle.

Details

Related Changes in Gerrit:
Related Changes in GitLab:
Title | Reference | Author | Source Branch | Dest Branch
main: content: allow reconcile pushdown_strategy to be set via DagProperties | repos/data-engineering/airflow-dags!1903 | xcollazo | allow-pusdown-strategy-swith | main
hotfix: refine: more resources for large t-shirt size. | repos/data-engineering/airflow-dags!1878 | xcollazo | hotfix-refine-bump-mem-for-large-v2 | main
hotfix: refine: reduce executor_cores to 2 on large t-shirt size. | repos/data-engineering/airflow-dags!1873 | xcollazo | hotfix-refine-bump-mem-for-large | main
main: content: modify reconcile DAGs to consider user_central_id | repos/data-engineering/airflow-dags!1859 | xcollazo | fix-user-ids-on-reconcile | main
Consider user_id, user_central_id and user_text on reconcile. Fix related bugs. | repos/data-engineering/mediawiki-content-pipelines!88 | xcollazo | fix-user-ids-on-reconcile | main

Event Timeline

Copy-pasting from MR 88, for completeness:

In this MR we:

  • Start considering user_id and user_text for reconcile.
  • If we reconcile a row, we now also emit its user_central_id. For this, we now have a new Python script that fetches the localuser table from CentralAuth just before starting the reconcile.
  • We also fix a long-standing bug in which we were ingesting the performer instead of the revision.editor.
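The user_central_id enrichment described above amounts to a lookup against a localuser snapshot keyed by wiki and user name. A minimal pure-Python sketch of that idea (the function name, row shapes, and fallback behavior are illustrative assumptions; the real pipeline does this as a Spark join inside mediawiki-content-pipelines):

```python
# Hypothetical sketch: attach user_central_id to reconcile rows by looking
# up a snapshot of CentralAuth's localuser table keyed by (wiki, user name).
# All names and shapes here are illustrative, not the pipeline's actual API.

def attach_user_central_id(rows, localuser):
    """rows: list of dicts with wiki_id/user_text; localuser: {(wiki, name): central_id}."""
    out = []
    for row in rows:
        enriched = dict(row)
        # Accounts with no central attachment simply get None.
        enriched["user_central_id"] = localuser.get((row["wiki_id"], row["user_text"]))
        out.append(enriched)
    return out

rows = [
    {"wiki_id": "simplewiki", "user_id": 81, "user_text": "Jon Harald Søby"},
    {"wiki_id": "simplewiki", "user_id": 99, "user_text": "UnattachedUser"},
]
localuser = {("simplewiki", "Jon Harald Søby"): 1924}
enriched = attach_user_central_id(rows, localuser)
```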

I have tested all this code in a Jupyter notebook against simplewiki. I have also tuned the copying of localuser.

Here are the results from simplewiki:

spark.sql("""
SELECT count(1) as count,
       reasons
FROM xcollazo.inconsistent_rows_of_mediawiki_content_history_v1
WHERE computation_class = 'all-of-wiki-time'
GROUP BY reasons
ORDER BY count DESC
""").show(truncate=False)

+-----+---------------------------------------------------------------------------+
|count|reasons                                                                    |
+-----+---------------------------------------------------------------------------+
|35448|[mismatch_user_text]                                                       |
|2543 |[missing_from_target]                                                      |
|944  |[page_was_restored, mismatch_user_text]                                    |
|522  |[mismatch_user_id, mismatch_user_text]                                     |
|124  |[missing_from_source, page_was_deleted]                                    |
|42   |[mismatch_page]                                                            |
|29   |[missing_from_source, page_was_deleted, page_was_restored]                 |
|27   |[missing_from_target, page_was_restored]                                   |
|16   |[page_was_restored, mismatch_user_id, mismatch_user_text]                  |
|1    |[mismatch_user_visibility, mismatch_comment_visibility, mismatch_user_text]|
|1    |[mismatch_user_visibility, mismatch_comment_visibility]                    |
+-----+---------------------------------------------------------------------------+

Indeed a bunch of mismatch_user_text that we can now detect!

Change #1217579 had a related patch set uploaded (by Xcollazo; author: Xcollazo):

[operations/deployment-charts@master] Scale up mw-content-history-reconcile-enrich temporarily for big reconcile.

https://gerrit.wikimedia.org/r/1217579

Change #1217579 merged by jenkins-bot:

[operations/deployment-charts@master] Scale up mw-content-history-reconcile-enrich temporarily for big reconcile.

https://gerrit.wikimedia.org/r/1217579

There are a lot of reconcile events to be ingested (see the Flink app's last 1h). All these events are legit, but they did inadvertently come from an airflow-devenv instance. Will debug that issue separately.

However, we do need the Airflow MR to land before we can correctly ingest these events. Thus, I am pausing all MW Content DAGs, except the SLO one, until we get that one merged in.

Change #1217773 had a related patch set uploaded (by Xcollazo; author: Xcollazo):

[operations/deployment-charts@master] Scale down a bit mw-content-history-reconcile-enrich from 20 to 18 TaskManagers.

https://gerrit.wikimedia.org/r/1217773

Change #1217773 merged by jenkins-bot:

[operations/deployment-charts@master] Scale down a bit mw-content-history-reconcile-enrich from 20 to 18 TaskManagers.

https://gerrit.wikimedia.org/r/1217773

OK, let's recap:

  • The reconcile changes are now in production, and the daily reconcile has run successfully for two days.
  • Unfortunately, the monthly reconcile did indeed find north of 64M rows that needed to be reconciled. Thus we scaled up the Flink app to process all these events. Flink successfully generated the enriched events over the course of a couple of days.
  • However, the Refine process was not happy with the bursty volume of events. The couple of hotfixes above finally scaled Refine in a way that was able to ingest the events.
  • But even though Refine now succeeds, the ingestion code gets overwhelmed by far too many changes that can touch any point in time and any page_id, so it is failing.

Thus, we are going to attempt to reconcile manually, on a wiki_id-by-wiki_id basis, utilizing the reconcile events.

The strategy of going wiki_id by wiki_id did not succeed, at least not for mediawiki_revision_history_v1.

Inspecting the data from this table shows the issue:

spark.sql("""
SELECT record_count,
       readable_metrics.wiki_id.lower_bound as wiki_lower,
       readable_metrics.wiki_id.upper_bound as wiki_upper,
       readable_metrics.page_id.lower_bound as page_id_lower,
       readable_metrics.page_id.upper_bound as page_id_upper
FROM wmf_content.mediawiki_revision_history_v1.files
""").show(100)


+------------+------------+------------+-------------+-------------+
|record_count|  wiki_lower|  wiki_upper|page_id_lower|page_id_upper|
+------------+------------+------------+-------------+-------------+
|      536204|      abwiki|      zuwiki|            1|    174751008|
|     1014055|      abwiki|zuwiktionary|            1|    177436289|
|     1047574|     acewiki|zhwiktionary|            1|    178823737|
|     1302326|      abwiki|      zuwiki|            1|    178730553|
|     1131000|aawiktionary|      zuwiki|            1|    171682506|
|      386678|      aawiki|      zuwiki|            1|    171682506|
|      355713|      afwiki|zhwiktionary|            1|    177029596|
|     1203471|      abwiki|      zuwiki|            2|    179794852|
....

Thus, even if we wanted to apply changes to only, say, commonswiki, we would still be doing a full table scan and likely rewriting all the data, since there is no way to prune existing files via wiki_id or page_id. Pruning works only via revision_dt, which is not helpful for this particular issue.
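The files query above shows why: min/max file pruning, as Iceberg-style engines do it, can only skip a file when the predicate value falls entirely outside that file's column bounds. A small illustrative sketch (not Iceberg's actual code) using the bounds from the table above:

```python
# Illustrative min/max file pruning: a file can be skipped for the predicate
# wiki_id = X only if X lies outside the file's [lower_bound, upper_bound]
# range. With bounds like abwiki..zuwiki on every file, every file overlaps
# every wiki, so nothing is skipped and we get a full table scan.

def can_skip(file_stats, wiki_id):
    return not (file_stats["wiki_lower"] <= wiki_id <= file_stats["wiki_upper"])

files = [
    {"wiki_lower": "abwiki", "wiki_upper": "zuwiki"},
    {"wiki_lower": "acewiki", "wiki_upper": "zhwiktionary"},
    {"wiki_lower": "aawiktionary", "wiki_upper": "zuwiki"},
]
skippable = [f for f in files if can_skip(f, "commonswiki")]
```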

Going back to mediawiki_content_history: over there we ingest the big wikis first (enwiki, commonswiki, wikidatawiki), and then the rest. I noted from a failed reconcile run that ingestion was looking good until we hit the case for all other wikis; that is where it failed.

So I took a look at the offending reconcile days and noted the per wiki event distribution:

spark.sql("""
SELECT 
       wiki_id,
       count(1) as count
FROM event.mediawiki_content_history_reconcile_enriched_v1
WHERE year = 2025
  AND month = 12
  AND day >= 10
GROUP BY wiki_id
ORDER BY count DESC
""").show()

+-------------+--------+
|      wiki_id|   count|
+-------------+--------+
|  commonswiki|23198129|
| wikidatawiki|21988457|
|       enwiki| 8706820|
|       svwiki| 1267038|
|       ruwiki| 1216060|
|       frwiki| 1192874|
|       dewiki| 1013508|
|       eswiki| 1007264|
|       zhwiki|  948253|
|       itwiki|  772122|
|       arwiki|  743751|
| enwiktionary|  681360|
|     labswiki|  614010|
|       ptwiki|  595804|
|       idwiki|  506763|
|mediawikiwiki|  487302|
...

What I am going to do now is attempt to run the spark_process_reconciliation_events task, setting as big wikis all those that have 500k or more events to process. Those will all be done individually.
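The selection rule above is simple enough to sketch. A hypothetical helper (name and shapes are illustrative) that derives the big-wikis list from per-wiki event counts like those in the query output, using the 500k threshold:

```python
# Hypothetical helper: pick the "big wikis" to process individually, namely
# every wiki with >= 500k pending reconcile events. Counts mirror a few rows
# of the query output above; with that data the cutoff lands after idwiki.

def pick_big_wikis(counts, threshold=500_000):
    # Return wikis over the threshold, largest first, matching the plan of
    # ingesting the biggest wikis before the long tail.
    return [w for w, c in sorted(counts.items(), key=lambda kv: -kv[1]) if c >= threshold]

counts = {
    "commonswiki": 23_198_129,
    "wikidatawiki": 21_988_457,
    "enwiki": 8_706_820,
    "idwiki": 506_763,
    "mediawikiwiki": 487_302,
}
big = pick_big_wikis(counts)
```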

On a rerun as per T411803#11466392 we were doing very well, but it has now failed on duplicate rows, as in T410431!

CC @APizzata-WMF

Executed deduplication steps as we discussed: T410431#11469747

Ingesting day 2025-12-12 of spark_process_reconciliation_events was successful with the tuning of T411803#11466392.

The next day, 2025-12-13, failed on a driver OOM:

25/12/18 13:37:21 INFO skein.ApplicationMaster: Shutting down: Application driver failed with exit code 143. This is often due to the application master memory limit being exceeded. See the diagnostics for more information.

Will rerun now with more driver memory.

2025-12-13 failed again, even with driver-memory = 64GB.

Trying 2025-12-14, with same settings.

2025-12-13 and 2025-12-14 ultimately failed, both with driver OOM, even with driver-memory = 64GB, and even with spark.sql.adaptive.autoBroadcastJoinThreshold = -1.

These failures seem related to our pushdown strategy on spark_process_reconciliation_events being set to set_of_page_ids. While this is ideal in the usual case where we see few reconcile events, in this case the set is way too big (millions!). I want to modify our code so that we can override the pushdown strategy on spark_process_reconciliation_events, but I ran out of time.

Will continue this work after the holidays.

From MR 1903:

As part of T411803, we want to test whether changing the pushdown_strategy on reconcile event ingestion helps us ingest 40M+ remaining reconcile events.

The idea here is that, on regular runs, this will continue to be set to the default of set_of_page_ids, while on really big reconciles it will be set to earliest_revision_dt.
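The two strategies trade predicate size for selectivity. A hedged sketch of how the resulting SQL predicate might differ between them (function and field names are illustrative assumptions, not the pipeline's actual code):

```python
# Illustrative sketch of the two pushdown strategies. set_of_page_ids
# enumerates every affected page in the predicate -- fine for a handful of
# reconcile events, but with millions the literal set overwhelms the driver.
# earliest_revision_dt collapses to a single time bound instead, at the cost
# of touching more rows than strictly necessary.

def build_predicate(events, strategy):
    if strategy == "set_of_page_ids":
        ids = sorted({e["page_id"] for e in events})
        return f"page_id IN ({', '.join(map(str, ids))})"
    elif strategy == "earliest_revision_dt":
        return f"revision_dt >= '{min(e['revision_dt'] for e in events)}'"
    raise ValueError(f"unknown strategy: {strategy}")

events = [
    {"page_id": 7, "revision_dt": "2025-12-13T04:10:00"},
    {"page_id": 42, "revision_dt": "2025-12-13T01:02:03"},
]
p1 = build_predicate(events, "set_of_page_ids")
p2 = build_predicate(events, "earliest_revision_dt")
```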

xcollazo merged https://gitlab.wikimedia.org/repos/data-engineering/airflow-dags/-/merge_requests/1903

main: content: allow reconcile pushdown_strategy to be set via DagProperties

Retrying spark_process_reconciliation_events for 2025-12-13 with pushdown_strategy=earliest_revision_dt.

https://yarn.wikimedia.org/proxy/application_1764064841637_997699/

Success! 🎉

Took 1d14h!

Now retrying spark_process_reconciliation_events for 2025-12-14 with pushdown_strategy=earliest_revision_dt.

It is currently 9h in, and I expect it to take about the same time as 2025-12-13.

For posterity, this is the configuration that made 2025-12-13 succeed, and that is expected to make 2025-12-14 succeed as well:

{
  "start_date": "2025-03-18T00:00:00",
  "sla": "PT16H",
  "conda_env": "hdfs:///wmf/cache/artifacts/airflow/analytics/mediawiki-content-pipelines-0.19.0-v0.19.0.conda.tgz",
  "hive_mediawiki_content_history_v1_table": "wmf_content.mediawiki_content_history_v1",
  "hive_mediawiki_page_content_change_table": "event.mediawiki_page_content_change_v1",
  "hive_revision_visibility_change": "event.mediawiki_revision_visibility_change",
  "hive_mediawiki_content_history_reconcile_enriched_v1": "event.mediawiki_content_history_reconcile_enriched_v1",
  "driver_memory": "64G",
  "driver_cores": "4",
  "executor_memory": "40G",
  "executor_cores": "1",
  "spark_extraJavaOptions": "-Xss8m",
  "spark_conf": {
    "spark.driver.maxResultSize": "8G",
    "spark.dynamicAllocation.maxExecutors": "80",
    "spark.sql.shuffle.partitions": "1024",
    "spark.sql.iceberg.locality.enabled": "true",
    "spark.reducer.maxReqsInFlight": 1,
    "spark.shuffle.io.retryWait": "60s",
    "spark.shuffle.io.maxRetries": 20
  },
  "big_wikis": [
    "commonswiki",
    "wikidatawiki",
    "enwiki",
    "svwiki",
    "ruwiki",
    "frwiki",
    "dewiki",
    "eswiki",
    "zhwiki",
    "itwiki",
    "arwiki",
    "enwiktionary",
    "labswiki",
    "ptwiki",
    "idwiki"
  ],
  "reconcile_pushdown_strategy": "earliest_revision_dt"
}

I will be resetting this to the defaults after 2025-12-14 succeeds.

Success!

Now running mw_content_reconcile_mw_content_history_monthly to see where we're at.

Airflow link.

Hey @xcollazo, I will run the heuristic to deduplicate. I'll text you here when everything is clean.

Attempting to repro the bug described on T411484#11432242: we can see that reconcile is now populating user_id, user_central_id, and user_text correctly (whereas before these details were NULL):

spark.sql("""
SELECT revision_id, revision_dt, user_id, user_central_id, user_text, user_is_visible
FROM wmf_content.mediawiki_content_history_v1
WHERE wiki_id = 'bewwiktionary'
  AND revision_id IN (9141, 18679, 25869, 23407, 3929)
""").show(5)
+-----------+-------------------+-------+---------------+---------------+---------------+
|revision_id|        revision_dt|user_id|user_central_id|      user_text|user_is_visible|
+-----------+-------------------+-------+---------------+---------------+---------------+
|      25869|2025-09-04 11:36:06|     81|           1924|Jon Harald Søby|           true|
|       9141|2025-09-04 11:14:54|     81|           1924|Jon Harald Søby|           true|
|       3929|2025-09-04 11:10:04|     81|           1924|Jon Harald Søby|           true|
|      23407|2025-09-04 11:31:02|     81|           1924|Jon Harald Søby|           true|
|      18679|2025-09-04 11:25:26|     81|           1924|Jon Harald Søby|           true|
+-----------+-------------------+-------+---------------+---------------+---------------+

There are still reconcile events being ingested, but the majority are taken care of.

The long tail is done, and at the end of it all we reconciled 362M rows:

spark.sql("""
SELECT count(1) as count,
       reconcile_emit_dt is NOT NULL as emitted,
       wiki_id
FROM wmf_content.inconsistent_rows_of_mediawiki_content_history_v1
WHERE computation_dt = '2026-01-01T00:00:00'
  AND computation_class = 'all-of-wiki-time'
GROUP BY wiki_id, emitted
ORDER BY count DESC
""").show()

+---------+-------+-------------+
|    count|emitted|      wiki_id|
+---------+-------+-------------+
|351567034|   true|  commonswiki|
|  3879333|   true|       dewiki|
|   707164|   true|       enwiki|
|   556624|   true|       mlwiki|
|   400875|   true|       viwiki|
|   325726|   true|       bhwiki|
|   240970|   true|       tewiki|
|   206007|   true|       newiki|
|   185358|   true| wikidatawiki|
|   171629|   true|      dtywiki|
|   169074|   true|  enwikibooks|
|   136068|   true|       itwiki|
|   124020|   true|       guwiki|
|   122378|   true|       ptwiki|
|   120857|   true|      maiwiki|
|   104211|   true|       knwiki|
|   102616|   true|       zhwiki|
|    97764|   true| enwiktionary|
|    93104|   true|nostalgiawiki|
|    87689|   true|       frwiki|
+---------+-------+-------------+
only showing top 20 rows