Implement a new pipeline and table with reconciled historical revision data
Closed, Resolved, Public

Description

wmf_content.mediawiki_content_history_v1 contains all revision data and content for all wikis, over all time. When designing the pipelines that feed this table, we made compromises to get it to production.

The work from T406069: Global Editor Metrics - Druid mediawiki_history_reduced changes now requires similar data, but the content is not needed.

As a step towards having a centralized table with revision data that is not downstream of wmf_content.mediawiki_content_history_v1, here is the solution we came up with in today's meeting:

  • We are going to create a new dataset in Druid containing revision-events only, for the purpose of the Editors-metrics.
  • This dataset will be updated daily from a newly created Iceberg table.
  • This Iceberg table will be a replica of the mediawiki_content_history_v1 table, minus the content.
  • This table will be updated daily from the same sources and using the same process (copy/paste) as the mediawiki_content_history_v1 table:
    • pagechange events
    • reconciliation events
  • No reconciliation against MariaDB is needed, as this is already done by the mediawiki_content_history_v1 process handling the same data (minus text!)

Working table name: wmf_content.mediawiki_revision_history_v1.

Details

Related Changes in GitLab:
Title	Reference	Author	Source Branch	Dest Branch
Add DAG for wmf_content.mediawiki_revision_history_v1.	repos/data-engineering/airflow-dags!1834	xcollazo	mw-revision-history-v1	main
Add revision history pipelines	repos/data-engineering/mediawiki-content-pipelines!85	xcollazo	add-revision-history-pipelines	main
Rename project to mediawiki-content-pipelines.	repos/data-engineering/mediawiki-content-pipelines!84	xcollazo	rename	main

Event Timeline

Since we are going to need to backfill wmf_content.mediawiki_revision_history_v1 from wmf_content.mediawiki_content_history_v1, perhaps it is best if we backfill user_central_id to the latter first: T406515: Add user_central_id to mediawiki_content_history_v1 (and mediawiki_content_current_v1).

It's a possible solution. Another one is to backfill the user_central_id value only onto the new table, as it'll be a lot easier to rewrite (smaller size).

Some data validation on joining for user_central_id:

WITH centralauth AS (
    SELECT DISTINCT
        lu_wiki,
        lu_local_id,
        lu_global_id
    FROM wmf_raw.centralauth_localuser
    WHERE snapshot = '2025-10'
)

SELECT
    mrw.wiki_id AS wiki_id,
    SUM(IF (ca.lu_global_id IS NULL, 0, 1)) AS join_global_id,
    SUM(IF (ca.lu_global_id IS NULL, 1, 0)) AS dont_join_global_id
FROM joal.mediawiki_revision_history_v1 as mrw
    LEFT JOIN centralauth ca
        ON mrw.wiki_id = ca.lu_wiki AND mrw.user_id = ca.lu_local_id
WHERE
    (mrw.user_id IS NOT NULL AND mrw.user_id > 0)                         -- user_id is defined
    AND (mrw.user_central_id IS NULL or mrw.user_central_id <= 0)         -- user_central_id is undefined in event
GROUP BY mrw.wiki_id
ORDER BY join_global_id DESC
LIMIT 1000;

wiki_id	join_global_id	dont_join_global_id
wikidatawiki	2373264686	2
enwiki	986248572	46901
commonswiki	677471878	106
dewiki	202715058	787
frwiki	191253823	64712
eswiki	117154705	23710
ruwiki	113922942	32
itwiki	111332396	3
enwiktionary	81825716	0
jawiki	73428726	0
zhwiki	72096381	1698
arwiki	64688843	76
plwiki	64112940	49
viwiki	61884553	1581
nlwiki	59185914	45
ptwiki	51901993	0
svwiki	45968103	31
shwiki	42066100	0
ukwiki	41787713	1027
frwiktionary	37574479	11
...

I only pasted the first few lines, but gathered full results here.

When user_id is defined in the revision table (not NULL and strictly positive) and user_central_id is not already present (before the event update), only a small number of rows fail to join for most wikis (with some counter-examples).
I think it's ok to move forward :)

@Milimetric Can you confirm that the number of not-joining users could be related to temp-accounts (my assumption is that temp-accounts would not have a global-id)? Thank you :)

Backfilling wmf_content.mediawiki_content_history_v1 will indeed need more tuning (See T406515#11405334), so will skip it for now and backfill wmf_content.mediawiki_revision_history_v1 directly as suggested.

Something else needed in order to get the data into Druid with all the needed dimensions is a daily version of the project_namespace_map table. The data-generation step is cheap and the data is small, so I'll create a new table.

DDL as of now: https://gitlab.wikimedia.org/repos/data-engineering/mediawiki-content-pipelines/-/blob/e9689ce7be310560f796f1da3631e0880c42038e/hql/create-wmf_content_mediawiki_revision_history_v1.hql

wmf_content.mediawiki_revision_history_v1 is now backfilled in prod. @JAllemandou and I tried a couple strategies:

First, we tried an INSERT INTO with a LEFT JOIN to wmf_raw.centralauth_localuser to pick up the user_central_id. This failed as, for some reason, Spark insisted on pulling all ~20TB of data from wmf_content.mediawiki_content_history_v1 into the JOIN.

Then we tried a plain INSERT into wmf_content.mediawiki_revision_history_v1, followed by a MERGE INTO to update the user_central_id field. This also failed, as Spark would lose executors on a stage with no explanation other than that the executors were not heartbeating.

We also tried force broadcasting the small table, but the available driver memory on an-launcher1003.eqiad.wmnet did not allow this to succeed. (Will follow up on that separately.)

The strategy that worked was an INSERT INTO with a LEFT JOIN to wmf_raw.centralauth_localuser, but one that first reads wmf_content.mediawiki_content_history_v1, minus the content, in a CTE:

$ hostname -f
an-launcher1003.eqiad.wmnet

$ whoami
analytics

kerberos-run-command analytics spark3-sql \
 --driver-cores 4 \
 --master yarn \
 --conf spark.driver.maxResultSize=8G \
 --conf spark.dynamicAllocation.maxExecutors=40 \
 --conf spark.sql.shuffle.partitions=2048 \
 --conf spark.sql.iceberg.locality.enabled=true \
 --executor-cores 2 \
 --executor-memory 64G \
 --driver-memory 12G \
 --name xcollazo_backfill_user_central_id_to_mw_content_tables

WITH mwch_no_content AS (
SELECT
    c.page_id,
    c.page_namespace_id,
    c.page_title,
    c.page_redirect_target,
    c.user_id,
    c.user_central_id,
    c.user_text,
    c.user_is_visible,
    c.revision_id,
    c.revision_parent_id,
    c.revision_dt,
    c.revision_is_minor_edit,
    c.revision_comment,
    c.revision_comment_is_visible,
    c.revision_size,
    transform_values(c.revision_content_slots,
                     (k, v) -> (
                        -- content_body is explicitly omitted
                        v.content_format,
                        v.content_model,
                        v.content_sha1,
                        v.content_size,
                        v.origin_rev_id
                     )
                    ) AS revision_content_slots,
    c.revision_content_is_visible,
    c.wiki_id,
    c.row_content_update_dt,
    c.row_visibility_update_dt,
    c.row_move_update_dt
FROM wmf_content.mediawiki_content_history_v1 c
)
INSERT INTO wmf_content.mediawiki_revision_history_v1
SELECT
    c.page_id,
    c.page_namespace_id,
    c.page_title,
    c.page_redirect_target,
    c.user_id,
    COALESCE(c.user_central_id, u.lu_global_id) AS user_central_id,
    c.user_text,
    c.user_is_visible,
    c.revision_id,
    c.revision_parent_id,
    c.revision_dt,
    c.revision_is_minor_edit,
    c.revision_comment,
    c.revision_comment_is_visible,
    c.revision_size,
    c.revision_content_slots,
    c.revision_content_is_visible,
    c.wiki_id,
    c.row_content_update_dt,
    c.row_visibility_update_dt,
    c.row_move_update_dt
FROM mwch_no_content c
LEFT JOIN wmf_raw.centralauth_localuser u
WHERE u.snapshot='2025-10'
  AND u.wiki_db='centralauth'
  AND c.wiki_id = u.lu_wiki
  AND c.user_id = u.lu_local_id
ORDER BY c.revision_dt


Response code
Time taken: 3456.767 seconds

That's ~1h.

Hmm, the conditions must be wrong, as a count check shows the target table is missing ~940M rows:

spark-sql (default)> select count(1) from wmf_content.mediawiki_revision_history_v1;
count(1)
6548697332
Time taken: 32.89 seconds, Fetched 1 row(s)
spark-sql (default)> select count(1) from wmf_content.mediawiki_content_history_v1;
count(1)
7488763332
Time taken: 218.654 seconds, Fetched 1 row(s)

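Moved the predicates to the ON condition so that they only apply to the right table. In the WHERE clause they also discarded every revision without a matching centralauth row, which effectively turned the LEFT JOIN into an INNER JOIN.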

...
FROM mwch_no_content c
LEFT JOIN wmf_raw.centralauth_localuser u
   ON ( u.snapshot='2025-10'
        AND u.wiki_db='centralauth'
        AND c.wiki_id = u.lu_wiki
        AND c.user_id = u.lu_local_id
   )
ORDER BY c.revision_dt
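
A minimal sketch of the difference, using Python's built-in sqlite3 instead of Spark. The toy tables and their rows are made up for illustration, and for simplicity the join keys stay in ON in both variants; only the snapshot filter moves between WHERE and ON.

```python
import sqlite3

# Toy stand-ins for the revision table and centralauth_localuser (hypothetical data).
conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE revisions (wiki_id TEXT, user_id INTEGER);
CREATE TABLE localuser (snapshot TEXT, lu_wiki TEXT, lu_local_id INTEGER, lu_global_id INTEGER);
INSERT INTO revisions VALUES ('enwiki', 1), ('enwiki', 2), ('enwiki', NULL);
INSERT INTO localuser VALUES ('2025-10', 'enwiki', 1, 100);
""")

# Right-table predicate in WHERE: unmatched revisions carry u.snapshot = NULL
# after the join, so the filter drops them.
where_count = conn.execute("""
SELECT count(*)
FROM revisions c
LEFT JOIN localuser u
  ON c.wiki_id = u.lu_wiki AND c.user_id = u.lu_local_id
WHERE u.snapshot = '2025-10'
""").fetchone()[0]

# Same predicate in ON: it only restricts which right-side rows may match,
# so every revision is kept.
on_count = conn.execute("""
SELECT count(*)
FROM revisions c
LEFT JOIN localuser u
  ON u.snapshot = '2025-10'
 AND c.wiki_id = u.lu_wiki
 AND c.user_id = u.lu_local_id
""").fetchone()[0]

print(where_count, on_count)
```

With this toy data, the WHERE variant keeps only the one revision that has a match, while the ON variant keeps all three.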

DELETEd all data, and now rerunning via https://yarn.wikimedia.org/proxy/application_1764064841637_7259/SQL/execution/?id=9

Funnily enough, there are now 4.3M more rows in the target table than in the source:

spark-sql (default)> SELECT count(1) FROM wmf_content.mediawiki_revision_history_v1                      
                   > ;
count(1)
7493104804
Time taken: 48.148 seconds, Fetched 1 row(s)
spark-sql (default)> SELECT count(1) FROM wmf_content.mediawiki_content_history_v1
                   > ;
count(1)
7488763332
Time taken: 214.82 seconds, Fetched 1 row(s)

Will investigate soon.

I have investigated @xcollazo's finding, and it's not great: the centralauth.local_user table contains rows with NULL values for local_user_id for many projects, and for other projects (ocwiki and outreachwiki at least) it has multiple rows for the same local_user_id and global_user_id...
This explains the row duplication :(
I have been trying the MERGE approach on my test table removing corrupted data, but the job still fails. I'll continue my investigations in that direction.
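
To illustrate the duplication, here is a small sketch with Python's built-in sqlite3 rather than Spark. The revision rows are invented; the ocwiki mapping (745 to 5731) is one of the repeated pairs seen in this snapshot. It shows that a repeated mapping on the right side multiplies the matching left rows, while a NULL lu_local_id never matches and so adds nothing.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE revisions (revision_id INTEGER, wiki_id TEXT, user_id INTEGER);
CREATE TABLE localuser (lu_wiki TEXT, lu_local_id INTEGER, lu_global_id INTEGER);
-- Hypothetical revisions: two by the same ocwiki user, one on enwiki.
INSERT INTO revisions VALUES (10, 'ocwiki', 745), (11, 'ocwiki', 745), (12, 'enwiki', 7);
-- The same mapping stored twice, plus a row with NULL ids.
INSERT INTO localuser VALUES
  ('ocwiki', 745, 5731),
  ('ocwiki', 745, 5731),
  ('enwiki', NULL, NULL);
""")

source_rows = conn.execute("SELECT count(*) FROM revisions").fetchone()[0]

# Each ocwiki revision matches both copies of the mapping, so it comes out twice.
joined_rows = conn.execute("""
SELECT count(*)
FROM revisions c
LEFT JOIN localuser u
  ON c.wiki_id = u.lu_wiki AND c.user_id = u.lu_local_id
""").fetchone()[0]

# Join keys that appear more than once on the right side.
duplicate_keys = conn.execute("""
SELECT lu_wiki, lu_local_id, count(*) AS n
FROM localuser
WHERE lu_local_id IS NOT NULL
GROUP BY lu_wiki, lu_local_id
HAVING count(*) > 1
""").fetchall()

print(source_rows, joined_rows, duplicate_keys)
```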

Looked at this as well. Sharing issues with the table for completeness:

spark.sql("""
SELECT count(1) as count
FROM wmf_raw.centralauth_localuser 
WHERE snapshot='2025-10'
  AND wiki_db='centralauth'
  AND lu_local_id IS NULL
""").show(100, truncate=False)

+-----+
|count|
+-----+
|6365 |
+-----+

spark.sql("""
SELECT count(1) as count FROM (
  SELECT count(1) as count,
         lu_wiki,
         lu_local_id
  FROM wmf_raw.centralauth_localuser
  WHERE snapshot='2025-10'
    AND wiki_db='centralauth'
  GROUP BY lu_wiki, lu_local_id
  HAVING count > 1
)
""").show(100, truncate=False)

+------+
|count |
+------+
|221780|
+------+

spark.sql("""
SELECT count(1) as count FROM (
  SELECT count(1) as count,
         lu_wiki,
         lu_local_id,
         lu_global_id
  FROM wmf_raw.centralauth_localuser
  WHERE snapshot='2025-10'
    AND wiki_db='centralauth'
  GROUP BY lu_wiki, lu_local_id, lu_global_id
  HAVING count > 1
)
""").show(100, truncate=False)

+------+
|count |
+------+
|221647|
+------+

spark.sql("""
  SELECT count(1) as count,
         lu_wiki,
         lu_local_id,
         lu_global_id
  FROM wmf_raw.centralauth_localuser
  WHERE snapshot='2025-10'
    AND wiki_db='centralauth'
  GROUP BY lu_wiki, lu_local_id, lu_global_id
  HAVING count > 1
  ORDER BY count DESC
""").show(100, truncate=False)

+-----+-------------+-----------+------------+
|count|lu_wiki      |lu_local_id|lu_global_id|
+-----+-------------+-----------+------------+
|2573 |enwiki       |null       |null        |
|536  |eswiki       |null       |null        |
|164  |ptwiki       |null       |null        |
|118  |jawiki       |null       |null        |
|78   |frwiki       |null       |null        |
|68   |idwiki       |null       |null        |
|49   |ruwiki       |null       |null        |
|45   |metawiki     |null       |null        |
|44   |viwiki       |null       |null        |
|36   |dewiki       |null       |null        |
|35   |trwiki       |null       |null        |
|19   |mediawikiwiki|null       |null        |
|15   |itwiki       |null       |null        |
|15   |fawiki       |null       |null        |
|12   |enwiki       |null       |0           |
|9    |commonswiki  |null       |null        |
|8    |simplewiki   |null       |null        |
|7    |hewiki       |null       |null        |
|7    |svwiki       |null       |null        |
|6    |mswiki       |null       |null        |
|6    |nlwiki       |null       |null        |
|6    |zhwiki       |null       |null        |
|6    |plwiki       |null       |null        |
|5    |srwiki       |null       |null        |
|5    |azwiki       |null       |null        |
|5    |mkwiki       |null       |null        |
|4    |sqwiki       |null       |null        |
|4    |arzwiki      |null       |null        |
|4    |cawiki       |null       |null        |
|3    |cswiki       |null       |null        |
|3    |enwiktionary |null       |null        |
|3    |arwiki       |null       |null        |
|3    |fiwiki       |null       |null        |
|3    |metawiki     |null       |0           |
|2    |ocwiki       |30104      |43352052    |
|2    |ocwiki       |44967      |64797735    |
|2    |ocwiki       |33093      |1469800     |
|2    |ocwiki       |33359      |48463092    |
|2    |ocwiki       |15703      |573928      |
|2    |ocwiki       |55770      |62357577    |
|2    |ocwiki       |34908      |6532839     |
|2    |ocwiki       |1782       |417258      |
|2    |ocwiki       |25686      |13967321    |
|2    |ocwiki       |53445      |74506123    |
|2    |ocwiki       |31591      |44800838    |
|2    |ocwiki       |33969      |1038620     |
|2    |olowiki      |6501       |72655905    |
|2    |ocwiki       |23757      |16898251    |
|2    |ocwiki       |745        |5731        |
|2    |ocwiki       |27884      |6090122     |
|2    |ocwiki       |51960      |72164030    |
|2    |ocwiki       |57836      |62111448    |
|2    |ocwiki       |46918      |68928761    |
...

Seems that, other than the NULLs, the repeats on ocwiki and friends are duplicates of the same mapping, so perhaps all we need to do is filter out the rows where lu_local_id IS NULL before joining on it.

Now running a backfilling SQL that deduplicates wmf_raw.centralauth_localuser with a heuristic:

 WITH mwch_no_content AS (
 SELECT
     c.page_id,
     c.page_namespace_id,
     c.page_title,
     c.page_redirect_target,
     c.user_id,
     c.user_central_id,
     c.user_text,
     c.user_is_visible,
     c.revision_id,
     c.revision_parent_id,
     c.revision_dt,
     c.revision_is_minor_edit,
     c.revision_comment,
     c.revision_comment_is_visible,
     c.revision_size,
     transform_values(c.revision_content_slots,
                      (k, v) -> (
                         -- content_body is explicitly omitted
                         v.content_format,
                         v.content_model,
                         v.content_sha1,
                         v.content_size,
                         v.origin_rev_id
                      )
                     ) AS revision_content_slots,
     c.revision_content_is_visible,
     c.wiki_id,
     c.row_content_update_dt,
     c.row_visibility_update_dt,
     c.row_move_update_dt
 FROM wmf_content.mediawiki_content_history_v1 c
 ),

deduplicated_centralauth_localuser AS (
   SELECT
     lu_wiki AS wiki_id,
     lu_global_id AS user_central_id,
     lu_local_id AS user_id
   FROM (
     SELECT
       lu_wiki,
       lu_global_id,
       lu_local_id,
       -- the heuristic here is to prefer the lu_global_id that has most recently been attached to this lu_local_id
       ROW_NUMBER() OVER (PARTITION BY lu_wiki, lu_local_id ORDER BY lu_attached_timestamp DESC) AS rownum
     FROM wmf_raw.centralauth_localuser
     WHERE snapshot='2025-10'
       AND wiki_db='centralauth'
       AND lu_wiki IS NOT NULL
       AND lu_local_id IS NOT NULL
       AND lu_global_id IS NOT NULL
   )
   WHERE rownum = 1
 )
 
 INSERT INTO wmf_content.mediawiki_revision_history_v1
 SELECT
     c.page_id,
     c.page_namespace_id,
     c.page_title,
     c.page_redirect_target,
     c.user_id,
     COALESCE(c.user_central_id, u.user_central_id) AS user_central_id,
     c.user_text,
     c.user_is_visible,
     c.revision_id,
     c.revision_parent_id,
     c.revision_dt,
     c.revision_is_minor_edit,
     c.revision_comment,
     c.revision_comment_is_visible,
     c.revision_size,
     c.revision_content_slots,
     c.revision_content_is_visible,
     c.wiki_id,
     c.row_content_update_dt,
     c.row_visibility_update_dt,
     c.row_move_update_dt
 FROM mwch_no_content c
 LEFT JOIN deduplicated_centralauth_localuser u
    ON ( c.wiki_id = u.wiki_id AND c.user_id = u.user_id )
 ORDER BY c.revision_dt

Yarn: https://yarn.wikimedia.org/proxy/application_1764064841637_25789/SQL/execution/?id=2
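
The deduplication heuristic can be tried on a toy table. This is only a sketch with Python's built-in sqlite3 (window functions need SQLite 3.25 or newer); the rows, including the lu_attached_timestamp values and the enwiki ids, are hypothetical.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE revisions (revision_id INTEGER, wiki_id TEXT, user_id INTEGER);
CREATE TABLE localuser (
  lu_wiki TEXT, lu_local_id INTEGER, lu_global_id INTEGER, lu_attached_timestamp TEXT
);
INSERT INTO revisions VALUES (10, 'ocwiki', 745), (11, 'enwiki', 7), (12, 'enwiki', NULL);
INSERT INTO localuser VALUES
  ('ocwiki', 745, 5731, '20200101000000'),  -- same mapping stored twice
  ('ocwiki', 745, 5731, '20210101000000'),
  ('enwiki', 7, 100, '20190101000000'),     -- two candidate global ids
  ('enwiki', 7, 200, '20220101000000'),
  ('enwiki', NULL, NULL, NULL);             -- unusable row
""")

DEDUPLICATED = """
SELECT lu_wiki, lu_local_id, lu_global_id
FROM (
  SELECT lu_wiki, lu_local_id, lu_global_id,
         -- keep the most recently attached global id per (wiki, local id)
         ROW_NUMBER() OVER (
           PARTITION BY lu_wiki, lu_local_id
           ORDER BY lu_attached_timestamp DESC
         ) AS rownum
  FROM localuser
  WHERE lu_wiki IS NOT NULL
    AND lu_local_id IS NOT NULL
    AND lu_global_id IS NOT NULL
)
WHERE rownum = 1
"""

deduplicated = conn.execute(DEDUPLICATED + " ORDER BY lu_wiki, lu_local_id").fetchall()

# With one row per join key, the LEFT JOIN can no longer multiply revisions.
source_rows = conn.execute("SELECT count(*) FROM revisions").fetchone()[0]
joined_rows = conn.execute(f"""
SELECT count(*)
FROM revisions c
LEFT JOIN ({DEDUPLICATED}) u
  ON c.wiki_id = u.lu_wiki AND c.user_id = u.lu_local_id
""").fetchone()[0]

print(deduplicated, source_rows, joined_rows)
```

Since each (lu_wiki, lu_local_id) pair survives at most once, the joined row count has to equal the source row count, which is the property the count check relies on.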

I had a quick chat with @xcollazo and we decided that, since time is pressing, we are going for the as-simple-as-possible version of the system for page_type.
This is a change from what I stated on Slack yesterday with @Ottomata.

  • We are going to join the project_namespace_map data just before loading Druid, not onto the mediawiki_revision_history_v1 table, to keep this new and tricky beast as quiet as possible
  • We are going to INNER JOIN on the project_namespace_map table, meaning that if a new wiki shows up in the middle of a month, user-statistics for that wiki will not be available before the beginning of the next month.
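
A small sketch of the INNER JOIN trade-off, again with Python's built-in sqlite3. The namespace_map table and its columns (wiki_id, namespace_id, page_type) are hypothetical stand-ins and not the real project_namespace_map schema; 'newwiki' is a made-up wiki that has not yet appeared in the map.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE revisions (wiki_id TEXT, revision_id INTEGER, page_namespace_id INTEGER);
CREATE TABLE namespace_map (wiki_id TEXT, namespace_id INTEGER, page_type TEXT);
INSERT INTO revisions VALUES ('enwiki', 1, 0), ('newwiki', 2, 0);
-- The map only knows about enwiki so far.
INSERT INTO namespace_map VALUES ('enwiki', 0, 'content');
""")

# What would be loaded: revisions without a namespace mapping are silently dropped.
loaded = conn.execute("""
SELECT r.wiki_id, r.revision_id, m.page_type
FROM revisions r
INNER JOIN namespace_map m
  ON r.wiki_id = m.wiki_id AND r.page_namespace_id = m.namespace_id
ORDER BY r.revision_id
""").fetchall()

# A LEFT JOIN anti-join lists the wikis that the INNER JOIN leaves out.
dropped_wikis = [row[0] for row in conn.execute("""
SELECT DISTINCT r.wiki_id
FROM revisions r
LEFT JOIN namespace_map m
  ON r.wiki_id = m.wiki_id AND r.page_namespace_id = m.namespace_id
WHERE m.wiki_id IS NULL
ORDER BY r.wiki_id
""")]

print(loaded, dropped_wikis)
```

A check like the second query could be used to see which wikis are waiting for the next map refresh.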

The backfill rerun from T410688#11410585 finished successfully, although it struggled with a couple of task failures:

Response code
Time taken: 10965.418 seconds

That's ~3 hours.

Count checks now agree:

spark-sql (default)> SELECT count(1) FROM wmf_content.mediawiki_revision_history_v1;
count(1)
7488763332
Time taken: 45.915 seconds, Fetched 1 row(s)
spark-sql (default)> SELECT count(1) FROM wmf_content.mediawiki_content_history_v1;
count(1)
7488763332
Time taken: 191.563 seconds, Fetched 1 row(s)

Finally, a random spot check on a time range for which we know user_central_id was previously missing:

spark-sql (default)> SELECT wiki_id, user_id, user_central_id FROM wmf_content.mediawiki_revision_history_v1 WHERE revision_dt > '2023-01-01' and revision_dt < '2023-01-05' LIMIT 10;
wiki_id user_id user_central_id
commonswiki     11190818        69628010
kowiki  NULL    NULL
enwiki  NULL    NULL
thwikiquote     4662    63153201
enwiki  11745509        5900770
wikidatawiki    150965  8763949
wikidatawiki    150965  8763949
enwiktionary    123938  1256922
enwikivoyage    2248002 65346851
zhwiki  NULL    NULL
Time taken: 0.65 seconds, Fetched 10 row(s)

@JAllemandou The table wmf_content.mediawiki_revision_history_v1 has been backfilled, and the Airflow DAG is now live at https://airflow.wikimedia.org/dags/mw_revision_merge_events_to_mw_revision_history_daily/grid

You should be able to consume this from a downstream DAG via:

mw_revision_history_sensor = dataset("iceberg_wmf_content_mediawiki_revision_history_v1").get_sensor_for(dag)

I verified this morning: we had successful Airflow jobs updating the new table, and the number of rows in the new wmf_content.mediawiki_revision_history_v1 table exactly matches the number of rows in the wmf_content.mediawiki_content_history_v1 table. Looks like our strategy of reusing events works!