
Create new Druid datasource based on the `mediawiki_revision_history_v1` table
Open, Needs Triage, Public

Description

The new Druid datasource needs to:

  • Follow the existing mediawiki_history_reduced datasource schema for the needed fields
  • Be updated daily from the wmf_content.mediawiki_revision_history_v1 table

Two patches will be needed:

  • HQL job in refinery to generate the data
  • Airflow job to orchestrate

Details

Related Changes in Gerrit:
Related Changes in GitLab:
Title | Reference | Author | Source Branch | Dest Branch
Update mediawiki_revision_history_reduced job | repos/data-engineering/airflow-dags!1869 | joal | update_mw_revision_history_reduced_job_2 | main
Update mediawiki_revision_history_reduced job | repos/data-engineering/airflow-dags!1868 | joal | update_mw_revision_history_reduced_job | main
Add main mediawiki_revision_history_reduced job | repos/data-engineering/airflow-dags!1851 | joal | add_mw_revision_history_reduced_job | main

Event Timeline

Change #1214023 had a related patch set uploaded (by Joal; author: Joal):

[analytics/refinery@master] Add compute_mediawiki_revision_history_reduced HQL

https://gerrit.wikimedia.org/r/1214023

I used joal.test_mediawiki_revision_history_druid, a test run of https://gerrit.wikimedia.org/r/1214023, to do some data validations.

Basically, I applied the same daily transformations to the data in wmf.mediawiki_history_reduced, and then compared the event counts for the range of dates we know is available in both tables:

spark.sql("""
WITH daily_aggregated_mediawiki_history_reduced AS (
  SELECT 'revision' AS event_entity,
         'create' AS event_type,
         DATE(event_timestamp) AS event_timestamp,
         user_central_id,
         page_type,
         count(1) AS events
  FROM wmf.mediawiki_history_reduced
  WHERE snapshot = '2025-10'
    AND event_entity = 'revision'
    AND event_type = 'create'
  GROUP BY
    DATE(event_timestamp),
    user_central_id,
    page_type
)
SELECT count(1) as count,
       count_match
FROM (
    SELECT damwh.events = test.events AS count_match
    FROM daily_aggregated_mediawiki_history_reduced damwh
    LEFT JOIN joal.test_mediawiki_revision_history_druid test
      ON (damwh.event_timestamp = test.event_timestamp
          AND damwh.user_central_id = test.user_central_id
          AND damwh.page_type = test.page_type
          -- only consider data we know we have on both tables
          AND damwh.event_timestamp < '2025-10'
          AND test.event_timestamp < '2025-10'
         )
)
GROUP BY count_match
""").show(5)

+---------+-----------+
|    count|count_match|
+---------+-----------+
|  3694708|       null|
|269857694|       true|
|  8951255|      false|
+---------+-----------+

Although the vast majority of the counts match, we do have 3694708 NULLs, meaning there were rows in wmf.mediawiki_history_reduced not found in the test table, and 8951255 counts that did not match. In total that puts us at (3694708 + 8951255) / 269857694 ~= 4.7% of mismatched data.
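As a sanity check, the quoted ratio can be reproduced from the row counts in the output (plain Python, numbers transcribed from the table above):

```python
# Row counts transcribed from the count_match output above.
nulls = 3_694_708        # rows with no match in the test table
matched = 269_857_694    # rows where the event counts agree
mismatched = 8_951_255   # rows present in both tables, counts differ

# Mismatch ratio relative to the matched rows, as quoted in the comment.
ratio = (nulls + mismatched) / matched
print(f"{ratio:.1%}")  # → 4.7%
```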

I will continue investigating, see if there is a pattern.

I have added the aggregated events value to @xcollazo's query above, and indeed we have an issue:

+---------+----------+-----------+                                              
|    count|    events|count_match|
+---------+----------+-----------+
|  3694708| 674767899|       null|
|269857694|4167426208|       true|
|  8951255|2631857251|      false|
+---------+----------+-----------+

A large proportion of the aggregated events fall in rows that don't match. We need to investigate before publishing.
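To quantify that proportion: weighting by aggregated events rather than row counts (plain Python, sums transcribed from the table above; "bad" lumps together the null and mismatched rows), roughly 44% of events are affected:

```python
# Event sums transcribed from the output above.
null_events = 674_767_899          # events in rows with no match in the test table
matched_events = 4_167_426_208     # events in rows where the counts agree
mismatched_events = 2_631_857_251  # events in rows where the counts differ

total = null_events + matched_events + mismatched_events
bad = null_events + mismatched_events
print(f"{bad / total:.1%}")  # → 44.2%
```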

First investigation: I've looked for day/user_central_id/page_type being present in both datasets but with different counts. I took only one month (2025-09) to limit the data volume.

spark.sql("""
  SELECT
      event_timestamp,
      user_central_id,
      page_type,
      events
  FROM daily_aggregated_mediawiki_history_reduced
  WHERE
      event_timestamp < '2025-10'
      AND event_timestamp >= '2025-09'
""").createOrReplaceTempView("damwh")
spark.table("damwh").cache()

spark.sql("""
  SELECT
      event_timestamp,
      user_central_id,
      page_type,
      events
  FROM joal.test_mediawiki_revision_history_druid
  WHERE
      event_timestamp < '2025-10'
      AND event_timestamp >= '2025-09'
""").createOrReplaceTempView("test")
spark.table("test").cache()

spark.sql("""
SELECT
    damwh.event_timestamp,
    damwh.user_central_id,
    damwh.page_type,
    damwh.events,
    test.events as test_events,
    damwh.events - test.events as diff
FROM damwh
LEFT JOIN test
  ON (damwh.event_timestamp = test.event_timestamp
      AND damwh.user_central_id = test.user_central_id
      AND damwh.page_type = test.page_type
     )
WHERE TRUE
  AND damwh.events != test.events
  AND test.events IS NOT NULL
ORDER BY diff DESC
LIMIT 100
""").show(100, false)

+---------------+---------------+-----------+------+-----------+----+
|event_timestamp|user_central_id|page_type  |events|test_events|diff|
+---------------+---------------+-----------+------+-----------+----+
|2025-09-04     |1924           |content    |8458  |83         |8375|
|2025-09-03     |1924           |non_content|3040  |60         |2980|
|2025-09-08     |26378          |content    |4467  |3131       |1336|
|2025-09-22     |57004344       |content    |2489  |1510       |979 |
|2025-09-28     |57004344       |content    |2569  |1592       |977 |
|2025-09-15     |57004344       |content    |2421  |1554       |867 |
|2025-09-03     |57004344       |content    |2400  |1542       |858 |
|2025-09-30     |1924           |content    |907   |80         |827 |
|2025-09-09     |57004344       |content    |2154  |1339       |815 |
|2025-09-17     |66034651       |non_content|813   |2          |811 |
...

I picked the first one to look at in detail: day = 2025-09-04 / user_central_id = 1924 / page_type = content.

I created smaller tables, filtering for that day/user_central_id/page_type from the source tables:

spark.sql("""
SELECT *
FROM wmf.mediawiki_history_reduced
WHERE snapshot = '2025-10'
    AND DATE(event_timestamp) = DATE '2025-09-04'
    AND user_central_id = 1924
    AND page_type = 'content'
""").createOrReplaceTempView("damwh_1924")
spark.table("damwh_1924").cache()


spark.sql("""
WITH namespace_map AS (
        SELECT DISTINCT
            dbname AS wiki_db,
            -- Compatibility with existing AQS project hostnames
            REGEXP_REPLACE(hostname, '(www\\.|\\.org)', '') AS hostname,
            namespace,
            namespace_is_content
        FROM wmf_raw.mediawiki_project_namespace_map
        WHERE TRUE
            AND snapshot = '2025-10'
    )
SELECT *
FROM wmf_content.mediawiki_revision_history_v1 mrw
INNER JOIN namespace_map nm 
        ON mrw.wiki_id = nm.wiki_db AND page_namespace_id = nm.namespace
WHERE DATE(revision_dt) = DATE '2025-09-04'
    AND user_central_id = 1924
    AND nm.namespace_is_content = 1

""").createOrReplaceTempView("test_1924")
spark.table("test_1924").cache()

I then looked into edits per project:

spark.sql("SELECT project, count(1) as c FROM damwh_1924 group by project order by c desc").show(100, false)
+-------------------+----+                                                      
|project            |c   |
+-------------------+----+
|bew.wiktionary     |6942|
|min.wikibooks      |1514|
|wikidata           |8   |
|incubator.wikimedia|2   |
|zgh.wiktionary     |2   |
+-------------------+----+

spark.sql("SELECT wiki_id, count(1) as c FROM test_1924 group by wiki_id order by c desc").show(100, false)
+-------------+---+                                                             
|wiki_id      |c  |
+-------------+---+
|pcmwikiquote |66 |
|wikidatawiki |8  |
|bewwiktionary|3  |
|minwikibooks |2  |
|zghwiktionary|2  |
|incubatorwiki|2  |
+-------------+---+
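Put side by side (counts transcribed from the two outputs above, with the damwh project hostnames mapped to wiki_ids by hand so the keys line up), the per-project diff looks like this:

```python
# Per-project edit counts for day 2025-09-04 / user_central_id 1924 / content,
# transcribed from the two outputs above and keyed by wiki_id.
damwh = {"bewwiktionary": 6942, "minwikibooks": 1514, "wikidatawiki": 8,
         "incubatorwiki": 2, "zghwiktionary": 2}
test = {"pcmwikiquote": 66, "wikidatawiki": 8, "bewwiktionary": 3,
        "minwikibooks": 2, "zghwiktionary": 2, "incubatorwiki": 2}

# Print only the projects where the two sides disagree.
for wiki in sorted(set(damwh) | set(test)):
    d, t = damwh.get(wiki, 0), test.get(wiki, 0)
    if d != t:
        print(f"{wiki}: damwh={d} test={t} diff={d - t}")
```

This shows the gap is concentrated in bewwiktionary and minwikibooks, with pcmwikiquote present only on the test side.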

I did various checks to try to explain the difference for bew.wiktionary and min.wikibooks, and found that:

spark.sql("""
WITH actor AS (
  SELECT
      actor_id,
      actor_user
  FROM wmf_raw.mediawiki_private_actor
  WHERE snapshot = '2025-10'
      AND wiki_db = 'bewwiktionary'
)

SELECT actor_user, COUNT(1) as c
FROM wmf_raw.mediawiki_revision rev
JOIN actor ON rev.rev_actor = actor.actor_id
WHERE rev.snapshot = '2025-10'
    AND rev.wiki_db = 'bewwiktionary'
    AND rev.rev_timestamp like '20250904%'
GROUP BY actor_user
ORDER BY c DESC
""").show(10, false)

+----------+----+                                                               
|actor_user|c   |
+----------+----+
|81        |7487|
|13        |136 |
|7         |7   |
|27        |6   |
|8         |3   |
|142       |2   |
|143       |1   |
|87        |1   |
|88        |1   |
|6         |1   |
+----------+----+

The revision and actor tables sqooped from MariaDB agree with the numbers found in mediawiki_history_reduced.

spark.sql("""
SELECT performer.user_id, COUNT(1) as c
FROM event_sanitized.mediawiki_page_change_v1
WHERE year = 2025 and month = 9 and day = 4
    AND page_change_kind = 'edit'
    AND wiki_id = 'bewwiktionary'
    AND DATE(revision.rev_dt) = '2025-09-04'
GROUP BY performer.user_id
ORDER BY c DESC
""").show(100, false)

+-------+---+                                                                   
|user_id|c  |
+-------+---+
|13     |127|
|81     |5  |
|142    |2  |
|27     |2  |
|8      |2  |
|88     |1  |
|67     |1  |
|143    |1  |
|87     |1  |
+-------+---+

The page_change_v1 event data matches the mediawiki_revision_history_v1 results.

So far, no explanation as to why we see ~7500 more revisions in MariaDB than in the events :(

First thing: the new snapshot (2025-11) has landed, and I verified: same problem.
Next step: get the MariaDB revisions for the specific day/user/project we picked for the investigation, and look for them in the event-oriented datasets.

spark.sql("""
SELECT rev.rev_id
FROM wmf_raw.mediawiki_revision rev
WHERE rev.snapshot = '2025-11'
    AND rev.wiki_db = 'bewwiktionary'
    AND rev.rev_timestamp like '20250904%'
    AND rev.rev_actor = 81
""").createOrReplaceTempView("r_81")
spark.table("r_81").cache()

Now let's verify whether those revisions are present in the events, looking at months 9 and 10 (just in case):

spark.sql("""
SELECT pc.revision.rev_dt, pc.revision.rev_id, pc.performer.user_id, pc.revision.editor.user_id
FROM event_sanitized.mediawiki_page_change_v1 pc
INNER JOIN r_81 ON pc.revision.rev_id = r_81.rev_id
WHERE year = 2025 and month IN (9, 10)
    AND pc.wiki_id = 'bewwiktionary'
""").show(100, false)

+--------------------+------+-------+-------+                                   
|rev_dt              |rev_id|user_id|user_id|
+--------------------+------+-------+-------+
|2025-09-04T12:07:30Z|33033 |81     |81     |
|2025-09-04T12:24:47Z|33044 |81     |81     |
|2025-09-04T12:07:08Z|33031 |81     |81     |
|2025-09-04T11:58:47Z|33029 |81     |81     |
|2025-09-04T11:52:48Z|33026 |81     |81     |
|2025-09-04T11:53:04Z|33028 |81     |81     |
|2025-09-04T11:45:03Z|33025 |81     |81     |
|2025-09-04T11:52:55Z|33027 |81     |81     |
+--------------------+------+-------+-------+

Only 8 of the ~7500 revisions are present in the events; the vast majority are missing.
Now let's check the wmf_content.mediawiki_revision_history_v1 table:

spark.sql("""
SELECT revision_id, revision_dt, user_id, user_is_visible
FROM wmf_content.mediawiki_revision_history_v1 mrh
INNER JOIN r_81 ON mrh.revision_id = r_81.rev_id
WHERE mrh.wiki_id = 'bewwiktionary'
""").show(100, false)

+-----------+-------------------+-------+---------------+                       
|revision_id|revision_dt        |user_id|user_is_visible|
+-----------+-------------------+-------+---------------+
|2942       |2025-09-04 11:08:25|null   |true           |
|4375       |2025-09-04 11:10:49|null   |true           |
|5299       |2025-09-04 11:11:22|null   |true           |
|5972       |2025-09-04 11:12:04|null   |true           |
|11913      |2025-09-04 11:17:38|null   |true           |
|13818      |2025-09-04 11:19:20|null   |true           |
|26052      |2025-09-04 11:36:16|null   |true           |
|26563      |2025-09-04 11:36:54|null   |true           |
|26698      |2025-09-04 11:36:59|null   |true           |
...

Here we are! The revisions are present in the table, but without an identified user_id, and this is not related to visibility (user_is_visible is true).

Checking the reconciliation events:

spark.sql("""
SELECT rc.revision.rev_dt, rc.revision.rev_id, rc.performer.user_id, rc.revision.editor.user_id
FROM event.mediawiki_content_history_reconcile_enriched_v1 rc
INNER JOIN r_81 ON rc.revision.rev_id = r_81.rev_id
WHERE year = 2025 and month IN (9, 10)
    AND rc.wiki_id = 'bewwiktionary'
""").show(100, false)

+--------------------+------+-------+-------+                                   
|rev_dt              |rev_id|user_id|user_id|
+--------------------+------+-------+-------+
|2025-09-04T11:14:54Z|9141  |null   |81     |
|2025-09-04T11:25:26Z|18679 |null   |81     |
|2025-09-04T11:36:06Z|25869 |null   |81     |
|2025-09-04T11:31:02Z|23407 |null   |81     |
|2025-09-04T11:10:04Z|3929  |null   |81     |
|2025-09-04T11:31:04Z|23448 |null   |81     |
|2025-09-04T11:13:07Z|7233  |null   |81     |
|2025-09-04T11:19:22Z|13886 |null   |81     |
|2025-09-04T11:28:58Z|21410 |null   |81     |
|2025-09-04T11:36:05Z|25849 |null   |81     |
|2025-09-04T11:31:34Z|23913 |null   |81     |
...

We have reconciliation events as expected, and we can see that the performer field doesn't contain the user information, while the revision.editor field does. And in mediawiki_content_pipelines, it's the performer field that is used: https://gitlab.wikimedia.org/repos/data-engineering/mediawiki-content-pipelines/-/blob/main/mediawiki_content_pipelines/content_history/process_events.py?ref_type=heads#L207.
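A possible direction for the fix, purely as an illustrative sketch (not the actual pipeline code; field names follow the event rows shown above): fall back to revision.editor.user_id when performer.user_id is null.

```python
# Illustrative only: resolve the acting user from a page-change / reconcile
# event, falling back from performer.user_id to revision.editor.user_id.
def resolve_user_id(event: dict):
    performer_id = (event.get("performer") or {}).get("user_id")
    if performer_id is not None:
        return performer_id
    editor = (event.get("revision") or {}).get("editor") or {}
    return editor.get("user_id")

# One of the reconcile rows above: performer is null, editor carries the id.
event = {"performer": {"user_id": None},
         "revision": {"rev_id": 9141, "editor": {"user_id": 81}}}
print(resolve_user_id(event))  # → 81
```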

@xcollazo I think this can be identified as a bug! Let's talk :)

Another thing I found is that some projects are present in mediawiki_revision_history_v1 but not in mediawiki_history_reduced:

spark.sql("""
WITH
    namespace_map AS (
        SELECT
            dbname AS wiki_db,
            -- Compatibility with existing AQS project hostnames
            REGEXP_REPLACE(hostname, '(www\\.|\\.org)', '') AS hostname
        FROM wmf_raw.mediawiki_project_namespace_map
        WHERE TRUE
            AND snapshot = '2025-11'
    ),
    mwr_projects AS (
        select distinct hostname
        from wmf_content.mediawiki_revision_history_v1 mwr
        INNER JOIN namespace_map ns
            ON (mwr.wiki_id = ns.wiki_db)
        WHERE mwr.revision_dt < '2025-11'
    ),
    mwh_projects AS (
        select distinct project
        from wmf.mediawiki_history_reduced
        where snapshot = '2025-10'
          AND event_timestamp < '2025-11'
    )
SELECT
    hostname as mediawiki_revision_projects,
    project as mediawiki_history_projects
FROM mwr_projects
FULL OUTER JOIN mwh_projects
    ON (hostname = project)
WHERE (hostname IS NULL OR project IS NULL)
""").show(100,false)

+---------------------------+--------------------------+                        
|mediawiki_revision_projects|mediawiki_history_projects|
+---------------------------+--------------------------+
|tok.wikipedia              |null                      |
|ms.wikiquote               |null                      |
|th.wikimedia               |null                      |
|pcm.wikiquote              |null                      |
|min.wikisource             |null                      |
+---------------------------+--------------------------+
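The FULL OUTER JOIN plus null filter above is effectively a symmetric set difference over project names; a plain-Python equivalent (with a made-up shared project standing in for the many projects present in both tables) would be:

```python
# The five extra projects from the output above, plus a hypothetical shared
# project ("en.wikipedia") standing in for all projects present in both tables.
mwr_projects = {"tok.wikipedia", "ms.wikiquote", "th.wikimedia",
                "pcm.wikiquote", "min.wikisource", "en.wikipedia"}
mwh_projects = {"en.wikipedia"}

print(sorted(mwr_projects - mwh_projects))  # only in mediawiki_revision_history_v1
print(sorted(mwh_projects - mwr_projects))  # only in mediawiki_history_reduced → []
```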

Change #1214023 merged by Joal:

[analytics/refinery@master] Add compute_mediawiki_revision_history_reduced HQL

https://gerrit.wikimedia.org/r/1214023