
Missing reconciliation for MWCH
Closed, Duplicate (Public)

Description

As stated in T410431#11534001, the combination page_id = 69510715 and wiki_id = 'enwiki', which is present in MariaDB, is completely absent from wmf_content.mediawiki_content_history_v1.

After some digging, the problem is most probably in the reconciliation query.
Because the records are absent from wmf_content.mediawiki_content_history_v1, the query cannot select any record, and therefore the pipeline does not produce any reconciliation.

In this task we want to find a solution to the missing reconciliation.

Event Timeline

My change is the following:

from pyspark.sql import functions as F
from pyspark.sql.window import Window
deletes_and_restores_query = """
SELECT DISTINCT log_page AS page_id, log_action, log_timestamp
FROM logging FORCE INDEX (log_page_id_time)
WHERE log_type = 'delete'
  AND log_action IN ('delete', 'restore')
"""
delete_restores_df = spark.read \
    .format("jdbc") \
    .option("driver", "com.mysql.cj.jdbc.Driver") \
    .option('numPartitions', 1) \
    .option("url", f"jdbc:mysql://{host}:{port}/{wikidb}") \
    .option("query", deletes_and_restores_query) \
    .option("user", "research") \
    .option("password", pw) \
    .load()
# Rank the log entries of each page from newest to oldest.
window_spec = Window.partitionBy("page_id").orderBy(F.col("log_timestamp").desc())
delete_restores_df = delete_restores_df.withColumn("rn", F.row_number().over(window_spec))

# A page counts as currently deleted when its most recent delete-log entry is a 'delete'.
page_is_deleted_df = delete_restores_df.filter(
    (F.col("rn") == 1) & (F.col("log_action") == "delete")
).select("page_id")

page_is_deleted_df.createOrReplaceTempView("page_is_deleted")
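
The window logic above can be illustrated without Spark. The following is only a minimal plain-Python sketch of the same idea (keep the newest delete-log entry per page, then keep the pages where that entry is a 'delete'). The `LogRow` type, the function name, and the sample rows are hypothetical, and it assumes log_timestamp is a 14-digit string that sorts chronologically.

```python
from typing import Iterable, NamedTuple


class LogRow(NamedTuple):
    page_id: int
    log_action: str     # 'delete' or 'restore'
    log_timestamp: str  # assumed 'YYYYMMDDHHMMSS', so string order is time order


def pages_currently_deleted(rows: Iterable[LogRow]) -> set[int]:
    """Return the page_ids whose most recent delete-log action is 'delete'."""
    latest: dict[int, LogRow] = {}
    for row in rows:
        seen = latest.get(row.page_id)
        # Keep only the newest entry per page (the rn == 1 row of the window).
        if seen is None or row.log_timestamp > seen.log_timestamp:
            latest[row.page_id] = row
    return {pid for pid, row in latest.items() if row.log_action == "delete"}


# Hypothetical sample rows, not taken from the logging table.
sample = [
    LogRow(1, "delete", "20251201000000"),
    LogRow(1, "restore", "20251202000000"),  # page 1 was restored afterwards
    LogRow(2, "delete", "20251203000000"),   # page 2 is still deleted
]
deleted_now = pages_currently_deleted(sample)
print(deleted_now)
```

In this sample only page 2 ends up in the result, so a page that was deleted and later restored would no longer be removed from the source side by the ANTI JOIN.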

The temp view page_is_deleted will replace the source_pages_deleted temp view in the ANTI JOIN of the reconciliation query.
Here is a draft of the change:

spark.sql(f"""
SELECT '{wiki_id}'                                          AS wiki_id,
       COALESCE(t.page_id, s.page_id)                       AS page_id,
       revision_id,
       COALESCE(t.revision_dt, s.revision_dt) AS revision_dt,
       FILTER(
           ARRAY(
               IF(s.user_is_visible != t.user_is_visible, 'mismatch_user_visibility', NULL),
               IF(s.revision_content_is_visible != t.revision_content_is_visible, 'mismatch_content_visibility', NULL),
               IF(s.revision_comment_is_visible != t.revision_comment_is_visible, 'mismatch_comment_visibility', NULL),
               IF(s.revision_size != t.revision_size, 'mismatch_size', NULL),
               IF(s.page_id != t.page_id, 'mismatch_page', NULL),
               IF(s.page_id IS NULL, 'missing_from_source', NULL),
               IF(t.page_id IS NULL, 'missing_from_target', NULL),
               IF(d.page_id IS NOT NULL, 'page_was_deleted', NULL),
               IF(r.page_id IS NOT NULL, 'page_was_restored', NULL),
               IF(t.page_id IS NOT NULL AND COALESCE(a.user_id, -1) != COALESCE(t.user_id, -1), 'mismatch_user_id', NULL),
               IF(t.page_id IS NOT NULL AND a.user_text != t.user_text, 'mismatch_user_text', NULL)
           ),
           reason -> reason IS NOT NULL
       )                                                    AS reasons,
       CAST('{computation_dt}' AS TIMESTAMP)                AS computation_dt,
       'wiki-of-all-time'                                AS computation_class

FROM source_revisions s
ANTI JOIN page_is_deleted spd USING (page_id) -- <----- here is the change
FULL OUTER JOIN target_revisions t USING (revision_id)
LEFT JOIN source_pages_deleted  d ON (COALESCE(t.page_id, s.page_id) = d.page_id)
LEFT JOIN source_pages_restored r ON (COALESCE(t.page_id, s.page_id) = r.page_id)
LEFT JOIN source_actors a ON (s.rev_actor = a.actor_id)

WHERE s.user_is_visible != t.user_is_visible
   OR s.revision_content_is_visible != t.revision_content_is_visible
   OR s.revision_comment_is_visible != t.revision_comment_is_visible
   OR s.revision_size != t.revision_size
   OR s.page_id != t.page_id
   OR s.page_id IS NULL
   OR t.page_id IS NULL
   OR (t.page_id IS NOT NULL AND COALESCE(a.user_id, -1) != COALESCE(t.user_id, -1))
   OR (t.page_id IS NOT NULL AND a.user_text != t.user_text)""").createOrReplaceTempView("new_reconc")

With the following parameters:

wiki_id='itwiki'
min_timestamp= '2025-10-01 00:00:00'
max_timestamp = '2026-02-09 00:00:00'

The classic reconciliation code (let's call it old_reconc) would produce 4167 records, while the new_reconc view has 4700 records.

spark.sql("""
select page_id, count(revision_id) 
from (
          select * from old_reconc
          except 
          select * from new_reconc
)
group by 1
order by 2 desc""").show()
+--------+------------------+
| page_id|count(revision_id)|
+--------+------------------+
|10696218|               364|
|  137547|               101|
| 3285913|                96|
| 5936366|                95|
| 7047917|                78|
| 5243914|                63|
|10089840|                57|
|10534046|                56|
| 9791906|                51|
|10750528|                50|
|10681754|                49|
|10074884|                45|
|10720835|                42|
| 7521572|                42|
| 7652993|                42|
| 9789674|                36|
|10800074|                34|
|10640695|                34|
| 9886625|                32|
| 9785649|                31|
+--------+------------------+
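
One thing to keep in mind when reading these counts: `select * ... except select * ...` compares whole rows, so a revision that is present in both views but with different reasons still shows up in the difference. The sketch below illustrates this with plain Python sets. All rows are hypothetical, and it assumes the other columns (for example computation_dt) are identical in both views.

```python
# Each row is (page_id, revision_id, reasons); reasons is a tuple so rows are hashable.
# All values below are hypothetical.
old_rows = {
    (1, 100, ("missing_from_source", "page_was_deleted")),
    (1, 101, ("missing_from_source", "page_was_deleted")),
    (2, 200, ("missing_from_target",)),
}
new_rows = {
    (1, 100, ("page_was_deleted", "mismatch_user_id")),  # same revision, different reasons
    (2, 200, ("missing_from_target",)),                  # identical in both views
}

# EXCEPT over whole rows: a changed reasons column makes the row count as different.
old_except_new = old_rows - new_rows

# Difference on revision_id only: revisions that the new query no longer selects at all.
dropped_revisions = {r[1] for r in old_rows} - {r[1] for r in new_rows}

print(len(old_except_new), sorted(dropped_revisions))
```

Here the whole-row difference has 2 rows, while only revision 101 is really gone from the new output.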

Analysing the top 3, we can see that they are all present in both source_pages_deleted and source_pages_restored.
Moreover, the reasons they are flagged for reconciliation are:

[missing_from_source, page_was_deleted, page_was_restored]
[missing_from_source, page_was_deleted, page_was_restored, mismatch_user_id]

All these pages were deleted and then restored. Because of missing_from_source and the logic of the query, we are going to request all the revisions for these pages. The mismatch_user_id reason is caused by a comparison with null fields produced by the ANTI JOIN mentioned above, and is therefore not an accurate reason.
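
To make the null comparison concrete, here is a small plain-Python sketch of the mismatch_user_id condition from the query. It is only an illustration: the helper names are hypothetical, and the target user_id 42 is a made-up value.

```python
from typing import Optional


def coalesce(value: Optional[int], default: int) -> int:
    """SQL COALESCE for a single nullable value."""
    return default if value is None else value


def mismatch_user_id(
    target_page_id: Optional[int],
    actor_user_id: Optional[int],
    target_user_id: Optional[int],
) -> bool:
    """Mirror of: t.page_id IS NOT NULL
    AND COALESCE(a.user_id, -1) != COALESCE(t.user_id, -1)."""
    return (
        target_page_id is not None
        and coalesce(actor_user_id, -1) != coalesce(target_user_id, -1)
    )


# Source row removed by the ANTI JOIN: the actor join finds nothing, so a.user_id is NULL.
# The comparison becomes -1 != 42 and the reason fires even though nothing was compared.
spurious = mismatch_user_id(target_page_id=137547, actor_user_id=None, target_user_id=42)

# Same situation, but the target user_id is NULL too: both sides coalesce to -1.
not_flagged = mismatch_user_id(target_page_id=137547, actor_user_id=None, target_user_id=None)

print(spurious, not_flagged)
```

If this reading is right, it would also explain why some old_reconc rows carry mismatch_user_id and others do not: the flag would depend only on whether the target user_id is null.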

With the new logic:

[missing_from_target, page_was_deleted, page_was_restored]
[page_was_deleted, page_was_restored, mismatch_user_id]

missing_from_target tells us that we are requesting revisions that are actually missing from the target. These are the numbers:

spark.sql("""
select page_id, count(*) from new_reconc
where
page_id in (137547 ,10696218,3285913 )
group by 1""").show()
+--------+--------+
|page_id |count(1)|
+--------+--------+
|137547  |28      |
|10696218|5       |
|3285913 |71      |
+--------+--------+
spark.sql("""
select t.*, n.reasons from target_revisions t
join  new_reconc n on t.revision_id=n.revision_id
and t.page_id=n.page_id
where t.page_id in (137547 ,10696218,3285913 )""").show(truncate=False)
+-------+-----------+-------------------+---------------------------+-------+---------+---------------+---------------------------+-------------+-------------------------------------------------------+
|page_id|revision_id|revision_dt        |revision_content_is_visible|user_id|user_text|user_is_visible|revision_comment_is_visible|revision_size|reasons                                                |
+-------+-----------+-------------------+---------------------------+-------+---------+---------------+---------------------------+-------------+-------------------------------------------------------+
|137547 |148296199  |2025-12-02 08:37:18|false                      |null   |null     |true           |true                       |3483         |[page_was_deleted, page_was_restored, mismatch_user_id]|
|3285913|148301900  |2025-12-02 15:13:18|true                       |null   |null     |true           |true                       |95854        |[page_was_deleted, page_was_restored, mismatch_user_id]|
|3285913|148301915  |2025-12-02 15:14:21|true                       |null   |null     |true           |true                       |96840        |[page_was_deleted, page_was_restored, mismatch_user_id]|
|3285913|148302370  |2025-12-02 15:40:33|true                       |null   |null     |true           |true                       |99618        |[page_was_deleted, page_was_restored, mismatch_user_id]|
+-------+-----------+-------------------+---------------------------+-------+---------+---------------+---------------------------+-------------+-------------------------------------------------------+

Only these 4 revisions are stored in the MWCH table, and they are being reconciled because of the user_id mismatch.

spark.sql("""
select t.page_id, n.reasons , count(*) from target_revisions t
join  old_reconc n on t.revision_id=n.revision_id
and t.page_id=n.page_id
where t.page_id in (137547 ,10696218,3285913 )
group by 1,2""").show(truncate=False)
+--------+----------------------------------------------------------------------------+--------+
|page_id |reasons                                                                     |count(1)|
+--------+----------------------------------------------------------------------------+--------+
|10696218|[missing_from_source, page_was_deleted, page_was_restored, mismatch_user_id]|364     |
|137547  |[missing_from_source, page_was_deleted, page_was_restored]                  |1       |
|137547  |[missing_from_source, page_was_deleted, page_was_restored, mismatch_user_id]|100     |
|3285913 |[missing_from_source, page_was_deleted, page_was_restored, mismatch_user_id]|93      |
|3285913 |[missing_from_source, page_was_deleted, page_was_restored]                  |3       |
+--------+----------------------------------------------------------------------------+--------+

All of the records flagged by the old_reconc code, on the other hand, are already in the table.

Now analysing the top 3 page_ids that new_reconc proposes for reconciliation and old_reconc does not:

spark.sql("""select page_id, count(revision_id) from (
          select * from new_reconc
          except 
          select * from old_reconc)
          group by 1
          order by 2 desc""").show()
+--------+------------------+
| page_id|count(revision_id)|
+--------+------------------+
|10362542|               175|
|10642440|               136|
|10630273|                95|
+--------+------------------+

Here are the reasons:

spark.sql("select distinct reasons from `new_reconc` where page_id in (10362542,10642440,10630273)  ").show(truncate=False)
+----------------------------------------------------------+
|reasons                                                   |
+----------------------------------------------------------+
|[missing_from_target, page_was_deleted, page_was_restored]|
+----------------------------------------------------------+

The missing_from_target is already a good sign!

Let's check if any of these records are now in the target MWCH:

spark.sql("""
select t.page_id, n.reasons , count(*) from target_revisions t
join new_reconc n on t.revision_id=n.revision_id
and t.page_id=n.page_id
where t.page_id in (10362542,10642440,10630273)
group by 1,2""").show(truncate=False)
+-------+-------+--------+
|page_id|reasons|count(1)|
+-------+-------+--------+
+-------+-------+--------+

The answer is no.

Let's check the same pages in the old_reconc:

spark.sql("""
select t.page_id, n.reasons , count(*) from target_revisions t
join old_reconc n on t.revision_id=n.revision_id
and t.page_id=n.page_id
where t.page_id in (10362542,10642440,10630273)
group by 1,2""").show(truncate=False)
+--------+----------------------------------------------------------------------------+--------+
|page_id |reasons                                                                     |count(1)|
+--------+----------------------------------------------------------------------------+--------+
|10642440|[missing_from_source, page_was_deleted, page_was_restored, mismatch_user_id]|1       |
|10362542|[missing_from_source, page_was_deleted, page_was_restored, mismatch_user_id]|29      |
+--------+----------------------------------------------------------------------------+--------+

Same situation as before.

Finally let's compare the distribution of reasons:

spark.sql("select reasons,count(*) from new_reconc group by 1 order by 2 desc").show(truncate=False)
+----------------------------------------------------------+--------+
|reasons                                                   |count(1)|
+----------------------------------------------------------+--------+
|[missing_from_target, page_was_deleted, page_was_restored]|3519    |
|[missing_from_source, page_was_deleted, mismatch_user_id] |786     |
|[missing_from_target]                                     |296     |
|[mismatch_content_visibility]                             |46      |
|[page_was_deleted, page_was_restored, mismatch_user_id]   |18      |
|[mismatch_user_text]                                      |16      |
|[mismatch_page, page_was_deleted]                         |7       |
|[mismatch_content_visibility, mismatch_comment_visibility]|6       |
|[missing_from_source, mismatch_user_id]                   |2       |
|[page_was_deleted, page_was_restored, mismatch_user_text] |2       |
|[missing_from_target, page_was_restored]                  |1       |
|[mismatch_comment_visibility]                             |1       |
+----------------------------------------------------------+--------+

and

spark.sql("select reasons,count(*) from old_reconc group by 1 order by 2 desc").show(truncate=False)
+----------------------------------------------------------------------------+--------+
|reasons                                                                     |count(1)|
+----------------------------------------------------------------------------+--------+
|[missing_from_source, page_was_deleted, page_was_restored, mismatch_user_id]|2988    |
|[missing_from_source, page_was_deleted, mismatch_user_id]                   |786     |
|[missing_from_target]                                                       |296     |
|[mismatch_content_visibility]                                               |46      |
|[missing_from_source, page_was_deleted, page_was_restored]                  |18      |
|[mismatch_user_text]                                                        |16      |
|[mismatch_page, page_was_deleted]                                           |7       |
|[mismatch_content_visibility, mismatch_comment_visibility]                  |6       |
|[missing_from_source, mismatch_user_id]                                     |2       |
|[mismatch_comment_visibility]                                               |1       |
|[missing_from_target, page_was_restored]                                    |1       |
+----------------------------------------------------------------------------+--------+

In the output of the new reconciliation code there are fewer missing_from_source reasons and more missing_from_target ones.
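
As a quick cross-check of the two distributions, the sketch below sums the counts from the tables above per reason. The dictionaries are copied from those tables; the variable and function names are only illustrative.

```python
# Reason distributions copied from the two tables above (reasons -> row count).
new_dist = {
    ("missing_from_target", "page_was_deleted", "page_was_restored"): 3519,
    ("missing_from_source", "page_was_deleted", "mismatch_user_id"): 786,
    ("missing_from_target",): 296,
    ("mismatch_content_visibility",): 46,
    ("page_was_deleted", "page_was_restored", "mismatch_user_id"): 18,
    ("mismatch_user_text",): 16,
    ("mismatch_page", "page_was_deleted"): 7,
    ("mismatch_content_visibility", "mismatch_comment_visibility"): 6,
    ("missing_from_source", "mismatch_user_id"): 2,
    ("page_was_deleted", "page_was_restored", "mismatch_user_text"): 2,
    ("missing_from_target", "page_was_restored"): 1,
    ("mismatch_comment_visibility",): 1,
}
old_dist = {
    ("missing_from_source", "page_was_deleted", "page_was_restored", "mismatch_user_id"): 2988,
    ("missing_from_source", "page_was_deleted", "mismatch_user_id"): 786,
    ("missing_from_target",): 296,
    ("mismatch_content_visibility",): 46,
    ("missing_from_source", "page_was_deleted", "page_was_restored"): 18,
    ("mismatch_user_text",): 16,
    ("mismatch_page", "page_was_deleted"): 7,
    ("mismatch_content_visibility", "mismatch_comment_visibility"): 6,
    ("missing_from_source", "mismatch_user_id"): 2,
    ("mismatch_comment_visibility",): 1,
    ("missing_from_target", "page_was_restored"): 1,
}


def rows_with(dist: dict, reason: str) -> int:
    """Number of rows whose reasons array contains the given reason."""
    return sum(count for reasons, count in dist.items() if reason in reasons)


for name, dist in (("new_reconc", new_dist), ("old_reconc", old_dist)):
    print(
        name,
        sum(dist.values()),
        rows_with(dist, "missing_from_source"),
        rows_with(dist, "missing_from_target"),
    )
```

The totals match the 4700 and 4167 records mentioned earlier. Rows with missing_from_source go from 3794 (old) to 788 (new), and rows with missing_from_target go from 297 (old) to 3816 (new).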

What do you think, @xcollazo @JAllemandou? If you agree with my idea, I can open an MR and push this to main (after all the needed tests).

Possibly related: are we missing (all?) page_delete events since we switched to DomainEvents?

T400380#11129969

are we missing (all?) page_delete events since we switched to DomainEvents?

Hm, no there are many:

spark-sql (default)> select wiki_id, count(*)  from event.mediawiki_page_change_v1 where page_change_kind = 'delete' and year=2026 and month = 2 group by wiki_id order by count(*) desc;

viwiki	1090874
enwiki	27598
commonswiki	21104
trwiki	8089
ruwiki	7581
wikidatawiki	6634
frwiki	3291
metawiki	3069
dewiki	2883

...

Closing this since it is a duplicate of T412461.